You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Flume 1.7 Spooling Dir Source:HDFS Sink如何保留子目录结构

解决Flume 1.7 Spooling Source子目录结构映射到HDFS Sink的问题

我明白你的需求——要把/tmp/test下的子目录结构完整复刻到HDFS的/sink路径下,而不是用%{file}带来的完整绝对路径。Flume本身的file header返回的是文件的全路径,所以我们需要用**正则提取拦截器(Regex Extractor Interceptor)**来提取出相对于spoolDir的路径部分,再把它用到HDFS Sink的配置里。

具体配置步骤

1. 配置Spooling Directory Source并添加拦截器

首先在你的Source配置里,除了已有的recursiveDirectorySearch=true,添加一个正则提取拦截器,用来从file header中剥离掉/tmp/test/前缀,提取出相对路径的目录部分和文件名:

# Source配置
agent.sources.spool_source.type = spooldir
agent.sources.spool_source.spoolDir = /tmp/test
agent.sources.spool_source.recursiveDirectorySearch = true

# 添加正则提取拦截器
agent.sources.spool_source.interceptors = path_extractor
agent.sources.spool_source.interceptors.path_extractor.type = regex_extractor
# 正则匹配:捕获/tmp/test/之后的目录部分和文件名,支持多层子目录
agent.sources.spool_source.interceptors.path_extractor.regex = ^/tmp/test/(.*)/([^/]+)$
# 定义两个序列化器,分别对应目录部分和文件名
agent.sources.spool_source.interceptors.path_extractor.serializers = rel_dir file_name
agent.sources.spool_source.interceptors.path_extractor.serializers.rel_dir.name = relative_directory
agent.sources.spool_source.interceptors.path_extractor.serializers.file_name.name = file_name

这个正则表达式会把/tmp/test/data1/file.csv拆成两部分:

  • 第一个捕获组data1 → 存入relative_directory header
  • 第二个捕获组file.csv → 存入file_name header

如果你的子目录有多层(比如/tmp/test/data1/sub/file3.csv),这个正则也能正常工作,捕获组data1/sub作为relative_directory,完美支持递归目录的结构。

2. 配置HDFS Sink使用提取出的Header

接下来在HDFS Sink的配置里,用刚才提取的两个header来构建目标路径和文件名:

# HDFS Sink配置
agent.sinks.hdfs_sink.type = hdfs
agent.sinks.hdfs_sink.hdfs.path = hdfs://your-namenode-host:8020/sink/%{relative_directory}
agent.sinks.hdfs_sink.hdfs.filePrefix = %{file_name}
# 可选:添加文件滚动策略配置(根据你的需求调整)
agent.sinks.hdfs_sink.hdfs.rollInterval = 300
agent.sinks.hdfs_sink.hdfs.rollSize = 134217728
agent.sinks.hdfs_sink.hdfs.fileType = DataStream

这样配置后,Flume会自动在HDFS上创建/sink/data1/sink/data2这些子目录,然后把文件以原文件名(比如file.csvfile2.csv)写入对应目录,完全复刻你本地的子目录结构。

小提示:为什么拆分目录和文件名?

你可能会想直接提取data1/file.csv作为一个header,然后设置hdfs.path = /sink/%{full_relative_path},但这样Flume会把data1/file.csv当成目录名来创建,而不是把file.csv作为文件名写入data1目录——这显然不符合我们的需求。所以拆分目录和文件名是更可靠的实现方式。

内容的提问来源于stack exchange,提问作者jyotsna ganji

火山引擎 最新活动