如何让Paho MQTT C++客户端库在初始连接时按设置的超时时间重试后再失败?
解决方案:实现初始连接的超时重试逻辑
你遇到的问题核心在于对Paho MQTT C++客户端automatic_reconnect选项的作用范围理解有误——这个选项仅对已经成功建立连接后发生的断开场景生效,初始连接失败时并不会自动触发重试。而set_connect_timeout(5)只是控制单次连接尝试的超时时间,并非总重试的超时窗口。
要实现"在5秒总超时内重试,之后判定失败"的需求,你需要手动添加初始连接的重试循环,结合时间判断来控制总等待时长。
1. 同步客户端(mqtt::client)的实现示例
修改你的代码,添加重试逻辑和总超时控制:
#include "mqtt/client.h" #include <chrono> #include <thread> #include <iostream> int main() { const std::string BROKER = "192.168.0.19"; mqtt::client client(BROKER, "", nullptr); mqtt::connect_options connOpts; connOpts.set_keep_alive_interval(3600); connOpts.set_connect_timeout(5); // 单次连接尝试的超时(5秒) connOpts.set_clean_session(true); connOpts.set_automatic_reconnect(true); // 仅对已连接后的断开生效 const int TOTAL_TIMEOUT_SECS = 5; // 总重试超时窗口 auto start_time = std::chrono::steady_clock::now(); bool is_connected = false; while (!is_connected) { try { client.connect(connOpts); is_connected = true; std::cout << "Successfully connected to broker!" << std::endl; } catch (const std::exception& e) { std::cout << "Connection attempt failed: " << e.what() << std::endl; // 检查是否已超过总超时时间 auto elapsed = std::chrono::duration_cast<std::chrono::seconds>( std::chrono::steady_clock::now() - start_time ); if (elapsed.count() >= TOTAL_TIMEOUT_SECS) { std::cout << "Total connection timeout reached. Giving up." << std::endl; break; } // 等待1秒后重试(可根据需要调整间隔) std::this_thread::sleep_for(std::chrono::seconds(1)); } } // 后续业务逻辑... return 0; }
2. 异步客户端(mqtt::async_client)的实现示例
如果使用异步客户端,你可以结合回调和条件变量来处理连接状态,同时控制总超时:
#include "mqtt/async_client.h" #include <chrono> #include <thread> #include <iostream> #include <mutex> #include <condition_variable> std::mutex mtx; std::condition_variable cv; bool connected_flag = false; // 连接成功回调 void on_connected(const std::string& /* cause */) { std::lock_guard<std::mutex> lock(mtx); connected_flag = true; cv.notify_one(); } int main() { const std::string BROKER = "tcp://192.168.0.19:1883"; mqtt::async_client client(BROKER, ""); mqtt::connect_options connOpts; connOpts.set_keep_alive_interval(3600); connOpts.set_connect_timeout(5); connOpts.set_clean_session(true); connOpts.set_automatic_reconnect(true); // 设置回调处理连接状态 auto callback = std::make_shared<mqtt::callback>(); callback->set_connected_handler(on_connected); callback->set_connection_lost_handler([](const std::string& cause) { std::cout << "Connection lost: " << cause << std::endl; }); client.set_callback(callback); const int TOTAL_TIMEOUT_SECS = 5; auto start_time = std::chrono::steady_clock::now(); bool is_connected = false; while (!is_connected) { connected_flag = false; try { auto conn_token = client.connect(connOpts); // 等待连接完成或单次超时 std::unique_lock<std::mutex> lock(mtx); if (cv.wait_for(lock, std::chrono::seconds(5), []{ return connected_flag; })) { is_connected = true; std::cout << "Successfully connected to broker!" << std::endl; } else { std::cout << "Single connection attempt timed out." << std::endl; } } catch (const std::exception& e) { std::cout << "Connection error: " << e.what() << std::endl; } // 检查总超时 auto elapsed = std::chrono::duration_cast<std::chrono::seconds>( std::chrono::steady_clock::now() - start_time ); if (elapsed.count() >= TOTAL_TIMEOUT_SECS) { std::cout << "Total timeout reached. Aborting connection attempts." << std::endl; break; } if (!is_connected) { std::this_thread::sleep_for(std::chrono::seconds(1)); } } // 后续业务逻辑... return 0; }
关键说明
set_automatic_reconnect(true):仅在客户端已经成功连接到代理后,发生意外断开时才会自动尝试重连,初始连接失败不会触发该机制。set_connect_timeout(5):控制单次连接尝试的超时时间,即每次发起连接后,如果5秒内未成功,则抛出异常。- 手动重试循环:通过记录初始时间、计算已流逝时间,来控制总重试窗口,确保在设定的总超时时间内持续尝试连接,之后才判定失败。
内容的提问来源于stack exchange,提问作者Paul Jurczak




