You need to enable JavaScript to run this app.
流式计算 Flink版

流式计算 Flink版

复制全文
Catalog 参考
Paimon
复制全文
Paimon

Apache Paimon 是一个基于流和批处理的实时数据湖解决方案,结合了高效的存储和灵活的计算,专为处理大规模实时数据和流式数据而设计。Paimon 为 Flink 引擎提供了完善的 Catalog 接口支持,可以通过 Catalog 方便的管理实时数据湖元数据。

1. 使用限制

  • Paimon 支持基于文件系统、LAS Catalog (Hive)等多种元数据,如果要使用 LAS Catalog 管理元数据,需要开通 LAS Catalog 服务
  • Paimon 连接器仅支持在 Flink 1.16-volcano 及以上引擎版本中使用。

2. 使用步骤

2.1 创建 Filesystem Catalog

在创建 Catalog 之前需要创建相关的 TOS 桶和 Catalog 文件夹。相关文档,请参见创建存储桶创建文件夹

注意:请确保 Flink 和 TOS 处在同一个 Region,Flink 当前暂不支持跨 Region 访问 TOS Bucket。

CREATE CATALOG ${catalog_name}
WITH
  (
    'type' = 'paimon',
    'warehouse' = 'tos://${bucket_name}/${catalog_name}'
  );
  • ${catalog_name}:Catalog 的名称,自定义。
  • ${bucket_name}:存储 Paimon 数据的 TOS(对象存储)桶名称。

2.2 创建 LAS Catalog

前置条件:因为 Flink 同步 LAS 元数据,需要通过 API 接口访问。需要

  1. 使用 API 密钥管理为子账号创建 AccessKey / AccessKeySecret (后续需要填写在 hive-site.xml 中)。
  2. 在权限管理中为子账号开通 LASFullAccess 权限。

2.2.1 LAS 数据目录创建和授权

在 LAS Catalog 产品中创建 Paimon 的数据目录,需要参考数据目录管理,进行数据目录创建。需要填写数据目录名称和数据目录存储位置。

注意:这里的 TOS 桶和目录需要提前创建好,也需要和 Flink SQL 中的目录地址保持一致。

Image

并且在 LAS Catalog 权限管理模块,选择对于刚刚创建好的 Catalog 进行授权,这里可以参考权限管理,进行权限分配。因为后续需要使用 Flink 进行数据库表创建,以及数据写入等操作。所以建议给 Flink 开发者开通 Catalog 的 Admin 权限。确保可以进行以下的任务。
Image

2.2.2 Flink 创建 LAS Catalog

2.2.2.1(推荐)LAS Connector 模式

注意:此方法在 Flink 1.16 及以下版本不支持此模式,如果需要的话,请使用 2.2.3.2(不推荐)Hive 兼容模式进行访问

CREATE CATALOG paimon_las_catalog_117 WITH (
    'type'='paimon',
    'metastore'='hive',
    -- 参数标识使用 LAS Connector 连接
    'is-las' = 'true',
    -- LAS Region,支持 cn-beijing, cn-shanghai, cn-guangzhou 等,不同区域需要配置不同的 uri
    'hive.client.las.region.name' = 'cn-beijing',
    'hive.metastore.uris' = 'thrift://lakeformation.las.cn-beijing.ivolces.com:48869',
    'hive.hms.client.is.public.cloud' = 'true',
    'hive.client.las.ak' = '<YOUR ACCESS KEY>',
    'hive.client.las.sk' = '<YOUR ACCESS KEY SECRET>',
    'metastore.catalog.default' = '<YOUR LAS CATALOG NAME>',
    -- 这个是 Paimon 的 TOS 存储目录
    'warehouse' = 'tos://<YOUR BUCKET>/<YOUR WAREHOUSE PATH>',
);

2.2.2.2(不推荐)Hive 兼容模式

注意:这种方案仅为兼容 Hive 而设计,Flink 1.17 版本及以上版本建议使用 2.2.3.1(推荐)LAS Connector 模式

此模式下,元数据存储在 LAS Catalog 的元数据存储服务中,表文件存储在指定的文件系统路径下。

