You need to enable JavaScript to run this app.
AI 数据湖服务

AI 数据湖服务

复制全文
Lance
快速入门
复制全文
快速入门

本文仅针对本地 python 包测试使用,使用EMR 的Ray操作可以参考使用 Ray 操作 Lance 数据
本文主要介绍在本地进行 Python 包测试,包括本地安装 SDK、创建数据集、版本控制、向量数据准备等操作,还详细说明了如何对 Lance 数据集进行写入、删除行、更新行、合并插入、演化模式(重命名列、更改列数据类型、添加新列、使用合并添加新列、删除列)、读取(包括迭代读取、过滤器下推、随机读取)、表维护以及对象存储配置等一系列操作。在版本控制方面,可以进行数据的追加、覆盖,并能创建和检出标签。

前期准备

本地已安装 SDK。如未安装,参考如下操作。

说明

请使用 Linux 的环境安装 PyLance 的包。

pip install ve-pylance

操作说明

创建数据集

import lance
import pyarrow as pa

table = pa.Table.from_pylist([{"name": "Alice", "age": 20},
                              {"name": "Bob", "age": 30}])
lance.write_dataset(table, "./alice_and_bob.lance")

会在本地目录下创建alice_and_bob.lance目录,里面有真实数据的data目录,也管理版本的_versions目录,以及管理事务的_transactions目录。

alice_and_bob.lance
|-- data
|   `-- 527881ac-cbaf-4519-a5a6-e39bffae5c79.lance
|-- _transactions
|   `-- 0-89006b07-e468-4f4f-9ca1-35a0f3786af0.txn
`-- _versions
    `-- 1.manifest

创建一个Dataframe

df = pd.DataFrame({"a": [5]})

将其写入 Lance

shutil.rmtree("/tmp/test.lance", ignore_errors=True)
dataset = lance.write_dataset(df, "/tmp/test.lance")
dataset.to_table().to_pandas()

从parquet转换到lance,读取一个parquet文件

shutil.rmtree("/tmp/test.parquet", ignore_errors=True)
shutil.rmtree("/tmp/test.lance", ignore_errors=True)
tbl = pa.Table.from_pandas(df)
pa.dataset.write_dataset(tbl, "/tmp/test.parquet", format='parquet')
parquet = pa.dataset.dataset("/tmp/test.parquet")
parquet.to_table().to_pandas()

写入新的lance

dataset = lance.write_dataset(parquet, "/tmp/test.lance")

验证数据并打印

确保数据一致
dataset.to_table().to_pandas()

读取Lance数据集

Lance 数据集 API 遵循 PyArrow API 约定。

  • 查询数据集
dataset = lance.dataset("./alice_and_bob.lance")
# 查看dataset的行数
dataset.count_rows()

# 查看dataset的schema
dataset.schema

# 查看全部的dataset内容
dataset.to_table()
 
# 根据条件扫描数据
dataset.scanner(filter="age > 25").to_table()
 
# 根据条件扫描数据,只返回name列
dataset.scanner(columns=["name"], filter="age > 25").to_table()

要打开一个 Lance 数据集,可以使用 lance.dataset() 函数:

import lance
ds = lance.dataset("s3://bucket/path/imagenet.lance")或者本地路径
ds = lance.dataset("./imagenet.lance")

读取 Lance 数据集的最直接方法是使用 lance.LanceDataset.to_table() 方法将整个数据集加载到内存中。
由于 Lance 是一种高性能的列式格式,它可以通过利用列(投影)下推和过滤器(谓词)下推来高效地读取数据集的子集。

table = ds.to_table(
    columns=["image", "label"],filter="label = 2 AND text IS NOT NULL",
    limit=1000,
    offset=3000)

Lance 理解读取重型列(如 image)的成本。因此,它采用优化的查询计划来高效地执行操作。

迭代读取

如果数据集太大而无法一次性加载到内存中,可以使用 lance.LanceDataset.to_batches() 方法按批次读取:

for batch in ds.to_batches(columns=["image"], filter="label = 10"):对批次进行处理
    compute_on_batch(batch)

Filter下推

Lance 采用标准 SQL 表达式作为数据集过滤的谓词。通过将 SQL 谓词直接下推到存储系统,可以显著减少扫描过程中的 I/O 负载。
代码示例

table = ds.to_table(columns=["image", "label"],
filter="label = 2 AND text IS NOT NULL",limit=1000,offset=3000)

