You need to enable JavaScript to run this app.
导航

Hive

最近更新时间2024.04.01 15:48:20

首次发布时间2024.04.01 15:48:20

Hive 连接器提供对 Hive 数据源的读写能力,通过使用 Hive Catalog,Flink 可以对 Hive 表做统一的批和流处理。这意味着 Flink 可以作为 Hive 批处理引擎的一个性能更好的选择,或者流式写 Hive 表中的数据以支持实时数仓应用。

使用限制

  • Hive 连接器暂时仅支持在 Flink 1.16-volcano 引擎版本中使用。
  • 流写 Hive 不支持 insert overwrite

建表方式

在 Flink SQL 语法中,数据表通过Catalog.Database.Table三段式来表示。对应到 Hive 数据源,Catalog 是 Hive Catalog。
其中流写的 Hive 表,需要设置表参数,参见流写的表级别参数

参数

批读&写的作业级别参数

参数

是否必选

默认值

数据类型

描述

table.exec.hive.fallback-mapred-reader

true

Boolean

设置是否开启向量化读取的参数。
当满足以下条件时,Flink 会自动对 Hive 表进行向量化读取:

  • 格式:ORC 或者 Parquet。
  • 没有复杂类型的列,比如 Hive 列类型:List、Map、Struct、Union。

该特性默认开启,如果要禁用,则设置为 false。

table.exec.hive.infer-source-parallelism

true

Boolean

设置是否开启 Source 并发推断。默认情况下,Flink 会基于文件的数量,以及每个文件中块的数量推断出读取 Hive 的最佳并行度。
Flink 允许灵活地配置并发推断策略。
如果该参数是 true,会根据 split 的数量推断 source 的并发度。如果是 false,source 的并发度由配置决定。

说明

该参数会影响当前作业的所有 hive source。

table.exec.hive.infer-source-parallelism.max

1000

Integer

设置 source 算子推断的最大并发度。

说明

该参数会影响当前作业的所有 hive source。

table.exec.hive.split-max-size

128mb

MemorySize

设置读 Hive 表时调整数据分片大小。
读 Hive 表时, 数据文件将会被切分为若干个分片(split), 每一个分片是要读取的数据的一部分。 分片是 Flink 进行任务分配和数据并行读取的基本粒度。 用户可以通过该参数来调整每个分片的大小,来做一定的读性能调优。
该参数表示读 Hive 表时,每个分片最大可以包含的字节数 (默认是 128MB)。

说明

仅适用于 ORC 格式的 Hive 表。

table.exec.hive.file-open-cost

4mb

MemorySize

设置读 Hive 表时,打开一个文件预估的开销,以字节为单位,默认是 4MB。
如果这个值比较大,Flink 将会倾向于将 Hive 表切分为更少的分片,这在 Hive 表中包含大量小文件的时候很有用。 反之,Flink 将会倾向于将 Hive 表切分为更多的分片,这有利于提升数据读取的并行度。

说明

仅适用于 ORC 格式的 Hive 表。

table.exec.hive.calculate-partition-size.thread-num

3

Integer

设置计算所有分区下的所有文件大小的线程数。
为了调整数据分片的大小, Flink 首先将计算得到所有分区下的所有文件的大小,但是这在分区数量很多的情况下会比较耗时。通过调整该参数为一个更大的值使用更多的线程来进行加速。

table.exec.hive.load-partition-splits.thread-num

3

Integer

设置 split 切分的线程数。
Flink 使用多个线程并发将 Hive 分区切分成多个 split 进行读取。通过该参数配置线程数,以加速 split 切分过程。

table.exec.hive.sink.statistic-auto-gather.enable

true

Boolean

设置是否开启自动收集统计信息。
在使用 Flink 写入 Hive 表的时候,Flink 将默认自动收集写入数据的统计信息然后将其提交至 Hive metastore 中。 但在某些情况下,你可能不想自动收集统计信息,因为收集这些统计信息可能会花费一定的时间。 为了避免 Flink 自动收集统计信息,可以设置该参数为 false。

说明

只有批写模式才支持自动收集统计信息,流写模式目前还不支持自动收集统计信息。

table.exec.hive.sink.statistic-auto-gather.thread-num

3

Integer

设置收集统计信息的线程数。
对于 Parquet 或者 ORC 格式的表,为了快速收集到统计信息 numRows/rawDataSize, Flink 只会读取文件的 footer。但是在文件数量很多的情况下,这可能也会比较耗时,您可以通过设置该参数为一个更大的值来加快统计信息的收集。

说明

只有批写模式才支持自动收集统计信息,流写模式目前还不支持自动收集统计信息。

流写的表级别参数

流写的参数是表级别的,需要在 Hive 建表时,添加到表参数里。这些参数主要是 Filesystem 的表参数,请参见Filesystem
除此之外,还有一些参数是流写 Hive 独有的,如下:

参数

是否必选

默认值

数据类型

描述

sink.partition-commit.policy.kind

metastore,success-file

String

分区提交策略,具体为通知下游某个分区已经写完毕可以被读取了,推荐设置为 metastore,success-file。

  • metastore:向 metadata 增加分区。仅 hive 支持 metastore 策略,文件系统通过目录结构管理分区;
  • success-file:在目录中增加 '_success' 文件; 上述两个策略可以同时指定:'metastore,success-file'。
  • custom:通过指定的类来创建提交策略。 支持同时指定多个提交策略:'metastore,success-file'。

示例代码

示例 SQL 中的 Hive Catalog 以 default_hive 为例。

批读

SELECT * FROM default_hive.db.table WHERE `date`='${date}'

批写

-- 插入新数据到非分区表 
INSERT INTO default_hive.db.mytable SELECT 'Tom', 25; 
-- 覆盖写入非分区表 
INSERT OVERWRITE default_hive.db.mytable SELECT 'Tom', 25; 
-- 插入新数据到分区表 
INSERT INTO default_hive.db.myparttable PARTITION (`date`='${date}') SELECT 'Tom', 25; 
-- 覆盖写入分区表 
INSERT OVERWRITE default_hive.db.myparttable PARTITION (`date`='${date}') SELECT 'Tom', 25;

流写

创建 Hive 表,在表属性中添加流写 Hive 相关的参数:

CREATE TABLE db.hive_table (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

流写 Hive 的 Flink SQL:

CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- 在 TIMESTAMP 列声明 watermark。
) WITH (...);

INSERT INTO TABLE default_hive.db.hive_table 
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;