LAS Catalog provides key capabilities such as unified metadata management, data access control, metadata discovery, and data integration. The LAS Catalog connector provides read and write access to LAS Catalog tables.
Create the data catalog, database, and tables on the LAS Catalog platform; see the LAS Catalog documentation for details. Also make sure the Flink user has been granted the appropriate permissions in LAS Catalog.
Prerequisite: Flink synchronizes LAS metadata through API calls, so the job must be able to reach the LAS Catalog API endpoint.
Create a LAS Catalog catalog in Flink. The LAS Catalog catalog is currently implemented as an extension of the Hive catalog, so the parameters required by LAS Catalog must be passed in to create the corresponding HMS client. An example CREATE CATALOG statement:
CREATE CATALOG lf_catalog
WITH
(
'type' = 'hive',
'is-lf' = 'true',
'hive-version' = '3.1.3-with-lf3',
-- Adjust the following settings to your region
'hive.client.las.region.name' = 'cn-beijing',
'hive.metastore.uris' = 'thrift://lakeformation.las.cn-beijing.ivolces.com:48869',
'hive.hms.client.is.public.cloud' = 'true',
-- Fill in the AK/SK of the relevant sub-account; these are created in the console
'hive.client.las.ak' = 'xxx',
'hive.client.las.sk' = 'xxx'
);
The catalog parameters are described below:
Parameter | Required | Default | Type | Description |
---|---|---|---|---|
type | Yes | hive | String | The catalog type; fixed to hive. |
is-lf | Yes | false | Boolean | Set to true to access LAS Catalog metadata; set to false to access an open-source HMS. |
hive-version | Yes | None | String | Must be set to 3.1.3-with-lf3 when accessing LAS Catalog metadata. |
hive.client.las.region.name | Yes | None | String | The region where LAS Catalog resides, e.g. cn-beijing. |
hive.metastore.uris | Yes | None | String | The LAS Catalog metastore address, which corresponds one-to-one with the region, e.g. thrift://lakeformation.las.cn-beijing.ivolces.com:48869 for cn-beijing. |
hive.hms.client.is.public.cloud | Yes | false | Boolean | Set to true to access LAS Catalog metadata; set to false to access an open-source HMS. |
hive.client.las.ak | Yes | None | String | The access key of the Volcano Engine account. |
hive.client.las.sk | Yes | None | String | The secret key of the Volcano Engine account. |
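After the catalog has been created, you can quickly verify it with standard Flink SQL catalog commands (a minimal sketch; the catalog and database names follow the examples on this page):
USE CATALOG lf_catalog;
SHOW DATABASES;
USE lf_db_test;
SHOW TABLES;
DESCRIBE parquet_partition_table;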
In Flink SQL, tables in the catalog are referenced with the three-part form `Catalog.Database.Table` and support the same read and write operations as Hive tables, in three modes: batch read, batch write, and streaming write.
-- Create the LAS Catalog catalog
CREATE CATALOG lf_catalog
WITH
(
'type' = 'hive',
'is-lf' = 'true',
'hive-version' = '3.1.3-with-lf3',
'hive.client.las.region.name' = 'cn-beijing',
'hive.metastore.uris' = 'thrift://lakeformation.las.cn-beijing.ivolces.com:48869',
'hive.hms.client.is.public.cloud' = 'true',
'hive.client.las.ak' = 'xxx',
'hive.client.las.sk' = 'xxx'
);
CREATE TABLE print_sink (
a STRING,
b INT,
c DOUBLE,
d BOOLEAN,
`day` STRING,
`hour` STRING
) WITH (
'connector' = 'print',
'print-identifier' = 'out'
);
-- Batch read from a LAS Catalog table
INSERT INTO print_sink
SELECT * FROM lf_catalog.lf_db_test.parquet_partition_table;
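For partitioned tables, a filter on the partition columns lets Flink prune partitions at batch-read time (a sketch; the `day` partition column and the date value are taken from the streaming-write example later on this page):
-- Batch read a single partition; the partition filter limits which partitions are scanned
INSERT INTO print_sink
SELECT * FROM lf_catalog.lf_db_test.parquet_partition_table WHERE `day` = '2024-06-01';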
-- Create the LAS Catalog catalog
CREATE CATALOG lf_catalog
WITH
(
'type' = 'hive',
'is-lf' = 'true',
'hive-version' = '3.1.3-with-lf3',
'hive.client.las.region.name' = 'cn-beijing',
'hive.metastore.uris' = 'thrift://lakeformation.las.cn-beijing.ivolces.com:48869',
'hive.hms.client.is.public.cloud' = 'true',
'hive.client.las.ak' = 'xxx',
'hive.client.las.sk' = 'xxx'
);
-- Insert new data into a non-partitioned table
INSERT INTO lf_catalog.lf_db_test.mytable SELECT 'Tom', 25;
-- Overwrite a non-partitioned table
INSERT OVERWRITE lf_catalog.lf_db_test.mytable SELECT 'Tom', 25;
-- Insert new data into a partitioned table
INSERT INTO lf_catalog.lf_db_test.myparttable PARTITION (`date`='${date}') SELECT 'Tom', 25;
-- Overwrite a partitioned table
INSERT OVERWRITE lf_catalog.lf_db_test.myparttable PARTITION (`date`='${date}') SELECT 'Tom', 25;
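In addition to the static PARTITION clause above, open-source Flink also supports dynamic partition writes to Hive tables, where the partition value is taken from the trailing columns of the SELECT list (a sketch against the same hypothetical myparttable; verify the behavior in your environment):
-- Dynamic partition write: the value of partition column `date` comes from the SELECT list
INSERT INTO lf_catalog.lf_db_test.myparttable SELECT 'Tom', 25, '2024-06-01';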
-- Create the LAS Catalog catalog
CREATE CATALOG lf_catalog
WITH
(
'type' = 'hive',
'is-lf' = 'true',
'hive-version' = '3.1.3-with-lf3',
'hive.client.las.region.name' = 'cn-beijing',
'hive.metastore.uris' = 'thrift://lakeformation.las.cn-beijing.ivolces.com:48869',
'hive.hms.client.is.public.cloud' = 'true',
'hive.client.las.ak' = 'xxx',
'hive.client.las.sk' = 'xxx'
);
CREATE TABLE datagen_source (
a STRING,
b INT,
c DOUBLE,
d BOOLEAN
)
WITH
(
'connector' = 'datagen',
'rows-per-second' = '100'
);
-- Streaming write into a LAS Catalog table
INSERT INTO lf_catalog.lf_db_test.parquet_partition_table
/*+ OPTIONS('sink.partition-commit.policy.kind'='metastore,success-file') */
SELECT
a, b, c, d,
cast(current_date as string) as `day`,
cast(hour(current_timestamp) as string) as `hour`
FROM
datagen_source;
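The OPTIONS hint in this job sets the partition commit policy, which is mandatory for streaming writes. Other standard open-source Flink Hive/filesystem sink options can be injected the same way, for example the commit trigger and delay (a sketch; confirm these options take effect with this connector version):
INSERT INTO lf_catalog.lf_db_test.parquet_partition_table
/*+ OPTIONS(
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'sink.partition-commit.trigger'='process-time',
  'sink.partition-commit.delay'='0 s'
) */
SELECT
  a, b, c, d,
  cast(current_date as string) as `day`,
  cast(hour(current_timestamp) as string) as `hour`
FROM
  datagen_source;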
The connector JAR package is as follows:
After downloading it locally, install the LAS Catalog connector into your local Maven repository with mvn install:
mvn install:install-file -Dfile=flink-sql-connector-hive-las-formation-3_2.12-1.16-byted-connector-SNAPSHOT.jar -DgroupId=org.apache.flink -DartifactId=flink-sql-connector-hive-las-formation-3_2.12 -Dversion=1.16-byted-connector-SNAPSHOT -Dpackaging=jar
This example job writes to a LAS Catalog table and consists of two main parts: registering the LAS Catalog catalog, and writing the upstream DataStream to the table through the three-part `Catalog.Database.Table` form.
package com.bigdata;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DatagenToLASFormationDemo2 {

    private static final Logger LOG = LoggerFactory.getLogger(DatagenToLASFormationDemo2.class);

    public static void main(String[] args) throws Exception {
        // Fill in the LF (LAS Catalog) parameters
        String lfRegion = "cn-beijing";
        String lfThriftUris = "thrift://lakeformation.las.cn-beijing.ivolces.com:48869";
        Boolean lfMetastoreIsPublicCloud = true;
        String lfAccessKey = "xxx";
        String lfSecretKey = "xxx";
        String hiveDatabase = "lf_db";
        String hiveTable = "lf_table";

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Build a simple elements source
        DataStream<String> elementStream = env.fromElements("a", "b", "c");

        // Register the LF catalog
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        String catalogName = "lf_catalog";
        HiveCatalog hiveCatalog = new HiveCatalog(catalogName, hiveDatabase, null, null, "3.1.3-with-lf3",
                true, lfRegion, lfThriftUris, lfMetastoreIsPublicCloud, lfAccessKey, lfSecretKey);
        tableEnv.registerCatalog(catalogName, hiveCatalog);

        // Convert the upstream DataStream into a Table API view so it can be queried in SQL
        tableEnv.createTemporaryView("sourceTable", elementStream);

        // Write to the LF table through the Table API.
        // Note the dynamic option injected via an OPTIONS hint after the table name: it sets the
        // partition commit policy, which is mandatory. Here it commits to the LF metastore and
        // writes a success-file to TOS.
        String insertSql = "insert into " + catalogName + "." + hiveDatabase + "." + hiveTable
                + " /*+ OPTIONS('sink.partition-commit.policy.kind'='metastore,success-file') */"
                + " select * from sourceTable";
        tableEnv.executeSql(insertSql);
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.bigdata</groupId>
    <artifactId>flink-datastream-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.16.2</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <!-- Flink core dependencies, supplied by the Flink runtime -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- LAS Catalog connector, bundled into the job JAR -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-hive-las-formation-3_${scala.binary.version}</artifactId>
            <version>1.16-byted-connector-SNAPSHOT</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>shade-flink</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <includes>
                                    <include>*:*</include>
                                </includes>
                            </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
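With this POM, build the job JAR with mvn clean package. Note the design choice: the Flink core dependencies are marked provided because the Flink runtime supplies them, while the LAS Catalog connector is not, so the shade plugin bundles it into the job JAR.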
To access a table under a different LAS Catalog data catalog, say one named other_catalog, set the Database part of the three-part name in Flink SQL to `other_catalog.database`, as in the following example:
SELECT * FROM lf_catalog.`other_catalog.lf_db_test`.parquet_partition_table;
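Writes use the same three-part form (a sketch; mytable here stands in for the non-partitioned example table used earlier, assumed to exist under other_catalog):
INSERT INTO lf_catalog.`other_catalog.lf_db_test`.mytable SELECT 'Tom', 25;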