You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

仅用BroadcastBlock和ActionBlock实现DataFlow同步执行的问题

问题分析与解决方案

你遇到的问题核心在于ActionBlock中使用了async void的委托,这会导致TPL DataFlow无法正确跟踪异步操作的完成状态,进而让action.Completion提前触发,而不是等待你的异步逻辑全部执行完毕。

为什么会出现这个顺序问题?

当你把async voidAction<T>传给ActionBlock时,ActionBlock只会执行这个委托的同步部分(也就是打印"Working.."之后,调用VeryLongProcess()),但它无法识别到这是一个异步操作,所以会认为这个委托已经执行完成,进而触发Completion的后续逻辑(也就是ContinueWith里的"Continue data",以及后续的"All Done"和"Completed"),而此时VeryLongProcess()里的延迟操作还在后台运行,所以最后才会打印"Long Process Done.."和"Done"。

如何解决?

只需要把ActionBlock的委托类型从Action<T>改成Func<T, Task>,这样ActionBlock就知道要等待这个异步任务完成后,才会标记自己为已完成状态。具体修改如下:

修改后的DataFlow逻辑类代码:

class CreateDownloadTask { 
    public async Task VeryLongProcess() { 
        await Task.Run(async () => { 
            Console.WriteLine("Long Process Working.."); 
            await Task.Delay(TimeSpan.FromSeconds(5)); 
            Console.WriteLine("Long Process Done.."); 
        }); 
    } 
    public async Task CreateSimpleBroadcastX<T>(T data) { 
        // 把Action<T>改成Func<T, Task>
        Func<T, Task> process = async model => { 
            Console.WriteLine("Working.."); 
            await VeryLongProcess(); 
            Console.WriteLine("Done"); 
        }; 
        var broad = new BroadcastBlock<T>(null); 
        // 这里ActionBlock接收Func<T, Task>类型的委托
        var action = new ActionBlock<T>(process); 
        var dflo = new DataflowLinkOptions { PropagateCompletion = true }; 
        broad.LinkTo(action, dflo); 
        await broad.SendAsync(data); 
        broad.Complete(); 
        await action.Completion.ContinueWith(async tsk => { 
            Console.WriteLine("Continue data"); 
        }).ConfigureAwait(false); 
        Console.WriteLine("All Done"); 
    } 
}

修改后的输出顺序

修改后,控制台输出会和你的预期完全一致:

Working..
Long Process Working..
Long Process Done..
Done
Continue data
All Done
Completed

额外注意事项

  • 在TPL DataFlow中处理异步逻辑时,永远不要使用async void的委托,必须使用Func<T, Task>(或对应的泛型版本),这样组件才能正确跟踪异步操作的生命周期。
  • 你当前使用BroadcastBlock的场景其实有点多余,因为你只发送了一个数据项,如果不需要广播到多个目标块,直接使用ActionBlock就足够了,但如果是后续要扩展到多个接收块,当前的结构是没问题的。

内容的提问来源于stack exchange,提问作者Om Rudi

火山引擎 最新活动