lance-ray 是一个为 Ray 生态系统提供的高性能 I/O 库,专用于读写 Lance 格式的数据集,它为开发者提供了一套强大的、面向大规模数据集的管理与高效读写解决方案。本文为您介绍火山引擎 Lance-Ray 库的通用使用指导。
lance-ray 针对 Ray 的分布式特性进行了优化,支持并行化写入、覆盖 (overwrite) 和追加 (append) 操作,简化了数据ETL流程。lance-ray 提供了 write_lance 函数,用于将 Ray Dataset 数据集高效地写入为 Lance 格式。它支持写入本地文件系统,也支持直接写入 TOS、S3 等兼容 S3 协议的对象存储服务。
您可以轻松地将一个 Ray Dataset 写入本地路径或远端对象存储。
# 创建一个简单的 Ray Dataset import pandas as pd import ray import lance import lance_ray as lr data = pd.DataFrame({ "id": range(10), "value": [f"value_{i}" for i in range(10)] }) ds = ray.data.from_pandas(data) # 定义本地存储路径 local_path = "/tmp/my_lance_dataset.lance" # 使用 lr.write_lance 写入数据 lr.write_lance(ds, uri=local_path, mode="overwrite") print(f"数据已成功写入本地路径: {local_path}")
写入对象存储时,您需要提供正确的 storage_options 来配置访问凭证和 endpoint。
说明
提示:
https://bucket-name.tos-s3-cn-beijing.ivolces.com。https://bucket-name.tos-s3-cn-beijing.volces.com。# 准备要写入的 Ray Dataset ds = ray.data.range(100).map( lambda x: {"id": x["id"], "str": f"str-{x['id']}"} ) # 配置 TOS/S3 的 storage_options # 请将下面的占位符替换为您的实际信息 storage_options = { "access_key_id": "YOUR_ACCESS_KEY", "secret_access_key": "YOUR_SECRET_KEY", "aws_endpoint": "https://your-bucket.tos-s3-your-region.ivolces.com", # 示例 endpoint,必须包含 bucket "aws_region": "your-region", "virtual_hosted_style_request": "true" } # 定义对象存储上的路径 s3_path = "s3://your-bucket/path/to/dataset.lance" # 使用 lr.write_lance 写入数据到 TOS/S3 lr.write_lance( ds, uri=s3_path, storage_options=storage_options, mode='overwrite' ) print(f"数据已成功写入对象存储路径: {s3_path}")
write_lance 提供了丰富的参数来控制写入行为,以下是常用参数的说明:
参数 | 中文释义 | 说明与约束 |
|---|---|---|
| Ray 数据集 | 需要被写入的 |
| 存储路径 | 目标 Lance 数据集的 URI,可以是本地路径(如 |
| 写入模式 |
|
| 数据结构 | (可选)一个 |
| 文件最小行数 | (可选)每个 Lance 数据文件包含的最小行数,默认为 |
| 文件最大行数 | (可选)每个 Lance 数据文件包含的最大行数,默认为 |
| 数据存储版本 | (可选)指定 Lance 的存储格式版本,较新版本效率更高。 |
| 存储配置 | (可选)一个字典,用于配置底层文件系统的访问参数,如访问密钥、endpoint 等。 |
| 流式写入 | (可选)布尔值,默认为 |
| 批处理大小 | (可选)当 |
| 断点续传行数 | (可选)当 |
| 并发度 | (可选)写入任务的最大并发数。 |
lance-ray 的 read_lance 函数可以让您高效地从本地或对象存储中读取 Lance 数据集,并将其加载为 Ray Dataset,以便进行后续的分布式计算。
您可以通过提供数据集的 URI 来直接读取数据。
# 定义 Lance 数据集的路径(本地或 S3) dataset_uri = "/tmp/my_lance_dataset.lance" # 或 "s3://your-bucket/path/to/dataset.lance" # 配置访问对象存储所需的 storage_options (如果需要) storage_options = { "access_key_id": "YOUR_ACCESS_KEY", "secret_access_key": "YOUR_SECRET_KEY", "aws_endpoint": "https://your-bucket.tos-s3-your-region.ivolces.com", "aws_region": "your-region", "virtual_hosted_style_request": "true" } # 使用 lr.read_lance 读取数据 # 如果是本地文件,可以忽略 storage_options ds = lr.read_lance(uri=dataset_uri, storage_options=storage_options) # 查看数据集的 schema 和前几行 print("数据集 Schema:") print(ds.schema()) print("\n数据集行数:", ds.count()) print("\n前 5 行数据:") ds.show(5)
在数据处理和机器学习工作流中,我们经常需要在现有数据集的基础上派生新特征或添加元数据,这就是所谓的数据演进(Data Evolution)。lance-ray 为此提供了 add_columns 函数,它能够利用 Ray 的分布式计算能力,高效地为一个已存在的 Lance 数据集添加新列。
该操作是就地(in-place)的,意味着它会向现有数据集追加一个新的版本,而不会完全重写整个数据集,从而实现高效的 Schema 变更。
add_columns 的核心是通过一个用户定义的转换函数(transform function)来批量处理数据。这个函数接收一个数据批次(pyarrow.RecordBatch),计算出新列,并返回一个仅包含新列数据的批次。
典型使用场景:
参数 | 中文释义 | 说明与约束 |
|---|---|---|
| 数据集路径 | 目标 Lance 数据集的 URI,支持本地或对象存储路径(如 |
| 转换函数 | 一个 Python 函数,接收 |
| 存储配置 | (可选)访问对象存储(如 TOS/S3)所需的凭证和 endpoint 配置。 |
| 读取列 | (可选)指定转换函数需要读取的原始列名列表。优化读取性能,避免加载不必要的列。 |
| 并发度 | (可选)执行转换操作的 Ray Task 的最大并发数。 |
| Ray 远程参数 | (可选)传递给 Ray Task 的额外参数,用于资源调优,详见后续章节。 |
注意
add_columns 操作是原子性的。成功执行后,会创建一个新的数据集版本。如果操作失败,数据集将回退到之前的版本,保证数据一致性。add_columns 或其他写入操作可能会导致冲突。Lance 遵循“最后写入者获胜”(last-writer-wins)的原则。RecordBatch 必须只包含新列,且行数必须与输入批次相同,否则会导致错误。以下示例展示了如何读取一个存储在 TOS/S3 上的数据集,并基于 id 列派生出一个新的字符串列 id_str。
import lance_ray as lr import pyarrow as pa import ray # 假设我们已经有一个 Lance 数据集存储在 TOS/S3 上 # ds = ray.data.range(10) # storage_options = { ... } # 配置好您的 storage_options # s3_uri = "s3://your-bucket/path/to/my_dataset.lance" # lr.write_lance(ds, s3_uri, storage_options=storage_options, mode="overwrite") # --- 现在开始执行 add_columns --- # 1. 定义转换函数 def id_to_string_transform(batch: pa.RecordBatch) -> pa.RecordBatch: """ 将 id 列转换为字符串形式,并添加一个前缀。 """ ids = batch["id"] new_col_data = [f"id_{i.as_py()}" for i in ids] # 创建新列的 pyarrow Array new_array = pa.array(new_col_data, type=pa.string()) # 返回一个只包含新列的 RecordBatch return pa.RecordBatch.from_arrays([new_array], names=["id_str"]) # 2. 配置存储选项 s3_uri = "s3://your-bucket/path/to/my_dataset.lance" storage_options = { "access_key_id": "YOUR_ACCESS_KEY", "secret_access_key": "YOUR_SECRET_KEY", "aws_endpoint": "https://your-bucket.tos-s3-your-region.ivolces.com", "aws_region": "your-region", "virtual_hosted_style_request": "true" } # 3. 调用 add_columns lr.add_columns( uri=s3_uri, transform=id_to_string_transform, read_columns=["id"], # 明确指定我们只需要读取 'id' 列 storage_options=storage_options ) print(f"成功为数据集 {s3_uri} 添加了 'id_str' 列。") # 4. 验证结果 ds_new = lr.read_lance(uri=s3_uri, storage_options=storage_options) print("\n添加新列后的数据集 Schema:") print(ds_new.schema()) ds_new.show(5) # 预期输出: # 添加新列后的数据集 Schema: # id: int64 # id_str: string # # 前 5 行数据: # {'id': 0, 'id_str': 'id_0'} # {'id': 1, 'id_str': 'id_1'} # {'id': 2, 'id_str': 'id_2'} # {'id': 3, 'id_str': 'id_3'} # {'id': 4, 'id_str': 'id_4'}
随着数据的不断追加和删除,Lance 数据集可能会产生大量零碎的小文件(Fragments),或者包含许多被标记为删除但未被物理移除的数据。分布式 Compaction 操作正是为了解决这些问题而设计的,它通过 Ray 的并行计算能力,对数据集进行优化,主要目的包括:
lr.compact_files lr.compact_files 是执行分布式文件级别 Compaction 的核心 API。它能够对单个 Lance 数据集(Table)进行优化。
参数 | 中文释义 | 说明与约束 |
|---|---|---|
| 目标数据集 | 可以提供数据集的直接 |
| Compaction 选项 | 一个 |
| 工作节点数 | 用于执行 Compaction 的 Ray 并发任务数量。如果设置的数量超过了实际需要的任务数(例如,只有一个合并任务),该值会自动下调。 |
| Ray 远程参数 | (可选)传递给每个 Ray Compaction 任务的额外参数,如 |
| 存储配置 | (可选)访问对象存储(如 TOS/S3)所需的凭证和 endpoint 配置。 |
该函数执行成功后,会返回一个包含 fragments_removed(被移除的分片数)和 fragments_added(新生成的分片数)的 Metrics 对象。如果数据集已经处于优化状态,无需执行任何操作,则返回 None。
通过 CompactionOptions,您可以详细定制优化策略,常见字段包括:
字段 | 中文释义 | 说明与约束 |
|---|---|---|
| 目标行数 | 每个合并后分片的目标行数。这是控制文件大小和数量的关键参数,例如设置为 |
| 是否物化删除 | 布尔值,默认为 |
| 删除阈值 | 浮点数,范围 |
| 线程数 | 每个 Compaction 任务内部使用的线程数,用于并行读写和编码数据。 |
下面的示例展示了如何合并大量小分片,并设置合适的 num_workers 与 ray_remote_args:
import lance import lance_ray as lr import pandas as pd import ray from lance.optimize import CompactionOptions ray.init() # 构造一个包含多个小分片的数据集 fragment_data = [ pd.DataFrame({"id": range(i * 5, (i + 1) * 5), "value": [f"frag_{i}_row_{j}" for j in range(5)]}) for i in range(100) ] # 将每个 DataFrame 写成一个分片 uri = "/tmp/compaction_small_files.lance" first = ray.data.from_pandas(fragment_data[0]) lr.write_lance(first, uri=uri, min_rows_per_file=5, max_rows_per_file=5) for df in fragment_data[1:]: lr.write_lance( ray.data.from_pandas(df), uri=uri, mode="append", min_rows_per_file=5, max_rows_per_file=5, ) dataset = lance.dataset(uri) print("Compaction 前分片数:", len(dataset.get_fragments())) print("Compaction 前总行数:", dataset.count_rows()) # 配置 Compaction 策略 options = CompactionOptions( target_rows_per_fragment=20, num_threads=1, ) metrics = lr.compact_files( uri=uri, compaction_options=options, num_workers=4, ray_remote_args={"num_cpus": 1}, ) print("移除分片数:", metrics.fragments_removed) print("新增分片数:", metrics.fragments_added) # 验证数据完整性 after = lance.dataset(uri) assert after.count_rows() == dataset.count_rows()
注意
storage_options 参数,以确保 Ray 的远程任务拥有访问对象存储的权限和正确的 endpoint 配置(可复用前文写入/读取章节中的配置方式)。num_workers 控制并发任务数,而 ray_remote_args 控制每个任务的资源。需要根据 Ray 集群的规模和数据集的大小来权衡这两个参数,以达到最佳的性能和资源利用率。count_rows() 或使用 pandas 比对数据来验证数据没有丢失或损坏。compact_database 在一次调用中遍历并处理多个表,适合做离线的全库维护。建议在业务低峰期执行,并配合监控观察存储占用和查询延迟的变化。为了加速对文本数据的检索,lance-ray 支持为字符串类型的列构建分布式全文搜索(FTS)索引,也称为倒排索引(Inverted Index)。该功能利用 Ray 的并行计算能力,将索引构建任务分发到多个 worker 上,从而高效地处理大规模数据集。
注意
lr.create_scalar_index 构建分布式标量索引(包括 FTS)需要 pylance >= 0.36.0;如果版本过低,相关测试或示例可能会被跳过。BTREE)通常需要 pylance >= 0.37.0 或更高版本,相关说明可放在索引总览章节中统一介绍。lr.create_scalar_index lr.create_scalar_index 是构建包括 FTS 在内的各类标量索引的统一入口。
参数 | 中文释义 | 说明与约束 |
|---|---|---|
| 目标数据集 | 可以直接通过 |
| 目标列 | 要构建索引的列名。对于 FTS 索引,该列必须是字符串类型(如 |
| 索引类型 | 对于全文索引,应设置为 |
| 索引名称 | (可选)为索引指定一个名称,便于后续 |
| 是否替换已有索引 | 布尔值,默认为 |
| 工作节点数 | 用于并行构建索引的 Ray 任务数量。如果设置值大于分片数量,会自动下调。 |
| Ray 远程参数 | (可选)传递给每个索引构建任务的资源参数,如 |
| 存储配置 | (可选)访问 TOS/S3 等对象存储所需的凭证、endpoint 和 Region 配置,推荐复用前文写入/读取章节中的配置方式。 |
| 其他索引参数 | (可选)传递给底层索引构建器的其他参数,例如 |
一旦索引构建完成,您就可以在 scanner 中使用 full_text_query 参数来执行高效的全文检索。
# 假设索引已在 'text' 列上建好 dataset = lance.dataset("/tmp/fts_dataset.lance") results = dataset.scanner( full_text_query="Python", columns=["id", "text"], ).to_table() print(results.to_pandas())
调用 lr.create_scalar_index 时,常见错误包括:
ValueError: Column 'nonexistent' not found,通常由列名拼写错误导致。TypeError: must be string type,当对非字符串列尝试构建 FTS 索引时触发。index_type 不在支持列表(如 'BTREE'、'BITMAP'、'LABEL_LIST'、'INVERTED'、'FTS'、'NGRAM'、'ZONEMAP' 等)中时,会抛出 ValueError。num_workers:当 num_workers <= 0 时,会抛出 ValueError: num_workers must be positive。import lance import lance_ray as lr import pandas as pd import ray ray.init() # 准备一个包含文本列的数据集 text_data = pd.DataFrame({ "id": range(8), "text": [ "The quick brown fox jumps over the lazy dog", "Python is a powerful programming language", "Machine learning algorithms are fascinating", "Data science requires statistical knowledge", "Natural language processing uses text analysis", "Distributed computing scales horizontally", "Ray framework enables parallel processing", "Lance format provides efficient storage", ], }) uri = "/tmp/fts_dataset.lance" ds = ray.data.from_pandas(text_data) # 写入 Lance 数据集 lr.write_lance(ds, uri=uri, mode="overwrite", min_rows_per_file=2, max_rows_per_file=2) # 1. 分布式构建 FTS 索引 indexed_ds = lr.create_scalar_index( uri=uri, column="text", index_type="INVERTED", # 或 'FTS' num_workers=2, ) # 2. 验证索引已创建 indices = indexed_ds.list_indices() print("创建的索引:", indices) # 3. 使用索引进行全文搜索 search_term = "language" results = indexed_ds.scanner( full_text_query=search_term, columns=["id", "text"], ).to_table().to_pandas() print(f"搜索 '{search_term}' 的结果:") print(results)