本文仅针对本地 python 包测试使用,使用EMR 的Ray操作可以参考使用 Ray 操作 Lance 数据。
本文主要介绍在本地进行 Python 包测试,包括本地安装 SDK、创建数据集、版本控制、向量数据准备等操作,还详细说明了如何对 Lance 数据集进行写入、删除行、更新行、合并插入、演化模式(重命名列、更改列数据类型、添加新列、使用合并添加新列、删除列)、读取(包括迭代读取、过滤器下推、随机读取)、表维护以及对象存储配置等一系列操作。在版本控制方面,可以进行数据的追加、覆盖,并能创建和检出标签。
本地已安装 SDK。如未安装,参考如下操作。
说明
请使用 Linux 的环境安装 pylance 的包。
pip install https://proton-pkgs.tos-cn-beijing.volces.com/lance/pylance-0.20.0-cp39-abi3-manylinux_2_28_x86_64.whl
import lance import pyarrow as pa table = pa.Table.from_pylist([{"name": "Alice", "age": 20}, {"name": "Bob", "age": 30}]) lance.write_dataset(table, "./alice_and_bob.lance")
会在本地目录下创建alice_and_bob.lance目录,里面有真实数据的data目录,也管理版本的_versions目录,以及管理事务的_transactions目录。
alice_and_bob.lance |-- data | `-- 527881ac-cbaf-4519-a5a6-e39bffae5c79.lance |-- _transactions | `-- 0-89006b07-e468-4f4f-9ca1-35a0f3786af0.txn `-- _versions `-- 1.manifest
创建一个Dataframe
df = pd.DataFrame({"a": [5]})
将其写入 Lance
shutil.rmtree("/tmp/test.lance", ignore_errors=True) dataset = lance.write_dataset(df, "/tmp/test.lance") dataset.to_table().to_pandas()
从parquet转换到lance,读取一个parquet文件
shutil.rmtree("/tmp/test.parquet", ignore_errors=True) shutil.rmtree("/tmp/test.lance", ignore_errors=True) tbl = pa.Table.from_pandas(df) pa.dataset.write_dataset(tbl, "/tmp/test.parquet", format='parquet') parquet = pa.dataset.dataset("/tmp/test.parquet") parquet.to_table().to_pandas()
写入新的lance
dataset = lance.write_dataset(parquet, "/tmp/test.lance")
验证数据并打印
确保数据一致 dataset.to_table().to_pandas()
Lance 数据集 API 遵循 PyArrow API 约定。
dataset = lance.dataset("./alice_and_bob.lance") # 查看dataset的行数 dataset.count_rows() # 查看dataset的schema dataset.schema # 查看全部的dataset内容 dataset.to_table() # 根据条件扫描数据 dataset.scanner(filter="age > 25").to_table() # 根据条件扫描数据,只返回name列 dataset.scanner(columns=["name"], filter="age > 25").to_table()
要打开一个 Lance 数据集,可以使用 lance.dataset()
函数:
import lance ds = lance.dataset("s3://bucket/path/imagenet.lance")或者本地路径 ds = lance.dataset("./imagenet.lance")
读取 Lance 数据集的最直接方法是使用 lance.LanceDataset.to_table()
方法将整个数据集加载到内存中。
由于 Lance 是一种高性能的列式格式,它可以通过利用列(投影)下推和过滤器(谓词)下推来高效地读取数据集的子集。
table = ds.to_table( columns=["image", "label"],filter="label = 2 AND text IS NOT NULL", limit=1000, offset=3000)
Lance 理解读取重型列(如 image
)的成本。因此,它采用优化的查询计划来高效地执行操作。
如果数据集太大而无法一次性加载到内存中,可以使用 lance.LanceDataset.to_batches()
方法按批次读取:
for batch in ds.to_batches(columns=["image"], filter="label = 10"):对批次进行处理 compute_on_batch(batch)
Lance 采用标准 SQL 表达式作为数据集过滤的谓词。通过将 SQL 谓词直接下推到存储系统,可以显著减少扫描过程中的 I/O 负载。
代码示例
table = ds.to_table(columns=["image", "label"], filter="label = 2 AND text IS NOT NULL",limit=1000,offset=3000)
目前,Lance 支持越来越多的表达式。
>
, >=
, <
, <=
, =
AND
, OR
, NOT
IS NULL
, IS NOT NULL
IS TRUE
, IS NOT TRUE
, IS FALSE
, IS NOT FALSE
IN
LIKE
, NOT LIKE
regexp_match(column, pattern)
CAST
可以使用下标访问嵌套字段。结构体字段可以使用字段名下标,而列表字段可以使用索引下标。
如果列名包含特殊字符或是一个 SQL 关键字,可以使用反引号(```)将其转义。对于嵌套字段,路径的每个部分都必须用反引号包裹。
日期、时间戳和小数的字面量可以写成类型名后的字符串值。例如:
date_col = date '2021-01-01' and timestamp_col = timestamp '2021-01-01 00:00:00' and decimal_col = decimal(8,3) '1.000'
对于时间戳列,精度可以在类型参数中指定。默认精度为微秒(6)。
Lance 内部以 Arrow 格式存储数据。SQL 类型到 Arrow 类型的映射如下:
作为列式格式的一个显著特点,Lance 允许快速随机读取。
访问第 2 行、第 101 行和第 501 行 data = ds.take([1, 100, 500], columns=["image", "label"])
能够快速随机访问单个行在促进各种工作流(如机器学习训练中的随机采样和洗牌)中起着至关重要的作用。此外,它使用户能够构建二级索引,从而实现快速查询执行,提升性能。
与 Apache Pyarrow 类似,创建 Lance 数据集的最简单方法是通过 lance.write_dataset()
写入一个 pyarrow.Table
。
import lance import pyarrow as pa table = pa.Table.from_pylist([{"name": "Alice", "age": 20},{"name": "Bob", "age": 30}]) lance.write_dataset(table, "./alice_and_bob.lance")
如果数据集的内存占用过大,无法一次性写入内存,lance.write_dataset()
也支持通过 pyarrow.RecordBatch
的迭代器来写入数据集。
import lance import pyarrow as pa def producer():yield pa.RecordBatch.from_pylist([{"name": "Alice", "age": 20}])yield pa.RecordBatch.from_pylist([{"name": "Blob", "age": 30}]) schema = pa.schema([ pa.field("name", pa.string()), pa.field("age", pa.int64()),]) lance.write_dataset(producer(), "./alice_and_bob.lance", schema)
lance.write_dataset()
支持写入 pyarrow.Table
、pandas.DataFrame
、pyarrow.Dataset
和 Iterator[pyarrow.RecordBatch]
。更多详细信息请查看其文档。
Lance 支持使用 SQL 过滤器从数据集中删除行。例如,要从上述数据集中删除 Bob 的行,可以使用以下代码:
import lance dataset = lance.dataset("./alice_and_bob.lance") dataset.delete("name = 'Bob'")
lance.LanceDataset.delete()
支持与过滤器下推(Filter push-down)中描述相同的过滤器。
行是通过在单独的删除索引中标记为已删除来删除的。这比重写文件更快,也不会使指向这些文件的索引失效。任何后续查询都不会返回已删除的行。
Lance 支持使用 lance.LanceDataset.update()
方法基于 SQL 表达式更新行。例如,如果我们发现数据集中 Bob 的名字有时被写成了 Blob
,我们可以这样修正:
import lance dataset = lance.dataset("./alice_and_bob.lance") dataset.update({"name": "'Bob'"}, where="name = 'Blob'")
更新值是 SQL 表达式,这就是为什么 'Bob'
被单引号包裹。这意味着我们可以根据需要使用引用现有列的复杂表达式。例如,如果两年过去了,我们想在同一个示例中更新 Alice 和 Bob 的年龄,我们可以这样写:
import lance dataset = lance.dataset("./alice_and_bob.lance") dataset.update({"age": "age + 2"})
如果你试图用新值更新一组单独的行,通常使用下面描述的合并插入操作更高效。
import lance new_table = pa.Table.from_pylist([{"name": "Alice", "age": 30},{"name": "Bob", "age": 20}])这种方法可以工作,但效率不高,下面有更好的方法 dataset = lance.dataset("./alice_and_bob.lance")for idx in range(new_table.num_rows): name = new_table[0][idx].as_py() new_age = new_table[1][idx].as_py() dataset.update({"age": new_age}, where=f"name='{name}'")
Lance 支持合并插入操作。这可以用于批量添加新数据,同时也可以(潜在地)匹配现有数据。此操作可用于多种不同的用例。
lance.LanceDataset.update()
方法适用于基于过滤器更新行。然而,如果我们想用新行替换现有行,则合并插入操作会更高效:
import lance 改变 Alice 和 Bob 的年龄 new_table = pa.Table.from_pylist([{"name": "Alice", "age": 30},{"name": "Bob", "age": 20}]) dataset = lance.dataset("./alice_and_bob.lance")这将使用 name 作为匹配行的键。合并插入内部使用 JOIN,因此你通常希望这个列是某种唯一键或 ID。 dataset.merge_insert("name") \.when_matched_update_all() \.execute(new_table)
注意,与更新操作类似,被修改的行将被移除并重新插入表中,改变它们的位置到表的末尾。此外,这些行的相对顺序可能会改变,因为我们内部使用了哈希连接操作。
有时我们只想在之前没有插入过的数据才插入。这可能发生在我们有一批数据,但不知道哪些行已经添加过,不想创建重复行。我们可以使用合并插入操作来实现这一点:
import lance # Bob 已经在表中,但 Carla 是新的 new_table = pa.Table.from_pylist([{"name": "Bob", "age": 30},{"name": "Carla", "age": 37}]) dataset = lance.dataset("./alice_and_bob.lance")这将插入 Carla 但保持 Bob 不变 dataset.merge_insert("name") \.when_not_matched_insert_all() \.execute(new_table)
有时我们想结合上述两种行为。如果行已经存在,我们想更新它;如果行不存在,我们想添加它。此操作有时被称为“upsert”。我们也可以使用合并插入操作来实现这一点:
import lance # 改变 Carla 的年龄并插入 David new_table = pa.Table.from_pylist([{"name": "Carla", "age": 27},{"name": "David", "age": 42}]) dataset = lance.dataset("./alice_and_bob.lance")这将更新 Carla 并插入 David dataset.merge_insert("name") \.when_matched_update_all() \.when_not_matched_insert_all() \.execute(new_table)
一种不太常见但仍然有用的行为是替换现有行的某个区域(由过滤器定义)的新数据。这类似于在单个事务中同时执行删除和插入。例如:
import lance new_table = pa.Table.from_pylist([{"name": "Edgar", "age": 46},{"name": "Francine", "age": 44}]) dataset = lance.dataset("./alice_and_bob.lance")这将移除任何年龄大于 40 的人,并插入我们的新数据 dataset.merge_insert("name") \.when_not_matched_insert_all() \.when_not_matched_by_source_delete("age >= 40") \.execute(new_table)
完整例子
# 更新某些行 dataset.update({"name": "'Blob'"}, where="name = 'Bob'") # 删除某些行 dataset.delete("name = 'Blob'") # 写入新的一批数据 table = pa.Table.from_pylist([{"name": "Alice", "age": 20}, {"name": "Bob", "age": 30}]) lance.write_dataset(table, "./alice_and_bob.lance", mode = "append")
Lance是数据湖, 提供了多版本的能力, 能够快速的实现增删查改以及结构变更的需求, 也提供time travel的能力.
Lance 支持Schema Change:在数据集中添加、删除和修改列。这些操作中的大多数可以在不重写数据集中的数据文件的情况下执行,使它们成为非常高效的操作。
通常,模式更改会与大多数其他并发写操作冲突。例如,如果你在其他人向数据集追加数据时更改了数据集的模式,要么你的模式更改失败,要么追加操作失败,具体取决于操作的顺序。因此,建议在没有其他写操作时执行模式更改。
可以使用 lance.LanceDataset.alter_columns()
方法重命名列。
import lance import pyarrow as pa table = pa.table({"id": pa.array([1, 2, 3])}) dataset = lance.write_dataset(table, "ids") dataset.alter_columns({"path": "id", "name": "new_id"}) dataset.to_table().to_pandas()
这适用于嵌套列。要引用嵌套列,请使用点(.
)分隔嵌套级别。例如:
data = [{"meta": {"id": 1, "name": "Alice"}},{"meta": {"id": 2, "name": "Bob"}},] dataset = lance.write_dataset(data, "nested_rename") dataset.alter_columns({"path": "meta.id", "name": "new_id"})
meta 0 {"new_id": 1, "name": "Alice"} 1 {"new_id": 2, "name": "Bob"}
除了更改列名外,还可以使用 lance.LanceDataset.alter_columns()
方法更改列的数据类型。这需要重写该列的新数据文件,但不需要重写其他列。
注意
如果列有索引,更改列类型将导致索引被丢弃。
此方法可用于更改列的向量类型。例如,我们可以将 float32 嵌入列更改为 float16 列,以节省磁盘空间,但会降低精度:
import lance import pyarrow as pa import numpy as np table = pa.table({"id": pa.array([1, 2, 3]),"embedding": pa.FixedShapeTensorArray.from_numpy_ndarray( np.random.rand(3, 128).astype("float32"))}) dataset = lance.write_dataset(table, "embeddings") dataset.alter_columns({"path": "embedding","type": pa.list_(pa.float16(), 128)}) dataset.schema()
id: int64 embedding: fixed_size_list<item: float16, 128>
可以使用 lance.LanceDataset.add_columns()
方法在单个操作中添加新列并填充数据。有两种方法可以指定如何填充新列:首先,为每个新列提供一个 SQL 表达式;其次,提供一个函数来生成新列数据。
SQL 表达式可以是独立的表达式,也可以引用现有列。可以使用 SQL 字面量值为所有现有行设置一个值。
import lance import pyarrow as pa table = pa.table({"name": pa.array(["Alice", "Bob", "Carla"])}) dataset = lance.write_dataset(table, "names") dataset.add_columns({"hash": "sha256(name)","status": "'active'",}) dataset.to_table().to_pandas()
name hash... status 0 Alice 3bc51062973c... active 1 Bob cd9fb1e148cc... active 2 Carla ad8d83ffd82b... active
你也可以提供一个 Python 函数来生成新列数据。这可以用于计算新的嵌入列。此函数应接受一个 PyArrow RecordBatch 并返回一个 PyArrow RecordBatch 或 Pandas DataFrame。该函数将对数据集中的每个批次调用一次。
如果函数计算成本高昂且可能失败,建议在 UDF 中设置一个检查点文件。此检查点文件在每次调用后保存 UDF 的状态,以便如果 UDF 失败,可以从最后一个检查点重新启动。注意,此文件可能会变得相当大,因为它需要存储多达整个数据文件的未保存结果。
import lance import pyarrow as pa import numpy as np table = pa.table({"id": pa.array([1, 2, 3])}) dataset = lance.write_dataset(table, "ids") @lance.batch_udf(checkpoint_file="embedding_checkpoint.sqlite") def add_random_vector(batch): embeddings = np.random.rand(batch.num_rows, 128).astype("float32") return pd.DataFrame({"embedding": embeddings}) dataset.add_columns(add_random_vector)
如果你已经预先计算了一个或多个新列,可以使用 lance.LanceDataset.merge()
方法将它们添加到现有数据集。这允许在不需要重写整个数据集的情况下填充额外的列。
要使用 merge
方法,请提供一个包含要添加的列的新数据集,以及用于将新数据与现有数据集连接的列名。
例如,假设我们有一个包含嵌入和 ID 的数据集:
import lance import pyarrow as pa import numpy as np table = pa.table({"id": pa.array([1, 2, 3]),"embedding": pa.array([np.array([1, 2, 3]), np.array([4, 5, 6]), np.array([7, 8, 9])])}) dataset = lance.write_dataset(table, "embeddings")
现在,如果我们想添加一个我们已经生成的标签列,可以通过合并一个新表来实现:
new_data = pa.table({"id": pa.array([1, 2, 3]),"label": pa.array(["horse", "rabbit", "cat"])}) dataset.merge(new_data, "id") dataset.to_table().to_pandas()
id embedding label 0 1 [1, 2, 3] horse 1 2 [4, 5, 6] rabbit 2 3 [7, 8, 9] cat
最后,可以使用 lance.LanceDataset.drop_columns()
方法从数据集中删除列。这是一个仅涉及元数据的操作,不会删除磁盘上的数据。这使得它非常快速。
import lance import pyarrow as pa table = pa.table({"id": pa.array([1, 2, 3]),"name": pa.array(["Alice", "Bob", "Carla"])}) dataset = lance.write_dataset(table, "names") dataset.drop_columns(["name"]) dataset.schema()
要实际从磁盘上删除数据,必须重写文件以删除列,然后删除旧文件。这可以通过 lance.dataset.DatasetOptimizer.compact_files()
后跟 lance.LanceDataset.cleanup_old_versions()
来完成。
我们可以追加行:
df = pd.DataFrame({"a": [10]}) tbl = pa.Table.from_pandas(df) dataset = lance.write_dataset(tbl, "/tmp/test.lance", mode="append") dataset.to_table().to_pandas()
我们可以覆盖数据并创建一个新版本:
df = pd.DataFrame({"a": [50, 100]}) tbl = pa.Table.from_pandas(df) dataset = lance.write_dataset(tbl, "/tmp/test.lance", mode="overwrite")
dataset.to_table().to_pandas()
旧版本仍然存在:
[{'version': 1,'timestamp': datetime.datetime(2024, 8, 15, 21, 22, 31, 453453),'metadata': {}},{'version': 2,'timestamp': datetime.datetime(2024, 8, 15, 21, 22, 35, 475152),'metadata': {}},{'version': 3,'timestamp': datetime.datetime(2024, 8, 15, 21, 22, 45, 32922),'metadata': {}}]
lance.dataset('/tmp/test.lance', version=1).to_table().to_pandas()
lance.dataset('/tmp/test.lance', version=2).to_table().to_pandas()
我们可以创建标签:
dataset.tags.create("stable", 2) dataset.tags.create("nightly", 3) dataset.tags.list()
{'nightly': {'version': 3, 'manifest_size': 628},'stable': {'version': 2, 'manifest_size': 684}}
可以检出标签:
lance.dataset('/tmp/test.lance', version="stable").to_table().to_pandas()
在这个教程中,我们将使用 Sift 1M 数据集:
ANN_SIFT1M
:http://corpus-texmex.irisa.fr/ftp://ftp.irisa.fr/local/texmex/corpus/sift.tar.gz
rm -rf sift* vec_data.lance wget ftp://ftp.irisa.fr/local/texmex/corpus/sift.tar.gz tar -xzf sift.tar.gz
将其转换为 Lance:
from lance.vector import vec_to_table import lance import numpy as np import struct uri = "vec_data.lance" with open("sift/sift_base.fvecs", mode="rb") as fobj: buf = fobj.read() data = np.array(struct.unpack("<128000000f", buf[4 : 4 + 4 * 1000000 * 128])).reshape((1000000, 128)) dd = dict(zip(range(1000000), data)) table = vec_to_table(dd) lance.write_dataset(table, uri, max_rows_per_group=8192, max_rows_per_file=1024*1024)
验证数据集写入成功
随机采样 100 个向量作为查询向量:
调用最近邻(此处没有 ANN 索引):
import time start = time.time() tbl = sift1m.to_table(columns=["id"], nearest={"column": "vector", "q": samples[0], "k": 10}) end = time.time()
随着时间的推移,某些操作会导致 Lance 数据集布局不佳。例如,多次小批量追加会导致大量小片段。或者删除许多行会导致查询变慢,因为需要过滤掉已删除的行。
为了解决这些问题,Lance 提供了优化数据集布局的方法。
可以通过 lance.dataset.DatasetOptimizer.compact_files()
方法重写数据文件,以减少文件数量。当传递 target_rows_per_fragment
参数时,Lance 会跳过已超过该行数的片段,并重写其他片段。片段将根据它们的片段 ID 合并,因此数据的固有顺序将被保留。
注意
压缩会创建表的新版本。它不会删除旧版本的表和它引用的文件。
import lance dataset = lance.dataset("./alice_and_bob.lance") dataset.optimize.compact_files(target_rows_per_fragment=1024 * 1024)
在压缩过程中,Lance 还可以删除已删除的行。重写的片段将不再有删除文件。这可以提高扫描性能,因为软删除的行在扫描过程中不需要被跳过。
当文件被重写时,原始行地址将失效。这意味着受影响的文件不再属于任何 ANN 索引(如果之前属于)。因此,建议在重建索引之前重写文件。
Lance 支持对象存储,如 AWS S3(和兼容存储)、Azure Blob Storage 和 Google Cloud Storage。要使用的对象存储由数据集路径的 URI 方案决定。例如,s3://bucket/path
将使用 S3,火山 TOS 的 S3 配置可参考使用 Lance Python SDK 访问 TOS 上的 Lance 数据。