You need to enable JavaScript to run this app.
导航
Collection 数据过滤导出任务
最近更新时间:2025.04.23 11:46:02首次发布时间:2025.04.23 11:46:02
我的收藏
有用
有用
无用
无用

概述

按特定条件批量导出Collection中的数据

说明

使用前请先授权 VikingDB 跨服务访问 TOS 去授权

请求参数

参数名

类型

必须

说明

子字段

类型

必选

说明

task_type

string

任务类型,填入data_export

task_params

map

任务参数

collection_name

string

Collection 名称,必填

filter_conds

map

过滤条件,支持 must 、must_nor、range、range_out 算子,使用参考标量过滤
过滤更新 city 为 beijing 且 user_id (0, 4) 之间的数据

[
    {
        "op": "must",
        "field": "city",
        "conds": ["beijing"]
    },
    {
        "op": "range",
        "field": "user_id",
        "gt": 0,
        "lt": 4
    },

]

export_all

bool

导出全部数据,此时filter不生效。默认为false

tos_path

string

将数据文件导入到用户的TOS 路径,格式 :{桶名}/{路径},注意不是域名。必填

file_type

string

备份文件类型 json 或者 parquet,默认为 parquet

示例

请求参数

from volcengine.viking_db import *

vikingdb_service = VikingDBService()
vikingdb_service.set_ak()
vikingdb_service.set_sk()
task_params = {
    "collection_name": "example",
    "tos_path": "{}/{}".format("bucket_name", "dir_name"),
    "filter_conds": [
        {
            "op": "must",
            "field": "city",
            "conds": ["beijing"]
        },
        {
            "op": "range",
            "field": "user_id",
            "gt": 0,
            "lt": 4
        },
    ]
}
task_id = vikingdb_service.create_task(TaskType.Data_export, task_params)

返回值

属性

说明

task_id

uuid

后续处理

1、从 TOS 下载文件

import tos
DOMAIN = "api-vikingdb.volces.com"
TOS_ENDPOINT = "tos-cn-beijing.ivolces.com"
REGION = "cn-beijing"
AK = "****"
SK = "****"
COLLECTION_NAME = "example"
BUCKET_NAME = "bucket_name"
TOS_DIR = "tos_dir"

def download(client, bucket_name, object_key, local_path):
    file_path = "{}/{}".format(local_path, object_key)
    try:
        client.get_object_to_file(bucket_name, object_key, file_path)
    except tos.exceptions.TosClientError as e:
        # 操作失败,捕获客户端异常,一般情况为非法请求参数或网络异常
        return 'fail with client error, message:{}, cause: {}'.format(e.message, e.cause)
    except tos.exceptions.TosServerError as e:
        return 'fail with server error : {}'.format(e)
    except Exception as e:
        return 'fail with unknown error: {}'.format(e)
    return ''

client = tos.TosClientV2(AK, SK, TOS_ENDPOINT, REGION)
download(client, BUCKET_NAME, TOS_DIR, "./")

2、解析 parquet 类型数据

import pyarrow.parquet as pq
def process_parquet(file_path):
    parquet_file = pq.ParquetFile(file_path)

    file_data_count = sum(parquet_file.metadata.row_group(i).num_rows for i in range(parquet_file.num_row_groups))
    schema = parquet_file.schema.to_arrow_schema()
    row_iterator = parquet_file.iter_batches(batch_size=100)
    # 迭代读取数据
    for batch in row_iterator:
        df = batch.to_pandas()
        for index, row in df.iterrows():
            """
            do what you want
            """
            print(row)
    return ''

process_parquet("1.parquet")