通过 CreateVikingdbTask 接口以 task_type=data_export 的方式,将指定 Collection 中的数据批量导出到 TOS。
说明
使用前请先授权 VikingDB 跨服务访问 TOS 去授权
Python SDK 通过 VIKINGDBApi().create_vikingdb_task(request) 发起调用,request 类型为 volcenginesdkvikingdb.CreateVikingdbTaskRequest。
参数 | 子参数 | 类型 | 是否必填 | 描述 |
|---|---|---|---|---|
project_name | str | 否 | 任务所属项目,对应 API 字段 | |
collection_name | str | 2选1 | Collection 名称,对应 API 字段 | |
resource_id | str | Collection 资源 ID,对应 API 字段 | ||
task_type | str | 是 | 任务类型,对应 API 字段 | |
task_config | TaskConfigForCreateVikingdbTaskInput | 是 | 导出任务的详细配置,对应 API 字段 | |
tos_path | str | 是 | TOS 路径,格式 :{桶名}/{路径},注意不是域名。对应 API 字段 | |
file_type | str | 否 | 文件类型,支持 | |
export_all | bool | 否 | 是否导出全部数据,对应 API 字段 | |
filter_conds | list[object] | 是 | 过滤条件。使用参考 |
参数 | 类型 | 描述 |
|---|---|---|
task_id | str | 任务 ID,对应 API 字段 |
message | str | 接口返回的提示信息,对应 API 字段 |
import os import volcenginesdkcore import volcenginesdkvikingdb as vdb from volcenginesdkvikingdb.api.vikingdb_api import VIKINGDBApi configuration = volcenginesdkcore.Configuration() configuration.ak = os.environ["VIKINGDB_AK"] configuration.sk = os.environ["VIKINGDB_SK"] configuration.region = os.environ["VIKINGDB_REGION"] configuration.host = os.environ["VIKINGDB_HOST"] configuration.scheme = "https" volcenginesdkcore.Configuration.set_default(configuration) client = VIKINGDBApi() task_cfg = vdb.TaskConfigForCreateVikingdbTaskInput( file_type="parquet", export_all=True, tos_path="your-bucket/path/to/export.parquet", ) request = vdb.CreateVikingdbTaskRequest( project_name="default", collection_name="sdk_demo_collection", task_type="data_export", task_config=task_cfg, ) response = client.create_vikingdb_task(request) print("task id:", response.task_id) print("message:", response.message)
import tos DOMAIN = "api-vikingdb.volces.com" TOS_ENDPOINT = "tos-cn-beijing.ivolces.com" REGION = "cn-beijing" AK = "****" SK = "****" COLLECTION_NAME = "example" BUCKET_NAME = "bucket_name" TOS_DIR = "tos_dir" def download(client, bucket_name, object_key, local_path): file_path = "{}/{}".format(local_path, object_key) try: client.get_object_to_file(bucket_name, object_key, file_path) except tos.exceptions.TosClientError as e: # 操作失败,捕获客户端异常,一般情况为非法请求参数或网络异常 return 'fail with client error, message:{}, cause: {}'.format(e.message, e.cause) except tos.exceptions.TosServerError as e: return 'fail with server error : {}'.format(e) except Exception as e: return 'fail with unknown error: {}'.format(e) return ''
import pyarrow.parquet as pq def process_parquet(file_path): parquet_file = pq.ParquetFile(file_path) file_data_count = sum(parquet_file.metadata.row_group(i).num_rows for i in range(parquet_file.num_row_groups)) schema = parquet_file.schema.to_arrow_schema() row_iterator = parquet_file.iter_batches(batch_size=100) # 迭代读取数据 for batch in row_iterator: df = batch.to_pandas() for index, row in df.iterrows(): """ do what you want """ print(row) return '' process_parquet("1.parquet")