Kafka Streams开启EXACTLY_ONCE后Spark读取报错及偏移量异常排查
核心成因拆解
从你描述的现象(开启Kafka Streams Exactly-Once后Spark报错、偏移量不连续、偏移量计数与实际消息数不符)来看,问题主要源于版本兼容性差异和Kafka Exactly-Once实现的特性,具体分三点:
Spark 2.2.0 Kafka客户端对事务消息的不支持
Spark 2.2.0的spark-streaming-kafka-0-10模块依赖的是Kafka Clients 0.10.x版本,而Kafka的事务API(Transactional API)是在0.11版本才引入的。当Kafka Streams开启EXACTLY_ONCE时,会通过事务API写入消息,这些消息带有事务标记。旧版本的Spark客户端无法识别事务状态,当它尝试seek到某个偏移量时,可能对应的是未提交/已回滚的事务消息(这类消息不会被对外暴露),就会触发AssertionError——因为Spark预期能找到该偏移量的记录,但实际找不到。Kafka Streams Exactly-Once模式下的偏移量特性
开启EXACTLY_ONCE后,Kafka Streams会用事务批量写入消息:
- 如果事务提交,消息会正常可见;
- 如果事务回滚,这批消息会被标记为不可消费,但偏移量会被占用(Kafka的偏移量是全局单调递增的,不会因为回滚而复用)。
这就解释了你看到的偏移量跳过现象(比如1、2之后直接到4、5),以及GetOffsetShell显示的结束偏移量远大于实际消息数——GetOffsetShell统计的是topic的最高偏移量(包括事务中未提交/回滚的记录),而实际可消费的只有已提交事务的消息。
- Kafka Streams事务状态残留
你提到重置Kafka Streams后问题仍存在,可能是因为topic中残留了未完成的事务元数据,导致后续写入的偏移量依然不连续,进而持续触发Spark的断言错误。
针对性解决方案
根据你的场景,推荐以下几种解决路径:
升级Spark版本(最优解)
将Spark升级到2.3.x及以上版本,这些版本的spark-streaming-kafka-0-10模块已经支持Kafka事务消息读取。同时在Spark配置中添加:.option("kafka.isolation.level", "read_committed")这个配置会让Spark只读取已提交事务的消息,完美适配Kafka Streams的
EXACTLY_ONCE写入。降级Kafka Streams的一致性保障(快速临时方案)
如果暂时无法升级Spark,可以修改Kafka Streams的配置,将PROCESSING_GUARANTEE_CONFIG改为AT_LEAST_ONCE:p.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE)这样Kafka Streams会用普通方式写入消息,不再使用事务,Spark 2.2.0就能正常读取,同时偏移量也会和实际消息数匹配。
清理异常Topic(辅助修复)
如果topic中残留了异常的事务元数据,可以删除该topic后重新创建,再重启Kafka Streams应用。不过这只能解决当前的偏移量异常,无法从根本上解决版本兼容性问题。
内容的提问来源于stack exchange,提问作者Tomas Lach




