在分布式计算中,内存管理和负载均衡是两个关键问题。Ray 提供了一系列工具和方法来帮助开发者监控和优化内存使用,确保集群在高负载下仍能稳定运行。本文将介绍如何解决 Ray 集群的压力负载,以及如何定位和解决内存溢出的问题。
通过EMR管控平台提供Ray集群的监控,查看集群的资源使用情况:包括节点资源情况和Task的监控信息。
示例
ray memory
命令也可以采用ray memory
命令查看内存使用情况的详细报告:
该命令会输出当前对象存储中的对象及其内存使用情况,帮助识别哪些对象占用了大量内存。
Ray 会在日志中记录内存使用情况。如果某个任务因为内存不足而失败,也可以在日志中找到相关信息。
日志文件通常位于 /tmp/ray/session_latest/logs
目录下或/var/log/emr/ray/session_latest/logs
。
也可以在Ray dashboard界面上查看日志。
在 Ray 集群中,是有一个Head和多个worker节点组成,其中:Head 节点扮演着至关重要的角色。它不仅负责集群的管理和协调,还承担了许多关键功能:集群管理、任务调度、日志和监控等等。如果 Head 节点出现故障,整个集群的运行将受到严重影响。
因此在大规模集群中,建议将 Head 节点专用于集群管理任务,而不用于执行计算任务,即在Head节点上避免启动worker Process。通过在启动Ray Head的参数中添加“ num-cpus: '0'
”,将会阻止具有非零 CPU 需求的 Ray 工作负载被调度到Head中。
示例
以RayJob的方式为例,下面对应的Yaml文件,在启动Head时配置“num-cpus: '0'
”:
apiVersion: ray.io/v1
kind: RayJob
metadata:
name: rayjob-sample
annotations:
nginx.ingress.kubernetes.io/rewrite-target: /$1
spec:
entrypoint: python /home/ray/samples/sample_code.py
# rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
rayClusterSpec:
rayVersion: '2.30.0' # should match the Ray version in the image of the containers
# Ray Head pod template
HeadGroupSpec:
rayStartParams:
dashboard-host: '0.0.0.0'
num-cpus: '0'
template:
spec:
containers:
- name: ray-Head
image: emr-vke-public-cn-beijing.cr.volces.com/emr/ray:2.30.0-py3.11-ubuntu20.04-207-1.5.0
ports:
- containerPort: 6379
name: gcs-server
- containerPort: 8265 # Ray dashboard
name: dashboard
- containerPort: 10001
name: client
resources:
limits:
cpu: "5"
memory: 20Gi
requests:
cpu: "5"
memory: 20Gi
volumeMounts:
- mountPath: /home/ray/samples
name: code-sample
volumes:
- name: code-sample
configMap:
# Provide the name of the ConfigMap you want to mount.
name: ray-job-code-sample
# An array of keys from the ConfigMap to create as files
items:
- key: sample_code.py
path: sample_code.py
workerGroupSpecs:
# the pod replicas in this group typed worker
- replicas: 1
minReplicas: 1
maxReplicas: 5
# logical group name, for this called small-group, also can be functional
groupName: small-group
# The `rayStartParams` are used to configure the `ray start` command.
# See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
# See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
rayStartParams: {}
#pod template
template:
spec:
containers:
- name: ray-worker
image: emr-vke-public-cn-beijing.cr.volces.com/emr/ray:2.30.0-py3.11-ubuntu20.04-207-1.5.0
lifecycle:
preStop:
exec:
command: [ "/bin/sh","-c","ray stop" ]
resources:
limits:
cpu: "5"
memory: 20Gi
requests:
cpu: "5"
memory: 20Gi
---
apiVersion: v1
kind: ConfigMap
metadata:
name: ray-job-code-sample
data:
sample_code.py: |
import ray
import time
def map1(n):
time.sleep(1)
return n
def column_udf_class(col, udf):
class UDFClass:
def __call__(self, row):
return {col: udf(row[col])}
return UDFClass
ray.data.range(20).map_batches(
column_udf_class("id", map1),
concurrency=(10),
num_cpus = 0.5
).show()
该用例中,执行的作业脚本/home/ray/samples/sample_code.py
中,启动10个task作业。在Ray的UI中可以看到这10个作业都在worker node上运行。
在高并发环境下,多个任务同时运行会占用大量的内存资源,可能导致内存不足问题。通过减少并发度,可以降低同时运行的任务数量,从而减少内存占用,避免 OOM 问题。
配置num_cpus
参数。
通过增加ray.remote()中的 num_cpus
参数,可以控制任务的并发度。增加 CPU 要求可以减少同时运行的任务数量,从而降低内存使用。
示例
import ray
ray.init()
# 定义一个内存密集型任务,并增加 num_cpus 参数
@ray.remote(num_cpus=2) # 每个任务需要 2 个 CPU
def memory_intensive_task():
# 模拟内存密集型操作
large_array = [0] * (10**8)
return len(large_array)
# 提交多个任务
tasks = [memory_intensive_task.remote() for _ in range(20)]
# 获取结果
results = ray.get(tasks)
print(results)
ray.data模块,有些api可以通过配置concurrency调整并发度。
**示例 **
ray.data.Dataset.map_batches
import ray
import time
# 初始化 Ray
ray.init()
# 定义两个模拟的 UDF(用户定义函数),每个函数都会睡眠 1 秒钟
def map1(n):
time.sleep(1)
return n
def map2(n):
time.sleep(1)
return n
# 定义一个类,用于将列名和 UDF 结合起来
def column_udf_class(col, udf):
class UDFClass:
def __call__(self, row):
return {col: udf(row[col])}
return UDFClass
# 创建一个包含 1000 个整数的 Ray Dataset
dataset = ray.data.range(1000)
# 使用 map_batches 处理数据集,并设置并发度
processed_dataset = (
dataset.map_batches(
column_udf_class("id", map1),
concurrency=2, # 设置并发度为 2
)
.map_batches(
column_udf_class("id", map2),
concurrency=3, # 设置并发度为 3
)
)
# 显示处理后的数据集
processed_dataset.show()
在启动 Ray 集群时通过调大worker和Head的内存,,可以减少内存不足的风险。特别是在处理大规模数据集或内存密集型任务时,增加内存配置可以显著提高任务的成功率和集群的稳定性。
在任务完成后,及时删除不再需要的对象,以减少对象存储的内存使用。可以使用 ray._private.internal_api.free
函数来手动删除不再需要的对象。
示例
import numpy as np
import ray
ray.init()
@ray.remote
def create_large_object():
return np.zeros(1000000)
object_id = create_large_object.remote()
# 使用对象
result = ray.get(object_id)
# 删除对象
ray._private.internal_api.free([object_id])