目前火山引擎 LAS 集群中 Paimon 支持的版本为 1.0.1-release。
Paimon 在创建 Hadoop 集群过程中作为可选组件安装,集群创建完成后确保 Paimon 组件可见并且状态是正常的。 如果在集群初始化时没有安装,也可以通过添加 Paimon 组件在已有集群上添加 Paimon。详细操作参见:添加服务。
拉起 Flink session 集群。
<FLINK_HOME>/bin/yarn-session.sh -d
使用 Flink sql-client 执行 Flink SQL。
<FLINK_HOME>/bin/sql-client.sh
在使用 Paimon 库表之前,需要为 Paimon 创建 Catalog。
目前 Paimon 支持两种类型的 Catalog:FileSystem 或 Hive。
这种 Catalog 会将 Paimon 库表元数据存储在文件系统上,比如 HDFS。
用Flink创建 FileSystem 类型的 Catalog 的方式如下:
CREATE CATALOG my_catalog WITH ( 'type' = 'paimon', 'warehouse' = 'hdfs://nn:8020/path/to/warehousee' ); USE CATALOG my_catalog;
这种 catalog 会将 Paimon 库表元数据存储在 Hive metastore 当中,用户将可以通过 Hive 来访问这些 Paimon 表。
说明
使用 Hive metastore作为Catalog 时,依赖 Flink Hive bunndle,请确保 Flink Hive bundle 已被添加至 Flink 相关依赖中。
用 Flink 创建 HMS 类型的 Catalog 的方式如下:
CREATE CATALOG my_hive WITH ( 'type' = 'paimon', 'metastore' = 'hive', -- 'uri' = 'thrift://<hive-metastore-host-name>:<port>', default use 'hive.metastore.uris' in HiveConf -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment -- 'warehouse' = 'hdfs://nn:8020/path/to/warehouse', 此参数指定warehouse地址,默认为hive-site中hive.metastore.warehouse.dir参数对应的值,也可自定义路径,可支持TOS路径。 ); USE CATALOG my_hive;
CREATE TABLE word_count ( word STRING PRIMARY KEY NOT ENFORCED, cnt BIGINT );
CREATE TABLE word_count_nonkey ( word STRING, cnt BIGINT );
-- 首先创建一个流式数据源表 CREATE TEMPORARY TABLE word_table ( word STRING ) WITH ( 'connector' = 'datagen', 'fields.word.length' = '1' ); -- 流模式写入Paimon时需要设置checkpoint间隔 SET 'execution.checkpointing.interval' = '10 s'; -- 将源表流式写入Paimon中 INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;
-- 切换至流模式 SET 'execution.runtime-mode' = 'streaming'; SELECT `interval`, COUNT(*) AS interval_cnt FROM( SELECT cnt / 10000 AS `interval` FROM word_count ) GROUP BY `interval`;
INSERT INTO word_count_nonkey VALUES ('a', 1), ('b', 2), ('c', 3);
-- 切换至批模式 SET 'execution.runtime-mode' = 'batch'; SELECT * FROM word_count_nonkey;
在 Spark 上可以实现对 Paimon 表的操作。LAS 默认配置了 Spark 和 Paimon 的连接器和 Hive catalog。或者使用 Spark 创建 HMS 类型的 Catalog 的方式连接:
spark-sql ... \ --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \ --conf spark.sql.catalog.paimon.metastore=hive \ --conf spark.sql.catalog.paimon.warehouse=hdfs://nn:8020/path/to/warehouse \ #此参数指定warehouse地址,默认为hive-site中hive.metastore.warehouse.dir参数对应的值,也可自定义路径,可支持TOS路径。 --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
use paimon;
create table test_table ( k int, v string ) tblproperties ( 'primary-key' = 'k' );
INSERT INTO test_table VALUES (1, 'Hi'), (2, 'Hello');
SELECT * FROM test_table;
在 Presto 上可以实现对 Paimon 表的操作。LAS 默认配置了 Presto 和 Paimon 的连接器。
# 打开 Presto 客户端 presto-cli --user hive --password cptatLnbyBQ5nPuP
SELECT * FROM paimon.default.customer_orders;