You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Windows+Python2.7环境下如何将CSV转为Parquet文件?

解决Windows+Python2.7下Spark CSV转Parquet的问题

首先得给你澄清一个关键点:你看到生成output.parquet文件夹而非单个文件,其实这是Spark的正常行为,不是错误!Spark作为分布式计算框架,写Parquet时默认会把数据拆分成多个分区文件(比如文件夹里的part-xxxx.parquet),这是为了适配分布式存储的特性。如果你确实需要单个文件,可以通过合并分区实现,但先别急,先把报错的问题解决掉。

先分析你之前代码里的几个问题:

  • 第一段代码的import语句写在了同一行,属于语法错误,应该拆分;另外CSV最后一个字段Denver后面带空格,直接分割后会保留空格,可能导致数据处理异常;
  • 第二段代码没有指定schema,Spark自动推断类型可能出错,而且也没处理字段前后的空格;
  • 另外Windows环境下Spark依赖Hadoop的winutils工具,如果你没配置的话也会触发报错。

针对Windows+Python2.7的环境,我给你整理了一套正确的流程和代码:

前置准备

  1. Spark版本选择:必须用支持Python2.7的Spark版本,比如Spark 2.4.8(Spark 3.x已经彻底放弃Python2.7支持了),下载时选对应Hadoop版本的包(比如Hadoop 2.7)。
  2. 配置winutils:Windows下Spark需要依赖Hadoop的文件系统工具,你需要下载对应Hadoop版本的winutils.exe,放到HADOOP_HOME\bin目录,同时系统环境变量里添加HADOOP_HOME指向这个目录,不然会报找不到winutils的错误。

正确转换代码

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import os

if __name__ == "__main__":
    # 初始化SparkSession,设置分区数为1(减少生成的part文件数量)
    spark = SparkSession \
        .builder \
        .appName("CSV2Parquet") \
        .config("spark.sql.shuffle.partitions", "1") \
        .getOrCreate()
    
    # 获取当前脚本所在目录,拼接CSV文件路径
    dirname = os.path.dirname(os.path.abspath(__file__))
    csv_path = os.path.join(dirname, 'Temp.csv')
    
    # 定义和CSV匹配的schema,避免自动推断出错
    csv_schema = StructType([
        StructField("col1", IntegerType(), nullable=True),
        StructField("col2", StringType(), nullable=True),
        StructField("col3", StringType(), nullable=True),
        StructField("col4", StringType(), nullable=True)
    ])
    
    # 读取CSV,自动去除字段前后空格,指定分隔符和schema
    df = spark.read.csv(
        path=csv_path,
        schema=csv_schema,
        sep=",",
        ignoreLeadingWhiteSpace=True,
        ignoreTrailingWhiteSpace=True
    )
    
    # 如果你一定要生成单个Parquet文件,合并到1个分区(仅测试/小数据量用,大数据量不推荐)
    df = df.coalesce(1)
    
    # 写入Parquet,覆盖已存在的文件
    parquet_path = os.path.join(dirname, 'output.parquet')
    df.write.mode('overwrite').parquet(parquet_path)
    
    # 关闭SparkSession,释放资源
    spark.stop()

关键说明

  • 关于文件夹 vs 单个文件:生成的output.parquet文件夹里的part-xxxx.parquet就是实际的Parquet数据文件,你用Spark读取时直接指定文件夹路径即可,不需要单个文件。如果非要单个文件,用coalesce(1)合并分区,但大数据量下会严重影响性能,生产环境不建议这么做。
  • 代码里的细节:指定schema可以避免Spark自动推断类型出错;ignoreLeadingWhiteSpaceignoreTrailingWhiteSpace会自动处理字段前后的空格,解决你CSV里最后一个字段带空格的问题;设置spark.sql.shuffle.partitions=1可以减少生成的part文件数量。

内容的提问来源于stack exchange,提问作者inquisitiveProgrammer

火山引擎 最新活动