You need to enable JavaScript to run this app.
导航

普通消息

最近更新时间2023.04.21 14:05:13

首次发布时间2022.01.28 15:16:40

火山引擎消息队列 RocketMQ版提供同步发送、异步发送和单向(Oneway)发送三种方式来发送普通消息。本文介绍如何通过不同方式发送普通消息。

前提条件

您已完成准备工作

发送方式

火山引擎消息队列 RocketMQ版提供的普通消息发送方式包括以下三种,您可以根据业务要求选择合适的发送方式。

同步发送

异步发送

单向发送

发送方式

消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息。

消息发送方发出一条消息后,不等服务端返回响应,直接发送下一条消息。

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

应用场景

重要通知邮件、报名短信通知、营销短信系统等。

一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

TPS

最快

是否反馈发送结果

反馈

反馈

不反馈

可靠性

不丢失消息

不丢失消息

可能丢失消息

同步发送

同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。一般用于较为重要的消息发送场景。
同步发送方式发送普通消息的示例代码如下。

import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQProducer {
    //设置为您在火山引擎消息队列 RocketMQ版控制台上创建的 GID,以及替换为RocketMQ实例的AccessKey ID和AccessKey Secret。
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
    }

    public static void main(String[] args) throws MQClientException {
        /**
         * 创建 Producer
         * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:9876”。
         * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。
         */
        DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        //如果 SSL 认证策略设置为仅SSL连接,则通过公网访问实例时必须设置setUseTLS(true)
        //producer.setUseTLS(true);
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                    "YOUR MESSAGE TAG",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                //消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        //在应用退出前,销毁Producer对象。
        producer.shutdown();
    }
}

异步发送

异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。
火山引擎消息队列 RocketMQ版的异步发送,需要实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。
异步发送的示例代码如下。

import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQAsyncProducer {
    //设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID,以及替换为RocketMQ实例的AccessKey ID和AccessKey Secret。
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
    }

    public static void main(String[] args) throws MQClientException {
        /**
         * 创建Producer
         * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:9876”。
         * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。
         */
        DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        //如果 SSL 认证策略设置为仅SSL连接,则通过公网访问实例时必须设置setUseTLS(true)
        //producer.setUseTLS(true);
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                    "YOUR MESSAGE TAG",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override public void onSuccess(SendResult result) {
                        // 消费发送成功。
                        System.out.println("send message success. msgId= " + result.getMsgId());
                    }

                    @Override public void onException(Throwable throwable) {
                        // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                        System.out.println("send message failed.");
                        throwable.printStackTrace();
                    }
                });
            } catch (Exception e) {
                // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // 在应用退出前,销毁Producer对象。
        // 注意:如果不销毁也没有问题。
        producer.shutdown();
    }
}

单向发送

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
单向发送适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集等场景。
单向发送的示例代码如下。

import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQOnewayProducer {
    //设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID,以及替换为RocketMQ实例的AccessKey ID和AccessKey Secret。
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
    }

    public static void main(String[] args) throws MQClientException {
        /**
         * 创建Producer
         * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.volces.com.com:9876”。
         * 设置RocketMQ密钥,即RocketMQ实例的AccessKey ID和AccessKey Secret。
         */
        DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        //如果 SSL 认证策略设置为仅SSL连接,则通过公网访问实例时必须设置setUseTLS(true)
        //producer.setUseTLS(true);
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                    "YOUR MESSAGE TAG",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.sendOneway(msg);
            } catch (Exception e) {
                // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // 在应用退出前,销毁Producer对象。
        // 注意:如果不销毁也没有问题。
        producer.shutdown();
    }
}

订阅普通消息

您可以参考以下示例代码订阅普通消息。

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    //设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID,以及替换为RocketMQ实例的AccessKey ID和AccessKey Secret
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
    }
    public static void main(String[] args) throws MQClientException {
        //设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID, 以及替换为RocketMQ实例的AccessKey ID和AccessKey Secret。
        //设置为火山引擎消息队列 RocketMQ版实例的接入点。
        // 设置为您在火山引擎消息队列 RocketMQ版控制台上创建的Topic。
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr("http://{INSTANCE_ID}.rocketmq.volces.com.com:9876");
        consumer.subscribe("YOUR TOPIC", "*");
        //如果 SSL 认证策略设置为仅SSL连接,则通过公网访问实例时必须设置setUseTLS(true)
        //consumer.setUseTLS(true);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("Receive New Messages: %s %n", msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}