You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Databricks:如何关联包含array<string>类型员工ID列的部门视图与员工表

嘿,这个需求其实很好实现,核心思路就是先把部门里的员工ID数组拆成单行记录,关联员工表拿到对应名字,再重新把名字聚合回数组——重点是要保证名字数组的顺序和原员工ID数组的顺序一致,不然结果就乱啦!下面给你两种常用的实现方式:

一、Databricks SQL 实现
SELECT 
  tv.DeptID,
  tv.DeptName,
  tv.EmployeeIDs,
  COLLECT_LIST(e.EmpName) OVER (
    PARTITION BY tv.DeptID 
    ORDER BY pos 
    ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
  ) AS EmployeeNames
FROM (
  SELECT 
    DeptID,
    DeptName,
    EmployeeIDs,
    pos,
    emp_id
  FROM vw_TestView
  LATERAL VIEW POSEXPLODE(EmployeeIDs) AS pos, emp_id
) tv
LEFT JOIN Employees e ON tv.emp_id = e.EmpID
GROUP BY tv.DeptID, tv.DeptName, tv.EmployeeIDs
ORDER BY tv.DeptID;

步骤说明:

  1. 用POSEXPLODE拆分数组:和普通EXPLODE不同,POSEXPLODE会同时返回数组元素的位置(pos)和元素值(emp_id),这是保证名字顺序和原ID数组一致的关键。
  2. 关联员工表:通过拆分后的emp_id匹配员工表的EmpID,拿到对应的员工姓名。
  3. 聚合生成有序数组:用窗口函数按部门分区、按位置排序,再用COLLECT_LIST把姓名重新拼成数组,最后按部门分组得到最终结果。

如果完全不关心名字数组的顺序(比如业务上允许乱序),可以简化成更简洁的版本:

SELECT 
  tv.DeptID,
  tv.DeptName,
  tv.EmployeeIDs,
  COLLECT_LIST(e.EmpName) AS EmployeeNames
FROM vw_TestView
LATERAL VIEW EXPLODE(EmployeeIDs) AS emp_id
LEFT JOIN Employees e ON emp_id = e.EmpID
GROUP BY tv.DeptID, tv.DeptName, tv.EmployeeIDs;
二、PySpark DataFrame API 实现
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 1. 拆分员工ID数组并保留每个元素的位置
exploded_df = spark.table("vw_TestView") \
  .withColumn("pos_emp", F.posexplode(F.col("EmployeeIDs"))) \
  .select(
    "DeptID", "DeptName", "EmployeeIDs",
    F.col("pos_emp.pos").alias("pos"),
    F.col("pos_emp.col").alias("emp_id")
  )

# 2. 关联员工表获取对应的姓名
joined_df = exploded_df.join(
  spark.table("Employees"),
  exploded_df.emp_id == Employees.EmpID,
  how="left"
)

# 3. 按部门聚合,按位置排序生成有序的姓名数组
window_spec = Window.partitionBy("DeptID").orderBy("pos")
result_df = joined_df \
  .withColumn("EmployeeNames", F.collect_list(F.col("EmpName")).over(window_spec)) \
  .groupBy("DeptID", "DeptName", "EmployeeIDs") \
  .agg(F.max("EmployeeNames").alias("EmployeeNames")) \
  .orderBy("DeptID")

# 查看最终结果
result_df.show(truncate=False)

步骤说明:

  1. POSEXPLODE拆分数组:用F.posexplode拆分数组,同时拿到元素位置和ID值,重命名列方便后续操作。
  2. 关联员工表:用DataFrame的join方法匹配员工ID,获取姓名。
  3. 窗口函数聚合保序:定义窗口按部门分区、位置排序,用collect_list生成有序的姓名数组,最后分组取max(同一部门的窗口结果数组完全一致,取max即可得到完整数组)。

同样,如果不关心顺序,简化版代码如下:

from pyspark.sql import functions as F

result_df = spark.table("vw_TestView") \
  .withColumn("emp_id", F.explode(F.col("EmployeeIDs"))) \
  .join(spark.table("Employees"), on="emp_id", how="left") \
  .groupBy("DeptID", "DeptName", "EmployeeIDs") \
  .agg(F.collect_list(F.col("EmpName")).alias("EmployeeNames")) \
  .orderBy("DeptID")

result_df.show(truncate=False)

内容的提问来源于stack exchange,提问作者Mr.Human

火山引擎 最新活动