You need to enable JavaScript to run this app.
AI 数据湖服务

AI 数据湖服务

复制全文
数据集
应用实践:基于veDaft的数据集使用说明
复制全文
应用实践:基于veDaft的数据集使用说明

在 LAS 中创建好数据集后,您可在数据处理过程中对数据集中的数据进行读写操作,本文为您介绍使用 LAS 开发机基于Daft 使用数据集的通用流程与使用要点。

注意与前提
  • 已创建好 LAS 数据集,详情请参见数据集管理
  • 注意:当前 Iceberg、Text类型的数据集暂不支持 veDaft 直接读取数据,以下为您介绍其他类型数据集的操作指导。

准备工作

准备开发环境

在使用数据集前,您需要先准备好数据处理的开发环境,建议您使用 LAS 开发机作为数据处理的开发环境,LAS 开发机为您预置了多种官网镜像,例如,Daft 相关版本的镜像,因此您可基于 LAS 开发机和官方镜像快速完成开发环境准备,详情请参见创建开发机远程连接开发机

配置环境变量

  1. 在使用数据集前,您需要先获取 API Key、数据集的地域及 Endpoint 信息,用于后续使用数据集时配置。
    • 您可以在「访问控制」->「API 访问密钥」中创建或查看您的 API Key: 访问控制 控制台
    • 您可以在对应数据集的 TOS 控制台查看当前数据集的TOS Bucket 所在的地域与Endpoint。
  2. 建议将上述获取到的信息配置为环境变量,避免在使用数据集时明文配置使用,提高安全性。
    export LAS_TOS_ACCESS_KEY=xxx
    export LAS_TOS_SECRET_KEY=xxx
    export TOS_ENDPOINT=https://tos-cn-beijing.ivolces.com
    export TOS_REGION=cn-beijing
    

使用数据集

读取数据

import daft

dataset_name = "your_dataset_name"
df = daft.read_las_dataset(name=dataset_name)
df.show()

转为 Lance 数据集

支持将CSV、JSONL、Parquet、Image、Audio、Video、Lerobot格式的数据集转为Lance格式的数据集,具体操作说明如下

import daft
from daft.io import CreateLasDatasetOptions, IOConfig, LanceWriteOptions
from daft.las.io import TOSConfig

# 您可以自定义您新数据集的路径
dataset_name = "your_dataset_name"
lance_tos_dir = f"tos://{tos_bueckt_dir}/{dataset_name}.lance"
LANCE_DATASET_FORMAT = "lance"

# 创建相关的配置
io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config())
write_options = LanceWriteOptions(io_config=io_config, uri=lance_tos_dir)
create_ds_options = CreateLasDatasetOptions(
    nick_name="daft_test_lance_write",
    privacy="public",
    description="This is my dataset",
)

# 读取原始数据
df = daft.read_las_dataset(name=dataset_name)

# 在这里进行创建
new_dataset_name = dataset_name + "_lance"
df.write_las_dataset(
    name=new_dataset_name,
    format=LANCE_DATASET_FORMAT,
    write_options=write_options,
    create_ds_options=create_ds_options,
)

# 读取转换后的新数据集
df = daft.read_las_dataset(name=new_dataset_name)
print("\n\nnew dataset:")
df.show()

加列

如果您想使用 LAS 数据集页面上的完整功能,可以添加一个 __data_item_id 列作为逻辑主键,参考代码如下:

注意

进行加列操作前,您需要先将数据集转换为 Lance 数据集,当前仅支持对 Lance 数据集进行加列操作。

import random
import pyarrow as pa
from daft.io.lance import merge_columns
from daft.io import IOConfig
from daft.las.io import TOSConfig

def create_assign_uuid(batch: pa.RecordBatch) -> pa.RecordBatch:
    num_rows = batch.num_rows

    uuid_list = [random.getrandbits(63) for _ in range(num_rows)]
    uuid_array = pa.array(uuid_list, type=pa.int64())

    return pa.RecordBatch.from_arrays([uuid_array], names=["__data_item_id"])


print("正在生成业务主键 ...")

dataset_name = "your_dataset_name"
lance_tos_dir = f"tos://{tos_bueckt_dir}/{dataset_name}.lance"
io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config())
merge_columns(
    url=lance_tos_dir.replace("tos://", "s3://"),
    io_config=io_config,
    transform=create_assign_uuid,
)

print("业务主键生成完毕 ...")

对于图/音频/视频等数据通常也希望将原始的数据也写入 Lance 数据集,以下以图片数据为例,为您示例加列加列写入的示例代码:

import os
import pyarrow as pa
import tos
from daft.io.lance import merge_columns
from daft.io import IOConfig
from daft.las.io import TOSConfig

ak = os.environ["LAS_TOS_ACCESS_KEY"]
sk = os.environ["LAS_TOS_SECRET_KEY"]
endpoint = os.environ["TOS_ENDPOINT"]
region = os.environ["TOS_REGION"]

dataset_name = "doctest_image"

# 从 tos 路径读取图片数据
def read_tos_path(path: str, tos_client: tos.TosClientV2):
    if (not path) or path.strip() == "":
        return None

    if path.startswith("tos://"):
        path = path[len("tos://") :]

    if path.startswith("s3://"):
        path = path[len("s3://") :]

    bucket, path = path.split("/", 1)
    obj = tos_client.get_object(bucket=bucket, key=path)
    return obj.read()


def get_create_assign_raw_image_func(input_col: str, output_col: str):
    def create_assign_raw_image(batch: pa.RecordBatch) -> pa.RecordBatch:
        paths = batch.column(input_col).to_pylist()
        tos_client = tos.TosClientV2(ak, sk, endpoint, region)

        image_data_list = []
        for path in paths:
            data = read_tos_path(path, tos_client)
            image_data_list.append(data)

        image_data_array = pa.array(image_data_list, type=pa.binary())
        new_batch = pa.RecordBatch.from_arrays([image_data_array], names=[output_col])

        # 仅返回新列
        return new_batch

    return create_assign_raw_image


print("正在将图片原始数据存入 lance 数据集中 ...")

io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config())
lance_tos_dir = f"tos://ycs-test/las/lance/{dataset_name}.lance"
merge_columns(
    url=lance_tos_dir.replace("tos://", "s3://"),
    io_config=io_config,
    transform=get_create_assign_raw_image_func("image", "raw_image"),
)

print("图片原始数据已存入 lance 数据集")
最近更新时间:2026.02.10 15:51:19
这个页面对您有帮助吗?
有用
有用
无用
无用