You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Scala Spark分区数据读写异常:如何正确读取分区存储的数据?

问题分析与解决方案

从你的代码和数据样本来看,读取分区文件时无法正确解析成foo对象,主要是因为存储格式与解析逻辑不匹配、变量作用域错误,还有解析时的索引误用这几个核心问题,下面一步步帮你修正:


关键问题拆解

  • 存储格式与解析逻辑不兼容:你存储时用了position.toList,导致文件里出现List(...)带包裹的格式,但读取时直接用split(",")会把List内部的逗号也拆分,比如第一个字段会变成List(86.6582767815429,完全无法转成Array[Double]
  • 变量作用域风险data变量只在if (c_itr != 0)块内定义,块外访问会触发未初始化的编译错误。
  • 解析索引错误:代码里line(2).toDouble是取字符串的第三个字符,而不是拆分后数组的第三个元素,应该用p(2).toDouble;另外你定义的是case class foo,但读取时写了BAT1,明显是笔误。

修正后的完整代码

第一步:优化存储格式(推荐方案)

先把存储逻辑改成直接输出数组元素,去掉List包裹,这样解析会简单很多:

def execute(RDD: RDD[foo], c_itr: Int ): Array[(foo, Int)] = {
  val newRDD = RDD.mapPartitionsWithIndex { (index, iterator) => {
    var arr: Array[foo] = iterator.toArray
    var data: Array[foo] = Array.empty[foo] // 初始化data,规避作用域问题
    
    if (c_itr != 0) {
      // 读取分区文件,用try-with-resources自动关闭流
      Source.fromFile(s"/result/${index}.txt").use { bufferedSource =>
        data = bufferedSource.getLines().map { line =>
          val parts = line.split(",")
          // 假设position和velocity各有5个元素,可根据实际调整长度
          val posLen = 5
          val velLen = 5
          
          // 提取各字段值
          val posVals = parts.slice(0, posLen).map(_.toDouble)
          val velVals = parts.slice(posLen, posLen + velLen).map(_.toDouble)
          val fitnessVal = parts(posLen + velLen).toDouble
          val LR1 = parts(posLen + velLen + 1).toDouble
          val PR1 = parts(posLen + velLen + 2).toDouble
          
          // 创建foo对象
          foo(posVals, velVals, sphere, fitnessVal, LR1, PR1)
        }.toArray
      }
      // 替换为加载的数据
      arr = data.clone()
    }
    
    // 用优化后的格式保存数据
    new FileWriter(s"/result/${index}.txt").use { writer =>
      arr.foreach { item =>
        val posStr = item.position.mkString(",")
        val velStr = item.velocity.mkString(",")
        writer.write(s"$posStr,$velStr,${item.fitness},${item.LoudnessRate},${item.PulseRate}\n")
      }
    }
    
    // 返回每个foo对象与对应分区索引
    arr.map(x => (x, index)).toIterator
  }}
  newRDD.persist().collect()
}

第二步:如果必须保留原存储格式(不推荐)

如果不想修改已有的存储文件,可以添加一个辅助函数来解析List(...)格式的字符串:

// 辅助函数:解析List(...)格式的字符串为Array[Double]
def parseListStr(str: String): Array[Double] = {
  str.replace("List(", "")
     .replace(")", "")
     .split(",")
     .map(_.trim.toDouble)
}

然后读取部分改成:

if (c_itr != 0) {
  Source.fromFile(s"/result/${index}.txt").use { bufferedSource =>
    data = bufferedSource.getLines().map { line =>
      // 按"),List("拆分,分离position和velocity部分
      val mainParts = line.split("\\),List\\(")
      val posVals = parseListStr(mainParts(0))
      
      // 拆分velocity与后续字段
      val restParts = mainParts(1).split(",", 4)
      val velVals = parseListStr(restParts(0) + ")")
      val fitnessVal = restParts(1).toDouble
      val LR1 = restParts(2).toDouble
      val PR1 = restParts(3).toDouble
      
      foo(posVals, velVals, sphere, fitnessVal, LR1, PR1)
    }.toArray
  }
  arr = data.clone()
}

额外注意事项

  • use方法(Scala 2.13+支持)来自动关闭文件流,避免资源泄漏;如果是旧版本Scala,用try-finally块手动关闭。
  • foo类中的f字段是函数类型,虽然继承了Serializable,但函数序列化可能存在隐式问题,建议把函数作为参数传入,或者用可序列化的函数实现类替代。

内容的提问来源于stack exchange,提问作者Asif

火山引擎 最新活动