You need to enable JavaScript to run this app.
流式计算 Flink版

流式计算 Flink版

复制全文
Connector 参考
Upsert Kafka
复制全文
Upsert Kafka

Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic,支持做数据源表和结果表。

  • 作为源表时,Upsert Kafka 连接器可以将 Kafka 中存储的数据转换为 changelog 流,其中每条数据记录代表一个更新或删除事件。数据记录中有 key,表示 UPDATE;数据记录中没有 key,表示 INSERT;数据记录中 key 的 value 为空,表示 DELETE。
  • 作为结果表时,Upsert Kafka 连接器可以消费上游计算逻辑产生的 changelog 流。它会将 INSERT 或 UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入,表示对应 key 的消息被删除。
    Flink将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新或删除消息将落在同一分区中。

使用限制

Upsert-kafka 连接器暂时仅支持在 Flink 1.16-volcano 引擎版本中使用。

DDL 定义

CREATE TABLE upsert_kafka_sink (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro'
);

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 upsert-kafka 连接器。

topic

(none)

String

指定用于读取或写入的 Kafka topic 名称。

properties.bootstrap.servers

(none)

String

以逗号分隔的 Kafka brokers 列表,格式为host:port,host:port

properties.*

(none)

String

传递给 Kafka 的配置参数,如需了解具体的参数,请参见configuration
Flink 会将properties.删除,将剩余配置传递给底层 KafkaClient。
示例:'properties.allow.auto.create.topics' = 'false' 禁用自动创建 Topic。

key.format

(none)

String

读取或写入 Kafka 消息 key 部分时使用的序列化和反序列化的格式,支持csvjsonavroconfluent-avroraw

key.fields

(none)

String

Kafka 消息 key 部分对应的源表或结果表字段。多个字段名以分号(;)分隔。例如field1;field2

key.fields-prefix

(none)

String

key.fields的所有字段定义自定义前缀,以避免和 value.fields 字段名称冲突。
示例:指定前缀为 prefix_,并且 Key 字段名为 name,那么写入 Kafka 后显示为 prefix_name。

说明

  • key.fields-prefix配置项仅用于源表和结果表的列名区分,解析和生成 Kafka 消息 key 部分时,该前缀会被移除。
  • 如果使用key.fields-prefix配置项,那么value.fields-include必须配置为EXCEPT_KEY

value.format

(none)

String

读取或写入 Kafka 消息 value 部分时使用的序列化和反序列化的格式,支持csvjsonavroconfluent-avroraw

value.fields-include

ALL

String

控制哪些字段应该出现在 value 中。

  • ALL:默认值,消息的 value 部分将包含 schema 中所有的字段,包括定义为主键的字段。
  • EXCEPT_KEY:消息的 value 部分将包含除了定义为主键的字段以外,其余 schema 的所有字段。

scan.parallelism

(none)

Integer

单独设置 Source 并发。如果不设置,则并行度为作业的默认并发数。
该参数经常用于 Source 和下游算子需要断开算子链的场景,使得下游重计算的算子能使用较大的默认并发,提高计算能力,同时保持 Source 并发和 Kafka 分区数相等,此时 Source 到下游由于并发不同,数据 Shuffle 是均匀的,从而提高了整体计算速率。

sink.parallelism

(none)

Integer

定义 upsert-kafka sink 算子的并行度。默认情况下,与上游算子的并行度保持一致,由框架确定并行度。

sink.buffer-flush.max-rows

0

Integer

最多能缓存多少条记录。默认值为 0,表示不开启缓存。
当 sink 收到很多相同 key 的更新,缓存将保留相同 key 的最后一条记录,因此 sink 缓存能帮助减少发往 Kafka topic 的数据量。

说明

如果需要开启缓存,则需要同时设置sink.buffer-flush.max-rowssink.buffer-flush.interval两个参数取值大于 0。

sink.buffer-flush.interval

0

Duration

缓存刷新的间隔时间,超过该时间后将刷新缓存数据。默认值为 0,表示不开启缓存。
当 sink 收到很多相同 key 的更新,缓存将保留相同 key 的最后一条记录,因此 sink 缓存能帮助减少发往 Kafka topic 的数据量。

说明

如果需要开启缓存,则需要同时设置sink.buffer-flush.max-rowssink.buffer-flush.interval两个参数取值大于 0。

示例代码

  • 源表

    CREATE TABLE upsert_source (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_status varchar,
         order_update_time timestamp,
         PRIMARY KEY (order_id) NOT ENFORCED
         )
         WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'source',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'group_01', 
      'key.format' = 'json', 
      'value.format' = 'json'  
    );
    
     CREATE TABLE print_sink (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_status varchar,
         order_update_time timestamp
        )
    WITH (
         'connector' = 'print'           
    );
    
    insert into print_sink
    select * from upsert_source; 
    
  • 结果表

    CREATE TABLE datagen_source (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_status varchar,
         order_update_time as localtimestamp
        )
    WITH (
         'connector' = 'datagen',
         'rows-per-second' = '5'          
    );
    
    CREATE TABLE upsert_sink (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_status varchar,
         order_update_time timestamp,
         PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'sink',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'group_01', 
      'key.format' = 'json', 
      'value.format' = 'json'  
    );
    
    
    INSERT INTO upsert_sink
    SELECT * FROM datagen_source;
    
最近更新时间:2025.08.25 00:35:53
这个页面对您有帮助吗?
有用
有用
无用
无用