You need to enable JavaScript to run this app.
导航

消息队列 RocketMQ版生成消息轨迹

最近更新时间2022.04.12 09:50:28

首次发布时间2022.04.12 09:50:28

前言

在本教程中,您将学习如何使用JAVA开源SDK在火山消息队列 RocketMQ版上生成消息轨迹。

关于实验

预计部署时间:30分钟
级别:初级
相关产品:消息队列 RocketMQ版 云服务器
受众: 通用

环境说明
  1. 如果还没有火山引擎账号,点击此链接注册账号

  2. 如果您还没有VPC,请先点击链接创建VPC

  3. 消息队列 RocketMQ版,点击此链接创建

  4. 云服务器ECS:Centos 7

实验步骤

步骤1:创建消息队列 RocketMQ版实例

进入在控制台创建RocketMQ实例,并配置Topic、Group、以及秘钥,详见RocketMQ创建文档

步骤2: 配置密钥的权限,设置默认权限为发布和订阅

  1. 选择实例,选择密钥管理,然后点击查看权限详情

  1. 修改默认权限为发布、订阅

步骤3:在新创建的ECS上部署代码

  1. pom.xml依赖文件如下
<?xml version= 1.0  encoding= UTF-8 ?>
<project xmlns= http://maven.apache.org/POM/4.0.0 
         xmlns:xsi= http://www.w3.org/2001/XMLSchema-instance 
         xsi:schemaLocation= http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd >
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.alibaba.ons</groupId>
    <artifactId>apache-rocketmq-demo</artifactId>
    <name>apache-rocketmq-demo</name>
    <version>1.0-SNAPSHOT</version>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- compiler settings properties -->
        <java_source_version>1.6</java_source_version>
        <java_target_version>1.6</java_target_version>
        <file_encoding>UTF-8</file_encoding>

    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>4.8.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.1</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>1.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>${java_source_version}</source>
                    <target>${java_target_version}</target>
                    <encoding>${file_encoding}</encoding>
                    <showDeprecation>true</showDeprecation>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
  1. 配置文件MqConfig文件如下
/**
 * <p>
 * Licensed under the Apache License, Version 2.0 (the  License );
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an  AS IS  BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.bytedance.demo;

/**
 * MQ 配置
 */
public class MqConfig {
    /**
 *      * 启动测试之前请替换如下 XXX 为您的配置
 *           */
    public static final String TOPIC =  lxb ;
    public static final String GROUP_ID =  GID_lxb ;
    public static final String ORDER_TOPIC =  lxb ;
    public static final String ORDER_GROUP_ID =  GID_lxb ;
    public static final String ACCESS_KEY =  ZNmfp17*****LYaXDguy ;
    public static final String SECRET_KEY =  qO45DwJ******crPaUOTsF ;
    public static final String TAG =  mq_test_tag ;

    /**
 *      * https://console.volcengine.com/rocketmq/region:rocketmq+cn-beijing/instance 通过 实例概览--服务访问--TCP协议接入点 获取
 *           */
    public static final String NAMESRV_ADDR =  http://MQ_INST_******a25tr_mrecx.rocketmq.ivolces.com:9876 ;
}
  1. 生产者的配置文件如下:
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the  License ); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an  AS IS  BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.bytedance.demo.producer;

import com.bytedance.demo.MqConfig;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQProducer {

    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(MqConfig.ACCESS_KEY, MqConfig.SECRET_KEY));
    }

    public static void main(String[] args) throws MQClientException {

        DefaultMQProducer producer = new DefaultMQProducer(MqConfig.GROUP_ID, getAclRPCHook(), true,  RMQ_SYS_TRACE_TOPIC );
    
        producer.setNamesrvAddr(MqConfig.NAMESRV_ADDR);
        producer.start();

        for (int i = 0; i < 3; i++) {
            try {

                Message msg = new Message(MqConfig.TOPIC,
                    MqConfig.TAG,
                     Hello Bytedance .getBytes());
                SendResult sendResult = producer.send(msg);
                System.out.printf(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}
  1. 消费者的配置文件如下:
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the  License ); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an  AS IS  BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.bytedance.demo.consumer;

import com.bytedance.demo.MqConfig;
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQConsumer {

    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(MqConfig.ACCESS_KEY, MqConfig.SECRET_KEY));
    }

    public static void main(String[] args) throws MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MqConfig.GROUP_ID, getAclRPCHook(), new AllocateMessageQueueAveragely(), true, RMQ_SYS_TRACE_TOPIC );

::wq::
        consumer.setNamesrvAddr(MqConfig.NAMESRV_ADDR);
        consumer.subscribe(MqConfig.TOPIC,  * );

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf( %s Receive New Messages: %s %n , Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf( Consumer Started.%n );
    }
}

步骤4:查看消息轨迹

  1. 点击消息查询,选择Topic名称,输入消息消费的时间段

  1. 点击查看详情,可以看到具体的消息轨迹