Spark基于属性文件过滤:按文件类型与投资组合筛选目标文件
解决方案:基于配置文件过滤Spark处理的目标文件
我来帮你搞定这个需求,核心是把配置读取、文件名解析和分布式过滤结合起来,还能优化你原代码里的性能问题(原代码用collect把所有文件拉到Driver端,文件量大时容易出现内存溢出)。
步骤1:准备配置文件
首先用配置文件定义允许的文件类型和投资组合,推荐用HOCON格式(比如application.conf),可读性和扩展性都不错:
# application.conf file-filter { allowed-types = ["trade", "position", "balance"] # 允许的文件类型集合 allowed-portfolios = ["US_EQ", "EU_FX", "AP_BOND"] # 允许的投资组合集合 }
如果不想依赖第三方配置库,也可以用Spark启动参数传递配置(比如--conf spark.file.filter.allowed-types=trade,position),后面会讲这种方式的代码实现。
步骤2:加载配置到Spark应用
方式A:用Typesafe Config加载本地配置文件
先确保你的项目依赖了Typesafe Config(SBT项目可以加libraryDependencies += "com.typesafe" % "config" % "1.4.2"),然后在代码里加载:
import com.typesafe.config.ConfigFactory // 加载classpath下的配置文件 val config = ConfigFactory.load() val allowedFileTypes = config.getStringList("file-filter.allowed-types").toSet val allowedPortfolios = config.getStringList("file-filter.allowed-portfolios").toSet
方式B:用Spark Conf传递配置(集群环境更友好)
如果是在集群运行,把配置通过Spark启动参数传递,代码里直接读取:
// 从Spark Conf读取配置,启动时用--conf spark.file.filter.allowed-types=trade,position val allowedFileTypes = spark.conf.get("spark.file.filter.allowed-types").split(",").toSet val allowedPortfolios = spark.conf.get("spark.file.filter.allowed-portfolios").split(",").toSet
步骤3:编写文件名解析与过滤逻辑
根据你给出的文件名格式<file_name>_<file_type>_<portfolio>_<date>_<time>.csv,我们用正则表达式提取关键信息,然后判断是否在允许的列表里:
import java.util.regex.Pattern // 正则匹配文件名:适配本地/HDFS路径,提取file_type和portfolio // 解释:.*?([^/]+) 提取最后一个/后的文件名前缀;([^_]+) 匹配file_type;([^_]+) 匹配portfolio;后面匹配日期(8位)、时间(6位)和.csv后缀 val fileNameRegex = Pattern.compile(".*?([^/]+)_([^_]+)_([^_]+)_\\d{8}_\\d{6}\\.csv$") def isFileQualified(filePath: String): Boolean = { val matcher = fileNameRegex.matcher(filePath) if (matcher.matches()) { val fileType = matcher.group(2) val portfolio = matcher.group(3) // 同时满足类型和投资组合在允许列表中 allowedFileTypes.contains(fileType) && allowedPortfolios.contains(portfolio) } else { // 不符合命名格式的文件直接过滤 false } }
步骤4:整合到Spark代码中(分布式过滤)
不要用collect把所有文件拉到Driver端处理,而是用RDD的filter操作在Executor端分布式过滤,性能更优:
val wholeFilesPath = "/your/target/folder/*" // 替换成你的目标文件路径 val rawFilesRDD = spark.sparkContext.wholeTextFiles(wholeFilesPath) // 过滤符合条件的文件 val qualifiedFilesRDD = rawFilesRDD.filter { case (filePath, _) => isFileQualified(filePath) } // 处理每个符合条件的文件(这里替换成你的业务代码) qualifiedFilesRDD.foreach { case (filePath, fileContent) => println(s"Processing file: $filePath") // 示例:按行解析CSV内容 val csvLines = fileContent.split("\n") csvLines.tail.foreach(line => { // 跳过表头行 // 你的业务处理逻辑,比如解析字段、写入数据库等 }) }
如果你的业务更适合用DataFrame处理,可以把过滤后的文件路径收集起来(确保数量在合理范围),然后读取为DataFrame:
val qualifiedFilePaths = qualifiedFilesRDD.map(_._1).collect() val csvDF = spark.read .option("header", "true") .csv(qualifiedFilePaths: _*) // 后续DataFrame处理逻辑 csvDF.show()
优化与注意事项
- 正则适配:如果你的日期/时间格式不是固定的8位/6位,需要调整正则表达式的对应部分。
- 空配置处理:如果允许列表为空,可以添加逻辑默认允许所有文件,比如
if (allowedFileTypes.isEmpty || allowedFileTypes.contains(fileType))。 - 大文件处理:
wholeTextFiles会把整个文件读成一个字符串,大文件可能占用较多内存,建议用textFile按行读取,或者直接用DataFrame的读取方式结合自定义过滤。 - 集群配置:如果用Typesafe Config,要确保配置文件被打包到Jar里,或者放到所有节点的Spark classpath中;用Spark Conf传递配置则无需额外操作。
内容的提问来源于stack exchange,提问作者1pluszara




