You need to enable JavaScript to run this app.
导航

步骤三:生产消费普通消息

最近更新时间2024.04.19 11:50:04

首次发布时间2022.04.24 22:01:21

准备开发环境,并创建相关服务资源之后,您可以调用开源 TCP 协议的 RocketMQ SDK 收发普通消息。本文档以 TCP 协议的 Java SDK 为例,介绍消息队列 RocketMQ版收发普通消息的基本步骤。

注意事项

在使用 Java SDK 接入火山引擎消息队列 RocketMQ版收发消息时,需要配置相应的消息生产或消费参数。您可以参考参数说明,了解相关的参数信息。消息队列 RocketMQ版提供常见场景的消息生产与消费示例代码,您也可以参考示例代码中的注释,了解各个关键参数。详细信息请参考SDK 参考

前提条件

  • 已配置开发环境,并创建了 RocketMQ 实例等相关资源。详细操作步骤请准备环境创建资源
  • 如果需要通过公网访问 RocketMQ 实例,需要提前为实例开启公网访问,同时绑定一个相同地域的 EIP,建议该 EIP 的带宽上限大于预估的公网业务流量峰值。操作步骤请参考开启公网访问

下载 SDK

RocketMQ Java 客户端 SDK 支持连接火山引擎消息队列 RocketMQ版,推荐使用的客户端版本为 4.8.0,详情请参见 Java SDK 下载地址

Maven方式引入依赖

pom.xml 中添加以下依赖。

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>4.8.0</version>
</dependency>
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-acl</artifactId>
  <version>4.8.0</version>
</dependency>

发送普通消息

您可以通过以下方式发送普通消息:

  • 消息队列 RocketMQ版控制台Topic管理页签中找到需要测试消息发送的 Topic,并在其对应的操作列单击发送消息。详细说明请参考在线调试
    建议仅在业务调试阶段进行普通消息的发送测试。业务正常运行期间通过该方式发送消息可能会产生脏数据。
  • 下载并安装 RocketMQ 客户端 Java SDK 后,运行以下示例代码,异步发送普通消息。

成功发送消息之后,如果可以通过消息查询功能检索到这条消息,表示消息已成功发送到服务端。查询消息的操作步骤请参考消息查询

import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQAsyncProducer {
    //设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID,以及替换为您的AccessKey ID和AccessKey Secret。
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
    }

    public static void main(String[] args) throws MQClientException {
        /**
         * 创建Producer
         * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,例如TCP私网接入点(VPC)为“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:9876”。
         * 设置AccessKey和Secret Key
         */
        DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        //如果 SSL 认证策略设置为仅SSL连接,则通过公网访问实例时必须设置setUseTLS(true)
        //producer.setUseTLS(true);
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                    "YOUR MESSAGE TAG",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override public void onSuccess(SendResult result) {
                        // 消费发送成功。
                        System.out.println("send message success. msgId= " + result.getMsgId());
                    }

                    @Override public void onException(Throwable throwable) {
                        // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                        System.out.println("send message failed.");
                        throwable.printStackTrace();
                    }
                });
            } catch (Exception e) {
                // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // 在应用退出前,销毁Producer对象。
        // 注意:如果不销毁也没有问题。
        producer.shutdown();
    }
}

订阅普通消息

成功发送消息之后,需要启动消费者客户端,订阅消息。您可以参考以下示例代码,启动消费者并订阅普通消息。

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    //设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID,以及替换为您的AccessKey ID和AccessKey Secret。
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
    }
    public static void main(String[] args) throws MQClientException {
        //设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID, 以及替换为您火山引擎账号的AccessKey ID和AccessKey Secret。
        //设置为火山引擎消息队列 RocketMQ版实例的接入点。
        // 设置为您在火山引擎消息队列 RocketMQ版控制台上创建的Topic。
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr("http://{INSTANCE_ID}.rocketmq.volces.com.com:9876");
        consumer.subscribe("YOUR TOPIC", "*");
        //如果 SSL 认证策略设置为仅SSL连接,则通过公网访问实例时必须设置setUseTLS(true)
        //consumer.setUseTLS(true);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("Receive New Messages: %s %n", msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}