You need to enable JavaScript to run this app.
文档中心
湖仓一体分析服务 LAS 私有化

湖仓一体分析服务 LAS 私有化

复制全文
下载 pdf
Paimon
基础使用
复制全文
下载 pdf
基础使用

目前火山引擎 LAS 集群中 Paimon 支持的版本为 1.0.1-release。

Paimon 安装

Paimon 在创建 Hadoop 集群过程中作为可选组件安装,集群创建完成后确保 Paimon 组件可见并且状态是正常的。 如果在集群初始化时没有安装,也可以通过添加 Paimon 组件在已有集群上添加 Paimon。详细操作参见:添加服务

拉起 Flink session 集群。

<FLINK_HOME>/bin/yarn-session.sh -d

使用 Flink sql-client 执行 Flink SQL。

<FLINK_HOME>/bin/sql-client.sh

Catalog

在使用 Paimon 库表之前,需要为 Paimon 创建 Catalog。
目前 Paimon 支持两种类型的 Catalog:FileSystem 或 Hive。

FileSystem

这种 Catalog 会将 Paimon 库表元数据存储在文件系统上,比如 HDFS。
用Flink创建 FileSystem 类型的 Catalog 的方式如下:

CREATE CATALOG my_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs://nn:8020/path/to/warehousee'
);

USE CATALOG my_catalog;

Hive

这种 catalog 会将 Paimon 库表元数据存储在 Hive metastore 当中,用户将可以通过 Hive 来访问这些 Paimon 表。

说明

使用 Hive metastore作为Catalog 时,依赖 Flink Hive bunndle,请确保 Flink Hive bundle 已被添加至 Flink 相关依赖中。

用 Flink 创建 HMS 类型的 Catalog 的方式如下:

CREATE  CATALOG my_hive  WITH  (
    'type' = 'paimon',
    'metastore' = 'hive',
    -- 'uri' = 'thrift://<hive-metastore-host-name>:<port>', default use 'hive.metastore.uris' in HiveConf
    -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment
    -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment
    -- 'warehouse' = 'hdfs://nn:8020/path/to/warehouse', 此参数指定warehouse地址,默认为hive-site中hive.metastore.warehouse.dir参数对应的值,也可自定义路径,可支持TOS路径。
);

USE CATALOG my_hive;

创建表

Primary Key 表(主键表

CREATE TABLE word_count (
    word STRING PRIMARY KEY NOT ENFORCED,
    cnt BIGINT
);

Append Only 表(非主键表)

CREATE TABLE word_count_nonkey (
    word STRING,
    cnt BIGINT
);

流式写入

-- 首先创建一个流式数据源表
CREATE TEMPORARY TABLE word_table (
    word STRING
) WITH (
     'connector' = 'datagen',
     'fields.word.length' = '1'
);

-- 流模式写入Paimon时需要设置checkpoint间隔
SET 'execution.checkpointing.interval' = '10 s';

-- 将源表流式写入Paimon中
INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;

流式读取

-- 切换至流模式
SET 'execution.runtime-mode' = 'streaming';

SELECT `interval`, COUNT(*) AS interval_cnt FROM(
SELECT cnt / 10000 AS `interval` FROM word_count
) GROUP BY `interval`;

批式写入

INSERT INTO word_count_nonkey VALUES
('a', 1),
('b', 2),
('c', 3);

批式读取

-- 切换至批模式
SET 'execution.runtime-mode' = 'batch';

SELECT * FROM word_count_nonkey;

通过 Spark 快速上手

Catalog

Hive catalog

在 Spark 上可以实现对 Paimon 表的操作。LAS 默认配置了 Spark 和 Paimon 的连接器和 Hive catalog。或者使用 Spark 创建 HMS 类型的 Catalog 的方式连接:

spark-sql ... \
    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.metastore=hive \
    --conf spark.sql.catalog.paimon.warehouse=hdfs://nn:8020/path/to/warehouse \ #此参数指定warehouse地址,默认为hive-site中hive.metastore.warehouse.dir参数对应的值,也可自定义路径,可支持TOS路径。
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

使用 paimon catalog

use paimon;

创建表

create table test_table (
    k int,
    v string
) tblproperties (
    'primary-key' = 'k'
);

批式写入

INSERT INTO test_table VALUES (1, 'Hi'), (2, 'Hello');

批式查询

SELECT * FROM test_table;

通过 Presto 快速上手

在 Presto 上可以实现对 Paimon 表的操作。LAS 默认配置了 Presto 和 Paimon 的连接器。

# 打开 Presto 客户端
presto-cli --user hive --password cptatLnbyBQ5nPuP

批式查询

SELECT * FROM paimon.default.customer_orders;
最近更新时间:2025.06.13 16:46:46
这个页面对您有帮助吗?
有用
有用
无用
无用