PySpark技术问询:使用Explode后如何为句子添加序号
解决Spark中为拆分后的句子添加原始列表顺序序号的问题
我太懂你的烦恼了——用explode拆完句子列表后,原列表里的顺序信息直接丢了,row_number()根本找不到合适的排序字段。其实Spark早就为这种场景准备了更贴心的函数:posexplode,它能同时返回数组元素和对应的位置索引,完美解决你的序号问题!
核心思路
posexplode会把数组的每个元素拆成单独一行,还会额外生成一列记录该元素在原数组中的位置(从0开始计数)。我们只需要把这个位置值加1,就能得到你想要的从1开始的句子序号,完全不用纠结orderBy的问题!
完整实现代码
from pyspark.sql import functions as F # 用posexplode替代explode,同时获取元素位置和句子内容 df2 = df.withColumn("pos_sent", F.posexplode("SENTENCES_LIST")) \ .select( "REVIEW_ID", "REVIEW_COMMENTS", "SENTENCES_LIST", F.col("pos_sent.col").alias("SENTENCE"), F.col("pos_sent.pos").alias("pos") ) # 生成从1开始的句子序号,再清理临时列 df3 = df2.withColumn("SENT_NUMBER", F.col("pos") + 1) \ .drop("pos") # 查看最终结果 df3.show()
代码细节解释
posexplode的作用:这个函数把SENTENCES_LIST拆成一个包含pos(位置,从0开始)和col(句子内容)的结构体,我们通过select把这两个字段提取出来并重新命名。- 生成连续序号:因为
pos是从0开始计数的,加1后就得到了和原列表顺序完全一致的、从1开始的SENT_NUMBER。 - 清理临时字段:最后删掉临时的
pos列,就得到了你想要的完整结果结构。
关于用row_number()的替代方案(不推荐)
如果一定要用row_number(),也可以通过数组索引函数来实现,但这种方法有明显缺陷:
from pyspark.sql import Window window = Window.partitionBy("REVIEW_ID").orderBy(F.expr("array_position(SENTENCES_LIST, SENTENCE)")) df3 = df.withColumn("SENTENCE", F.explode("SENTENCES_LIST")) \ .withColumn("SENT_NUMBER", F.row_number().over(window))
这个方法的问题在于:如果评论里有重复句子,array_position只会返回第一个匹配的位置,导致序号错误。而posexplode是完全基于原始列表的顺序生成序号,不会有这个问题,所以更推荐使用。
最终输出效果
运行代码后,你会得到和期望完全一致的结果:
| REVIEW_ID | REVIEW_COMMENTS | SENTENCES_LIST | SENTENCE | SENT_NUMBER |
|---|---|---|---|---|
| 1 | Hi. Sent1. Sent2. | [Hi., Sent1., Sent2.] | Hi. | 1 |
| 1 | Hi. Sent1. Sent2. | [Hi., Sent1., Sent2.] | Sent1. | 2 |
| 1 | Hi. Sent1. Sent2. | [Hi., Sent1., Sent2.] | Sent2. | 3 |
| 2 | Yeah. Ok. | [Yeah., Ok.] | Yeah. | 1 |
| 2 | Yeah. Ok. | [Yeah., Ok.] | Ok. | 2 |
内容的提问来源于stack exchange,提问作者user3242036




