You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Spring Boot中如何为Kafka重试主题消费者设置指定延迟(如5m、15m、30m)的消费机制

Spring Boot中如何为Kafka重试主题消费者设置指定延迟(如5m、15m、30m)的消费机制

嘿,我正好处理过几乎一模一样的第三方服务重试场景,给你分享几个在Spring Boot里落地的靠谱方案,完全贴合你的需求,而且不会因为消息堆积搞挂应用:

核心思路

先明确:不要在应用内存里存待重试的消息,把所有消息都交给Kafka存储,我们只需要控制消费者“到点再消费”的逻辑——这才是最稳的,毕竟Kafka天生就是用来扛高并发消息堆积的。

方案一:基于消息时间戳的消费者端过滤+动态暂停/唤醒

这是我生产环境用得最多的方案,简单易实现,完全适配你现有的三个重试主题(5m/15m/30m)。

步骤1:生产者发送重试消息时,标记目标消费时间

当第三方调用失败后,你需要把消息发送到对应延迟的重试主题,同时在消息头里带上「允许消费的时间戳」(当前时间+延迟时长):

@Autowired
private KafkaTemplate<String, YourMessageDTO> kafkaTemplate;

public void sendToRetryTopic(YourMessageDTO message, String retryTopic, long delayMillis) {
    ProducerRecord<String, YourMessageDTO> record = new ProducerRecord<>(retryTopic, message.getBizId(), message);
    // 计算目标消费时间:当前时间 + 延迟时长
    long targetConsumeTime = System.currentTimeMillis() + delayMillis;
    // 存入消息头
    record.headers().add("target-consume-time", String.valueOf(targetConsumeTime).getBytes(StandardCharsets.UTF_8));
    kafkaTemplate.send(record);
}

// 调用示例:发送到5分钟重试主题
sendToRetryTopic(message, "topic-retry-5m", 5 * 60 * 1000);

步骤2:消费者端判断时间,未到则暂停分区,到点再唤醒

每个重试主题的消费者,在拿到消息后先检查是否到了允许消费的时间,如果没到就暂停当前分区的消费,用一个单例的定时任务在延迟后唤醒,到点再处理消息:

首先,先定义一个单例的定时线程池(避免重复创建线程):

@Configuration
public class KafkaConfig {
    @Bean
    public ScheduledExecutorService kafkaDelayWakeupExecutor() {
        // 用守护线程,避免影响应用 shutdown
        return Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("kafka-delay-wakeup-thread");
            return t;
        });
    }
}

然后是5分钟重试主题的消费者:

@KafkaListener(topics = "topic-retry-5m", groupId = "retry-5m-group", autoStartup = "true")
public void consumeRetry5m(ConsumerRecord<String, YourMessageDTO> record,
                           Acknowledgment ack,
                           Consumer<?, ?> consumer,
                           @Autowired ScheduledExecutorService wakeupExecutor) {
    // 1. 从消息头取目标消费时间
    Header targetTimeHeader = record.headers().lastHeader("target-consume-time");
    if (targetTimeHeader == null) {
        // 异常情况:没有时间头,直接处理或者丢死信队列
        processThirdPartyCall(record.value());
        ack.acknowledge();
        return;
    }

    long targetConsumeTime = Long.parseLong(new String(targetTimeHeader.value(), StandardCharsets.UTF_8));
    long now = System.currentTimeMillis();

    // 2. 判断是否到了消费时间
    if (now < targetConsumeTime) {
        // 还没到时间:暂停当前分区的消费
        Set<TopicPartition> assignedPartitions = consumer.assignment();
        consumer.pause(assignedPartitions);

        // 计算需要等待的时长,定时唤醒消费
        long delay = targetConsumeTime - now;
        wakeupExecutor.schedule(() -> {
            // 到点了,恢复分区消费
            consumer.resume(assignedPartitions);
        }, delay, TimeUnit.MILLISECONDS);

        // 注意:这里不要提交offset,因为还没处理消息
        return;
    }

    // 3. 到时间了,执行第三方调用
    try {
        boolean success = processThirdPartyCall(record.value());
        if (success) {
            // 调用成功,提交offset
            ack.acknowledge();
        } else {
            // 还是失败,转到下一个重试主题(比如15m)
            sendToRetryTopic(record.value(), "topic-retry-15m", 15 * 60 * 1000);
            ack.acknowledge();
        }
    } catch (Exception e) {
        // 捕获异常,同样转到下一个重试主题或者死信队列
        log.error("处理5分钟重试消息失败,转15分钟重试", e);
        sendToRetryTopic(record.value(), "topic-retry-15m", 15 * 60 * 1000);
        ack.acknowledge();
    }
}

