RxJava 3.x 中如何公平处理多个订阅以实现事件严格逐个执行?
你遇到的问题其实是RxJava的observeOn操作符默认的预取机制和调度器任务提交逻辑共同导致的,咱们一步步拆解原因和解决方案:
为什么当前代码不符合预期?
1. observeOn的批量处理问题
你用Schedulers.from(executor, false, true)创建了公平调度器,但默认的observeOn(scheduler)会预取128个事件(这是RxJava的默认背压策略)。每个订阅者(比如subject1的订阅)会一次性把多个事件的处理逻辑打包成连续任务提交给调度器,导致调度器先执行完subject1的所有事件,再处理subject2的,完全没体现出调度器的公平性。
2. subscribeOn的误用
subscribeOn的核心作用是指定Observable的订阅和上游事件发射的线程,对于PublishSubject这种热Observable来说,它不会改变onNext的调用线程,反而会让你的回调逻辑和调度器线程绑定。你看到的交替输出其实是因为每次onNext都把单个处理任务提交给了公平调度器,但代价是你的测试日志Test activity也跑到了调度器线程,这显然不符合“后台线程处理事件、主线程做其他操作”的初衷。
解决方案:让每个事件都成为独立的调度任务
要实现严格按事件提交顺序逐个执行,关键是让每个事件的处理都作为独立任务提交给公平调度器,这里有两种可靠的方式:
方案1:修改observeOn的预取数量为1
observeOn提供了重载方法,可以手动指定预取事件的数量。把预取数设为1后,每个订阅者每次只会从上游获取一个事件,处理完成后再请求下一个,这样每个事件都会作为独立任务提交给调度器,公平性就能生效了:
ExecutorService executor = Executors.newSingleThreadExecutor(); // 创建公平的单线程调度器 Scheduler scheduler = Schedulers.from(executor, false, true); PublishSubject<String> subject1 = PublishSubject.create(); PublishSubject<String> subject2 = PublishSubject.create(); // 关键:设置observeOn的prefetch参数为1 subject1.observeOn(scheduler, 1).subscribe(log::info); subject2.observeOn(scheduler, 1).subscribe(log::info); subject1.onNext("Hello11"); subject2.onNext("Hello21"); subject1.onNext("Hello12"); subject2.onNext("Hello22"); subject1.onNext("Hello13"); subject2.onNext("Hello23"); log.info("Test activity");
执行后输出会符合你的预期:
22:xx:xx [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Test activity 22:xx:xx [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello11 22:xx:xx [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello21 22:xx:xx [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello12 22:xx:xx [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello22 22:xx:xx [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello13 22:xx:xx [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello23
方案2:用flatMap包装单个事件的调度任务
另一种思路是把每个事件单独包装成一个在调度器上执行的Observable,确保每个事件的处理都是独立的调度任务:
ExecutorService executor = Executors.newSingleThreadExecutor(); Scheduler scheduler = Schedulers.from(executor, false, true); PublishSubject<String> subject1 = PublishSubject.create(); PublishSubject<String> subject2 = PublishSubject.create(); // 用flatMap把每个事件转换成独立的调度任务 subject1.flatMap(item -> Observable.just(item).subscribeOn(scheduler)) .subscribe(log::info); subject2.flatMap(item -> Observable.just(item).subscribeOn(scheduler)) .subscribe(log::info); // 后续事件发射和日志代码同上
这种方式同样能让调度器按事件提交顺序逐个处理,达到你想要的严格逐个执行的效果。
总结
你创建的公平调度器本身是没问题的,问题出在observeOn的默认预取机制让任务批量提交了。只要让每个事件的处理都成为独立的调度任务,就能实现你想要的严格逐个执行的效果。
备注:内容来源于stack exchange,提问作者Lunigorn




