You need to enable JavaScript to run this app.
导航

普通消息

最近更新时间2023.10.31 11:56:20

首次发布时间2023.07.04 21:17:30

火山引擎消息队列 RocketMQ版提供同步发送和单向(Oneway)发送两种方式来发送普通消息。本文介绍如何通过不同方式发送普通消息。

前提条件

发送普通消息

发送普通消息的示例代码如下,使用 g++ 命令进行编译。

g++ -o producer producer.cpp -lrocketmq -lpthread -lz -ldl -lrt

如果页面提示 undefined reference 相关的编译错误,请先确定是否已安装动态库,如果确定已安装,可以尝试在 g++ 命令添加 -D_GLIBCXX_USE_CXX11_ABI=0 参数重新尝试编译。

#include <iostream>
#include <chrono>
#include <thread>
#include <string>
#include "rocketmq/DefaultMQProducer.h"

using namespace std;
using namespace rocketmq;

int main()
{
    // 生产者名称无需申请
    DefaultMQProducer producer("producer_group_name");
    // 火山引擎的接入点
    producer.setNamesrvAddr("your access point");
    // AK、SK替换为RocketMQ实例的AccessKey和AccessKey Secret
    // 用户渠道,可以标明和用户相关即可,无需申请。
    producer.setSessionCredentials("ak", "sk", "VOLC");

    // 请确保参数设置完成之后启动Producer。
    producer.start();
    int count = 64;
    for (int i = 0; i < count; ++i)
    {
        // 配置消息写入的Topic名称。对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
        MQMessage msg("topic_name", "TAG", "msg content");
        try
        {
            SendResult sendResult = producer.send(msg);
            std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId() << std::endl;
            this_thread::sleep_for(chrono::seconds(1));
        }
        catch (MQException e)
        {
            std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
        }
    }
    std::cout << "Send " << count << " messages OK, costs" << std::endl;

    producer.shutdown();
    return 0;
}

订阅普通消息

订阅普通消息的示例代码如下,使用 g++ 命令进行编译。

g++ -o consumer consumer.cpp  -lrocketmq -lpthread -lz -ldl -lrt

如果页面提示 undefined reference 相关的编译错误,请先确定是否已安装动态库,如果确定已安装,可以尝试在 g++ 命令添加 -D_GLIBCXX_USE_CXX11_ABI=0 参数重新尝试编译。

#include <iostream>
#include <thread>
#include "rocketmq/DefaultMQPushConsumer.h"

using namespace rocketmq;

class ConcurrentMessageListener : public MessageListenerConcurrently
{
public:
    ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs)
    {
        for (auto item = msgs.begin(); item != msgs.end(); item++)
        {
            std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl;
        }
        return CONSUME_SUCCESS;
    }
};

int main(int argc, char *argv[])
{
    // 您在火山引擎消息队列 RocketMQ控制台上申请的GID。
    DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_group");
    // 从火山引擎消息队列 RocketMQ控制台的实例详情页面获取。
    consumer->setNamesrvAddr("your access point");
    // AK、SK替换为RocketMQ实例的AccessKey和AccessKey Secret
    // 用户渠道,可以标明和用户相关即可,无需申请。
    consumer->setSessionCredentials("ak", "sk", "VOLC");

    // 请注册自定义监听函数用来处理接收到的消息,并返回响应的处理结果。
    ConcurrentMessageListener *messageListener = new ConcurrentMessageListener();
    consumer->subscribe("topic_name", "tag");
    consumer->registerMessageListener(messageListener);

    // 准备工作完成,必须调用启动函数,才可以正常工作。
    //  ********************************************
    //  1.确保订阅关系的设置在启动之前完成。
    //  2.确保相同GID下面的消费者的订阅关系一致。
    //  *********************************************
    consumer->start();

    // 保持主线程运行直到进程结束
    std::this_thread::sleep_for(std::chrono::seconds(60));
    consumer->shutdown();
    return 0;
}