自定义Rx操作符:实现带背压控制的信号触发逻辑
自定义Rx背压控制操作符实现需求
需求概述
需要编写一个自定义Rx操作符/方法,将原始Observable转换为具备背压控制(类似Throttle操作符)特性的Observable。目标Observable类型为IObservable<Unit>,仅关注事件发生的信号。
示例场景(延迟时长1秒)
- 原始Observable事件序列:
A---B-C-----D-E--------F-- - 期望生成的目标Observable序列:
1---2---3---4---5------6--
规则说明
- 事件C对应目标序列的3:因与B触发的2间隔过近,不会立即发射,需等待前一个发射项的延迟周期结束后发射
- 事件D对应目标序列的4:因与前一个发射项3的间隔超过延迟时长,立即发射
- 事件F对应目标序列的6:因前一个发射项5的背压延迟已结束,立即发射
验证预期行为的单元测试示例
[Fact] public void Custom_operator_has_expected_behavior_for_handling_back_pressure() { // Arrange TimeSpan configuredDelay = TimeSpan.FromSeconds(1); TestScheduler testScheduler = new TestScheduler(); IObservable<char> initial = Observable.Create<char>( async o => { // First event at time 1 tick o.OnNext('A'); // waiting 700ms before sending second event at time 700ms + 1 tick await testScheduler.Sleep(TimeSpan.FromSeconds(0.7)); o.OnNext('B'); // waiting 1s before sending third event at time 1s 700ms + 1 tick await testScheduler.Sleep(TimeSpan.FromSeconds(1)); o.OnNext('C'); // waiting 1s 800ms before sending third event at time 3s 500ms + 1 tick await testScheduler.Sleep(TimeSpan.FromSeconds(1.8)); o.OnNext('D'); return Disposable.Empty; }); IObservable<Unit> result = initial.Throttle(configuredDelay, testScheduler).Select(_ => Unit.Default); // Act var testObserver = testScheduler.Start(() => result, 0, 0, TimeSpan.FromSeconds(10).Ticks); // Assert testObserver.Messages.Count.Should().Be(3); testObserver.Messages.Should().SatisfyRespectively( m => { // first item on result is expected immediately m.Time.Should().Be(1L); }, m => { // second item on result is at 1s (generated by 'B' but emitted at the end of 1s delay from previous emitted item) m.Time.Should().Be(10000001L); }, m => { // third item on result is at 2s (generated by 'c' but emitted at the end of 1s delay from previous emitted item) m.Time.Should().Be(20000001L); }, m => { // fourth item on result is at 3.5s generated by 'd' and emitted directly since previous item was long before the 1s delay period m.Time.Should().Be(35000001L); }); }
已尝试的方案
已使用Throttle、Buffer、Window(windowOpening, windowClosing)等操作符,但无法正确生成目标序列中的3,也无法处理F对应的6的触发逻辑。目前找到最接近的思路是关于RxJS throttleTime在Rx.NET中等效实现的讨论。
内容的提问来源于stack exchange,提问作者Remiab




