You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何让Paho MQTT C++客户端库在初始连接时按设置的超时时间重试后再失败?

解决方案:实现初始连接的超时重试逻辑

你遇到的问题核心在于对Paho MQTT C++客户端automatic_reconnect选项的作用范围理解有误——这个选项仅对已经成功建立连接后发生的断开场景生效,初始连接失败时并不会自动触发重试。而set_connect_timeout(5)只是控制单次连接尝试的超时时间,并非总重试的超时窗口。

要实现"在5秒总超时内重试,之后判定失败"的需求,你需要手动添加初始连接的重试循环,结合时间判断来控制总等待时长。

1. 同步客户端(mqtt::client)的实现示例

修改你的代码,添加重试逻辑和总超时控制:

#include "mqtt/client.h"
#include <chrono>
#include <thread>
#include <iostream>

int main() {
    const std::string BROKER = "192.168.0.19";
    mqtt::client client(BROKER, "", nullptr);
    
    mqtt::connect_options connOpts;
    connOpts.set_keep_alive_interval(3600);
    connOpts.set_connect_timeout(5); // 单次连接尝试的超时(5秒)
    connOpts.set_clean_session(true);
    connOpts.set_automatic_reconnect(true); // 仅对已连接后的断开生效

    const int TOTAL_TIMEOUT_SECS = 5; // 总重试超时窗口
    auto start_time = std::chrono::steady_clock::now();
    bool is_connected = false;

    while (!is_connected) {
        try {
            client.connect(connOpts);
            is_connected = true;
            std::cout << "Successfully connected to broker!" << std::endl;
        } catch (const std::exception& e) {
            std::cout << "Connection attempt failed: " << e.what() << std::endl;

            // 检查是否已超过总超时时间
            auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
                std::chrono::steady_clock::now() - start_time
            );
            if (elapsed.count() >= TOTAL_TIMEOUT_SECS) {
                std::cout << "Total connection timeout reached. Giving up." << std::endl;
                break;
            }

            // 等待1秒后重试(可根据需要调整间隔)
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    }

    // 后续业务逻辑...

    return 0;
}

2. 异步客户端(mqtt::async_client)的实现示例

如果使用异步客户端,你可以结合回调和条件变量来处理连接状态,同时控制总超时:

#include "mqtt/async_client.h"
#include <chrono>
#include <thread>
#include <iostream>
#include <mutex>
#include <condition_variable>

std::mutex mtx;
std::condition_variable cv;
bool connected_flag = false;

// 连接成功回调
void on_connected(const std::string& /* cause */) {
    std::lock_guard<std::mutex> lock(mtx);
    connected_flag = true;
    cv.notify_one();
}

int main() {
    const std::string BROKER = "tcp://192.168.0.19:1883";
    mqtt::async_client client(BROKER, "");
    
    mqtt::connect_options connOpts;
    connOpts.set_keep_alive_interval(3600);
    connOpts.set_connect_timeout(5);
    connOpts.set_clean_session(true);
    connOpts.set_automatic_reconnect(true);

    // 设置回调处理连接状态
    auto callback = std::make_shared<mqtt::callback>();
    callback->set_connected_handler(on_connected);
    callback->set_connection_lost_handler([](const std::string& cause) {
        std::cout << "Connection lost: " << cause << std::endl;
    });
    client.set_callback(callback);

    const int TOTAL_TIMEOUT_SECS = 5;
    auto start_time = std::chrono::steady_clock::now();
    bool is_connected = false;

    while (!is_connected) {
        connected_flag = false;
        try {
            auto conn_token = client.connect(connOpts);
            
            // 等待连接完成或单次超时
            std::unique_lock<std::mutex> lock(mtx);
            if (cv.wait_for(lock, std::chrono::seconds(5), []{ return connected_flag; })) {
                is_connected = true;
                std::cout << "Successfully connected to broker!" << std::endl;
            } else {
                std::cout << "Single connection attempt timed out." << std::endl;
            }
        } catch (const std::exception& e) {
            std::cout << "Connection error: " << e.what() << std::endl;
        }

        // 检查总超时
        auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
            std::chrono::steady_clock::now() - start_time
        );
        if (elapsed.count() >= TOTAL_TIMEOUT_SECS) {
            std::cout << "Total timeout reached. Aborting connection attempts." << std::endl;
            break;
        }

        if (!is_connected) {
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    }

    // 后续业务逻辑...

    return 0;
}

关键说明

  • set_automatic_reconnect(true):仅在客户端已经成功连接到代理后,发生意外断开时才会自动尝试重连,初始连接失败不会触发该机制。
  • set_connect_timeout(5):控制单次连接尝试的超时时间,即每次发起连接后,如果5秒内未成功,则抛出异常。
  • 手动重试循环:通过记录初始时间、计算已流逝时间,来控制总重试窗口,确保在设定的总超时时间内持续尝试连接,之后才判定失败。

内容的提问来源于stack exchange,提问作者Paul Jurczak

火山引擎 最新活动