Ray: Plataforma de Machine Learning II

6 June 2024| Tags: Machine Learning, ML, Cluster ML

Introducción

Ray es un framework de Python que permite la ejecución de aplicaciones Python en paralelo y de forma distribuida. Está diseñado principalmente para ser usado en aplicaciones de Machine Learning y AI, pero también se puede usar en cualquier aplicación Python que requiera computación distribuida.

En el blog anterior Cluster Ray vimos como desplegar un cluster de Ray en una sola máquina con Dockers (Docker Compose) y explicamos por encima las distintas partes del framework: Ray Data, Ray Train, Ray Tune, Ray Serve y como se puede usar ésta última, Ray Serve, para servir un modelo de Hugging Face que traducía textos del inglés al francés.

En este post vamos a ver como hemos desplegado un cluster de Ray en nuestras máquinas de producción y como hemos empezado a usarlo para entrenar y servir los modelos de machine learning que necesitan nuestros productos.

Nodos del cluster

Nodos

Un cluster de Ray se compone de un node head y uno o varios nodos worker.

El nodo head es el nodo maestro del cluster, se encarga de coordinar el cluster y de servir la interfaz web de monitorización.

Los nodos worker son los nodos de trabajo y se encargan de ejecutar las tareas de Python que se desean procesar.

El node head también se comporta como un nodo worker pero principalmente se caracteriza por tener procesos (singleton) responsables del manejo del cluster:

  • GCS: Global Control Service, que centraliza los metadatos del cluster, maneja los nodos asociados y el directorio de procesos que se están ejecutando.
  • Autoescaler: Es el proceso que reacciona cuando se lanzan tareas y controla si los recursos que demandan esas tareas exceden la capacidad actual del cluster. Incrementa el número de nodos workers si las excede o detiene los nodos que no se necesitan si llevan “inactivos” el tiempo que se haya configurado.
  • Raylet: Es el proceso que se encarga de ejecutar las tareas y manejar los objetos y actores en un nodo worker.

Despliegue del cluster Ray de Taniwa (On Premise):

Configuración

Se pueden desplegar clusters de Ray en cualquier infraestructura: en las nubes de AWS, GCP, Azure, en un Kubernetes, On Premise, etc.

Nosotros hemos desplegado un cluster de Ray en dos de nuestras máquinas On Premise utilizando el cluster-luncher del framework

El cluster-luncher lo forman una serie de comandos capaces de interpretar la configuración del cluster definido en un fichero yaml donde se específica el número de nodos, el tipo de máquina, el número de CPUs, el número de GPUs, etc.

En el caso de On Premise la plantilla del fichero de configuración se puede descargar de aquí: Plantilla On Premise

Vamos a ir explicando como quedan las partes principales del fichero en nuestro cluster (ray_cluster.yaml):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# A unique identifier for the head node and workers of this cluster.
cluster_name: default

# Running Ray in Docker images is optional (this docker section can be commented out).
# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled. Assumes Docker is installed.
docker:
    #image: "rayproject/ray-ml:latest-gpu" # You can change this to latest-cpu if you don't need GPU support and 
    #        want a faster startup
    #image: "rayapps:1.0.0"
    image: "rayproject/ray-ml"
    # image: rayproject/ray:latest-gpu   # use this one if you don't need ML dependencies, it's faster to pull
    container_name: "ray_container"
    # If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
    # if no cached version is present.
    pull_before_run: False
    run_options:   # Extra options to pass into "docker run"
        - --ulimit nofile=65536:65536
    head_run_options:
      - --cpus=6
      - --shm-size=1g
    worker_run_options:
      - --cpus=6
      - --shm-size=1g    

Arriba se puede ver como configuramos el cluster para que se ejecute en contenedores Docker. En nuestro caso hemos usado la imagen de Docker rayproject/ray-ml que ya tiene todas las librerías de Machine Learning necesarias para ejecutar los modelos de Hugging Face, PyTorch, TensorFlow, etc. También se podría utilizar la imaen rayproject/ray que es más ligera y después ir instalando las las librerías de Machine Learning que necesitemos (pero somos un poco vagos).