CREATE CATALOG my_hive WITH (
  'type' = 'paimon',
  'metastore' = 'hive',
  -- LAS 元数据服务的 Thrift 接口地址,注意修改其中具体 Region 信息
  'uri' = 'thrift://lakeformation.las.cn-beijing.ivolces.com:48869',
  -- Hive Conf 目录,由 Flink 作业开发界面依赖文件模块上传
  'hive-conf-dir' = '/opt/tiger/workdir', 
  -- 存储的桶目录地址
  'warehouse' = 'tos://<bucket-name>/path/to/warehouse'
);

上述 SQL 语句创建了一个名为 my_hive 的 Paimon Catalog,元数据存储在 LAS Catalog 元数据中,数据仓库路径为 tos://<bucket-name>/path/to/warehouse
另外在创建作业的时候需要在作业开发界面上传相应的 hive.xml 文件:
Image
其中 hive.xml 文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hive.server2.max.start.attempts</name>
    <value>5</value>
  </property>
  <property>
    <name>hive.client.las.region.name</name>
    <value>cn-shanghai</value>
  </property>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://lakeformation.las.cn-shanghai.ivolces.com:48869</value>
  </property>
  <property>
    <name>hive.hms.client.is.public.cloud</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.client.las.ak</name>
    <value>__LAS_ACCESS_KEY__</value>
  </property>
  <property>
    <name>hive.client.las.sk</name>
    <value>__LAS_ACCESS_KEY_SECRET__</value>
  </property>
</configuration>

其中注意要填写具体的 LAS Catalog 和账号 AK/SK 信息。然后将 hive.xml 文件通过作业开发-依赖文件。
另外,如果是使用 Flink-1.16-volcano 的话,引擎没有内置 LAS Catalog 的连接器,需要手动上传以下 JAR 包到依赖文件中:

flink-sql-connector-hive-las-formation-3_2.12-1.16-byted-connector-SNAPSHOT.jar
未知大小

2.3 Paimon Catalog 读写

2.3.1 创建数据库

在 Catalog 中创建一个 Database,用于组织和管理表。

CREATE DATABASE IF NOT EXISTS ${catalog_name}.${db_name};
  • ${db_name}:Database 的名称,自定义。

2.3.2 创建数据表

在 Database 中创建表,定义表结构和相关配置。

CREATE TABLE IF NOT EXISTS `${catalog_name}`.`${db_name}`.`${table_name}` (
    word varchar, -- 示例字段
    cnt bigint,
    PRIMARY KEY (word) NOT ENFORCED
) WITH (
    'bucket' = '4',  -- 控制分桶数量,单个 bucket 推荐存储 1GB 左右数据
    'changelog-producer' = 'input' -- 产生 changelog,用于下游流读
);
  • ${table_name}:表的名称,自定义。
  • bucket:分桶数量,推荐单个 bucket 存储 1GB 左右数据。
  • changelog-producer
    • 设置为 input,表示产生根据上游新增数据,用于下游流式读取。具体参考 Changelog 产出机制进行详细选择。如果不需要 changelog,则使用 none选项以节省存储和写入资源。

2.3.3 查询 Paimon 表

Paimon 查询 SQL 同时支持流读、批读,只要在 Flink 运行过程中选择对应的模式即可:

INSERT INTO `print_table`
SELECT * FROM `paimon_test`.`default`.`doc_result`;

2.3.4 流批读写

Flink Insert 语句支持流写、批写两种语义,只要在运行过程中选择相应的执行模式即可:

INSERT INTO `paimon_test`.`default`.`doc_result`
select
  t.word,
  count(1)
from
  doc_source t
GROUP BY
  t.word;

2.3.5 覆盖写

除了 Insert 语句之外,Paimon 也支持对数据表、分区等进行批式覆盖写:

-- 覆盖写入非分区表
INSERT OVERWRITE my_table SELECT ...

-- 覆盖写入分区表
INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...
最近更新时间:2025.12.29 11:42:32
这个页面对您有帮助吗?
有用
有用
无用
无用