You need to enable JavaScript to run this app.
导航
upsert_data
最近更新时间:2025.06.10 16:06:24首次发布时间:2024.04.17 14:21:06
我的收藏
有用
有用
无用
无用

概述

upsert_data 用于在指定的数据集 Collection 内写入数据。指定写入的数据是一个数组,允许单次插入一条数据或者多条数据,单次最多可插入100条数据。

请求参数

参数名

子参数

类型

是否必选

参数说明

Data

说明

Data 实例或者 Data 实例列表。

fields

map

指定写入的数据。

  • 单次写入的数据数目不超过100。
  • 每条数据作为一个 map,key 为字段名,value 为字段值。
  • 数据写入时 fields 长度最大为65535,超过限制时会返回报错 “fields data is too long, should be less than 65535”。
  • 不同字段类型的字段值格式如下:
    • int64:格式是整型数值。
    • float:格式是浮点数值。
    • string:格式是字符串。
    • bool:格式是 true/false。
    • list<string>:格式是字符串数组。
    • list<int64>:格式是整型数组。
    • vector:格式是向量(浮点数数组)。
    • sparse_vector:格式是 json 字典,k 为 string 类型,表示关键词的字面量,v 为 float 类型,表示该关键词的权重数值。
    • text:格式是 map<string, string>,当前支持 text 。
      • text:以 string 形式写入文本原始数据, 如 {"text": "hello world"}。
    • image:当前支持写入tos路径。格式为tos://{bucket}/{object_key}tos和VikingDB的region需要保持一致。

ttl

int

数据过期时间,单位为秒。

  • 格式:0 和正整数。
  • 默认值:默认为0,表示数据不过期。
  • 当 ttl 设置为86400时,表示1天后数据自动删除。
  • 数据 ttl 删除,不会立刻更新到索引。

async_upsert

bool

是否异步请求接口,适用于大规模数据的写入场景,性能提升10倍。

  • True:表明异步请求写入。
  • 默认值:默认为False,表示正常请求。

注意

  • 在执行upsert操作时选择上传数据的fields字段时,若选择“从向量化开始”,请根据需求上传text、image字段中的一个或一起上传,不要上传“vector”字段;若选择“已有向量数据”,请仅上传“vector”字段,不要上传“text”或 “image”字段。两者不能同时上传。
  • 若需要上传image图多模态类型字段,请先将图片上传至TOS,并将图片的tos存储路径传入"image"字段。具体执行逻辑可参考如下流程图。详见【向量库】多模态搜索最佳实践

Image

示例

请求参数

同步写入数据示例:

# 获取指定数据集,程序初始化时调用即可,无需重复调用
from volcengine.viking_db.CollectionClient import CollectionClient

collection_client = CollectionClient("test_collection", "api-vikingdb.volces.com", "cn-beijing", "your ak", "your sk", "http")

此处为使用“已有向量数据”创建数据集,即已经对原始数据执行过embedding操作,因此上传“vector”类型数据。

# 构建向量
def gen_random_vector(dim):
    res = [0, ] * dim
    for i in range(dim):
        res[i] = random.random() - 0.5
    return res
   
field1 = {"doc_id": "11", "text_vector": gen_random_vector(12), "text_sparse_vector": {"hello": 0.34, "world": 0.03, "!": 0.11}, "like": 1, "price": 1.11,
          "author": ["gy"], "aim": True}
field2 = {"doc_id": "22", "text_vector": gen_random_vector(12), "text_sparse_vector": {"hi": 0.12, "there": 0.043, "!": 0.5},"like": 2, "price": 2.22,
          "author": ["gy", "xjq"], "aim": False}
data1 = Data(field1)
data2 = Data(field2)
datas = []
datas.append(data1)
datas.append(data2)
collection_client.upsert_data(datas)  


# 异步调用
async def upsert_data():
    def gen_random_vector(dim):
        res = [0, ] * dim
        for i in range(dim):
            res[i] = random.random() - 0.5
        return res


    collection_client = CollectionClient("test_collection", "api-vikingdb.volces.com", "cn-beijing", "your ak", "your sk", "http")
    field1 = {"doc_id": "111", "text_vector": gen_random_vector(10), "like": 1, "price": 1.11,
             "author": ["gy"], "aim": True}
    field2 = {"doc_id": "222", "text_vector": gen_random_vector(10), "like": 2, "price": 2.22,
             "author": ["gy", "xjq"], "aim": False}
    field3 = {"doc_id": "333", "text_vector": gen_random_vector(10), "like": 3, "price": 3.33,
             "author": ["gy", "xjq"], "aim": False}
    field4 = {"doc_id": "444", "text_vector": gen_random_vector(10), "like": 4, "price": 4.44,
             "author": ["gy", "xjq"], "aim": False}
    data1 = Data(field1)
    data2 = Data(field2)
    data3 = Data(field3)
    data4 = Data(field4)
    datas = [data1, data2, data3, data4]
    await collection_client.async_upsert_data(datas)
asyncio.run(upsert_data())

异步写入数据示例:

import multiprocessing
import struct, base64, uuid, tqdm, time
from volcengine.viking_db import *

queue = multiprocessing.Queue(maxsize=10)
event = multiprocessing.Event()
def consumer():
    """消费者函数:从队列中取出数据并处理"""
    vikingdb_service = VikingDBService()
    vikingdb_service.set_ak("ak")
    vikingdb_service.set_sk("sk")
    collection_client = CollectionClient("test_collection", "api-vikingdb.volces.com", "cn-beijing", "your ak", "your sk", "http")
    items = []
    while not event.is_set() or not queue.empty():
        item = queue.get()
        items.append(item)
        if len(items) == 50:
            collection_client.upsert_data(items, async_upsert=True)
            items = []

    print("Consumer received event. Exiting...")

if __name__ == "__main__":
    # 创建消费者进程
    processors = []
    for i in range(10):
        p = multiprocessing.Process(target=consumer)
        p.start()
        processors.append(p)

    #  准备数据, 
    datas = []
    for i in range(100000):  
        # 压缩向量
        float_array = [0.124135132531424]*1024
        packed_data = struct.pack('f'*len(float_array), *float_array)
        s =  base64.b64encode(packed_data).decode()
        uuid4 = uuid.uuid4()  # 此处用户可修改为自己希望的id
        datas.append(Data({"id": str(uuid4), "text_vertor": s}))
    
    for data in tqdm.tqdm(datas):
        queue.put(data)
    # 通知消费者停止工作
    event.set()
    for p in processors:
        p.join()

    print("Main process exiting...")

返回值

Python 调用执行上面的任务,执行成功无返回信息。