You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

基于Java Stream单遍扫描大CSV文件获取多统计指标的技术问询

问题

我手头有一份超大的CSV文件,示例内容如下:

id, type, profit, purchaseDate, soldDate
order1, fruit, 115.50, 1/1/2020, 20/1/2020
order2, veg, 114.25, 7/1/2020, 7/2/2020
order3, flowers, 113.30, 5/1/2020, 15/1/2020
order4, fruit, 111.20, 1/1/2019, 30/1/2019
order5, veg, 112.40, 17/1/2019,10/2/2019

需要读取该文件并生成三类统计结果:

  • 按品类统计利润(目前我用的是平均利润,也可以调整为总和)
  • 订单量最多的年份
  • 采购与销售日期的平均间隔

目前我已经用Commons CSV解析器实现了单独获取单类统计的功能,代码如下:

Reader in = new FileReader("filePath");
Iterable<CSVRecord> records = CSVFormat.DEFAULT
    .withFirstRecordAsHeader()
    .withIgnoreEmptyLines(true)
    .withDelimiter(',')
    .withTrim()
    .parse(in);
StreamSupport
    .stream(records.spliterator(), false)
    .collect(groupingBy(r -> r.get("type"),averagingDouble(r -> Double.parseDouble(r.get("profit")))));

现在想请教:能不能通过Java Stream API单遍扫描文件,同时获取这三类统计指标,而且不会产生内存过载?


解决方案

当然可以!这正是Java Stream API结合自定义统计容器的典型应用场景——单遍扫描就能完成多维度统计,而且完全不用把整个文件加载到内存里,完美适配超大CSV的场景。

核心思路是:自定义一个统计容器类,用来存储所有三类指标的中间计算数据,然后通过Stream的collect方法(三参数版本),在单遍遍历中完成所有数据的累加,最后从容器中提取最终的统计结果。

步骤1:定义统计容器类

这个类要包含三个统计维度的中间数据,以及累加、合并、结果提取的方法:

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.DoubleSummaryStatistics;
import org.apache.commons.csv.CSVRecord;

class OrderStats {
    // 1. 按品类的利润统计(支持获取平均、总和、数量等)
    private final Map<String, DoubleSummaryStatistics> categoryProfitStats = new HashMap<>();
    // 2. 各年份的订单数量
    private final Map<Integer, Integer> yearOrderCount = new HashMap<>();
    // 3. 日期间隔的总天数和有效订单数(用于计算平均)
    private long totalDaysBetween = 0;
    private int validDateCount = 0;

    // 适配CSV的日期格式:dd/MM/yyyy
    private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("d/M/yyyy");

    // 累加一条CSV记录的统计数据
    public void accumulate(CSVRecord record) {
        // 处理品类利润统计
        String type = record.get("type");
        double profit = Double.parseDouble(record.get("profit"));
        categoryProfitStats.computeIfAbsent(type, k -> new DoubleSummaryStatistics())
                .accept(profit);

        // 处理年份订单统计
        LocalDate purchaseDate = LocalDate.parse(record.get("purchaseDate"), DATE_FORMATTER);
        int year = purchaseDate.getYear();
        yearOrderCount.put(year, yearOrderCount.getOrDefault(year, 0) + 1);

        // 处理采购与销售日期间隔统计
        try {
            LocalDate soldDate = LocalDate.parse(record.get("soldDate"), DATE_FORMATTER);
            long daysBetween = java.time.temporal.ChronoUnit.DAYS.between(purchaseDate, soldDate);
            if (daysBetween >= 0) { // 过滤掉销售日期早于采购的异常数据
                totalDaysBetween += daysBetween;
                validDateCount++;
            }
        } catch (Exception e) {
            // 可以添加日志记录,跳过无效日期的记录
            System.err.println("跳过无效日期的记录: " + record);
        }
    }

