You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Spring REST请求中基于长轮询的线程持有方案技术问询

实现Spring异步长轮询:线程间通知的靠谱方案

我来给你梳理下在Spring里实现这种长轮询+线程间通知的方案,亲测好用的那种!核心就是让服务端的请求线程挂起等待,直到新数据产生时被更新线程唤醒,或者超时自动返回。

核心思路

长轮询的关键逻辑是:

  1. 客户端发起请求后,服务端用异步机制让请求线程挂起,不立刻返回响应;
  2. 当有新数据写入数据库时,更新线程找到对应挂起的请求线程,通知它返回新数据;
  3. 如果超时还没新数据,服务端主动返回空响应,客户端再立即发起下一轮请求。

Spring提供的DeferredResult是专门为这类异步请求设计的,搭配线程安全的缓存就能完美实现需求。

具体实现步骤&代码示例

1. 长轮询请求处理Controller

这里用DeferredResult托管请求结果,同时把未完成的DeferredResult存入缓存,等待更新线程通知:

@RestController
@RequestMapping("/poll")
public class PollingController {

    // 用ConcurrentHashMap缓存DeferredResult,key用用户ID区分不同客户端
    private final ConcurrentHashMap<String, DeferredResult<ResponseEntity<Object>>> deferredResultCache = new ConcurrentHashMap<>();

    @Autowired
    private CacheService cacheService;

    @GetMapping("/data/{userId}")
    public DeferredResult<ResponseEntity<Object>> startLongPolling(@PathVariable String userId) {
        // 设置30秒超时,超时后自动返回无内容响应
        DeferredResult<ResponseEntity<Object>> deferredResult = new DeferredResult<>(30000L);

        // 超时回调:清理缓存,返回超时响应
        deferredResult.onTimeout(() -> {
            deferredResult.setResult(ResponseEntity.status(HttpStatus.NO_CONTENT).build());
            deferredResultCache.remove(userId);
        });

        // 请求完成(正常返回/超时)后清理缓存,避免内存泄漏
        deferredResult.onCompletion(() -> deferredResultCache.remove(userId));

        // 先检查缓存里有没有未返回的新数据,有就直接返回
        Object newData = cacheService.getUnsentData(userId);
        if (newData != null) {
            deferredResult.setResult(ResponseEntity.ok(newData));
            cacheService.clearUnsentData(userId);
        } else {
            // 没有数据,把DeferredResult存入缓存,等待更新通知
            deferredResultCache.put(userId, deferredResult);
        }

        return deferredResult;
    }
}

2. 数据更新时的通知逻辑

当新数据写入数据库后,更新线程要找到对应客户端的DeferredResult,设置结果触发响应:

@Service
public class DataUpdateService {

    @Autowired
    private PollingController pollingController;

    @Autowired
    private CacheService cacheService;

    public void processNewUserData(String userId, Object newData) {
        // 1. 执行数据库写入操作(这里省略具体DB代码)
        // 2. 把新数据存入临时缓存,防止客户端下次轮询错过
        cacheService.setUnsentData(userId, newData);

        // 3. 查找对应客户端的DeferredResult,存在就触发响应
        DeferredResult<ResponseEntity<Object>> deferredResult = pollingController.deferredResultCache.get(userId);
        if (deferredResult != null && !deferredResult.isSetOrExpired()) {
            deferredResult.setResult(ResponseEntity.ok(newData));
        }
    }
}

3. 临时缓存实现(模拟)

用线程安全的Map存储未返回的新数据:

@Service
public class CacheService {

    private final ConcurrentHashMap<String, Object> unsentDataCache = new ConcurrentHashMap<>();

    public void setUnsentData(String userId, Object data) {
        unsentDataCache.put(userId, data);
    }

    public Object getUnsentData(String userId) {
        return unsentDataCache.get(userId);
    }

    public void clearUnsentData(String userId) {
        unsentDataCache.remove(userId);
    }
}

关键注意事项

  • 线程安全:所有缓存集合必须用线程安全的实现(比如ConcurrentHashMap),避免多线程操作时出现并发问题;
  • 内存泄漏:一定要通过onTimeoutonCompletion回调清理缓存的DeferredResult,不然长时间积累会导致内存溢出;
  • 分布式适配:如果是集群部署,本地缓存的DeferredResult无法跨节点访问,这时候可以用Redis存储客户端标识,配合消息队列(如RabbitMQ)广播更新通知;
  • 超时设置:超时时间要根据业务调整,太短会导致客户端频繁请求,太长会影响用户体验。

内容的提问来源于stack exchange,提问作者Denis Stephanov

火山引擎 最新活动