You need to enable JavaScript to run this app.
导航
任务容错
最近更新时间:2025.04.22 20:03:33首次发布时间:2025.04.22 20:03:33
我的收藏
有用
有用
无用
无用

方案背景

用户使用 Ray 转换数据或者预处理数据到 lance 时,可能出现运行过程中 ray 任务失败的情况,导致部分数据未能完全完成提交。这种情况会给数据处理带来诸多不便和潜在问题。为了解决这一难题,我们并尝试提出了一些关于 Ray Core 作业提交失败重启时避免全量数据重复处理的业务处理方案,同时还深入研究了多个与 Lance 相关的数据处理场景和快捷使用方法。用户可以通过自定义用户代码,在业务层面记录处理过的文件,这样在恢复的时候,可以尽可能避免从头处理的情况。
关键要点包括:

  1. RayCore Checkpoint 机制实现:通过巧妙运用装饰器模式,创建了一种通用的 checkpoint 机制。在函数执行之前和之后,都会认真细致地检查和设置任务状态,确保任务的准确性和完整性。
  2. 目前仅支持 OneToOneOperator 操作:现阶段,仅对 OneToOneOperator 操作提供支持。利用 Redis 来记录已处理文件,然而对于涉及 repartition/shuffle 的操作,目前暂不支持。

这意味着我们在处理数据时需要特别注意操作的类型和范围,以避免不必要的错误和重复处理。同时,我们也将继续努力,不断完善和扩展支持的操作类型,提高数据处理的效率和灵活性。

方案架构

由于 checkpoint 的设计与业务的关联极为紧密,所以,我们务必从业务代码层面实施一些容错处理手段。我们能够结合 ray 处理作业的逻辑,在文件层面进行分割。
Image
具体而言,整体 Ray 架构图如上:在业务层面清晰地掌握最小分割点,从文件级别出发,充分利用外部的 redis 或者对象存储作为 checkpoint 存储,详细记录哪些文件已经被处理,哪些文件尚未被处理。同时,我们还需考虑到文件的大小、类型以及处理的优先级等因素,以确保 checkpoint 存储的高效性和准确性。此外,为了提高系统的可靠性和容错性,我们还可以设置定时备份机制,将 checkpoint 数据定期备份到其他存储介质中,以防数据丢失或损坏。
设计要点

  • 采用装饰器模式实现可插拔的 Checkpoint 中间件,以增强系统的灵活性和可扩展性。通过这种设计,能够在不修改原有代码的基础上,轻松地添加或替换中间件,满足不同的业务需求和场景。
  • 对状态存储层进行抽象,从业务角度考虑支持 Redis/TOS 作为存储介质。这样可以根据实际应用的性能要求、数据规模和成本预算等因素,灵活选择合适的存储方案,提高系统的适应性和效率。
  • 具备异常状态捕获与元数据持久化功能,当系统出现异常时,能够及时捕获并记录相关状态信息和元数据,以便进行故障排查和恢复,保障系统的稳定性和可靠性。
  • RayData 作业在发生提交失败重启时,无需全量数据重复处理。这一特性能够显著提高作业的执行效率和资源利用率。
    • 目前只支持 OneToOneOperator,如各种形式的 Map,包含 read, write, map, map_batches, limit, filter 等。这意味着在处理数据时,能够以一对一的方式进行操作,保证数据的准确性和一致性。
    • 该方案无法支持涉及 repartition/shuffle 的操作。这是由于这类操作的复杂性和资源消耗较大,需要进一步的研究和优化来实现支持。

代码样例

定义外部checkpoint存储
我们使用装饰器模式(Decorator pattern)实现一个通用的checkpoint机制,我们可以创建一个装饰器,这个装饰器在被装饰的函数执行之前检查是否已完成其任务,并在执行后设置其状态。这种方法可以应用于任何需要记录执行状态和支持幂等操作的功能。

import ray
import redis
import functools
import hashlib
import lance
import pyarrow as pa
# --------------------------
# Redis Checkpoint 核心模块
# --------------------------
redis_conn = redis.Redis(host='localhost', port=6379)
def checkpoint(func):
    """简化的Checkpoint装饰器"""
    @functools.wraps(func)
    def wrapper(file_info):
        # 生成唯一文件标识
        file_id = hashlib.md5(file_info['url'].encode()).hexdigest()
        key = f"checkpoint:{file_id}"
        
        # 检查是否已处理
        if redis_conn.exists(key):
            print(f"跳过已处理文件: {file_info['url']}")
            return None
            
        try:
            # 执行处理任务
            result = func(file_info)
            # 标记为已完成
            redis_conn.set(key, "processed", ex=86400)  # 24小时过期
            return result
        except Exception as e:
            print(f"处理失败: {file_info['url']} - {str(e)}")
            raise
            
    return wrapper

基于业务checkpoint处理数据

# --------------------------
# 数据处理核心逻辑
# --------------------------
@ray.remote(max_retries=2)
@checkpoint
def process_file(file_info):
    """文件处理主逻辑"""
    print(f"开始处理: {file_info['url']}")
    
    # 1. 读取文本文件
    data = []
    with open(file_info['url'], 'r') as f:
        for line in f:
            data.append({"text": line.strip().upper()})
    
    # 2. 转换为Arrow Table
    table = pa.Table.from_pylist(data)
    
    # 3. 写入Lance文件
    lance.write_dataset(table, file_info["output_path"],mode="append")
    print(f"成功写入: {file_info['output_path']}")
    return True

# --------------------------
# 任务调度
# --------------------------
def run_pipeline(files, output_dir):
    """任务入口函数"""
    futures = []
    for file_url in files:
        output_path = f"{output_dir}/{file_url.split('/')[-1]}.lance"
        futures.append(process_file.remote({
            "url": file_url,
            "output_path": output_path
        }))
    
    # 等待所有任务完成
    ray.get(futures)
    print("所有文件处理完成!")

# --------------------------
# 使用示例
# --------------------------
if __name__ == "__main__":
    # 初始化Ray
    ray.init()
    
    # 示例文件列表
    input_files = [
        "/data/input/file1.txt",
        "/data/input/file2.txt"
    ]
    
    # 执行处理
    run_pipeline(input_files, "/data/output")

整体的样例,主要给出一种基本的设计思想,其中:

  1. Checkpoint模块
  • 使用MD5哈希生成唯一文件标识
  • 通过Redis简单键值对记录处理状态
  • 自动跳过已处理文件
  1. 核心处理流程
  • 分步注释:读取 → 转换 → 写入
  • 直接使用lance.write_dataset简化写入逻辑
  • 保留必要重试机制(max_retries=2)
  • 使用原生Lance写入接口

生产设计测试时,可以联系EMR团队技术专家,进一步咨询幂等性处理和容错重试,来保障生产可用。