You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Spark基于属性文件过滤:按文件类型与投资组合筛选目标文件

解决方案:基于配置文件过滤Spark处理的目标文件

我来帮你搞定这个需求,核心是把配置读取、文件名解析和分布式过滤结合起来,还能优化你原代码里的性能问题(原代码用collect把所有文件拉到Driver端,文件量大时容易出现内存溢出)。

步骤1:准备配置文件

首先用配置文件定义允许的文件类型和投资组合,推荐用HOCON格式(比如application.conf),可读性和扩展性都不错:

# application.conf
file-filter {
  allowed-types = ["trade", "position", "balance"]  # 允许的文件类型集合
  allowed-portfolios = ["US_EQ", "EU_FX", "AP_BOND"]  # 允许的投资组合集合
}

如果不想依赖第三方配置库,也可以用Spark启动参数传递配置(比如--conf spark.file.filter.allowed-types=trade,position),后面会讲这种方式的代码实现。

步骤2:加载配置到Spark应用

方式A:用Typesafe Config加载本地配置文件

先确保你的项目依赖了Typesafe Config(SBT项目可以加libraryDependencies += "com.typesafe" % "config" % "1.4.2"),然后在代码里加载:

import com.typesafe.config.ConfigFactory

// 加载classpath下的配置文件
val config = ConfigFactory.load()
val allowedFileTypes = config.getStringList("file-filter.allowed-types").toSet
val allowedPortfolios = config.getStringList("file-filter.allowed-portfolios").toSet

方式B:用Spark Conf传递配置(集群环境更友好)

如果是在集群运行,把配置通过Spark启动参数传递,代码里直接读取:

// 从Spark Conf读取配置,启动时用--conf spark.file.filter.allowed-types=trade,position
val allowedFileTypes = spark.conf.get("spark.file.filter.allowed-types").split(",").toSet
val allowedPortfolios = spark.conf.get("spark.file.filter.allowed-portfolios").split(",").toSet

步骤3:编写文件名解析与过滤逻辑

根据你给出的文件名格式<file_name>_<file_type>_<portfolio>_<date>_<time>.csv,我们用正则表达式提取关键信息,然后判断是否在允许的列表里:

import java.util.regex.Pattern

// 正则匹配文件名:适配本地/HDFS路径,提取file_type和portfolio
// 解释:.*?([^/]+) 提取最后一个/后的文件名前缀;([^_]+) 匹配file_type;([^_]+) 匹配portfolio;后面匹配日期(8位)、时间(6位)和.csv后缀
val fileNameRegex = Pattern.compile(".*?([^/]+)_([^_]+)_([^_]+)_\\d{8}_\\d{6}\\.csv$")

def isFileQualified(filePath: String): Boolean = {
  val matcher = fileNameRegex.matcher(filePath)
  if (matcher.matches()) {
    val fileType = matcher.group(2)
    val portfolio = matcher.group(3)
    // 同时满足类型和投资组合在允许列表中
    allowedFileTypes.contains(fileType) && allowedPortfolios.contains(portfolio)
  } else {
    // 不符合命名格式的文件直接过滤
    false
  }
}

步骤4:整合到Spark代码中(分布式过滤)

不要用collect把所有文件拉到Driver端处理,而是用RDD的filter操作在Executor端分布式过滤,性能更优:

val wholeFilesPath = "/your/target/folder/*" // 替换成你的目标文件路径
val rawFilesRDD = spark.sparkContext.wholeTextFiles(wholeFilesPath)

// 过滤符合条件的文件
val qualifiedFilesRDD = rawFilesRDD.filter { case (filePath, _) => isFileQualified(filePath) }

// 处理每个符合条件的文件(这里替换成你的业务代码)
qualifiedFilesRDD.foreach { case (filePath, fileContent) =>
  println(s"Processing file: $filePath")
  // 示例:按行解析CSV内容
  val csvLines = fileContent.split("\n")
  csvLines.tail.foreach(line => { // 跳过表头行
    // 你的业务处理逻辑,比如解析字段、写入数据库等
  })
}

如果你的业务更适合用DataFrame处理,可以把过滤后的文件路径收集起来(确保数量在合理范围),然后读取为DataFrame:

val qualifiedFilePaths = qualifiedFilesRDD.map(_._1).collect()
val csvDF = spark.read
  .option("header", "true")
  .csv(qualifiedFilePaths: _*)

// 后续DataFrame处理逻辑
csvDF.show()

优化与注意事项

  • 正则适配:如果你的日期/时间格式不是固定的8位/6位,需要调整正则表达式的对应部分。
  • 空配置处理:如果允许列表为空,可以添加逻辑默认允许所有文件,比如if (allowedFileTypes.isEmpty || allowedFileTypes.contains(fileType))
  • 大文件处理wholeTextFiles会把整个文件读成一个字符串,大文件可能占用较多内存,建议用textFile按行读取,或者直接用DataFrame的读取方式结合自定义过滤。
  • 集群配置:如果用Typesafe Config,要确保配置文件被打包到Jar里,或者放到所有节点的Spark classpath中;用Spark Conf传递配置则无需额外操作。

内容的提问来源于stack exchange,提问作者1pluszara

火山引擎 最新活动