火山引擎消息队列 RocketMQ 5.x 版本提供同步发送、异步发送两种方式来发送普通消息。本文介绍如何通过不同方式发送普通消息。发送普通消息前请先在控制台创建普通消息类型的 Topic。
同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。一般用于较为重要的消息发送场景。
同步发送方式发送普通消息的示例代码如下。
#include <algorithm> #include <atomic> #include <iostream> #include <memory> #include <random> #include <string> #include <system_error> #include "gflags/gflags.h" #include "rocketmq/CredentialsProvider.h" #include "rocketmq/Logger.h" #include "rocketmq/Message.h" #include "rocketmq/Producer.h" using namespace ROCKETMQ_NAMESPACE; const std::string& alphaNumeric() { static std::string alpha_numeric("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"); return alpha_numeric; } std::string randomString(std::string::size_type len) { std::string result; result.reserve(len); std::random_device rd; std::mt19937 generator(rd()); std::string source(alphaNumeric()); std::string::size_type generated = 0; while (generated < len) { std::shuffle(source.begin(), source.end(), generator); std::string::size_type delta = std::min({len - generated, source.length()}); result.append(source.substr(0, delta)); generated += delta; } return result; } /* * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。 * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。 */ DEFINE_string(topic, "xxxx", "Topic to which messages are published"); DEFINE_string(access_point, "rocketmq-xxxx:8080", "Service access URL, provided by your service provider"); DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "xxxxx", "Your access key ID"); DEFINE_string(access_secret, "xxxx", "Your access secret"); DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); //当前火山不支持tsl/ssl模式,需要填写false int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); auto& logger = getLogger(); logger.setConsoleLevel(Level::Info); logger.setLevel(Level::Info); logger.init(); // Access Key/Secret pair may be acquired from management console CredentialsProviderPtr credentials_provider; if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) { credentials_provider = 
std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret); } // In most case, you don't need to create too many producers, singleton pattern is recommended. auto producer = Producer::newBuilder() .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) .withSsl(FLAGS_tls) .build()) .withTopics({FLAGS_topic}) .build(); std::atomic_bool stopped; std::atomic_long count(0); auto stats_lambda = [&] { while (!stopped.load(std::memory_order_relaxed)) { long cnt = count.load(std::memory_order_relaxed); while (!count.compare_exchange_weak(cnt, 0)) { cnt = count.load(std::memory_order_relaxed); } std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "QPS: " << cnt << std::endl; } }; std::thread stats_thread(stats_lambda); std::string body = randomString(FLAGS_message_body_size); try { for (std::size_t i = 0; i < FLAGS_total; ++i) { auto message = Message::newBuilder() .withTopic(FLAGS_topic) .withTag("TagA") .withKeys({"Key-" + std::to_string(i)}) .withBody(body) .build(); std::error_code ec; SendReceipt send_receipt = producer.send(std::move(message), ec); if (ec) { std::cerr << "Failed to publish message to " << FLAGS_topic << ". Cause: " << ec.message() << std::endl; } else { std::cout << "Publish message to " << FLAGS_topic << " OK. Message-ID: " << send_receipt.message_id << std::endl; count++; } } } catch (...) { std::cerr << "Ah...No!!!" << std::endl; } stopped.store(true, std::memory_order_relaxed); if (stats_thread.joinable()) { stats_thread.join(); } return EXIT_SUCCESS; }
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。异步发送可以避免线程阻塞,允许程序继续执行其他任务,从而提高系统的吞吐量和性能。
异步发送方式发送普通消息的示例代码如下。
#include <algorithm> #include <atomic> #include <condition_variable> #include <iostream> #include <mutex> #include <random> #include <string> #include <system_error> #include "gflags/gflags.h" #include "rocketmq/Message.h" #include "rocketmq/Producer.h" using namespace ROCKETMQ_NAMESPACE; /** * @brief A simple Semaphore to limit request concurrency. */ class Semaphore { public: Semaphore(std::size_t permits) : permits_(permits) { } /** * @brief Acquire a permit. */ void acquire() { while (true) { std::unique_lock<std::mutex> lk(mtx_); if (permits_ > 0) { permits_--; return; } cv_.wait(lk, [this]() { return permits_ > 0; }); } } /** * @brief Release the permit back to semaphore. */ void release() { std::unique_lock<std::mutex> lk(mtx_); permits_++; if (1 == permits_) { cv_.notify_one(); } } private: std::size_t permits_{0}; std::mutex mtx_; std::condition_variable cv_; }; const std::string& alphaNumeric() { static std::string alpha_numeric("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"); return alpha_numeric; } std::string randomString(std::string::size_type len) { std::string result; result.reserve(len); std::random_device rd; std::mt19937 generator(rd()); std::string source(alphaNumeric()); std::string::size_type generated = 0; while (generated < len) { std::shuffle(source.begin(), source.end(), generator); std::string::size_type delta = std::min({len - generated, source.length()}); result.append(source.substr(0, delta)); generated += delta; } return result; } /* * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。 * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。 */ DEFINE_string(topic, "xxxx", "Topic to which messages are published"); DEFINE_string(access_point, "rocketmq-xxxx:8080", "Service access URL, provided by your service provider"); DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "xxxxx", "Your 
access key ID"); DEFINE_string(access_secret, "xxxx", "Your access secret"); DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); //当前火山不支持tsl/ssl模式,需要填写false int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); auto& logger = getLogger(); logger.setConsoleLevel(Level::Info); logger.setLevel(Level::Info); logger.init(); CredentialsProviderPtr credentials_provider; if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) { credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret); } // In most case, you don't need to create too many producers, singletion pattern is recommended. auto producer = Producer::newBuilder() .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withCredentialsProvider(credentials_provider) .withSsl(FLAGS_tls) .build()) .withTopics({FLAGS_topic}) .build(); std::atomic_bool stopped; std::atomic_long count(0); auto stats_lambda = [&] { while (!stopped.load(std::memory_order_relaxed)) { long cnt = count.load(std::memory_order_relaxed); while (!count.compare_exchange_weak(cnt, 0)) { cnt = count.load(std::memory_order_relaxed); } std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "QPS: " << cnt << std::endl; } }; std::thread stats_thread(stats_lambda); std::string body = randomString(FLAGS_message_body_size); std::size_t completed = 0; std::mutex mtx; std::condition_variable cv; std::unique_ptr<Semaphore> semaphore(new Semaphore(FLAGS_concurrency)); auto send_callback = [&](const std::error_code& ec, const SendReceipt& receipt) { std::unique_lock<std::mutex> lk(mtx); semaphore->release(); completed++; count++; if (completed >= FLAGS_total) { cv.notify_all(); } }; for (std::size_t i = 0; i < FLAGS_total; ++i) { auto message = Message::newBuilder() .withTopic(FLAGS_topic) .withTag("TagA") .withKeys({"Key-" + std::to_string(i)}) .withBody(body) .build(); semaphore->acquire(); producer.send(std::move(message), 
send_callback); } { std::unique_lock<std::mutex> lk(mtx); cv.wait(lk, [&]() { return completed >= FLAGS_total; }); std::cout << "Completed: " << completed << ", total: " << FLAGS_total << std::endl; } stopped.store(true, std::memory_order_relaxed); if (stats_thread.joinable()) { stats_thread.join(); } return EXIT_SUCCESS; }
RocketMQ 5.x 支持两种消费模式,分别为 Push Consumer 和 Simple Consumer。前者为服务端推送消息,后者为主动拉取消息。推荐使用 Push Consumer。
Push Consumer消费示例代码如下:
#include <chrono> #include <iostream> #include <thread> #include "gflags/gflags.h" #include "rocketmq/Logger.h" #include "rocketmq/PushConsumer.h" using namespace ROCKETMQ_NAMESPACE; /* * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。 * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。 */ DEFINE_string(topic, "xxxx", "Topic to which messages are published"); DEFINE_string(group, "xxxx", "GroupId, created through your instance management console"); DEFINE_string(access_point, "rocketmq-xxxx:8080", "Service access URL, provided by your service provider"); DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "xxxxx", "Your access key ID"); DEFINE_string(access_secret, "xxxx", "Your access secret"); DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); //当前火山不支持tsl/ssl模式,需要填写false int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); auto& logger = getLogger(); logger.setConsoleLevel(Level::Info); logger.setLevel(Level::Info); logger.init(); std::string tag = "*"; auto listener = [](const Message& message) { std::cout << "Received a message[topic=" << message.topic() << ", MsgId=" << message.id() << "]" << std::endl; return ConsumeResult::SUCCESS; }; CredentialsProviderPtr credentials_provider; if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) { credentials_provider = std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret); } // In most case, you don't need to create too many consumers, singletion pattern is recommended. 
auto push_consumer = PushConsumer::newBuilder() .withGroup(FLAGS_group) .withConfiguration(Configuration::newBuilder() .withEndpoints(FLAGS_access_point) .withRequestTimeout(std::chrono::seconds(3)) .withCredentialsProvider(credentials_provider) .withSsl(FLAGS_tls) .build()) .withConsumeThreads(4) .withListener(listener) .subscribe(FLAGS_topic, tag) .build(); std::this_thread::sleep_for(std::chrono::minutes(30)); return EXIT_SUCCESS; }