You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Spark-SQL报错:无法解析'spid'列,EMR异常Databricks正常

Spark 3.0.1 on EMR: AnalysisException "cannot resolve 'spid'" despite column existing in schema

问题背景

我每日运行以下Spark代码处理Parquet数据,目的是聚合用户连接信息:

def aggConnections()(input: DataFrame) = { 
    input 
        .groupBy ($"spid"(0).as("domain_userid"), $"cxid"(0).as("key_id")) 
        .agg ( min($"Time").as("first_seen"), max($"Time").as("last_seen") ) 
        .select ($"domain_userid", $"key_id", lit("cxense_id").as("key_source"), lit(1).as("times_seen"), $"first_seen", $"last_seen") 
        .filter($"domain_userid".isNotNull && $"key_id".isNotNull) 
} 
val requests = spark.read.parquet("day1").transform(aggConnections())

其中spid是数组类型(spid:array element:string),通过$"spid"(0)访问其第一个元素。

AWS EMR Spark 3.0.1上,部分日期的任务执行时抛出如下错误:

diagnostics: User class threw exception: org.apache.spark.sql.AnalysisException: cannot resolve '`spid`' given input columns: [AdUnitId, AudienceSegmentIds, BandWidth, BandwidthGroupId, BandwidthId, Browser, BrowserId, City, CityId, CmsMetadata, Country, CountryId, DeviceCategory, Domain, GfpContentId, IsCompanion, IsFilledRequest, IsInterstitial, IsVideoFallbackRequest, KeyPart, Metro, MetroId, MobileAppId, MobileCapability, MobileCarrier, MobileDevice, OS, OSId, OSVersion, PodPosition, PostalCode, PostalCodeId, PublisherProvidedID, RefererURL, Region, RegionId, RequestLanguage, RequestedAdUnitSizes, Time, TimeUsec2, UserId, VideoPosition, addefend, ads_enabled, app_name, app_version, bereidingstijd, brand, careerlevel, cat, category, categorycode, categoryname, channel_id, channel_title, cid, cmssource, companyname, compid, content_url, contentid, contentsubstype, crt_cpm, crt_size, cxid, cxsg, dc_yt, deviceid, dos, dossier, educationlevel, floor, fsr, gang, gdpr_applies, gdpr_consentstring, gelegenheid, hb_pb, hb_size, hour, ifa, industry, ingredient, iom, itemid, ix_apnx_om, ix_cdb_om, ix_imdi_cpm, jobtitle, k21, kage, kar, kauth, kembed, keuken, kgender, klg, ko, kpid, kvlg, kvz, kw, lat, long, mc_asset_type, mediation, model, moeilijkheid, origin, pag, pagetype, path, pay, pos, positie, production_id, productname, retailercode, retailername, rpfl_12108, screen, sector, shopid, show_id, show_title, soort, src, stad, starttype, station, subforum, tag, theme, top, video_duration, video_label, yt_vrallowed, ytdevice];; 
'Aggregate ['spid[0], cxid#136[0]], ['spid[0] AS domain_userid#276, cxid#136[0] AS key_id#277, min(Time#0) AS first_seen#417, max(Time#0) AS last_seen#419]

我确认数据schema中存在spid列,且在同版本Spark 3.0.1的Databricks环境中,包括EMR上失败的日期数据在内,所有数据都能正常运行,目前无法解释该现象。


可能的原因与解决方案

结合跨Spark环境的schema问题经验,这里有几个方向可以排查:

1. Parquet文件schema不一致,未开启schema合并

Spark默认读取Parquet时只会加载第一个文件的schema,如果部分日期的Parquet文件中实际缺失spid列(比如数据生成流程在某些日期有变更,导致schema演化),就会出现整体schema显示有spid但实际查询时找不到的情况。而Databricks默认可能开启了schema合并,所以能正常处理。

解决方法

  • 读取数据时显式开启schema合并:
    val requests = spark.read.option("mergeSchema", "true").parquet("day1").transform(aggConnections())
    
  • 检查出错日期的具体Parquet文件,验证每个文件的schema是否一致:
    // 列出该日期下的所有文件路径
    val filePaths = spark.sparkContext.wholeTextFiles("day1/*").keys.collect()
    // 逐个文件读取并打印schema
    filePaths.foreach(path => {
        println(s"Schema for file: $path")
        spark.read.parquet(path).printSchema()
    })
    

2. EMR与Databricks的Spark配置差异

EMR的默认Spark配置可能和Databricks存在差异,比如分区列类型推断、大小写敏感等设置,导致列识别异常。

解决方法

  • 检查并修改EMR的Spark配置,添加以下配置(可以在集群启动时设置,或者代码中动态设置):
    spark.conf.set("spark.sql.parquet.mergeSchema", "true")
    spark.conf.set("spark.sql.caseSensitive", "false") // 如果存在列名大小写不一致的情况
    
  • 对比Databricks的Spark配置(可以通过spark.conf.getAll查看),确保关键的schema处理配置和EMR一致。

3. Parquet文件元数据损坏

部分日期的Parquet文件可能存在元数据损坏,导致Spark无法正确识别spid列,虽然整体的schema汇总显示该列存在,但实际读取时失败。

解决方法

  • 尝试重新生成出错日期的Parquet数据,或者过滤掉损坏的文件:
    // 尝试读取spid列,定位报错的文件
    try {
        spark.read.parquet("day1").select("spid").count()
    } catch {
        case e: Exception => println(s"Error when reading spid column: ${e.getMessage}")
    }
    
  • 如果确认是部分文件损坏,可以删除这些文件后重新运行任务。

4. 分区路径的隐式列干扰

如果数据是按分区存储的,分区路径中的某些字段可能被Spark识别为数据列,导致schema冲突或列名被覆盖。

解决方法

  • 读取数据时关闭分区列类型推断:
    spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
    val requests = spark.read.parquet("day1").transform(aggConnections())
    
  • 检查分区路径是否存在与spid同名的目录,避免列名冲突。

内容的提问来源于stack exchange,提问作者dadadima

火山引擎 最新活动