You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Kafka Streams出现CorruptRecordException问题求助

解决Kafka Streams中触发CorruptRecordException的问题

我之前也碰到过类似的情况,结合你描述的场景——跟着官方教程写了Pipe.java程序,启动组件后生产者输入内容就抛出CorruptRecordException,控制台有错误但Kafka日志目录没相关记录,给你几个针对性的排查和解决方向:

  • 优先排查序列化/反序列化器不匹配问题
    官方的Pipe示例默认使用的是ByteArraySerializerByteArrayDeserializer,但咱们平时用kafka-console-producer.sh的时候,默认的序列化器是StringSerializer,这就会导致Streams程序拿到消息后,用字节数组反序列化字符串格式的消息,直接触发异常。

    解决办法

    • 启动生产者时明确指定和Streams程序一致的序列化器:
      bin/kafka-console-producer.sh --broker-list localhost:9092 --topic input-topic \
      --property key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer \
      --property value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
      
    • 或者修改Pipe.java里的Streams配置,把序列化/反序列化器改成StringSerializer/StringDeserializer,匹配控制台生产者的默认设置:
      props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
      props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
      
  • 检查Kafka版本兼容性
    如果你的本地Kafka集群版本和Pipe.java依赖的kafka-streams版本不一致,可能会出现消息格式(比如v0/v1/v2版本)不兼容的情况,进而导致反序列化失败抛出异常。

    排查方法
    确认pom.xml(Maven)或build.gradle(Gradle)里的kafka-streams依赖版本,和你本地启动的Kafka集群版本完全一致,比如集群是2.8.1,依赖也得是2.8.1。

  • 修复日志输出问题
    控制台能看到错误但$KAFKA_HOME/logs下没日志,大概率是你的Streams程序没加载Kafka的日志配置。

    解决办法
    启动Streams程序时,指定Kafka自带的log4j配置文件:

    java -cp ".:$KAFKA_HOME/libs/*" -Dlog4j.configuration=file:$KAFKA_HOME/config/log4j.properties Pipe
    

    这样程序的日志就会输出到Kafka的logs目录下,方便你后续更细致地排查问题。

  • 测试简单消息排除内容问题
    如果上面的方法都没解决,可以先输入最简单的纯文本内容(比如hello kafka),看看是否还会触发异常。要是简单消息没问题,那可能是你输入的内容包含特殊字符,和序列化器的要求冲突了。

内容的提问来源于stack exchange,提问作者Eric

火山引擎 最新活动