You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何批量查询百万级表记录并逐批执行异步处理?

百万级数据分批异步处理方案

这个场景我太熟悉了——百万级数据批量处理最怕一次性拉全量,内存直接爆掉,用分批查询+异步队列是非常合理的方案!结合你提到的setFetchSize,我给你整理一套可落地的实现思路和代码:

核心思路

通过setFetchSize控制JDBC每次从数据库拉取的行数,避免全量数据加载到内存;同时每攒够一批数据就异步推送到队列,既保证内存稳定,又不耽误处理效率。

具体实现步骤

1. 优化DAO层,用setFetchSize配合滚动结果集

如果你的DAO基于原生JDBC或Spring JdbcTemplate,直接用可滚动只读结果集+setFetchSize是最优解,能让数据库按需返回数据,不会一次性吐全量。修改后的代码示例:

public void processUserPreferencesBatch(int batchSize) {
    String sqlQueryString = "SELECT id, preference_key, preference_value FROM user_preferences";
    
    try (Connection conn = getConnection()) {
        // 创建仅向前滚动、只读的结果集,配合fetchSize控制拉取粒度
        try (Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
            stmt.setFetchSize(batchSize); // 关键:设置每次从数据库拉取的行数
            try (ResultSet rs = stmt.executeQuery(sqlQueryString)) {
                List<UserPreferenceDTO> batch = new ArrayList<>(batchSize);
                while (rs.next()) {
                    // 封装DTO对象
                    UserPreferenceDTO dto = new UserPreferenceDTO();
                    dto.setId(rs.getLong("id"));
                    dto.setPreferenceKey(rs.getString("preference_key"));
                    dto.setPreferenceValue(rs.getString("preference_value"));
                    
                    batch.add(dto);
                    
                    // 批次满了就异步推送,然后清空批次
                    if (batch.size() == batchSize) {
                        pushToAsyncQueue(batch);
                        batch.clear();
                    }
                }
                // 处理最后一批不足batchSize的剩余记录
                if (!batch.isEmpty()) {
                    pushToAsyncQueue(batch);
                }
            }
        }
    } catch (SQLException e) {
        log.error("批量处理用户偏好失败", e);
        throw new RuntimeException("批量处理异常", e);
    }
}

// 异步推送队列的工具方法
private void pushToAsyncQueue(List<UserPreferenceDTO> batch) {
    // 这里可以用自定义线程池、Spring @Async,或者MQ客户端(比如RabbitMQ/Kafka)
    CompletableFuture.runAsync(() -> {
        // 执行你的队列推送逻辑
        queueClient.sendBatch(batch);
    }, customExecutor); // 一定要用自定义线程池,避免默认线程池耗尽
}

2. 必看的关键细节

  • 数据库驱动适配:比如MySQL需要在JDBC URL中加useCursorFetch=true,否则setFetchSize不会生效(MySQL默认会一次性拉全量数据),URL示例:jdbc:mysql://localhost:3306/your_db?useCursorFetch=true
  • 合理设置batchSize:根据你的内存情况调整,一般500-2000条/批比较合适——太大容易内存溢出,太小会增加数据库交互次数拖慢效率
  • 自定义线程池:别用CompletableFuture的默认线程池(它用的是ForkJoinPool,核心线程数等于CPU核心数,高并发下容易阻塞),给你个线程池配置示例:
private ExecutorService customExecutor = new ThreadPoolExecutor(
    4, // 核心线程数
    8, // 最大线程数
    60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(100), // 任务队列容量
    new ThreadFactoryBuilder().setNameFormat("queue-push-%d").build(),
    new ThreadPoolExecutor.CallerRunsPolicy() // 队列满时让调用线程执行,避免丢任务
);
  • 避免重复推送:如果需要保证数据不重复处理,建议给表加个processed字段,每批推送成功后更新状态,下次查询只拉未处理的数据。

3. 替代方案:主键分页查询

如果滚动结果集不好实现,也可以用主键自增分页的方式,避免大offset的性能问题(MySQL中offset越大,查询越慢):

public void processUserPreferencesByPagination(int batchSize) {
    long lastProcessedId = 0;
    while (true) {
        String sql = "SELECT id, preference_key, preference_value FROM user_preferences WHERE id > ? ORDER BY id LIMIT ?";
        List<UserPreferenceDTO> batch = jdbcTemplate.query(sql, 
            new Object[]{lastProcessedId, batchSize},
            (rs, rowNum) -> {
                UserPreferenceDTO dto = new UserPreferenceDTO();
                dto.setId(rs.getLong("id"));
                dto.setPreferenceKey(rs.getString("preference_key"));
                dto.setPreferenceValue(rs.getString("preference_value"));
                return dto;
            });
        
        if (batch.isEmpty()) break; // 没有更多数据,退出循环
        
        pushToAsyncQueue(batch);
        lastProcessedId = batch.get(batch.size() - 1).getId(); // 更新最后处理的ID,作为下一批的起始条件
    }
}

这种方式适合主键是自增类型的场景,性能比传统的limit offset好很多。


内容的提问来源于stack exchange,提问作者user2160919

火山引擎 最新活动