仅用BroadcastBlock和ActionBlock实现DataFlow同步执行的问题
问题分析与解决方案
你遇到的问题核心在于ActionBlock中使用了async void的委托,这会导致TPL DataFlow无法正确跟踪异步操作的完成状态,进而让action.Completion提前触发,而不是等待你的异步逻辑全部执行完毕。
为什么会出现这个顺序问题?
当你把async void的Action<T>传给ActionBlock时,ActionBlock只会执行这个委托的同步部分(也就是打印"Working.."之后,调用VeryLongProcess()),但它无法识别到这是一个异步操作,所以会认为这个委托已经执行完成,进而触发Completion的后续逻辑(也就是ContinueWith里的"Continue data",以及后续的"All Done"和"Completed"),而此时VeryLongProcess()里的延迟操作还在后台运行,所以最后才会打印"Long Process Done.."和"Done"。
如何解决?
只需要把ActionBlock的委托类型从Action<T>改成Func<T, Task>,这样ActionBlock就知道要等待这个异步任务完成后,才会标记自己为已完成状态。具体修改如下:
修改后的DataFlow逻辑类代码:
class CreateDownloadTask { public async Task VeryLongProcess() { await Task.Run(async () => { Console.WriteLine("Long Process Working.."); await Task.Delay(TimeSpan.FromSeconds(5)); Console.WriteLine("Long Process Done.."); }); } public async Task CreateSimpleBroadcastX<T>(T data) { // 把Action<T>改成Func<T, Task> Func<T, Task> process = async model => { Console.WriteLine("Working.."); await VeryLongProcess(); Console.WriteLine("Done"); }; var broad = new BroadcastBlock<T>(null); // 这里ActionBlock接收Func<T, Task>类型的委托 var action = new ActionBlock<T>(process); var dflo = new DataflowLinkOptions { PropagateCompletion = true }; broad.LinkTo(action, dflo); await broad.SendAsync(data); broad.Complete(); await action.Completion.ContinueWith(async tsk => { Console.WriteLine("Continue data"); }).ConfigureAwait(false); Console.WriteLine("All Done"); } }
修改后的输出顺序
修改后,控制台输出会和你的预期完全一致:
Working.. Long Process Working.. Long Process Done.. Done Continue data All Done Completed
额外注意事项
- 在TPL DataFlow中处理异步逻辑时,永远不要使用
async void的委托,必须使用Func<T, Task>(或对应的泛型版本),这样组件才能正确跟踪异步操作的生命周期。 - 你当前使用
BroadcastBlock的场景其实有点多余,因为你只发送了一个数据项,如果不需要广播到多个目标块,直接使用ActionBlock就足够了,但如果是后续要扩展到多个接收块,当前的结构是没问题的。
内容的提问来源于stack exchange,提问作者Om Rudi




