You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何在HDFS集群中按类别合并小于100MB的每日小文件

嗨,这个需求我之前帮不少同行处理过,其实用HDFS自带工具+简单脚本就能轻松搞定,要是数据量偏大或者需要更灵活的处理逻辑,用Spark也很合适。我给你分两种实用方案详细说说:

方案一:Shell脚本 + HDFS原生命令(轻量无依赖)

这种方案不需要额外依赖,直接用HDFS自带命令和Shell脚本就能完成,适合中小规模的文件处理场景。

完整脚本示例

#!/bin/bash

# 替换成你的HDFS文件根目录
HDFS_ROOT="/your/hdfs/data/dir"

# 定义要处理的三类文件前缀
FILE_TYPES=("product_info" "user_info" "user_activity")

# 循环处理每一类文件
for TYPE in "${FILE_TYPES[@]}"
do
    echo "=== 开始处理${TYPE}类型文件 ==="
    
    # 1. 筛选出该类型下小于100MB的文件,保存到本地临时列表
    # 注:100MB = 100*1024*1024字节
    hdfs dfs -ls "${HDFS_ROOT}/${TYPE}_*" | awk -v max_size=$((100*1024*1024)) '$5 < max_size {print $8}' > /tmp/${TYPE}_small_files.txt
    
    # 2. 检查是否有需要合并的小文件
    if [ -s /tmp/${TYPE}_small_files.txt ]; then
        # 使用HDFS的getmerge命令合并小文件到目标文件
        hdfs dfs -getmerge /tmp/${TYPE}_small_files.txt "${HDFS_ROOT}/${TYPE}"
        
        # 3. 【可选操作】删除原小文件(务必先备份再执行!)
        # while read FILE_PATH; do
        #     hdfs dfs -rm "$FILE_PATH"
        # done < /tmp/${TYPE}_small_files.txt
        
        echo "${TYPE}类型小文件合并完成,合并后文件路径:${HDFS_ROOT}/${TYPE}"
    else
        echo "${TYPE}类型没有小于100MB的文件,无需处理"
    fi
    
    # 清理本地临时文件
    rm /tmp/${TYPE}_small_files.txt
done

脚本关键点说明

  • hdfs dfs -ls:列出HDFS目标目录下的所有对应前缀文件
  • awk:过滤出文件大小(第5列)小于100MB的文件路径(第8列)
  • hdfs dfs -getmerge:将列表中的文件合并成单个文件,直接写入HDFS
  • 删除原文件的部分被注释了,一定要先确认合并后的文件没问题再取消注释执行

方案二:Spark 分布式处理(适合大数据量场景)

如果每日要处理的小文件数量特别多(比如上千个),Shell脚本的单线程处理效率会很低,这时候用Spark的分布式能力会更高效。

Python版Spark作业示例

from pyspark.sql import SparkSession

def merge_small_files(hdfs_root: str, file_prefix: str, max_size_mb: int = 100):
    # 初始化Spark会话
    spark = SparkSession.builder \
        .appName(f"Merge{file_prefix}SmallFiles") \
        .getOrCreate()
    
    # 转换最大文件大小为字节
    max_size_bytes = max_size_mb * 1024 * 1024
    target_file_path = f"{hdfs_root}/{file_prefix}"
    
    # 获取HDFS文件系统实例,用于查询文件状态
    fs = spark.sparkContext._jvm.org.apache.hadoop.fs.FileSystem.get(
        spark.sparkContext._jsc.hadoopConfiguration()
    )
    file_path = spark.sparkContext._jvm.org.apache.hadoop.fs.Path(f"{hdfs_root}/{file_prefix}_*")
    
    # 筛选出小于指定大小的文件路径
    small_files = []
    for status in fs.listStatus(file_path):
        if status.getLen() < max_size_bytes:
            small_files.append(status.getPath().toString())
    
    if small_files:
        print(f"发现{len(small_files)}个小于{max_size_mb}MB的文件,开始合并...")
        # 读取所有小文件,合并为1个分区后写入
        df = spark.read.text(small_files)
        df.coalesce(1).write.mode("overwrite").text(target_file_path)
        
        # 【可选操作】删除原小文件(务必先备份!)
        # for file in small_files:
        #     fs.delete(spark.sparkContext._jvm.org.apache.hadoop.fs.Path(file), False)
        
        print(f"合并完成,目标文件路径:{target_file_path}")
    else:
        print(f"{file_prefix}类型没有小于{max_size_mb}MB的文件,无需处理")
    
    # 停止Spark会话
    spark.stop()

if __name__ == "__main__":
    # 替换成你的HDFS根目录
    HDFS_ROOT = "/your/hdfs/data/dir"
    
    # 依次处理三类文件
    merge_small_files(HDFS_ROOT, "product_info")
    merge_small_files(HDFS_ROOT, "user_info")
    merge_small_files(HDFS_ROOT, "user_activity")

Spark方案说明

  • coalesce(1):将所有数据合并到一个分区,最终生成单个文件(如果合并后的文件过大,可调整这个数字,但你的需求是单个文件所以设为1)
  • 分布式读取小文件,处理效率远高于单线程Shell脚本
  • 可以集成到Airflow、Oozie等调度系统中,实现每日自动执行

关键注意事项

  • 数据备份优先:无论用哪种方案,合并前一定要先备份原小文件,避免误操作导致数据丢失
  • 定时执行:如果需要每日自动处理,Shell脚本可以加到Linux的crontab中,Spark作业可以提交到调度系统
  • 权限问题:确保执行脚本/作业的用户拥有HDFS对应目录的读写权限
  • 排序需求:如果需要按时间戳顺序合并文件,可以在生成文件列表时加上排序逻辑(比如Shell脚本中用sort对时间列排序)

内容的提问来源于stack exchange,提问作者user3829376

火山引擎 最新活动