You need to enable JavaScript to run this app.
导航
普通消息
最近更新时间:2025.05.09 10:28:39首次发布时间:2025.05.09 10:28:39
我的收藏
有用
有用
无用
无用

火山引擎消息队列 RocketMQ 5.x版本提供同步发送、异步发送两种方式来发送普通消息。本文介绍如何通过不同方式发送普通消息。发送普通消息前请在控制台创建普通消息类型topic

同步发送消息

同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。一般用于较为重要的消息发送场景。
同步发送方式发送普通消息的示例代码如下。

public class ProducerNormalMessageSync {
    private static final Logger log = LoggerFactory.getLogger(ProducerNormalMessageSync.class);

    private ProducerNormalMessageSync() {
    }

    public static void main(String[] args) throws ClientException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        /*
         * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。
         * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。
         */
        String endpoints = "rocketmq-xxx.rocketmq.ivolces.com:8080";
        String accessKey = "xxxx";
        String secretKey = "xxxx";
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // 暂时不支持ssl模式
                .enableSsl(false)
                .setCredentialProvider(sessionCredentialsProvider)
                .build();
        String topic = "yourTopicName";
        final Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setTopics(topic)
                .build();
        // Define your message body.
        byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "yourMessageTagA";
        final Message message = provider.newMessageBuilder()
                // Set topic for the current message.
                .setTopic(topic)
                // Message secondary classifier of message besides topic.
                .setTag(tag)
                // Key(s) of the message, another way to mark message besides message id.
                .setKeys("yourMessageKey-1c151062f96e")
                .setBody(body)
                .build();
        try {
            final SendReceipt sendReceipt = producer.send(message);
            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (Throwable t) {
            log.error("Failed to send message", t);
        }
        // Close the producer when you don't need it anymore.
        // You could close it manually or add this into the JVM shutdown hook.
        producer.close();
    }
}

异步发送

异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。异步发送可以避免线程阻塞,允许程序继续执行其他任务,从而提高系统的吞吐量和性能。
异步发送方式发送普通消息的示例代码如下。

public class ProducerNormalMessageAsync {
    private static final Logger log = LoggerFactory.getLogger(ProducerNormalMessageAsync.class);

    private ProducerNormalMessageAsync() {
    }

    public static void main(String[] args) throws ClientException, InterruptedException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        /*
         * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。
         * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。
         */
        String endpoints = "rocketmq-xxx.rocketmq.ivolces.com:8080";
        String accessKey = "xxxx";
        String secretKey = "xxxx";
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // 暂时不支持ssl模式
                .enableSsl(false)
                .setCredentialProvider(sessionCredentialsProvider)
                .build();
        String topic = "topicName";
        final Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setTopics(topic)
                .build();
        // Define your message body.
        byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "yourMessageTagA";
        final Message message = provider.newMessageBuilder()
                // Set topic for the current message.
                .setTopic(topic)
                // Message secondary classifier of message besides topic.
                .setTag(tag)
                // Key(s) of the message, another way to mark message besides message id.
                .setKeys("yourMessageKey-1c151062f96e")
                .setBody(body)
                .build();
        // Set individual thread pool for send callback.
        final CompletableFuture<SendReceipt> future = producer.sendAsync(message);
        ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool();
        future.whenCompleteAsync((sendReceipt, throwable) -> {
            if (null != throwable) {
                log.error("Failed to send message", throwable);
                // Return early.
                return;
            }
            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        }, sendCallbackExecutor);
        // Block to avoid exist of background threads.
        Thread.sleep(Long.MAX_VALUE);
        // Close the producer when you don't need it anymore.
        // You could close it manually or add this into the JVM shutdown hook.
        producer.close();
    }
}

订阅普通消息

5.x RocketMQ支持两种消费模式,分别为 Push Consumer 和 Simple Consumer。前者为服务端推送消息,后者为主动拉取消息,推荐使用Push Consumer
Push Consumer消费示例代码如下:

public class PushConsumerExample {
    private static final Logger log = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, InterruptedException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        /*
         * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。
         * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。
         */
        String endpoints = "rocketmq-xxx.rocketmq.ivolces.com:8080";
        String accessKey = "xxxx";
        String secretKey = "xxxx";
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // 暂时不支持ssl模式
                .enableSsl(false)
                .setCredentialProvider(sessionCredentialsProvider)
                .build();        
        String tag = "yourMessageTagA";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        String consumerGroup = "yourConsumerGroup";
        String topic = "yourTopic";
        // In most case, you don't need to create too many consumers, singleton pattern is recommended.
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            // Set the consumer group name.
            .setConsumerGroup(consumerGroup)
            // Set the subscription for the consumer.
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            .setMessageListener(messageView -> {
                // Handle the received message and return consume result.
                log.info("Consume message={}", messageView);
                return ConsumeResult.SUCCESS;
            })
            .build();
        // Block the main thread, no need for production environment.
        Thread.sleep(Long.MAX_VALUE);
        // Close the push consumer when you don't need it anymore.
        // You could close it manually or add this into the JVM shutdown hook.
        pushConsumer.close();
    }
}