You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Spring Cloud Stream Kafka生产者消息发送至错误主题问题排查求助

Spring Cloud Stream Kafka生产者消息发送至错误主题问题排查求助

我来帮你捋捋这个问题——你用Spring Cloud Stream的StreamBridge发Kafka消息时,明明配置里指定了目标主题,结果消息却跑到了错误的主题里,后来改了函数式配置也没解决,对吧?先拆解下你的核心问题:

  1. 最初配置把kafka_demo_topic_out_0绑定到kafka_demo_topic,但调用streamBridge.send("kafka_demo_topic_out_0", ...)时,消息还是发到了kafka_demo_topic_out_0,完全没走配置的目标主题
  2. 后来尝试用函数定义kafkaDemoTopic,绑定kafkaDemoTopic-out-0kafka_demo_topic,结果反而创建了kafkaDemoTopic这个新主题,依然不符合预期

先搞懂第一个问题:StreamBridge为啥无视你的绑定配置?

你现在直接用streamBridge.send("kafka_demo_topic_out_0", ...),这里的第一个参数如果是绑定名,理论上应该走配置的destination,但问题出在:你没激活这个绑定!

Spring Cloud Stream的绑定需要对应的函数/组件来触发初始化,如果你的应用里没有任何代码关联kafka_demo_topic_out_0这个绑定,它就不会被加载,这时StreamBridge会默认把你传入的参数当作主题名直接发送,自然就会发到kafka_demo_topic_out_0了。

给你两个靠谱的解决方案

方案1:激活绑定后再用StreamBridge

如果你想继续用StreamBridge,得先让绑定生效。可以在配置里加一个空的供应商函数来激活绑定:

spring:
  cloud:
    stream:
      function:
        definition: demoSupplier # 定义一个空函数,用来触发绑定初始化
      kafka:
        binder:
          autoAddPartitions: true
          brokers: localhost:9092
          auto-create-topics: false
      bindings:
        demoSupplier-out-0: # 对应上面定义的函数的输出绑定
          producer:
            headerMode: raw
          destination: kafka_demo_topic # 这里指定实际要发的主题
          content-type: application/json
          binder: kafka

然后代码里调用StreamBridge时,用这个激活后的绑定名:

streamBridge.send("demoSupplier-out-0", new ObjectMapper().writeValueAsString(message));

方案2:用函数式编程(Spring Cloud Stream推荐方式)

这种方式更规范,也不容易出错。你可以定义一个Supplier函数,让Spring Cloud Stream自动帮你处理发送逻辑,不用手动管StreamBridge和定时任务:

先写生产者函数组件:

@Component
public class KafkaProducerFunction {
    private AtomicLong idGenerator = new AtomicLong();

    @Bean
    public Supplier<org.springframework.messaging.Message<String>> kafkaDemoTopic() {
        return () -> {
            try {
                Message message = constructMessage();
                System.out.println("sending message: " + message);
                return org.springframework.messaging.MessageBuilder
                        .withPayload(new ObjectMapper().writeValueAsString(message))
                        .build();
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        };
    }

    private Message constructMessage() {
        Message message = new Message();
        message.setId(idGenerator.getAndIncrement());
        message.setUuid(UUID.randomUUID().toString());
        LocalDateTime now = LocalDateTime.now();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        message.setDate(now.format(formatter));
        return message;
    }
}

然后调整配置文件:

spring:
  cloud:
    stream:
      function:
        definition: kafkaDemoTopic # 对应上面@Bean的函数名
      kafka:
        binder:
          autoAddPartitions: true
          brokers: localhost:9092
          auto-create-topics: false
      bindings:
        kafkaDemoTopic-out-0: # 函数名-out-0是Spring Cloud Stream默认的输出绑定格式
          producer:
            headerMode: raw
          destination: kafka_demo_topic # 指定实际目标主题
          content-type: application/json
          binder: kafka
      poller: # 配置定时发送的间隔,替代@Scheduled
        fixed-delay: 1000

这样Spring Cloud Stream会自动每隔1秒调用这个Supplier函数,把消息发到kafka_demo_topic,完全不用你手动调用StreamBridge。

为啥你之前的配置没生效?

  1. 第一种配置里,你定义了kafka_demo_topic_out_0绑定,但没有对应的函数激活它,绑定根本没初始化,StreamBridge只能把参数当主题名用
  2. 第二种配置里,你写了function.definition: kafkaDemoTopic,但没创建对应的@Bean函数,Spring Cloud Stream就默认生成了一个空函数,还把函数名当作主题名,所以才创建了kafkaDemoTopic这个新主题

额外要检查的点

  • 确认auto-create-topics: false生效,要是kafka_demo_topic主题不存在,得手动创建,不然消息会发送失败
  • 检查版本兼容性:你用的spring-cloud-starter-stream-kafka:4.1.0对应的Spring Boot版本应该是3.2.x,版本不匹配也可能导致配置失效

备注:内容来源于stack exchange,提问作者Petr Kostroun

火山引擎 最新活动