You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何在Spring Cloud Stream 3.2.2及Spring Cloud Alibaba 2021.0.1.0中配置RocketMQ?

在Spring Cloud Stream 3.2.2/Spring Cloud Alibaba 2021.0.1.0中配置RocketMQ

嘿,我刚好折腾过这两个版本的RocketMQ配置,给你整理了一套靠谱的步骤,照着来就行~

第一步:引入正确的依赖

首先得把相关依赖配好,这里用Maven做例子,Gradle的话对应转换就行。要注意版本匹配:Spring Cloud Alibaba 2021.0.1.0刚好对应Spring Cloud 2021.0.1(也就是Stream 3.2.2),直接引入BOM和starter就ok:

<dependencyManagement>
    <dependencies>
        <!-- 引入Spring Cloud Alibaba版本管理BOM -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>2021.0.1.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <!-- 引入Spring Cloud版本管理BOM,对应Stream 3.2.2 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>2021.0.1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <!-- 核心的Stream RocketMQ Starter -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    </dependency>
    <!-- 如果需要接口触发发送消息,就加web依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

第二步:配置文件搞定连接与绑定

接下来在application.yml里配置RocketMQ地址、生产者和消费者的绑定信息,注释都写清楚了,替换成你的实际配置就行:

spring:
  cloud:
    stream:
      # RocketMQ基础配置
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876 # 替换成你的NameServer地址
      # 通道绑定配置:output是生产者通道,input是消费者通道,名字可自定义
      bindings:
        output:
          destination: test-topic # 要发送的Topic名称
          content-type: application/json # 消息格式,也可用text/plain
          group: producer-demo-group # 必须指定生产者组
        input:
          destination: test-topic # 订阅同一个Topic
          content-type: application/json
          group: consumer-demo-group # 必须指定消费者组
          consumer:
            concurrency: 3 # 消费线程数,根据业务调整
            broadcast: false # 默认集群消费,true则为广播模式

第三步:写生产者代码(两种方式可选)

Spring Cloud Stream 3.x推荐函数式编程,也支持旧版注解方式,这里两种都列出来:

方式一:用StreamBridge灵活发送(推荐接口触发场景)

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageSenderController {

    private final StreamBridge streamBridge;

    // 构造注入StreamBridge,无需手动配置绑定
    public MessageSenderController(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    // 写个接口,调用就发消息
    @GetMapping("/send")
    public String sendMessage(@RequestParam String content) {
        // 参数1:配置里的生产者通道名(output),参数2:消息内容
        streamBridge.send("output", content);
        return "Message sent: " + content;
    }
}

方式二:函数式Supplier(适合定时/自动发送场景)

import java.util.function.Supplier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ProducerConfig {

    // 定义Supplier,Stream会自动把返回的消息发送到对应通道
    @Bean
    public Supplier<String> messageSupplier() {
        // 这里可自定义消息生成逻辑,比如从数据库取数、固定内容等
        return () -> "Hello from RocketMQ Producer!";
    }
}

第四步:写消费者代码(函数式更简洁)

用函数式Consumer处理消息,比旧版@StreamListener清爽多了:

import java.util.function.Consumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConsumerConfig {

    @Bean
    public Consumer<String> messageConsumer() {
        return message -> {
            // 这里写消息处理逻辑:打印、存数据库、调用其他服务等
            System.out.println("Got message from RocketMQ: " + message);
            // 如果是JSON对象,直接换成对应实体类即可,比如Consumer<User>
        };
    }
}

几个要注意的坑

  • 先确认RocketMQ NameServer已启动,端口配置正确,不然连不上;
  • Topic最好提前在RocketMQ控制台创建,生产环境别开自动创建,容易出问题;
  • 生产者组和消费者组命名要规范:同组消费者会负载均衡消费,不同组会重复消费;
  • 若遇依赖冲突,检查Spring Boot、Spring Cloud、Spring Cloud Alibaba版本是否匹配:2021.0.1.0对应Spring Boot 2.6.x;
  • 要是偏好旧版@EnableBinding@StreamListener也能用,但官方已不推荐,函数式是趋势。

内容的提问来源于stack exchange,提问作者August

火山引擎 最新活动