本文提供使用 Java SDK 收发延迟消息的示例代码供您参考。
发送延迟消息前,需要先在控制台申请延迟消息类型的 Topic。RocketMQ 5.x 版本支持任意时间精度的延迟消息,发送延迟消息的示例代码如下。
package org.apache.rocketmq.client.java.example;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

/**
 * Example that sends a single delay message with the RocketMQ 5.x Java client.
 *
 * <p>The message is built with a delivery timestamp of "now + 10 seconds" and sent to the
 * topic {@code t-delay}. The send result (or failure) is logged, and the producer is always
 * closed before {@code main} returns.
 */
public class ProducerDelayMessage {
    private static final Logger log = LoggerFactory.getLogger(ProducerDelayMessage.class);

    private ProducerDelayMessage() {
    }

    /**
     * Builds a producer, sends one delay message and closes the producer.
     *
     * @param args unused
     * @throws ClientException if the producer or the message cannot be built
     * @throws IOException if closing the producer fails
     */
    public static void main(String[] args) throws ClientException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        /*
         * Set endpoints to the access point obtained from the Volcano Engine
         * Message Queue for RocketMQ console, similar to
         * "http://{INSTANCE_ID}.rocketmq.ivolces.com:8080".
         * Set accessKey / secretKey to the AccessKey ID and AccessKey Secret of the
         * RocketMQ instance.
         */
        String endpoints = "rocketmq-xxx.rocketmq.ivolces.com:8080";
        String accessKey = "xxxx";
        String secretKey = "xxxx";

        SessionCredentialsProvider sessionCredentialsProvider =
            new StaticSessionCredentialsProvider(accessKey, secretKey);
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            // SSL mode is not supported for now.
            .enableSsl(false)
            .setCredentialProvider(sessionCredentialsProvider)
            .build();

        String topic = "t-delay";

        // try-with-resources guarantees the producer is closed even if building or
        // sending the message throws.
        try (Producer producer = provider.newProducerBuilder()
            .setClientConfiguration(clientConfiguration)
            .setTopics(topic)
            .build()) {
            // Define your message body.
            byte[] body = "This is a delay message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
            String tag = "yourMessageTagA";
            Duration messageDelayTime = Duration.ofSeconds(10);

            final Message message = provider.newMessageBuilder()
                // Set topic for the current message.
                .setTopic(topic)
                // Message secondary classifier of message besides topic.
                .setTag(tag)
                // Key(s) of the message, another way to mark message besides message id.
                .setKeys("yourMessageKey-3ee439f945d7")
                // Set expected delivery timestamp of message (absolute epoch milliseconds).
                .setDeliveryTimestamp(System.currentTimeMillis() + messageDelayTime.toMillis())
                .setBody(body)
                .build();

            try {
                final SendReceipt sendReceipt = producer.send(message);
                log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
            } catch (Throwable t) {
                // Best-effort example: log the failure instead of propagating it.
                log.error("Failed to send message", t);
            }
        }
    }
}
延迟消息的订阅方式与普通消息一致,示例代码如下所示。
public class RocketMQDelayConsumer { private static final Logger log = LoggerFactory.getLogger(RocketMQDelayConsumer.class); private RocketMQDelayConsumer() { } public static void main(String[] args) throws ClientException, InterruptedException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); /* * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。 * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。 */ String endpoints = "rocketmq-xxx.rocketmq.ivolces.com:8080"; String accessKey = "xxxx"; String secretKey = "xxxx"; SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) // 暂时不支持ssl模式 .enableSsl(false) .setCredentialProvider(sessionCredentialsProvider) .build(); String tag = "yourMessageTagA"; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); String consumerGroup = "yourConsumerGroup"; String topic = "yourTopic" // In most case, you don't need to create too many consumers, singleton pattern is recommended. PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) // Set the consumer group name. .setConsumerGroup(consumerGroup) // Set the subscription for the consumer. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .setMessageListener(messageView -> { // Handle the received message and return consume result. log.info("Consume message={}", messageView); return ConsumeResult.SUCCESS; }) .build(); // Block the main thread, no need for production environment. Thread.sleep(Long.MAX_VALUE); // Close the push consumer when you don't need it anymore. // You could close it manually or add this into the JVM shutdown hook. pushConsumer.close(); } }