You need to enable JavaScript to run this app.
优惠活动
大模型
产品
解决方案
定价
更多
文档控制台
免费开始使用

能否创建Flink SQL表消费含Schema ID的Kafka Schema Registry JSON消息?

完全可以实现,Flink 1.20.0的Kafka连接器原生支持Confluent Schema Registry的JSON序列化格式(包括你提到的Confluent wire format),下面是具体实现步骤:

1. 准备依赖

首先确保你的Flink环境包含以下依赖:

  • Kafka连接器:版本要匹配你的Kafka集群(3.5.1),对应Flink 1.20.0的包是flink-sql-connector-kafka-3.5.1_2.12
  • Confluent JSON Schema Registry适配包flink-json-schema-registry-confluent,版本和Flink一致(1.20.0)

如果是用Flink SQL Client,把这两个jar包放到Flink的lib目录下即可;如果是项目开发,Maven依赖如下:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka-3.5.1_2.12</artifactId>
        <version>1.20.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json-schema-registry-confluent</artifactId>
        <version>1.20.0</version>
    </dependency>
</dependencies>

使用json-confluent格式来处理遵循Confluent wire format的消息,建表语句示例如下:

CREATE TABLE person_kafka (
    first_name STRING,
    last_name STRING
) WITH (
    'connector' = 'kafka',
    -- 替换为你的Kafka主题名
    'topic' = 'your_person_topic',
    -- 替换为你的Kafka broker地址
    'properties.bootstrap.servers' = 'kafka-broker-1:9092,kafka-broker-2:9092',
    -- 替换为你的消费组ID
    'properties.group.id' = 'flink-person-consumer',
    -- 指定使用Confluent Schema Registry的JSON格式
    'format' = 'json-confluent',
    -- 替换为你的Schema Registry地址
    'value.schema.registry.url' = 'http://schema-registry:8081',
    -- 可选:设置消费起始位置,比如latest-offset或earliest-offset
    'scan.startup.mode' = 'latest-offset'
);

参数说明:

  • format = 'json-confluent':这个格式专门用于解析带Confluent wire format的JSON消息,会自动读取消息前5个字节的schema ID,然后从Schema Registry拉取对应的JSON Schema来解析消息体。
  • value.schema.registry.url:指定Schema Registry的访问地址,Flink会自动从这里获取对应schema ID的JSON Schema。

3. 验证数据

建表完成后,直接执行查询语句即可消费数据:

SELECT * FROM person_kafka;

如果你的表结构和Schema Registry中的Person schema一致,Flink会自动完成字段映射,不需要额外配置。

内容的提问来源于stack exchange,提问作者Jake

火山引擎 最新活动