How can I speed up Mahout data processing? Help optimizing a Java recommender system
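Hey, about your recommend() call taking 1.7 seconds: given your data size (822 users, about 677,000 ratings), here are concrete improvements in two areas, speeding up the computation and making the storage step cheaper, with code adjustments for each.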
I. Core optimizations for the recommendation computation
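1. Cache user similarity results to avoid recomputation
Your current code recomputes user-to-user similarities on every recommend() call, and when you loop over all users the same user pairs get computed again and again. Wrap your similarity in Mahout's CachingUserSimilarity (note the class name is Caching..., not Cached...) so results stay in memory:
    UserSimilarity similarity = new PearsonCorrelationSimilarity(model);
    // Wrap it in the caching decorator; the cache size is derived from the data model
    UserSimilarity cachedSimilarity = new CachingUserSimilarity(similarity, model);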
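To see why caching pays off when you sweep over every user, here is a small stdlib-only sketch. It is not Mahout code: PairSimilarity, MemoizedSimilarity and the toy similarity function are names and values I made up for illustration. It memoizes a symmetric similarity and counts how often the expensive function actually runs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a symmetric user-user similarity function. */
interface PairSimilarity {
    double between(long userA, long userB);
}

/** Memoizes a symmetric similarity so each unordered pair is computed at most once. */
class MemoizedSimilarity implements PairSimilarity {
    private final PairSimilarity delegate;
    private final Map<String, Double> cache = new ConcurrentHashMap<>();

    MemoizedSimilarity(PairSimilarity delegate) {
        this.delegate = delegate;
    }

    @Override
    public double between(long userA, long userB) {
        // Order the IDs so (a, b) and (b, a) share one cache entry.
        long lo = Math.min(userA, userB);
        long hi = Math.max(userA, userB);
        return cache.computeIfAbsent(lo + ":" + hi, k -> delegate.between(lo, hi));
    }
}

class SimilarityCacheDemo {
    /** Sweeps all ordered pairs of `users` IDs; returns how often the delegate ran. */
    static int countDelegateCalls(int users) {
        AtomicInteger calls = new AtomicInteger();
        PairSimilarity expensive = (a, b) -> {
            calls.incrementAndGet();
            return 1.0 / (1 + Math.abs(a - b)); // toy similarity, not Pearson
        };
        PairSimilarity cached = new MemoizedSimilarity(expensive);
        for (long a = 0; a < users; a++) {
            for (long b = 0; b < users; b++) {
                if (a != b) {
                    cached.between(a, b);
                }
            }
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(countDelegateCalls(822)); // prints 337431
    }
}
```

With 822 users the sweep asks for 674,862 ordered pairs, but the expensive function runs only 337,431 times, once per unordered pair, and any later sweep is served entirely from the cache.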
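2. Switch to a fixed-size neighborhood
ThresholdUserNeighborhood keeps every user whose similarity is at least 0.1, so the neighborhood can grow to a large share of your 822 users, and each neighbor adds work when preferences are estimated. NearestNUserNeighborhood still has to score the candidates, so it does not remove the scan itself, but it caps the neighborhood at the N most similar users (say 50 or 100), which bounds the work done afterwards:
    // Keep the 50 most similar users as the neighborhood, replacing the threshold strategy
    UserNeighborhood neighborhood = new NearestNUserNeighborhood(50, cachedSimilarity, model);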
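The idea behind a fixed-size neighborhood can be sketched with a bounded min-heap. This is a stdlib-only illustration, not Mahout's implementation; TopNNeighbors and its input map (user ID to an already computed similarity) are assumptions of mine.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

class TopNNeighbors {
    /** Returns the IDs of the n most similar users, most similar first. */
    static List<Long> topN(Map<Long, Double> similarityByUser, int n) {
        // Min-heap on similarity: the weakest kept candidate sits at the head.
        Comparator<Map.Entry<Long, Double>> bySimilarity =
                (x, y) -> Double.compare(x.getValue(), y.getValue());
        PriorityQueue<Map.Entry<Long, Double>> heap = new PriorityQueue<>(bySimilarity);

        for (Map.Entry<Long, Double> candidate : similarityByUser.entrySet()) {
            heap.offer(candidate);
            if (heap.size() > n) {
                heap.poll(); // evict the weakest so at most n candidates are kept
            }
        }

        List<Long> result = new ArrayList<>();
        while (!heap.isEmpty()) {
            result.add(heap.poll().getKey()); // comes out weakest first
        }
        Collections.reverse(result); // flip to most similar first
        return result;
    }
}
```

Every candidate is still looked at once, but no more than n are ever kept, so the later preference estimation works with at most n neighbors no matter how many users clear a similarity threshold.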
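3. Keep the data model in memory and build it once
FileDataModel parses the CSV into an in-memory model when it is constructed, so recommend() calls do not go back to disk for ratings. As far as I know Mahout has no CachedDataModel class, so no extra wrapper is needed here. What matters is that you create the model once and reuse it for every user, rather than rebuilding it inside the loop:
    // Parsed into memory on construction; create once and share across all recommend() calls
    DataModel model = new FileDataModel(new File("data/data.csv"));
If your ratings live in a database rather than a file, ReloadFromJDBCDataModel plays a similar role by loading the JDBC-backed data into memory.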
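4. Try an item-based recommender
You have few users but many ratings. Once item similarities have been computed (and cached), item-based recommendation is often faster per user than user-based, though how much it helps depends on how many distinct items you have. You can try GenericItemBasedRecommender:
    // PearsonCorrelationSimilarity implements both UserSimilarity and ItemSimilarity
    ItemSimilarity itemSimilarity = new PearsonCorrelationSimilarity(model);
    ItemBasedRecommender recommender = new GenericItemBasedRecommender(model, itemSimilarity);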
5. Compute recommendations in parallel
Each user's recommendation is independent of the others, so running them on a thread pool uses all CPU cores and cuts the total wall-clock time. For example, with ExecutorService:
    // Size the pool to the number of CPU cores
    ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    LongPrimitiveIterator ids = model.getUserIDs();
    List<Future<?>> futures = new ArrayList<>();
    while (ids.hasNext()) {
        Long id = ids.next();
        futures.add(executor.submit(() -> {
            try {
                long tempoInicio = System.currentTimeMillis();
                List<RecommendedItem> recommendations = recommender.recommend(id, 20);
                System.out.println("User " + id + " Time: " + (System.currentTimeMillis() - tempoInicio));
                // Build the recommendation document for this user
                List<Document> recItems = new ArrayList<>();
                for (RecommendedItem recommendation : recommendations) {
                    recItems.add(new Document("itemId", recommendation.getItemID())
                            .append("score", recommendation.getValue()));
                }
                // intValue() assumes user IDs fit in an int
                Document userRecDoc = new Document("uid", id.intValue())
                        .append("recs", recItems);
                // Hold the result for a later bulk insert into Mongo,
                // e.g. in a thread-safe collection such as ConcurrentLinkedQueue
            } catch (Exception e) {
                e.printStackTrace();
            }
        }));
    }
    // Wait for all tasks to finish (get() throws checked exceptions,
    // so the enclosing method needs to declare or handle them)
    for (Future<?> future : futures) {
        future.get();
    }
    executor.shutdown();
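A variation worth considering is to have each task return its result and collect the results from the futures, so nothing is printed or swallowed inside the task. The sketch below is stdlib-only; runForAll and perUser are names I made up, and perUser stands in for whatever you do per user (for instance calling recommender.recommend(id, 20) and building the document).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.LongFunction;

class ParallelPerUser {
    /** Applies perUser to every ID on a fixed thread pool; results keep the input order. */
    static <R> List<R> runForAll(List<Long> userIds, LongFunction<R> perUser) {
        ExecutorService pool = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());
        try {
            List<Callable<R>> tasks = new ArrayList<>();
            for (Long id : userIds) {
                final long userId = id;
                tasks.add(() -> perUser.apply(userId));
            }
            List<R> results = new ArrayList<>();
            // invokeAll blocks until every task is done and returns futures in task order.
            for (Future<R> future : pool.invokeAll(tasks)) {
                results.add(future.get()); // surfaces a task failure instead of hiding it
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A per-user task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the results come back as an ordinary list on the calling thread, the bulk insert can then run single-threaded without any shared mutable collection.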
II. Making the storage step efficient
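1. Insert into MongoDB in batches to cut round trips
You currently insert one document per loop iteration, and that many database round trips slows the whole run down. Collect a batch of user results first (say 100 at a time) and insert them in one call:
    // Collect results in a thread-safe collection
    ConcurrentLinkedQueue<Document> recDocsQueue = new ConcurrentLinkedQueue<>();

    // Inside each worker task, add the finished document to the queue
    recDocsQueue.add(userRecDoc);

    // After the tasks finish, drain the queue in batches of 100
    int batchSize = 100;
    List<Document> batch = new ArrayList<>(batchSize);
    while (!recDocsQueue.isEmpty()) {
        batch.add(recDocsQueue.poll());
        if (batch.size() >= batchSize) {
            m.getCollection("recommendations").insertMany(batch);
            batch.clear();
        }
    }
    // Insert whatever is left over
    if (!batch.isEmpty()) {
        m.getCollection("recommendations").insertMany(batch);
    }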
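The batching logic itself can be pulled out into a small helper and checked without a database. This is a stdlib-only sketch; Batching.partition is a name I made up, and the integers in main are placeholders for your documents.

```java
import java.util.ArrayList;
import java.util.List;

class Batching {
    /** Splits items into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the slice so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> docs = new ArrayList<>();
        for (int i = 0; i < 822; i++) {
            docs.add(i); // placeholder for one recommendation document per user
        }
        List<List<Integer>> batches = partition(docs, 100);
        System.out.println(batches.size());        // prints 9
        System.out.println(batches.get(8).size()); // prints 22
    }
}
```

For your 822 users that means 9 insertMany calls (eight of 100 documents and one of 22) instead of 822 single inserts.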
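2. Improve the MongoDB document structure
You currently use the item ID as a key in the document. That gives every document a different, potentially large set of field names, which is awkward to query and to index. Store the recommended items as an array instead:
    // Suggested document structure
    {
      "uid": 123,
      "recs": [
        {"itemId": 456, "score": 4.2},
        {"itemId": 789, "score": 3.8}
      ]
    }
This layout is more regular and makes later queries or aggregations over the recommendations easier.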
3. Reuse the MongoDB connection
Make sure the MongoDB client is created once and reused, not re-created for every insert. The driver's client manages a connection pool, which you can configure when you build it:
    // Create the client once, at startup
    MongoClient mongoClient = MongoClients.create(
            MongoClientSettings.builder()
                    .applyConnectionString(new ConnectionString("mongodb://localhost:27017"))
                    .applyToConnectionPoolSettings(builder ->
                            builder.maxSize(20)) // maximum pool size
                    .build());
    MongoDatabase database = mongoClient.getDatabase("your_db");
    MongoCollection<Document> recCollection = database.getCollection("recommendations");
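III. Complete example after the adjustments
    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
    import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
    import org.apache.mahout.cf.taste.impl.neighborhood.NearestNUserNeighborhood;
    import org.apache.mahout.cf.taste.impl.recommender.GenericUserBasedRecommender;
    import org.apache.mahout.cf.taste.impl.similarity.CachingUserSimilarity;
    import org.apache.mahout.cf.taste.impl.similarity.PearsonCorrelationSimilarity;
    import org.apache.mahout.cf.taste.model.DataModel;
    import org.apache.mahout.cf.taste.neighborhood.UserNeighborhood;
    import org.apache.mahout.cf.taste.recommender.RecommendedItem;
    import org.apache.mahout.cf.taste.recommender.UserBasedRecommender;
    import org.apache.mahout.cf.taste.similarity.UserSimilarity;
    import org.bson.Document;

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import com.mongodb.client.MongoCollection;

    public class App {
        public static void main(String[] args) throws Exception {
            // 1. Load the data model once (FileDataModel keeps the ratings in memory)
            DataModel model = new FileDataModel(new File("data/data.csv"));

            // 2. Cached similarity plus a fixed-size neighborhood
            UserSimilarity similarity = new PearsonCorrelationSimilarity(model);
            UserSimilarity cachedSimilarity = new CachingUserSimilarity(similarity, model);
            UserNeighborhood neighborhood = new NearestNUserNeighborhood(50, cachedSimilarity, model);

            // 3. Build the recommender
            UserBasedRecommender recommender =
                    new GenericUserBasedRecommender(model, neighborhood, cachedSimilarity);

            // 4. Open the MongoDB client once (it manages its own connection pool)
            MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
            MongoCollection<Document> recCollection =
                    mongoClient.getDatabase("recommender_db").getCollection("user_recommendations");

            // 5. Compute recommendations in parallel
            ExecutorService executor =
                    Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
            LongPrimitiveIterator ids = model.getUserIDs();
            List<Future<?>> futures = new ArrayList<>();
            ConcurrentLinkedQueue<Document> recDocsQueue = new ConcurrentLinkedQueue<>();
            while (ids.hasNext()) {
                Long id = ids.next();
                futures.add(executor.submit(() -> {
                    try {
                        long tempoInicio = System.currentTimeMillis();
                        List<RecommendedItem> recommendations = recommender.recommend(id, 20);
                        System.out.println("User " + id + " Time: "
                                + (System.currentTimeMillis() - tempoInicio));
                        // Build the document in the array-based structure
                        List<Document> recItems = new ArrayList<>();
                        for (RecommendedItem recommendation : recommendations) {
                            recItems.add(new Document("itemId", recommendation.getItemID())
                                    .append("score", recommendation.getValue()));
                        }
                        // intValue() assumes user IDs fit in an int
                        recDocsQueue.add(new Document("uid", id.intValue()).append("recs", recItems));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }));
            }
            // Wait for all tasks to finish
            for (Future<?> future : futures) {
                future.get();
            }
            executor.shutdown();

            // 6. Bulk insert into MongoDB
            int batchSize = 100;
            List<Document> batch = new ArrayList<>(batchSize);
            while (!recDocsQueue.isEmpty()) {
                batch.add(recDocsQueue.poll());
                if (batch.size() >= batchSize) {
                    recCollection.insertMany(batch);
                    batch.clear();
                }
            }
            if (!batch.isEmpty()) {
                recCollection.insertMany(batch);
            }
            mongoClient.close();
        }
    }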
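With these changes I would expect the per-user time to drop well below 1.7 seconds, possibly to a few hundred milliseconds, and the total time for all users to fall much further thanks to the parallelism and batching. The exact gain depends on your data and hardware, so measure before and after.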
The question comes from Stack Exchange; it was asked by Vinicius Morais.




