基于Java Stream单遍扫描大CSV文件获取多统计指标的技术问询
问题
我手头有一份超大的CSV文件,示例内容如下:
id, type, profit, purchaseDate, soldDate order1, fruit, 115.50, 1/1/2020, 20/1/2020 order2, veg, 114.25, 7/1/2020, 7/2/2020 order3, flowers, 113.30, 5/1/2020, 15/1/2020 order4, fruit, 111.20, 1/1/2019, 30/1/2019 order5, veg, 112.40, 17/1/2019,10/2/2019
需要读取该文件并生成三类统计结果:
- 按品类统计利润(目前我用的是平均利润,也可以调整为总和)
- 订单量最多的年份
- 采购与销售日期的平均间隔
目前我已经用Commons CSV解析器实现了单独获取单类统计的功能,代码如下:
Reader in = new FileReader("filePath"); Iterable<CSVRecord> records = CSVFormat.DEFAULT .withFirstRecordAsHeader() .withIgnoreEmptyLines(true) .withDelimiter(',') .withTrim() .parse(in); StreamSupport .stream(records.spliterator(), false) .collect(groupingBy(r -> r.get("type"),averagingDouble(r -> Double.parseDouble(r.get("profit")))));
现在想请教:能不能通过Java Stream API单遍扫描文件,同时获取这三类统计指标,而且不会产生内存过载?
解决方案
当然可以!这正是Java Stream API结合自定义统计容器的典型应用场景——单遍扫描就能完成多维度统计,而且完全不用把整个文件加载到内存里,完美适配超大CSV的场景。
核心思路是:自定义一个统计容器类,用来存储所有三类指标的中间计算数据,然后通过Stream的collect方法(三参数版本),在单遍遍历中完成所有数据的累加,最后从容器中提取最终的统计结果。
步骤1:定义统计容器类
这个类要包含三个统计维度的中间数据,以及累加、合并、结果提取的方法:
import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; import java.util.DoubleSummaryStatistics; import org.apache.commons.csv.CSVRecord; class OrderStats { // 1. 按品类的利润统计(支持获取平均、总和、数量等) private final Map<String, DoubleSummaryStatistics> categoryProfitStats = new HashMap<>(); // 2. 各年份的订单数量 private final Map<Integer, Integer> yearOrderCount = new HashMap<>(); // 3. 日期间隔的总天数和有效订单数(用于计算平均) private long totalDaysBetween = 0; private int validDateCount = 0; // 适配CSV的日期格式:dd/MM/yyyy private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("d/M/yyyy"); // 累加一条CSV记录的统计数据 public void accumulate(CSVRecord record) { // 处理品类利润统计 String type = record.get("type"); double profit = Double.parseDouble(record.get("profit")); categoryProfitStats.computeIfAbsent(type, k -> new DoubleSummaryStatistics()) .accept(profit); // 处理年份订单统计 LocalDate purchaseDate = LocalDate.parse(record.get("purchaseDate"), DATE_FORMATTER); int year = purchaseDate.getYear(); yearOrderCount.put(year, yearOrderCount.getOrDefault(year, 0) + 1); // 处理采购与销售日期间隔统计 try { LocalDate soldDate = LocalDate.parse(record.get("soldDate"), DATE_FORMATTER); long daysBetween = java.time.temporal.ChronoUnit.DAYS.between(purchaseDate, soldDate); if (daysBetween >= 0) { // 过滤掉销售日期早于采购的异常数据 totalDaysBetween += daysBetween; validDateCount++; } } catch (Exception e) { // 可以添加日志记录,跳过无效日期的记录 System.err.println("跳过无效日期的记录: " + record); } } // 合并另一个OrderStats的数据(支持并行流) public void combine(OrderStats other) { // 合并品类利润统计 other.categoryProfitStats.forEach((type, stats) -> { categoryProfitStats.computeIfAbsent(type, k -> new DoubleSummaryStatistics()) .combine(stats); }); // 合并且年份订单统计 other.yearOrderCount.forEach((year, count) -> { yearOrderCount.put(year, yearOrderCount.getOrDefault(year, 0) + count); }); // 合并日期间隔统计 totalDaysBetween += other.totalDaysBetween; validDateCount += other.validDateCount; } // 提取最终的统计结果 public StatsResult getFinalResult() { // 1. 按品类的平均利润(如果需要总和,用stats.getSum()即可) Map<String, Double> categoryAvgProfit = new HashMap<>(); categoryProfitStats.forEach((type, stats) -> { categoryAvgProfit.put(type, stats.getAverage()); }); // 2. 找出订单量最多的年份 Integer topYear = yearOrderCount.entrySet().stream() .max(Map.Entry.comparingByValue()) .map(Map.Entry::getKey) .orElse(null); // 3. 计算平均日期间隔 double avgDaysBetween = validDateCount > 0 ? (double) totalDaysBetween / validDateCount : 0.0; return new StatsResult(categoryAvgProfit, topYear, avgDaysBetween); } // 封装最终结果的内部类 public static class StatsResult { private final Map<String, Double> categoryAvgProfit; private final Integer topOrderYear; private final double avgDaysBetween; public StatsResult(Map<String, Double> categoryAvgProfit, Integer topOrderYear, double avgDaysBetween) { this.categoryAvgProfit = categoryAvgProfit; this.topOrderYear = topOrderYear; this.avgDaysBetween = avgDaysBetween; } // Getter方法,方便获取结果 public Map<String, Double> getCategoryAvgProfit() { return categoryAvgProfit; } public Integer getTopOrderYear() { return topOrderYear; } public double getAvgDaysBetween() { return avgDaysBetween; } } }
步骤2:单遍扫描获取统计结果
结合Commons CSV和Stream API,只需要一次遍历就能拿到所有结果:
import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVRecord; import java.io.FileReader; import java.io.Reader; import java.util.stream.StreamSupport; public class CsvMultiStatsProcessor { public static void main(String[] args) throws Exception { String filePath = "your-large-file.csv"; // 用try-with-resources自动关闭Reader try (Reader in = new FileReader(filePath)) { Iterable<CSVRecord> records = CSVFormat.DEFAULT .withFirstRecordAsHeader() .withIgnoreEmptyLines(true) .withDelimiter(',') .withTrim() .parse(in); // 单遍扫描完成所有统计 OrderStats.StatsResult finalStats = StreamSupport.stream(records.spliterator(), false) .collect(OrderStats::new, OrderStats::accumulate, OrderStats::combine) .getFinalResult(); // 输出统计结果 System.out.println("按品类平均利润: " + finalStats.getCategoryAvgProfit()); System.out.println("订单量最多的年份: " + finalStats.getTopOrderYear()); System.out.println("采购与销售平均间隔天数: " + finalStats.getAvgDaysBetween()); } } }
关键优势说明
- 内存友好: Stream是逐个处理CSV记录的,不会将整个文件加载到内存,处理完一条就释放该记录的内存,完全适配超大文件场景。
- 单遍高效: 整个过程只遍历一次CSV数据,所有统计逻辑在
accumulate方法中完成,避免了多次IO读取带来的性能损耗。 - 灵活扩展: 用
DoubleSummaryStatistics可以轻松获取利润的总和、最大值、最小值等额外统计数据;日期处理加入了异常捕获,保证程序稳定性。 - 并行支持: 虽然文件IO场景下串行流通常更高效(IO是瓶颈),但代码中的
combine方法已经支持并行流的合并逻辑,如需并行处理只需将StreamSupport.stream的第二个参数改为true即可。
内容的提问来源于stack exchange,提问作者pradeepk6




