如何批量查询百万级表记录并逐批执行异步处理?
百万级数据分批异步处理方案
这个场景我太熟悉了——百万级数据批量处理最怕一次性拉全量,内存直接爆掉,用分批查询+异步队列是非常合理的方案!结合你提到的setFetchSize,我给你整理一套可落地的实现思路和代码:
核心思路
通过setFetchSize控制JDBC每次从数据库拉取的行数,避免全量数据加载到内存;同时每攒够一批数据就异步推送到队列,既保证内存稳定,又不耽误处理效率。
具体实现步骤
1. 优化DAO层,用setFetchSize配合滚动结果集
如果你的DAO基于原生JDBC或Spring JdbcTemplate,直接用可滚动只读结果集+setFetchSize是最优解,能让数据库按需返回数据,不会一次性吐全量。修改后的代码示例:
public void processUserPreferencesBatch(int batchSize) { String sqlQueryString = "SELECT id, preference_key, preference_value FROM user_preferences"; try (Connection conn = getConnection()) { // 创建仅向前滚动、只读的结果集,配合fetchSize控制拉取粒度 try (Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) { stmt.setFetchSize(batchSize); // 关键:设置每次从数据库拉取的行数 try (ResultSet rs = stmt.executeQuery(sqlQueryString)) { List<UserPreferenceDTO> batch = new ArrayList<>(batchSize); while (rs.next()) { // 封装DTO对象 UserPreferenceDTO dto = new UserPreferenceDTO(); dto.setId(rs.getLong("id")); dto.setPreferenceKey(rs.getString("preference_key")); dto.setPreferenceValue(rs.getString("preference_value")); batch.add(dto); // 批次满了就异步推送,然后清空批次 if (batch.size() == batchSize) { pushToAsyncQueue(batch); batch.clear(); } } // 处理最后一批不足batchSize的剩余记录 if (!batch.isEmpty()) { pushToAsyncQueue(batch); } } } } catch (SQLException e) { log.error("批量处理用户偏好失败", e); throw new RuntimeException("批量处理异常", e); } } // 异步推送队列的工具方法 private void pushToAsyncQueue(List<UserPreferenceDTO> batch) { // 这里可以用自定义线程池、Spring @Async,或者MQ客户端(比如RabbitMQ/Kafka) CompletableFuture.runAsync(() -> { // 执行你的队列推送逻辑 queueClient.sendBatch(batch); }, customExecutor); // 一定要用自定义线程池,避免默认线程池耗尽 }
2. 必看的关键细节
- 数据库驱动适配:比如MySQL需要在JDBC URL中加
useCursorFetch=true,否则setFetchSize不会生效(MySQL默认会一次性拉全量数据),URL示例:jdbc:mysql://localhost:3306/your_db?useCursorFetch=true - 合理设置batchSize:根据你的内存情况调整,一般500-2000条/批比较合适——太大容易内存溢出,太小会增加数据库交互次数拖慢效率
- 自定义线程池:别用
CompletableFuture的默认线程池(它用的是ForkJoinPool,核心线程数等于CPU核心数,高并发下容易阻塞),给你个线程池配置示例:
private ExecutorService customExecutor = new ThreadPoolExecutor( 4, // 核心线程数 8, // 最大线程数 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), // 任务队列容量 new ThreadFactoryBuilder().setNameFormat("queue-push-%d").build(), new ThreadPoolExecutor.CallerRunsPolicy() // 队列满时让调用线程执行,避免丢任务 );
- 避免重复推送:如果需要保证数据不重复处理,建议给表加个
processed字段,每批推送成功后更新状态,下次查询只拉未处理的数据。
3. 替代方案:主键分页查询
如果滚动结果集不好实现,也可以用主键自增分页的方式,避免大offset的性能问题(MySQL中offset越大,查询越慢):
public void processUserPreferencesByPagination(int batchSize) { long lastProcessedId = 0; while (true) { String sql = "SELECT id, preference_key, preference_value FROM user_preferences WHERE id > ? ORDER BY id LIMIT ?"; List<UserPreferenceDTO> batch = jdbcTemplate.query(sql, new Object[]{lastProcessedId, batchSize}, (rs, rowNum) -> { UserPreferenceDTO dto = new UserPreferenceDTO(); dto.setId(rs.getLong("id")); dto.setPreferenceKey(rs.getString("preference_key")); dto.setPreferenceValue(rs.getString("preference_value")); return dto; }); if (batch.isEmpty()) break; // 没有更多数据,退出循环 pushToAsyncQueue(batch); lastProcessedId = batch.get(batch.size() - 1).getId(); // 更新最后处理的ID,作为下一批的起始条件 } }
这种方式适合主键是自增类型的场景,性能比传统的limit offset好很多。
内容的提问来源于stack exchange,提问作者user2160919




