You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何提升Mahout数据处理速度?Java推荐系统性能优化求助

优化Mahout推荐系统的计算速度与数据存储方案

嘿,针对你遇到的recommend()方法耗时1.7秒的问题,结合你的数据规模(822用户、67.7万条评分),我从计算加速、数据存储优化两个方面给你一些具体的改进方案,还会附上代码调整示例:

一、加速推荐计算的核心优化

1. 缓存用户相似度结果,避免重复计算

你当前的代码每次调用recommend()时,都会重新计算用户间的相似度——而遍历所有用户时,大量用户对的相似度会被重复计算。用CachedUserSimilarity包装你的相似度计算类,把结果缓存到内存里,能大幅减少重复计算的开销:

UserSimilarity similarity = new PearsonCorrelationSimilarity(model);
// 包装成缓存版相似度计算器,默认缓存所有结果
UserSimilarity cachedSimilarity = new CachedUserSimilarity(similarity, model);

2. 换用固定数量的邻居选择策略

ThresholdUserNeighborhood会遍历所有用户筛选出相似度≥0.1的邻居,当用户数较多时(哪怕只有822),这个遍历成本也很高。换成NearestNUserNeighborhood,直接取最相似的N个用户(比如50或100个),计算量会小很多:

// 取最相似的50个用户作为邻居,替换原有的Threshold策略
UserNeighborhood neighborhood = new NearestNUserNeighborhood(50, cachedSimilarity, model);

3. 把数据模型加载到内存缓存

FileDataModel每次读取数据都要从磁盘IO,换成CachedDataModel把所有评分数据加载到内存,能显著降低数据访问延迟:

DataModel model = new FileDataModel(new File("data/data.csv"));
// 包装成内存缓存版数据模型
DataModel cachedModel = new CachedDataModel(model);

4. 尝试基于物品的推荐算法(Item-Based)

你的用户数量少但评分数据多,基于物品的推荐在预计算物品相似度后,单用户推荐速度通常比基于用户的更快。可以试试替换成GenericItemBasedRecommender

ItemSimilarity itemSimilarity = new PearsonCorrelationSimilarity(cachedModel);
ItemBasedRecommender recommender = new GenericItemBasedRecommender(cachedModel, itemSimilarity);

5. 多线程并行计算推荐结果

每个用户的推荐计算是独立的,利用多线程并行处理能充分利用CPU多核,减少总耗时。比如用ExecutorService实现:

// 创建线程池,根据CPU核心数设置线程数
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
LongPrimitiveIterator ids = cachedModel.getUserIDs();
List<Future<?>> futures = new ArrayList<>();

while(ids.hasNext()) {
    Long id = ids.next();
    futures.add(executor.submit(() -> {
        try {
            long tempoInicio = System.currentTimeMillis();
            List<RecommendedItem> recommendations = recommender.recommend(id, 20);
            System.out.println("User " + id + " Time: "+(System.currentTimeMillis()-tempoInicio));
            
            // 构建推荐结果文档
            List<Document> recItems = new ArrayList<>();
            for (RecommendedItem recommendation : recommendations) {
                recItems.add(new Document("itemId", recommendation.getItemID())
                                .append("score", recommendation.getValue()));
            }
            Document userRecDoc = new Document("uid", id.intValue())
                                    .append("recs", recItems);
            
            // 暂存结果,后续批量插入Mongo
            // 这里可以用线程安全的集合收集结果,比如ConcurrentLinkedQueue
        } catch (Exception e) {
            e.printStackTrace();
        }
    }));
}

// 等待所有任务完成
for(Future<?> future : futures) {
    future.get();
}
executor.shutdown();

二、数据存储的高效优化

1. 批量插入MongoDB,减少IO次数

你当前每次循环都插入一条文档,频繁的数据库交互会拖慢整体速度。建议先收集一批用户的推荐结果(比如每100条),再一次性批量插入:

// 用线程安全的集合收集结果
ConcurrentLinkedQueue<Document> recDocsQueue = new ConcurrentLinkedQueue<>();

// 在多线程任务中把结果加入队列
recDocsQueue.add(userRecDoc);