Configuramos los recusos del nodo head y del nodo worker con 6 cpus y 1g de memoria compartida.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
provider:
    type: local
    head_ip: 1.1.1.1
    # You may need to supply a public ip for the head node if you need
    # to run `ray up` from outside of the Ray cluster's network
    # (e.g. the cluster is in an AWS VPC and you're starting ray from your laptop)
    # This is useful when debugging the local node provider with cloud VMs.
    # external_head_ip:
    worker_ips: [ 2.2.2.2 ]
    # Optional when running automatic cluster management on prem. If you use a coordinator server,
    # then you can launch multiple autoscaling clusters on the same set of machines, and the coordinator
    # will assign individual nodes to clusters as needed.
    #    coordinator_address: "<host>:<port>"

# How Ray will authenticate with newly launched nodes.
auth:
    ssh_user: taniwa
    # You can comment out `ssh_private_key` if the following machines don't need a private key for SSH access to the Ray
    # cluster:
    #   (1) The machine on which `ray up` is executed.
    #   (2) The head node of the Ray cluster.
    #
    # The machine that runs ray up executes SSH commands to set up the Ray head node. The Ray head node subsequently
    # executes SSH commands to set up the Ray worker nodes. When you run ray up, ssh credentials sitting on the ray up
    # machine are copied to the head node -- internally, the ssh key is added to the list of file mounts to rsync to head node.
    ssh_private_key: /home/ray/.ssh/test_ray    

En la sección anterior se especifica que el tipo de proveedor es local (on premise), se especifican las ips de los nodos head y worker y el usuario/clave privada para acceder a través de ssh a los nodos.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# The minimum number of workers nodes to launch in addition to the head
# node. This number should be >= 0.
# Typically, min_workers == max_workers == len(worker_ips).
# This field is optional.
min_workers: 1

# The maximum number of workers nodes to launch in addition to the head node.
# This takes precedence over min_workers.
# Typically, min_workers == max_workers == len(worker_ips).
# This field is optional.
max_workers: 1
# The default behavior for manually managed clusters is
# min_workers == max_workers == len(worker_ips),
# meaning that Ray is started on all available nodes of the cluster.
# For automatically managed clusters, max_workers is required and min_workers defaults to 0.

# The autoscaler will scale up the cluster faster with higher upscaling speed.
# E.g., if the task requires adding more nodes then autoscaler will gradually
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
# This number should be > 0.
upscaling_speed: 1.0

idle_timeout_minutes: 5

Nosotros, de momento, no vamos a hacer uso del autoescalado y vamos a mantener siempre activos el nodo head y el nodo worker. En caso de que quisiéramos hacer uso del autoescalado en nuestras dos máquinas, pondríamos el min_workers a 0 y el max_workers a 1. Si el nodo worker estuviera inactivo durante 5 minutos (idle_timeout_minutes), el autoscaler lo pararía.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH. E.g. you could save your conda env to an environment.yaml file, mount
# that directory to all nodes and call `conda -n my_env -f /path1/on/remote/machine/environment.yaml`. In this
# example paths on all nodes must be the same (so that conda can be called always with the same argument)
file_mounts: {
    "/tmp/apps": "/workspaces/taniray/apps",
#    "/path1/on/remote/machine": "/path1/on/local/machine",
#    "/path2/on/remote/machine": "/path2/on/local/machine",
}

# Files or directories to copy from the head node to the worker nodes. The format is a
# list of paths. The same path on the head node will be copied to the worker node.
# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
# you should just use file_mounts. Only use this if you know what you're doing!
cluster_synced_files: []

# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False

# Patterns for files to exclude when running rsync up or rsync down
rsync_exclude:
    - "**/.git"
    - "**/.git/**"

# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for
# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided
# as a value, the behavior will match git's behavior for finding and using .gitignore files.
rsync_filter:
    - ".gitignore"

Esta sección se usa para copiar ficheros o directorios desde la máquina local a los nodos head y worker. En nuestro caso montamos el directorio apps que contiene los ficheros necesarios para desplegar las aplicaciones (Ray Serve) de entrenamiento de los modelos de machine learning y de servicio de dichos modelos, como veremos más adelante.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
initialization_commands: []

