You need to enable JavaScript to run this app.
全域数据集成

全域数据集成

复制全文
开发者指南
Flink消费
复制全文
Flink消费
一、简介

Flink DataSail Connector是DataSail服务提供的用于对接Apache Flink的连接器,为客户提供了使用Flink生产或消费DataSail数据集的能力。

二、前置准备

服务开通

请确保您已开通了您需要访问的服务。您可前往火山引擎控制台开通全域数据集成服务,详见服务开通

获取安全凭证

Access Key(访问密钥)是访问火山引擎服务的安全凭证,包含Access Key ID(简称为AK)和Secret Access Key(简称为SK)两部分。您可登录火山引擎控制台,前往访问控制访问密钥中创建及管理您的Access Key。更多信息可参考访问密钥帮助文档

申请数据集

在DataSail中确认已创建要生产或消费的数据集

环境检查

  • Java版本需要不低于1.8

  • Flink版本需要不低于1.11

导入依赖

  1. 下载以下JAR和POM文件
flink-connector-datasail-1.0.0-SNAPSHOT.jar
30.00KB
flink-connector-datasail-parent-1.0.0-SNAPSHOT.pom
2.49KB
datasail-subscriber-java-cloud-1.0-SNAPSHOT.jar
20.14KB
  1. 使用Maven命令导入到本地仓库
mvn install:install-file -Dfile=datasail-subscriber-java-cloud-1.0-SNAPSHOT.jar -DgroupId=com.volcengine.datasail -DartifactId=datasail-subscriber-java-cloud -Dversion=1.0-SNAPSHOT -Dpackaging=jar
mvn install:install-file -Dfile=flink-connector-datasail-parent-1.0.0-SNAPSHOT.pom -DgroupId=com.volcengine.datasail -DartifactId=flink-connector-datasail-parent -Dversion=1.0.0-SNAPSHOT -Dpackaging=pom
mvn install:install-file -Dfile=flink-connector-datasail-1.0.0-SNAPSHOT.jar -DgroupId=com.volcengine.datasail -DartifactId=flink-connector-datasail -Dversion=1.0.0-SNAPSHOT -Dpackaging=jar
三、接入示例

添加依赖

<dependency>
    <groupId>com.volcengine.datasail</groupId>
    <artifactId>flink-connector-datasail</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>

Source

参数说明

参数名类型配置项含义示例

accessKey

string

火山云 Access Key

AKLTZW*****

secretKey

string

火山云 Secret Key

TW1KaVlURmlaR0*******

datasetstringDataSail 数据集byteio_dataset_test

network

枚举

消费网络类型(公网消费or私网消费)

NETWORK_EXTERNAL
NETWORK_PRIVATE

subscribeCenterstring数据订阅配置中心域名https://datasail01-cn-beijing.volceapplog.com/
consumerGroupstring消费者组test

autoOffsetReset

枚举

初始无offset时的消费策略

earliest

示例代码

package com.volcengine.datasail.streaming.connectors.examples;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.volcengine.datasail.streaming.connectors.datasail.ConsumerConfig;
import com.volcengine.datasail.streaming.connectors.datasail.FlinkDataSailConsumer;

public class SourceExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 1. Source配置参数
        ConsumerConfig consumerConfig = ConsumerConfig.builder()
                .accessKey("Your access key")
                .secretKey("Your secret key")
                .dataset("source_example")
                .subscribeCenter("https://datasail01-cn-beijing.volceapplog.com")
                .consumerGroup("source_example_group")
                .startFromLatest()
                .build();
        
        // 2. 消费DataSail数据集
        DataStream<String> stream = env.addSource(new FlinkDataSailConsumer<>(
                consumerConfig,
                new SimpleStringSchema()));
        
        // 3. 打印消息到标准输出流
        stream.print();

        // 4. 开始执行
        env.execute("Source Example");
    }
}

Sink

参数说明

参数名类型配置项含义示例

accessKey

string

火山云 Access Key

AKLTZW*****

secretKey

string

火山云 Secret Key

TW1KaVlURmlaR0*******

datasetstringDataSail 数据集byteio_dataset_test

network

枚举

消费网络类型(公网消费or私网消费)

NETWORK_EXTERNAL
NETWORK_PRIVATE

subscribeCenterstring数据订阅配置中心域名https://datasail01-cn-beijing.volceapplog.com/

示例代码

package com.volcengine.datasail.streaming.connectors.examples;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import com.volcengine.datasail.streaming.connectors.datasail.FlinkDataSailProducer;
import com.volcengine.datasail.streaming.connectors.datasail.ProducerConfig;

public class SinkExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. 消费任意数据源,这里使用的是模拟数据源
        DataStream<String> stream = env.addSource(new InfiniteNumbersSource());
        
        // 2. Sink配置参数
        ProducerConfig producerConfig = ProducerConfig.builder()
                .accessKey("Your access key")
                .secretKey("Your secret key")
                .dataset("sink_example")
                .subscribeCenter("https://datasail01-cn-beijing.volceapplog.com")
                .build();

        // 3. 生产到DataSail数据集
        stream.addSink(new FlinkDataSailProducer<>(
                producerConfig,
                new SimpleStringSchema()));

        // 4. 开始执行
        env.execute("Sink Example");
    }

    private static class InfiniteNumbersSource implements SourceFunction<String> {

        private volatile boolean isRunning = true;

        private long counter = 0;

        @Override
        public void run(SourceContext<String> context) {
            while (isRunning) {
                try {
                    Thread.sleep(1_000L);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                context.collect(Long.toString(counter++));
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}
开源组件声明
开源组件声明.txt
20.21KB
最近更新时间:2023.12.19 19:18:20
这个页面对您有帮助吗?
有用
有用
无用
无用