You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何使用Project Reactor以响应式方式实现数据保存?附示例仓库代码

如何用Project Reactor实现响应式的内存数据保存

好的,我来帮你搞定这个响应式保存的实现。首先,我们得先理清楚几个关键点:内存仓库的并发安全、Vavr不可变集合的特性,以及Project Reactor的响应式语义。

核心问题分析

你的仓库用了Vavr的Map——这是不可变集合,意味着每次修改(比如添加/更新元素)都会生成一个新的Map实例。同时,你用了volatile修饰成员变量,但volatile只能保证可见性,没法保证多线程下更新操作的原子性(比如两个线程同时读取旧Map、生成新Map,最后会有一个更新被覆盖)。所以第一步我们要修正这个并发安全问题。

完整实现方案

首先,把原来的volatile Map换成AtomicReference<Map>,它能帮我们实现原子性的更新操作。然后在save方法里,用Reactor的操作符保持响应式流的连续性。

修改后的仓库代码

import io.vavr.collection.HashMap;
import io.vavr.collection.Map;
import io.vavr.control.Option;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;
import java.util.concurrent.atomic.AtomicReference;

@Repository
class ImageSizeRepositoryInMemoryImpl implements ImageSizeRepository {
    // 用AtomicReference保证不可变Map的原子更新
    private final AtomicReference<Map<String, ImageSize>> imageSizes = 
        new AtomicReference<>(HashMap.empty());

    @Override
    public Mono<ImageSize> save(Mono<ImageSize> imageSizeMono) {
        return imageSizeMono.flatMap(imageSize -> 
            // 把同步的原子更新逻辑包装成Mono,保持响应式语义
            Mono.fromCallable(() -> {
                while (true) {
                    // 获取当前的Map快照
                    Map<String, ImageSize> currentMap = imageSizes.get();
                    // 生成新的Map(Vavr的put返回新实例,原Map不变)
                    Map<String, ImageSize> newMap = currentMap.put(imageSize.getId(), imageSize);
                    // 原子性替换:只有当前Map和我们读取的快照一致时,才替换成新Map
                    if (imageSizes.compareAndSet(currentMap, newMap)) {
                        // 更新成功,返回保存后的对象
                        return imageSize;
                    }
                    // 如果更新失败,说明有其他线程修改了Map,重新循环尝试
                }
            })
        );
    }

    // 示例:响应式实现findById方法
    @Override
    public Mono<ImageSize> findById(String id) {
        return Mono.fromCallable(() -> 
            imageSizes.get().get(id).getOrElse(null)
        ).filter(imageSize -> imageSize != null);
    }
}

关键细节解释

  1. AtomicReference的必要性
    Vavr集合不可变,所以每次更新都是创建新对象。AtomicReferencecompareAndSet方法能确保只有当当前值和我们预期的旧值一致时,才会替换成新值,完美解决并发下的更新冲突问题。

  2. 响应式流的处理

    • flatMap处理传入的Mono<ImageSize>:它会订阅上游的Mono,拿到实际的ImageSize对象后,再执行后续的保存逻辑。
    • Mono.fromCallable包装同步的原子更新:因为原子更新是非常轻量的操作(没有IO阻塞),所以用fromCallable把同步逻辑转换成响应式流,不会阻塞Reactor的事件循环。
  3. 语义一致性
    方法返回的Mono<ImageSize>会在保存操作完成且成功后,发出保存后的对象。如果上游的imageSizeMono是空的(比如调用了Mono.empty()),那么这个方法也会返回空的Mono,符合响应式编程的语义。

额外注意事项

  • 确保ImageSize类有一个唯一标识的getId()方法(或者你用来作为Map键的其他属性),否则无法正确关联存储的对象。
  • 如果需要处理保存失败的场景(比如业务上的校验),可以在flatMap里添加Mono.error()来抛出异常,上游订阅者可以通过onError处理。

内容的提问来源于stack exchange,提问作者K2mil J33

火山引擎 最新活动