# List of shell commands to run to set up each nodes.
setup_commands: [
    "cp -r /tmp/apps/* /home/ray/",
    "cp /tmp/apps/.env /home/ray/",
    "pip install -r /tmp/apps/requirements.txt",
    ]
    # If we have e.g. conda dependencies stored in "/path1/on/local/machine/environment.yaml", we can prepare the
    # work environment on each worker by:
    #   1. making sure each worker has access to this file i.e. see the `file_mounts` section
    #   2. adding a command here that creates a new conda environment on each node or if the environment already exists,
    #     it updates it:
    #      conda env create -q -n my_venv -f /path1/on/local/machine/environment.yaml || 
    #      conda env update -q -n my_venv -f /path1/on/local/machine/environment.yaml
    #
    # Ray developers:
    # you probably want to create a Docker image that
    # has your Ray repo pre-cloned. Then, you can replace the pip installs
    # below with a git checkout <your_sha> (and possibly a recompile).
    # To run the nightly version of ray (as opposed to the latest), either use a rayproject docker image
    # that has the "nightly" (e.g. "rayproject/ray-ml:nightly-gpu") or uncomment the following line:
    # - pip install -U "ray[default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.
    #dev0-cp37-cp37m-manylinux2014_x86_64.whl"

# Custom commands that will be run on the head node after common setup.
head_setup_commands: []

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []

Aquí se han especificado los comandos que se ejecutarán en los nodos head y worker al desplegarlos (o actualizarlos) y que preparan el entorno de ejecución. En nuestro caso copiamos los ficheros de la carpeta apps que hemos montado antes a la carpeta /home/ray y ejecutamos el pip install de los requerimientos.

La razón de copiar los ficheros de apps a la carpeta /home/ray/ se debe a que en las aplicaciones y jobs que se ejecuten en el cluster no hemos conseguido evitar que tengan que estar en ese directorio para poder lanzarse. (lo investigaremos más adelante…seguramente)

Cuando se van desarrollando nuevas aplicaciones de Machine Learning y necesitan nuevas librerías de Python que no están en el entorno de Python actualmente desplegado, se añaden al fichero requirements.txt y se lanza una actualización del cluster que las instalará automáticamente en el nodo head y en todos los workers.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
  # If we have e.g. conda dependencies, we could create on each node a conda environment (see `setup_commands` section).
  # In that case we'd have to activate that env on each node before running `ray`:
  # - conda activate my_venv && ray stop
  # - conda activate my_venv && ulimit -c unlimited && ray start --head --port=6379 --autoscaling-config=~/
  # ray_bootstrap_config.yaml
    - ray stop
    - ulimit -c unlimited && ray start --head --port=10000 --ray-client-server-port=10001 --dashboard-grpc-port=10004
    --dashboard-port=10005 --node-manager-port=10006 --object-manager-port=10007 --runtime-env-agent-port=10008
    --dashboard-agent-grpc-port=10009 --dashboard-agent-listen-port=10010 --metrics-export-port=10011
    --min-worker-port=10012 --max-worker-port=10050 --autoscaling-config=~/ray_bootstrap_config.yaml

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
  # If we have e.g. conda dependencies, we could create on each node a conda environment (see `setup_commands` section).
  # In that case we'd have to activate that env on each node before running `ray`:
  # - conda activate my_venv && ray stop
  # - ray start --address=$RAY_HEAD_IP:6379
    - ray stop
    - ray start --node-manager-port=10006 --object-manager-port=10007 --runtime-env-agent-port=10008
    --dashboard-agent-grpc-port=10009 --dashboard-agent-listen-port=10010 --metrics-export-port=10011 
    --min-worker-port=10012 --max-worker-port=10100 --address=1.1.1.1:10000

Por último se configura el lanzamiento, dentro de los contenedores de Docker, de los comandos para arrancar el nodo head y el nodo worker. En el caso del nodo head se arranca con el comando ray start –head y en el caso del nodo worker con el comando ray start –address=1.1.1.11:10000 para que se conecte al nodo head.

El tema de especificar en el lanzamiento los puertos de cada uno de los servicios de Ray en vez de dejar los de por defecto, es porque en nuestra arquitectura las máquinas no se encuentran en una Red Privada (trabajo futuro…) y cada una se encuentra detrás de un firewall en el que tenemos que abrir los puertos que necesitan todos los componentes de ray.

Por eso específicamos todos los puertos entre 10000 y 10100 para que estén en un rango controlado.

