You need to enable JavaScript to run this app.
导航

SDK订阅

最近更新时间2023.12.19 19:52:00

首次发布时间2023.05.29 11:14:09

简介

火山引擎DataSail数据订阅SDK提供了客户消费DataSail数据集的能力,用户无需关心消费的细节,只需实现自己的消息处理逻辑。

前置准备

服务开通

请确保您已开通了您需要访问的服务。您可前往火山引擎控制台开通全域数据集成服务,详见服务开通

获取安全凭证

Access Key(访问密钥)是访问火山引擎服务的安全凭证,包含Access Key ID(简称为AK)和Secret Access Key(简称为SK)两部分。您可登录火山引擎控制台 ,
前往访问控制访问密钥 中创建及管理您的Access Key。更多信息可参考访问密钥帮助文档

申请数据集

在Datasail确认已有要消费的数据集

申请消费组

申请消费组

环境检查

  • Go版本需要不低于1.16。

  • Java版本需要不低于1.8。

下载Lib包

Java版本:

datasail-subscriber-java-cloud-1.0-SNAPSHOT.jar
19.97KB

Go版本:
datasail_subscriber_go_cloud.tar.gz
34.87KB

接入示例

配置参数说明

参数名类型配置项含义示例
ConfigdatasetstringDataSail 数据集byteio_dataset_test

accessKey

string

火山云 Access Key

AKLTZW*****

secretKey

string

火山云 Secret Key

TW1KaVlURmlaR0*******

subscribeCenterstring数据订阅配置中心域名https://datasail01-cn-beijing.volceapplog.com
network枚举消费网络类型(公网消费or内网消费)NETWORK_EXTERNAL
ConsumerconsumerGroupstring消费者组test

autoOffsetReset

枚举

初始无offset时的消费策略

earliest

Java版

  1. 将jar包导入maven本地仓库

    mvn install:install-file -Dfile=datasail-subscriber-java-cloud-1.0-SNAPSHOT.jar -DgroupId=com.volcengine.datasail -DartifactId=datasail-subscriber-java-cloud -Dversion=1.0.0-SNAPSHOT -Dpackaging=jar
    
  2. 添加以下pom依赖


    <dependency>
      <groupId>com.volcengine.datasail</groupId>
      <artifactId>datasail-subscriber-java-cloud</artifactId>
      <version>1.0.0-SNAPSHOT</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.2.2</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.11.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.5.8</version>
    </dependency>
    <dependency>
      <groupId>com.volcengine</groupId>
      <artifactId>volc-sdk-java</artifactId>
      <version>1.0.49</version>
    </dependency>
    
  3. 消费代码

import com.volcengine.datasail.config.AutoOffsetResetMode;
import com.volcengine.datasail.IHandler;
import com.volcengine.datasail.Msg;
import com.volcengine.datasail.Subscriber;
import com.volcengine.datasail.config.Config;
import com.volcengine.datasail.config.ConsumerConfig;
import com.volcengine.datasail.config.NetworkType;
import com.volcengine.datasail.exception.SubscriberException;

public class SubscriberTest{
    public static void main(String[] args) throws SubscriberException {
        // 1. Subscriber配置参数
        Config config = new Config();
        config.dataset = "Your Dataset";
        config.accessKey = "Your Aceess Key";
        config.secretKey = "Your Secret Key";
        config.subscribeCenter = "http://localhost:6789";
        config.network = NetworkType.NETWORK_EXTERNAL;
        config.consumer = new ConsumerConfig();
        config.consumer.consumerGroup = "Your Consumer Group";
        config.consumer.autoOffsetReset = AutoOffsetResetMode.LATEST;    // Support "earliest" and "latest"
        // 2. 创建Subscriber对象,非线程安全
        Subscriber subscriber = new Subscriber(config);
        // 4. 设置消息处理器,开启数据订阅&消费,该方法会阻塞式消费
        subscriber.subscribe(new Handler());
    }

    // 3. 业务自定义消息处理器,需实现IHandler接口
    static class Handler implements IHandler {
        @Override
        public void handleMsg(Msg msg) {
            String value = new String(msg.getValue());
            System.out.println(value);
        }
    }
}

Go版

  1. 下载datasail_subscriber_go_cloud,与自己的项目目录在同一层

  2. 替换包名

    go mod edit -replace volcengine.com/datasail/datasail_subscriber_go_cloud=../datasail_subscriber_go_cloud
    
  3. 更新依赖

    go get volcengine.com/datasail/datasail_subscriber_go_cloud
    

  1. 消费代码

package main

import (
   "fmt"

   "volcengine.com/datasail/datasail_subscriber_go_cloud"
   "volcengine.com/datasail/datasail_subscriber_go_cloud/config"
)

func main() {
   // 1. Subscriber配置参数
   conf := config.Config{
      AccessKey:       "Your Access Key",
      SecretKey:       "Your Secret Key",
      SubscribeCenter: "http://localhost:6789",
      Dataset:         "Your Dataset",
      Network:         config.NetworkExternal,
      Consumer: config.ConsumerConfig{
         ConsumerGroup:   "Your Consumer Group",
         AutoOffsetReset: config.AutoOffsetResetEarliest,    // Support "earliest" and "latest"
      },
   }
   // 2. 创建Subscriber对象,非线程安全
   subscriber, err := datasail_subscriber_go_cloud.NewSubscriber(conf)
   if err != nil {
      panic(err)
   }
   // 3. 业务自定义消息处理器,需实现HandleFunc
   handler := func(msg datasail_subscriber_go_cloud.Msg) {
      fmt.Println(string(msg.Value))
   }
   // 4. 设置消息处理器,开启数据订阅&消费,该方法会阻塞式消费
   if err := subscriber.Subscribe(handler); err != nil {
       panic(err)
   }
}

开源组件声明
Java版本开源组件声明.txt
31.06KB
Go开源组件声明.txt
18.75KB