Windows+Python2.7环境下如何将CSV转为Parquet文件?
解决Windows+Python2.7下Spark CSV转Parquet的问题
首先得给你澄清一个关键点:你看到生成output.parquet文件夹而非单个文件,其实这是Spark的正常行为,不是错误!Spark作为分布式计算框架,写Parquet时默认会把数据拆分成多个分区文件(比如文件夹里的part-xxxx.parquet),这是为了适配分布式存储的特性。如果你确实需要单个文件,可以通过合并分区实现,但先别急,先把报错的问题解决掉。
先分析你之前代码里的几个问题:
- 第一段代码的import语句写在了同一行,属于语法错误,应该拆分;另外CSV最后一个字段
Denver后面带空格,直接分割后会保留空格,可能导致数据处理异常; - 第二段代码没有指定schema,Spark自动推断类型可能出错,而且也没处理字段前后的空格;
- 另外Windows环境下Spark依赖Hadoop的winutils工具,如果你没配置的话也会触发报错。
针对Windows+Python2.7的环境,我给你整理了一套正确的流程和代码:
前置准备
- Spark版本选择:必须用支持Python2.7的Spark版本,比如Spark 2.4.8(Spark 3.x已经彻底放弃Python2.7支持了),下载时选对应Hadoop版本的包(比如Hadoop 2.7)。
- 配置winutils:Windows下Spark需要依赖Hadoop的文件系统工具,你需要下载对应Hadoop版本的winutils.exe,放到
HADOOP_HOME\bin目录,同时系统环境变量里添加HADOOP_HOME指向这个目录,不然会报找不到winutils的错误。
正确转换代码
from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, IntegerType, StringType import os if __name__ == "__main__": # 初始化SparkSession,设置分区数为1(减少生成的part文件数量) spark = SparkSession \ .builder \ .appName("CSV2Parquet") \ .config("spark.sql.shuffle.partitions", "1") \ .getOrCreate() # 获取当前脚本所在目录,拼接CSV文件路径 dirname = os.path.dirname(os.path.abspath(__file__)) csv_path = os.path.join(dirname, 'Temp.csv') # 定义和CSV匹配的schema,避免自动推断出错 csv_schema = StructType([ StructField("col1", IntegerType(), nullable=True), StructField("col2", StringType(), nullable=True), StructField("col3", StringType(), nullable=True), StructField("col4", StringType(), nullable=True) ]) # 读取CSV,自动去除字段前后空格,指定分隔符和schema df = spark.read.csv( path=csv_path, schema=csv_schema, sep=",", ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True ) # 如果你一定要生成单个Parquet文件,合并到1个分区(仅测试/小数据量用,大数据量不推荐) df = df.coalesce(1) # 写入Parquet,覆盖已存在的文件 parquet_path = os.path.join(dirname, 'output.parquet') df.write.mode('overwrite').parquet(parquet_path) # 关闭SparkSession,释放资源 spark.stop()
关键说明
- 关于文件夹 vs 单个文件:生成的
output.parquet文件夹里的part-xxxx.parquet就是实际的Parquet数据文件,你用Spark读取时直接指定文件夹路径即可,不需要单个文件。如果非要单个文件,用coalesce(1)合并分区,但大数据量下会严重影响性能,生产环境不建议这么做。 - 代码里的细节:指定schema可以避免Spark自动推断类型出错;
ignoreLeadingWhiteSpace和ignoreTrailingWhiteSpace会自动处理字段前后的空格,解决你CSV里最后一个字段带空格的问题;设置spark.sql.shuffle.partitions=1可以减少生成的part文件数量。
内容的提问来源于stack exchange,提问作者inquisitiveProgrammer




