火山引擎消息队列 RocketMQ 5.x 版本提供同步发送、异步发送两种方式来发送普通消息。本文介绍如何通过这两种方式发送普通消息。发送普通消息前,请先在控制台创建普通消息类型的 Topic。
同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发送下一条消息的通讯方式。一般用于较为重要的消息发送场景。
同步发送方式发送普通消息的示例代码如下。
public class ProducerNormalMessageSync { private static final Logger log = LoggerFactory.getLogger(ProducerNormalMessageSync.class); private ProducerNormalMessageSync() { } public static void main(String[] args) throws ClientException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); /* * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。 * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。 */ String endpoints = "rocketmq-xxx.rocketmq.ivolces.com:8080"; String accessKey = "xxxx"; String secretKey = "xxxx"; SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) // 暂时不支持ssl模式 .enableSsl(false) .setCredentialProvider(sessionCredentialsProvider) .build(); String topic = "yourTopicName"; final Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(topic) .build(); // Define your message body. byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); String tag = "yourMessageTagA"; final Message message = provider.newMessageBuilder() // Set topic for the current message. .setTopic(topic) // Message secondary classifier of message besides topic. .setTag(tag) // Key(s) of the message, another way to mark message besides message id. .setKeys("yourMessageKey-1c151062f96e") .setBody(body) .build(); try { final SendReceipt sendReceipt = producer.send(message); log.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); } catch (Throwable t) { log.error("Failed to send message", t); } // Close the producer when you don't need it anymore. // You could close it manually or add this into the JVM shutdown hook. producer.close(); } }
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。异步发送可以避免线程阻塞,允许程序继续执行其他任务,从而提高系统的吞吐量和性能。
异步发送方式发送普通消息的示例代码如下。
public class ProducerNormalMessageAsync { private static final Logger log = LoggerFactory.getLogger(ProducerNormalMessageAsync.class); private ProducerNormalMessageAsync() { } public static void main(String[] args) throws ClientException, InterruptedException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); /* * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。 * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。 */ String endpoints = "rocketmq-xxx.rocketmq.ivolces.com:8080"; String accessKey = "xxxx"; String secretKey = "xxxx"; SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) // 暂时不支持ssl模式 .enableSsl(false) .setCredentialProvider(sessionCredentialsProvider) .build(); String topic = "topicName"; final Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(topic) .build(); // Define your message body. byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); String tag = "yourMessageTagA"; final Message message = provider.newMessageBuilder() // Set topic for the current message. .setTopic(topic) // Message secondary classifier of message besides topic. .setTag(tag) // Key(s) of the message, another way to mark message besides message id. .setKeys("yourMessageKey-1c151062f96e") .setBody(body) .build(); // Set individual thread pool for send callback. final CompletableFuture<SendReceipt> future = producer.sendAsync(message); ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool(); future.whenCompleteAsync((sendReceipt, throwable) -> { if (null != throwable) { log.error("Failed to send message", throwable); // Return early. 
return; } log.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); }, sendCallbackExecutor); // Block to avoid exist of background threads. Thread.sleep(Long.MAX_VALUE); // Close the producer when you don't need it anymore. // You could close it manually or add this into the JVM shutdown hook. producer.close(); } }
RocketMQ 5.x 支持两种消费模式,分别为 Push Consumer 和 Simple Consumer。前者由服务端向消费者推送消息,后者由消费者主动拉取消息。推荐使用 Push Consumer。
Push Consumer消费示例代码如下:
public class PushConsumerExample { private static final Logger log = LoggerFactory.getLogger(PushConsumerExample.class); private PushConsumerExample() { } public static void main(String[] args) throws ClientException, InterruptedException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); /* * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。 * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。 */ String endpoints = "rocketmq-xxx.rocketmq.ivolces.com:8080"; String accessKey = "xxxx"; String secretKey = "xxxx"; SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) // 暂时不支持ssl模式 .enableSsl(false) .setCredentialProvider(sessionCredentialsProvider) .build(); String tag = "yourMessageTagA"; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); String consumerGroup = "yourConsumerGroup"; String topic = "yourTopic"; // In most case, you don't need to create too many consumers, singleton pattern is recommended. PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) // Set the consumer group name. .setConsumerGroup(consumerGroup) // Set the subscription for the consumer. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .setMessageListener(messageView -> { // Handle the received message and return consume result. log.info("Consume message={}", messageView); return ConsumeResult.SUCCESS; }) .build(); // Block the main thread, no need for production environment. Thread.sleep(Long.MAX_VALUE); // Close the push consumer when you don't need it anymore. // You could close it manually or add this into the JVM shutdown hook. pushConsumer.close(); } }