SpringBoot Kafka生产者提示消息已发送但消费者无法接收的问题求助
你好,我看了你的代码和问题描述,大概率是这几个核心问题导致的,咱们一步步来排查解决:
1. 最关键的问题:异步发送未处理回调,日志提前打印造成「假成功」
你现在的send方法里,调用kafkaTemplate().send(topic, message)后直接打日志,但send是异步方法——它只是把消息放进生产者本地发送队列就返回了,并不会等待Broker确认接收成功。所以你的日志只能证明消息进入了本地队列,不能代表Broker真的收到了消息。
赶紧给发送逻辑加上回调,看看真实的发送结果:
public void send(String topic, String message) { kafkaTemplate().send(topic, message) .addCallback( // 发送成功的回调:打印Broker返回的偏移量,证明真的落地了 result -> log.info("Message [{}] sent successfully to topic [{}], offset: {}", message, topic, result.getRecordMetadata().offset()), // 发送失败的回调:打印异常栈,直接定位问题 ex -> log.error("Failed to send message [{}] to topic [{}]", message, topic, ex) ); }
加上这个之后,你就能看到消息是真的发成功了,还是在底层抛出了连接、权限或者Broker不可用的异常。
2. 生产者Broker地址硬编码,和配置不一致
我注意到SenderComponent里注入了配置项${sporting.kafka.url}但完全没用到,反而SenderKafkaProducer里把BOOTSTRAP_SERVERS_CONFIG写死成了localhost:9092。如果你的配置文件里sporting.kafka.url不是这个地址,就会出现「看似连接正常但实际发错Broker」的情况。
改成读取配置的方式,避免硬编码:
// 在SenderKafkaProducer里注入配置 @Value("${sporting.kafka.url}") private String kafkaUrl; // 替换producerConfigs里的硬编码内容 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
3. 验证消息是否真的到达Topic
用kcat从头消费一次Topic,确认有没有消息:
kcat -b localhost:9092 -t SCH-list -o beginning
如果这个命令能收到消息,那问题出在你自己的消费者配置上——比如消费者的group.id之前消费过该Topic,偏移量已经到最新位置,所以收不到旧消息;或者消费者的bootstrap.servers配置错误。
4. 其他小排查点
- 确认Topic的分区权限:检查Broker日志,看有没有生产者无权限写入的报错
- 检查Broker磁盘空间:如果Broker磁盘满了,会拒绝写入,回调会抛出对应的异常
- 确认
SenderComponent里的topicName是否正确:可以在发送前打印log.info("Sending to topic: {}", topicName),避免配置项写错
另外提个小建议:你的SenderComponent里的scheduledRoutings是内存集合,应用重启后会清空,可能导致重复调度任务,后续可以考虑把已调度的任务标记持久化到仓库里,不过这个和当前的消息发送问题无关~
先试试加回调看真实发送结果,应该能立刻定位到问题根源!




