You need to enable JavaScript to run this app.
导航

Upsert Kafka

最近更新时间2024.02.02 16:34:47

首次发布时间2023.09.08 17:51:49

Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic,支持做数据源表和结果表。

  • 作为源表时,Upsert Kafka 连接器可以将 Kafka 中存储的数据转换为 changelog 流,其中每条数据记录代表一个更新或删除事件。数据记录中有 key,表示 UPDATE;数据记录中没有 key,表示 INSERT;数据记录中 key 的 value 为空,表示 DELETE。
  • 作为结果表时,Upsert Kafka 连接器可以消费上游计算逻辑产生的 changelog 流。它会将 INSERT 或 UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入,表示对应 key 的消息被删除。
    Flink将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新或删除消息将落在同一分区中。

使用限制

Upsert-kafka 连接器暂时仅支持在 Flink 1.16-volcano 引擎版本中使用。

DDL 定义

CREATE TABLE upsert_kafka_sink (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro'
);

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 upsert-kafka 连接器。

topic

(none)

String

指定用于读取或写入的 Kafka topic 名称。

properties.bootstrap.servers

(none)

String

以逗号分隔的 Kafka brokers 列表,格式为host:port,host:port

properties.*

(none)

String

传递给 Kafka 的配置参数,如需了解具体的参数,请参见configuration
Flink 会将properties.删除,将剩余配置传递给底层 KafkaClient。
示例:'properties.allow.auto.create.topics' = 'false' 禁用自动创建 Topic。

key.format

(none)

String

读取或写入 Kafka 消息 key 部分时使用的序列化和反序列化的格式,支持csvjsonavro

key.fields

(none)

String

Kafka 消息 key 部分对应的源表或结果表字段。多个字段名以分号(;)分隔。例如field1;field2

key.fields-prefix

(none)

String

key.fields的所有字段定义自定义前缀,以避免和 value.fields 字段名称冲突。
示例:指定前缀为 prefix_,并且 Key 字段名为 name,那么写入 Kafka 后显示为 prefix_name。

说明

  • key.fields-prefix配置项仅用于源表和结果表的列名区分,解析和生成 Kafka 消息 key 部分时,该前缀会被移除。
  • 如果使用key.fields-prefix配置项,那么value.fields-include必须配置为EXCEPT_KEY

value.format

(none)

String

读取或写入 Kafka 消息 value 部分时使用的序列化和反序列化的格式,支持csvjsonavro

value.fields-include

ALL

String

控制哪些字段应该出现在 value 中。

  • ALL:默认值,消息的 value 部分将包含 schema 中所有的字段,包括定义为主键的字段。
  • EXCEPT_KEY:消息的 value 部分将包含除了定义为主键的字段以外,其余 schema 的所有字段。

sink.parallelism

(none)

Integer

定义 upsert-kafka sink 算子的并行度。默认情况下,与上游算子的并行度保持一致,由框架确定并行度。

sink.buffer-flush.max-rows

0

Integer

最多能缓存多少条记录。默认值为 0,表示不开启缓存。
当 sink 收到很多相同 key 的更新,缓存将保留相同 key 的最后一条记录,因此 sink 缓存能帮助减少发往 Kafka topic 的数据量。

说明

如果需要开启缓存,则需要同时设置sink.buffer-flush.max-rowssink.buffer-flush.interval两个参数取值大于 0。

sink.buffer-flush.interval

0

Duration

缓存刷新的间隔时间,超过该时间后将刷新缓存数据。默认值为 0,表示不开启缓存。
当 sink 收到很多相同 key 的更新,缓存将保留相同 key 的最后一条记录,因此 sink 缓存能帮助减少发往 Kafka topic 的数据量。

说明

如果需要开启缓存,则需要同时设置sink.buffer-flush.max-rowssink.buffer-flush.interval两个参数取值大于 0。

示例代码

  • 源表

    CREATE TABLE upsert_source (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_status varchar,
         order_update_time timestamp,
         PRIMARY KEY (order_id) NOT ENFORCED
         )
         WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'source',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'group_01', 
      'key.format' = 'json', 
      'value.format' = 'json'  
    );
    
     CREATE TABLE print_sink (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_status varchar,
         order_update_time timestamp
        )
    WITH (
         'connector' = 'print'           
    );
    
    insert into print_sink
    select * from upsert_source; 
    
  • 结果表

    CREATE TABLE datagen_source (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_status varchar,
         order_update_time as localtimestamp
        )
    WITH (
         'connector' = 'datagen',
         'rows-per-second' = '5'          
    );
    
    CREATE TABLE upsert_sink (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_status varchar,
         order_update_time timestamp,
         PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'sink',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'group_01', 
      'key.format' = 'json', 
      'value.format' = 'json'  
    );
    
    
    INSERT INTO upsert_sink
    SELECT * FROM datagen_source;