Spring JMS如何借助ActiveMQ Advisory Message获取Topic消费者数量?
Great question! Let's tackle both of your concerns step by step—first clarifying how Advisory Messages work, then walking through concrete implementations using Spring and ActiveMQ APIs to meet your requirement.
First, let's clear up your confusion about who triggers the Advisory Topic messages: the ActiveMQ Broker itself handles this automatically.
When a consumer subscribes to your target Topic (e.g., abc.xyz), the broker immediately sends an Advisory message to ActiveMQ.Advisory.Consumer.Topic.abc.xyz notifying listeners that the consumer count has increased. Similarly, when a consumer unsubscribes or disconnects (including non-persistent WebSocket clients), the broker sends another Advisory message updating the count to reflect the change.
Your onMessage method will be invoked automatically whenever these events occur—you don't need to manually trigger anything. The broker pushes these events to all listeners subscribed to the Advisory Topic.
Your goal is to check the consumer count before sending to a Topic, and you're using CachingConnectionFactory with Spring. Below are two reliable approaches, including how to leverage ActiveMQ's extensions (since JMS 2.0 doesn't include a standard API for consumer count queries).
Option 1: Event-Driven Tracking via Advisory Messages
This approach maintains a local, real-time count of consumers by listening to Advisory events. It's efficient for frequent message sends since it avoids repeated broker queries.
Step 1: Create an Advisory Listener Component
This component will listen to the Advisory Topic and update a consumer count:
@Component public class TopicConsumerTracker implements MessageListener { // AtomicInteger ensures thread-safe updates private final AtomicInteger currentConsumerCount = new AtomicInteger(0); private final String targetTopicName = "abc.xyz"; @Override public void onMessage(Message message) { if (!(message instanceof ActiveMQMessage)) return; ActiveMQMessage activeMQMsg = (ActiveMQMessage) message; try { Object eventData = activeMQMsg.getDataStructure(); if (eventData instanceof ConsumerEvent) { ConsumerEvent consumerEvent = (ConsumerEvent) eventData; // Verify the event is for our target Topic String eventTopic = consumerEvent.getDestination().getPhysicalName(); if (targetTopicName.equals(eventTopic)) { // Update the count with the latest value from the broker currentConsumerCount.set(consumerEvent.getConsumerCount()); System.out.printf("Updated consumer count for %s: %d%n", targetTopicName, currentConsumerCount.get()); } } } catch (JMSException e) { // Handle exception (log, alert, etc.) e.printStackTrace(); } } public int getCurrentConsumerCount() { return currentConsumerCount.get(); } }
Step 2: Configure a Message Listener Container
Use Spring's DefaultMessageListenerContainer to attach your listener to the Advisory Topic:
@Configuration public class ActiveMQAdvisoryConfig { @Bean public DefaultMessageListenerContainer advisoryListenerContainer( CachingConnectionFactory connectionFactory, TopicConsumerTracker consumerTracker) { DefaultMessageListenerContainer container = new DefaultMessageListenerContainer(); container.setConnectionFactory(connectionFactory); // Point to the Advisory Topic for your target Topic container.setDestination(new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.abc.xyz")); container.setMessageListener(consumerTracker); container.setConcurrentConsumers(1); return container; } }
Step 3: Use the Count in Your Message Sender
Inject the tracker into your sending service to check counts before sending:
@Service public class SafeTopicMessageSender { @Autowired private JmsTemplate jmsTemplate; @Autowired private TopicConsumerTracker consumerTracker; @Autowired private ActiveMQTopic abcXyzTopic; // Your existing Topic @Bean public void sendMessageIfSafe(String messageContent) { int consumerCount = consumerTracker.getCurrentConsumerCount(); if (consumerCount >= 1) { jmsTemplate.convertAndSend(abcXyzTopic, messageContent); System.out.printf("Message sent to %s - %d active consumers%n", abcXyzTopic.getPhysicalName(), consumerCount); } else { System.out.println("No active consumers - message not sent to avoid loss"); } } }
Option 2: Direct Broker Statistics Query
If you prefer to check the count directly before each send (for absolute accuracy, even if it adds minor broker overhead), use ActiveMQ's admin API to query the broker's statistics.
Step 1: Create a Broker Statistics Service
This service uses ActiveMQ's extended API to fetch the consumer count:
@Service public class BrokerStatsService { @Autowired private CachingConnectionFactory connectionFactory; public int getTopicConsumerCount(String topicName) throws JMSException { try (Connection connection = connectionFactory.createConnection()) { if (!(connection instanceof ActiveMQConnection)) { return 0; } ActiveMQConnection activeMQConn = (ActiveMQConnection) connection; // Use ActiveMQ's admin API to get Topic statistics BrokerAdmin brokerAdmin = activeMQConn.getBrokerAdmin(); TopicStatistics topicStats = brokerAdmin.getTopicStatistics(topicName); return topicStats != null ? topicStats.getConsumerCount() : 0; } } }
Step 2: Integrate with Your Message Sender
Update your sender to use this service for pre-send checks:
@Service public class SafeTopicMessageSender { @Autowired private JmsTemplate jmsTemplate; @Autowired private BrokerStatsService statsService; @Autowired private ActiveMQTopic abcXyzTopic; public void sendMessageIfSafe(String messageContent) { try { int consumerCount = statsService.getTopicConsumerCount(abcXyzTopic.getPhysicalName()); if (consumerCount >= 1) { jmsTemplate.convertAndSend(abcXyzTopic, messageContent); System.out.printf("Message sent to %s - %d active consumers%n", abcXyzTopic.getPhysicalName(), consumerCount); } else { System.out.println("No active consumers - message not sent to avoid loss"); } } catch (JMSException e) { // Handle exception (log, fallback logic, etc.) e.printStackTrace(); } } }
- JMS 2.0 Limitation: JMS 2.0 doesn't define a standard API for retrieving consumer counts, so you must use ActiveMQ's proprietary extensions (like
ActiveMQConnectionandBrokerAdmin) for this functionality. - Initial Count Sync: For the event-driven approach, when your application starts, you may want to run a one-time query (using the broker stats service) to initialize the consumer count in case consumers were already subscribed before your listener started.
- WebSocket Clients: Non-persistent WebSocket subscribers will trigger Advisory events just like regular JMS consumers—their connection/disconnection will update the count automatically.
- CachingConnectionFactory: Both approaches work with
CachingConnectionFactorysince we're creating connections correctly (and closing them in the stats service with try-with-resources).
内容的提问来源于stack exchange,提问作者Manuel Jordan




