You need to enable JavaScript to run this app.
优惠活动
大模型
产品
解决方案
定价
更多
文档控制台
免费开始使用

自定义Rx操作符:实现带背压控制的信号触发逻辑

自定义Rx背压控制操作符实现需求

需求概述

需要编写一个自定义Rx操作符/方法,将原始Observable转换为具备背压控制(类似Throttle操作符)特性的Observable。目标Observable类型为IObservable<Unit>,仅关注事件发生的信号。

示例场景(延迟时长1秒)

  • 原始Observable事件序列:A---B-C-----D-E--------F--
  • 期望生成的目标Observable序列:1---2---3---4---5------6--

规则说明

  • 事件C对应目标序列的3:因与B触发的2间隔过近,不会立即发射,需等待前一个发射项的延迟周期结束后发射
  • 事件D对应目标序列的4:因与前一个发射项3的间隔超过延迟时长,立即发射
  • 事件F对应目标序列的6:因前一个发射项5的背压延迟已结束,立即发射

验证预期行为的单元测试示例

[Fact]
public void Custom_operator_has_expected_behavior_for_handling_back_pressure()
{
    // Arrange
    TimeSpan configuredDelay = TimeSpan.FromSeconds(1);
    TestScheduler testScheduler = new TestScheduler();
    IObservable<char> initial = Observable.Create<char>(
        async o =>
        {
            // First event at time 1 tick
            o.OnNext('A');

            // waiting 700ms before sending second event at time 700ms + 1 tick
            await testScheduler.Sleep(TimeSpan.FromSeconds(0.7));

            o.OnNext('B');
            
            // waiting 1s before sending third event at time 1s 700ms + 1 tick
            await testScheduler.Sleep(TimeSpan.FromSeconds(1));

            o.OnNext('C');
            
            // waiting 1s 800ms before sending third event at time 3s 500ms + 1 tick
            await testScheduler.Sleep(TimeSpan.FromSeconds(1.8));

            o.OnNext('D');

            return Disposable.Empty;
        });

    IObservable<Unit> result = initial.Throttle(configuredDelay, testScheduler).Select(_ => Unit.Default);
    
    // Act
    var testObserver = testScheduler.Start(() => result, 0, 0, TimeSpan.FromSeconds(10).Ticks);

    // Assert
    testObserver.Messages.Count.Should().Be(3);
    testObserver.Messages.Should().SatisfyRespectively(
        m =>
        {
            // first item on result is expected immediately
            m.Time.Should().Be(1L);
        },
        m =>
        {
            // second item on result is at 1s (generated by 'B' but emitted at the end of 1s delay from previous emitted item)
            m.Time.Should().Be(10000001L);
        },
        m =>
        {
            // third item on result is at 2s (generated by 'c' but emitted at the end of 1s delay from previous emitted item)
            m.Time.Should().Be(20000001L);
        },
        m =>
        {
            // fourth item on result is at 3.5s generated by 'd' and emitted directly since previous item was long before the 1s delay period
            m.Time.Should().Be(35000001L);
        });
}

已尝试的方案

已使用ThrottleBufferWindow(windowOpening, windowClosing)等操作符,但无法正确生成目标序列中的3,也无法处理F对应的6的触发逻辑。目前找到最接近的思路是关于RxJS throttleTime在Rx.NET中等效实现的讨论。

内容的提问来源于stack exchange,提问作者Remiab

火山引擎 最新活动