// 你的第三方调用逻辑
private boolean processThirdPartyCall(YourMessageDTO message) {
    // 这里写调用第三方服务的代码,返回是否成功
    try {
        thirdPartyService.sendInfo(message);
        return true;
    } catch (Exception e) {
        log.error("调用第三方服务失败", e);
        return false;
    }
}

关键注意事项

  1. 必须关闭自动提交offset:在消费者配置里把enable.auto.commit设为false,用手动提交(Acknowledgment ack),不然没处理就提交了,消息会丢失。
  2. 暂停的是当前消费者分配的分区:不要全局暂停,不然会影响其他分区的消费(如果你的主题有多个分区的话)。
  3. 定时线程用守护线程:避免应用 shutdown 时因为线程还在运行导致无法正常退出。

方案二:利用Spring Kafka的ContainerProperties自定义消费逻辑(进阶)

如果你的重试主题比较多,不想重复写暂停/唤醒的逻辑,可以把延迟判断的逻辑抽成一个通用的MessageListener,然后通过ListenerContainerCustomizer给每个重试主题的容器绑定:

@Component
public class DelayedKafkaMessageListener implements AcknowledgingMessageListener<String, YourMessageDTO> {
    @Autowired
    private ScheduledExecutorService wakeupExecutor;
    @Autowired
    private ThirdPartyService thirdPartyService;
    @Autowired
    private KafkaRetrySender kafkaRetrySender;

    @Override
    public void onMessage(ConsumerRecord<String, YourMessageDTO> record, Acknowledgment ack, Consumer<?, ?> consumer) {
        // 这里的逻辑和方案一的消费者逻辑完全一样,把延迟判断、暂停唤醒、处理逻辑都放在这里
        // 省略重复代码...
    }
}

// 然后配置容器自定义器,给每个重试主题绑定这个监听器
@Configuration
public class KafkaContainerConfig {
    @Bean
    public ListenerContainerCustomizer<ConcurrentMessageListenerContainer<String, YourMessageDTO>> delayedContainerCustomizer(DelayedKafkaMessageListener delayedListener) {
        return (container, dest, group) -> {
            if (dest.startsWith("topic-retry-")) {
                container.getContainerProperties().setMessageListener(delayedListener);
                // 关闭自动提交
                container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
            }
        };
    }
}

这个方案适合主题较多的场景,能减少重复代码。

为什么你的顾虑(消息堆积)不会发生?

你之前担心“大量消息堆积会搞挂应用”,但用上面的方案完全不用怕:

  • 所有待重试的消息都存在Kafka里,Kafka是分布式的、高可用的消息存储,天生就能扛百万级别的消息堆积。
  • 消费者只是暂停了分区的消费,不会把消息加载到应用内存里,应用的内存占用只会和当前正在处理的消息数量有关,不会因为堆积而暴涨。
  • 定时任务只是负责“唤醒”消费者,本身不存储任何消息,占用的资源可以忽略不计。

最后给你的业务建议

  1. 给每个重试主题设置独立的消费组:比如retry-5m-groupretry-15m-group,避免不同延迟的重试逻辑互相影响。
  2. 设置重试上限:比如30分钟的重试主题如果还是失败,就把消息转到死信队列(DLQ),然后人工介入排查,避免无限循环重试。
  3. 监控重试主题的消息堆积量:比如用Prometheus+Grafana监控每个重试主题的消息数,如果某个主题的消息突然暴涨,说明第三方服务可能出了大问题,及时告警。

这样一套下来,你的延迟重试机制就非常稳了,完全贴合第三方服务调用失败的重试场景~

火山引擎 最新活动