You need to enable JavaScript to run this app.
优惠活动
大模型
产品
解决方案
定价
更多
文档控制台
免费开始使用

无需停止Ingestion的HDFS文件自动Compaction方案咨询

针对你遇到的不停Ingestion就没法做Compaction的痛点,我结合Spark+HDFS的常用生产场景,给你几个可行的自动Compaction方案,都是不需要停写就能运行的:

方案1:切换到Delta Lake(推荐,适配Spark生态)

Delta Lake是Spark原生支持的湖仓格式,天生解决小文件和在线Compaction问题,完全兼容你现有的Kafka->Spark->HDFS流程:

  • 配置写入:把原来写Parquet外部表的代码改成写Delta表,只需要修改格式为delta,比如:
    df.write.format("delta").mode("append").partitionBy("state", "store").save("/user/head/warehouse/main_table")
    
  • 自动小文件合并:开启写入时的自动优化,在Spark配置里加:
    spark.databricks.delta.optimizeWrite.enabled=true
    spark.databricks.delta.autoCompact.enabled=true
    
    这样每次写入时会自动合并小文件,从源头减少文件数量。
  • 后台异步Compaction:即使还是有积累的小文件,你可以定时执行OPTIMIZE命令,这个操作是异步的,完全不阻塞Ingestion:
    OPTIMIZE main_table ZORDER BY (state, store)
    
    ZORDER会把同分区的数据物理上存放在一起,既优化Compaction效果,又提升查询性能。而且Delta Lake的ACID特性保证了在Compaction过程中,写入和查询都是一致的,不会出现脏数据。
方案2:使用Hive ACID事务表(适配现有Hive生态)

如果不想切换湖仓格式,Hive 3.x及以上版本支持ACID事务表,支持在线Compaction和MERGE操作:

  • 创建ACID表:需要把表设置为事务型,格式推荐用ORC(Parquet在Hive 3.1+也支持ACID),比如:
    CREATE EXTERNAL TABLE main_table (...)
    PARTITIONED BY (state string, store string)
    STORED AS ORC
    TBLPROPERTIES ('transactional'='true', 'compaction.enabled'='true')
    
  • 开启自动Compaction:在Hive配置里开启后台自动Compaction:
    hive.compactor.initiator.on=true
    hive.compactor.worker.threads=3
    hive.compactor.delta.num.threshold=10  -- 当分区内delta文件超过10个时自动触发Compaction
    
  • Spark写入适配:Spark写Hive ACID表需要开启事务支持,配置:
    spark.sql.hive.convertMetastoreAcid=true
    
    之后Spark的append写入会自动变成事务性写入,Hive后台会自动合并小文件,不需要手动停Ingestion,Compaction是后台异步执行的,对用户完全透明。
方案3:分层存储+异步合并(无格式切换成本)

如果暂时不想改表格式,可以用分层存储的思路,把新写入和旧数据分开处理:

  • 分层设计
    • 临时层:创建一个临时表(或分区前缀),比如main_table_temp,Spark每30分钟写入这里,保留最近24小时的数据,小文件暂时不处理——因为SLA要求1小时内查询,临时层的数据量小,查询性能影响不大。
    • 主层:存储超过24小时的合并后的数据,文件数量可控。
  • 异步合并任务:每天在低峰期(比如凌晨)启动一个Spark任务,把临时层中超过24小时的数据合并后写入主层,然后清理临时层的旧文件。这个任务和Ingestion完全并行,因为Ingestion只写入临时层的新分区/数据,合并处理的是旧数据。
  • 查询适配:创建一个视图,合并临时层和主层的数据,用户查询视图即可,不需要感知分层:
    CREATE VIEW main_table_view AS
    SELECT * FROM main_table_temp WHERE dt >= date_sub(current_date(), 1)
    UNION ALL
    SELECT * FROM main_table_main
    
关键注意事项
  • 不管用哪个方案,都要避免全量Compaction,尽量用增量Compaction(比如只处理最近N天的分区),这样Compaction的耗时会大大减少,对系统影响更小。
  • 如果用Spark的OPTIMIZE或Hive的自动Compaction,要监控Compaction的进度和资源使用,避免在业务高峰期占用过多资源。
  • 对于历史数据的Compaction,可以先一次性处理完,之后用自动增量Compaction维护即可。

内容的提问来源于stack exchange,提问作者user2984495

火山引擎 最新活动