Spark 2.0:将DataFrame中竖线分隔数据拆分为独立行
当然可以实现!这是Spark处理这类行转列需求的典型场景,核心就是用split+explode组合操作拆分字符串并展开为多行,之后再写入数据库就能达到你想要的效果。
核心思路
- 将竖线分隔的
offer_ids字符串拆分为字符串数组 - 把数组中的每个元素展开为独立行,同时保留原DataFrame中其他所有列的值
- 将拆分后的字符串类型
offer_id转换为INT类型,匹配数据库列的类型 - 过滤无效值(可选)后,写入关系型数据库
代码实现示例
Scala版本
import org.apache.spark.sql.functions.{split, explode, col} import org.apache.spark.sql.SaveMode // 假设你的原始DataFrame名为inputDF,包含其他列(如user_id、create_time等) val processedDF = inputDF // 按竖线拆分offer_ids为数组(注意竖线是正则特殊字符,需转义) .withColumn("offer_id_array", split(col("offer_ids"), "\\|")) // 将数组展开为多行,其他列自动复制对应值 .withColumn("offer_id", explode(col("offer_id_array"))) // 将字符串类型的offer_id转为INT,匹配数据库列类型 .withColumn("offer_id", col("offer_id").cast("int")) // 过滤空值或空字符串(避免插入无效数据,可选) .where(col("offer_id").isNotNull && col("offer_id") =!= "") // 移除临时生成的数组列和原offer_ids列(可选,让DataFrame更整洁) .drop("offer_ids", "offer_id_array") // 写入关系型数据库 val connectionProps = new java.util.Properties { setProperty("user", "你的数据库用户名") setProperty("password", "你的数据库密码") setProperty("driver", "com.mysql.cj.jdbc.Driver") // 根据数据库类型调整,如PostgreSQL用org.postgresql.Driver } processedDF.write .mode(SaveMode.Append) .jdbc( url = "jdbc:mysql://你的数据库地址:端口/数据库名", table = "目标表名", connectionProperties = connectionProps )
Python版本
from pyspark.sql.functions import split, explode, col from pyspark.sql import SaveMode # 假设原始DataFrame为inputDF processed_df = inputDF \ # 拆分offer_ids为数组 .withColumn("offer_id_array", split(col("offer_ids"), "\\|")) \ # 展开数组为多行 .withColumn("offer_id", explode(col("offer_id_array"))) \ # 转换为INT类型 .withColumn("offer_id", col("offer_id").cast("int")) \ # 过滤无效值 .filter(col("offer_id").isNotNull() & (col("offer_id") != "")) \ # 删除临时列 .drop("offer_ids", "offer_id_array") # 数据库连接配置 connection_props = { "user": "你的数据库用户名", "password": "你的数据库密码", "driver": "com.mysql.cj.jdbc.Driver" # 根据数据库类型调整 } # 写入数据库 processed_df.write \ .mode(SaveMode.Append) \ .jdbc( url="jdbc:mysql://你的数据库地址:端口/数据库名", table="目标表名", properties=connection_props )
关键细节说明
split函数中的\\|:因为竖线|是正则表达式的特殊字符,所以必须用双反斜杠转义才能正确按竖线拆分字符串。explode函数:这是实现行转列的核心,它会把数组中的每个元素生成一条新记录,原DataFrame的其他列值会自动复制到每一条新记录中,完全符合你“其他列值保持不变”的需求。- 类型转换:拆分后的
offer_id默认是字符串类型,必须转为INT才能匹配数据库的INT列,否则会出现类型不匹配的写入错误。 - 过滤步骤:如果原始数据存在连续竖线(如
9||10),拆分后会产生空字符串,过滤操作可以避免插入无效的INT值。
内容的提问来源于stack exchange,提问作者Vijay




