You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Kafka Connect分区键映射异常:同Key无法同分区致流关联失败

解决Kafka Connect与生产者同Key分区不一致的问题

从你给出的日志能明显看到,同一个ID在CONNECT_TOPICSTREAM_TOPIC被分配到了不同分区(比如68f52084-cfc9-4997-a28e-57cfd4f7bbbf在CONNECT_TOPIC是分区13,STREAM_TOPIC是分区17),核心原因是Kafka Connect的Key序列化方式和你的生产者不一致,导致相同ID的哈希计算结果不同,最终分区映射错位。

问题根源分析

你当前使用JsonConverter作为Key转换器,即使提取了ID作为Key,JsonConverter会把字符串ID序列化为带双引号的JSON字符串(比如"68f52084-cfc9-4997-a28e-57cfd4f7bbbf");而你的生产者直接调用KafkaProducer.produce("string key", event)时,用的是StringSerializer,会把ID序列化为纯字符串字节流(比如68f52084-cfc9-4997-a28e-57cfd4f7bbbf)。这两种序列化后的字节内容完全不同,Kafka基于字节的哈希计算自然会得到不同的分区。

解决方案:统一Key序列化方式

修改你的Kafka Connect配置,将Key转换器替换为StringConverter,确保和生产者的序列化逻辑一致。同时可以简化Transforms配置,因为我们不需要再处理JSON格式的Key了:

修改后的Connector配置

{
  "name": "CONNECTOR",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "connection.url": "jdbc:mysql://localhost:3306/demo?user=rmoff&password=pw",
    "table.whitelist": "CONNECTOR",
    "mode": "timestamp",
    "timestamp.column.name": "update_ts",
    "validate.non.null": "false",
    "transforms": "createKey,extractId",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "id",
    "transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractId.field": "id",
    "topic.prefix": "enrichment-"
  }
}

关键修改点说明

  1. 替换key.converter:用StringConverter替代JsonConverter,确保Key以纯字符串格式序列化,和你的生产者行为一致。
  2. 移除不必要的Cast转换:因为ExtractField$Key提取的已经是字符串类型的ID,直接用StringConverter序列化即可,不需要额外的Cast转换。

验证方法

修改配置重启Connector后,用Kafka控制台消费者分别查看两个Topic的Key格式:

# 查看CONNECT_TOPIC的Key
kafka-console-consumer.sh --bootstrap-server <your-broker> --topic enrichment-CONNECT_TOPIC --property print.key=true --from-beginning

# 查看STREAM_TOPIC的Key
kafka-console-consumer.sh --bootstrap-server <your-broker> --topic STREAM_TOPIC --property print.key=true --from-beginning

确认两个Topic中同一个ID的Key输出格式完全一致(没有额外的双引号),此时它们的分区映射就会匹配,流关联操作就能正常工作。


附你提供的问题现象日志:
流任务ID 0_13,分区编号13,消费CONNECT_EVENT:68f52084-cfc9-4997-a28e-57cfd4f7bbbf
流任务ID 0_13,分区编号13,已关联CONNECT_EVENT:68f52084-cfc9-4997-a28e-57cfd4f7bbbf
流任务ID 0_17,分区编号17,消费STREAM_EVENT:68f52084-cfc9-4997-a28e-57cfd4f7bbbf
流任务ID 0_17,分区编号17,已关联STREAM_EVENT:68f52084-cfc9-4997-a28e-57cfd4f7bbbf
流任务ID 0_7,分区编号7,消费STREAM_EVENT:32aaa88d-b175-4a54-8338-d542ed051e6a
流任务ID 0_7,分区编号7,已关联STREAM_EVENT:32aaa88d-b175-4a54-8338-d542ed051e6a
流任务ID 0_17,分区编号17,消费CONNECT_EVENT:32aaa88d-b175-4a54-8338-d542ed051e6a
流任务ID 0_17,分区编号17,已关联CONNECT_EVENT:32aaa88d-b175-4a54-8338-d542ed051e6a
流任务ID 0_11,分区编号11,消费CONNECT_EVENT:90265a93-adac-4e93-856c-d1498eeeb22e
流任务ID 0_11,分区编号11,已关联CONNECT_EVENT:90265a93-adac-4e93-856c-d1498eeeb22e
流任务ID 0_11,分区编号11,消费STREAM_EVENT:90265a93-adac-4e93-856c-d1498eeeb22e
流任务ID 0_11,分区编号11,已关联STREAM_EVENT:90265a93-adac-4e93-856c-d1498eeeb22e
流任务ID 0_11,分区编号11,已合并:90265a93-adac-4e93-856c-d1498eeeb22e

内容的提问来源于stack exchange,提问作者mikemikemike

火山引擎 最新活动