列出数据集column信息
import daft
import os
import lance
from daft.io.object_store_options import io_config_to_storage_options
from daft.io import IOConfig
from daft.las.io import TOSConfig
# 设置访问密钥、密钥和端点信息
# 请替换为您的访问密钥和密钥
ak = "<your_ak>" # 请替换为您的访问密钥
sk = "<your_sk>" # 请替换为您的密钥
endpoint = "https://tos-cn-beijing.ivolces.com"
region = "cn-beijing"
# 将密钥信息设置为环境变量
os.environ["LAS_TOS_ACCESS_KEY"] = ak
os.environ["LAS_TOS_SECRET_KEY"] = sk
os.environ["TOS_ENDPOINT"] = endpoint
def list_dataset_columns(dataset_name=None, lance_tos_dir=None):
"""
列出数据集的列名
参数:
dataset_name: LAS数据集名称
lance_tos_dir: Lance数据集TOS路径
"""
if dataset_name:
# 通过数据集名称读取
print(f"\n读取LAS数据集: {dataset_name}")
df = daft.read_las_dataset(name=dataset_name)
print("\n数据集列名:")
print(df.column_names)
# 显示数据集样例
print("\n数据集样例:")
df.show(5)
if lance_tos_dir:
# 通过Lance路径直接读取
print(f"\n读取Lance数据集: {lance_tos_dir}")
# 配置IO参数,使用从环境变量获取的TOS配置
io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config())
# 生成lance读取配置,构建lance客户端
storage_options = io_config_to_storage_options(io_config, lance_tos_dir.replace("tos://", "s3://"))
lance_ds = lance.dataset(uri=lance_tos_dir.replace("tos://", "s3://"), storage_options=storage_options)
# 获取Lance数据集的schema
schema = lance_ds.schema
print("\nLance数据集schema:")
print(schema)
# 列出所有列名
print("\nLance数据集列名:")
column_names = [field.name for field in schema]
print(column_names)
def main():
# 示例:列出LAS数据集的列名
# 替换为您的数据集名称
las_dataset_name = "las_dataset_20251011_image_lance"
list_dataset_columns(dataset_name=las_dataset_name)
# 示例:列出Lance数据集的列名
# 替换为您的TOS桶名和路径
lance_tos_dir = "tos://xuxiaoliang-sh-test/lance/las_dataset_20251011_1_image.lance"
list_dataset_columns(lance_tos_dir=lance_tos_dir)
if __name__ == "__main__":
main()
删除不需要的列
##请先执行source env.sh ,获取相关变量值
import os
import lance
from daft.io import IOConfig
from daft.io.object_store_options import io_config_to_storage_options
from daft.las.io import TOSConfig
from daft.las.infra.las_dataset import LasDatasetClient, LasDatasetConfig
# -------------------------- 1. 核心依赖导入 & 配置环境变量(删除列必需) --------------------------
# 从环境变量获取关键配置(需提前通过环境变量或脚本设置)
TOS_LANCE_DIR = os.getenv("TOS_LANCE_DIR") # Lance数据集在TOS的路径(如:tos://bucket/lance/dataset.lance)
DATASET_NAME = os.getenv("DATASET_NAME") # LAS数据集名称(用于刷新元数据)
# 加载TOS访问密钥(从环境变量读取,避免硬编码)
os.environ["LAS_TOS_ACCESS_KEY"] = os.getenv("LAS_TOS_ACCESS_KEY")
os.environ["LAS_TOS_SECRET_KEY"] = os.getenv("LAS_TOS_SECRET_KEY")
os.environ["TOS_ENDPOINT"] = os.getenv("LAS_TOS_ENDPOINT")
# 初始化IO配置(连接TOS必需,用于Lance数据集读写)
io_config = IOConfig(s3=TOSConfig.from_env().to_s3_config())
# -------------------------- 2. 核心功能:删除Lance数据集指定列 + 刷新元数据 --------------------------
def delete_embedding_columns():
# 1. 处理Lance路径格式(将tos://转为s3://,适配Lance SDK)
if not TOS_LANCE_DIR:
raise ValueError("请先设置环境变量 TOS_LANCE_DIR(Lance数据集在TOS的路径)")
lance_path = TOS_LANCE_DIR.replace("tos://", "s3://")
# 2. 连接Lance数据集(需TOS存储配置)
storage_options = io_config_to_storage_options(io_config, lance_path)
lance_ds = lance.dataset(uri=lance_path, storage_options=storage_options)
print(f"成功连接Lance数据集:{lance_path}")
print(f"当前数据集列:{lance_ds.schema.names}")
# 3. 核心操作:删除指定列(保留原代码中需要删除的列名)
columns_to_delete = ["num_rows", "image_embedding"] # 需删除的列名列表
# 过滤出数据集中实际存在的列(避免删除不存在的列报错)
existing_columns = [col for col in columns_to_delete if col in lance_ds.schema.names]
if existing_columns:
lance_ds.drop_columns(existing_columns) # 执行删除列操作
print(f"已成功删除列:{existing_columns}")
else:
print(f"无需删除:指定的列 {columns_to_delete} 均不在数据集中")
# 4. 刷新LAS控制台元数据(确保删除后在控制面能看到最新列结构)
if DATASET_NAME:
client = LasDatasetClient(config=LasDatasetConfig.from_io_config(io_config))
body = {"DatasetName": DATASET_NAME}
client.api_client.call_api(
method="POST",
params={},
headers={},
action="RefreshDatasetMetadata",
body=body,
)
print(f"已刷新LAS数据集 {DATASET_NAME} 的元数据")
else:
print("未设置 DATASET_NAME,跳过元数据刷新步骤")
# -------------------------- 3. 执行入口 --------------------------
if __name__ == "__main__":
delete_embedding_columns()