You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Spark 2.0:将DataFrame中竖线分隔数据拆分为独立行

当然可以实现!这是Spark处理这类行转列需求的典型场景,核心就是用split+explode组合操作拆分字符串并展开为多行,之后再写入数据库就能达到你想要的效果。

核心思路

  1. 将竖线分隔的offer_ids字符串拆分为字符串数组
  2. 把数组中的每个元素展开为独立行,同时保留原DataFrame中其他所有列的值
  3. 将拆分后的字符串类型offer_id转换为INT类型,匹配数据库列的类型
  4. 过滤无效值(可选)后,写入关系型数据库

代码实现示例

Scala版本

import org.apache.spark.sql.functions.{split, explode, col}
import org.apache.spark.sql.SaveMode

// 假设你的原始DataFrame名为inputDF,包含其他列(如user_id、create_time等)
val processedDF = inputDF
  // 按竖线拆分offer_ids为数组(注意竖线是正则特殊字符,需转义)
  .withColumn("offer_id_array", split(col("offer_ids"), "\\|"))
  // 将数组展开为多行,其他列自动复制对应值
  .withColumn("offer_id", explode(col("offer_id_array")))
  // 将字符串类型的offer_id转为INT,匹配数据库列类型
  .withColumn("offer_id", col("offer_id").cast("int"))
  // 过滤空值或空字符串(避免插入无效数据,可选)
  .where(col("offer_id").isNotNull && col("offer_id") =!= "")
  // 移除临时生成的数组列和原offer_ids列(可选,让DataFrame更整洁)
  .drop("offer_ids", "offer_id_array")

// 写入关系型数据库
val connectionProps = new java.util.Properties {
  setProperty("user", "你的数据库用户名")
  setProperty("password", "你的数据库密码")
  setProperty("driver", "com.mysql.cj.jdbc.Driver") // 根据数据库类型调整,如PostgreSQL用org.postgresql.Driver
}

processedDF.write
  .mode(SaveMode.Append)
  .jdbc(
    url = "jdbc:mysql://你的数据库地址:端口/数据库名",
    table = "目标表名",
    connectionProperties = connectionProps
  )

Python版本

from pyspark.sql.functions import split, explode, col
from pyspark.sql import SaveMode

# 假设原始DataFrame为inputDF
processed_df = inputDF \
    # 拆分offer_ids为数组
    .withColumn("offer_id_array", split(col("offer_ids"), "\\|")) \
    # 展开数组为多行
    .withColumn("offer_id", explode(col("offer_id_array"))) \
    # 转换为INT类型
    .withColumn("offer_id", col("offer_id").cast("int")) \
    # 过滤无效值
    .filter(col("offer_id").isNotNull() & (col("offer_id") != "")) \
    # 删除临时列
    .drop("offer_ids", "offer_id_array")

# 数据库连接配置
connection_props = {
    "user": "你的数据库用户名",
    "password": "你的数据库密码",
    "driver": "com.mysql.cj.jdbc.Driver"  # 根据数据库类型调整
}

# 写入数据库
processed_df.write \
    .mode(SaveMode.Append) \
    .jdbc(
        url="jdbc:mysql://你的数据库地址:端口/数据库名",
        table="目标表名",
        properties=connection_props
    )

关键细节说明

  • split函数中的\\|:因为竖线|是正则表达式的特殊字符,所以必须用双反斜杠转义才能正确按竖线拆分字符串。
  • explode函数:这是实现行转列的核心,它会把数组中的每个元素生成一条新记录,原DataFrame的其他列值会自动复制到每一条新记录中,完全符合你“其他列值保持不变”的需求。
  • 类型转换:拆分后的offer_id默认是字符串类型,必须转为INT才能匹配数据库的INT列,否则会出现类型不匹配的写入错误。
  • 过滤步骤:如果原始数据存在连续竖线(如9||10),拆分后会产生空字符串,过滤操作可以避免插入无效的INT值。

内容的提问来源于stack exchange,提问作者Vijay

火山引擎 最新活动