You need to enable JavaScript to run this app.
导航

顺序消息

最近更新时间2023.10.31 11:56:20

首次发布时间2023.07.04 21:17:29

消息队列 RocketMQ版提供顺序消息(FIFO消息)供您使用。在顺序消息模型中,您需要严格按照顺序来发布和消费消息。本文提供使用 C++ SDK 收发顺序消息的示例代码供您参考。

背景信息

顺序消息分为两类,全局顺序消息和分区顺序消息。区别仅为队列数量不同,代码没有区别。

  • 全局顺序:
    对于指定的一个 Topic,所有消息的生产和消费需要遵循一定的顺序,消息的消费顺序必须和生产顺序一致,即需要严格的先入先出 FIFO(First In First Out)的顺序进行发布和消费。
  • 分区顺序:
    对于指定的一个 Topic,其中每一个分区的消息生产与消费是有序的,同一个队列内的消息按照严格的 FIFO 顺序进行发布和订阅。消息投递到哪一个分区由消息的 Sharding Key 来进行区分。在 SDK 中可以通过指定 Sharding Key 和 MessageQueueSelector 回调函数来控制消息投递到哪个分区。

前提条件

发送顺序消息

发送顺序消息的示例代码如下。使用 g++ 命令进行编译。

g++ -o order_producer order_producer.cpp  -lrocketmq -lpthread -lz -ldl -lrt

如果页面提示 undefined reference 相关的编译错误,请先确定是否已安装动态库,如果确定已安装,可以尝试在 g++ 命令添加 -D_GLIBCXX_USE_CXX11_ABI=0 参数重新尝试编译。

#include <iostream>
#include <chrono>
#include <thread>
#include "rocketmq/DefaultMQProducer.h"

using namespace std;
using namespace rocketmq;

class DefineSelectMessageQueue : public MessageQueueSelector
{
public:
    MQMessageQueue select(const std::vector<MQMessageQueue> &mqs, const MQMessage &msg, void *arg)
    {
        // 实现自定义分区逻辑,根据业务传入arg参数即分区键,计算路由到哪个队列,这里以arg为int型参数为例。
        int orderId = *static_cast<int *>(arg);
        int index = orderId % mqs.size();
        return mqs[index];
    }
};

int main()
{
    // 生产者名称无需申请
    DefaultMQProducer producer("producer_group_name");
    // 火山引擎的接入点
    producer.setNamesrvAddr("accesspoint");
    // AK、SK替换为RocketMQ实例的AccessKey和AccessKey Secret
    // 用户渠道,可以标明和用户相关即可,无需申请。
    producer.setSessionCredentials("ak", "sk", "volc");

    // 请确保参数设置完成之后启动Producer。
    producer.start();

    DefineSelectMessageQueue *queueSelector = new DefineSelectMessageQueue();
    int count = 64;
    for (int i = 0; i < count; ++i)
    {
        // 配置消息写入的Topic名称。对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
        MQMessage msg("you_topic_name", "TAG", "msg content");
        try
        {
            SendResult sendResult = producer.send(msg, queueSelector, &i, 3, false);
            std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId() << std::endl;
            this_thread::sleep_for(chrono::seconds(1));
        }
        catch (MQException e)
        {
            std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
        }
    }
    std::cout << "Send " << count << " messages OK, costs" << std::endl;

    producer.shutdown();
    return 0;
}

订阅顺序消息

订阅顺序消息的示例代码如下。

g++ -o order_consumer order_consumer.cpp  -lrocketmq -lpthread -lz -ldl -lrt

如果页面提示 undefined reference 相关的编译错误,请先确定是否已安装动态库,如果确定已安装,可以尝试在 g++ 命令添加 -D_GLIBCXX_USE_CXX11_ABI=0 参数重新尝试编译。

#include <iostream>
#include <thread>
#include "rocketmq/DefaultMQPushConsumer.h"

using namespace rocketmq;

class OrderlyMessageListener : public MessageListenerOrderly
{
public:
    ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs)
    {
        for (auto item = msgs.begin(); item != msgs.end(); item++)
        {
            std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << std::endl;
        }
        return CONSUME_SUCCESS;
    }
};

int main(int argc, char *argv[])
{
    // 您在火山引擎消息队列 RocketMQ控制台上申请的GID。
    DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer("GID_group");
    // 从火山引擎消息队列 RocketMQ控制台的实例详情页面获取。
    consumer->setNamesrvAddr("your access point");
    // AK、SK替换为RocketMQ实例的AccessKey和AccessKey Secret
    // 用户渠道,可以标明和用户相关即可,无需申请。
    consumer->setSessionCredentials("ak", "sk", "VOLC");

    // 请注册自定义监听函数用来处理接收到的消息,并返回响应的处理结果。
    OrderlyMessageListener *messageListener = new OrderlyMessageListener();
    consumer->subscribe("topic_name", "tag");
    consumer->registerMessageListener(messageListener);

    // Start this consumer
    // 准备工作完成,必须调用启动函数,才可以正常工作。
    //  ********************************************
    //  1.确保订阅关系的设置在启动之前完成。
    //  2.确保相同GID下面的消费者的订阅关系一致。
    //  *********************************************
    consumer->start();

    // 保持主线程运行直到进程结束
    std::this_thread::sleep_for(std::chrono::seconds(60));
    consumer->shutdown();
    return 0;
}