You need to enable JavaScript to run this app.
导航
Serverless Ray 使用指南
最近更新时间:2025.08.08 13:18:13 首次发布时间:2024.10.11 20:30:27
复制全文
我的收藏
有用
有用
无用
无用

前期准备

将ray_demo.zip 上传到对应的TOS桶里。如下操作步骤中以 tos://emr/lei/demo/ray/ray_demo.zip为例。

rayjob_demo.zip
未知大小

可视化提交

操作步骤

  1. 进入 EMR Serverless 控制台>队列名称>队列详情>作业提交,选择RayJob开发类型。
    Image
  2. 编辑作业页面,填写作业信息,点击运行

参数名称

说明

镜像地址

  • 镜像名称中带有cu是GPU (cuda)镜像
  • 镜像名称中没有cu是CPU镜像

Image

资源文件

前期准备中的资源文件所在的TOS路径

代码入口

代码执行文件,例:python sleep.py

资源参数

Head CPU、Head Mem、Worker CPU、Worker Mem、Worker Replicas等,以实际需求为准

PythonSDK 使用文档

安装 Python 开发环境

# Install: download the Miniconda installer and run it.
wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh
sh Miniconda3-latest-Linux-x86_64.sh

# Environment variable: /root/miniconda3/bin must be replaced by the actual install path.
# NOTE(review): presumably the export line below is meant to be added to .bashrc
# while editing it with vi, and then loaded with `source` -- confirm.
vi .bashrc
export PATH="/root/miniconda3/bin:$PATH" 
source .bashrc


# Create a conda environment named python38 with Python 3.8, then activate it.
conda create --name python38 python=3.8
source activate python38

SDK 安装

SDK安装操作请参考:Python SDK 安装

创建 RayJob 任务

  • 提交任务

将如下代码保存为 ray_demo.py 文件,并执行 python3 ray_demo.py。

说明

  • AK/SK 获取请参考:获取访问凭证
  • 请按实际情况替换region信息,当前支持区域如下:
    • cn-beijing
    • cn-shanghai
    • cn-guangzhou
    • ap-southeast-1
import time

from serverless import StaticCredentials
from serverless import ServerlessClient
from serverless import RayJobTask
from serverless import JobStatus
from serverless.exceptions import QuerySdkError

# Access Key / Secret Key used to build the credentials; fill in before running.
ak = ''
sk = ''

# Endpoint, service name and region passed to ServerlessClient.
endpoint = 'open.volcengineapi.com'
service = 'emr_serverless'
region = 'cn-beijing'

def init():
    """Create a ``ServerlessClient`` from the module-level settings.

    Returns:
        A ``ServerlessClient`` built with ``StaticCredentials(ak, sk)``, the
        module-level ``endpoint``, ``service`` and ``region``, and a value of
        30 for both ``connection_timeout`` and ``socket_timeout``.
    """
    timeouts = {"connection_timeout": 30, "socket_timeout": 30}
    return ServerlessClient(StaticCredentials(ak, sk), endpoint, service, region, **timeouts)


