无需停止Ingestion的HDFS文件自动Compaction方案咨询
针对你遇到的不停Ingestion就没法做Compaction的痛点,我结合Spark+HDFS的常用生产场景,给你几个可行的自动Compaction方案,都是不需要停写就能运行的:
方案1:切换到Delta Lake(推荐,适配Spark生态)
Delta Lake是Spark原生支持的湖仓格式,天生解决小文件和在线Compaction问题,完全兼容你现有的Kafka->Spark->HDFS流程:
- 配置写入:把原来写Parquet外部表的代码改成写Delta表,只需要修改格式为
delta,比如:df.write.format("delta").mode("append").partitionBy("state", "store").save("/user/head/warehouse/main_table") - 自动小文件合并:开启写入时的自动优化,在Spark配置里加:
这样每次写入时会自动合并小文件,从源头减少文件数量。spark.databricks.delta.optimizeWrite.enabled=true spark.databricks.delta.autoCompact.enabled=true - 后台异步Compaction:即使还是有积累的小文件,你可以定时执行
OPTIMIZE命令,这个操作是异步的,完全不阻塞Ingestion:
ZORDER会把同分区的数据物理上存放在一起,既优化Compaction效果,又提升查询性能。而且Delta Lake的ACID特性保证了在Compaction过程中,写入和查询都是一致的,不会出现脏数据。OPTIMIZE main_table ZORDER BY (state, store)
方案2:使用Hive ACID事务表(适配现有Hive生态)
如果不想切换湖仓格式,Hive 3.x及以上版本支持ACID事务表,支持在线Compaction和MERGE操作:
- 创建ACID表:需要把表设置为事务型,格式推荐用ORC(Parquet在Hive 3.1+也支持ACID),比如:
CREATE EXTERNAL TABLE main_table (...) PARTITIONED BY (state string, store string) STORED AS ORC TBLPROPERTIES ('transactional'='true', 'compaction.enabled'='true') - 开启自动Compaction:在Hive配置里开启后台自动Compaction:
hive.compactor.initiator.on=true hive.compactor.worker.threads=3 hive.compactor.delta.num.threshold=10 -- 当分区内delta文件超过10个时自动触发Compaction - Spark写入适配:Spark写Hive ACID表需要开启事务支持,配置:
之后Spark的append写入会自动变成事务性写入,Hive后台会自动合并小文件,不需要手动停Ingestion,Compaction是后台异步执行的,对用户完全透明。spark.sql.hive.convertMetastoreAcid=true
方案3:分层存储+异步合并(无格式切换成本)
如果暂时不想改表格式,可以用分层存储的思路,把新写入和旧数据分开处理:
- 分层设计:
- 临时层:创建一个临时表(或分区前缀),比如
main_table_temp,Spark每30分钟写入这里,保留最近24小时的数据,小文件暂时不处理——因为SLA要求1小时内查询,临时层的数据量小,查询性能影响不大。 - 主层:存储超过24小时的合并后的数据,文件数量可控。
- 临时层:创建一个临时表(或分区前缀),比如
- 异步合并任务:每天在低峰期(比如凌晨)启动一个Spark任务,把临时层中超过24小时的数据合并后写入主层,然后清理临时层的旧文件。这个任务和Ingestion完全并行,因为Ingestion只写入临时层的新分区/数据,合并处理的是旧数据。
- 查询适配:创建一个视图,合并临时层和主层的数据,用户查询视图即可,不需要感知分层:
CREATE VIEW main_table_view AS SELECT * FROM main_table_temp WHERE dt >= date_sub(current_date(), 1) UNION ALL SELECT * FROM main_table_main
关键注意事项
- 不管用哪个方案,都要避免全量Compaction,尽量用增量Compaction(比如只处理最近N天的分区),这样Compaction的耗时会大大减少,对系统影响更小。
- 如果用Spark的
OPTIMIZE或Hive的自动Compaction,要监控Compaction的进度和资源使用,避免在业务高峰期占用过多资源。 - 对于历史数据的Compaction,可以先一次性处理完,之后用自动增量Compaction维护即可。
内容的提问来源于stack exchange,提问作者user2984495




