Using Proton with Flink

Last updated: 2023.07.12 10:25:15

First published: 2023.05.17 15:03:57

Open-source Flink does not support streaming writes to the object storage service (TOS) with EXACTLY_ONCE semantics; when you have such a requirement, you need to write the data through the Proton SDK. Starting from Volcengine E-MapReduce (EMR) 3.2.1, Volcengine EMR Flink ships with the Proton SDK in its runtime environment by default, so reading and writing TOS from Flink works out of the box. For existing (older) clusters, you need to download the Proton SDK and apply some additional configuration before it can be used; refer to the Proton release versions page to download the Proton SDK manually.

1 Volcengine EMR

1.1 Cluster Configuration

Volcengine EMR clusters of version 3.2.1 and later already integrate the Proton dependencies by default, for both Hadoop (data lake) and Flink (real-time computing) cluster types, so no extra configuration is needed.
For existing clusters created before version 3.2.1, to add or upgrade the Flink Proton dependency, download the Proton SDK, copy it to every EMR node, extract it, and then place the jars as follows (a command sketch follows this list):

  • plugins/flink/proton-flink${flink.version}-${proton.version}.jar (for example plugins/flink/proton-flink1.16-1.3.0.jar): copy it into the Flink lib directory /usr/lib/emr/current/flink/lib/.

  • proton-hadoop${hadoop.major.version}-bundle-${proton.version}.jar: use it to replace proton-hadoop${hadoop.major.version}-bundle-${old.proton.version}.jar under /usr/lib/emr/current/hadoop/share/hadoop/hdfs/. Pick the jar matching your Hadoop line; for example, on a Hadoop 2.x environment use proton-hadoop2-bundle-${proton.version}.jar.
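
A minimal shell sketch of the two steps above for a single node, assuming a Hadoop 3.x environment, Flink 1.16 and Proton 1.3.0; the SDK archive name and extracted layout are assumptions based on the paths listed above:

# Run on every EMR node; adjust versions and the archive name to your download.
tar -zxf proton-1.3.0-bin.tar.gz && cd proton-1.3.0

# 1) Flink plugin jar into the Flink lib directory
cp plugins/flink/proton-flink1.16-1.3.0.jar /usr/lib/emr/current/flink/lib/

# 2) Replace the old Hadoop bundle (Hadoop 3.x shown; use proton-hadoop2-* on Hadoop 2.x)
rm -f /usr/lib/emr/current/hadoop/share/hadoop/hdfs/proton-hadoop3-bundle-*.jar
cp proton-hadoop3-bundle-1.3.0.jar /usr/lib/emr/current/hadoop/share/hadoop/hdfs/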

1.2 Authentication Configuration

Volcengine EMR configures the TOS and IAM credentials by default, so no extra configuration is required. If you need custom settings, configure them in core-site.xml or flink-conf.yaml. For flink-conf.yaml, refer to the following:

fs.tos.access-key-id: xxxx
fs.tos.secret-access-key: xxx
# Set the endpoint of your region. When running on Volcengine ECS, you can use the
# internal endpoint http://tos-cn-xxx.ivolces.com
fs.tos.endpoint: http://tos-cn-guangzhou.volces.com

# Credentials can also be set per bucket
fs.tos.bucket.{bucketname}.access-key-id: xxx
fs.tos.bucket.{bucketname}.secret-access-key: xxx

# Optional: only needed when reading non-Parquet data stored on TOS through the Filesystem connector
fs.tos.impl: io.proton.fs.RawFileSystem

When you need to read Parquet data stored on TOS through the Filesystem connector, the TOS and IAM credentials must be added to core-site.xml, because of how Flink's ParquetVectorizedInputFormat currently picks up its configuration. For the full core-site.xml settings, see the "Using Proton with Hadoop - Configuration Changes" section.

<configuration>
    <property>
        <name>fs.tos.access-key-id</name>
        <value>{iam.role.access.key}</value>
    </property>
    <property>
        <name>fs.tos.secret-access-key</name>
        <value>{iam.role.access.val}</value>
    </property>
    <property>
        <name>fs.tos.impl</name>
        <value>io.proton.fs.RawFileSystem</value>
    </property>
    <property>
        <name>fs.tos.endpoint</name>
        <value>http://tos-cn-guangzhou.volces.com</value>
    </property>
</configuration>

2 Self-Managed Hadoop + Flink Cluster

2.1 Cluster Configuration

In a self-managed Hadoop + Flink cluster, to have Flink stream data into TOS with EXACTLY_ONCE semantics, download the Proton SDK and copy proton-flink${flink.version}-${proton.version}.jar into the Flink lib directory.
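
A minimal sketch of the copy step, assuming the SDK has already been extracted and that $FLINK_HOME points at your Flink installation (the versions are examples):

# Copy the Proton Flink plugin into the Flink lib directory of every node running Flink
cp proton-1.3.0/plugins/flink/proton-flink1.16-1.3.0.jar $FLINK_HOME/lib/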

2.2 Authentication Configuration

For custom settings, refer to the Volcengine EMR - Authentication Configuration section above.

3 Standalone Flink Cluster

3.1 Cluster Configuration

A standalone Flink cluster is similar to a self-managed Hadoop + Flink cluster: after downloading the Proton SDK, copy proton-flink${flink.version}-${proton.version}.jar into the Flink lib directory, then add the IAM and TOS settings to core-site.xml or flink-conf.yaml.
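
A minimal sketch, assuming $FLINK_HOME points at the standalone installation; the credential values and endpoint are placeholders (see section 1.2 for the full option list):

# Copy the Proton Flink plugin (versions are examples)
cp proton-1.3.0/plugins/flink/proton-flink1.16-1.3.0.jar $FLINK_HOME/lib/

# Append TOS credentials to flink-conf.yaml
cat >> $FLINK_HOME/conf/flink-conf.yaml <<'EOF'
fs.tos.access-key-id: xxxx
fs.tos.secret-access-key: xxx
fs.tos.endpoint: http://tos-cn-guangzhou.volces.com
EOF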

3.2 Authentication Configuration

For custom settings, refer to the Volcengine EMR - Authentication Configuration section above.

4 Usage Examples

  1. Start the SQL Client
    When accessing TOS from the SQL Client, you need to explicitly set HADOOP_CLASSPATH:

    export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
    /usr/lib/emr/current/flink/bin/sql-client.sh embedded
    
    set execution.target=yarn-per-job;
    
  2. Create the data source

    create table datagen (
        id          int,
        first_name  varchar,
        last_name   varchar,
        phone       bigint,
        address     varchar,
        company     int
    ) with (
        'connector' = 'datagen',
        'rows-per-second' = '10',
        'fields.id.kind' = 'sequence',
        'fields.id.start' = '1',
        'fields.id.end' = '100000',
        'fields.first_name.length' = '5',
        'fields.last_name.length' = '5',
        'fields.phone.min' = '0000000',
        'fields.phone.max' = '9999999',
        'fields.address.length' = '7',
        'fields.company.min' = '10',
        'fields.company.max' = '20'
    );
    
  3. Write data

    CREATE TABLE tos_parquet_user_sink_tbl (
        id          int,
        first_name  varchar,
        last_name   varchar,
        phone       bigint,
        address     varchar,
        company     int
    ) PARTITIONED BY (company) WITH (
     'connector' = 'filesystem',
     'path' = 'tos://{bucket}/xxxxx/flink/tos_parquet_user_tbl',
     'format' = 'parquet',
     'sink.rolling-policy.file-size' = '5MB',
     'sink.rolling-policy.rollover-interval' = '5min',
     'sink.rolling-policy.check-interval' = '1min',
     'parquet.compression'='SNAPPY' -- omit this option if compression is not needed
    );
    
    SET 'execution.checkpointing.interval' = '300s';
    
    INSERT INTO tos_parquet_user_sink_tbl SELECT * FROM datagen;
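
    As an optional check (a sketch, not part of the original steps), you can list the part files the job writes; the path is the sink path declared above:

    # Completed Parquet part files show up once a checkpoint has finished
    hadoop fs -ls tos://{bucket}/xxxxx/flink/tos_parquet_user_tbl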
    
  1. Code example

    1. pom.xml
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-kafka-tos-demo</artifactId>
        <packaging>jar</packaging>
        <version>1.0-SNAPSHOT</version>
    
        <name>flink-kafka-tos-demo</name>
    
        <properties>
            <flink.version>1.16.1</flink.version>
            <hadoop.version>3.3.4</hadoop.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>${hadoop.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-parquet</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.parquet</groupId>
                <artifactId>parquet-avro</artifactId>
                <version>1.12.2</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-client</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>it.unimi.dsi</groupId>
                        <artifactId>fastutil</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.2.1</version>
                    <executions>
                        <!-- Run shade goal on package phase -->
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <transformers>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>org.apache.flink.KafkaToTosDemo</mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>8</source>
                        <target>8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    
    2. Business logic (KafkaToTosDemo.java)
    /*
     * ByteDance Volcengine EMR, Copyright 2022.
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.apache.flink;
    
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    
    public class KafkaToTosDemo {
      public static void main(String[] args) throws Exception {
        ParameterTool pt = ParameterTool.fromArgs(args);
    
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        String outputPath = pt.get("tos.output.path");
        String topic = pt.getRequired("kafka.topic");
        String consumerGroup = pt.get("kafka.consumer.group.id", "kafka-to-tos-demo-group");
        String bootstrapServers = pt.getRequired("kafka.bootstrap.servers");
        String checkpointPath = pt.getRequired("checkpoint.path");
        long checkpointInterval = pt.getLong("checkpoint.interval", 10_000L);
    
        env.getCheckpointConfig().setCheckpointStorage(checkpointPath);
        env.getCheckpointConfig().setCheckpointInterval(checkpointInterval);
    
        ObjectMapper jsonParser = new ObjectMapper();
    
        env.fromSource(createKafkaSource(topic, bootstrapServers, consumerGroup),
                WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source")
            .map(data -> {
              JsonNode jsonNode = jsonParser.readValue(data, JsonNode.class);
              return new Tuple2<>(jsonNode.get("ticker").toString(), 1);
            }).returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(v -> v.f0)
            // .timeWindow(Time.minutes(1)) // Tumbling window definition // Flink 1.11
            .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) // since Flink 1.13
            .sum(1) // Count the appearances by ticker per partition
            .map(t -> new TickCount(t.f0, t.f1))
            .addSink(createTosSnappySinkFromStaticConfig(outputPath))
            .name("TOS Parquet Sink");
    
        env.execute("kafka-to-tos-demo");
      }
    
      private static KafkaSource<String> createKafkaSource(String topic, String bootstrapServers, String consumerGroup) {
        return KafkaSource.<String>builder()
            .setBootstrapServers(bootstrapServers)
            .setTopics(topic)
            .setGroupId(consumerGroup)
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();
      }
    
      private static StreamingFileSink<TickCount> createTosSinkFromStaticConfig(String outputPath) {
        return StreamingFileSink
            .forBulkFormat(new Path(outputPath), AvroParquetWriters.forReflectRecord(TickCount.class))
            .withBucketAssigner(new DateTimeBucketAssigner<>("'year='yyyy'/month='MM'/day='dd'/hour='HH/"))
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .withOutputFileConfig(OutputFileConfig.builder()
                .withPartPrefix("complete")
                .withPartSuffix(".parquet")
                .build())
            .build();
      }
    
      private static StreamingFileSink<TickCount> createTosSnappySinkFromStaticConfig(String outputPath) {
        return StreamingFileSink
            .forBulkFormat(new Path(outputPath),
                CompressionAvroParquetWriters.forReflectRecord(TickCount.class, CompressionCodecName.SNAPPY))
            .withBucketAssigner(new DateTimeBucketAssigner<>("'year='yyyy'/month='MM'/day='dd'/hour='HH/"))
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .withOutputFileConfig(OutputFileConfig.builder()
                .withPartPrefix("complete")
                .withPartSuffix(".parquet")
                .build())
            .build();
      }
    }
    

    TickCount.java

    /*
     * ByteDance Volcengine EMR, Copyright 2022.
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.apache.flink;
    
    public class TickCount {
      private String tick;
      private int count;
    
      public TickCount(String tick, int count) {
        this.tick = tick;
        this.count = count;
      }
    
      public String getTick() {
        return tick;
      }
    
      public void setTick(String tick) {
        this.tick = tick;
      }
    
      public int getCount() {
        return count;
      }
    
      public void setCount(int count) {
        this.count = count;
      }
    }
    

    The AvroParquetWriters shipped with Flink does not support compression, so a custom implementation (CompressionAvroParquetWriters.java below) is required.

    /*
     * ByteDance Volcengine EMR, Copyright 2022.
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.apache.flink;
    
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.reflect.ReflectData;
    import org.apache.avro.specific.SpecificData;
    import org.apache.avro.specific.SpecificRecordBase;
    import org.apache.flink.formats.parquet.ParquetBuilder;
    import org.apache.flink.formats.parquet.ParquetWriterFactory;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.io.OutputFile;
    
    import java.io.IOException;
    
    public class CompressionAvroParquetWriters {
      public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(
          Class<T> type, CompressionCodecName compressionCodecName) {
        final String schemaString = SpecificData.get().getSchema(type).toString();
        final ParquetBuilder<T> builder =
            (out) -> createAvroParquetWriter(schemaString, SpecificData.get(), out, compressionCodecName);
        return new ParquetWriterFactory<>(builder);
      }
    
      /**
       * Creates a ParquetWriterFactory that accepts and writes Avro generic types. The Parquet
       * writers will use the given schema to build and write the columnar data.
       *
       * @param schema The schema of the generic type.
       */
      public static ParquetWriterFactory<GenericRecord> forGenericRecord(
          Schema schema,
          CompressionCodecName compressionCodecName) {
        final String schemaString = schema.toString();
        final ParquetBuilder<GenericRecord> builder =
            (out) -> createAvroParquetWriter(schemaString, GenericData.get(), out, compressionCodecName);
        return new ParquetWriterFactory<>(builder);
      }
    
      /**
       * Creates a ParquetWriterFactory for the given type. The Parquet writers will use Avro to
       * reflectively create a schema for the type and use that schema to write the columnar data.
       *
       * @param type The class of the type to write.
       */
      public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type, CompressionCodecName compressionCodecName) {
        final String schemaString = ReflectData.get().getSchema(type).toString();
        final ParquetBuilder<T> builder =
            (out) -> createAvroParquetWriter(schemaString, ReflectData.get(), out, compressionCodecName);
        return new ParquetWriterFactory<>(builder);
      }
    
      private static <T> ParquetWriter<T> createAvroParquetWriter(
          String schemaString, GenericData dataModel, OutputFile out,
          CompressionCodecName compressionCodecName) throws IOException {
    
        final Schema schema = new Schema.Parser().parse(schemaString);
    
        return AvroParquetWriter.<T>builder(out)
            .withSchema(schema)
            .withDataModel(dataModel)
            .withCompressionCodec(compressionCodecName)
            .build();
      }
    
      // ------------------------------------------------------------------------
    
      /**
       * Class is not meant to be instantiated.
       */
      private CompressionAvroParquetWriters() {
      }
    }
    
  2. Run the job

    1. Generate test data with the following script:
    # generate_dummy_data.py
    import datetime
    import json
    import random
    
    
    def get_data():
        return {
            'event_time': datetime.datetime.now().isoformat(),
            'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
            'price': round(random.random() * 100, 2)
        }
    
    if __name__ == '__main__':
        num = 1000
        with open("/root/kafka/dummy_data.json", "a") as f:
            for _ in range(0, num):
                f.write(json.dumps(get_data()) + '\n')
    
    mkdir /root/kafka/
    python generate_dummy_data.py
    
    ./bin/kafka-topics.sh --bootstrap-server {broker1_ip}:9092,{broker2_ip}:9092 --topic {topic_name} --create
    
    ./bin/kafka-console-producer.sh --bootstrap-server {broker1_ip}:9092,{broker2_ip}:9092 --topic {topic_name} < /root/kafka/dummy_data.json
    
    2. Submit the job
    /usr/lib/emr/current/flink/bin/flink run-application \
    -t yarn-application /opt/flink-kafka-tos-demo-1.0-SNAPSHOT.jar \
    --tos.output.path tos://{bucket}/xxxx/flink/sdk/ticket_stat_snappy \
    --kafka.topic xxx_test \
    --kafka.bootstrap.servers {broker1_ip}:9092,{broker2_ip}:9092 \
    --checkpoint.path tos://{bucket}/xxxx/flink/ckp/ticket_snappy/
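
    As an optional follow-up (a sketch, not part of the original steps), you can verify that the job is running and that compressed Parquet files are landing under the --tos.output.path used above:

    # Check the YARN application status and inspect the sink output on TOS
    yarn application -list
    hadoop fs -ls tos://{bucket}/xxxx/flink/sdk/ticket_stat_snappy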