解决 KafkaTridentSpoutOpaque 重复消费最后一条消息的问题,可以采用以下方法:
-
使用 Kafka 版本 0.10.1.0 或更高版本,以确保 Kafka Spout 在处理消息时具有幂等性。
-
在 KafkaSpoutConfig 中设置 startOffsetTime
属性为 kafka.api.OffsetRequest.EarliestTime()
,这将从最早的消息开始消费,而不是从最新的消息开始。
KafkaSpoutConfig<String, String> kafkaSpoutConfig = new KafkaSpoutConfig.Builder<>(kafkaBootstrapServers, kafkaTopics)
.setStartOffsetTime(kafka.api.OffsetRequest.EarliestTime())
.build();
- 在 TridentTopology 中使用 KafkaTridentSpoutOpaque 时,通过设置
ignoreZkOffsets
属性为 true
,来禁用使用 ZooKeeper 来追踪已消费的消息的偏移量。
TridentTopology topology = new TridentTopology();
KafkaTridentSpoutOpaque<String, String> kafkaSpout = new KafkaTridentSpoutOpaque<>(kafkaSpoutConfig);
topology.newStream("kafka-stream", kafkaSpout)
.each(...) // 对消息进行处理
.parallelismHint(...) // 设置并行度
...
这样设置后,KafkaTridentSpoutOpaque 将从最早的消息开始消费,并且不会使用 ZooKeeper 来追踪已消费的消息的偏移量,从而避免了重复消费最后一条消息的问题。