You need to enable JavaScript to run this app.
导航

同步接口(write)

最近更新时间2023.10.19 17:15:44

首次发布时间2022.04.13 17:53:20

用于将数据上传至火山引擎服务器。数据预同步、历史数据同步、增量天级数据同步、增量实时数据同步等均会涉及到此接口。
每次请求数据量不超过10000条qps建议不超过100每秒上传的数据条数不超过50000条(请求qps*每次请求中数据条数)。
若既有增量天级数据,也有增量实时数据,必须先接入增量天级数据,再接入增量实时数据。
若仅有增量实时数据,上传后不可再上传增量天级数据。
数据上传接口的超时时间应尽量大,例如设置为5s。当数据上传接口调用失败的话,应重新上传数据。
增量实时数据上报时,建议聚合一批数据一起上报(比如积攒1000条再上报),减小客户端和服务端频繁交互的压力。

调用方法

WriteData(dataList []map[string]interface{}, topic string, opts ...option.Option (*WriteResponse, error)

方法参数

参数

类型

说明

dataList

[]map[string]interface{}

上传的具体数据,不同行业同步字段请按照数据规范填写

topic

String

数据上传时的topic,如用户数据对应“user”,商品数据对应“item”,行为数据对应“behavior”

opts

Option[]

请求中可选参数,不同场景需要带上不同opts参数,包括timeout、stage、DataDate、RequestId。其中DataDate只需要在离线数据上传时使用。具体使用方式见用例

方法返回

使用自定义的WriteResponse类作为响应类型,具体参数如下表所示。在获取到WriteResponse类型的返回值后可调用它的GetStatus()方法判断此次数据上传是否成功。

参数

类型

字段含义

获取方法

Status

int

状态码

GetStatus

Errors

DataError

出错的数据

GetErrors

示例

import (
   "github.com/google/uuid"
   "github.com/volcengine/volcengine-sdk-go-rec/byteair"
   "github.com/volcengine/volcengine-sdk-go-rec/core/logs"
   "github.com/volcengine/volcengine-sdk-go-rec/core/option"
   "time"
)


// 已经初始化好的client.示例省略init()
var client byteair.Client


func Write() {
   // 此处为示例数据,实际调用时需注意字段类型和格式
   dataList := []map[string]interface{}{
      { // 第一条数据
         "id":            "item_id1",
         "title":         "test_title1",
         "status":        0,
         "brand":         "volcengine",
         "pub_time":      1583641807,
         "current_price": 1.1,
      },
      { // 第二条数据
         "id":            "item_id2",
         "title":         "test_title2",
         "status":        1,
         "brand":         "volcengine",
         "pub_time":      1583641503,
         "current_price": 2.2,
      },
   }
   // topic为枚举值,请参考API文档
   topic := "item"
   // 同步离线天级数据,需要指定日期
   date, _ := time.Parse("2006-01-02", "2022-01-01")
   opts := []option.Option{
      // 预同步("pre_sync"),历史数据同步("history_sync"),增量天级同步("incremental_sync_daily"),增量实时同步("incremental_sync_streaming")
      option.WithStage("pre_sync"),
      // 必传,数据产生日期,实际传输时需修改为实际日期.增量实时同步可不传.
      option.WithDataDate(date),
      option.WithRequestId(uuid.NewString()),
   }
   rsp, err := client.WriteData(dataList, topic, opts...)
   if err != nil {
      logs.Error("[write] occur error, msg: %s", err.Error())
      return
   }
   if !rsp.GetStatus().GetSuccess() {
      logs.Error("[write] failure")
      return
   }
   logs.Info("[write] success")
   return
}