You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

从Azure Blob Storage加载数据至Delta Lake遇代码错误求助

解决Delta Lake写入错误与路径问题

首先说你遇到的AttributeError: 'DataFrameWriter' object has no attribute 'write'问题,完全是代码逻辑的小失误:

  • 你写了dx = df.write.format("parquet"),这行已经生成了一个DataFrameWriter对象,这个对象本身就是用来执行写入操作的,不能再对它调用.write()方法,这才导致了报错。

修正后的写入代码

你不需要先创建parquet的writer,直接读取完CSV后,调用Delta格式的写入即可。另外要注意:不要把Delta文件写入到原始CSV所在的相同路径,会引发文件冲突,建议指定一个独立的路径,比如在原路径后加_delta后缀:

storage_account_name = "xxxxxxxxdev"
storage_account_access_key = "xxxxxxxxxxxxxxxxxxxxx"
# 原始CSV文件路径
file_location = "wasbs://bicc-hdspk-eus-qc@xxxxxxxxdev.blob.core.windows.net/FSHC/DIM/FSHC_DIM_SBU"
# Delta Lake存储路径(独立于原始CSV路径)
delta_save_path = "wasbs://bicc-hdspk-eus-qc@xxxxxxxxdev.blob.core.windows.net/FSHC/DIM/FSHC_DIM_SBU_delta"
file_type = "csv"

# 配置Azure Blob访问权限
spark.conf.set("fs.azure.account.key."+storage_account_name+".blob.core.windows.net", storage_account_access_key)

# 读取CSV数据
df = spark.read.format(file_type) \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", '|') \
    .load(file_location)

# 直接写入Delta Lake
df.write.format("delta").save(delta_save_path)

关于Delta Lake路径的说明

  • 路径格式本身是正确的(wasbs://容器名@存储账户名.blob.core.windows.net/路径),核心要求是路径要独立,避免和其他数据文件混放,这样Delta Lake才能管理自己的事务日志和数据文件。
  • 如果之后想把这个Delta数据注册成Databricks中可直接查询的表,可以执行:
spark.sql("""
CREATE TABLE IF NOT EXISTS fshc_dim_sbu
USING DELTA
LOCATION '{delta_save_path}'
""".format(delta_save_path=delta_save_path))

内容的提问来源于stack exchange,提问作者Abhirup Bose

火山引擎 最新活动