// 批量插入逻辑(比如每100条插入一次)
int batchSize = 100;
List<Document> batch = new ArrayList<>(batchSize);
while(!recDocsQueue.isEmpty()) {
    batch.add(recDocsQueue.poll());
    if(batch.size() >= batchSize) {
        m.getCollection("recommendations").insertMany(batch);
        batch.clear();
    }
}
// 插入剩余的结果
if(!batch.isEmpty()) {
    m.getCollection("recommendations").insertMany(batch);
}

2. 优化MongoDB文档结构

你当前把item ID作为文档的key,这种结构会导致文档字段数量过多,不利于查询和存储。建议改用数组存储推荐项:

// 优化后的文档结构
{
  "uid": 123,
  "recs": [
    {"itemId": 456, "score": 4.2},
    {"itemId": 789, "score": 3.8}
  ]
}

这样的结构更规范,也方便后续对推荐结果进行统计或查询。

3. 复用MongoDB连接

确保你的MongoDB连接是复用的,不要每次插入都创建新连接。可以用Mongo的连接池配置,保持长连接:

// 初始化Mongo连接(只做一次)
MongoClient mongoClient = MongoClients.create(MongoClientSettings.builder()
        .applyConnectionString(new ConnectionString("mongodb://localhost:27017"))
        .applyToConnectionPoolSettings(builder -> 
            builder.maxSize(20) // 设置连接池大小
        )
        .build());
MongoDatabase database = mongoClient.getDatabase("your_db");
MongoCollection<Document> recCollection = database.getCollection("recommendations");

三、代码调整后的完整示例片段

public class App {
    public static void main(String[] args) throws Exception {
        // 1. 初始化缓存版数据模型
        DataModel model = new FileDataModel(new File("data/data.csv"));
        DataModel cachedModel = new CachedDataModel(model);
        
        // 2. 初始化缓存版相似度计算+固定数量邻居
        UserSimilarity similarity = new PearsonCorrelationSimilarity(cachedModel);
        UserSimilarity cachedSimilarity = new CachedUserSimilarity(similarity, cachedModel);
        UserNeighborhood neighborhood = new NearestNUserNeighborhood(50, cachedSimilarity, cachedModel);
        
        // 3. 初始化推荐器
        UserBasedRecommender recommender = new GenericUserBasedRecommender(cachedModel, neighborhood, cachedSimilarity);
        
        // 4. 初始化MongoDB连接池
        MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
        MongoCollection<Document> recCollection = mongoClient.getDatabase("recommender_db").getCollection("user_recommendations");
        
        // 5. 多线程并行处理推荐
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        LongPrimitiveIterator ids = cachedModel.getUserIDs();
        List<Future<?>> futures = new ArrayList<>();
        ConcurrentLinkedQueue<Document> recDocsQueue = new ConcurrentLinkedQueue<>();
        
        while(ids.hasNext()) {
            Long id = ids.next();
            futures.add(executor.submit(() -> {
                try {
                    long tempoInicio = System.currentTimeMillis();
                    List<RecommendedItem> recommendations = recommender.recommend(id, 20);
                    System.out.println("User " + id + " Time: "+(System.currentTimeMillis()-tempoInicio));
                    
                    // 构建优化后的推荐结果文档
                    List<Document> recItems = new ArrayList<>();
                    for (RecommendedItem recommendation : recommendations) {
                        recItems.add(new Document("itemId", recommendation.getItemID())
                                        .append("score", recommendation.getValue()));
                    }
                    recDocsQueue.add(new Document("uid", id.intValue()).append("recs", recItems));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }));
        }
        
        // 等待所有任务完成
        for(Future<?> future : futures) {
            future.get();
        }
        executor.shutdown();
        
        // 6. 批量插入MongoDB
        int batchSize = 100;
        List<Document> batch = new ArrayList<>(batchSize);
        while(!recDocsQueue.isEmpty()) {
            batch.add(recDocsQueue.poll());
            if(batch.size() >= batchSize) {
                recCollection.insertMany(batch);
                batch.clear();
            }
        }
        if(!batch.isEmpty()) {
            recCollection.insertMany(batch);
        }
        
        mongoClient.close();
    }
}

按照这些方案调整后,单用户的推荐耗时应该能降到几百毫秒甚至更低,整体处理所有用户的速度也会大幅提升。

内容的提问来源于stack exchange,提问作者Vinicius Morais

火山引擎 最新活动