def submit():
    """Submit the demo RayJob, poll it until it finishes, then print its submission log.

    The task is submitted asynchronously (``is_sync=False``). The job is then
    re-fetched every 3 seconds; each round prints its id and status plus the
    tracking URL when that URL can be queried. Once ``JobStatus.is_finished``
    reports the job as finished, the final status and every page of the
    submission log are printed.
    """
    client = init()

    task = RayJobTask(
        name="test rayjob task",
        conf={
            # Enable autoscaling.
            "serverless.ray.enable.autoscaling": "true",
            # Minimum number of workers.
            "serverless.ray.worker.min.replicas": "5",
            # Maximum number of workers.
            "serverless.ray.worker.max.replicas": "20",
            # Trailing comma kept so the optional entry below can simply be uncommented.
            "serverless.ray.version": "2.22.0",
            # To use a custom image, uncomment this entry and replace the image address.
            # "serverless.ray.image": "emr-serverless-online-cn-beijing.cr.volces.com/emr-serverless-ray/ray:XXX",
        },
        head_cpu='4',
        head_memory='16Gi',
        worker_cpu='4',
        worker_memory='16Gi',
        # May be set to the maximum worker count: the first start then needs no
        # scaling, which shortens resource scale-up time during job startup.
        worker_replicas=20,
        entrypoint_cmd='python sleep.py',
        # TOS path of the resource file.
        entrypoint_resource='tos://emr/lei/demo/ray/ray_demo.zip',
        runtime_env={
            # pip packages the runtime must install.
            "pip": ["s3fs"],
            "env_vars": {
                "counter_name": "test_counter"
            }
        }
        # queue=xxx
    )
    # Submit the job.
    job = client.execute(task=task, is_sync=False)

    while not JobStatus.is_finished(job.status):
        job = client.get_job(job.id)
        print('Id: %s, Status: %s' % (job.id, job.status))
        try:
            print('Tracking ui: %s' % job.get_tracking_url())
        except QuerySdkError:
            # Best effort: skip the tracking URL when it cannot be queried
            # (presumably not available yet -- TODO confirm).
            pass
        time.sleep(3)

    # Report the real final status instead of claiming success unconditionally:
    # the loop only knows the job is finished, not that it succeeded.
    print('The task finished, final status: %s' % job.status)

    # Print the execution log page by page.
    log_cursor = client.get_submission_log(job)
    while log_cursor.has_next():
        log_cursor.fetch_next_page()
        for log_entry in log_cursor.current_rows:
            print(log_entry)


def get_ray_ui(job_id=280218916):
    """Print selected attributes and the tracking URL of a job.

    Args:
        job_id: Identifier of the job to look up. Defaults to the sample id
            that used to be hard-coded in this function, so existing
            zero-argument calls keep working.
    """
    client = init()
    job = client.get_job(job_id)
    # Same fields, order and "name: value" output format as before.
    for field in ('start_time', 'end_time', 'status', 'queue_name', 'conf'):
        print('%s: %s' % (field, getattr(job, field)))
    print('tracking_url: %s' % job.get_tracking_url())


def cancel_job(job_id: str):
    """Cancel the job identified by ``job_id``.

    Args:
        job_id: Identifier of the job to cancel.
    """
    serverless_client = init()
    target_job = serverless_client.get_job(job_id)
    serverless_client.cancel_job(target_job)


def query_submit_log(job_id: str):
    """Print every entry of the submission log of job ``job_id``.

    Args:
        job_id: Identifier of the job whose submission log is printed.
    """
    sdk_client = init()
    cursor = sdk_client.get_submission_log(sdk_client.get_job(job_id))
    # Fetch one page at a time and print its rows until no page is left.
    while cursor.has_next():
        cursor.fetch_next_page()
        for entry in cursor.current_rows:
            print(entry)

if __name__ == "__main__":
    # Default action: submit the demo job.
    submit()
    # Alternative calls; uncomment one and replace the sample job id with your own.
    # NOTE(review): query_log is not defined in this sample -- query_submit_log
    # looks like the intended helper; confirm.
    # query_log('281463004')
    # cancel_job('283371762')
    # query_submit_log('281463468')

弹性伸缩

import time

from serverless import StaticCredentials
from serverless import ServerlessClient
from serverless import RayJobTask
from serverless import JobStatus
from serverless.exceptions import QuerySdkError

# Access Key / Secret Key used to build the credentials; fill in before running.
ak = ''
sk = ''

# Endpoint, service name and region passed to ServerlessClient.
endpoint = 'open.volcengineapi.com'
service = 'emr_serverless'
region = 'cn-beijing'


def init():
    """Create a ``ServerlessClient`` from the module-level settings.

    Returns:
        A ``ServerlessClient`` built with ``StaticCredentials(ak, sk)``, the
        module-level ``endpoint``, ``service`` and ``region``, and a value of
        30 for both ``connection_timeout`` and ``socket_timeout``.
    """
    timeouts = {"connection_timeout": 30, "socket_timeout": 30}
    return ServerlessClient(StaticCredentials(ak, sk), endpoint, service, region, **timeouts)


