API Access

Last updated: 2023.12.19 19:07:27

First published: 2023.08.30 16:17:17

I. Introduction

Volcengine DataSail provides a data reporting API. After activating the DataSail service, you can use this API to report data to DataSail on Volcengine.

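II. Prerequisites

Service activation

Make sure you have activated the service you need to access. You can activate the DataSail (global data integration) service in the Volcengine console; for details, see Service Activation.

Obtain security credentials

An Access Key is the security credential for accessing Volcengine services. It consists of two parts: an Access Key ID (AK) and a Secret Access Key (SK). Log in to the Volcengine console and go to Access Control > Access Keys to create and manage your Access Keys. For more information, see the Access Key help documentation.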

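Product onboarding

  1. Apply for a Topic

Path: Data Collection > Topic Management > New Topic

  2. Apply for a data collection task

Path: Data Collection > Collection Management > New Collection Task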

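III. API Access

Required information

| Field | Description | Example | Notes |
|---|---|---|---|
| AK | Volcengine Access Key | AKLTZWU***** | |
| SK | Volcengine Secret Key | TW1KaVl****** | |
| Collection task ID | Collection task ID | hkktppvwtuv0xy000 | Where to find it: Data Collection -> Collection Management -> Collection Task ID |
| Service domain | Domain that data is reported to | datasail01-cn-beijing.volceapplog.com | |
| Region | Region | cn-north-1 | Fixed value |
| Service | Service | dataleap | Fixed value |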

Service domain

| Region | Service domain |
|---|---|
| North China 2 (Beijing) | datasail01-cn-beijing.volceapplog.com |

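Service URL

When reporting data, build the final URL from the service domain and the collection task ID, in the following format:

https://{service domain}/v1/production/general/collect/{collection task ID}/list

For example, if the collection task ID is hkktppvwtuv0xy000 and the region is North China 2 (Beijing), the URL is: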

https://datasail01-cn-beijing.volceapplog.com/v1/production/general/collect/hkktppvwtuv0xy000/list
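
The URL format above can be sketched in Go as follows. This is a minimal illustration, not part of any SDK: the function name buildCollectURL is hypothetical, and the domain and task ID are the sample values from this page.

```go
package main

import (
	"fmt"
	"net/url"
)

// buildCollectURL assembles the reporting URL from the service domain and
// the collection task ID. The name is illustrative, not an SDK function.
func buildCollectURL(domain, taskID string) string {
	u := url.URL{
		Scheme: "https",
		Host:   domain,
		// url.URL escapes the path when it is rendered by String().
		Path: "/v1/production/general/collect/" + taskID + "/list",
	}
	return u.String()
}

func main() {
	fmt.Println(buildCollectURL("datasail01-cn-beijing.volceapplog.com", "hkktppvwtuv0xy000"))
}
```

Building the URL through url.URL rather than string concatenation keeps a task ID with unexpected characters from producing a malformed path.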

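Request

| Parameter | Type | Required | Meaning | Value |
|---|---|---|---|---|
| Authorization | String | | Volcengine TOP gateway authentication | The value produced by the Volcengine signing scheme |
| Content-Type | String | | Type of the reported data | See the list below |

Content-Type values:

  • application/json

    • Reports JSON data; batch reporting is supported
      • With the HTTP header X-Collect-Content-Type: batch_json, the body is reported as a batch of JSON records

      • Without X-Collect-Content-Type, the body is reported as a single JSON record

  • application/octet-stream

    • Reports data of any type
  • Not set

    • Data of any type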
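
The header combinations above can be summarized in a small Go sketch. The ReportMode type and the collectHeaders function are hypothetical names introduced only for this illustration; the header names and values are the ones listed above.

```go
package main

import (
	"fmt"
	"net/http"
)

// ReportMode is a hypothetical enum for this sketch; it is not an SDK type.
type ReportMode int

const (
	SingleJSON ReportMode = iota // one JSON record per request
	BatchJSON                    // a batch of JSON records per request
	RawBytes                     // arbitrary bytes
)

// collectHeaders returns the headers that select the given reporting mode.
func collectHeaders(mode ReportMode) http.Header {
	h := http.Header{}
	switch mode {
	case SingleJSON:
		h.Set("Content-Type", "application/json")
	case BatchJSON:
		h.Set("Content-Type", "application/json")
		h.Set("X-Collect-Content-Type", "batch_json")
	case RawBytes:
		h.Set("Content-Type", "application/octet-stream")
	}
	return h
}

func main() {
	for _, m := range []ReportMode{SingleJSON, BatchJSON, RawBytes} {
		h := collectHeaders(m)
		fmt.Printf("%q %q\n", h.Get("Content-Type"), h.Get("X-Collect-Content-Type"))
	}
}
```

Only the batch mode sets X-Collect-Content-Type; the single-record mode is selected by leaving that header out.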

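Request signing

For the signing scheme, see the Volcengine signature documentation: https://www.volcengine.com/docs/6369/67269. You can use the signing SDK that Volcengine already provides: https://www.volcengine.com/docs/6369/156029
Overall flow:
[Figure: overall signing flow]
Because the full signing process is fairly involved, we recommend using the SDK packaged by Volcengine. For signing and data reporting demos, see "Integration demo" below.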
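
To give a feel for what the SDK does internally, here is a hedged Go sketch of one step only: building the credential scope and deriving the signing key. It assumes the scheme chains HMAC-SHA256 over the date, region, service, and the literal "request", which matches the credential scope format date/region/service/request. The function names and the secret are made up for illustration, and the canonical request and final signature are not shown. Treat the linked signature documentation as authoritative and use the SDK in real code.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// hmacSHA256 computes HMAC-SHA256 of msg under key.
func hmacSHA256(key []byte, msg string) []byte {
	mac := hmac.New(sha256.New, key)
	mac.Write([]byte(msg))
	return mac.Sum(nil)
}

// credentialScope joins date, region and service with the fixed "request" suffix.
func credentialScope(date, region, service string) string {
	return strings.Join([]string{date, region, service, "request"}, "/")
}

// deriveSigningKey chains HMAC-SHA256 over the scope elements.
// Assumption: the order is date, region, service, "request".
func deriveSigningKey(sk, date, region, service string) []byte {
	kDate := hmacSHA256([]byte(sk), date)
	kRegion := hmacSHA256(kDate, region)
	kService := hmacSHA256(kRegion, service)
	return hmacSHA256(kService, "request")
}

func main() {
	// "example-secret" is a placeholder, not a real Secret Access Key.
	key := deriveSigningKey("example-secret", "20230804", "cn-north-1", "dataleap")
	fmt.Println(credentialScope("20230804", "cn-north-1", "dataleap"))
	fmt.Println(hex.EncodeToString(key))
}
```

Because the date is part of the chain, the derived key changes every day even when the Secret Access Key stays the same.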

Body

The actual data to report.

Integration demo

Java

pom dependency configuration

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.8</version>
</dependency>
<dependency>
    <groupId>com.volcengine</groupId>
    <artifactId>volc-sdk-java</artifactId>
    <version>1.0.49</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.10.1</version>
</dependency>

Demo

import com.google.gson.Gson;
import com.volcengine.auth.ISignerV4;
import com.volcengine.auth.impl.SignerV4Impl;
import com.volcengine.model.Credentials;
import com.volcengine.service.SignableRequest;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.Asserts;
import org.apache.http.util.EntityUtils;

public class GeneralCollector {

    private ISignerV4 signer;
    private Credentials credentials = new Credentials();
    private CloseableHttpClient httpClient;
    private String url;


    public GeneralCollector(String ak, String sk, String domain, String taskKey) {
        signer = new SignerV4Impl();
        RequestConfig requestConfig = RequestConfig.custom().setConnectionRequestTimeout(1000)
                .setSocketTimeout(3000).setConnectTimeout(1000).build();
        httpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build();

        // Region and Service are fixed values for DataSail.
        credentials.setAccessKeyID(ak);
        credentials.setSecretAccessKey(sk);
        credentials.setRegion("cn-north-1");
        credentials.setService("dataleap");

        url = "https://" + domain + "/v1/production/general/collect/" + taskKey + "/list/";
    }

    // Report a batch of JSON records (a JSON list)
    public void batchSendByteData(byte[] data) throws Exception {
        if (hasBigSize(data)) {
            throw new Exception("data size is too big");
        }

        send(data, ContentType.APPLICATION_JSON, "batch_json");

    }

    // Report a single payload of raw bytes
    public void sendByteData(byte[] data) throws Exception {
        if (hasBigSize(data)) {
            throw new Exception("data size is too big");
        }

        send(data, ContentType.APPLICATION_OCTET_STREAM, "");
    }

    private void send(byte[] data, ContentType contentType, String collectContentType) throws Exception {
        SignableRequest signableRequest;
        signableRequest = new SignableRequest();
        signableRequest.setMethod("POST");
        URIBuilder uriBuilder = new URIBuilder(url);

        signableRequest.setHeader("Content-Type", contentType.getMimeType());

        // X-Collect-Content-Type is only sent for batch JSON reporting.
        if (collectContentType != null && !collectContentType.equals("")) {
            signableRequest.setHeader("X-Collect-Content-Type", collectContentType);
        }
        signableRequest.setEntity(new ByteArrayEntity(data, contentType));
        signableRequest.setUriBuilder(uriBuilder);
        signableRequest.setURI(uriBuilder.build());

        signer.sign(signableRequest, credentials);

        Asserts.notNull(signableRequest, "signableRequest can not null");

        HttpResponse response = httpClient.execute(signableRequest);

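        int statusCode = response.getStatusLine().getStatusCode();
        if (statusCode != 200) {
            // Drain the entity so the pooled connection is released before failing.
            EntityUtils.consumeQuietly(response.getEntity());
            throw new Exception("http status code error: " + statusCode);
        }

        HttpEntity respEntity = response.getEntity();
        byte[] content = EntityUtils.toByteArray(respEntity);

        Gson g = new Gson();
        RespInfo respInfo = g.fromJson(new String(content, java.nio.charset.StandardCharsets.UTF_8), RespInfo.class);

        // Gson returns null for an empty body; treat that as a failure.
        if (respInfo == null) {
            throw new Exception("empty response body");
        }
        if (respInfo.e != null && respInfo.e != 0) {
            throw new Exception("response code error, code: " + respInfo.e + " msg: " + respInfo.m);
        }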
    }

    private boolean hasBigSize(byte[] data) {
        // Check the size: the payload must not exceed 4 MB
        return data.length > 4000000;
    }

    public static void Test() throws Exception {

        GeneralCollector collector = new GeneralCollector("AKLTYzlhOGU5YjZjM2U4NGJjYzgzNzFiYmU5N**********",
                "TVdFMlptTmhNVE15TnpjMU5ESTFNR0ZsWlRZM01HRmxObVkx**********==",
                "datasail01-cn-beijing.volceapplog.com",
                "w13311338567c9afe"
                );

        collector.sendByteData("abc".getBytes());

        collector.batchSendByteData("[{\"a\":1},{\"b\":\"2\"}]".getBytes());

    }

    static class RespInfo {
        public Integer e;
        public String m;
    }
}

Go

Signing uses: github.com/volcengine/volc-sdk-golang

package main

import (
   "bytes"
   "encoding/json"
   "fmt"
   "io/ioutil"
   "net/http"
   "time"

   "github.com/volcengine/volc-sdk-golang/base"
)

type RespInfo struct {
   E *int   `json:"e"`
   M string `json:"m"`
}

type GeneralCollector struct {
   credentials base.Credentials
   url         string
}

func NewGeneralCollector(ak, sk, domain, taskKey string) *GeneralCollector {
   return &GeneralCollector{
      credentials: base.Credentials{
         AccessKeyID:     ak,
         SecretAccessKey: sk,
         Service:         "dataleap",
         Region:          "cn-north-1",
      },
      url: fmt.Sprintf("https://%s/v1/production/general/collect/%s/list/", domain, taskKey),
   }
}

func (g *GeneralCollector) SendByteData(data []byte) error {

   payload := bytes.NewReader(data)
   req, err := http.NewRequest("POST", g.url, payload)
   if err != nil {
      return fmt.Errorf("failed to build request: %v", err)
   }

   signRequest := g.credentials.Sign(req) // sign the request

   httpClient := http.Client{Timeout: time.Second * 5}
   httpResponse, err := httpClient.Do(signRequest)
   if err != nil {
      return fmt.Errorf("failed to send data : %v", err)
   }

   defer func() {
      if httpResponse != nil && httpResponse.Body != nil {
         httpResponse.Body.Close()
      }
   }()

   if httpResponse.StatusCode != http.StatusOK {
      return fmt.Errorf("failed to send data (status code: %d)", httpResponse.StatusCode)
   }

   response, err := ioutil.ReadAll(httpResponse.Body)
   if err != nil {
      return fmt.Errorf("failed to read response: %v", err)
   }

   respInfo := RespInfo{}
   if err := json.Unmarshal(response, &respInfo); err != nil {
      return fmt.Errorf("failed to parse response: %v", err)
   }

   if respInfo.E == nil {
      return fmt.Errorf("get response error (no status code)")
   }
   if *respInfo.E != 0 {
      return fmt.Errorf("get response error (status code: %d, status msg: %s)", *respInfo.E, respInfo.M)
   }

   return nil
}

func main() {
   // Configure the AK, SK, domain, and taskKey
   collector := NewGeneralCollector("AKLTYzlhOGU5YjZjM2U4NGJjYzgzNzFiYmU5N**********",
      "TVdFMlptTmhNVE15TnpjMU5ESTFNR0ZsWlRZM01HRmxObVkx**********==",
      "datasail01-cn-beijing.volceapplog.com",
      "w13311338567c9afe",
   )

   if err := collector.SendByteData([]byte("Hello, world!")); err != nil {
      panic(err)
   }

}
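
The Go demo above only reports raw bytes. As a rough sketch of the batch JSON case, the following builds an unsigned request with the batch headers and the same 4,000,000-byte check used in the Java demo. The names newBatchRequest, maxBodyBytes, and errTooBig are hypothetical. Signing is deliberately left out: the returned request would still need to be signed before sending, and how the SDK treats a Content-Type that is already set is not verified here.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
)

// maxBodyBytes mirrors the 4,000,000-byte check in the Java demo.
const maxBodyBytes = 4000000

var errTooBig = errors.New("data size is too big")

// newBatchRequest builds an unsigned POST that carries records as a JSON array.
// The caller is expected to sign the request before sending it.
func newBatchRequest(url string, records []map[string]interface{}) (*http.Request, error) {
	body, err := json.Marshal(records)
	if err != nil {
		return nil, fmt.Errorf("failed to encode records: %w", err)
	}
	if len(body) > maxBodyBytes {
		return nil, errTooBig
	}
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return nil, fmt.Errorf("failed to build request: %w", err)
	}
	// Batch mode: JSON content type plus the batch marker header.
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Collect-Content-Type", "batch_json")
	return req, nil
}

func main() {
	req, err := newBatchRequest(
		"https://datasail01-cn-beijing.volceapplog.com/v1/production/general/collect/w13311338567c9afe/list",
		[]map[string]interface{}{{"a": 1}, {"b": "2"}},
	)
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL.Path, req.Header.Get("X-Collect-Content-Type"))
}
```

Checking the size after encoding, rather than per record, measures the exact bytes that would go on the wire.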


Response

| Parameter | Meaning | Details |
|---|---|---|
| e | Status code | See the list below |
| m | Message | On failure, contains the error message |

Values of e:

  • StatusOk = 0

  • StatusInvalidParam = -1

  • StatusInvalidEventFormat = -2

  • StatusParseArgError = -3

  • StatusNoSupportMethodError = -4

  • StatusTooManyElementError = -5

  • StatusTooManyEventError = -6

  • StatusGetAppidError = -7

  • StatusParseUserError = -8

  • StatusParseHeaderError = -9

  • StatusParseEventError = -10

  • StatusSaveEventError = -11

  • StatusEventEmptyError = -12

  • StatusUnknownError = -100

  • StatusIdentityMatchError = -101

  • StatusSignatureMatchError = -102

  • StatusNotSupportAppIdError = -103

  • StatusNotFoundRouteError = -104
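
As an illustration of how a client might interpret the e and m fields, the sketch below maps the documented status codes to their names and turns a non-zero code into a Go error. The names respInfo, statusNames, and checkResponse are hypothetical, and the failure message in the usage example is made up; real messages come from the service.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// respInfo models the response body; E is a pointer so a missing code is detectable.
type respInfo struct {
	E *int   `json:"e"`
	M string `json:"m"`
}

// statusNames maps the documented values of e to their names.
var statusNames = map[int]string{
	0: "StatusOk", -1: "StatusInvalidParam", -2: "StatusInvalidEventFormat",
	-3: "StatusParseArgError", -4: "StatusNoSupportMethodError",
	-5: "StatusTooManyElementError", -6: "StatusTooManyEventError",
	-7: "StatusGetAppidError", -8: "StatusParseUserError",
	-9: "StatusParseHeaderError", -10: "StatusParseEventError",
	-11: "StatusSaveEventError", -12: "StatusEventEmptyError",
	-100: "StatusUnknownError", -101: "StatusIdentityMatchError",
	-102: "StatusSignatureMatchError", -103: "StatusNotSupportAppIdError",
	-104: "StatusNotFoundRouteError",
}

// checkResponse returns nil only when e is present and equal to 0.
func checkResponse(body []byte) error {
	var r respInfo
	if err := json.Unmarshal(body, &r); err != nil {
		return fmt.Errorf("failed to parse response: %w", err)
	}
	if r.E == nil {
		return fmt.Errorf("response has no status code")
	}
	if *r.E == 0 {
		return nil
	}
	name, ok := statusNames[*r.E]
	if !ok {
		name = "unrecognized status"
	}
	return fmt.Errorf("%s (e=%d): %s", name, *r.E, r.M)
}

func main() {
	fmt.Println(checkResponse([]byte(`{"e":0,"m":"success"}`)))
	// The message below is invented for the example.
	fmt.Println(checkResponse([]byte(`{"e":-102,"m":"signature mismatch"}`)))
}
```

Note that an HTTP 200 status alone does not mean the report succeeded; the e field in the body still has to be checked.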

Data example

Request

POST /v1/production/general/collect/w13311338567c9afe/list HTTP/1.1
Host: datasail01-cn-beijing.volceapplog.com
Authorization: HMAC-SHA256 Credential=AKLTYzlhOGU5YjZjM2U4NGJjYzgzNzFiYmU5NjY4ZDQ1MDU/20230804/cn-north-1/dataleap/request, SignedHeaders=content-type;host;x-content-sha256;x-date, Signature=a2e214ea7d9492bb35f75f48eaf820570083194b4dd29942b6fdacdd278d2ebb
Content-Type: application/x-www-form-urlencoded; charset=utf-8
X-Content-Sha256: 315f5bdb76d078c43b8ac0064e4a0164612b1fce77c869345bfc94c75894edd3
X-Date: 20230804T101113Z

Hello, world!
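
Two of the headers in the request above can be reproduced with the Go standard library alone: X-Content-Sha256 is the lowercase hex SHA-256 of the body, and X-Date is the UTC timestamp in yyyymmddThhmmssZ form. The following is a small sketch with hypothetical function names; the signing SDK normally fills in both headers for you.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"time"
)

// sampleTime is the timestamp used in the request example on this page.
var sampleTime = time.Date(2023, 8, 4, 10, 11, 13, 0, time.UTC)

// contentSHA256 returns the lowercase hex SHA-256 digest of the request body.
func contentSHA256(body []byte) string {
	sum := sha256.Sum256(body)
	return hex.EncodeToString(sum[:])
}

// xDate formats a timestamp the way the X-Date header shows it, always in UTC.
func xDate(t time.Time) string {
	return t.UTC().Format("20060102T150405Z")
}

func main() {
	fmt.Println(contentSHA256([]byte("Hello, world!")))
	fmt.Println(xDate(sampleTime))
}
```

For the body "Hello, world!" the digest matches the X-Content-Sha256 value in the request example, which is a quick way to confirm that the exact bytes being signed are the bytes being sent.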

Response

HTTP/1.1 200 OK
Content-Length: 21
Access-Control-Allow-Credentials: true
Access-Control-Allow-Methods: GET, OPTIONS, HEAD, PUT, POST
Access-Control-Max-Age: 1800
Connection: keep-alive
Content-Type: application/json; charset=utf-8
Date: Fri, 04 Aug 2023 10:11:13 GMT
Server: volcclb
Vary: Accept-Encoding
X-Trace-Id: 20230804101112000fe63e5682ecb48b

{"e":0,"m":"success"}