You need to enable JavaScript to run this app.
导航
Collection 数据过滤导出任务
最近更新时间：2025.05.26 14:51:26　首次发布时间：2025.05.26 14:51:26
我的收藏
有用
有用
无用
无用

概述

按特定条件批量导出Collection中的数据

说明

使用前请先授权 VikingDB 跨服务访问 TOS（去授权）。

请求参数

参数名

类型

必须

说明

子字段

类型

必选

说明

TaskType

string

任务类型，固定填入 data_export

TaskParams

map

任务参数

collection_name

string

Collection 名称,必填

filter_conds

array（元素为 map）

过滤条件，支持 must、must_not、range、range_out 算子，用法参考「标量过滤」文档。
示例：过滤出 city 为 beijing 且 user_id 在 (0, 4) 区间内的数据

[
    {
        "op": "must",
        "field": "city",
        "conds": ["beijing"]
    },
    {
        "op": "range",
        "field": "user_id",
        "gt": 0,
        "lt": 4
    }
]

export_all

bool

是否导出全部数据；为 true 时 filter_conds 不生效。默认为 false

tos_path

string

将数据文件导出到用户的 TOS 路径，格式为 {桶名}/{路径}（注意不是域名）。必填

file_type

string

导出文件类型，可选 json 或 parquet，默认为 parquet

示例

请求参数

package main

import (
  "fmt"

  "github.com/volcengine/volc-sdk-golang/service/vikingdb"
)

func main() {
	// Create the VikingDB service client.
	// NOTE(review): the five empty strings are placeholders for connection
	// settings/credentials — fill them in, checking the argument order against
	// the SDK's NewVikingDBService signature.
	service := vikingdb.NewVikingDBService("", "", "", "", "")

	// Export rows whose city is "beijing" AND whose user_id lies in the open
	// interval (0, 4).
	filterConds := []interface{}{
		map[string]interface{}{
			"op":    "must",
			"field": "city",
			"conds": []string{"beijing"},
		},
		map[string]interface{}{
			"op":    "range",
			"field": "user_id",
			"gt":    0,
			"lt":    4,
		},
	}

	taskParams := map[string]interface{}{
		"collection_name": "example",
		"filter_conds":    filterConds,
		// tos_path is required; format is "{bucket}/{path}", not a domain name.
		"tos_path": "bucket_name/tos_dir",
		// Optional: "json" or "parquet" (the documented default).
		"file_type": "parquet",
	}

	taskID, err := service.CreateTask(vikingdb.Data_Export, taskParams)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Use the returned task ID (an unused variable would not compile in Go).
	fmt.Println(taskID)
}

返回值

属性

说明

task_id

任务 ID（UUID 格式）

后续处理

1、从 TOS 下载文件

import tos
import os

# VikingDB API host; not referenced by the download snippet in this step.
DOMAIN = "api-vikingdb.volces.com"
# TOS endpoint and region of the bucket that receives the export.
# NOTE(review): "ivolces.com" looks like the Volcengine internal (VPC) endpoint;
# outside Volcengine's network the public "tos-cn-beijing.volces.com" is
# presumably required instead — confirm for your deployment.
TOS_ENDPOINT = "tos-cn-beijing.ivolces.com"
REGION = "cn-beijing"
# Credentials: read from the environment so secrets need not be hard-coded;
# the "****" placeholders remain as defaults.
AK = os.getenv("TOS_ACCESS_KEY", "****")
SK = os.getenv("TOS_SECRET_KEY", "****")
COLLECTION_NAME = "example"
# Presumably the bucket and directory given as tos_path ("{bucket}/{dir}")
# when the export task was created — verify against your task parameters.
BUCKET_NAME = "bucket_name"
TOS_DIR = "tos_dir"

def download(client, bucket_name, object_key, local_path):
    """Download one TOS object to ``local_path/object_key``.

    Args:
        client: a ``tos.TosClientV2`` (anything exposing ``get_object_to_file``).
        bucket_name: bucket that holds the object.
        object_key: key of the object. It is also used as the relative local
            file name, so any "/" in it becomes a local subdirectory.
        local_path: local directory to download into.

    Returns:
        ``''`` on success, otherwise a human-readable error message. Failures
        are reported through the return value rather than raised.
    """
    # os.path.join avoids doubled separators such as ".//key" when local_path
    # already ends with "/"; a leading "/" on the key would make the path
    # absolute, so strip it to keep the file under local_path.
    file_path = os.path.join(local_path, object_key.lstrip("/"))
    try:
        # Keys such as "dir/1.parquet" need their local parent directory.
        parent = os.path.dirname(file_path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        client.get_object_to_file(bucket_name, object_key, file_path)
    except tos.exceptions.TosClientError as e:
        # Client-side failure, typically invalid request parameters or a network error.
        return 'fail with client error, message:{}, cause: {}'.format(e.message, e.cause)
    except tos.exceptions.TosServerError as e:
        return 'fail with server error : {}'.format(e)
    except Exception as e:
        return 'fail with unknown error: {}'.format(e)
    return ''

if __name__ == "__main__":
    import os

    client = tos.TosClientV2(AK, SK, TOS_ENDPOINT, REGION)
    # TOS_DIR is a directory prefix, not a single object: list every object
    # under it (paging with the continuation token) and download each one.
    prefix = TOS_DIR.rstrip("/") + "/"
    truncated = True
    continuation_token = ''
    while truncated:
        result = client.list_objects_type2(
            BUCKET_NAME, prefix=prefix, continuation_token=continuation_token
        )
        for item in result.contents:
            if item.key.endswith("/"):
                # Skip zero-byte "folder" placeholder objects.
                continue
            # Make sure the local subdirectory mirroring the key exists.
            os.makedirs(os.path.dirname(os.path.join(".", item.key)), exist_ok=True)
            err = download(client, BUCKET_NAME, item.key, "./")
            if err:
                # download() reports failures through its return value.
                print(err)
        truncated = result.is_truncated
        continuation_token = result.next_continuation_token

2、解析 parquet 类型数据

import pyarrow.parquet as pq
def process_parquet(file_path, batch_size=100):
    """Stream a Parquet file in record batches and handle each row.

    Rows are read ``batch_size`` at a time via ``ParquetFile.iter_batches``
    rather than loading the whole file at once.

    Args:
        file_path: path of a local Parquet file (e.g. one downloaded from TOS).
        batch_size: maximum number of rows per batch; defaults to 100.

    Returns:
        ``''`` (kept for compatibility with the original example).
    """
    parquet_file = pq.ParquetFile(file_path)

    # If needed, the total row count and Arrow schema are available from the
    # file metadata without reading data: parquet_file.metadata.num_rows and
    # parquet_file.schema_arrow.
    for batch in parquet_file.iter_batches(batch_size=batch_size):
        df = batch.to_pandas()
        for _, row in df.iterrows():
            # Replace this with your own per-row processing.
            print(row)
    return ''

if __name__ == "__main__":
    # "1.parquet" is a placeholder; point it at a file downloaded in step 1.
    process_parquet("1.parquet")