You need to enable JavaScript to run this app.
导航

事务消息

最近更新时间2023.10.31 11:56:20

首次发布时间2023.07.04 21:17:30

本文提供使用 C++ SDK 收发事务消息的示例代码供您参考。

前提条件

发送事务消息

通过以下步骤发送事务消息。

  1. 业务侧通过 sendMessageInTransaction 发送消息到 RocketMQ 服务端。
  2. 业务侧通过 executeLocalTransaction 执行本地事务。
  3. 实现业务查询事务执行是否成功的接口 checkLocalTransaction

示例代码如下。

#include <iostream>
#include <chrono>
#include <thread>
#include "rocketmq/TransactionMQProducer.h"
#include "rocketmq/MQClientException.h"
#include "rocketmq/TransactionListener.h"

using namespace std;
using namespace rocketmq;

class DefineTransactionListener : public TransactionListener
{
public:
    LocalTransactionState executeLocalTransaction(const MQMessage &msg, void *arg)
    {
        /*
        执行本地事务
            1. 成功返回COMMIT_MESSAGE
            2. 失败返回ROLLBACK_MESSAGE
            3. 不确定返回UNKNOWN。服务端会触发定时任务回查状态
        */
        std::cout << "Execute Local Transaction,Received Message Topic:" << msg.getTopic()
                  << ", transactionId:" << msg.getTransactionId() << std::endl;
        return UNKNOWN;
    }

    LocalTransactionState checkLocalTransaction(const MQMessageExt &msg)
    {
        /*
        回查本地事务执行情况
            1. 成功返回COMMIT_MESSAGE
            2. 失败返回ROLLBACK_MESSAGE
            3. 不确定返回UNKNOWN。则等待下次定时任务回查。
        */
        std::cout << "Check Local Transaction,Received Message Topic:" << msg.getTopic()
                  << ", MsgId:" << msg.getMsgId() << std::endl;
        return COMMIT_MESSAGE;
    }
};

int main()
{
    // 生产者名称无需申请
    TransactionMQProducer producer("producer_group_name");
    // 火山引擎的接入点
    producer.setNamesrvAddr("accesspoint");
    // AK、SK替换为RocketMQ实例的AccessKey和AccessKey Secret
    // 用户渠道,可以标明和用户相关即可,无需申请。
    producer.setSessionCredentials("ak", "sk", "volc");

    // 本地事务执行和回查函数。
    DefineTransactionListener *exampleTransactionListener = new DefineTransactionListener();
    producer.setTransactionListener(exampleTransactionListener);
    // 请确保参数设置完成之后启动Producer。
    producer.start();
    int count = 3;
    for (int i = 0; i < count; ++i)
    {
        // 配置消息写入的Topic名称。对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
        MQMessage msg("TRANSACTION TOPIC", "TAG", "Transaction content");
        try
        {
            SendResult sendResult = producer.sendMessageInTransaction(msg, &i);
            std::cout << "SendResult:" << sendResult.getSendStatus() 
                      << ", Message ID: " << sendResult.getMsgId()
                      << std::endl;
            this_thread::sleep_for(chrono::seconds(1));
        }
        catch (MQException e)
        {
            std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
        }
    }
    std::cout << "Send " << count << " messages OK " << endl;

    std::cout << "Wait for local transaction check..... " << std::endl;
    for (int i = 0; i < 6; ++i)
    {
        this_thread::sleep_for(chrono::seconds(10));
        std::cout << "Running " << i * 10 + 10 << " Seconds......" << std::endl;
    }
    producer.shutdown();
    return 0;
}

订阅事务消息

和订阅普通消息一致,请参考普通消息