在启用Sleuth时,Kafka群组已经在重新平衡。
需要在启用Spring Cloud Sleuth之前配置Kafka的自动提交偏移量。因为Sleuth会拦截Kafka消费者的poll()方法,并在返回之前创建或加入一个分布式跟踪。这个跟踪可能会触发Kafka重平衡,而Kafka在执行重平衡时会检查消费者的偏移量提交状态,如果消费者关闭了自动提交,则会导致群组重平衡。因此,需要将自动提交偏移量设置为true,并使用一个较长的周期(比如1分钟)进行提交。
代码示例:
@Configuration
public class KafkaConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-id");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 开启自动提交
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 60000); // 设置提交周期为1分钟
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}