本文提供使用 C++ SDK 收发延迟消息的示例代码供您参考。
发送延迟消息前,需要先在控制台申请延迟消息类型的 Topic。RocketMQ 5.x 版本支持任意精度的延迟消息,发送延迟消息的示例代码如下。
#include <algorithm> #include <atomic> #include <chrono> #include <cstddef> #include <iostream> #include <random> #include <string> #include <system_error> #include "gflags/gflags.h" #include "rocketmq/Message.h" #include "rocketmq/Producer.h" using namespace ROCKETMQ_NAMESPACE; const std::string& alphaNumeric() { static std::string alpha_numeric("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"); return alpha_numeric; } std::string randomString(std::string::size_type len) { std::string result; result.reserve(len); std::random_device rd; std::mt19937 generator(rd()); std::string source(alphaNumeric()); std::string::size_type generated = 0; while (generated < len) { std::shuffle(source.begin(), source.end(), generator); std::string::size_type delta = std::min({len - generated, source.length()}); result.append(source.substr(0, delta)); generated += delta; } return result; } /* * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。 * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。 */ DEFINE_string(topic, "xxxx", "Topic to which messages are published"); DEFINE_string(access_point, "rocketmq-xxxx:8080", "Service access URL, provided by your service provider"); DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "xxxxx", "Your access key ID"); DEFINE_string(access_secret, "xxxx", "Your access secret"); DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); //当前火山不支持tsl/ssl模式,需要填写false int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); auto& logger = getLogger(); logger.setConsoleLevel(Level::Info); logger.setLevel(Level::Info); logger.init(); CredentialsProviderPtr credentials_provider; if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) { credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret); } // In most case, you don't need to 
create too many producers, singletion pattern is recommended. auto producer = Producer::newBuilder() .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) .withSsl(FLAGS_tls) .build()) .withTopics({FLAGS_topic}) .build(); std::atomic_bool stopped; std::atomic_long count(0); auto stats_lambda = [&] { while (!stopped.load(std::memory_order_relaxed)) { long cnt = count.load(std::memory_order_relaxed); while (!count.compare_exchange_weak(cnt, 0)) { cnt = count.load(std::memory_order_relaxed); } std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "QPS: " << cnt << std::endl; } }; std::thread stats_thread(stats_lambda); std::string body = randomString(FLAGS_message_body_size); try { for (std::size_t i = 0; i < FLAGS_total; ++i) { auto message = Message::newBuilder() .withTopic(FLAGS_topic) .withTag("TagA") .withKeys({"Key-" + std::to_string(i)}) .withBody(body) .availableAfter( std::chrono::system_clock::now() + std::chrono::seconds(10)) // This message would be available to consumers after 10 seconds .build(); std::error_code ec; SendReceipt send_receipt = producer.send(std::move(message), ec); std::cout << "Message-ID: " << send_receipt.message_id << std::endl; count++; } } catch (...) { std::cerr << "Ah...No!!!" << std::endl; } stopped.store(true, std::memory_order_relaxed); if (stats_thread.joinable()) { stats_thread.join(); } // std::this_thread::sleep_for(std::chrono::seconds(1)); return EXIT_SUCCESS; }
延迟消息的订阅方式与普通消息一致,示例代码如下所示。
#include <chrono> #include <iostream> #include <thread> #include "gflags/gflags.h" #include "rocketmq/Logger.h" #include "rocketmq/PushConsumer.h" using namespace ROCKETMQ_NAMESPACE; /* * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。 * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。 */ DEFINE_string(topic, "xxxx", "Topic to which messages are published"); DEFINE_string(group, "xxxx", "GroupId, created through your instance management console"); DEFINE_string(access_point, "rocketmq-xxxx:8080", "Service access URL, provided by your service provider"); DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "xxxxx", "Your access key ID"); DEFINE_string(access_secret, "xxxx", "Your access secret"); DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); //当前火山不支持tsl/ssl模式,需要填写false int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); auto& logger = getLogger(); logger.setConsoleLevel(Level::Info); logger.setLevel(Level::Info); logger.init(); std::string tag = "*"; auto listener = [](const Message& message) { std::cout << "Received a message[topic=" << message.topic() << ", MsgId=" << message.id() << "]" << std::endl; return ConsumeResult::SUCCESS; }; CredentialsProviderPtr credentials_provider; if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) { credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret); } // In most case, you don't need to create too many consumers, singletion pattern is recommended. 
auto push_consumer = PushConsumer::newBuilder() .withGroup(FLAGS_group) .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withRequestTimeout(std::chrono::seconds(3)) .withCredentialsProvider(credentials_provider) .withSsl(FLAGS_tls) .build()) .withConsumeThreads(4) .withListener(listener) .subscribe(FLAGS_topic, tag) .build(); std::this_thread::sleep_for(std::chrono::minutes(30)); return EXIT_SUCCESS; }