按特定条件批量导出Collection中的数据
说明
使用前请先授权 VikingDB 跨服务访问 TOS(去授权)。
参数名 | 类型 | 必须 | 说明 | 子字段 | 类型 | 必选 | 说明 |
---|---|---|---|---|---|---|---|
TaskType | string | 是 | 任务类型,填入data_export | ||||
TaskParams | map | 是 | 任务参数 | collection_name | string | 是 | Collection 名称,必填 |
filter_conds | map | 是 | 过滤条件,支持 must、must_not、range、range_out 算子,使用参考标量过滤
| ||||
export_all | bool | 否 | 导出全部数据,此时filter不生效。默认为false | ||||
tos_path | string | 是 | 将数据文件导出到用户的 TOS 路径,格式:{桶名}/{路径},注意不是域名。必填 | ||||
file_type | string | 否 | 备份文件类型 json 或者 parquet,默认为 parquet |
package main import ( "fmt" "github.com/volcengine/volc-sdk-golang/service/vikingdb" ) func main() { service := vikingdb.NewVikingDBService("", "", "", "", "") filter_conds := make([]interface{}, 0) condition1 := make(map[string]interface{}) condition1["op"] = "must" condition1["field"] = "city" condition1["conds"] = []string{"beijing"} condition2 := make(map[string]interface{}) condition2["op"] = "range" condition2["field"] = "user_id" condition2["gt"] = 0 condition2["lt"] = 4 filter_conds = append(filter_conds, condition1) filter_conds = append(filter_conds, condition2) task_param := make(map[string]interface{}) task_param["collection_name"] = "example" task_param["filter_conds"] = filter_conds task_id, err := service.CreateTask(vikingdb.Data_Export, task_param) if err != nil { fmt.Println(err) } }
属性 | 说明 |
---|---|
task_id | uuid |
import tos

DOMAIN = "api-vikingdb.volces.com"
TOS_ENDPOINT = "tos-cn-beijing.ivolces.com"
REGION = "cn-beijing"
AK = "****"
SK = "****"
COLLECTION_NAME = "example"
BUCKET_NAME = "bucket_name"
TOS_DIR = "tos_dir"


def download(client, bucket_name, object_key, local_path):
    """Download one TOS object to a local file.

    The object is written to "<local_path>/<object_key>".

    Args:
        client: TOS client exposing get_object_to_file.
        bucket_name: Name of the bucket holding the object.
        object_key: Key of the object to download.
        local_path: Local directory prefix for the downloaded file.

    Returns:
        An empty string on success, otherwise a message describing the failure.
    """
    file_path = "{}/{}".format(local_path, object_key)
    try:
        client.get_object_to_file(bucket_name, object_key, file_path)
    except tos.exceptions.TosClientError as e:
        # Operation failed with a client-side exception, typically caused by
        # invalid request parameters or a network error.
        return 'fail with client error, message:{}, cause: {}'.format(e.message, e.cause)
    except tos.exceptions.TosServerError as e:
        return 'fail with server error : {}'.format(e)
    except Exception as e:
        return 'fail with unknown error: {}'.format(e)
    return ''


client = tos.TosClientV2(AK, SK, TOS_ENDPOINT, REGION)

# NOTE(review): TOS_DIR is passed as the object key here; presumably it should
# be the key of an exported file under that directory — confirm.
error_message = download(client, BUCKET_NAME, TOS_DIR, "./")
if error_message:
    # download() signals failure through its return value, so surface it
    # instead of silently discarding it.
    print(error_message)
import pyarrow.parquet as pq


def process_parquet(file_path):
    """Read a Parquet file batch by batch and print every row.

    Args:
        file_path: Path of the Parquet file to read.

    Returns:
        An empty string.
    """
    pf = pq.ParquetFile(file_path)

    # Total row count, summed over all row groups, and the Arrow schema.
    # Both are computed for reference only and are not used further below.
    total_rows = 0
    for group_idx in range(pf.num_row_groups):
        total_rows += pf.metadata.row_group(group_idx).num_rows
    arrow_schema = pf.schema.to_arrow_schema()

    # Read the data iteratively, 100 rows per batch.
    for record_batch in pf.iter_batches(batch_size=100):
        frame = record_batch.to_pandas()
        for _, row in frame.iterrows():
            # do what you want
            print(row)
    return ''


process_parquet("1.parquet")