You need to enable JavaScript to run this app.
流式计算 Flink版

流式计算 Flink版

复制全文
Connector 参考
Kafka/BMQ
复制全文
Kafka/BMQ

Kafka 连接器提供从 Kafka Topic 或 BMQ Topic 中消费和写入数据的能力,支持做数据源表和结果表。您可以创建 source 流从 Kafka Topic 中获取数据,作为作业的输入数据;也可以通过 Kafka 结果表将作业输出数据写入到 Kafka Topic 中。

注意事项

  • 使用 Flink SQL 的用户需要注意,不再支持 kafka-0.10kafka-0.11 两个版本的连接器,请直接使用 kafka 连接器访问 Kafka 0.10 和 0.11 集群。
    Kafka-0.10 和 Kafka-0.11 两个版本的连接器使用的 Kafka 客户端有缺陷,在某些情况下可能无法自动提交 Kafka offset 信息。
  • 使用 datastream API 开发的用户需要注意,在读 Kafka 消息的时候,不要使用 FlinkKafkaConsumer010FlinkKafkaConsumer011 两个 consumer,请直接使用 FlinkKafkaConsumer 进行开发;在往 Kafka 写消息的时候,不要使用 FlinkKafkaProducer010FlinkKafkaProducer011 两个 producer,请直接使用 FlinkKafkaProducer 进行开发。

DDL 定义

用作数据源(Source)

CREATE TABLE kafka_source (
    name String,
    score INT
 ) WITH (
     'connector' = 'kafka',
     'topic' = 'test_topic_01',
     'properties.bootstrap.servers' = 'localhost:9092',
     'properties.group.id' = 'test_topic_01',
     'format' = 'csv',
     'scan.startup.mode' = 'earliest-offset'
 );

用作数据目的(Sink)

CREATE TABLE kafka_sink (
    name String,
    score INT
 ) WITH (
     'connector' = 'kafka',
     'topic' = 'test_topic_01',
     'properties.bootstrap.servers' = 'localhost:9092',
     'format' = 'csv'
 );

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处仅支持 Kafka 连接器。

注意

Kafka-0.10 和 Kafka-0.11 两个版本的连接器使用的 Kafka 客户端有缺陷,在某些情况下可能无法自动提交 Kafka offset 信息。

topic

(none)

String

指定 Kafka Topic 的名称。

properties.bootstrap.servers

(none)

String

指定 Kafka Broker 的地址,格式为host:port

properties.group.id

(none)

String

指定 Kafka 消费组的 ID。

注意

在 Flink 中使用 Kafka 连接器消费 BMQ 消息时,需要提前在 BMQ 平台侧创建 Consumer Group。
如果没有提前创建 Group,任务可以正常运行,但不能正常提交 Offset。

properties.batch.size

16

string

单个 Partition 对应的 Batch 中支持写入的最大字节数,默认值为 16 KB。

  • batch.size=单个 Producer 的消息QPS * 消息大小 * liner.ms/ Partition 数
  • 提升 batch.size 的值,一个 Batch 能写入更多数据,可以提升吞吐量。但是 batch.size 也不能设置太大,以免出现 Batch 迟迟写不满,导致发送消息延迟高。
  • 一般与 properties.linger.msproperties.buffer.memory 参数联合使用,满足任意一个条件都会立即发送消息。

说明

如果在写 Kafka 数据时出现吞吐量不足,建议您提升 batch.size 取值,一般设置为 524288(表示 512KB)。

properties.linger.ms

0

string

消息在 Batch 中的停留时间,即发送消息前的等待时长。默认为 0 毫秒,表示“立即发送消息”。

  • 可以适当提升 linger.ms 取值,以引入小延迟为代价,提高吞吐量和压缩率。
  • 该参数一般与 properties.batch.sizeproperties.buffer.memory 参数联合使用,满足任意一个条件都会立即发送消息。

说明

如果在写 Kafka 数据时出现吞吐量不足,建议您提升 linger.ms 取值,一般设置为 500(表示 500 ms)。

properties.buffer.memory

32M

string

缓存消息的总可用 Memory 空间,如果 Memory 用完,则会立即发送缓存中的消息。

  • 设置时,建议按照计算公式设置:buffer.memory>=batch.size * partition数*2。
  • 该参数一般与 properties.batch.sizeproperties.linger.ms 参数联合使用,满足任意一个条件都会立即发送消息。

说明

