消息队列 RocketMQ 5.x版本提供顺序消息(FIFO消息)供您使用。在顺序消息模型中,您需要严格按照顺序来发布和消费消息。本文提供使用 C++ SDK 收发顺序消息的示例代码供您参考。
顺序消息分为两类,全局顺序消息和分区顺序消息。区别仅为队列数量不同,代码没有区别。
注意顺序消息,需要在控制台申请顺序类型的topic以及顺序类型的group
发送顺序消息的示例代码如下。
#include <algorithm> #include <atomic> #include <condition_variable> #include <iostream> #include <memory> #include <random> #include <string> #include <system_error> #include "gflags/gflags.h" #include "rocketmq/CredentialsProvider.h" #include "rocketmq/FifoProducer.h" #include "rocketmq/Logger.h" #include "rocketmq/Message.h" #include "rocketmq/Producer.h" #include "rocketmq/SendReceipt.h" using namespace ROCKETMQ_NAMESPACE; /** * @brief A simple Semaphore to limit request concurrency. */ class Semaphore { public: Semaphore(std::size_t permits) : permits_(permits) { } /** * @brief Acquire a permit. */ void acquire() { while (true) { std::unique_lock<std::mutex> lk(mtx_); if (permits_ > 0) { permits_--; return; } cv_.wait(lk, [this]() { return permits_ > 0; }); } } /** * @brief Release the permit back to semaphore. */ void release() { std::unique_lock<std::mutex> lk(mtx_); permits_++; if (1 == permits_) { cv_.notify_one(); } } private: std::size_t permits_{0}; std::mutex mtx_; std::condition_variable cv_; }; const std::string& alphaNumeric() { static std::string alpha_numeric("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"); return alpha_numeric; } std::string randomString(std::string::size_type len) { std::string result; result.reserve(len); std::random_device rd; std::mt19937 generator(rd()); std::string source(alphaNumeric()); std::string::size_type generated = 0; while (generated < len) { std::shuffle(source.begin(), source.end(), generator); std::string::size_type delta = std::min({len - generated, source.length()}); result.append(source.substr(0, delta)); generated += delta; } return result; } /* * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。 * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。 */ DEFINE_string(topic, "xxxx", "Topic to which messages are published"); DEFINE_string(access_point, "rocketmq-xxxx:8080", "Service access URL, provided by your service provider"); DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "xxxxx", "Your access key ID"); DEFINE_string(access_secret, "xxxx", "Your access secret"); DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); //当前火山不支持tsl/ssl模式,需要填写false int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); auto& logger = getLogger(); logger.setConsoleLevel(Level::Debug); logger.setLevel(Level::Debug); logger.init(); // Access Key/Secret pair may be acquired from management console CredentialsProviderPtr credentials_provider; if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) { credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret); } // In most case, you don't need to create too many producers, singleton pattern is recommended. auto producer = FifoProducer::newBuilder() .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) .withSsl(FLAGS_tls) .build()) .withConcurrency(FLAGS_concurrency) .withTopics({FLAGS_topic}) .build(); std::atomic_bool stopped; std::atomic_long count(0); auto stats_lambda = [&] { while (!stopped.load(std::memory_order_relaxed)) { long cnt = count.load(std::memory_order_relaxed); while (!count.compare_exchange_weak(cnt, 0)) { cnt = count.load(std::memory_order_relaxed); } std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "QPS: " << cnt << std::endl; } }; std::thread stats_thread(stats_lambda); std::string body = randomString(FLAGS_message_body_size); std::size_t completed = 0; std::mutex mtx; std::condition_variable cv; std::unique_ptr<Semaphore> semaphore(new Semaphore(FLAGS_concurrency)); try { for (std::size_t i = 0; i < FLAGS_total; ++i) { auto message = Message::newBuilder() .withTopic(FLAGS_topic) .withTag("TagA") .withKeys({"Key-" + std::to_string(i)}) .withGroup("message-group" + std::to_string(i % FLAGS_concurrency)) .withBody(body) .build(); std::error_code ec; auto callback = [&](const std::error_code& ec, const SendReceipt& receipt) mutable { completed++; count++; semaphore->release(); if (completed >= FLAGS_total) { cv.notify_all(); } }; semaphore->acquire(); producer.send(std::move(message), callback); std::cout << "Cached No." << i << " message" << std::endl; } } catch (...) { std::cerr << "Ah...No!!!" << std::endl; } { std::unique_lock<std::mutex> lk(mtx); cv.wait(lk, [&]() { return completed >= FLAGS_total; }); std::cout << "Completed: " << completed << ", total: " << FLAGS_total << std::endl; } stopped.store(true, std::memory_order_relaxed); if (stats_thread.joinable()) { stats_thread.join(); } return EXIT_SUCCESS; }
订阅顺序消息的示例代码如下,要注意group一定是申请的顺序类型
#include <chrono> #include <iostream> #include <thread> #include "gflags/gflags.h" #include "rocketmq/Logger.h" #include "rocketmq/PushConsumer.h" using namespace ROCKETMQ_NAMESPACE; /* * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。 * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。 */ DEFINE_string(topic, "xxxx", "Topic to which messages are published"); DEFINE_string(group, "xxxx", "GroupId, created through your instance management console"); DEFINE_string(access_point, "rocketmq-xxxx:8080", "Service access URL, provided by your service provider"); DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "xxxxx", "Your access key ID"); DEFINE_string(access_secret, "xxxx", "Your access secret"); DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); //当前火山不支持tsl/ssl模式,需要填写false int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); auto& logger = getLogger(); logger.setConsoleLevel(Level::Info); logger.setLevel(Level::Info); logger.init(); std::string tag = "*"; auto listener = [](const Message& message) { std::cout << "Received a message[topic=" << message.topic() << ", MsgId=" << message.id() << "]" << std::endl; return ConsumeResult::SUCCESS; }; CredentialsProviderPtr credentials_provider; if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) { credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret); } // In most case, you don't need to create too many consumers, singletion pattern is recommended. auto push_consumer = PushConsumer::newBuilder() .withGroup(FLAGS_group) .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withRequestTimeout(std::chrono::seconds(3)) .withCredentialsProvider(credentials_provider) .withSsl(FLAGS_tls) .build()) .withConsumeThreads(4) .withListener(listener) .subscribe(FLAGS_topic, tag) .build(); std::this_thread::sleep_for(std::chrono::minutes(30)); return EXIT_SUCCESS; }