目前,Lance 支持越来越多的表达式。

  • >, >=, <, <=, =
  • AND, OR, NOT
  • IS NULL, IS NOT NULL
  • IS TRUE, IS NOT TRUE, IS FALSE, IS NOT FALSE
  • IN
  • LIKE, NOT LIKE
  • regexp_match(column, pattern)
  • CAST

可以使用下标访问嵌套字段。结构体字段可以使用字段名下标,而列表字段可以使用索引下标。
如果列名包含特殊字符或是一个 SQL 关键字,可以使用反引号(```)将其转义。对于嵌套字段,路径的每个部分都必须用反引号包裹。
日期、时间戳和小数的字面量可以写成类型名后的字符串值。例如:

date_col = date '2021-01-01'
and timestamp_col = timestamp '2021-01-01 00:00:00'
and decimal_col = decimal(8,3) '1.000'

对于时间戳列,精度可以在类型参数中指定。默认精度为微秒(6)。
Lance 内部以 Arrow 格式存储数据。SQL 类型到 Arrow 类型的映射如下:

随机读取

作为列式格式的一个显著特点,Lance 允许快速随机读取。

访问第 2 行、第 101 行和第 501 行
data = ds.take([1, 100, 500], columns=["image", "label"])

能够快速随机访问单个行在促进各种工作流(如机器学习训练中的随机采样和洗牌)中起着至关重要的作用。此外,它使用户能够构建二级索引,从而实现快速查询执行,提升性能。

写入 Lance 数据集

与 Apache Pyarrow 类似,创建 Lance 数据集的最简单方法是通过 lance.write_dataset() 写入一个 pyarrow.Table

import lance
import pyarrow as pa
table = pa.Table.from_pylist([{"name": "Alice", "age": 20},{"name": "Bob", "age": 30}])
lance.write_dataset(table, "./alice_and_bob.lance")

如果数据集的内存占用过大,无法一次性写入内存,lance.write_dataset() 也支持通过 pyarrow.RecordBatch 的迭代器来写入数据集。

import lance
import pyarrow as pa

def producer():yield pa.RecordBatch.from_pylist([{"name": "Alice", "age": 20}])yield pa.RecordBatch.from_pylist([{"name": "Blob", "age": 30}])
schema = pa.schema([
    pa.field("name", pa.string()),
    pa.field("age", pa.int64()),])
lance.write_dataset(producer(), "./alice_and_bob.lance", schema)

lance.write_dataset() 支持写入 pyarrow.Tablepandas.DataFramepyarrow.DatasetIterator[pyarrow.RecordBatch]。更多详细信息请查看其文档。

删除行

Lance 支持使用 SQL 过滤器从数据集中删除行。例如,要从上述数据集中删除 Bob 的行,可以使用以下代码:

import lance
dataset = lance.dataset("./alice_and_bob.lance")
dataset.delete("name = 'Bob'")

lance.LanceDataset.delete() 支持与过滤器下推(Filter push-down)中描述相同的过滤器。
行是通过在单独的删除索引中标记为已删除来删除的。这比重写文件更快,也不会使指向这些文件的索引失效。任何后续查询都不会返回已删除的行。

更新行

Lance 支持使用 lance.LanceDataset.update() 方法基于 SQL 表达式更新行。例如,如果我们发现数据集中 Bob 的名字有时被写成了 Blob,我们可以这样修正:

import lance
dataset = lance.dataset("./alice_and_bob.lance")
dataset.update({"name": "'Bob'"}, where="name = 'Blob'")

更新值是 SQL 表达式,这就是为什么 'Bob' 被单引号包裹。这意味着我们可以根据需要使用引用现有列的复杂表达式。例如,如果两年过去了,我们想在同一个示例中更新 Alice 和 Bob 的年龄,我们可以这样写:

import lance
dataset = lance.dataset("./alice_and_bob.lance")
dataset.update({"age": "age + 2"})

如果你试图用新值更新一组单独的行,通常使用下面描述的合并插入操作更高效。

import lance


new_table = pa.Table.from_pylist([{"name": "Alice", "age": 30},{"name": "Bob", "age": 20}])这种方法可以工作,但效率不高,下面有更好的方法
dataset = lance.dataset("./alice_and_bob.lance")for idx in range(new_table.num_rows):
    name = new_table[0][idx].as_py()
    new_age = new_table[1][idx].as_py()
    dataset.update({"age": new_age}, where=f"name='{name}'")

批量更新

Lance 支持合并插入操作。这可以用于批量添加新数据,同时也可以(潜在地)匹配现有数据。此操作可用于多种不同的用例。

批量更新

lance.LanceDataset.update() 方法适用于基于过滤器更新行。然而,如果我们想用新行替换现有行,则合并插入操作会更高效:

import lance

# 改变 Alice 和 Bob 的年龄
new_table = pa.Table.from_pylist([{"name": "Alice", "age": 30},{"name": "Bob", "age": 20}])
dataset = lance.dataset("./alice_and_bob.lance")这将使用 name 作为匹配行的键。合并插入内部使用 JOIN,因此你通常希望这个列是某种唯一键或 ID。
dataset.merge_insert("name") \.when_matched_update_all() \.execute(new_table)

注意

与更新操作相似,被修改的行首先会被从当前位置移除,然后重新插入到表中。并且重新插入后,这些行的位置会处于表的末尾。此外,由于在内部使用了哈希连接操作,所以这些被修改行的相对顺序可能会发生改变。也就是说,原本在表中的相对位置在经过该操作后可能不再保持一致。

有时我们只想在之前没有插入过的数据才插入。这可能发生在我们有一批数据,但不知道哪些行已经添加过,不想创建重复行。我们可以使用合并插入操作来实现这一点:

import lance

# Bob 已经在表中,但 Carla 是新的
new_table = pa.Table.from_pylist([{"name": "Bob", "age": 30},{"name": "Carla", "age": 37}])
dataset = lance.dataset("./alice_and_bob.lance")这将插入 Carla 但保持 Bob 不变
dataset.merge_insert("name") \.when_not_matched_insert_all() \.execute(new_table)

更新或插入(Upsert)

有时我们想结合上述两种行为。如果行已经存在,我们想更新它;如果行不存在,我们想添加它。此操作有时被称为“upsert”。我们也可以使用合并插入操作来实现这一点:

import lance

# 改变 Carla 的年龄并插入 David
new_table = pa.Table.from_pylist([{"name": "Carla", "age": 27},{"name": "David", "age": 42}])
dataset = lance.dataset("./alice_and_bob.lance")这将更新 Carla 并插入 David
dataset.merge_insert("name") \.when_matched_update_all() \.when_not_matched_insert_all() \.execute(new_table)

替换部分数据

一种不太常见但仍然有用的行为是替换现有行的某个区域(由过滤器定义)的新数据。这类似于在单个事务中同时执行删除和插入。例如:

import lance
new_table = pa.Table.from_pylist([{"name": "Edgar", "age": 46},{"name": "Francine", "age": 44}])
dataset = lance.dataset("./alice_and_bob.lance")这将移除任何年龄大于 40 的人,并插入我们的新数据
dataset.merge_insert("name") \.when_not_matched_insert_all() \.when_not_matched_by_source_delete("age >= 40") \.execute(new_table)

完整例子

# 更新某些行
dataset.update({"name": "'Blob'"}, where="name = 'Bob'")

# 删除某些行
dataset.delete("name = 'Blob'")

# 写入新的一批数据
table = pa.Table.from_pylist([{"name": "Alice", "age": 20},
                              {"name": "Bob", "age": 30}])
lance.write_dataset(table, "./alice_and_bob.lance", mode = "append")

Schema Change

Lance是数据湖, 提供了多版本的能力, 能够快速的实现增删查改以及结构变更的需求, 也提供time travel的能力.
Lance 支持Schema Change:在数据集中添加、删除和修改列。这些操作中的大多数可以在不重写数据集中的数据文件的情况下执行,使它们成为非常高效的操作。
通常,模式更改会与大多数其他并发写操作冲突。例如,如果你在其他人向数据集追加数据时更改了数据集的模式,要么你的模式更改失败,要么追加操作失败,具体取决于操作的顺序。因此,建议在没有其他写操作时执行模式更改。

重命名列

可以使用 lance.LanceDataset.alter_columns() 方法重命名列。

import lance
import pyarrow as pa
table = pa.table({"id": pa.array([1, 2, 3])})
dataset = lance.write_dataset(table, "ids")
dataset.alter_columns({"path": "id", "name": "new_id"})
dataset.to_table().to_pandas()

这适用于嵌套列。要引用嵌套列,请使用点(.)分隔嵌套级别。例如:

data = [{"meta": {"id": 1, "name": "Alice"}},{"meta": {"id": 2, "name": "Bob"}},]
dataset = lance.write_dataset(data, "nested_rename")
dataset.alter_columns({"path": "meta.id", "name": "new_id"})
meta
0  {"new_id": 1, "name": "Alice"}
1  {"new_id": 2, "name": "Bob"}

更改列的数据类型

除了更改列名外,还可以使用 lance.LanceDataset.alter_columns() 方法更改列的数据类型。这需要重写该列的新数据文件,但不需要重写其他列。
注意
如果列有索引,更改列类型将导致索引被丢弃。
此方法可用于更改列的向量类型。例如,我们可以将 float32 嵌入列更改为 float16 列,以节省磁盘空间,但会降低精度:

import lance
import pyarrow as pa
import numpy as np
table = pa.table({"id": pa.array([1, 2, 3]),"embedding": pa.FixedShapeTensorArray.from_numpy_ndarray(
       np.random.rand(3, 128).astype("float32"))})
dataset = lance.write_dataset(table, "embeddings")
dataset.alter_columns({"path": "embedding","type": pa.list_(pa.float16(), 128)})
dataset.schema()
id: int64
embedding: fixed_size_list<item: float16, 128>

添加新列

可以使用 lance.LanceDataset.add_columns() 方法在单个操作中添加新列并填充数据。有两种方法可以指定如何填充新列:首先,为每个新列提供一个 SQL 表达式;其次,提供一个函数来生成新列数据。
SQL 表达式可以是独立的表达式,也可以引用现有列。可以使用 SQL 字面量值为所有现有行设置一个值。

import lance
import pyarrow as pa
table = pa.table({"name": pa.array(["Alice", "Bob", "Carla"])})
dataset = lance.write_dataset(table, "names")
dataset.add_columns({"hash": "sha256(name)","status": "'active'",})
dataset.to_table().to_pandas()
name         hash...   status
0  Alice  3bc51062973c...  active
1    Bob  cd9fb1e148cc...  active
2  Carla  ad8d83ffd82b...  active

你也可以提供一个 Python 函数来生成新列数据。这可以用于计算新的嵌入列。此函数应接受一个 PyArrow RecordBatch 并返回一个 PyArrow RecordBatch 或 Pandas DataFrame。该函数将对数据集中的每个批次调用一次。
如果函数计算成本高昂且可能失败,建议在 UDF 中设置一个检查点文件。此检查点文件在每次调用后保存 UDF 的状态,以便如果 UDF 失败,可以从最后一个检查点重新启动。注意,此文件可能会变得相当大,因为它需要存储多达整个数据文件的未保存结果。

import lance
import pyarrow as pa
import numpy as np
table = pa.table({"id": pa.array([1, 2, 3])})
dataset = lance.write_dataset(table, "ids")

@lance.batch_udf(checkpoint_file="embedding_checkpoint.sqlite")
def add_random_vector(batch):
    embeddings = np.random.rand(batch.num_rows, 128).astype("float32")
    return pd.DataFrame({"embedding": embeddings})
dataset.add_columns(add_random_vector)

使用合并添加新列

如果你已经预先计算了一个或多个新列,可以使用 lance.LanceDataset.merge() 方法将它们添加到现有数据集。这允许在不需要重写整个数据集的情况下填充额外的列。
要使用 merge 方法,请提供一个包含要添加的列的新数据集,以及用于将新数据与现有数据集连接的列名。
例如,假设我们有一个包含嵌入和 ID 的数据集:

import lance
import pyarrow as pa
import numpy as np
table = pa.table({"id": pa.array([1, 2, 3]),"embedding": pa.array([np.array([1, 2, 3]), np.array([4, 5, 6]),
                          np.array([7, 8, 9])])})
dataset = lance.write_dataset(table, "embeddings")

现在,如果我们想添加一个我们已经生成的标签列,可以通过合并一个新表来实现:

new_data = pa.table({"id": pa.array([1, 2, 3]),"label": pa.array(["horse", "rabbit", "cat"])})
dataset.merge(new_data, "id")
dataset.to_table().to_pandas()
id  embedding   label
0   1  [1, 2, 3]   horse
1   2  [4, 5, 6]  rabbit
2   3  [7, 8, 9]     cat

删除列

最后,可以使用 lance.LanceDataset.drop_columns() 方法从数据集中删除列。这是一个仅涉及元数据的操作,不会删除磁盘上的数据。这使得它非常快速。

import lance
import pyarrow as pa
table = pa.table({"id": pa.array([1, 2, 3]),"name": pa.array(["Alice", "Bob", "Carla"])})
dataset = lance.write_dataset(table, "names")
dataset.drop_columns(["name"])
dataset.schema()

要实际从磁盘上删除数据,必须重写文件以删除列,然后删除旧文件。这可以通过 lance.dataset.DatasetOptimizer.compact_files() 后跟 lance.LanceDataset.cleanup_old_versions() 来完成。

版本管理

我们可以追加行:

df = pd.DataFrame({"a": [10]})
tbl = pa.Table.from_pandas(df)
dataset = lance.write_dataset(tbl, "/tmp/test.lance", mode="append")
dataset.to_table().to_pandas()

我们可以覆盖数据并创建一个新版本:

df = pd.DataFrame({"a": [50, 100]})
tbl = pa.Table.from_pandas(df)
dataset = lance.write_dataset(tbl, "/tmp/test.lance", mode="overwrite")
dataset.to_table().to_pandas()

旧版本仍然存在:

[{'version': 1,'timestamp': datetime.datetime(2024, 8, 15, 21, 22, 31, 453453),'metadata': {}},{'version': 2,'timestamp': datetime.datetime(2024, 8, 15, 21, 22, 35, 475152),'metadata': {}},{'version': 3,'timestamp': datetime.datetime(2024, 8, 15, 21, 22, 45, 32922),'metadata': {}}]
lance.dataset('/tmp/test.lance', version=1).to_table().to_pandas()
lance.dataset('/tmp/test.lance', version=2).to_table().to_pandas()

我们可以创建标签:

dataset.tags.create("stable", 2)
dataset.tags.create("nightly", 3)
dataset.tags.list()
{'nightly': {'version': 3, 'manifest_size': 628},'stable': {'version': 2, 'manifest_size': 684}}

可以检出标签:

lance.dataset('/tmp/test.lance', version="stable").to_table().to_pandas()

向量查询

准备向量数据

在这个教程中,我们将使用 Sift 1M 数据集:

  • 从以下地址下载 ANN_SIFT1Mhttp://corpus-texmex.irisa.fr/
  • 直接链接应为: ftp://ftp.irisa.fr/local/texmex/corpus/sift.tar.gz
  • 下载并解压 tarball
rm -rf sift* vec_data.lance
wget ftp://ftp.irisa.fr/local/texmex/corpus/sift.tar.gz
tar -xzf sift.tar.gz

将其转换为 Lance:

from lance.vector import vec_to_table
import lance
import numpy as np
import struct
uri = "vec_data.lance"
with open("sift/sift_base.fvecs", mode="rb") as fobj:
    buf = fobj.read()
    data = np.array(struct.unpack("<128000000f", buf[4 : 4 + 4 * 1000000 * 128])).reshape((1000000, 128))
    dd = dict(zip(range(1000000), data))
table = vec_to_table(dd)
lance.write_dataset(table, uri, max_rows_per_group=8192, max_rows_per_file=1024*1024)

验证数据集写入成功

KNN查询

随机采样 100 个向量作为查询向量:
调用最近邻(此处没有 ANN 索引):

import time
start = time.time()
tbl = sift1m.to_table(columns=["id"], nearest={"column": "vector", "q": samples[0], "k": 10})
end = time.time()

Compact优化

随着时间的推移,某些操作会导致 Lance 数据集布局不佳。例如,多次小批量追加会导致大量小片段。或者删除许多行会导致查询变慢,因为需要过滤掉已删除的行。
为了解决这些问题,Lance 提供了优化数据集布局的方法。

可以通过 lance.dataset.DatasetOptimizer.compact_files() 方法重写数据文件,以减少文件数量。当传递 target_rows_per_fragment 参数时,Lance 会跳过已超过该行数的片段,并重写其他片段。片段将根据它们的片段 ID 合并,因此数据的固有顺序将被保留。
注意
压缩会创建表的新版本。它不会删除旧版本的表和它引用的文件。

import lance
dataset = lance.dataset("./alice_and_bob.lance")
dataset.optimize.compact_files(target_rows_per_fragment=1024 * 1024)

在压缩过程中,Lance 还可以删除已删除的行。重写的片段将不再有删除文件。这可以提高扫描性能,因为软删除的行在扫描过程中不需要被跳过。
当文件被重写时,原始行地址将失效。这意味着受影响的文件不再属于任何 ANN 索引(如果之前属于)。因此,建议在重建索引之前重写文件。

对象存储配置

Lance 支持对象存储,如 AWS S3(和兼容存储)、Azure Blob Storage 和 Google Cloud Storage。要使用的对象存储由数据集路径的 URI 方案决定。例如,s3://bucket/path 将使用 S3,火山 TOS 的 S3 配置可参考使用 Lance Python SDK 访问 TOS 上的 Lance 数据

最近更新时间:2025.10.28 16:40:23
这个页面对您有帮助吗?
有用
有用
无用
无用