You need to enable JavaScript to run this app.
E-MapReduce

E-MapReduce

复制全文
最佳实践
将传统的 Python 程序改造为 Ray 程序的实践指南
复制全文
将传统的 Python 程序改造为 Ray 程序的实践指南

Ray 是一个灵活的分布式计算框架,支持并行任务和基于 Actor 模型的异步执行。通过将传统的 Python 程序改造为 Ray 程序,可以充分利用 Ray 的并行和分布式计算能力,提高程序的性能和扩展性。本文将详细介绍如何将单机 Python 程序和多进程程序改造为Ray程序。

Ray API介绍

API

描述

示例

ray.init()

初始化Ray上下文。

@ray.remote

函数或类的装饰器,指定函数将作为task执行以及类将作为actor,在不同的进程中执行。

@ray.remote
class Actor(object):
    def method(y)
        ...                                     …

.remote

用于每个远程函数、远程类声明或远程类方法调用。远程操作是异步的。

ret_id = fun.remote(x)
a = Actor.remote()
ret_id = a.method.remote(y)

ray.put()

将对象存储在object store中,并返回其ID。此ID可用于将对象作为参数传递给任何远程函数或方法调用。这是一个同步操作。

x_id = ray.put(x)

ray.get()

从对象ID或对象ID列表中返回一个对象或对象列表。这是一个同步(即阻塞)操作。

x = ray.get(x_id)
…
objects = ray.get(object_ids)

ray.wait()

从对象ID列表中返回:已准备好的对象ID列表,和尚未准备好的对象ID列表。

ready_ids, not_ready_ids = ray.wait(object_ids)

将传统Python程序改造为Ray程序场景示例

准备工作

部署Ray的环境。建议在EMR on VKE产品中部署Ray服务,参考EMR官网进行环境部署。也可以按照官网执行pip install ray方式进行手动部署。

单机 Python 程序改造为 Ray 程序

示例:
计算一组数字的平方。

传统单机 Python 程序

改造为 Ray 程序

def compute_square(x):
    return x * x

numbers = [1, 2, 3, 4, 5]
squares = [compute_square(x) for x in numbers]
print(squares)
import ray

# 初始化 Ray
ray.init()

# 将函数转换为 Ray 任务
@ray.remote
def compute_square(x):
    return x * x

numbers = [1, 2, 3, 4, 5]

# 异步调用 Ray 任务
futures = [compute_square.remote(x) for x in numbers]

# 获取任务结果
squares = ray.get(futures)
print(squares)

# 关闭 Ray
ray.shutdown()

改造步骤:

  1. 导入 Ray 并初始化 Ray 集群。
  2. 使用 @ray.remote 装饰器将函数转换为 Ray 任务。
  3. 使用 .remote() 方法调用 Ray 任务。
  4. 使用 ray.get() 获取任务结果。

多进程改造为 Ray 程序

示例:
并行处理多个数据块。

传统多进程 Python 程序

Ray 程序

import multiprocessing

def process_data(data):
    return [x * 2 for x in data]

data_chunks = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

with multiprocessing.Pool(processes=3) as pool:
    results = pool.map(process_data, data_chunks)

print(results)
import ray

# 初始化 Ray
ray.init()

# 将函数转换为 Ray 任务
@ray.remote
def process_data(data):
    return [x * 2 for x in data]

data_chunks = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

# 异步调用 Ray 任务
futures = [process_data.remote(chunk) for chunk in data_chunks]

# 获取任务结果
results = ray.get(futures)
print(results)

# 关闭 Ray
ray.shutdown()

改造为 Ray 程序的步骤

  1. 导入 Ray 并初始化 Ray 集群。
  2. 使用 @ray.remote 装饰器将函数转换为 Ray 任务。
  3. 使用 .remote() 方法调用 Ray 任务。
  4. 使用 ray.get() 获取任务结果。

改造为 Ray 程序的优势:

  • 高效的数据处理:
    • 在Ray提供有共享的对象存储,避免了Python multiprocessing在进程间传递大对象时的序列化和反序列化开销。
    • 对于有状态的计算:Ray提供了actor抽象,使得在分布式环境中封装和修改状态变得简单,而Python multiprocessing缺乏这种自然的并行化方式,导致实现复杂且性能低下。
  • 扩展性和容错性:
    • 分布式计算:Ray设计用于可扩展性,可以在单机、集群甚至云环境中运行相同的代码。它能够轻松扩展到数百甚至数千个节点。
    • 自动故障恢复:Ray工作负载自动从机器和进程故障中恢复,确保计算任务的可靠性和稳定性。

