Scala Spark分区数据读写异常:如何正确读取分区存储的数据?
问题分析与解决方案
从你的代码和数据样本来看,读取分区文件时无法正确解析成foo对象,主要是因为存储格式与解析逻辑不匹配、变量作用域错误,还有解析时的索引误用这几个核心问题,下面一步步帮你修正:
关键问题拆解
- 存储格式与解析逻辑不兼容:你存储时用了
position.toList,导致文件里出现List(...)带包裹的格式,但读取时直接用split(",")会把List内部的逗号也拆分,比如第一个字段会变成List(86.6582767815429,完全无法转成Array[Double]。 - 变量作用域风险:
data变量只在if (c_itr != 0)块内定义,块外访问会触发未初始化的编译错误。 - 解析索引错误:代码里
line(2).toDouble是取字符串的第三个字符,而不是拆分后数组的第三个元素,应该用p(2).toDouble;另外你定义的是case class foo,但读取时写了BAT1,明显是笔误。
修正后的完整代码
第一步:优化存储格式(推荐方案)
先把存储逻辑改成直接输出数组元素,去掉List包裹,这样解析会简单很多:
def execute(RDD: RDD[foo], c_itr: Int ): Array[(foo, Int)] = { val newRDD = RDD.mapPartitionsWithIndex { (index, iterator) => { var arr: Array[foo] = iterator.toArray var data: Array[foo] = Array.empty[foo] // 初始化data,规避作用域问题 if (c_itr != 0) { // 读取分区文件,用try-with-resources自动关闭流 Source.fromFile(s"/result/${index}.txt").use { bufferedSource => data = bufferedSource.getLines().map { line => val parts = line.split(",") // 假设position和velocity各有5个元素,可根据实际调整长度 val posLen = 5 val velLen = 5 // 提取各字段值 val posVals = parts.slice(0, posLen).map(_.toDouble) val velVals = parts.slice(posLen, posLen + velLen).map(_.toDouble) val fitnessVal = parts(posLen + velLen).toDouble val LR1 = parts(posLen + velLen + 1).toDouble val PR1 = parts(posLen + velLen + 2).toDouble // 创建foo对象 foo(posVals, velVals, sphere, fitnessVal, LR1, PR1) }.toArray } // 替换为加载的数据 arr = data.clone() } // 用优化后的格式保存数据 new FileWriter(s"/result/${index}.txt").use { writer => arr.foreach { item => val posStr = item.position.mkString(",") val velStr = item.velocity.mkString(",") writer.write(s"$posStr,$velStr,${item.fitness},${item.LoudnessRate},${item.PulseRate}\n") } } // 返回每个foo对象与对应分区索引 arr.map(x => (x, index)).toIterator }} newRDD.persist().collect() }
第二步:如果必须保留原存储格式(不推荐)
如果不想修改已有的存储文件,可以添加一个辅助函数来解析List(...)格式的字符串:
// 辅助函数:解析List(...)格式的字符串为Array[Double] def parseListStr(str: String): Array[Double] = { str.replace("List(", "") .replace(")", "") .split(",") .map(_.trim.toDouble) }
然后读取部分改成:
if (c_itr != 0) { Source.fromFile(s"/result/${index}.txt").use { bufferedSource => data = bufferedSource.getLines().map { line => // 按"),List("拆分,分离position和velocity部分 val mainParts = line.split("\\),List\\(") val posVals = parseListStr(mainParts(0)) // 拆分velocity与后续字段 val restParts = mainParts(1).split(",", 4) val velVals = parseListStr(restParts(0) + ")") val fitnessVal = restParts(1).toDouble val LR1 = restParts(2).toDouble val PR1 = restParts(3).toDouble foo(posVals, velVals, sphere, fitnessVal, LR1, PR1) }.toArray } arr = data.clone() }
额外注意事项
- 用
use方法(Scala 2.13+支持)来自动关闭文件流,避免资源泄漏;如果是旧版本Scala,用try-finally块手动关闭。 foo类中的f字段是函数类型,虽然继承了Serializable,但函数序列化可能存在隐式问题,建议把函数作为参数传入,或者用可序列化的函数实现类替代。
内容的提问来源于stack exchange,提问作者Asif




