You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Tokio任务满足特定条件后暂停/等待并在用户输入后恢复的实现问题求助

Tokio任务满足特定条件后暂停/等待并在用户输入后恢复的实现问题求助

我现在在做一个Tokio异步任务的控制逻辑,需求是这样的:

  • 我有一批任务要执行,用信号量限制同时运行的任务数量
  • 当某个任务触发特定条件时,希望所有未启动的任务进入等待状态,同时让正在运行的任务全部执行完毕
  • 等所有当前运行的任务都完成后,询问用户是否继续执行剩余任务,或者直接终止程序——这样用户选择终止时,不会有正在运行的任务被提前中断

目前我自己写了一段实现代码,但遇到了一些问题,想请大家帮忙排查,或者给我一些更优的实现思路。

我的实现代码

#[tokio::main]
async fn main() {
    let workers = 5;
    let range: u32 = 20;

    let semaphores: Arc<Semaphore> = std::sync::Arc::new(Semaphore::new(workers));
    let mut handles: Vec<JoinHandle<()>> = Vec::new();

    let pause = Arc::new(AtomicBool::new(false));
    let complete_count = Arc::new(AtomicU32::new(0));
    let start_count = Arc::new(AtomicU32::new(0));

    for i in 0..range {
        let semaphores_clone = Arc::clone(&semaphores);
        let complete_count_clone = Arc::clone(&complete_count);
        let pause_clone = Arc::clone(&pause);
        let start_count_clone = Arc::clone(&start_count);

        handles.push(tokio::spawn(async move {
            let _permit = semaphores_clone.clone().acquire_owned().await.unwrap();

            // Check if pause flag is true
            let mut execute_once = false;
            loop {
                if pause_clone.load(Ordering::SeqCst) {
                    if !execute_once {
                        println!(
                            "Task #{i} has been paused after {} started tasks and {} completed tasks",
                            start_count_clone.load(Ordering::SeqCst),
                            complete_count_clone.load(Ordering::SeqCst),
                        );
                        execute_once = true;
                    }
                    continue;
                }
                // Add to start_count when a task has started
                start_count_clone.fetch_add(1, Ordering::SeqCst);
                break;
            }

            // Perform some operation that takes time
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

            // Pause all tasks at task #6
            if i == 6 {
                println!(
                    "Tasks paused by task #{i} after {} started tasks and {} completed tasks",
                    start_count_clone.load(Ordering::SeqCst),
                    complete_count_clone.load(Ordering::SeqCst)
                );
                pause_clone.store(true, Ordering::SeqCst);
                // Ask for user input here
            }

            // Add to complete_count when a task is finished
            // Then subtract start_count
            {
                start_count_clone.fetch_sub(1, Ordering::SeqCst);
                complete_count_clone.fetch_add(1, Ordering::SeqCst);
            }
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }
}

遇到的问题

这段代码大部分情况下能正常工作,但偶尔会出现异常情况:只有少数任务(少于设置的worker数量)进入暂停状态,甚至还有很多任务还在运行中的时候就出现了这种问题。比如某次运行的输出示例:

Tasks paused by task #6 after 5 started tasks and 12 completed tasks
Task #19 has been paused after 4 started tasks and 15 completed tasks

另外需要说明的是,worker的数量和任务总数可能变化很大,有时候worker数量甚至会比任务总数还多。我不确定现在的实现是不是最优的,也很想知道有没有更合理的解决方案。

内容来源于stack exchange

火山引擎 最新活动