Last updated: 2023.07.12 10:25:15
First published: 2023.05.17 15:03:57
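Open-source Flink does not support streaming writes to the object storage service (TOS) with EXACTLY_ONCE semantics. To get this behavior, data must be written through the Proton SDK. Starting with Volcengine E-MapReduce (EMR) 3.2.1, Volcengine EMR Flink integrates the Proton SDK into its runtime environment by default, so Flink can read and write TOS out of the box. Existing clusters created on earlier versions need the Proton SDK downloaded and some additional configuration before the feature works; see the Proton release versions page to download the Proton SDK manually.
Starting with version 3.2.1, Volcengine EMR clusters ship with the Proton dependencies integrated by default. This applies to both Hadoop data lake clusters and Flink real-time compute clusters, and no additional configuration is required.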
For existing clusters on versions earlier than 3.2.1, to add or upgrade the Flink Proton dependency, download the Proton SDK, copy it to every EMR node, extract it, and then do the following:
Copy plugins/flink/proton-flink{flink.version}-{proton.version}.jar (for example, plugins/flink/proton-flink1.16-1.3.0.jar) into the Flink lib directory /usr/lib/emr/current/flink/lib/.
Replace proton-hadoop${hadoop.major.version}-bundle-{old.proton.version}.jar under /usr/lib/emr/current/hadoop/share/hadoop/hdfs/ with proton-hadoop${hadoop.major.version}-bundle-{proton.version}.jar. Use the JAR that matches your Hadoop series; for example, in a Hadoop 2.x environment, choose proton-hadoop2-bundle-{proton.version}.jar.
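The file-name patterns above can be expanded mechanically. The following is a minimal sketch, not part of the Proton SDK: the function names are hypothetical, and it only prints where each JAR is expected to go without copying anything. The location of the Hadoop bundle inside the extracted SDK is not stated above, so only its file name is produced.

```python
from pathlib import PurePosixPath

# Target directories quoted from the steps above.
FLINK_LIB_DIR = PurePosixPath("/usr/lib/emr/current/flink/lib")
HADOOP_HDFS_DIR = PurePosixPath("/usr/lib/emr/current/hadoop/share/hadoop/hdfs")


def flink_plugin_jar(flink_version: str, proton_version: str) -> str:
    """Relative path of the Flink plugin JAR inside the extracted SDK."""
    return f"plugins/flink/proton-flink{flink_version}-{proton_version}.jar"


def hadoop_bundle_jar(hadoop_major: int, proton_version: str) -> str:
    """File name of the Hadoop bundle JAR for the given Hadoop major version."""
    return f"proton-hadoop{hadoop_major}-bundle-{proton_version}.jar"


def install_plan(flink_version: str, proton_version: str, hadoop_major: int):
    """Return (JAR, destination directory) pairs; nothing is copied."""
    return [
        (flink_plugin_jar(flink_version, proton_version), str(FLINK_LIB_DIR)),
        (hadoop_bundle_jar(hadoop_major, proton_version), str(HADOOP_HDFS_DIR)),
    ]


if __name__ == "__main__":
    for jar, destination in install_plan("1.16", "1.3.0", 3):
        print(f"{jar} -> {destination}")
```

Printing the plan first makes it easier to confirm that the Flink and Hadoop versions match the cluster before any file on a node is replaced.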
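Volcengine EMR preconfigures the TOS and IAM authentication information, so no additional configuration is required. To customize these settings, edit core-site.xml or flink-conf.yaml. For flink-conf.yaml, use the following settings as a reference: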
fs.tos.access-key-id: xxxx
fs.tos.secret-access-key: xxx

# Set the endpoint for your region. When running on Volcengine ECS,
# the internal address http://tos-cn-xxx.ivolces.com can be used.
fs.tos.endpoint: http://tos-cn-guangzhou.volces.com

# Credentials can also be set per bucket.
fs.tos.bucket.{bucketname}.access-key-id: xxx
fs.tos.bucket.{bucketname}.secret-access-key: xxx

# Optional: only needed when reading non-Parquet data stored on TOS
# through the Filesystem connector.
fs.tos.impl: io.proton.fs.RawFileSystem
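When several buckets need their own credentials, the entries follow a regular pattern and can be generated rather than typed by hand. The sketch below is illustrative only: `render_tos_flink_conf` and its parameters are hypothetical names, and the keys it emits are exactly the ones listed above.

```python
def render_tos_flink_conf(access_key, secret_key, endpoint,
                          bucket_credentials=None, raw_filesystem=False):
    """Render the fs.tos.* entries for flink-conf.yaml as text.

    bucket_credentials maps a bucket name to an (access key, secret key) pair.
    """
    lines = [
        f"fs.tos.access-key-id: {access_key}",
        f"fs.tos.secret-access-key: {secret_key}",
        f"fs.tos.endpoint: {endpoint}",
    ]
    # Per-bucket overrides, sorted so the output is stable across runs.
    for bucket, (bucket_ak, bucket_sk) in sorted((bucket_credentials or {}).items()):
        lines.append(f"fs.tos.bucket.{bucket}.access-key-id: {bucket_ak}")
        lines.append(f"fs.tos.bucket.{bucket}.secret-access-key: {bucket_sk}")
    # Optional entry, only needed for non-Parquet reads via the Filesystem connector.
    if raw_filesystem:
        lines.append("fs.tos.impl: io.proton.fs.RawFileSystem")
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(render_tos_flink_conf(
        "xxxx", "xxx", "http://tos-cn-guangzhou.volces.com",
        bucket_credentials={"example-bucket": ("xxx", "xxx")},  # placeholder values
        raw_filesystem=True,
    ))
```

Note that every key is followed by a colon and a space, which YAML requires for a key-value pair.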
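To read Parquet data stored on TOS through the Filesystem connector, the TOS and IAM authentication information must be added to core-site.xml. This is required because of a limitation in how Flink's ParquetVectorizedInputFormat currently obtains its configuration. For the full core-site.xml configuration, see the "Configuration changes" section of "Using Proton with Hadoop".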
<configuration>
  <property>
    <name>fs.tos.access-key-id</name>
    <value>{iam.role.access.key}</value>
  </property>
  <property>
    <name>fs.tos.secret-access-key</name>
    <value>{iam.role.access.val}</value>
  </property>
  <property>
    <name>fs.tos.impl</name>
    <value>io.proton.fs.RawFileSystem</value>
  </property>
  <property>
    <name>fs.tos.endpoint</name>
    <value>http://tos-cn-guangzhou.volces.com</value>
  </property>
</configuration>
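The same property list can be produced programmatically, which helps when the file has to be rolled out to many nodes. This is a minimal sketch using only the Python standard library; `build_core_site` is a hypothetical helper, and the values shown are the placeholders from the sample above, not real credentials.

```python
import xml.etree.ElementTree as ET


def build_core_site(properties: dict) -> str:
    """Build a Hadoop-style <configuration> document from name/value pairs."""
    root = ET.Element("configuration")
    for name, value in properties.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    # ET.indent is only available on Python 3.9+; skip pretty-printing otherwise.
    if hasattr(ET, "indent"):
        ET.indent(root, space="  ")
    return ET.tostring(root, encoding="unicode")


if __name__ == "__main__":
    print(build_core_site({
        "fs.tos.access-key-id": "{iam.role.access.key}",      # placeholder
        "fs.tos.secret-access-key": "{iam.role.access.val}",  # placeholder
        "fs.tos.impl": "io.proton.fs.RawFileSystem",
        "fs.tos.endpoint": "http://tos-cn-guangzhou.volces.com",
    }))
```

In practice these properties would be merged into the cluster's existing core-site.xml rather than written as a new file, since that file usually carries other settings as well.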
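In a self-managed Hadoop + Flink cluster, to have Flink stream data into TOS with EXACTLY_ONCE semantics, download the Proton SDK and copy proton-flink{flink.version}-{proton.version}.jar into the Flink lib directory.
For custom settings, see the "Volcengine EMR - Authentication configuration" section above.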
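A standalone Flink cluster is handled much like a self-managed Hadoop + Flink cluster: download the Proton SDK, copy proton-flink${flink.version}-${proton.version}.jar into the Flink lib directory, and then add the IAM and TOS settings to core-site.xml or flink-conf.yaml.
For custom settings, see the "Volcengine EMR - Authentication configuration" section above.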
Start the SQL Client
When accessing TOS from the SQL Client, HADOOP_CLASSPATH must be set explicitly.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
/usr/lib/emr/current/flink/bin/sql-client.sh embedded

# Run inside the SQL Client session:
set execution.target=yarn-per-job;
Create the data source
create table datagen (
  id int,
  first_name varchar,
  last_name varchar,
  phone bigint,
  address varchar,
  company int
) with (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.id.kind' = 'sequence',
  'fields.id.start' = '1',
  'fields.id.end' = '100000',
  'fields.first_name.length' = '5',
  'fields.last_name.length' = '5',
  'fields.phone.min' = '0000000',
  'fields.phone.max' = '9999999',
  'fields.address.length' = '7',
  'fields.company.min' = '10',
  'fields.company.max' = '20'
);
Write data
CREATE TABLE tos_parquet_user_sink_tbl (
  id int,
  first_name varchar,
  last_name varchar,
  phone bigint,
  address varchar,
  company int
) PARTITIONED BY (company) WITH (
  'connector' = 'filesystem',
  'path' = 'tos://{bucket}/xxxxx/flink/tos_parquet_user_tbl',
  'format' = 'parquet',
  'sink.rolling-policy.file-size' = '5MB',
  'sink.rolling-policy.rollover-interval' = '5min',
  'sink.rolling-policy.check-interval' = '1min',
  'parquet.compression'='SNAPPY' -- omit this option if compression is not needed
);

SET 'execution.checkpointing.interval' = '300s';

INSERT INTO tos_parquet_user_sink_tbl SELECT * FROM datagen;
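The three sink.rolling-policy.* options in the table definition interact, and a small model can make the timing easier to reason about. The sketch below is a simplified stand-in, not Flink's implementation: the class and method names are hypothetical, it assumes periodic checks fire at multiples of the check interval, and it assumes a file rolls once its size strictly exceeds the limit. It also ignores checkpoints; with a bulk format such as Parquet, pending files are additionally finalized when a checkpoint completes, which is every 300 s in the job above.

```python
from dataclasses import dataclass

MB = 1024 * 1024


@dataclass(frozen=True)
class RollingPolicy:
    """Simplified model of the sink.rolling-policy.* options (assumptions in the lead-in)."""
    file_size: int = 5 * MB          # sink.rolling-policy.file-size
    rollover_interval_s: int = 300   # sink.rolling-policy.rollover-interval
    check_interval_s: int = 60       # sink.rolling-policy.check-interval

    def roll_on_size(self, current_size: int) -> bool:
        """True once the part file has grown past the size limit."""
        return current_size > self.file_size

    def roll_on_timer(self, created_at_s: int, now_s: int) -> bool:
        """True once the part file has been open for the rollover interval."""
        return now_s - created_at_s >= self.rollover_interval_s

    def first_timer_roll(self, created_at_s: int) -> int:
        """Time of the first periodic check at which roll_on_timer is true."""
        due = created_at_s + self.rollover_interval_s
        checks = -(-due // self.check_interval_s)  # ceiling division
        return checks * self.check_interval_s


if __name__ == "__main__":
    policy = RollingPolicy()
    # A file opened 30 s after a check is only noticed at the next check after it is due.
    print(policy.first_timer_roll(created_at_s=30))
```

Under these assumptions, a part file can stay open for up to one check interval longer than the rollover interval, which is why the check interval is usually kept well below it.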
Code sample
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-kafka-tos-demo</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>flink-kafka-tos-demo</name>

  <properties>
    <flink.version>1.16.1</flink.version>
    <hadoop.version>3.3.4</hadoop.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-parquet</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-avro</artifactId>
      <version>1.12.2</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
        </exclusion>
        <exclusion>
          <groupId>it.unimi.dsi</groupId>
          <artifactId>fastutil</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.1</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>org.apache.flink.KafkaToTosDemo</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>8</source>
          <target>8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
KafkaToTosDemo.java
/*
 * ByteDance Volcengine EMR, Copyright 2022.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class KafkaToTosDemo {

  public static void main(String[] args) throws Exception {
    ParameterTool pt = ParameterTool.fromArgs(args);
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Job parameters are passed on the command line as --key value pairs.
    String outputPath = pt.getRequired("tos.output.path");
    String topic = pt.getRequired("kafka.topic");
    String consumerGroup = pt.get("kafka.consumer.group.id", "kafka-to-tos-demo-group");
    String bootstrapServers = pt.getRequired("kafka.bootstrap.servers");
    String checkpointPath = pt.getRequired("checkpoint.path");
    long checkpointInterval = pt.getLong("checkpoint.interval", 10_000L);

    // Part files are committed on checkpoints, so checkpointing must be enabled.
    env.getCheckpointConfig().setCheckpointStorage(checkpointPath);
    env.getCheckpointConfig().setCheckpointInterval(checkpointInterval);

    ObjectMapper jsonParser = new ObjectMapper();

    env.fromSource(createKafkaSource(topic, bootstrapServers, consumerGroup),
            WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source")
        .map(data -> {
          JsonNode jsonNode = jsonParser.readValue(data, JsonNode.class);
          // asText() yields the plain string; toString() would keep the JSON quotes.
          return new Tuple2<>(jsonNode.get("ticker").asText(), 1);
        }).returns(Types.TUPLE(Types.STRING, Types.INT))
        .keyBy(v -> v.f0)
        // .timeWindow(Time.minutes(1)) // Tumbling window definition, Flink 1.11
        .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) // since Flink 1.13
        .sum(1) // Count the appearances by ticker per partition
        .map(t -> new TickCount(t.f0, t.f1))
        .addSink(createTosSnappySinkFromStaticConfig(outputPath))
        .name("TOS Parquet Sink");

    env.execute("kafka-to-tos-demo");
  }

  private static KafkaSource<String> createKafkaSource(String topic, String bootstrapServers, String consumerGroup) {
    return KafkaSource.<String>builder()
        .setBootstrapServers(bootstrapServers)
        .setTopics(topic)
        .setGroupId(consumerGroup)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
  }

  // Uncompressed variant that uses Flink's built-in AvroParquetWriters.
  private static StreamingFileSink<TickCount> createTosSinkFromStaticConfig(String outputPath) {
    return StreamingFileSink
        .forBulkFormat(new Path(outputPath), AvroParquetWriters.forReflectRecord(TickCount.class))
        .withBucketAssigner(new DateTimeBucketAssigner<>("'year='yyyy'/month='MM'/day='dd'/hour='HH/"))
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .withOutputFileConfig(OutputFileConfig.builder()
            .withPartPrefix("complete")
            .withPartSuffix(".parquet")
            .build())
        .build();
  }

  // Snappy-compressed variant that uses the custom CompressionAvroParquetWriters.
  private static StreamingFileSink<TickCount> createTosSnappySinkFromStaticConfig(String outputPath) {
    return StreamingFileSink
        .forBulkFormat(new Path(outputPath),
            CompressionAvroParquetWriters.forReflectRecord(TickCount.class, CompressionCodecName.SNAPPY))
        .withBucketAssigner(new DateTimeBucketAssigner<>("'year='yyyy'/month='MM'/day='dd'/hour='HH/"))
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .withOutputFileConfig(OutputFileConfig.builder()
            .withPartPrefix("complete")
            .withPartSuffix(".parquet")
            .build())
        .build();
  }
}
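The DateTimeBucketAssigner pattern in the sinks above determines the directory layout under the output path. As an illustration, the sketch below reproduces the resulting directory name in Python; `bucket_id` and `part_directory` are hypothetical helpers, and the strftime pattern is a hand translation of the Java pattern 'year='yyyy'/month='MM'/day='dd'/hour='HH/ rather than anything generated by Flink. The output path in the usage line is a made-up example.

```python
from datetime import datetime


def bucket_id(ts: datetime) -> str:
    """Directory name for the hour that contains ts."""
    # Quoted literals in the Java pattern become plain text here;
    # yyyy, MM, dd, HH map to %Y, %m, %d, %H.
    return ts.strftime("year=%Y/month=%m/day=%d/hour=%H/")


def part_directory(output_path: str, ts: datetime) -> str:
    """Full directory where part files for ts would be expected."""
    return output_path.rstrip("/") + "/" + bucket_id(ts)


if __name__ == "__main__":
    # Hypothetical output path, used only for illustration.
    print(part_directory("tos://example-bucket/flink/out", datetime(2023, 7, 12, 10, 25, 15)))
```

Because the keys are written as name=value segments, the resulting layout resembles Hive-style partition directories, which most query engines can discover as partitions.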
TickCount.java
/*
 * ByteDance Volcengine EMR, Copyright 2022.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink;

public class TickCount {
  private String tick;
  private int count;

  public TickCount(String tick, int count) {
    this.tick = tick;
    this.count = count;
  }

  public String getTick() {
    return tick;
  }

  public void setTick(String tick) {
    this.tick = tick;
  }

  public int getCount() {
    return count;
  }

  public void setCount(int count) {
    this.count = count;
  }
}
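The AvroParquetWriters class that Flink provides by default does not support compression, so a compression-aware variant has to be implemented.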
/*
 * ByteDance Volcengine EMR, Copyright 2022.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.OutputFile;

import java.io.IOException;

public class CompressionAvroParquetWriters {

  public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(
      Class<T> type, CompressionCodecName compressionCodecName) {
    final String schemaString = SpecificData.get().getSchema(type).toString();
    final ParquetBuilder<T> builder =
        (out) -> createAvroParquetWriter(schemaString, SpecificData.get(), out, compressionCodecName);
    return new ParquetWriterFactory<>(builder);
  }

  /**
   * Creates a ParquetWriterFactory that accepts and writes Avro generic types. The Parquet
   * writers will use the given schema to build and write the columnar data.
   *
   * @param schema The schema of the generic type.
   */
  public static ParquetWriterFactory<GenericRecord> forGenericRecord(
      Schema schema, CompressionCodecName compressionCodecName) {
    final String schemaString = schema.toString();
    final ParquetBuilder<GenericRecord> builder =
        (out) -> createAvroParquetWriter(schemaString, GenericData.get(), out, compressionCodecName);
    return new ParquetWriterFactory<>(builder);
  }

  /**
   * Creates a ParquetWriterFactory for the given type. The Parquet writers will use Avro to
   * reflectively create a schema for the type and use that schema to write the columnar data.
   *
   * @param type The class of the type to write.
   */
  public static <T> ParquetWriterFactory<T> forReflectRecord(
      Class<T> type, CompressionCodecName compressionCodecName) {
    final String schemaString = ReflectData.get().getSchema(type).toString();
    final ParquetBuilder<T> builder =
        (out) -> createAvroParquetWriter(schemaString, ReflectData.get(), out, compressionCodecName);
    return new ParquetWriterFactory<>(builder);
  }

  private static <T> ParquetWriter<T> createAvroParquetWriter(
      String schemaString,
      GenericData dataModel,
      OutputFile out,
      CompressionCodecName compressionCodecName) throws IOException {
    final Schema schema = new Schema.Parser().parse(schemaString);
    return AvroParquetWriter.<T>builder(out)
        .withSchema(schema)
        .withDataModel(dataModel)
        .withCompressionCodec(compressionCodecName)
        .build();
  }

  // ------------------------------------------------------------------------

  /**
   * Class is not meant to be instantiated.
   */
  private CompressionAvroParquetWriters() {
  }
}
Run the job
# generate_dummy_data.py
import datetime
import json
import random


def get_data():
    return {
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'price': round(random.random() * 100, 2)
    }


if __name__ == '__main__':
    num = 1000
    with open("/root/kafka/dummy_data.json", "a") as f:
        for _ in range(0, num):
            f.write(json.dumps(get_data()) + '\n')
mkdir /root/kafka/
python generate_dummy_data.py
./bin/kafka-topics.sh --bootstrap-server {broker1_ip}:9092,{broker2_ip}:9092 --topic {topic_name} --create
./bin/kafka-console-producer.sh --bootstrap-server {broker1_ip}:9092,{broker2_ip}:9092 --topic {topic_name} < /root/kafka/dummy_data.json
/usr/lib/emr/current/flink/bin/flink run-application \
  -t yarn-application /opt/flink-kafka-tos-demo-1.0-SNAPSHOT.jar \
  --tos.output.path tos://{bucket}/xxxx/flink/sdk/ticket_stat_snappy \
  --kafka.topic xxx_test \
  --kafka.bootstrap.servers {broker1_ip}:9092,{broker2_ip}:9092 \
  --checkpoint.path tos://{bucket}/xxxx/flink/ckp/ticket_snappy/
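To sanity-check the job output, the per-ticker totals in the input file can be computed independently and compared with the sum of the per-window counts written to TOS. The sketch below is a hypothetical helper, not part of the demo project; it assumes one JSON object per line with a ticker field, as produced by generate_dummy_data.py, and uses a few hand-written sample lines so that it runs without the data file.

```python
import json
from collections import Counter


def count_tickers(lines):
    """Count records per ticker in an iterable of JSON lines."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        counts[json.loads(line)["ticker"]] += 1
    return counts


if __name__ == "__main__":
    # Hand-written sample records in the same shape as the generated data.
    sample = [
        '{"event_time": "2023-07-12T10:25:15", "ticker": "AAPL", "price": 12.5}',
        '{"event_time": "2023-07-12T10:25:16", "ticker": "AMZN", "price": 48.1}',
        '{"event_time": "2023-07-12T10:25:17", "ticker": "AAPL", "price": 3.75}',
    ]
    for ticker, count in sorted(count_tickers(sample).items()):
        print(ticker, count)
```

To run it against the real input, pass an open file handle for /root/kafka/dummy_data.json to count_tickers. Because the job counts per one-minute processing-time window, a single ticker usually appears in several output records, and only their sum should match the total computed here.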