You need to enable JavaScript to run this app.
导航
顺序消息
最近更新时间:2025.05.09 10:28:39首次发布时间:2025.05.09 10:28:39
我的收藏
有用
有用
无用
无用

消息队列 RocketMQ 5.x版本提供顺序消息(FIFO消息)供您使用。在顺序消息模型中,您需要严格按照顺序来发布和消费消息。本文提供使用 Java SDK 收发顺序消息的示例代码供您参考。

背景信息

顺序消息分为两类,全局顺序消息和分区顺序消息。区别仅为队列数量不同,代码没有区别。

  • 全局顺序:
    对于指定的一个 Topic,所有消息的生产和消费需要遵循一定的顺序,消息的消费顺序必须和生产顺序一致,即需要严格的先入先出 FIFO(First In First Out)的顺序进行发布和消费。
  • 分区顺序:
    对于指定的一个 Topic,其中每一个分区的消息生产与消费是有序的,同一个队列内的消息按照严格的 FIFO 顺序进行发布和订阅。消息投递到哪一个分区由消息的 messageGroup 来进行区分

注意顺序消息,需要在控制台申请顺序类型的topic以及顺序类型的group

发顺序消息

发送顺序消息的示例代码如下。

public class RocketMQOrderProducer {
    private static final Logger log = LoggerFactory.getLogger(RocketMQOrderProducer.class);

    private RocketMQOrderProducer() {
    }

    public static void main(String[] args) throws ClientException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

       /*
         * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。
         * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。
         */
        String endpoints = "rocketmq-xxx.rocketmq.ivolces.com:8080";
        String accessKey = "xxxx";
        String secretKey = "xxxx";
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // 暂时不支持ssl模式
                .enableSsl(false)
                .setCredentialProvider(sessionCredentialsProvider)
                .build();
        // Define your message body.
        String topic = "yourTopicName"; // 控制台申请顺序类型topic
        final Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setTopics(topic)
                .build();
        byte[] body = "This is a FIFO message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "yourMessageTagA";
        final Message message = provider.newMessageBuilder()
            // Set topic for the current message.
            .setTopic(topic)
            // Message secondary classifier of message besides topic.
            .setTag(tag)
            // Key(s) of the message, another way to mark message besides message id.
            .setKeys("yourMessageKey-1ff69ada8e0e")
            // Message group decides the message delivery order.
            .setMessageGroup("yourMessageGroup0")
            .setBody(body)
            .build();
        try {
            final SendReceipt sendReceipt = producer.send(message);
            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (Throwable t) {
            log.error("Failed to send message", t);
        }
        // Close the producer when you don't need it anymore.
        // You could close it manually or add this into the JVM shutdown hook.
        producer.close();
    }
}

订阅顺序消息

订阅顺序消息的示例代码如下,要注意group一定是申请的顺序类型

public class RocketMQOrderConsumer {
    private static final Logger log = LoggerFactory.getLogger(RocketMQOrderConsumer.class);

    private RocketMQOrderConsumer() {
    }

    public static void main(String[] args) throws ClientException, InterruptedException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        /*
         * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。
         * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。
         */
        String endpoints = "rocketmq-xxx.rocketmq.ivolces.com:8080";
        String accessKey = "xxxx";
        String secretKey = "xxxx";
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // 暂时不支持ssl模式
                .enableSsl(false)
                .setCredentialProvider(sessionCredentialsProvider)
                .build();        
        String tag = "yourMessageTagA";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        String consumerGroup = "yourConsumerGroup"; // 控制台申请顺序类型group
        String topic = "yourTopic"; // 控制台申请的顺序类型topic
        // In most case, you don't need to create too many consumers, singleton pattern is recommended.
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            // Set the consumer group name.
            .setConsumerGroup(consumerGroup)
            // Set the subscription for the consumer.
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            .setMessageListener(messageView -> {
                // Handle the received message and return consume result.
                log.info("Consume message={}", messageView);
                return ConsumeResult.SUCCESS;
            })
            .build();
        // Block the main thread, no need for production environment.
        Thread.sleep(Long.MAX_VALUE);
        // Close the push consumer when you don't need it anymore.
        // You could close it manually or add this into the JVM shutdown hook.
        pushConsumer.close();
    }
}