You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

SpringBoot Kafka生产者提示消息已发送但消费者无法接收的问题求助

SpringBoot Kafka生产者提示消息已发送但消费者无法接收的问题求助

你好,我看了你的代码和问题描述,大概率是这几个核心问题导致的,咱们一步步来排查解决:

1. 最关键的问题:异步发送未处理回调,日志提前打印造成「假成功」

你现在的send方法里,调用kafkaTemplate().send(topic, message)后直接打日志,但send异步方法——它只是把消息放进生产者本地发送队列就返回了,并不会等待Broker确认接收成功。所以你的日志只能证明消息进入了本地队列,不能代表Broker真的收到了消息。

赶紧给发送逻辑加上回调,看看真实的发送结果:

public void send(String topic, String message) {
    kafkaTemplate().send(topic, message)
        .addCallback(
            // 发送成功的回调:打印Broker返回的偏移量,证明真的落地了
            result -> log.info("Message [{}] sent successfully to topic [{}], offset: {}", 
                              message, topic, result.getRecordMetadata().offset()),
            // 发送失败的回调:打印异常栈,直接定位问题
            ex -> log.error("Failed to send message [{}] to topic [{}]", message, topic, ex)
        );
}

加上这个之后,你就能看到消息是真的发成功了,还是在底层抛出了连接、权限或者Broker不可用的异常。

2. 生产者Broker地址硬编码,和配置不一致

我注意到SenderComponent里注入了配置项${sporting.kafka.url}但完全没用到,反而SenderKafkaProducer里把BOOTSTRAP_SERVERS_CONFIG写死成了localhost:9092。如果你的配置文件里sporting.kafka.url不是这个地址,就会出现「看似连接正常但实际发错Broker」的情况。

改成读取配置的方式,避免硬编码:

// 在SenderKafkaProducer里注入配置
@Value("${sporting.kafka.url}")
private String kafkaUrl;

// 替换producerConfigs里的硬编码内容
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);

3. 验证消息是否真的到达Topic

kcat从头消费一次Topic,确认有没有消息:

kcat -b localhost:9092 -t SCH-list -o beginning

如果这个命令能收到消息,那问题出在你自己的消费者配置上——比如消费者的group.id之前消费过该Topic,偏移量已经到最新位置,所以收不到旧消息;或者消费者的bootstrap.servers配置错误。

4. 其他小排查点

  • 确认Topic的分区权限:检查Broker日志,看有没有生产者无权限写入的报错
  • 检查Broker磁盘空间:如果Broker磁盘满了,会拒绝写入,回调会抛出对应的异常
  • 确认SenderComponent里的topicName是否正确:可以在发送前打印log.info("Sending to topic: {}", topicName),避免配置项写错

另外提个小建议:你的SenderComponent里的scheduledRoutings是内存集合,应用重启后会清空,可能导致重复调度任务,后续可以考虑把已调度的任务标记持久化到仓库里,不过这个和当前的消息发送问题无关~

先试试加回调看真实发送结果,应该能立刻定位到问题根源!

火山引擎 最新活动