You need to enable JavaScript to run this app.
导航

Flink消费

最近更新时间2023.12.19 19:18:20

首次发布时间2023.12.19 19:18:20

一、简介

Flink DataSail Connector是DataSail服务提供的用于对接Apache Flink的连接器,为客户提供了使用Flink生产或消费DataSail数据集的能力。

二、前置准备

服务开通

请确保您已开通了您需要访问的服务。您可前往火山引擎控制台开通全域数据集成服务,详见服务开通

获取安全凭证

Access Key(访问密钥)是访问火山引擎服务的安全凭证,包含Access Key ID(简称为AK)和Secret Access Key(简称为SK)两部分。您可登录火山引擎控制台,前往访问控制访问密钥中创建及管理您的Access Key。更多信息可参考访问密钥帮助文档

申请数据集

在DataSail中确认已创建要生产或消费的数据集

环境检查

  • Java版本需要不低于1.8

  • Flink版本需要不低于1.11

导入依赖

  1. 下载以下JAR和POM文件
flink-connector-datasail-1.0.0-SNAPSHOT.jar
30.00KB
flink-connector-datasail-parent-1.0.0-SNAPSHOT.pom
2.49KB
datasail-subscriber-java-cloud-1.0-SNAPSHOT.jar
20.14KB
  1. 使用Maven命令导入到本地仓库
mvn install:install-file -Dfile=datasail-subscriber-java-cloud-1.0-SNAPSHOT.jar -DgroupId=com.volcengine.datasail -DartifactId=datasail-subscriber-java-cloud -Dversion=1.0-SNAPSHOT -Dpackaging=jar
mvn install:install-file -Dfile=flink-connector-datasail-parent-1.0.0-SNAPSHOT.pom -DgroupId=com.volcengine.datasail -DartifactId=flink-connector-datasail-parent -Dversion=1.0.0-SNAPSHOT -Dpackaging=pom
mvn install:install-file -Dfile=flink-connector-datasail-1.0.0-SNAPSHOT.jar -DgroupId=com.volcengine.datasail -DartifactId=flink-connector-datasail -Dversion=1.0.0-SNAPSHOT -Dpackaging=jar
三、接入示例

添加依赖

<dependency>
    <groupId>com.volcengine.datasail</groupId>
    <artifactId>flink-connector-datasail</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>

Source

参数说明

参数名类型配置项含义示例

accessKey

string

火山云 Access Key

AKLTZW*****

secretKey

string

火山云 Secret Key

TW1KaVlURmlaR0*******

datasetstringDataSail 数据集byteio_dataset_test

network

枚举

消费网络类型(公网消费or私网消费)

NETWORK_EXTERNAL
NETWORK_PRIVATE

subscribeCenterstring数据订阅配置中心域名https://datasail01-cn-beijing.volceapplog.com/
consumerGroupstring消费者组test

autoOffsetReset

枚举

初始无offset时的消费策略

earliest

示例代码

package com.volcengine.datasail.streaming.connectors.examples;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.volcengine.datasail.streaming.connectors.datasail.ConsumerConfig;
import com.volcengine.datasail.streaming.connectors.datasail.FlinkDataSailConsumer;

public class SourceExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 1. Source配置参数
        ConsumerConfig consumerConfig = ConsumerConfig.builder()
                .accessKey("Your access key")
                .secretKey("Your secret key")
                .dataset("source_example")
                .subscribeCenter("https://datasail01-cn-beijing.volceapplog.com")
                .consumerGroup("source_example_group")
                .startFromLatest()
                .build();
        
        // 2. 消费DataSail数据集
        DataStream<String> stream = env.addSource(new FlinkDataSailConsumer<>(
                consumerConfig,
                new SimpleStringSchema()));
        
        // 3. 打印消息到标准输出流
        stream.print();

        // 4. 开始执行
        env.execute("Source Example");
    }
}

Sink

参数说明

参数名类型配置项含义示例

accessKey

string

火山云 Access Key

AKLTZW*****

secretKey

string

火山云 Secret Key

TW1KaVlURmlaR0*******

datasetstringDataSail 数据集byteio_dataset_test

network

枚举

消费网络类型(公网消费or私网消费)

NETWORK_EXTERNAL
NETWORK_PRIVATE

subscribeCenterstring数据订阅配置中心域名https://datasail01-cn-beijing.volceapplog.com/

示例代码

package com.volcengine.datasail.streaming.connectors.examples;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import com.volcengine.datasail.streaming.connectors.datasail.FlinkDataSailProducer;
import com.volcengine.datasail.streaming.connectors.datasail.ProducerConfig;

public class SinkExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. 消费任意数据源,这里使用的是模拟数据源
        DataStream<String> stream = env.addSource(new InfiniteNumbersSource());
        
        // 2. Sink配置参数
        ProducerConfig producerConfig = ProducerConfig.builder()
                .accessKey("Your access key")
                .secretKey("Your secret key")
                .dataset("sink_example")
                .subscribeCenter("https://datasail01-cn-beijing.volceapplog.com")
                .build();

        // 3. 生产到DataSail数据集
        stream.addSink(new FlinkDataSailProducer<>(
                producerConfig,
                new SimpleStringSchema()));

        // 4. 开始执行
        env.execute("Sink Example");
    }

    private static class InfiniteNumbersSource implements SourceFunction<String> {

        private volatile boolean isRunning = true;

        private long counter = 0;

        @Override
        public void run(SourceContext<String> context) {
            while (isRunning) {
                try {
                    Thread.sleep(1_000L);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                context.collect(Long.toString(counter++));
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}
开源组件声明
开源组件声明.txt
20.21KB