Spark SQL 2.3:DataFrame以Append模式插入现有表的方法是否正确?
嘿,我来帮你排查这个Spark插入数据的问题~
首先,你代码里的核心问题有两个:临时视图和永久表的区别,以及insertInto的严格schema要求,咱们一个个说:
1. 临时视图不等于永久表
你else分支里用createOrReplaceTempView("mylogs")创建的是会话级临时视图,它只存在于当前SparkSession中,程序结束或者会话关闭后就会消失,不会被注册到Spark的元数据里。所以下次运行程序时,spark.catalog().tableExists("mylogs")还是会返回false,根本走不到插入分支。正确的做法是创建永久表,用saveAsTable代替临时视图。
2. insertInto的schema严格匹配要求
insertInto对DataFrame和目标表的schema要求极高:列的顺序、名称、数据类型必须完全一致,哪怕只是列顺序不对,都会抛出异常。这也是你走到插入分支时出错的大概率原因。相比之下,saveAsTable在append模式下会更灵活一些(当然严格匹配schema还是最佳实践)。
修正后的代码示例
方案一:用saveAsTable创建并插入(推荐)
这个方案能解决临时表的问题,同时降低schema不匹配的风险:
Dataset<Row> logDataFrame = spark.createDataFrame(rowRDD, schema); if (spark.catalog().tableExists("mylogs")) { // 用saveAsTable替代insertInto,自动处理部分schema兼容场景 logDataFrame.write().mode("append").saveAsTable("mylogs"); } else { // 创建永久表,注册到元数据中 logDataFrame.write().saveAsTable("mylogs"); } Dataset<Row> results = spark.sql("SELECT count(a1) FROM mylogs");
方案二:坚持用insertInto?先对齐schema
如果你必须用insertInto,那一定要确保DataFrame和目标表的schema完全对齐,比如先读取目标表的列顺序,调整你的DataFrame:
Dataset<Row> logDataFrame = spark.createDataFrame(rowRDD, schema); if (spark.catalog().tableExists("mylogs")) { Dataset<Row> targetTable = spark.table("mylogs"); // 按照目标表的列顺序重新排列当前DataFrame的列 logDataFrame = logDataFrame.select(targetTable.columns()); // 现在再执行插入就不会因为schema不匹配报错了 logDataFrame.write().mode("append").insertInto("mylogs"); } else { // 同样要创建永久表,而不是临时视图 logDataFrame.write().saveAsTable("mylogs"); } Dataset<Row> results = spark.sql("SELECT count(a1) FROM mylogs");
小提示
你可以用spark.sql("DESCRIBE mylogs")查看目标表的schema,和你的logDataFrame.printSchema()输出对比,就能快速找到不匹配的地方啦~
内容的提问来源于stack exchange,提问作者Molay




