You need to enable JavaScript to run this app.
导航

同步接口(write)

最近更新时间2023.12.12 16:12:24

首次发布时间2022.04.13 17:53:21

用于将数据上传至火山引擎服务器。数据预同步、历史数据同步、增量天级数据同步、增量实时数据同步等均会涉及到此接口。
每次请求数据量不超过10000条qps建议不超过100每秒上传的数据条数不超过50000条(请求qps*每次请求中数据条数)。
若既有增量天级数据,也有增量实时数据,必须先接入增量天级数据,再接入增量实时数据。
若仅有增量实时数据,上传后不可再上传增量天级数据。
数据上传接口的超时时间应尽量大,例如设置为5s。当数据上传接口调用失败的话,应重新上传数据。
增量实时数据上报时,建议聚合一批数据一起上报(比如积攒1000条再上报),减小客户端和服务端频繁交互的压力。

调用方法

write_data(self, data_list: list, topic: str, *opts: Option) -> WriteResponse

方法参数

参数

类型

说明

data_list

list[dict[str,Any]]

上传的具体数据,不同行业同步字段请按照数据规范填写

topic

str

数据上传时的topic,如用户数据对应“user”,商品数据对应“item”,行为数据对应“behavior”

opts

Option[]

请求中可选参数,不同场景需要带上不同opts参数,包括timeout、stage、data_date、request_id。其中data_date只需要在离线数据上传时使用。具体使用方式见用例

方法返回

参数

类型

说明

获取方法

code

int

状态码

response.status.code

message

string

请求结果信息

response.status.message

示例

# 数据上传example
import uuid
from datetime import datetime

from byteair import ClientBuilder, Client
from byteair.protocol.volcengine_byteair_pb2 import *
from core import Region, Option, NetException, BizException, metrics


# 示例省略client初始化过程


def write():
    # 此处为测试数据,实际调用时需注意字段类型和格式
    data_list = [
        {
            "id": "item_id1",
            "title": "test_title1",
            "status": 0,
            "brand": "volcengine",
            "pub_time": 1583641807,
            "current_price": 1.1,
        },
        {
            "id": "item_id2",
            "title": "test_title2",
            "status": 1,
            "brand": "volcengine",
            "pub_time": 1583641503,
            "current_price": 2.2,
        }
    ]
    # topic为枚举值,请参考API文档
    topic = "item"


    # 传输天级数据
    opts = (
        # 预同步("pre_sync"),历史数据同步("history_sync"),增量天级同步("incremental_sync_daily"),
        # 增量实时同步("incremental_sync_streaming")
        Option.with_stage("pre_sync"),
        # 必传,要求每次请求的Request-Id不重复,若未传,sdk会默认为每个请求添加
        Option.with_request_id(str(uuid.uuid1())),
        # 必传,数据产生日期,实际传输时需修改为实际日期
        Option.with_data_date(datetime(year=2022, month=1, day=1)),
    )


    try:
        response = client.write_data(data_list, topic, *opts)
    except BizException as e:
        print("[write] occur err, msg: %s" % e)
        return
    if not response.status.success:
        print("[write] failure")
        return
    print("[write] success")
    return

Spark on Python

代码示例

import uuid
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from datetime import datetime
from byteair import ClientBuilder, Client
from byteair.protocol.volcengine_byteair_pb2 import *
from core import Region, Option, NetException, BizException, metrics


def get_client():
      # 必传,租户id.
      TENANT_ID = "xxx"
      # 必传,项目id.
      PROJECT_ID = "xxx"
      # 必传,密钥AK,获取方式:【火山引擎控制台】->【个人信息】->【密钥管理】中获取.
      AK = "xxx"
      # 必传,密钥SK,获取方式:【火山引擎控制台】->【个人信息】->【密钥管理】中获取.
      SK = "xxx"

    client: Client = ClientBuilder() \
        .tenant_id(TENANT_ID) \
        .project_id(PROJECT_ID) \
        .ak(AK) \
        .sk(SK) \
        .region(Region.AIR_CN) \
        .build()
    # metrics上报初始化.建议开启,方便火山侧排查问题.
    metrics.init(())
    return client


input_path = 'hdfs://xxx'


if __name__ == '__main__':
    conf = SparkConf().setMaster('local').setAppName('spark_example_python')


    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    spark.sparkContext.setLogLevel("INFO")


    df = spark.read.json(input_path)
    df.show()


    def batch_write_data(items):
        client = get_client()
        data_list = []
        for item in items:
            import json
            item_json = json.loads(item)
            data_list.append(item_json)


        print("data_list:", data_list)


        # topic为枚举值,请参考API文档
        topic = "item"
        # 传输天级数据
        opts = (
            # 预同步("pre_sync"), 历史数据同步("history_sync"), 增量天级同步("incremental_sync_daily"),
            # 增量实时同步("incremental_sync_streaming")
            Option.with_stage("incremental_sync_daily"),
            # 必传,数据产生日期,实际传输时需修改为实际日期
            Option.with_data_date(datetime(year=2022, month=10, day=7)),
            Option.with_timeout(timedelta(milliseconds=3000)),
            Option.with_request_id(str(uuid.uuid1())),
        )
        try:
            write_response = client.write_data(data_list, topic, *opts)
        except BizException as e:
            print("[write] occur err, msg: %s" % e)
            return
        if not write_response.status.success:
            print("[write] failure:", write_response.status.message)
            return
        print("[write] success")
        return

    df.toJSON().foreachPartition(batch_write_data)

python调用执行上面的任务,查看输出。