如果 buffer.memory 较小,可能会造成 Batch 失效,从而导致 QPS 升高被下游限流等问题。建议您提升 buffer.memory 取值,一般设置为 134217728(表示 128 MB)。

properties.enable.idempotence

true

Boolean

是否启用 Kafka 连接器的幂等性。默认为 true,表示启用幂等性。
启用幂等属性后,在面对 Client 重试引起的消息重复时,系统的反应与处理一次的请求相同,能够确保消息的顺序和完整性。

注意

如果您通过 Kafka 连接器连接 BMQ 资源,且使用 Flink 1.16-volcano 引擎版本,那么必须将properties.enable.idempotence参数设置为 false 以关闭幂等,否则任务会运行失败。

properties.ssl.endpoint.identification.algorithm

HTTPS

String

是否对 SSL 连接的服务器主机名称(hostname)进行验证。

注意

Flink SQL 1.16 和 1.17 内置的 Kafka 客户端是 3.x 版本,参数 properties.ssl.endpoint.identification.algorithm 在 3.x 版本中默认为 HTTPS,Kafka 客户端会对 SSL 连接的服务器主机名称(hostname)进行验证,验证服务器证书中的主机名是否与连接的目标主机名匹配,以此增强 SSL 连接的安全性。
如果遇到 javax.net.ssl.SSLHandshakeException: No name matching example.com found 报错,推荐设置为 '',跳过主机名称校验。

scan.topic-partition-discovery.interval

none

Duration

在 Kafka/BMQ 动态扩容的场景下,用于定期扫描并发现新的 Topic 和 Partition 的时间间隔,推荐设置为 120s。

注意

默认值是 none,代表不开启。建议您在任务中添加该参数配置,设置动态检测的时间间隔。
如果任务中不配置该参数,将不会动态发现分区。此时新增分区,将无法读取到新增分区中的数据。

format

(none)

String

用来反序列化 Kafka 消息体(value)时使用的格式。支持的格式如下:

  • CSV
  • JSON
  • Debezium-JSON
  • Canal-JSON
  • Maxwell-JSON
  • Ogg-JSON
  • Avro
  • Confluent Avro
  • RAW

scan.startup.mode

group-offsets

String

读取数据时的启动模式。 取值如下:

  • earliest-offset:从 Kafka 最早分区开始读取。
  • latest-offset:从 Kafka 最新位点开始读取。
  • group-offsets:默认值,根据 Group 读取。
  • timestamp:从 Kafka 指定时间点读取。需要在 WITH 参数中指定 scan.startup.timestamp-millis 参数。
  • specific-offsets:从 Kafka 指定分区目标偏移量读取。需要在 WITH 参数中指定 scan.startup.specific-offsets 参数。

scan.startup.specific-offsets

(none)

String

specific-offsets 启动模式下,指定每个分区的启动偏移量。如partition:0,offset:42;partition:1,offset:300

scan.startup.timestamp-millis

(none)

Long

timestamp 启动模式下,指定启动位点时间戳,单位毫秒。

scan.parallelism

(none)

Integer

单独设置 Source 并发。如果不设置,则并行度为作业的默认并发数。
该参数经常用于 Source 和下游算子需要断开算子链的场景,使得下游重计算的算子能使用较大的默认并发,提高计算能力,同时保持 Source 并发和 Kafka 分区数相等,此时 Source 到下游由于并发不同,数据 Shuffle 是均匀的,从而提高了整体计算速率。

sink.partitioner

fixed

String

Flink 分区到 Kafka 分区的映射关系。取值如下:

  • fixed(默认值):每个 Flink 分区对应一个 Kafka 分区。
  • round-robin:Flink 分区中的数据将被轮流分配至 Kafka 的各个分区。
  • 自定义映射模式:支持创建一个 FlinkKafkaPartitioner 的子类来自定义分区映射模式。例如org.mycompany.MyPartitioner

sink.parallelism

(none)

Integer

单独设置 Kafka Sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。

自动提交 Offsets

