You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何让boost::asio::co_spawn返回boost::future并整合线程池使用

如何让boost::asio::co_spawn返回boost::future并整合线程池使用

刚好我之前也遇到过一模一样的需求:用asio协程处理网络异步IO,同时靠boost::future的链式调用做任务流编排,还不想维护两个独立的线程池。给你两个核心解决方案,一步步来解决问题:


一、让co_spawn直接返回boost::future:自定义完成令牌

asio原生的use_future确实只生成std::future,但我们可以模仿它的实现,自定义一个完成令牌use_boost_future,让co_spawn直接返回boost::future,完全不需要额外线程池包装。

核心思路是利用asio的async_result特化机制,用boost::promise替代标准库的std::promise来生成boost::future

#define BOOST_THREAD_PROVIDES_FUTURE 1
#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION 1
#define BOOST_THREAD_PROVIDES_FUTURE_UNWRAP 1
#include <boost/asio.hpp>
#include <boost/thread/future.hpp>
#include <print>
#include <stdexcept>

namespace asio = boost::asio;

// 1. 定义自定义完成令牌的标签
struct use_boost_future_t {};
constexpr use_boost_future_t use_boost_future;

// 2. 特化asio::async_result,适配use_boost_future
template <typename Signature>
struct asio::async_result<use_boost_future_t, Signature> {
    using result_type = typename std::function<Signature>::result_type;
    using future_type = boost::future<std::decay_t<result_type>>;
    using promise_type = boost::promise<std::decay_t<result_type>>;

    explicit async_result(use_boost_future_t) : promise_() {}

    // 获取最终的boost::future
    future_type get() {
        return promise_.get_future();
    }

    // 处理无错误码的完成回调(比如协程返回值的情况)
    template <typename R>
    void complete(R&& r) {
        promise_.set_value(std::forward<R>(r));
    }

    // 处理带错误码的完成回调(比如原生异步IO的错误)
    template <typename E, typename R>
    void complete(E&& e, R&& r) {
        if (e) {
            promise_.set_exception(std::make_exception_ptr(boost::system::system_error(e)));
        } else {
            promise_.set_value(std::forward<R>(r));
        }
    }

private:
    promise_type promise_;
};

// 特化无返回值的情况(void)
template <>
struct asio::async_result<use_boost_future_t, void()> {
    using future_type = boost::future<void>;
    using promise_type = boost::promise<void>;

    explicit async_result(use_boost_future_t) : promise_() {}

    future_type get() {
        return promise_.get_future();
    }

    void complete() {
        promise_.set_value();
    }

    template <typename E>
    void complete(E&& e) {
        if (e) {
            promise_.set_exception(std::make_exception_ptr(boost::system::system_error(e)));
        } else {
            promise_.set_value();
        }
    }

private:
    promise_type promise_;
};

// 你的原有协程任务
auto running_task() -> asio::awaitable<void> {
    std::println("starting the coroutine ....");
    auto timer = asio::system_timer{ co_await asio::this_coro::executor };
    timer.expires_after(std::chrono::seconds(1));
    co_await timer.async_wait(asio::use_awaitable);
    std::println("finishing the coroutine ....");
}

现在调用co_spawn时传入use_boost_future,就能直接得到boost::future了:

auto main() -> int {
    auto io_context = asio::thread_pool{4};
    // 直接得到boost::future<void>类型的fut
    boost::future<void> fut = asio::co_spawn(io_context, running_task(), use_boost_future);

    // 直接用boost::future的链式特性
    auto chained_fut = fut.then([&io_context](boost::future<void> prev) {
        prev.get(); // 处理前序任务的结果/异常
        std::println("Starting chained task...");
        return asio::co_spawn(io_context, running_task(), use_boost_future);
    }).unwrap(); // 把future<future<T>>解包为future<T>

    chained_fut.wait();
    io_context.join();
    return 0;
}

二、整合线程池:让boost::future的回调复用asio线程池

上面的代码中,then的默认回调会在当前线程执行,如果想让链式任务的回调也运行在asio的thread_pool中,我们可以把asio的executor适配成boost的executor,让then的回调提交到asio线程池:

// 适配asio的executor为boost可识别的executor
struct asio_executor_adaptor {
    asio::any_io_executor exec;

    template <typename F>
    void submit(F&& f) const {
        // 把任务提交到asio线程池
        asio::post(exec, std::forward<F>(f));
    }
};

修改then的调用,指定用asio线程池作为执行器:

auto main() -> int {
    auto io_context = asio::thread_pool{4};
    boost::future<void> fut = asio::co_spawn(io_context, running_task(), use_boost_future);

    // 用适配后的asio执行器运行链式回调
    auto chained_fut = fut.then(
        asio_executor_adaptor{io_context.get_executor()},
        [&io_context](boost::future<void> prev) {
            prev.get();
            std::println("Chained task running in asio thread pool...");
            return asio::co_spawn(io_context, running_task(), use_boost_future);
        }
    ).unwrap();

    chained_fut.wait();
    io_context.join();
    return 0;
}

方案优势

  1. 无额外线程池:全程只用asio的thread_pool处理所有异步任务(网络IO+任务流回调),避免线程池之间的上下文切换损耗
  2. 无缝衔接特性:既保留了asio协程的异步IO能力,又能直接使用boost::future的thenwhen_allunwrap等任务流编排特性
  3. 错误统一处理:自定义完成令牌已经适配了错误码处理,异步操作的异常会被传递到boost::future中,方便统一捕获

这样就完美解决了你遇到的两个核心问题:让co_spawn返回boost::future,同时整合线程池资源。

火山引擎 最新活动