用户使用 Ray 转换数据或者预处理数据到 lance 时,可能出现运行过程中 ray 任务失败的情况,导致部分数据未能完全完成提交。这种情况会给数据处理带来诸多不便和潜在问题。为了解决这一难题,我们并尝试提出了一些关于 Ray Core 作业提交失败重启时避免全量数据重复处理的业务处理方案,同时还深入研究了多个与 Lance 相关的数据处理场景和快捷使用方法。用户可以通过自定义用户代码,在业务层面记录处理过的文件,这样在恢复的时候,可以尽可能避免从头处理的情况。
关键要点包括:
这意味着我们在处理数据时需要特别注意操作的类型和范围,以避免不必要的错误和重复处理。同时,我们也将继续努力,不断完善和扩展支持的操作类型,提高数据处理的效率和灵活性。
由于 checkpoint 的设计与业务的关联极为紧密,所以,我们务必从业务代码层面实施一些容错处理手段。我们能够结合 ray 处理作业的逻辑,在文件层面进行分割。
具体而言,整体 Ray 架构图如上:在业务层面清晰地掌握最小分割点,从文件级别出发,充分利用外部的 redis 或者对象存储作为 checkpoint 存储,详细记录哪些文件已经被处理,哪些文件尚未被处理。同时,我们还需考虑到文件的大小、类型以及处理的优先级等因素,以确保 checkpoint 存储的高效性和准确性。此外,为了提高系统的可靠性和容错性,我们还可以设置定时备份机制,将 checkpoint 数据定期备份到其他存储介质中,以防数据丢失或损坏。
设计要点
定义外部checkpoint存储
我们使用装饰器模式(Decorator pattern)实现一个通用的checkpoint机制,我们可以创建一个装饰器,这个装饰器在被装饰的函数执行之前检查是否已完成其任务,并在执行后设置其状态。这种方法可以应用于任何需要记录执行状态和支持幂等操作的功能。
import ray import redis import functools import hashlib import lance import pyarrow as pa # -------------------------- # Redis Checkpoint 核心模块 # -------------------------- redis_conn = redis.Redis(host='localhost', port=6379) def checkpoint(func): """简化的Checkpoint装饰器""" @functools.wraps(func) def wrapper(file_info): # 生成唯一文件标识 file_id = hashlib.md5(file_info['url'].encode()).hexdigest() key = f"checkpoint:{file_id}" # 检查是否已处理 if redis_conn.exists(key): print(f"跳过已处理文件: {file_info['url']}") return None try: # 执行处理任务 result = func(file_info) # 标记为已完成 redis_conn.set(key, "processed", ex=86400) # 24小时过期 return result except Exception as e: print(f"处理失败: {file_info['url']} - {str(e)}") raise return wrapper
基于业务checkpoint处理数据
# -------------------------- # 数据处理核心逻辑 # -------------------------- @ray.remote(max_retries=2) @checkpoint def process_file(file_info): """文件处理主逻辑""" print(f"开始处理: {file_info['url']}") # 1. 读取文本文件 data = [] with open(file_info['url'], 'r') as f: for line in f: data.append({"text": line.strip().upper()}) # 2. 转换为Arrow Table table = pa.Table.from_pylist(data) # 3. 写入Lance文件 lance.write_dataset(table, file_info["output_path"],mode="append") print(f"成功写入: {file_info['output_path']}") return True # -------------------------- # 任务调度 # -------------------------- def run_pipeline(files, output_dir): """任务入口函数""" futures = [] for file_url in files: output_path = f"{output_dir}/{file_url.split('/')[-1]}.lance" futures.append(process_file.remote({ "url": file_url, "output_path": output_path })) # 等待所有任务完成 ray.get(futures) print("所有文件处理完成!") # -------------------------- # 使用示例 # -------------------------- if __name__ == "__main__": # 初始化Ray ray.init() # 示例文件列表 input_files = [ "/data/input/file1.txt", "/data/input/file2.txt" ] # 执行处理 run_pipeline(input_files, "/data/output")
整体的样例,主要给出一种基本的设计思想,其中:
lance.write_dataset
简化写入逻辑max_retries=2
)生产设计测试时,可以联系EMR团队技术专家,进一步咨询幂等性处理和容错重试,来保障生产可用。