You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

基于Boost.Asio的同步并发数据结构及容器strand同步准则问询

我来分享下在Boost.ASIO里用strand处理不同容器同步的系统化思路,这也是我在实际项目里踩过坑后总结的经验,希望能帮到你:

一、STL/类STL容器(如std::deque、std::unordered_map)的strand同步方案

strand的核心价值就是序列化所有提交给它的handler,完全可以替代mutex来实现共享容器的线程安全,而且更贴合ASIO的异步编程模型。具体步骤可以这么做:

  • 第一步:给目标容器绑定一个专属的strand实例。每个需要同步的容器最好对应一个独立的strand,避免不同容器的操作互相阻塞:

    boost::asio::io_context io_ctx;
    auto container_strand = boost::make_shared<boost::asio::strand<boost::asio::io_context::executor_type>>(io_ctx.get_executor());
    std::unordered_map<int, std::string> shared_map;
    
  • 第二步:所有对容器的读写改操作,必须包装成handler提交到这个strand里。不管是插入、删除还是查询,绝对不能在strand外部直接碰容器:
    比如插入元素:

    boost::asio::post(*container_strand, [&shared_map]() {
        shared_map.emplace(1, "hello world");
    });
    

    查询并处理元素:

    boost::asio::dispatch(*container_strand, [&shared_map]() {
        if (auto it = shared_map.find(1); it != shared_map.end()) {
            // 在这里安全处理it->second,不用担心并发冲突
        }
    });
    

    这里优先用dispatch而不是post——如果当前线程已经在strand的上下文里,dispatch会直接执行handler,减少不必要的调度开销;否则和post一样把任务放进队列。

  • 第三步:如果需要从容器操作中获取返回值给调用者,可以结合std::future或者异步回调。比如异步查询值:

    auto get_value = [&container_strand, &shared_map](int key) -> std::future<std::optional<std::string>> {
        return std::async(std::launch::deferred, [=, &container_strand, &shared_map]() {
            std::optional<std::string> result;
            boost::asio::dispatch(*container_strand, [&]() {
                if (auto it = shared_map.find(key); it != shared_map.end()) {
                    result = it->second;
                }
            });
            return result;
        });
    };
    
    // 调用示例
    auto future = get_value(1);
    auto value = future.get(); // 等待结果返回
    
二、无锁容器(如boost::lockfree::spsc_queue、folly::ProducerConsumerQueue)的strand同步适配

无锁容器本身已经通过原子操作解决了多线程并发读写的安全问题,但strand依然能在两种场景下发挥作用:

  • 场景1:保护容器取出后的处理逻辑
    无锁容器只保证自身的读写安全,但如果你从容器取出数据后的处理逻辑不是线程安全的(比如操作另一个共享资源、执行复杂的计算),就可以用strand来序列化这些处理操作:

    boost::lockfree::spsc_queue<int, boost::lockfree::capacity<1024>> queue;
    auto consumer_strand = boost::make_shared<boost::asio::strand<boost::asio::io_context::executor_type>>(io_ctx.get_executor());
    
    // 生产者线程直接push(无锁容器本身支持并发写入)
    queue.push(42);
    queue.push(100);
    
    // 把处理逻辑提交到strand,保证每次只有一个处理任务在执行
    boost::asio::post(*consumer_strand, [&queue]() {
        int value;
        while (queue.pop(value)) {
            // 这里的处理逻辑是线程安全的,因为strand保证序列化执行
            process_value(value);
        }
    });
    
  • 场景2:实现严格的操作序列化语义
    有些业务场景需要保证容器的操作顺序完全一致(比如生产者插入的顺序必须和消费者取出的顺序严格对应),这时候可以把所有容器操作都提交到strand里,让无锁容器退化为"序列化操作的容器":

    auto queue_strand = boost::make_shared<boost::asio::strand<boost::asio::io_context::executor_type>>(io_ctx.get_executor());
    
    // 生产者的push操作都通过strand提交
    boost::asio::post(*queue_strand, [&queue]() { queue.push(1); });
    boost::asio::post(*queue_strand, [&queue]() { queue.push(2); });
    
    // 消费者的pop操作也通过strand提交
    boost::asio::post(*queue_strand, [&queue]() {
        int val;
        while (queue.pop(val)) {
            // 取出顺序一定是1、2,完全符合提交顺序
            process_value(val);
        }
    });
    

    这种场景下,strand的作用是强化顺序性,而不是解决线程安全问题——如果你的业务不需要严格顺序,就没必要这么做,会浪费无锁容器的并发优势。

三、通用指导原则

最后总结几个关键原则,帮你避免踩坑:

  • 普通容器:所有操作必须在同一个strand上下文执行,禁止外部直接访问,这是保证线程安全的核心。
  • 无锁容器:strand是补充工具,用来处理非线程安全的后续逻辑严格顺序需求,不要用它来做无锁容器本身的安全保护,纯属浪费性能。
  • 避免在strand handler里做耗时操作:strand是序列化执行的,耗时操作会阻塞后续所有任务,严重影响异步流程的响应性。如果有耗时工作,应该把它放到专门的线程池,只把容器的读写操作留在strand里。
  • 优先用dispatch:能减少调度开销,提升性能,只有在明确需要把任务放入队列延后执行时才用post

内容的提问来源于stack exchange,提问作者Lia Stratopoulos

火山引擎 最新活动