You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

RxJava 3.x 中如何公平处理多个订阅以实现事件严格逐个执行?

RxJava 3.x 中如何公平处理多个订阅以实现事件严格逐个执行?

你遇到的问题其实是RxJava的observeOn操作符默认的预取机制和调度器任务提交逻辑共同导致的,咱们一步步拆解原因和解决方案:

为什么当前代码不符合预期?

1. observeOn的批量处理问题

你用Schedulers.from(executor, false, true)创建了公平调度器,但默认的observeOn(scheduler)预取128个事件(这是RxJava的默认背压策略)。每个订阅者(比如subject1的订阅)会一次性把多个事件的处理逻辑打包成连续任务提交给调度器,导致调度器先执行完subject1的所有事件,再处理subject2的,完全没体现出调度器的公平性。

2. subscribeOn的误用

subscribeOn的核心作用是指定Observable的订阅和上游事件发射的线程,对于PublishSubject这种热Observable来说,它不会改变onNext的调用线程,反而会让你的回调逻辑和调度器线程绑定。你看到的交替输出其实是因为每次onNext都把单个处理任务提交给了公平调度器,但代价是你的测试日志Test activity也跑到了调度器线程,这显然不符合“后台线程处理事件、主线程做其他操作”的初衷。

解决方案:让每个事件都成为独立的调度任务

要实现严格按事件提交顺序逐个执行,关键是让每个事件的处理都作为独立任务提交给公平调度器,这里有两种可靠的方式:

方案1:修改observeOn的预取数量为1

observeOn提供了重载方法,可以手动指定预取事件的数量。把预取数设为1后,每个订阅者每次只会从上游获取一个事件,处理完成后再请求下一个,这样每个事件都会作为独立任务提交给调度器,公平性就能生效了:

ExecutorService executor = Executors.newSingleThreadExecutor();
// 创建公平的单线程调度器
Scheduler scheduler = Schedulers.from(executor, false, true);
PublishSubject<String> subject1 = PublishSubject.create();
PublishSubject<String> subject2 = PublishSubject.create();

// 关键:设置observeOn的prefetch参数为1
subject1.observeOn(scheduler, 1).subscribe(log::info);
subject2.observeOn(scheduler, 1).subscribe(log::info);

subject1.onNext("Hello11");
subject2.onNext("Hello21");
subject1.onNext("Hello12");
subject2.onNext("Hello22");
subject1.onNext("Hello13");
subject2.onNext("Hello23");

log.info("Test activity");

执行后输出会符合你的预期:

22:xx:xx [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Test activity
22:xx:xx [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello11
22:xx:xx [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello21
22:xx:xx [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello12
22:xx:xx [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello22
22:xx:xx [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello13
22:xx:xx [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello23

方案2:用flatMap包装单个事件的调度任务

另一种思路是把每个事件单独包装成一个在调度器上执行的Observable,确保每个事件的处理都是独立的调度任务:

ExecutorService executor = Executors.newSingleThreadExecutor();
Scheduler scheduler = Schedulers.from(executor, false, true);
PublishSubject<String> subject1 = PublishSubject.create();
PublishSubject<String> subject2 = PublishSubject.create();

// 用flatMap把每个事件转换成独立的调度任务
subject1.flatMap(item -> Observable.just(item).subscribeOn(scheduler))
        .subscribe(log::info);
subject2.flatMap(item -> Observable.just(item).subscribeOn(scheduler))
        .subscribe(log::info);

// 后续事件发射和日志代码同上

这种方式同样能让调度器按事件提交顺序逐个处理,达到你想要的严格逐个执行的效果。

总结

你创建的公平调度器本身是没问题的,问题出在observeOn的默认预取机制让任务批量提交了。只要让每个事件的处理都成为独立的调度任务,就能实现你想要的严格逐个执行的效果。

备注:内容来源于stack exchange,提问作者Lunigorn

火山引擎 最新活动