Help needed: pausing Tokio tasks when a condition is met and resuming them after user input
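I'm writing control logic for a set of asynchronous Tokio tasks. The requirements are:
- I have a batch of tasks to run, and I use a semaphore to limit how many of them run at the same time.
- When one task hits a particular condition, every task that has not started yet should wait, while the tasks that are already running are allowed to finish.
- Once all currently running tasks have finished, the user is asked whether to continue with the remaining tasks or to terminate the program. That way, if the user chooses to terminate, no running task is cut off midway.
I've written an implementation myself but have run into some problems. I'd appreciate help tracking them down, or suggestions for a better approach.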
My implementation
```rust
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::task::JoinHandle;

#[tokio::main]
async fn main() {
    let workers = 5;
    let range: u32 = 20;
    let semaphores: Arc<Semaphore> = std::sync::Arc::new(Semaphore::new(workers));
    let mut handles: Vec<JoinHandle<()>> = Vec::new();
    let pause = Arc::new(AtomicBool::new(false));
    let complete_count = Arc::new(AtomicU32::new(0));
    let start_count = Arc::new(AtomicU32::new(0));

    for i in 0..range {
        let semaphores_clone = Arc::clone(&semaphores);
        let complete_count_clone = Arc::clone(&complete_count);
        let pause_clone = Arc::clone(&pause);
        let start_count_clone = Arc::clone(&start_count);

        handles.push(tokio::spawn(async move {
            let _permit = semaphores_clone.clone().acquire_owned().await.unwrap();

            // Check if pause flag is true
            let mut execute_once = false;
            loop {
                if pause_clone.load(Ordering::SeqCst) {
                    if !execute_once {
                        println!(
                            "Task #{i} has been paused after {} started tasks and {} completed tasks",
                            start_count_clone.load(Ordering::SeqCst),
                            complete_count_clone.load(Ordering::SeqCst),
                        );
                        execute_once = true;
                    }
                    continue;
                }
                // Add to start_count when a task has started
                start_count_clone.fetch_add(1, Ordering::SeqCst);
                break;
            }

            // Perform some operation that takes time
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

            // Pause all tasks at task #6
            if i == 6 {
                println!(
                    "Tasks paused by task #{i} after {} started tasks and {} completed tasks",
                    start_count_clone.load(Ordering::SeqCst),
                    complete_count_clone.load(Ordering::SeqCst)
                );
                pause_clone.store(true, Ordering::SeqCst);
                // Ask for user input here
            }

            // Add to complete_count when a task is finished
            // Then subtract start_count
            {
                start_count_clone.fetch_sub(1, Ordering::SeqCst);
                complete_count_clone.fetch_add(1, Ordering::SeqCst);
            }
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }
}
```
The problem
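Most of the time this code works, but occasionally it misbehaves: only a few tasks (fewer than the configured number of workers) enter the paused state, and this can happen even while many tasks are still running. For example, one run printed:
```text
Tasks paused by task #6 after 5 started tasks and 12 completed tasks
Task #19 has been paused after 4 started tasks and 15 completed tasks
```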
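Note also that the number of workers and the total number of tasks can vary widely; sometimes there are even more workers than tasks. I'm not sure my current implementation is the best one, and I'd like to know whether there is a more sensible solution.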
Source: Stack Exchange




