You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何从外部应用订阅Spring Boot JMS主题(无需独立ActiveMQ)

没问题!我来给你一步步讲清楚怎么实现——核心就是让第一个应用启动一个内嵌式的ActiveMQ Broker,把它当成消息中间件服务来用,第二个应用直接通过网络连接这个内嵌Broker就行,完全不用单独部署独立的ActiveMQ服务器。

核心原理说明

内嵌Broker是ActiveMQ提供的特性,允许我们在应用程序内部启动一个完整的Broker实例,它和独立部署的ActiveMQ功能一致,但运行在当前应用的JVM进程中。通过给内嵌Broker配置TCP连接器,其他应用就能像连接独立Broker一样连接它,实现跨应用的JMS主题订阅。

服务端(第一个应用)实现步骤

首先,确保项目依赖包含ActiveMQ和Spring JMS相关包(比如Maven的activemq-springspring-boot-starter-jms)。然后创建配置类启动内嵌Broker,并配置消息发送组件:

import java.time.LocalDateTime;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.core.JmsTemplate;

@SpringBootApplication
public class EmbeddedBrokerApp {

    public static void main(String[] args) {
        SpringApplication.run(EmbeddedBrokerApp.class, args);
    }

    // 初始化并启动内嵌ActiveMQ Broker
    @Bean(initMethod = "start", destroyMethod = "stop")
    public BrokerService brokerService() throws Exception {
        BrokerService broker = new BrokerService();
        // 关键:添加TCP连接器,允许外部应用通过网络连接
        broker.addConnector("tcp://0.0.0.0:61616");
        // 可选:禁用持久化,适合测试场景;生产环境可按需开启
        broker.setPersistent(false);
        // 可选:设置Broker名称
        broker.setBrokerName("embedded-broker");
        return broker;
    }

    // 配置连接工厂,指向本地内嵌Broker
    @Bean
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL("tcp://localhost:61616");
        // 可选:配置用户名密码(如果Broker开启了安全验证)
        // factory.setUserName("admin");
        // factory.setPassword("admin");
        return factory;
    }

    // 创建JmsTemplate,用于发送消息到主题
    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
        JmsTemplate template = new JmsTemplate(connectionFactory);
        // 设置为主题模式(默认是队列模式)
        template.setPubSubDomain(true);
        return template;
    }

    // 示例:定时发送消息到主题(可以用@Scheduled触发,或者其他业务逻辑触发)
    @Autowired
    private JmsTemplate jmsTemplate;

    // @Scheduled(fixedRate = 5000) // 每5秒发送一条消息
    public void sendTopicMessage() {
        String message = "Message from embedded broker at: " + LocalDateTime.now();
        jmsTemplate.convertAndSend("demo-topic", message);
        System.out.println("Sent message: " + message);
    }
}
客户端(第二个应用)实现步骤

同样先添加依赖,然后配置连接到服务端的内嵌Broker,并实现主题订阅的监听器:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.stereotype.Component;

import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;

@SpringBootApplication
public class TopicSubscriberApp {

    public static void main(String[] args) {
        SpringApplication.run(TopicSubscriberApp.class, args);
    }

    // 配置连接工厂,指向服务端内嵌Broker的地址
    @Bean
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        // 替换成服务端应用的实际IP地址和端口,本地测试用localhost即可
        factory.setBrokerURL("tcp://localhost:61616");
        // 如果服务端设置了用户名密码,这里也要对应配置
        // factory.setUserName("admin");
        // factory.setPassword("admin");
        return factory;
    }

    // 开启JMS监听支持
    @EnableJms
    public static class JmsConfig {}

    // 配置主题监听容器工厂(必须设置为主题模式)
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // 关键:启用主题订阅模式
        factory.setPubSubDomain(true);
        return factory;
    }

    // 主题订阅者:监听指定主题的消息
    @Component
    public static class DemoTopicSubscriber {

        @JmsListener(destination = "demo-topic", containerFactory = "jmsListenerContainerFactory")
        public void receiveMessage(String message) {
            System.out.println("Received from topic: " + message);
        }
    }
}
关键注意事项
  • 版本一致性:确保服务端和客户端使用的ActiveMQ版本完全一致,避免因版本差异导致的连接或消息处理问题。
  • 网络连通性:如果两个应用不在同一机器,要把客户端的BrokerURL改成服务端的实际IP,同时确保服务端的61616端口没有被防火墙拦截。
  • 安全配置:生产环境建议给Broker配置用户名密码验证,避免未授权的应用连接。
  • 持久化设置:如果需要消息持久化,服务端的BrokerService可以设置setPersistent(true),并配置持久化存储路径。

内容的提问来源于stack exchange,提问作者Beto Neto

火山引擎 最新活动