目前一般使用以下两种方式自动提交 Kafka Offsets。

  • 方式 1:依赖 Flink 任务 Checkpoint。
    Flink 任务开启 Checkpoint 时,Kafka Source 在完成 Checkpoint 时会提交当前的消费位点,以保证 Flink 的 Checkpoint 状态和 Kafka Broker 上的提交位点一致。

    注意

    依赖 Flink 任务 Checkpoint 来管理 Kafka Offsets 时,如果上游数据量很大,很可能会触发上游的 LAG 告警。

  • 方式 2:依赖 Kafka Consumer 的位点定时提交逻辑。
    当 Flink 任务没有开启 Checkpoint 时,Kafka Source 将依赖 Kafka Consumer 的位点定时提交逻辑。您可以通过设置enable.auto.commitauto.commit.interval.ms两个参数来控制位点定时自动提交。
    -- 是否自动提交 Offsets。取值为 true 表示自动提交 Offsets;取值为 false,表示手动同步或异步提交。
      'properties.enable.auto.commit' = 'true',
      -- 自动提交 Offsets 的时间间隔,单位为 ms。
      'properties.auto.commit.interval.ms' = '500',
    

安全与认证

如果 Kafka 集群要求安全连接或认证,您需要在 WITH 参数中通过 properties.前缀添加安全认证相关配置。
示例 1:使用 SASL_PLAINTEXT 安全协议,SASL 机制为 PLAIN 。

CREATE TABLE KafkaTable (
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  ...
  -- 配置安全协议为 SASL_PLAINTEXT。
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  -- 配置 SASL 机制为  PLAIN。
  'properties.sasl.mechanism' = 'PLAIN',
  -- 配置 JAAS。
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="yourname" password="yourpassword";'
)

示例 2:使用 SASL_SSL 安全协议, SASL 机制为 SASL_SSL,并配置证书和密钥信息。
注意:Flink SQL 1.16 和 1.17 内置的 Kafka 客户端是 3.x 版本,参数 properties.ssl.endpoint.identification.algorithm 在 3.x 版本中默认为 HTTPS,Kafka 客户端会对 SSL 连接的服务器主机名称(hostname)进行验证,验证服务器证书中的主机名是否与连接的目标主机名匹配,以此增强 SSL 连接的安全性。如果遇到 javax.net.ssl.SSLHandshakeException: No name matching example.com found 报错,推荐设置为 '',跳过主机名称校验。

CREATE TABLE KafkaTable (
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  ...
  -- 配置安全协议为 SASL_SSL。
  'properties.security.protocol' = 'SASL_SSL',
  -- 配置服务端提供的 truststore (CA 证书) 的路径和密码。注意:需要提前在任务参数中上传并选中依赖文件
  'properties.ssl.truststore.location' = '/opt/tiger/workdir/kafka.client.truststore.jks',
  'properties.ssl.truststore.password' = 'test1234',
  -- 如果要求客户端认证,则需要配置 keystore (私钥) 的路径和密码。注意:需要提前在任务参数中上传并选中依赖文件
  'properties.ssl.keystore.location' = '/opt/tiger/workdir/kafka.client.keystore.jks',
  'properties.ssl.keystore.password' = 'test1234',
  --配置 SASL 机制为 SCRAM-SHA-256。 
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  -- 配置 JAAS。
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="yourname" password="yourpassword";',
  --如果接入点是私网域名,需要跳过私网域名验证
  'properties.ssl.endpoint.identification.algorithm' = ''
)

示例代码

  • 源表示例

    CREATE TABLE kafka_source (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_status varchar,
         order_update_time timestamp
         )
    WITH (
      'connector' = 'kafka',
      'topic' = 'source',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'group_01',  -- 消费BMQ消息时,需要提前在BMQ平台侧创建Group,否则不能正常提交Offset。
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json'
    );
    CREATE TABLE print_sink (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_status varchar,
         order_update_time timestamp
        )
    WITH (
         'connector' = 'print'           
    );
    insert into print_sink
    select * from kafka_source;
    
  • 结果表示例

    CREATE TABLE datagen_source (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_status varchar,
         order_update_time as localtimestamp
        )
    WITH (
         'connector' = 'datagen',
         'rows-per-second' = '5'          
    );
    
    CREATE TABLE kafka_sink (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_status varchar,
         order_update_time timestamp
         )
    WITH (
      'connector' = 'kafka',
      'topic' = 'sink',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'group_01',
      'format' = 'json'
    );
    
    insert into kafka_sink
    select * from datagen_source;
    

FAQ

写入 Kafka 数据时出现吞吐量不足

目前 Kafka Connector 在写入时,默认是没有攒批的,这就导致写入吞吐很小,而且容易导致 Kafka server CPU 压力较高。如果出现这种情况,建议您调整下面三个配置值,以提升写入性能:

  • 提升 batch.size 取值,设置为 512KB:properties.batch.size = 524288
  • 提升 linger.ms 取值,设置为 500ms: properties.linger.ms = 500
  • 提升 buffer.memory 取值,设置为 128MB:properties.buffer.memory = 134217728