def submit():
    """Submit the autoscaling demo RayJob, poll it until it finishes, then print its log.

    The task is submitted asynchronously (``is_sync=False``). The job is then
    re-fetched every 3 seconds; each round prints its id and status plus the
    tracking URL when that URL can be queried. Once ``JobStatus.is_finished``
    reports the job as finished, the final status and every page of the
    submission log are printed.
    """
    client = init()

    task = RayJobTask(
        name="test rayjob task",
        conf={
            # Enable autoscaling.
            "serverless.ray.enable.autoscaling": "true",
            # Minimum number of workers.
            "serverless.ray.worker.min.replicas": "5",
            # Maximum number of workers.
            "serverless.ray.worker.max.replicas": "20",
            "serverless.ray.version": "2.22.0",
            # Custom image: add this entry when a custom image is needed and
            # replace the image address (the value below is a placeholder).
            "serverless.ray.image": "emr-serverless-online-cn-beijing.cr.volces.com/emr-serverless-ray/ray:XXX"
        },
        head_cpu='4',
        head_memory='16Gi',
        worker_cpu='4',
        worker_memory='16Gi',
        worker_replicas=1,
        # entrypoint_cmd has the form /home/ray/workdir/{zip name}/{py file name},
        # where /home/ray/workdir is currently a fixed value.
        entrypoint_cmd='python /home/ray/workdir/ray_demo/sleep.py',
        # TOS path of the resource file.
        entrypoint_resource='tos://emr/lei/demo/ray/ray_demo.zip',
        runtime_env={
            # pip packages the runtime must install.
            "pip": ["s3fs"],
            "env_vars": {
                "counter_name": "test_counter"
            }
        }
        # queue=xxx
    )
    # Submit the job.
    job = client.execute(task=task, is_sync=False)

    while not JobStatus.is_finished(job.status):
        job = client.get_job(job.id)
        print('Id: %s, Status: %s' % (job.id, job.status))
        try:
            print('Tracking ui: %s' % job.get_tracking_url())
        except QuerySdkError:
            # Best effort: skip the tracking URL when it cannot be queried
            # (presumably not available yet -- TODO confirm).
            pass
        time.sleep(3)

    # Report the real final status instead of claiming success unconditionally:
    # the loop only knows the job is finished, not that it succeeded.
    print('The task finished, final status: %s' % job.status)

    # Print the execution log page by page.
    log_cursor = client.get_submission_log(job)
    while log_cursor.has_next():
        log_cursor.fetch_next_page()
        for log_entry in log_cursor.current_rows:
            print(log_entry)


def get_ray_ui(job_id=280218916):
    """Print selected attributes and the tracking URL of a job.

    Args:
        job_id: Identifier of the job to look up. Defaults to the sample id
            that used to be hard-coded in this function, so existing
            zero-argument calls keep working.
    """
    client = init()
    job = client.get_job(job_id)
    # Same fields, order and "name: value" output format as before.
    for field in ('start_time', 'end_time', 'status', 'queue_name', 'conf'):
        print('%s: %s' % (field, getattr(job, field)))
    print('tracking_url: %s' % job.get_tracking_url())


def cancel_job(job_id: str):
    """Cancel the job identified by ``job_id``.

    Args:
        job_id: Identifier of the job to cancel.
    """
    serverless_client = init()
    target_job = serverless_client.get_job(job_id)
    serverless_client.cancel_job(target_job)


def query_submit_log(job_id: str):
    """Print every entry of the submission log of job ``job_id``.

    Args:
        job_id: Identifier of the job whose submission log is printed.
    """
    sdk_client = init()
    cursor = sdk_client.get_submission_log(sdk_client.get_job(job_id))
    # Fetch one page at a time and print its rows until no page is left.
    while cursor.has_next():
        cursor.fetch_next_page()
        for entry in cursor.current_rows:
            print(entry)

if __name__ == "__main__":
    # Default action: submit the demo job.
    submit()
    # Alternative calls; uncomment one and replace the sample job id with your own.
    # NOTE(review): query_log is not defined in this sample -- query_submit_log
    # looks like the intended helper; confirm.
    # query_log('281463004')
    # cancel_job('283371762')
    # query_submit_log('281463468')

提交到已有 RayCluster

  1. 在独占队列创建 Ray 计算组(即 RayCluster)。
  2. 将以下提交参数中 Ray 计算组名替换为实际名称。
import time

from serverless import StaticCredentials
from serverless import ServerlessClient
from serverless import RayJobTask
from serverless import JobStatus
from serverless.exceptions import QuerySdkError

