在 LAS 中创建好数据集后,您可在数据处理过程中对数据集中的数据进行读写操作,本文为您介绍使用 LAS 开发机基于Daft 使用数据集的通用流程与使用要点。
在使用数据集前,您需要先准备好数据处理的开发环境,建议您使用 LAS 开发机作为数据处理的开发环境,LAS 开发机为您预置了多种官网镜像,例如,Daft 相关版本的镜像,因此您可基于 LAS 开发机和官方镜像快速完成开发环境准备,详情请参见创建开发机、远程连接开发机。
export LAS_TOS_ACCESS_KEY=xxx export LAS_TOS_SECRET_KEY=xxx export TOS_ENDPOINT=https://tos-cn-beijing.ivolces.com export TOS_REGION=cn-beijing
import daft dataset_name = "your_dataset_name" df = daft.read_las_dataset(name=dataset_name) df.show()
支持将CSV、JSONL、Parquet、Image、Audio、Video、Lerobot格式的数据集转为Lance格式的数据集,具体操作说明如下
import daft from daft.io import CreateLasDatasetOptions, IOConfig, LanceWriteOptions from daft.las.io import TOSConfig # 您可以自定义您新数据集的路径 dataset_name = "your_dataset_name" lance_tos_dir = f"tos://{tos_bueckt_dir}/{dataset_name}.lance" LANCE_DATASET_FORMAT = "lance" # 创建相关的配置 io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config()) write_options = LanceWriteOptions(io_config=io_config, uri=lance_tos_dir) create_ds_options = CreateLasDatasetOptions( nick_name="daft_test_lance_write", privacy="public", description="This is my dataset", ) # 读取原始数据 df = daft.read_las_dataset(name=dataset_name) # 在这里进行创建 new_dataset_name = dataset_name + "_lance" df.write_las_dataset( name=new_dataset_name, format=LANCE_DATASET_FORMAT, write_options=write_options, create_ds_options=create_ds_options, ) # 读取转换后的新数据集 df = daft.read_las_dataset(name=new_dataset_name) print("\n\nnew dataset:") df.show()
如果您想使用 LAS 数据集页面上的完整功能,可以添加一个 __data_item_id 列作为逻辑主键,参考代码如下:
注意
进行加列操作前,您需要先将数据集转换为 Lance 数据集,当前仅支持对 Lance 数据集进行加列操作。
import random import pyarrow as pa from daft.io.lance import merge_columns from daft.io import IOConfig from daft.las.io import TOSConfig def create_assign_uuid(batch: pa.RecordBatch) -> pa.RecordBatch: num_rows = batch.num_rows uuid_list = [random.getrandbits(63) for _ in range(num_rows)] uuid_array = pa.array(uuid_list, type=pa.int64()) return pa.RecordBatch.from_arrays([uuid_array], names=["__data_item_id"]) print("正在生成业务主键 ...") dataset_name = "your_dataset_name" lance_tos_dir = f"tos://{tos_bueckt_dir}/{dataset_name}.lance" io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config()) merge_columns( url=lance_tos_dir.replace("tos://", "s3://"), io_config=io_config, transform=create_assign_uuid, ) print("业务主键生成完毕 ...")
对于图/音频/视频等数据通常也希望将原始的数据也写入 Lance 数据集,以下以图片数据为例,为您示例加列加列写入的示例代码:
import os import pyarrow as pa import tos from daft.io.lance import merge_columns from daft.io import IOConfig from daft.las.io import TOSConfig ak = os.environ["LAS_TOS_ACCESS_KEY"] sk = os.environ["LAS_TOS_SECRET_KEY"] endpoint = os.environ["TOS_ENDPOINT"] region = os.environ["TOS_REGION"] dataset_name = "doctest_image" # 从 tos 路径读取图片数据 def read_tos_path(path: str, tos_client: tos.TosClientV2): if (not path) or path.strip() == "": return None if path.startswith("tos://"): path = path[len("tos://") :] if path.startswith("s3://"): path = path[len("s3://") :] bucket, path = path.split("/", 1) obj = tos_client.get_object(bucket=bucket, key=path) return obj.read() def get_create_assign_raw_image_func(input_col: str, output_col: str): def create_assign_raw_image(batch: pa.RecordBatch) -> pa.RecordBatch: paths = batch.column(input_col).to_pylist() tos_client = tos.TosClientV2(ak, sk, endpoint, region) image_data_list = [] for path in paths: data = read_tos_path(path, tos_client) image_data_list.append(data) image_data_array = pa.array(image_data_list, type=pa.binary()) new_batch = pa.RecordBatch.from_arrays([image_data_array], names=[output_col]) # 仅返回新列 return new_batch return create_assign_raw_image print("正在将图片原始数据存入 lance 数据集中 ...") io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config()) lance_tos_dir = f"tos://ycs-test/las/lance/{dataset_name}.lance" merge_columns( url=lance_tos_dir.replace("tos://", "s3://"), io_config=io_config, transform=get_create_assign_raw_image_func("image", "raw_image"), ) print("图片原始数据已存入 lance 数据集")