能否创建Flink SQL表消费含Schema ID的Kafka Schema Registry JSON消息?
可以用Flink SQL消费Confluent Schema Registry序列化的JSON消息
完全可以实现,Flink 1.20.0的Kafka连接器原生支持Confluent Schema Registry的JSON序列化格式(包括你提到的Confluent wire format),下面是具体实现步骤:
1. 准备依赖
首先确保你的Flink环境包含以下依赖:
- Kafka连接器:版本要匹配你的Kafka集群(3.5.1),对应Flink 1.20.0的包是
flink-sql-connector-kafka-3.5.1_2.12 - Confluent JSON Schema Registry适配包:
flink-json-schema-registry-confluent,版本和Flink一致(1.20.0)
如果是用Flink SQL Client,把这两个jar包放到Flink的lib目录下即可;如果是项目开发,Maven依赖如下:
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-sql-connector-kafka-3.5.1_2.12</artifactId> <version>1.20.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json-schema-registry-confluent</artifactId> <version>1.20.0</version> </dependency> </dependencies>
2. 创建Flink SQL表
使用json-confluent格式来处理遵循Confluent wire format的消息,建表语句示例如下:
CREATE TABLE person_kafka ( first_name STRING, last_name STRING ) WITH ( 'connector' = 'kafka', -- 替换为你的Kafka主题名 'topic' = 'your_person_topic', -- 替换为你的Kafka broker地址 'properties.bootstrap.servers' = 'kafka-broker-1:9092,kafka-broker-2:9092', -- 替换为你的消费组ID 'properties.group.id' = 'flink-person-consumer', -- 指定使用Confluent Schema Registry的JSON格式 'format' = 'json-confluent', -- 替换为你的Schema Registry地址 'value.schema.registry.url' = 'http://schema-registry:8081', -- 可选:设置消费起始位置,比如latest-offset或earliest-offset 'scan.startup.mode' = 'latest-offset' );
参数说明:
format = 'json-confluent':这个格式专门用于解析带Confluent wire format的JSON消息,会自动读取消息前5个字节的schema ID,然后从Schema Registry拉取对应的JSON Schema来解析消息体。value.schema.registry.url:指定Schema Registry的访问地址,Flink会自动从这里获取对应schema ID的JSON Schema。
3. 验证数据
建表完成后,直接执行查询语句即可消费数据:
SELECT * FROM person_kafka;
如果你的表结构和Schema Registry中的Person schema一致,Flink会自动完成字段映射,不需要额外配置。
内容的提问来源于stack exchange,提问作者Jake




