将 ray_demo.zip 上传到对应的 TOS 桶中。以下操作步骤以 tos://emr/lei/demo/ray/ray_demo.zip 为例。
| 参数名称 | 说明 |
|---|---|
| 镜像地址 | (可选)如需使用自定义镜像,填写该镜像地址 |
| 资源文件 | 前期准备中的资源文件所在的 TOS 路径 |
| 代码入口 | 代码执行文件,例:python sleep.py |
| 资源参数 | Head CPU、Head Mem、Worker CPU、Worker Mem、Worker Replicas 等,以实际需求为准 |
# Install Miniconda (the installer is a bash script, so run it with bash)
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh
# Persist Miniconda on PATH; /root/miniconda3/bin depends on the actual install location.
# Single quotes keep $PATH unexpanded so it is evaluated each time .bashrc is sourced.
echo 'export PATH="/root/miniconda3/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc
# Create and activate a Python 3.8 environment
conda create --name python38 python=3.8
source activate python38
SDK安装操作请参考:Python SDK 安装。
将如下代码保存为 ray_demo.py 文件,并执行 `python3 ray_demo.py`。
说明
region信息,当前支持区域如下:
import time

from serverless import StaticCredentials
from serverless import ServerlessClient
from serverless import RayJobTask
from serverless import JobStatus
from serverless.exceptions import QuerySdkError

# Access Key ID / Secret Access Key used by StaticCredentials; fill these in
# before running (avoid committing real keys).
ak = ''
sk = ''
endpoint = 'open.volcengineapi.com'
service = 'emr_serverless'
# Currently supported regions: cn-beijing, cn-shanghai, cn-guangzhou, ap-southeast-1
region = 'cn-beijing'

# Seconds to wait between two job-status polls in submit().
POLL_INTERVAL_SECONDS = 3


def init():
    """Create a ServerlessClient from the module-level credential/endpoint settings."""
    credentials = StaticCredentials(ak, sk)
    return ServerlessClient(credentials, endpoint, service, region,
                            connection_timeout=30, socket_timeout=30)


def _print_submission_log(client, job):
    """Print every row of *job*'s submission log, fetching it page by page."""
    log_cursor = client.get_submission_log(job)
    while log_cursor.has_next():
        log_cursor.fetch_next_page()
        for log_entry in log_cursor.current_rows:
            print(log_entry)


def submit():
    """Submit the demo Ray job, poll until it finishes, then print its submission log.

    :return: the job object returned by the last status poll.
    """
    client = init()
    # Submit the job without blocking (is_sync=False); its progress is polled below.
    job = client.execute(task=RayJobTask(
        name="test rayjob task",
        conf={
            # Enable autoscaling
            "serverless.ray.enable.autoscaling": "true",
            # Minimum number of workers
            "serverless.ray.worker.min.replicas": "5",
            # Maximum number of workers
            "serverless.ray.worker.max.replicas": "20",
            "serverless.ray.version": "2.22.0",
            # To use a custom image, uncomment this setting and replace the image address
            # "serverless.ray.image": "emr-serverless-online-cn-beijing.cr.volces.com/emr-serverless-ray/ray:XXX",
        },
        head_cpu='4',
        head_memory='16Gi',
        worker_cpu='4',
        worker_memory='16Gi',
        # May be set to the maximum worker count so the first start needs no
        # scale-out, which shortens resource scaling during job startup
        worker_replicas=20,
        entrypoint_cmd='python sleep.py',
        # TOS path of the resource package
        entrypoint_resource='tos://emr/lei/demo/ray/ray_demo.zip',
        runtime_env={
            # pip packages to install into the runtime
            "pip": ["s3fs"],
            "env_vars": {
                "counter_name": "test_counter",
            },
        },
        # queue="xxx",
    ), is_sync=False)

    while not JobStatus.is_finished(job.status):
        job = client.get_job(job.id)
        print('Id: %s, Status: %s' % (job.id, job.status))
        try:
            print('Tracking ui: %s' % job.get_tracking_url())
        except QuerySdkError:
            # NOTE(review): presumably raised while no tracking URL is available
            # yet; ignored so that polling continues.
            pass
        time.sleep(POLL_INTERVAL_SECONDS)

    # A finished job is not necessarily a successful one, so report the final status.
    print('Job %s finished with status: %s' % (job.id, job.status))
    # View the execution log
    _print_submission_log(client, job)
    return job


