You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

PySpark读取Parquet时单列读取失败问题求助

解决PySpark读取Parquet特定字符串列时的连接拒绝问题

看起来你遇到了一个挺棘手的Parquet读取问题——只要包含某一个字符串类型的ID列,PySpark就直接报Java网关连接拒绝的错误,排除这个列就一切正常,哪怕限制只读20条数据也没用。结合你的环境和报错信息,我整理了几个可以尝试的解决方向:

1. 先排查目标列的数据是否损坏

既然排除该列就完全正常,大概率是这个列的部分数据存在格式异常或者损坏。你可以用parquet-tools直接查看该列的内容,快速定位问题:

# 替换product_id为你有问题的列名
parquet-tools cat --column product_id data/purchases.parquet | head -20

如果你的Parquet是分区存储的,还可以逐个分区测试读取,找到具体哪个分区的文件出了问题,之后要么修复要么移除损坏的文件就行。

2. 调整Spark的Parquet读取配置

Spark 3.0.0的向量化读取在处理某些特殊字符串数据时可能会触发崩溃,你可以尝试关闭向量化读取,改用传统行式读取:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.sql.parquet.enableVectorizedReader", "false")  # 关闭向量化读取
    .config("spark.sql.parquet.columnarReaderBatchSize", "1024")  # 减小批次大小
    .getOrCreate()

df = spark.read.load("data/purchases.parquet")
df.select('client_id', 'product_id').limit(20).write.csv('c_id_to_p_id.csv')

关闭向量化读取虽然会牺牲一点性能,但能避免很多格式兼容类的崩溃问题。

3. 检查Java环境的兼容性

你用的OpenJDK 11理论上和Spark 3.0.0兼容,但部分Linux发行版的OpenJDK可能存在一些小bug。可以试试:

  • 切换到Oracle JDK 11,看看问题是否消失;
  • 确认JAVA_HOME环境变量是否正确设置,并且和Spark使用的JDK路径一致。

4. 升级Spark版本

你提到用的是为Apache 3.2+预构建的Spark,但实际运行的是3.0.0版本,这可能存在版本不匹配的问题。Spark 3.0.0是比较早的版本,后续的3.1.x、3.2.x修复了大量Parquet读取相关的bug,升级到3.2.x版本应该能解决不少这类兼容性问题。

5. 尝试重新生成Parquet文件

如果这个Parquet文件是其他系统生成的,可能存在格式兼容问题。你可以先跳过有问题的列读取数据,然后强制转换该列类型后重新写入,再尝试读取:

from pyspark.sql.types import StringType

# 读取时强制转换有问题的列类型
df = spark.read.load("data/purchases.parquet") \
    .select('client_id', df['product_id'].cast(StringType()).alias('product_id'))

# 重新写入新的Parquet文件
df.write.mode("overwrite").parquet("data/new_purchases.parquet")

# 读取新文件测试
new_df = spark.read.load("data/new_purchases.parquet")
new_df.select('client_id', 'product_id').limit(20).show()

内容的提问来源于stack exchange,提问作者Arioll

火山引擎 最新活动