Los puertos a destacar son:

  • 10000: Puerto de comunicación entre los nodos workers y el nodo head.
  • 10005: Puerto de la interfaz web de monitorización del cluster de Ray.
  • min-worker-port y max-worker-port: Puerto de comunicación entre los nodos workers y el nodo head.

Destacar que es un poco confuso, pero el rango la configuración de los puertos workers es amplio porque no hace referencia al número de nodos workers que se pueden conectar al nodo head, sino a los puertos que se tienen que usar para la comunicación individual de cada uno de los procesos, tareas, trabajos, etc que se lanzan en el cluster.

Lanzamiento

Para iniciar/actualizar y configurar el cluster de Ray en nuestras máquinas On Premise ejecutamos en local el siguiente comando:

1
ray up ray_cluster.yaml

Para hacer un ssh tuneling a la interfaz web de monitorización del cluster de Ray que se está ejecutando en el puerto 10005 a nuestra máquina local el puerto por defecto (8265) hacemos

1
ray dashboard --remote-port 10005 ray_cluster.yaml

Con esto tunel también podemos desplegar las aplicaciones Ray Serve que lanzarán los trabajos de entrenamiento de los modelos de Machine Learning y los servicios de los mismos, nos metemos en el directorio local apps y lanzamos el comando:

1
server deploy app.yaml

Si ahora miramos el dashboard de monitorización del cluster veremos las aplicaciones desplegadas:

Nodos

Y por último, en caso de querer parar el cluster:

1
ray down ray_cluster.yaml

Entrenamiento de Modelos de Machine Learning

Hemos creado un servicio REST con Ray Serve que permite lanzar trabajos de entrenamiento de modelos de Machine Learning en el cluster de Ray.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
    @app.post("/procon")
    def train_procon(self, file: UploadFile = File(...)):
        # Process the uploaded file
        # Submit the job
        
        try:
            try:
                contents = file.file.read()
                # Generate a unique filename using UUID
                file_name = f"{str(uuid.uuid4())}_{file.filename}"

                # Create a file with the generated filename in the /tmp/data directory
                with open(os.path.join(f"{CLUSTER_WORK_DIR}/train/news/data/", file_name), 'wb') as f:
                    f.write(contents)
                                
            except Exception:
                raise HTTPException(status_code=500, detail='Error on uploading the file')
            finally:
                file.file.close()        
        
            job_id = self.client.submit_job(
                # Entrypoint shell command to execute
                entrypoint=f"python 
                  {CLUSTER_WORK_DIR}/train/news/beto_transformers_train_news_progresistas_vs_conservadores.py
                  {CLUSTER_WORK_DIR}/train/news/data/{file_name}",
                # Path to the local directory that contains the script.py file
                runtime_env={"working_dir": f"{CLUSTER_WORK_DIR}/train/news/data/"},
            )
            
            wait_until_status(self.client,job_id, {JobStatus.RUNNING}) 
            logs = self.client.get_job_logs(job_id)
            print(logs)
             
            return {"job_id": job_id}
        
        except Exception:
            raise HTTPException(status_code=500, detail='Something went wrong')
        finally:
            f.close() 

Destacar que usamos la biblioteca python SDK de Ray para lanzar el trabajo de entrenamiento en el cluster de Ray: client.submit_job, en el que le indicamos el script de Python que se tiene que ejecutar y un directorio de trabajo dónde en este caso estamos subiendo el dataset que se ha subido por la API REST y que se va a usar para entrenar el modelo.

Ray copia el dataset al nodo worker que tiene más recursos libres y lanza el trabajo.

Dashboard

Conclusión

En este post hemos vuelto a ver que es muy fácil deplegar un cluster de Ray on Premise, el cuál ya estamos usando en producción.

En él estamos utilizando servicios API REST con Ray Serve que nos permiten:

  • Realizar las ETLs de los datos que se van a usar para entrenar los modelos de Machine Learning.
  • Entrenar los modelos de Machine Learning.
  • Servir esos modelos de Machine Learning.

Todo de forma distribuida y paralela en un cluster de máquinas.

A ver que tal nos va funcionando …

SO WHAT DO YOU THINK ?

Contact us and tell us your needs
+34 644 237 135

Contact hola@taniwa.es