在消息消费失败时,使用 Acknowledgment.nack() 可以将未确认的消息返回给队列,等待其他消费者再次消费。但是,当使用 @KafkaListener 注解消费消息时,可能会出现跳过消息的问题。这是因为 @KafkaListener 默认启用了批量消费,当消息消费失败时,它会调用 Acknowledgment.nack(),将当前批次内所有消息都返回给队列并重新消费下一批次的消息。如果某些消息消费失败的情况下,并不希望跳过这些消息,可以通过以下两种方式解决:
- 禁用批量消费
在 @KafkaListener 注解中配置参数为 max.poll.records=1,可以禁用批量消费。
@KafkaListener(topics = "${kafka.topic}", max.poll.records=1)
public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {
try {
// 消息消费失败
if (消息处理失败) {
ack.nack(0, 1000);// 将未确认的消息返回给队列,等待重新消费
} else {
ack.acknowledge();// 消息处理成功
}
} catch (Exception e) {
ack.nack(0, 1000);//将未确认的消息返回给队列,等待重新消费
}
}
- 消费者异常处理器
在消费者中配置异常处理器,将捕获到的异常封装成 Acknowledgment.nack(),而不是在处理消息时调用 Acknowledgment.nack()。
@KafkaListener(topics = "${kafka.topic}")
public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {
try {
// 消息处理异常
if (消息处理失败) {
throw new RuntimeException("消息处理失败");
} else {
ack.acknowledge();// 消息处理成功
}