    // 合并另一个OrderStats的数据(支持并行流)
    public void combine(OrderStats other) {
        // 合并品类利润统计
        other.categoryProfitStats.forEach((type, stats) -> {
            categoryProfitStats.computeIfAbsent(type, k -> new DoubleSummaryStatistics())
                    .combine(stats);
        });

        // 合并且年份订单统计
        other.yearOrderCount.forEach((year, count) -> {
            yearOrderCount.put(year, yearOrderCount.getOrDefault(year, 0) + count);
        });

        // 合并日期间隔统计
        totalDaysBetween += other.totalDaysBetween;
        validDateCount += other.validDateCount;
    }

    // 提取最终的统计结果
    public StatsResult getFinalResult() {
        // 1. 按品类的平均利润(如果需要总和,用stats.getSum()即可)
        Map<String, Double> categoryAvgProfit = new HashMap<>();
        categoryProfitStats.forEach((type, stats) -> {
            categoryAvgProfit.put(type, stats.getAverage());
        });

        // 2. 找出订单量最多的年份
        Integer topYear = yearOrderCount.entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .orElse(null);

        // 3. 计算平均日期间隔
        double avgDaysBetween = validDateCount > 0 ? (double) totalDaysBetween / validDateCount : 0.0;

        return new StatsResult(categoryAvgProfit, topYear, avgDaysBetween);
    }

    // 封装最终结果的内部类
    public static class StatsResult {
        private final Map<String, Double> categoryAvgProfit;
        private final Integer topOrderYear;
        private final double avgDaysBetween;

        public StatsResult(Map<String, Double> categoryAvgProfit, Integer topOrderYear, double avgDaysBetween) {
            this.categoryAvgProfit = categoryAvgProfit;
            this.topOrderYear = topOrderYear;
            this.avgDaysBetween = avgDaysBetween;
        }

        // Getter方法,方便获取结果
        public Map<String, Double> getCategoryAvgProfit() { return categoryAvgProfit; }
        public Integer getTopOrderYear() { return topOrderYear; }
        public double getAvgDaysBetween() { return avgDaysBetween; }
    }
}

步骤2:单遍扫描获取统计结果

结合Commons CSV和Stream API,只需要一次遍历就能拿到所有结果:

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVRecord;

import java.io.FileReader;
import java.io.Reader;
import java.util.stream.StreamSupport;

public class CsvMultiStatsProcessor {
    public static void main(String[] args) throws Exception {
        String filePath = "your-large-file.csv";
        // 用try-with-resources自动关闭Reader
        try (Reader in = new FileReader(filePath)) {
            Iterable<CSVRecord> records = CSVFormat.DEFAULT
                    .withFirstRecordAsHeader()
                    .withIgnoreEmptyLines(true)
                    .withDelimiter(',')
                    .withTrim()
                    .parse(in);

            // 单遍扫描完成所有统计
            OrderStats.StatsResult finalStats = StreamSupport.stream(records.spliterator(), false)
                    .collect(OrderStats::new, OrderStats::accumulate, OrderStats::combine)
                    .getFinalResult();

            // 输出统计结果
            System.out.println("按品类平均利润: " + finalStats.getCategoryAvgProfit());
            System.out.println("订单量最多的年份: " + finalStats.getTopOrderYear());
            System.out.println("采购与销售平均间隔天数: " + finalStats.getAvgDaysBetween());
        }
    }
}

关键优势说明

  1. 内存友好: Stream是逐个处理CSV记录的,不会将整个文件加载到内存,处理完一条就释放该记录的内存,完全适配超大文件场景。
  2. 单遍高效: 整个过程只遍历一次CSV数据,所有统计逻辑在accumulate方法中完成,避免了多次IO读取带来的性能损耗。
  3. 灵活扩展: 用DoubleSummaryStatistics可以轻松获取利润的总和、最大值、最小值等额外统计数据;日期处理加入了异常捕获,保证程序稳定性。
  4. 并行支持: 虽然文件IO场景下串行流通常更高效(IO是瓶颈),但代码中的combine方法已经支持并行流的合并逻辑,如需并行处理只需将StreamSupport.stream的第二个参数改为true即可。

内容的提问来源于stack exchange,提问作者pradeepk6

火山引擎 最新活动