如何使用Project Reactor以响应式方式实现数据保存?附示例仓库代码
如何用Project Reactor实现响应式的内存数据保存
好的,我来帮你搞定这个响应式保存的实现。首先,我们得先理清楚几个关键点:内存仓库的并发安全、Vavr不可变集合的特性,以及Project Reactor的响应式语义。
核心问题分析
你的仓库用了Vavr的Map——这是不可变集合,意味着每次修改(比如添加/更新元素)都会生成一个新的Map实例。同时,你用了volatile修饰成员变量,但volatile只能保证可见性,没法保证多线程下更新操作的原子性(比如两个线程同时读取旧Map、生成新Map,最后会有一个更新被覆盖)。所以第一步我们要修正这个并发安全问题。
完整实现方案
首先,把原来的volatile Map换成AtomicReference<Map>,它能帮我们实现原子性的更新操作。然后在save方法里,用Reactor的操作符保持响应式流的连续性。
修改后的仓库代码
import io.vavr.collection.HashMap; import io.vavr.collection.Map; import io.vavr.control.Option; import org.springframework.stereotype.Repository; import reactor.core.publisher.Mono; import java.util.concurrent.atomic.AtomicReference; @Repository class ImageSizeRepositoryInMemoryImpl implements ImageSizeRepository { // 用AtomicReference保证不可变Map的原子更新 private final AtomicReference<Map<String, ImageSize>> imageSizes = new AtomicReference<>(HashMap.empty()); @Override public Mono<ImageSize> save(Mono<ImageSize> imageSizeMono) { return imageSizeMono.flatMap(imageSize -> // 把同步的原子更新逻辑包装成Mono,保持响应式语义 Mono.fromCallable(() -> { while (true) { // 获取当前的Map快照 Map<String, ImageSize> currentMap = imageSizes.get(); // 生成新的Map(Vavr的put返回新实例,原Map不变) Map<String, ImageSize> newMap = currentMap.put(imageSize.getId(), imageSize); // 原子性替换:只有当前Map和我们读取的快照一致时,才替换成新Map if (imageSizes.compareAndSet(currentMap, newMap)) { // 更新成功,返回保存后的对象 return imageSize; } // 如果更新失败,说明有其他线程修改了Map,重新循环尝试 } }) ); } // 示例:响应式实现findById方法 @Override public Mono<ImageSize> findById(String id) { return Mono.fromCallable(() -> imageSizes.get().get(id).getOrElse(null) ).filter(imageSize -> imageSize != null); } }
关键细节解释
AtomicReference的必要性:
Vavr集合不可变,所以每次更新都是创建新对象。AtomicReference的compareAndSet方法能确保只有当当前值和我们预期的旧值一致时,才会替换成新值,完美解决并发下的更新冲突问题。响应式流的处理:
- 用
flatMap处理传入的Mono<ImageSize>:它会订阅上游的Mono,拿到实际的ImageSize对象后,再执行后续的保存逻辑。 - 用
Mono.fromCallable包装同步的原子更新:因为原子更新是非常轻量的操作(没有IO阻塞),所以用fromCallable把同步逻辑转换成响应式流,不会阻塞Reactor的事件循环。
- 用
语义一致性:
方法返回的Mono<ImageSize>会在保存操作完成且成功后,发出保存后的对象。如果上游的imageSizeMono是空的(比如调用了Mono.empty()),那么这个方法也会返回空的Mono,符合响应式编程的语义。
额外注意事项
- 确保
ImageSize类有一个唯一标识的getId()方法(或者你用来作为Map键的其他属性),否则无法正确关联存储的对象。 - 如果需要处理保存失败的场景(比如业务上的校验),可以在
flatMap里添加Mono.error()来抛出异常,上游订阅者可以通过onError处理。
内容的提问来源于stack exchange,提问作者K2mil J33