def get_ray_ui(job_id=280218916):
    """Print basic metadata of a job followed by its tracking (Ray UI) URL.

    :param job_id: ID of the job to inspect; defaults to a sample job ID.
    """
    client = init()
    job = client.get_job(job_id)
    for attr in ('start_time', 'end_time', 'status', 'queue_name', 'conf'):
        print('%s: %s' % (attr, getattr(job, attr)))
    print('tracking_url: %s' % job.get_tracking_url())


def cancel_job(job_id: str):
    """Cancel the job with the given ID."""
    client = init()
    client.cancel_job(client.get_job(job_id))


def query_submit_log(job_id: str):
    """Print the submission log of the job with the given ID."""
    client = init()
    _print_submission_log(client, client.get_job(job_id))


if __name__ == '__main__':
    submit()
    # Other helpers, called with an existing job ID:
    # get_ray_ui('281463004')
    # cancel_job('283371762')
    # query_submit_log('281463468')
import time

from serverless import StaticCredentials
from serverless import ServerlessClient
from serverless import RayJobTask
from serverless import JobStatus
from serverless.exceptions import QuerySdkError

# Access Key ID / Secret Access Key used by StaticCredentials; fill these in
# before running (avoid committing real keys).
ak = ''
sk = ''
endpoint = 'open.volcengineapi.com'
service = 'emr_serverless'
# Currently supported regions: cn-beijing, cn-shanghai, cn-guangzhou, ap-southeast-1
region = 'cn-beijing'

# Seconds to wait between two job-status polls in submit().
POLL_INTERVAL_SECONDS = 3


def init():
    """Create a ServerlessClient from the module-level credential/endpoint settings."""
    credentials = StaticCredentials(ak, sk)
    return ServerlessClient(credentials, endpoint, service, region,
                            connection_timeout=30, socket_timeout=30)


def _print_submission_log(client, job):
    """Print every row of *job*'s submission log, fetching it page by page."""
    log_cursor = client.get_submission_log(job)
    while log_cursor.has_next():
        log_cursor.fetch_next_page()
        for log_entry in log_cursor.current_rows:
            print(log_entry)


def submit():
    """Submit the demo Ray job on a custom image, poll until it finishes, then print its log.

    :return: the job object returned by the last status poll.
    """
    client = init()
    # Submit the job without blocking (is_sync=False); its progress is polled below.
    job = client.execute(task=RayJobTask(
        name="test rayjob task",
        conf={
            # Enable autoscaling
            "serverless.ray.enable.autoscaling": "true",
            # Minimum number of workers
            "serverless.ray.worker.min.replicas": "5",
            # Maximum number of workers
            "serverless.ray.worker.max.replicas": "20",
            "serverless.ray.version": "2.22.0",
            # Custom image: replace the image address with your own
            "serverless.ray.image": "emr-serverless-online-cn-beijing.cr.volces.com/emr-serverless-ray/ray:XXX",
        },
        head_cpu='4',
        head_memory='16Gi',
        worker_cpu='4',
        worker_memory='16Gi',
        worker_replicas=1,
        # entrypoint_cmd format: python /home/ray/workdir/{zip package name}/{py file name}
        # where /home/ray/workdir is currently a fixed path
        entrypoint_cmd='python /home/ray/workdir/ray_demo/sleep.py',
        # TOS path of the resource package
        entrypoint_resource='tos://emr/lei/demo/ray/ray_demo.zip',
        runtime_env={
            # pip packages to install into the runtime
            "pip": ["s3fs"],
            "env_vars": {
                "counter_name": "test_counter",
            },
        },
        # queue="xxx",
    ), is_sync=False)

    while not JobStatus.is_finished(job.status):
        job = client.get_job(job.id)
        print('Id: %s, Status: %s' % (job.id, job.status))
        try:
            print('Tracking ui: %s' % job.get_tracking_url())
        except QuerySdkError:
            # NOTE(review): presumably raised while no tracking URL is available
            # yet; ignored so that polling continues.
            pass
        time.sleep(POLL_INTERVAL_SECONDS)

    # A finished job is not necessarily a successful one, so report the final status.
    print('Job %s finished with status: %s' % (job.id, job.status))
    # View the execution log
    _print_submission_log(client, job)
    return job


