You need to enable JavaScript to run this app.
导航

进阶使用

最近更新时间2022.10.26 10:49:28

首次发布时间2022.10.26 10:49:28

1 查阅历史版本

1.1 历史版本信息

Spark SQL 方式

-- 直接通过 path 查询
DESCRIBE HISTORY '/path/to/delta/' [LIMIT <n>]
-- 通过 delta 前缀查询
DESCRIBE HISTORY delta.`/path/to/delta/` [LIMIT <n>]
-- 通过表名查询
DESCRIBE HISTORY deltaTable [LIMIT <n>]

Spark Python API 方式

from delta.tables import *


# 通过指定表路径获得表
deltaTable = DeltaTable.forPath(spark, pathToTable)
# 查询历史版本,其中参数 n 可选,指定获取 n 条记录。如果没有指定 n,则获取全部记录
lastOperationDF = deltaTable.history(<n>)

1.2 历史版本查询

Spark SQL 方式

-- 根据时间戳查询历史版本
SELECT * FROM table_name TIMESTAMP AS OF timestamp_expression
-- 根据版本号查询历史版本
SELECT * FROM table_name VERSION AS OF version

Spark Python API 方式

# 根据时间戳查询历史版本
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/tmp/delta/people")
# 根据版本号查询历史版本
df2 = spark.read.format("delta").option("versionAsOf", version).load("/tmp/delta/people")

其中:

  • timestamp_expression 的格式为

    • '2018-10-18T22:15:12.013Z', 可以被转换为 timestamp 的标准时间格式

    • cast('2018-10-18 13:36:32 CEST' as timestamp)

    • '2018-10-18', 日期

    • current_timestamp() - interval 12 hours 时间表达式

    • date_sub(current_date(), 1) 时间表达式

    • 其他可以被转换为 timestamp 的时间格式

  • version 则可以通过 DESCRIBE HISTORY 指令获取。

2 查阅表的详情

Spark SQL 方式

-- 通过路径获得表信息
DESCRIBE DETAIL '/path/to/delta/'
-- 通过表名获得表信息
DESCRIBE DETAIL deltaTable

Spark Python API 方式

from delta.tables import *

# 通过指定表路径获得表
deltaTable = DeltaTable.forPath(spark, pathToTable)
# 查询表属性明细
detailDF = deltaTable.detail()

3 表管理

3.1 清理过期数据

3.1.1 Delta Lake 的保存期机制

Delta Lake 有历史版本回溯的功能,它记录了所有的针对表的修改动作。每一次的表更改都会生成新的日志文件,还可能生成新的数据文件。针对日志文件和数据文件,Delta Lake 都引入了保存期机制:

  • 对于日志文件,默认保存 30 天内的数据,过期会自动删除,您无需关心;

  • 对于数据文件,默认有 7 天的保存期,过期的数据需要用户手动执行 VACUUM 命令删除;

用户可以通过配置 delta.logRetentionDuration = "interval <interval>"delta.deletedFileRetentionDuration = "interval <interval>" 来分别设置日志文件和数据文件的保存期。具体的设置方式见下文:表配置

3.1.2 清理过期数据

Spark SQL 方式

-- 通过表名执行清理动作。RETAIN 100 HOURS 表示清理 100 
VACUUM deltaTable [RETAIN 100 HOURS]  -- vacuum files not required by versions older than the default retention period
-- 通过路径字符串执行清理动作
VACUUM '/path/to/delta/' [RETAIN 100 HOURS]-- vacuum files in path-based table
-- 通过 delta. 前缀执行清理动作
VACUUM delta.`/path/to/delta/` [RETAIN 100 HOURS]
-- 只是列出清理详情而不真正执行
VACUUM deltaTable DRY RUN

Spark Python API 方式

from delta.tables import *

# 通过指定表路径获得表
deltaTable = DeltaTable.forPath(spark, pathToTable)
# 或者通过指定表名获得表
deltaTable = DeltaTable.forName(spark, tableName)
# 清理超过安全期的历史数据
deltaTable.vacuum()
# 清理超过100小时的历史数据(相对于表的版本)
deltaTable.vacuum(100)

3.2 小文件合并(Compaction,bin-packing)

用流式写入 Delta 的方式通常会产生大量小文件,时间粒度越细文件越小。另外,如果您执行一段复杂的 SQL 最终写入 Delta,则写入数据也可能因为 Partition 数量过多而产生大量小文件。
小文件的存在会造成很多问题,比如元数据处理速度下降、执行时因为文件过碎导致的磁盘随机读、用户设置并行度过大引起的小 task 过多等等,这些都会显著降低 Spark 的查询性能,因此需要对其进行合并操作。Delta 通过提供 optimize 指令来完成这个动作。
Spark SQL 方式

-- 通过指定路径进行优化
OPTIMIZE '/path/to/delta/' [WHERE CLAUSE]
-- 通过表名进行优化
OPTIMIZE deltaTable [WHERE CLAUSE]
-- 通过 delta. 前缀进行优化
OPTIMIZE delta.`/path/to/delta/` [WHERE CLAUSE]

Spark Python API 方式

from delta.tables import *

# 通过指定表路径获得表
deltaTable = DeltaTable.forPath(spark, pathToTable)
# 或者通过指定表名获得表
deltaTable = DeltaTable.forName(spark, tableName)
# 执行优化,其中 where 是可选的
deltaTable.optimize().where("date='2021-11-18'").executeCompaction()

