如何在HDFS集群中按类别合并小于100MB的每日小文件
嗨,这个需求我之前帮不少同行处理过,其实用HDFS自带工具+简单脚本就能轻松搞定,要是数据量偏大或者需要更灵活的处理逻辑,用Spark也很合适。我给你分两种实用方案详细说说:
方案一:Shell脚本 + HDFS原生命令(轻量无依赖)
这种方案不需要额外依赖,直接用HDFS自带命令和Shell脚本就能完成,适合中小规模的文件处理场景。
完整脚本示例
#!/bin/bash # 替换成你的HDFS文件根目录 HDFS_ROOT="/your/hdfs/data/dir" # 定义要处理的三类文件前缀 FILE_TYPES=("product_info" "user_info" "user_activity") # 循环处理每一类文件 for TYPE in "${FILE_TYPES[@]}" do echo "=== 开始处理${TYPE}类型文件 ===" # 1. 筛选出该类型下小于100MB的文件,保存到本地临时列表 # 注:100MB = 100*1024*1024字节 hdfs dfs -ls "${HDFS_ROOT}/${TYPE}_*" | awk -v max_size=$((100*1024*1024)) '$5 < max_size {print $8}' > /tmp/${TYPE}_small_files.txt # 2. 检查是否有需要合并的小文件 if [ -s /tmp/${TYPE}_small_files.txt ]; then # 使用HDFS的getmerge命令合并小文件到目标文件 hdfs dfs -getmerge /tmp/${TYPE}_small_files.txt "${HDFS_ROOT}/${TYPE}" # 3. 【可选操作】删除原小文件(务必先备份再执行!) # while read FILE_PATH; do # hdfs dfs -rm "$FILE_PATH" # done < /tmp/${TYPE}_small_files.txt echo "${TYPE}类型小文件合并完成,合并后文件路径:${HDFS_ROOT}/${TYPE}" else echo "${TYPE}类型没有小于100MB的文件,无需处理" fi # 清理本地临时文件 rm /tmp/${TYPE}_small_files.txt done
脚本关键点说明
hdfs dfs -ls:列出HDFS目标目录下的所有对应前缀文件awk:过滤出文件大小(第5列)小于100MB的文件路径(第8列)hdfs dfs -getmerge:将列表中的文件合并成单个文件,直接写入HDFS- 删除原文件的部分被注释了,一定要先确认合并后的文件没问题再取消注释执行
方案二:Spark 分布式处理(适合大数据量场景)
如果每日要处理的小文件数量特别多(比如上千个),Shell脚本的单线程处理效率会很低,这时候用Spark的分布式能力会更高效。
Python版Spark作业示例
from pyspark.sql import SparkSession def merge_small_files(hdfs_root: str, file_prefix: str, max_size_mb: int = 100): # 初始化Spark会话 spark = SparkSession.builder \ .appName(f"Merge{file_prefix}SmallFiles") \ .getOrCreate() # 转换最大文件大小为字节 max_size_bytes = max_size_mb * 1024 * 1024 target_file_path = f"{hdfs_root}/{file_prefix}" # 获取HDFS文件系统实例,用于查询文件状态 fs = spark.sparkContext._jvm.org.apache.hadoop.fs.FileSystem.get( spark.sparkContext._jsc.hadoopConfiguration() ) file_path = spark.sparkContext._jvm.org.apache.hadoop.fs.Path(f"{hdfs_root}/{file_prefix}_*") # 筛选出小于指定大小的文件路径 small_files = [] for status in fs.listStatus(file_path): if status.getLen() < max_size_bytes: small_files.append(status.getPath().toString()) if small_files: print(f"发现{len(small_files)}个小于{max_size_mb}MB的文件,开始合并...") # 读取所有小文件,合并为1个分区后写入 df = spark.read.text(small_files) df.coalesce(1).write.mode("overwrite").text(target_file_path) # 【可选操作】删除原小文件(务必先备份!) # for file in small_files: # fs.delete(spark.sparkContext._jvm.org.apache.hadoop.fs.Path(file), False) print(f"合并完成,目标文件路径:{target_file_path}") else: print(f"{file_prefix}类型没有小于{max_size_mb}MB的文件,无需处理") # 停止Spark会话 spark.stop() if __name__ == "__main__": # 替换成你的HDFS根目录 HDFS_ROOT = "/your/hdfs/data/dir" # 依次处理三类文件 merge_small_files(HDFS_ROOT, "product_info") merge_small_files(HDFS_ROOT, "user_info") merge_small_files(HDFS_ROOT, "user_activity")
Spark方案说明
coalesce(1):将所有数据合并到一个分区,最终生成单个文件(如果合并后的文件过大,可调整这个数字,但你的需求是单个文件所以设为1)- 分布式读取小文件,处理效率远高于单线程Shell脚本
- 可以集成到Airflow、Oozie等调度系统中,实现每日自动执行
关键注意事项
- 数据备份优先:无论用哪种方案,合并前一定要先备份原小文件,避免误操作导致数据丢失
- 定时执行:如果需要每日自动处理,Shell脚本可以加到Linux的crontab中,Spark作业可以提交到调度系统
- 权限问题:确保执行脚本/作业的用户拥有HDFS对应目录的读写权限
- 排序需求:如果需要按时间戳顺序合并文件,可以在生成文件列表时加上排序逻辑(比如Shell脚本中用
sort对时间列排序)
内容的提问来源于stack exchange,提问作者user3829376




