You need to enable JavaScript to run this app.
导航
基础使用
最近更新时间:2025.06.13 16:46:46首次发布时间:2025.06.13 16:46:46
我的收藏
有用
有用
无用
无用

目前火山引擎 LAS 集群中 Paimon 支持的版本为 1.0.1-release。

Paimon 安装

Paimon 在创建 Hadoop 集群过程中作为可选组件安装,集群创建完成后确保 Paimon 组件可见并且状态是正常的。 如果在集群初始化时没有安装,也可以通过添加 Paimon 组件在已有集群上添加 Paimon。详细操作参见:添加服务

拉起 Flink session 集群。

<FLINK_HOME>/bin/yarn-session.sh -d

使用 Flink sql-client 执行 Flink SQL。

<FLINK_HOME>/bin/sql-client.sh

Catalog

在使用 Paimon 库表之前,需要为 Paimon 创建 Catalog。
目前 Paimon 支持两种类型的 Catalog:FileSystem 或 Hive。

FileSystem

这种 Catalog 会将 Paimon 库表元数据存储在文件系统上,比如 HDFS。
用Flink创建 FileSystem 类型的 Catalog 的方式如下:

CREATE CATALOG my_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs://nn:8020/path/to/warehousee'
);

USE CATALOG my_catalog;

Hive

这种 catalog 会将 Paimon 库表元数据存储在 Hive metastore 当中,用户将可以通过 Hive 来访问这些 Paimon 表。

说明

使用 Hive metastore作为Catalog 时,依赖 Flink Hive bunndle,请确保 Flink Hive bundle 已被添加至 Flink 相关依赖中。

用 Flink 创建 HMS 类型的 Catalog 的方式如下:

CREATE  CATALOG my_hive  WITH  (
    'type' = 'paimon',
    'metastore' = 'hive',
    -- 'uri' = 'thrift://<hive-metastore-host-name>:<port>', default use 'hive.metastore.uris' in HiveConf
    -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment
    -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment
    -- 'warehouse' = 'hdfs://nn:8020/path/to/warehouse', 此参数指定warehouse地址,默认为hive-site中hive.metastore.warehouse.dir参数对应的值,也可自定义路径,可支持TOS路径。
);

USE CATALOG my_hive;

创建表

Primary Key 表(主键表

CREATE TABLE word_count (
    word STRING PRIMARY KEY NOT ENFORCED,
    cnt BIGINT
);

Append Only 表(非主键表)

CREATE TABLE word_count_nonkey (
    word STRING,
    cnt BIGINT
);

流式写入

-- 首先创建一个流式数据源表
CREATE TEMPORARY TABLE word_table (
    word STRING
) WITH (
     'connector' = 'datagen',
     'fields.word.length' = '1'
);

-- 流模式写入Paimon时需要设置checkpoint间隔
SET 'execution.checkpointing.interval' = '10 s';

-- 将源表流式写入Paimon中
INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;

流式读取

-- 切换至流模式
SET 'execution.runtime-mode' = 'streaming';

SELECT `interval`, COUNT(*) AS interval_cnt FROM(
SELECT cnt / 10000 AS `interval` FROM word_count
) GROUP BY `interval`;

批式写入

INSERT INTO word_count_nonkey VALUES
('a', 1),
('b', 2),
('c', 3);

批式读取

-- 切换至批模式
SET 'execution.runtime-mode' = 'batch';

SELECT * FROM word_count_nonkey;

通过 Spark 快速上手

Catalog

Hive catalog

在 Spark 上可以实现对 Paimon 表的操作。LAS 默认配置了 Spark 和 Paimon 的连接器和 Hive catalog。或者使用 Spark 创建 HMS 类型的 Catalog 的方式连接:

spark-sql ... \
    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.metastore=hive \
    --conf spark.sql.catalog.paimon.warehouse=hdfs://nn:8020/path/to/warehouse \ #此参数指定warehouse地址,默认为hive-site中hive.metastore.warehouse.dir参数对应的值,也可自定义路径,可支持TOS路径。
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

使用 paimon catalog

use paimon;

创建表

create table test_table (
    k int,
    v string
) tblproperties (
    'primary-key' = 'k'
);

批式写入

INSERT INTO test_table VALUES (1, 'Hi'), (2, 'Hello');

批式查询

SELECT * FROM test_table;

通过 Presto 快速上手

在 Presto 上可以实现对 Paimon 表的操作。LAS 默认配置了 Presto 和 Paimon 的连接器。

# 打开 Presto 客户端
presto-cli --user hive --password cptatLnbyBQ5nPuP

批式查询

SELECT * FROM paimon.default.customer_orders;