3.2 Data skipping 与 Z-Order

Data skpping 是一种利用统计信息来过滤数据的一种方式,能够在表 scan 的时期根据过滤条件过滤掉大量数据进而加快查询。Delta 的统计信息是关于列的、文件级别的 MIN、MAX 统计信息。如果一个列在多个文件相对有序,那么可以根据该列统计信息过滤掉多个文件。反之,如果列值均衡的分布在多个文件之中,则过滤效果会大打折扣甚至没有过滤。对于一个多维表格,如果按照多维排序的方式,前面的维度过滤效果会比较好,后边的维度过滤效果会比较差。
Z-Order 提供了一种均衡各个维度排序效果的方法。Z-Order 是一种算法,能够使得参与排序的每个列都在局部相对有序,因此拿任何参与排序的列来过滤都能取得不错的过滤效果。
Delta Lake 在 OPTIMIZE 语句中提供了 ZORDER BY 子句来完成表的 Z-Order 排序。
Spark SQL 方式

OPTIMIZE events [WHERE CLAUSE] ZORDER BY (<columns>)

Spark Python API 方式

from delta.tables import *

# 通过指定表路径获得表
deltaTable = DeltaTable.forPath(spark, pathToTable)
# 或者通过指定表名获得表
deltaTable = DeltaTable.forName(spark, tableName)
# 执行优化,其中 where 是可选的
deltaTable.optimize().where("date='2021-11-18'").executeZOrderBy(<columns>)

3.4 审计功能

Delta Lake 的 history 功能提供了表的详细审计信息。当执行 DESCRIBE HISTORY 或者在 API 中调用 history() 函数时,返回的字段包含了不限于

  • userId

  • userName

  • operation

  • job

  • notebook

  • clusterId

  • ...

等字段。这些字段有助于管理员发现并管理普通用户提交作业的历史,并据其优化集群管理。

4 Delta 表配置

4.1 配置说明

您可以通过多种方式对 Delta 表进行配置:

  • 通过建表时指定 TBLPROPERTIES 进行配置。

  • 通过 Spark 的配置文件添加默认配置。

  • 建表后通过修改 TBLPROPERTIES 进行配置。

  • 用 CLI 查询时在 session 里进行配置。

Delta 表支持三种配置格式:

  1. delta.<property>:用于通过 TBLPROPERTIES 的进行的配置。
  2. spark.databricks.delta.properties.defaults.<property>:用于通过 Spark SQL 设置表的默认配置,它会在建表时作为相应配置项的默认配置填到表中,没有这个配置的话会采用 Delta 的默认配置,也就是说,它等同于“delta.defaults.<property>”,但是由于某些原因,使用了 spark.databricks.delta.properties.defaults. 这个前缀。
  3. spark.databricks.delta.<property>:Spark SQL 在读写 Delta 表时的运行时配置。

1 和 2 都属于表级的配置,3 是 Spark 引擎的配置

表配置优先级从高到低如下:

  1. 使用 TBLPROPERTIES 显式设置表的属性。

  2. 用户在 session 中通过 set spark.databricks.delta.properties.defaults.<proeprty> = xxx 设置表的默认配置。

  3. 用户在 spark-default-conf 中设置的 spark.databricks.delta.properties.defaults.<property>

如果使用其他引擎,建议通过显式指定 TBLPROPERTIES 对表进行配置。
对于 3,它影响 Spark SQL 的执行行为。

4.2 常用配置列表

4.2.1 表配置

注:下面的 <prefix> 指 delta 或者 spark.databricks.delta.properties.defaults

配置项说明
<prefix>.logRetentionDuration日志文件的生存期,默认为 30 天。
<prefix>.deletedFileRetentionDuration数据文件的生存期,默认为 7 周。
<prefix>.checkpointInterval设置日志文件的 checkpoint 周期,默认为 10。
<prefix>.appendOnly设置表为 appendOnly,此时表能够删除和追加数据,但不能更新数据。Delta 对 appendOnly 的表有优化,如果没有更新删除需求,建议设置此值。
<prefix>.enableChangeDataFeed是否开启 ChangeDataFeed 功能。

注意

强烈不建议通过设置 deletedFileRetentionDuration 为一个较小的值达到更快清理数据的目的。比如某些情况下不需要历史版本,设置了该值为 1h 甚至是 0,以便让自己能快速清理历史版本。在这种情况下可能会导致执行时长比较长的作业失败!

4.2.2 Spark SQL 引擎配置

注:下面的<prefix> 指 spark.databricks.delta

配置项说明
<prefix>.stats.collect是否收集统计信息,默认为 true。
<prefix>.stats.skipping是否打开 data skipping 功能,默认为 true。
<prefix>.maxCommitAttempts提交失败是最大的重试次数,默认为 10000000。
<prefix>.optimize.minFileSize小于此大小的文件会被合并,默认值为 1024 * 1024 * 1024。
<prefix>.optimize.maxFileSize合并文件的目标最大值,默认为 1024 * 1024 * 1024。
<prefix>.optimize.maxThreads执行 optimize 动作时启用的线程数量,默认为 15。
<prefix>.vacuum.parallelDelete.enabled是否并行执行 vacuum,默认为 false,对于大表建议并行。
<prefix>.vacuum.parallelDelete.parallelism并行 vacuum 时的线程数量。