You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

RxJava中如何避免timeout(...)取消流发射?超时后需继续处理

解决RxJava中超时不终止整个流的问题

我明白你的需求:你希望每个任务独立判断是否超时,超时只记录当前任务的超时信息,同时继续处理后续任务,而不是让整个流因为某个任务超时就直接终止。

问题根源

你原来的代码里,timeout(1, TimeUnit.SECONDS, ...)是直接作用在整个上游Observable流上的。当第二个任务task(2, "B")阻塞2秒时,整个流在1秒内没有发射新的事件,触发了流级别的超时,直接终止了整个流,所以后续的C和D任务根本没机会被处理。

解决方案

要实现单个任务独立超时,我们需要把每个任务包装成独立的Observable,然后对每个独立的Observable应用timeout操作符,这样单个任务的超时只会影响自身,不会中断整个流的执行。

修改后的代码如下:

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class TaskTimeoutExample {
    public static void main(String[] args) {
        // 先定义每个任务的参数,避免在create里同步阻塞执行任务
        Observable<Task> taskSource = Observable.just(
                new Task(0, "A"),
                new Task(2, "B"), // 这个任务会超时
                new Task(0, "C"),
                new Task(0, "D")
        );

        taskSource
                // 把每个任务转换成独立的Observable,单独处理超时
                .flatMap(task -> Observable.fromCallable(() -> executeTask(task.delay, task.name))
                        .subscribeOn(Schedulers.computation()) // 每个任务在计算线程执行
                        // 对单个任务应用超时,超时后返回指定的提示信息
                        .timeout(1, TimeUnit.SECONDS, Observable.just("timeout"))
                )
                .blockingSubscribe(result -> System.out.println("RECEIVED: " + result));
    }

    private static String executeTask(int delaySeconds, String taskName) {
        try {
            TimeUnit.SECONDS.sleep(delaySeconds);
        } catch (InterruptedException e) {
            // 恢复线程中断状态,避免后续逻辑忽略中断
            Thread.currentThread().interrupt();
        }
        return taskName;
    }

    // 用一个简单的类封装任务参数,代码更清晰
    private static class Task {
        int delay;
        String name;

        Task(int delay, String name) {
            this.delay = delay;
            this.name = name;
        }
    }
}

代码解释

  1. 任务独立封装:用Task类封装每个任务的延迟时间和名称,避免在原始Observable的create方法里同步执行任务导致阻塞。
  2. flatMap拆分流:通过flatMap把每个任务转换成独立的Observable,这样每个任务的执行和超时判断都是独立的。
  3. 单个任务超时处理:对每个独立的任务Observable应用timeout,超时后返回"timeout",但不会影响其他任务的Observable执行。
  4. 线程处理:用subscribeOn(Schedulers.computation())让每个任务在计算线程池中执行,避免阻塞主线程或其他任务的发射。

运行结果

执行这段代码后,你会得到期望的输出:

RECEIVED: A
RECEIVED: timeout
RECEIVED: C
RECEIVED: D

内容的提问来源于stack exchange,提问作者chhil

火山引擎 最新活动