# Access Key / Secret Key used to build the credentials; fill in before running.
ak = ''
sk = ''

# Endpoint, service name and region passed to ServerlessClient.
endpoint = 'open.volcengineapi.com'
service = 'emr_serverless'
region = 'cn-beijing'

def init():
    """Create a ``ServerlessClient`` from the module-level settings.

    Returns:
        A ``ServerlessClient`` built with ``StaticCredentials(ak, sk)``, the
        module-level ``endpoint``, ``service`` and ``region``, and a value of
        30 for both ``connection_timeout`` and ``socket_timeout``.
    """
    timeouts = {"connection_timeout": 30, "socket_timeout": 30}
    return ServerlessClient(StaticCredentials(ak, sk), endpoint, service, region, **timeouts)


def submit():
    """Submit the demo RayJob to an existing compute group, poll it, then print its log.

    The task is submitted asynchronously (``is_sync=False``). The job is then
    re-fetched every 3 seconds; each round prints its id and status plus the
    tracking URL when that URL can be queried. Once ``JobStatus.is_finished``
    reports the job as finished, the final status and every page of the
    submission log are printed.
    """
    client = init()

    task = RayJobTask(
        name="test rayjob task",
        conf={
            # Name of the existing RayCluster compute group to submit to.
            "serverless.compute.group.name": "wray"
        },
        # Command to execute.
        entrypoint_cmd='python test.py',
        # TOS path of the resource file.
        entrypoint_resource='tos://wbw-dev/kehu/demo/demo.zip',
        runtime_env={
            # pip packages the runtime must install.
            "pip": ["s3fs"],
            "env_vars": {
                "counter_name": "test_counter"
            }
        }
        # queue="test_queue"
    )
    # Submit the job.
    job = client.execute(task=task, is_sync=False)

    while not JobStatus.is_finished(job.status):
        job = client.get_job(job.id)
        print('Id: %s, Status: %s' % (job.id, job.status))
        try:
            print('Tracking ui: %s' % job.get_tracking_url())
        except QuerySdkError:
            # Best effort: skip the tracking URL when it cannot be queried
            # (presumably not available yet -- TODO confirm).
            pass
        time.sleep(3)

    # Report the real final status instead of claiming success unconditionally:
    # the loop only knows the job is finished, not that it succeeded.
    print('The task finished, final status: %s' % job.status)

    # Print the execution log page by page.
    log_cursor = client.get_submission_log(job)
    while log_cursor.has_next():
        log_cursor.fetch_next_page()
        for log_entry in log_cursor.current_rows:
            print(log_entry)


def get_ray_ui(job_id=280218916):
    """Print selected attributes and the tracking URL of a job.

    Args:
        job_id: Identifier of the job to look up. Defaults to the sample id
            that used to be hard-coded in this function, so existing
            zero-argument calls keep working.
    """
    client = init()
    job = client.get_job(job_id)
    # Same fields, order and "name: value" output format as before.
    for field in ('start_time', 'end_time', 'status', 'queue_name', 'conf'):
        print('%s: %s' % (field, getattr(job, field)))
    print('tracking_url: %s' % job.get_tracking_url())


def cancel_job(job_id: str):
    """Cancel the job identified by ``job_id``.

    Args:
        job_id: Identifier of the job to cancel.
    """
    serverless_client = init()
    target_job = serverless_client.get_job(job_id)
    serverless_client.cancel_job(target_job)


def query_submit_log(job_id: str):
    """Print every entry of the submission log of job ``job_id``.

    Args:
        job_id: Identifier of the job whose submission log is printed.
    """
    sdk_client = init()
    cursor = sdk_client.get_submission_log(sdk_client.get_job(job_id))
    # Fetch one page at a time and print its rows until no page is left.
    while cursor.has_next():
        cursor.fetch_next_page()
        for entry in cursor.current_rows:
            print(entry)

if __name__ == "__main__":
    # Default action: submit the demo job.
    submit()
    # Alternative calls; uncomment one and replace the sample job id with your own.
    # NOTE(review): query_log is not defined in this sample -- query_submit_log
    # looks like the intended helper; confirm.
    # query_log('500877044')
    # cancel_job('500877044')
    # query_submit_log('500877044')