You need to enable JavaScript to run this app.
导航
延迟消息
最近更新时间:2025.05.09 10:28:39首次发布时间:2025.05.09 10:28:39
我的收藏
有用
有用
无用
无用

本文提供使用 Java SDK 收发延迟消息的示例代码供您参考。

发送延时消息

发送延迟消息需要在控制台申请延迟消息类型的topic,rocketmq 5.x版本支持任意精度的延迟消息,发送延迟消息的示例代码如下。

package org.apache.rocketmq.client.java.example;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

public class ProducerDelayMessage {
    private static final Logger log = LoggerFactory.getLogger(ProducerDelayMessage.class);

    private ProducerDelayMessage() {
    }

    public static void main(String[] args) throws ClientException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        /*
         * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。
         * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。
         */
        String endpoints = "rocketmq-xxx.rocketmq.ivolces.com:8080";
        String accessKey = "xxxx";
        String secretKey = "xxxx";
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // 暂时不支持ssl模式
                .enableSsl(false)
                .setCredentialProvider(sessionCredentialsProvider)
                .build();
        String topic = "t-delay";
        final Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setTopics(topic)
                .build();
        // Define your message body.
        byte[] body = "This is a delay message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "yourMessageTagA";
        Duration messageDelayTime = Duration.ofSeconds(10);
        final Message message = provider.newMessageBuilder()
                // Set topic for the current message.
                .setTopic(topic)
                // Message secondary classifier of message besides topic.
                .setTag(tag)
                // Key(s) of the message, another way to mark message besides message id.
                .setKeys("yourMessageKey-3ee439f945d7")
                // Set expected delivery timestamp of message.
                .setDeliveryTimestamp(System.currentTimeMillis() + messageDelayTime.toMillis())
                .setBody(body)
                .build();
        try {
            final SendReceipt sendReceipt = producer.send(message);
            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (Throwable t) {
            log.error("Failed to send message", t);
        }
        // Close the producer when you don't need it anymore.
        // You could close it manually or add this into the JVM shutdown hook.
        producer.close();
    }
}

订阅延迟消息

延迟消息的订阅方式与普通消息一致,示例代码如下所示。

public class RocketMQDelayConsumer {
    private static final Logger log = LoggerFactory.getLogger(RocketMQDelayConsumer.class);

    private RocketMQDelayConsumer() {
    }

    public static void main(String[] args) throws ClientException, InterruptedException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        /*
         * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。
         * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。
         */
        String endpoints = "rocketmq-xxx.rocketmq.ivolces.com:8080";
        String accessKey = "xxxx";
        String secretKey = "xxxx";
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // 暂时不支持ssl模式
                .enableSsl(false)
                .setCredentialProvider(sessionCredentialsProvider)
                .build();        
        String tag = "yourMessageTagA";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        String consumerGroup = "yourConsumerGroup";
        String topic = "yourTopic"
        // In most case, you don't need to create too many consumers, singleton pattern is recommended.
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            // Set the consumer group name.
            .setConsumerGroup(consumerGroup)
            // Set the subscription for the consumer.
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            .setMessageListener(messageView -> {
                // Handle the received message and return consume result.
                log.info("Consume message={}", messageView);
                return ConsumeResult.SUCCESS;
            })
            .build();
        // Block the main thread, no need for production environment.
        Thread.sleep(Long.MAX_VALUE);
        // Close the push consumer when you don't need it anymore.
        // You could close it manually or add this into the JVM shutdown hook.
        pushConsumer.close();
    }
}