如何在Spring Cloud Stream 3.2.2及Spring Cloud Alibaba 2021.0.1.0中配置RocketMQ?
在Spring Cloud Stream 3.2.2/Spring Cloud Alibaba 2021.0.1.0中配置RocketMQ
嘿,我刚好折腾过这两个版本的RocketMQ配置,给你整理了一套靠谱的步骤,照着来就行~
第一步:引入正确的依赖
首先得把相关依赖配好,这里用Maven做例子,Gradle的话对应转换就行。要注意版本匹配:Spring Cloud Alibaba 2021.0.1.0刚好对应Spring Cloud 2021.0.1(也就是Stream 3.2.2),直接引入BOM和starter就ok:
<dependencyManagement> <dependencies> <!-- 引入Spring Cloud Alibaba版本管理BOM --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>2021.0.1.0</version> <type>pom</type> <scope>import</scope> </dependency> <!-- 引入Spring Cloud版本管理BOM,对应Stream 3.2.2 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>2021.0.1</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <!-- 核心的Stream RocketMQ Starter --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> </dependency> <!-- 如果需要接口触发发送消息,就加web依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies>
第二步:配置文件搞定连接与绑定
接下来在application.yml里配置RocketMQ地址、生产者和消费者的绑定信息,注释都写清楚了,替换成你的实际配置就行:
spring: cloud: stream: # RocketMQ基础配置 rocketmq: binder: name-server: 127.0.0.1:9876 # 替换成你的NameServer地址 # 通道绑定配置:output是生产者通道,input是消费者通道,名字可自定义 bindings: output: destination: test-topic # 要发送的Topic名称 content-type: application/json # 消息格式,也可用text/plain group: producer-demo-group # 必须指定生产者组 input: destination: test-topic # 订阅同一个Topic content-type: application/json group: consumer-demo-group # 必须指定消费者组 consumer: concurrency: 3 # 消费线程数,根据业务调整 broadcast: false # 默认集群消费,true则为广播模式
第三步:写生产者代码(两种方式可选)
Spring Cloud Stream 3.x推荐函数式编程,也支持旧版注解方式,这里两种都列出来:
方式一:用StreamBridge灵活发送(推荐接口触发场景)
import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class MessageSenderController { private final StreamBridge streamBridge; // 构造注入StreamBridge,无需手动配置绑定 public MessageSenderController(StreamBridge streamBridge) { this.streamBridge = streamBridge; } // 写个接口,调用就发消息 @GetMapping("/send") public String sendMessage(@RequestParam String content) { // 参数1:配置里的生产者通道名(output),参数2:消息内容 streamBridge.send("output", content); return "Message sent: " + content; } }
方式二:函数式Supplier(适合定时/自动发送场景)
import java.util.function.Supplier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ProducerConfig { // 定义Supplier,Stream会自动把返回的消息发送到对应通道 @Bean public Supplier<String> messageSupplier() { // 这里可自定义消息生成逻辑,比如从数据库取数、固定内容等 return () -> "Hello from RocketMQ Producer!"; } }
第四步:写消费者代码(函数式更简洁)
用函数式Consumer处理消息,比旧版@StreamListener清爽多了:
import java.util.function.Consumer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ConsumerConfig { @Bean public Consumer<String> messageConsumer() { return message -> { // 这里写消息处理逻辑:打印、存数据库、调用其他服务等 System.out.println("Got message from RocketMQ: " + message); // 如果是JSON对象,直接换成对应实体类即可,比如Consumer<User> }; } }
几个要注意的坑
- 先确认RocketMQ NameServer已启动,端口配置正确,不然连不上;
- Topic最好提前在RocketMQ控制台创建,生产环境别开自动创建,容易出问题;
- 生产者组和消费者组命名要规范:同组消费者会负载均衡消费,不同组会重复消费;
- 若遇依赖冲突,检查Spring Boot、Spring Cloud、Spring Cloud Alibaba版本是否匹配:2021.0.1.0对应Spring Boot 2.6.x;
- 要是偏好旧版
@EnableBinding和@StreamListener也能用,但官方已不推荐,函数式是趋势。
内容的提问来源于stack exchange,提问作者August