def get_ray_ui(job_id=280218916):
    """Print basic metadata of a job followed by its tracking (Ray UI) URL.

    :param job_id: ID of the job to inspect; defaults to a sample job ID.
    """
    client = init()
    job = client.get_job(job_id)
    for attr in ('start_time', 'end_time', 'status', 'queue_name', 'conf'):
        print('%s: %s' % (attr, getattr(job, attr)))
    print('tracking_url: %s' % job.get_tracking_url())


def cancel_job(job_id: str):
    """Cancel the job with the given ID."""
    client = init()
    client.cancel_job(client.get_job(job_id))


def query_submit_log(job_id: str):
    """Print the submission log of the job with the given ID."""
    client = init()
    _print_submission_log(client, client.get_job(job_id))


if __name__ == '__main__':
    submit()
    # Other helpers, called with an existing job ID:
    # get_ray_ui('281463004')
    # cancel_job('283371762')
    # query_submit_log('281463468')
import time

from serverless import StaticCredentials
from serverless import ServerlessClient
from serverless import RayJobTask
from serverless import JobStatus
from serverless.exceptions import QuerySdkError

# Access Key ID / Secret Access Key used by StaticCredentials; fill these in
# before running (avoid committing real keys).
ak = ''
sk = ''
endpoint = 'open.volcengineapi.com'
service = 'emr_serverless'
# Currently supported regions: cn-beijing, cn-shanghai, cn-guangzhou, ap-southeast-1
region = 'cn-beijing'

# Seconds to wait between two job-status polls in submit().
POLL_INTERVAL_SECONDS = 3


def init():
    """Create a ServerlessClient from the module-level credential/endpoint settings."""
    credentials = StaticCredentials(ak, sk)
    return ServerlessClient(credentials, endpoint, service, region,
                            connection_timeout=30, socket_timeout=30)


def _print_submission_log(client, job):
    """Print every row of *job*'s submission log, fetching it page by page."""
    log_cursor = client.get_submission_log(job)
    while log_cursor.has_next():
        log_cursor.fetch_next_page()
        for log_entry in log_cursor.current_rows:
            print(log_entry)


def submit():
    """Submit a Ray job to an existing compute group, poll until it finishes, then print its log.

    :return: the job object returned by the last status poll.
    """
    client = init()
    # Submit the job without blocking (is_sync=False); its progress is polled below.
    job = client.execute(task=RayJobTask(
        name="test rayjob task",
        conf={
            # Use an existing Ray cluster compute group
            "serverless.compute.group.name": "wray",
        },
        # Command to execute
        entrypoint_cmd='python test.py',
        # TOS path of the resource package
        entrypoint_resource='tos://wbw-dev/kehu/demo/demo.zip',
        runtime_env={
            # pip packages to install into the runtime
            "pip": ["s3fs"],
            "env_vars": {
                "counter_name": "test_counter",
            },
        },
        # queue="test_queue",
    ), is_sync=False)

    while not JobStatus.is_finished(job.status):
        job = client.get_job(job.id)
        print('Id: %s, Status: %s' % (job.id, job.status))
        try:
            print('Tracking ui: %s' % job.get_tracking_url())
        except QuerySdkError:
            # NOTE(review): presumably raised while no tracking URL is available
            # yet; ignored so that polling continues.
            pass
        time.sleep(POLL_INTERVAL_SECONDS)

    # A finished job is not necessarily a successful one, so report the final status.
    print('Job %s finished with status: %s' % (job.id, job.status))
    # View the execution log
    _print_submission_log(client, job)
    return job


def get_ray_ui(job_id=280218916):
    """Print basic metadata of a job followed by its tracking (Ray UI) URL.

    :param job_id: ID of the job to inspect; defaults to a sample job ID.
    """
    client = init()
    job = client.get_job(job_id)
    for attr in ('start_time', 'end_time', 'status', 'queue_name', 'conf'):
        print('%s: %s' % (attr, getattr(job, attr)))
    print('tracking_url: %s' % job.get_tracking_url())


def cancel_job(job_id: str):
    """Cancel the job with the given ID."""
    client = init()
    client.cancel_job(client.get_job(job_id))


def query_submit_log(job_id: str):
    """Print the submission log of the job with the given ID."""
    client = init()
    _print_submission_log(client, client.get_job(job_id))


if __name__ == '__main__':
    submit()
    # Other helpers, called with an existing job ID:
    # get_ray_ui('500877044')
    # cancel_job('500877044')
    # query_submit_log('500877044')