Ray 是一个灵活的分布式计算框架,支持并行任务和基于 Actor 模型的异步执行。通过将传统的 Python 程序改造为 Ray 程序,可以充分利用 Ray 的并行和分布式计算能力,提高程序的性能和扩展性。本文将详细介绍如何将单机 Python 程序和多进程程序改造为Ray程序。
Ray API介绍 API
描述
示例
ray.init()
初始化Ray上下文。
@ray.remote
函数或类的装饰器,指定函数将作为task执行以及类将作为actor,在不同的进程中执行。
@ray.remote
class Actor(object):
def method(y)
... …
.remote
用于每个远程函数、远程类声明或远程类方法调用。远程操作是异步的。
ret_id = fun.remote(x)
a = Actor.remote()
ret_id = a.method.remote(y)
ray.put()
将对象存储在object store中,并返回其ID。此ID可用于将对象作为参数传递给任何远程函数或方法调用。这是一个同步操作。
x_id = ray.put(x)
ray.get()
从对象ID或对象ID列表中返回一个对象或对象列表。这是一个同步(即阻塞)操作。
x = ray.get(x_id)
…
objects = ray.get(object_ids)
ray.wait()
从对象ID列表中返回:已准备好的对象ID列表,和尚未准备好的对象ID列表。
ready_ids, not_ready_ids = ray.wait(object_ids)
将传统Python程序改造为Ray程序场景示例
准备工作 部署Ray的环境。建议在EMR on VKE产品中部署Ray服务,参考EMR官网 进行环境部署。也可以按照官网 执行pip install ray方式进行手动部署。
单机 Python 程序改造为 Ray 程序 示例:
计算一组数字的平方。
传统单机 Python 程序
改造为 Ray 程序
def compute_square(x):
return x * x
numbers = [1, 2, 3, 4, 5]
squares = [compute_square(x) for x in numbers]
print(squares)
import ray
# 初始化 Ray
ray.init()
# 将函数转换为 Ray 任务
@ray.remote
def compute_square(x):
return x * x
numbers = [1, 2, 3, 4, 5]
# 异步调用 Ray 任务
futures = [compute_square.remote(x) for x in numbers]
# 获取任务结果
squares = ray.get(futures)
print(squares)
# 关闭 Ray
ray.shutdown()
改造步骤:
导入 Ray 并初始化 Ray 集群。 使用 @ray.remote 装饰器将函数转换为 Ray 任务。 使用 .remote() 方法调用 Ray 任务。 使用 ray.get() 获取任务结果。
多进程改造为 Ray 程序 示例:
并行处理多个数据块。
传统多进程 Python 程序
Ray 程序
import multiprocessing
def process_data(data):
return [x * 2 for x in data]
data_chunks = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
with multiprocessing.Pool(processes=3) as pool:
results = pool.map(process_data, data_chunks)
print(results)
import ray
# 初始化 Ray
ray.init()
# 将函数转换为 Ray 任务
@ray.remote
def process_data(data):
return [x * 2 for x in data]
data_chunks = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
# 异步调用 Ray 任务
futures = [process_data.remote(chunk) for chunk in data_chunks]
# 获取任务结果
results = ray.get(futures)
print(results)
# 关闭 Ray
ray.shutdown()
改造为 Ray 程序的步骤 :
导入 Ray 并初始化 Ray 集群。 使用 @ray.remote 装饰器将函数转换为 Ray 任务。 使用 .remote() 方法调用 Ray 任务。 使用 ray.get() 获取任务结果。 改造为 Ray 程序的优势:
高效的数据处理:
在Ray提供有共享的对象存储,避免了Python multiprocessing在进程间传递大对象时的序列化和反序列化开销。 对于有状态的计算:Ray提供了actor抽象,使得在分布式环境中封装和修改状态变得简单,而Python multiprocessing缺乏这种自然的并行化方式,导致实现复杂且性能低下。 扩展性和容错性:
分布式计算:Ray设计用于可扩展性,可以在单机、集群甚至云环境中运行相同的代码。它能够轻松扩展到数百甚至数千个节点。 自动故障恢复:Ray工作负载自动从机器和进程故障中恢复,确保计算任务的可靠性和稳定性。
使用Ray Data模块改造pipeline链路 Ray data模块采用Dataset定义分布的数据集。当需要对数据集进行一系列的pipeline操作时,可以采用Ray data模型。
示例:
multiprocessing的示例
Ray示例
import multiprocessing
import time
# 数据处理步骤 1
def process_step1(data):
time.sleep(1)
return [item * 2 for item in data]
# 数据处理步骤 2
def process_step2(data):
time.sleep(1)
return [item + 5 for item in data]
# 数据处理步骤 3
def process_step3(data):
time.sleep(1)
return [item * 3 for item in data]
if __name__ == '__main__':
data = [i for i in range(10)]
# 使用 multiprocessing 进行并行处理
pool = multiprocessing.Pool()
step1_results = pool.apply_async(process_step1, (data,))
step2_input = step1_results.get()
step2_results = pool.apply_async(process_step2, (step2_input,))
step3_input = step2_results.get()
final_results = pool.apply_async(process_step3, (step3_input,))
pool.close()
pool.join()
print(final_results.get())
import ray
import time
from typing import Dict
ray.init()
# 数据处理步骤 1 data的类型为:Dict[str, int]
def process_step1(data):
time.sleep(1)
return {'id':[item * 2 for item in data['id']]}
# 数据处理步骤 2
def process_step2(data):
time.sleep(1)
return {'id':[item + 5 for item in data['id']]}
# 数据处理步骤 3
def process_step3(data):
time.sleep(1)
return {'id':[item * 3 for item in data['id']]}
if __name__ == '__main__':
# 启动数据生成任务
result = ray.data.range(10)\
.map_batches(process_step1)\
.map_batches(process_step2)\
.map_batches(process_step3)\
.take_all()
# ray.data.Dataset可以带有schema元数据。
# 这里返回结果{'id': 27}, {'id': 15}, {'id': 21}, {'id': 69}, {'id': 39}, {'id': 63}, {'id': 57}, {'id': 51}, {'id': 33}, {'id': 45}]
print(result)
ray.shutdown()
使用Ray API的注意事项
调用远程函数和方法 远程函数和方法调用是异步的,返回一个对象ID(ObjectID),而不是实际结果。要获取实际结果,需要使用ray.get()。
# 异步调用
result_id = my_function.remote(5)
# 获取结果
result = ray.get(result_id)
延迟调用ray.get() ray.get()是一个阻塞操作,会等待结果准备好。如果过早调用ray.get()会阻塞程序的其他操作,影响并行性。建议在调用所有任务后再调用ray.get()。
# 不推荐的做法
results = [ray.get(my_function.remote(x)) for x in range(4)]
# 推荐的做法
result_ids = [my_function.remote(x) for x in range(4)]
results = ray.get(result_ids)
避免微小任务 每个远程任务调用都有一定的开销。如果任务非常小,Ray程序可能比等效的Python程序更慢。建议将任务设计为至少耗时几毫秒,以摊薄调用开销。
# 不推荐的做法
@ray.remote
def tiny_task(x):
return x
# 推荐的做法
@ray.remote
def larger_task(start, end):
return [x for x in range(start, end)]
使用ray.put()存储大对象 当将大对象作为参数传递给远程函数时,Ray会在后台调用ray.put()将对象存储在本地对象存储中。如果多次传递相同的大对象,建议显式调用ray.put()并传递其ID,以避免重复复制对象。
# 不推荐的做法
result_ids = [my_function.remote(large_object) for _ in range(10)]
# 推荐的做法
large_object_id = ray.put(large_object)
result_ids = [my_function.remote(large_object_id) for _ in range(10)]
使用ray.wait()尽早处理结果 如果对多个任务的结果使用ray.get(),需要等待最后一个任务完成,这会增加程序运行时间。建议使用ray.wait(),它会在任意一个对象准备好时返回,从而可以尽早处理结果。
result_ids = [my_function.remote(x) for x in range(4)]
while result_ids:
done_id, result_ids = ray.wait(result_ids)
result = ray.get(done_id[0])
process_result(result)