使用Ray Data模块改造pipeline链路

Ray data模块采用Dataset定义分布的数据集。当需要对数据集进行一系列的pipeline操作时,可以采用Ray data模型。
图片
示例:

multiprocessing的示例

Ray示例

import multiprocessing
import time


# 数据处理步骤 1
def process_step1(data):
    time.sleep(1)
    return [item * 2 for item in data]

# 数据处理步骤 2
def process_step2(data):
    time.sleep(1)
    return [item + 5 for item in data]

# 数据处理步骤 3
def process_step3(data):
    time.sleep(1)
    return [item * 3 for item in data]

if __name__ == '__main__':
    data = [i for i in range(10)]

    # 使用 multiprocessing 进行并行处理
    pool = multiprocessing.Pool()

    step1_results = pool.apply_async(process_step1, (data,))
    step2_input = step1_results.get()
    step2_results = pool.apply_async(process_step2, (step2_input,))
    step3_input = step2_results.get()
    final_results = pool.apply_async(process_step3, (step3_input,))

    pool.close()
    pool.join()

    print(final_results.get())
import ray
import time
from typing import Dict

ray.init()

# 数据处理步骤 1 data的类型为:Dict[str, int]
def process_step1(data):
    time.sleep(1)
    return {'id':[item * 2 for item in data['id']]}

# 数据处理步骤 2
def process_step2(data):
    time.sleep(1)
    return {'id':[item + 5 for item in data['id']]}

# 数据处理步骤 3
def process_step3(data):
    time.sleep(1)
    return {'id':[item * 3 for item in data['id']]}

if __name__ == '__main__':
    # 启动数据生成任务
    result = ray.data.range(10)\
        .map_batches(process_step1)\
        .map_batches(process_step2)\
        .map_batches(process_step3)\
        .take_all()

    # ray.data.Dataset可以带有schema元数据。
    # 这里返回结果{'id': 27}, {'id': 15}, {'id': 21}, {'id': 69}, {'id': 39}, {'id': 63}, {'id': 57}, {'id': 51}, {'id': 33}, {'id': 45}]
    print(result)

    ray.shutdown()

使用Ray API的注意事项

调用远程函数和方法

远程函数和方法调用是异步的,返回一个对象ID(ObjectID),而不是实际结果。要获取实际结果,需要使用ray.get()

# 异步调用
result_id = my_function.remote(5)

# 获取结果
result = ray.get(result_id)

延迟调用ray.get()

ray.get()是一个阻塞操作,会等待结果准备好。如果过早调用ray.get()会阻塞程序的其他操作,影响并行性。建议在调用所有任务后再调用ray.get()

# 不推荐的做法
results = [ray.get(my_function.remote(x)) for x in range(4)]

# 推荐的做法
result_ids = [my_function.remote(x) for x in range(4)]
results = ray.get(result_ids)

避免微小任务

每个远程任务调用都有一定的开销。如果任务非常小,Ray程序可能比等效的Python程序更慢。建议将任务设计为至少耗时几毫秒,以摊薄调用开销。

# 不推荐的做法
@ray.remote
def tiny_task(x):
    return x

# 推荐的做法
@ray.remote
def larger_task(start, end):
    return [x for x in range(start, end)]

使用ray.put()存储大对象

当将大对象作为参数传递给远程函数时,Ray会在后台调用ray.put()将对象存储在本地对象存储中。如果多次传递相同的大对象,建议显式调用ray.put()并传递其ID,以避免重复复制对象。

# 不推荐的做法
result_ids = [my_function.remote(large_object) for _ in range(10)]

# 推荐的做法
large_object_id = ray.put(large_object)
result_ids = [my_function.remote(large_object_id) for _ in range(10)]

使用ray.wait()尽早处理结果

如果对多个任务的结果使用ray.get(),需要等待最后一个任务完成,这会增加程序运行时间。建议使用ray.wait(),它会在任意一个对象准备好时返回,从而可以尽早处理结果。

result_ids = [my_function.remote(x) for x in range(4)]
while result_ids:
    done_id, result_ids = ray.wait(result_ids)
    result = ray.get(done_id[0])
    process_result(result)
最近更新时间:2024.09.04 20:01:42
这个页面对您有帮助吗?
有用
有用
无用
无用