You need to enable JavaScript to run this app.
导航

延时消息

最近更新时间2023.11.21 19:52:02

首次发布时间2023.07.04 21:17:30

消息队列 RocketMQ版提供 TCP 协议下的 RocketMQ 开源 C++ SDK 的相关说明,本文档介绍收发延时消息的示例代码。

前提条件

背景信息

火山引擎提供了两种发送延时消息的方式,一种是特定延时时间,另一种是任意延时时间,其中任意精度的延时消息包括以下两种:

  • 特定精度延时消息,只支持特定的 18 个等级。

    messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    
  • 任意精度延时消息:对于 2023年2月21日之后创建的 RocketMQ 实例,建议通过属性 __STARTDELIVERTIME 来使用任意精度的延时消息。

发送延时消息

#include <iostream>
#include <chrono>
#include <thread>
#include <string>
#include "rocketmq/DefaultMQProducer.h"

using namespace std;
using namespace rocketmq;

int main()
{
    // 生产者名称无需申请
    DefaultMQProducer producer("producer_group_name");
    // 火山引擎的接入点
    producer.setNamesrvAddr("accesspoint");
    // AK、SK替换为RocketMQ实例的AccessKey和AccessKey Secret
    // 用户渠道,可以标明和用户相关即可,无需申请。
    producer.setSessionCredentials("ak", "sk", "volc");

    // 请确保参数设置完成之后启动Producer。
    producer.start();
    int count = 64;
    for (int i = 0; i < count; ++i)
    {
        // 配置消息写入的Topic名称。对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
        MQMessage msg("you topic name", "TAG", "msg content");
        // messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        msg.setDelayTimeLevel(5);
        /*
            对于2023年2月21日之后创建的 RocketMQ 实例,可以通过msg.setProperty("__STARTDELIVERTIME", timestamp) 发送延时消息,例如
            chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
            chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
            long exp = mil.count() + 10 * 1000; //表示延时时间为10s
            msg.setProperty("__STARTDELIVERTIME", to_string(exp)); // 在哪个时间点发送消息
        */
        try
        {
            SendResult sendResult = producer.send(msg);
            std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId() << std::endl;
            this_thread::sleep_for(chrono::seconds(1));
        }
        catch (MQException e)
        {
            std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
        }
    }
    std::cout << "Send " << count << " messages OK, costs" << std::endl;

    producer.shutdown();
    return 0;
}

订阅延时消息

和订阅普通消息一致,请参考普通消息