
How can Spring JMS use ActiveMQ Advisory Messages to get the number of consumers on a Topic?

Let's take your two concerns in order: first how Advisory Messages are triggered, then concrete implementations using Spring and ActiveMQ to meet your requirement.


Understanding Advisory Message Triggers

First, let's clear up your confusion about who triggers the Advisory Topic messages: the ActiveMQ Broker itself handles this automatically.

When a consumer subscribes to your target Topic (e.g., abc.xyz), the broker immediately publishes an Advisory message to ActiveMQ.Advisory.Consumer.Topic.abc.xyz. When a consumer unsubscribes or disconnects (including non-persistent WebSocket clients), the broker publishes another one. Each of these messages carries the current number of consumers in an int property named consumerCount, so listeners always see the updated count. Advisory support is enabled on the broker by default.

Your onMessage method is invoked automatically whenever these events occur; you don't need to trigger anything manually. The broker pushes the events to every listener subscribed to the Advisory Topic.
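To make the naming rule concrete, here is a minimal sketch of how the Advisory Topic name is derived from the target Topic name. AdvisoryTopicNames and its methods are hypothetical helpers written for this answer, not part of ActiveMQ; the prefix is the one shown above, and ">" is ActiveMQ's wildcard for "everything below this point".

```java
// Hypothetical helper for illustration only (not an ActiveMQ class).
final class AdvisoryTopicNames {

    // Prefix of the consumer advisory topics for Topics, as used in this answer
    private static final String CONSUMER_TOPIC_PREFIX = "ActiveMQ.Advisory.Consumer.Topic.";

    private AdvisoryTopicNames() {
    }

    /** Advisory topic that reports consumer changes for one Topic, e.g. "abc.xyz". */
    static String consumerAdvisoryFor(String topicName) {
        if (topicName == null || topicName.isEmpty()) {
            throw new IllegalArgumentException("topicName must not be empty");
        }
        return CONSUMER_TOPIC_PREFIX + topicName;
    }

    /** Advisory subscription that covers every Topic, using the ">" wildcard. */
    static String consumerAdvisoryForAllTopics() {
        return CONSUMER_TOPIC_PREFIX + ">";
    }
}
```

If you are already on the ActiveMQ client classpath, its AdvisorySupport class should give you the same lookup without string concatenation.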


Implementing Consumer Count Checks with Spring + ActiveMQ

Your goal is to check the consumer count before sending to a Topic, and you're using CachingConnectionFactory with Spring. Below are two reliable approaches, including how to leverage ActiveMQ's extensions (since JMS 2.0 doesn't include a standard API for consumer count queries).

Option 1: Event-Driven Tracking via Advisory Messages

This approach maintains a local, real-time count of consumers by listening to Advisory events. It's efficient for frequent message sends since it avoids repeated broker queries.

Step 1: Create an Advisory Listener Component

This component will listen to the Advisory Topic and update a consumer count:

import java.util.concurrent.atomic.AtomicInteger;

// javax.jms applies to ActiveMQ 5.x with Spring 5; newer stacks use jakarta.jms
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.springframework.stereotype.Component;

@Component
public class TopicConsumerTracker implements MessageListener {

    // Int property the broker sets on consumer advisory messages
    private static final String CONSUMER_COUNT_PROPERTY = "consumerCount";

    // AtomicInteger ensures thread-safe updates
    private final AtomicInteger currentConsumerCount = new AtomicInteger(0);
    private final String targetTopicName = "abc.xyz";

    @Override
    public void onMessage(Message message) {
        try {
            // The listener container subscribes only to the advisory topic for
            // abc.xyz, so every message received here refers to that Topic.
            if (message.propertyExists(CONSUMER_COUNT_PROPERTY)) {
                // Update the count with the latest value from the broker
                currentConsumerCount.set(message.getIntProperty(CONSUMER_COUNT_PROPERTY));
                System.out.printf("Updated consumer count for %s: %d%n", targetTopicName, currentConsumerCount.get());
            }
        } catch (JMSException e) {
            // Handle exception (log, alert, etc.)
            e.printStackTrace();
        }
    }

    public int getCurrentConsumerCount() {
        return currentConsumerCount.get();
    }
}

Step 2: Configure a Message Listener Container

Use Spring's DefaultMessageListenerContainer to attach your listener to the Advisory Topic:

@Configuration
public class ActiveMQAdvisoryConfig {

    @Bean
    public DefaultMessageListenerContainer advisoryListenerContainer(
            CachingConnectionFactory connectionFactory,
            TopicConsumerTracker consumerTracker) {

        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // Point to the Advisory Topic for your target Topic
        container.setDestination(new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.abc.xyz"));
        container.setMessageListener(consumerTracker);
        container.setConcurrentConsumers(1);
        return container;
    }
}

Step 3: Use the Count in Your Message Sender

Inject the tracker into your sending service to check counts before sending:

@Service
public class SafeTopicMessageSender {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    private TopicConsumerTracker consumerTracker;

    @Autowired
    private ActiveMQTopic abcXyzTopic; // Your existing Topic @Bean

    public void sendMessageIfSafe(String messageContent) {
        int consumerCount = consumerTracker.getCurrentConsumerCount();
        if (consumerCount >= 1) {
            jmsTemplate.convertAndSend(abcXyzTopic, messageContent);
            System.out.printf("Message sent to %s - %d active consumers%n", abcXyzTopic.getPhysicalName(), consumerCount);
        } else {
            System.out.println("No active consumers - message not sent to avoid loss");
        }
    }
}
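The check-then-send decision in Step 3 can also be pulled out of the Spring service so it is testable without a broker. The sketch below assumes nothing about JMS: ConsumerGatedSender is a hypothetical class, the count comes from any IntSupplier, and the actual send is any Consumer<String>.

```java
import java.util.function.Consumer;
import java.util.function.IntSupplier;

// Hypothetical helper: gates a send on a consumer count supplied from elsewhere.
final class ConsumerGatedSender {

    private final IntSupplier consumerCount;
    private final Consumer<String> sender;

    ConsumerGatedSender(IntSupplier consumerCount, Consumer<String> sender) {
        this.consumerCount = consumerCount;
        this.sender = sender;
    }

    /** Sends only when at least one consumer is reported; returns whether it sent. */
    boolean sendIfConsumed(String payload) {
        if (consumerCount.getAsInt() < 1) {
            return false; // nobody is listening, so skip the send
        }
        sender.accept(payload);
        return true;
    }
}
```

In the service above you could wire it as new ConsumerGatedSender(consumerTracker::getCurrentConsumerCount, msg -> jmsTemplate.convertAndSend(abcXyzTopic, msg)). Keep in mind that the check and the send are two separate steps, so a consumer can still disconnect in between; the gate reduces lost messages but cannot rule them out.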

Option 2: Direct Broker Statistics Query

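If you prefer to check the count directly before each send (you get the broker's current state, at the cost of a remote call per send), query the broker's JMX MBeans. The ActiveMQ JMS client does not expose an admin API on the connection, so the count has to come from the broker's management interface, which means the broker's JMX connector must be enabled and reachable from your application.

Step 1: Create a Broker Statistics Service

This service reads the ConsumerCount attribute of the Topic's MBean over JMX. The JMX URL and broker name below are placeholders for your environment:

import javax.jms.JMSException;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.springframework.stereotype.Service;

@Service
public class BrokerStatsService {

    // Assumptions: adjust the JMX URL and broker name to your environment
    private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi";
    private static final String BROKER_NAME = "localhost";

    public int getTopicConsumerCount(String topicName) throws JMSException {
        try (JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(JMX_URL))) {
            MBeanServerConnection mbeans = connector.getMBeanServerConnection();
            // Object name layout used by ActiveMQ 5.8 and later
            ObjectName topicMBean = new ObjectName("org.apache.activemq:type=Broker,brokerName="
                    + BROKER_NAME + ",destinationType=Topic,destinationName=" + topicName);
            // The Topic MBean exposes ConsumerCount as a long attribute
            return Math.toIntExact((Long) mbeans.getAttribute(topicMBean, "ConsumerCount"));
        } catch (InstanceNotFoundException e) {
            return 0; // The broker has not created this Topic yet
        } catch (Exception e) {
            JMSException wrapped = new JMSException("Could not read consumer count for " + topicName);
            wrapped.initCause(e);
            throw wrapped;
        }
    }
}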

Step 2: Integrate with Your Message Sender

Update your sender to use this service for pre-send checks:

@Service
public class SafeTopicMessageSender {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    private BrokerStatsService statsService;

    @Autowired
    private ActiveMQTopic abcXyzTopic;

    public void sendMessageIfSafe(String messageContent) {
        try {
            int consumerCount = statsService.getTopicConsumerCount(abcXyzTopic.getPhysicalName());
            if (consumerCount >= 1) {
                jmsTemplate.convertAndSend(abcXyzTopic, messageContent);
                System.out.printf("Message sent to %s - %d active consumers%n", abcXyzTopic.getPhysicalName(), consumerCount);
            } else {
                System.out.println("No active consumers - message not sent to avoid loss");
            }
        } catch (JMSException e) {
            // Handle exception (log, fallback logic, etc.)
            e.printStackTrace();
        }
    }
}
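If a broker query on every single send turns out to be too expensive, one middle ground is to reuse the last answer for a short time. The following is only a sketch under my own assumptions: CachedConsumerCount is a hypothetical class, the broker query is passed in as an IntSupplier, and the clock is injectable so the behaviour can be tested without waiting.

```java
import java.util.function.IntSupplier;
import java.util.function.LongSupplier;

// Hypothetical helper: remembers the last queried count for ttlMillis milliseconds.
final class CachedConsumerCount implements IntSupplier {

    private final IntSupplier brokerQuery;
    private final LongSupplier clockMillis;
    private final long ttlMillis;

    private boolean loaded;
    private long fetchedAt;
    private int cached;

    CachedConsumerCount(IntSupplier brokerQuery, LongSupplier clockMillis, long ttlMillis) {
        this.brokerQuery = brokerQuery;
        this.clockMillis = clockMillis;
        this.ttlMillis = ttlMillis;
    }

    @Override
    public synchronized int getAsInt() {
        long now = clockMillis.getAsLong();
        // Query the broker on first use and whenever the cached value has expired
        if (!loaded || now - fetchedAt >= ttlMillis) {
            cached = brokerQuery.getAsInt();
            fetchedAt = now;
            loaded = true;
        }
        return cached;
    }
}
```

In production you would pass System::currentTimeMillis as the clock and a lambda around statsService.getTopicConsumerCount(...) as the query; since that method throws a checked JMSException, the lambda has to catch it and decide on a fallback value. The trade-off is that the count can be up to ttlMillis out of date.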

Key Notes & Considerations
  • JMS 2.0 Limitation: JMS 2.0 doesn't define a standard API for retrieving consumer counts, so you must rely on broker-specific features such as ActiveMQ's Advisory Messages or the broker's JMX MBeans.
  • Initial Count Sync: For the event-driven approach, when your application starts, you may want to run a one-time query (using the broker stats service) to initialize the consumer count in case consumers were already subscribed before your listener started.
  • WebSocket Clients: Non-persistent WebSocket subscribers trigger Advisory events just like regular JMS consumers, so their connection and disconnection update the count automatically.
  • CachingConnectionFactory: The advisory listener container works with CachingConnectionFactory like any other listener. Be aware that the connections it hands out are Spring proxies around a shared connection, so casting them to ActiveMQConnection to reach vendor-specific methods does not work.
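For the Initial Count Sync note above, the tricky part is ordering: a startup snapshot must not overwrite a newer value that an Advisory event has already delivered. Here is a minimal sketch of one way to handle that; SeededConsumerCount is a hypothetical class, and the -1 sentinel for "no value yet" is my own choice.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: a counter that accepts a startup snapshot only once.
final class SeededConsumerCount {

    private static final int UNKNOWN = -1;

    private final AtomicInteger count = new AtomicInteger(UNKNOWN);

    /** Called for every Advisory event; the broker-reported value always wins. */
    void onAdvisory(int reportedCount) {
        count.set(reportedCount);
    }

    /** Applies a startup snapshot only if no value has been recorded yet. */
    boolean seed(int snapshot) {
        return count.compareAndSet(UNKNOWN, snapshot);
    }

    /** Current count, treating "unknown" as zero consumers. */
    int get() {
        return Math.max(count.get(), 0);
    }
}
```

The tracker from Option 1 could delegate to a class like this: onMessage calls onAdvisory(...), and a startup hook calls seed(...) with the result of the one-time query.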

The question comes from Stack Exchange; it was asked by Manuel Jordan.