作业拓扑变动,无法从历史状态恢复

如果修改了作业的拓扑结构,新增了一些算子,则不能从历史状态恢复,此时可以丢状态,并从 timestamp 或者 group-offsets 模式启动:

  • 从 timestamp 时间戳恢复

适用于之前的作业没开过 checkpoint。需要从数据曲线-业务延迟监控里,找到当前消费延迟,反推出当前消费的时间点,从这个时间点恢复即可。
比如业务延迟是 1h,当前时刻是 12 点,则反推出已消费时间点是 11 点,为了避免丢数,则可以根据情况往前多消费一点,比如往前 30min,那么要设置的恢复的时刻就是 10:30。
参数如下:

'properties.scan.startup.mode' = 'timestamp',
'properties.scan.startup.timestamp-millis' = 'xxxx' -- 毫秒时间戳,可以从 https://tool.chinaz.com/tools/unixtime.aspx 获取

  • 从 group offsets 恢复

适用于之前的作业开过 checkpoint,并完成过 checkpoint。由于 checkpoint 时,kafka source 会向 kafka 实例提交 offset,因此 offset 会保存一份到 kafka 实例。
此时需要把 kafka source 里的 startup mode 改为 group-offsets,参数如下:

'properties.scan.startup.mode' = 'group-offsets'

注意,如果这个消费者组之前没有 offset 信息,第一次启动的时候会报错无法获取 partition 的 offset 信息,此时需要设置如下 kafka 参数:

'properties.auto.offset.reset' = 'earliest'

此参数意义为如果没有 group-offset 则从最老的位置进行消费

Kafka Lag 监控波动较大

问题现象:在 Kafka 的 Lag 监控中,发现 Kafka 的堆积量值存在周期性波动,但是实际并不存在数据积压。
Image
原因:这个是因为 Flink 任务提交 Kafka 消费组的消费位点是在 Checkpoint 时间做的。默认情况下 Flink Checkpoint 是 5 分钟一次。所以从 Kafka 端看到堆积量的趋势是陡升陡降,但是实际上并没有数据积压。
解决方法

  1. 方法一:不使用 Checkpoint 提交位点,可以参考上文,依赖 Kafka Consumer 的位点定时提交,按照需要的频率提交位点。这样子能够在 Kafka 监控侧查看到比较实时的堆积量指标
  2. 方法二:使用 Flink 的任务运维 - 数据曲线 - Kafka - Max KafkaConsumer Records Lag 监控曲线图,可以看到 Flink 任务实时上报的 Kafka 堆积量监控图。

SASL_SSL 模式下主机名称校验报错

注意:Flink SQL 1.16 和 1.17 内置的 Kafka 客户端是 3.x 版本,参数 properties.ssl.endpoint.identification.algorithm 在 3.x 版本中默认为 HTTPS,Kafka 客户端会对 SSL 连接的服务器主机名称(hostname)进行验证,验证服务器证书中的主机名是否与连接的目标主机名匹配,以此增强 SSL 连接的安全性。
如果遇到 javax.net.ssl.SSLHandshakeException: No name matching example.com found 报错,推荐设置为 '',跳过主机名称校验。
详细报错如下:

Caused by: javax.net.ssl.SSLHandshakeException: No name matching example.com found
    at sun.security.ssl.Alert.createSSLException(Alert.java:131)
    at sun.security.ssl.TransportContext.fatal(TransportContext.java:324)
    at sun.security.ssl.TransportContext.fatal(TransportContext.java:267)
    at sun.security.ssl.TransportContext.fatal(TransportContext.java:262)
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:654)
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.onCertificate(CertificateMessage.java:473)
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.consume(CertificateMessage.java:369)
    at sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:377)
    at sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:444)
    at sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:981)
    at sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:968)
    at java.security.AccessController.doPrivileged(Native Method)
    at sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:915)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:435)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:523)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:373)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:293)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:481)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:73)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.Sender.awaitNodeReady(Sender.java:526)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:447)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.security.cert.CertificateException: No name matching example.com found
    at sun.security.util.HostnameChecker.matchDNS(HostnameChecker.java:253)
    at sun.security.util.HostnameChecker.match(HostnameChecker.java:106)
    at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:457)
    at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:417)
    at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:285)
    at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:632)
    ... 22 more
最近更新时间:2025.12.29 11:42:32
这个页面对您有帮助吗?
有用
有用
无用
无用