Apache Paimon 是一种新型的流式数据湖存储技术,结合了 LSM-Tree 结构和湖格式,提供了高吞吐、低延迟的数据摄入、完整的流式变更订阅以及高效的 OLAP 查询能力。
本手册将指导您如何使用 Flink 引擎进行 Paimon 的开发任务,并且利用 EMR-Hive 统一管理 Paimon 湖的元数据,可以方便将 Paimon 充分融入已有的 EMR 大数据组件当中。
确保您已经:
前置条件:Flink 需要访问 E-MapReduce 集群,我们需要获取以下文件
配置文件的获取方式如下,在 E-MapReduce 产品集群 - 集群详情 - 节点管理 - MasterGroup - 选择第一台 Master 节点:
通过控制台远程连接或者通过公网 SSH 连接指定 Master 服务器,通过 Terminal 控制台进行访问
注意:如果通过公网 SSH 连接,则需要提前为 ECS 绑定公网 IP
可以通过如下命令行获取:
# hive conf 一般在以下这个目录 cd /etc/emr/hive/conf # 没有这个目录的话也可以尝试从环境变量中获取 env | grep HIVE_CONF_DIR
将下图中的 hive-site.xml 通过 SCP 命令拷贝到本地,或者直接 cat 文本,获取文件内容:
hdfs-site.xml操作同 hive-site.xml
另外由于有些 Hive 表的 Format 会涉及到 YARN 的类,需要视情况将 yarn-site.xml 中的配置项 yarn.resourcemanager.principal 拷贝出来添加到 hdfs-site.xml 中。
内容类似如下:
<property> <name>yarn.resourcemanager.principal</name> <value>yarn/_HOST@6C5B5406ADDF347BB8C9.EMR.COM</value> </property>
keytab 在 EMR 集群概览页面,需要创建一个用户,在用户管理这边,直接下载即可。
krb5.conf 在 /etc/krb5.conf 根目录,直接scp拷贝或者cat文本copy出来
操作路径:作业开发 - Flink SQL 作业 - 创建作业。
参考文档:开发 Flink SQL 任务
为了解析 EMR-Hive 的域名,需要设置以下两个自定义参数:
containerized.master.env.ENV_SEARCH_DOMAIN:emr-volces.com containerized.taskmanager.env.ENV_SEARCH_DOMAIN:emr-volces.com
配置结果如下
我们可以通过如下 SQL 创建 Hive Catalog:
CREATE CATALOG paimon_test1 WITH ( 'type' = 'paimon', 'metastore' = 'hive', -- uri 填写 HMS thrift 的内网地址如下, 'uri' = 'thrift://master-1-1.emr-0fc031e89a242e96662b.cn-beijing.emr-volces.com:9083', 'hive-conf-dir' = '/opt/tiger/workdir', 'hadoop-conf-dir' = '/opt/tiger/workdir', 'warehouse' = 'tos://<BUCKET_NAME>/paimon_emr_test' );
WITH 参数的意义如下:
type:选择 paimon 类型 Catalog。metastore:选择 hive 的方式进行元数据管理。uri:Hive 的 thrift 接口地址hive-conf-dir:这里指定 hive-site.xml 的文件路径。注意:要固定填写 /opt/tiger/workdir。hadoop-conf-dir:这里指定 hdfs-site.xml 的文件路径。注意:要固定填写 /opt/tiger/workdir。warehouse:和 Hive Catalog 中的数据目录存储位置保持一致。其中 thrift 接口地址可以在 EMR Hive 服务地址查看,服务列表 - Hive - 服务参数:
在 Catalog 中创建一个 Database,用于组织和管理表。
CREATE DATABASE IF NOT EXISTS ${catalog_name}.${db_name};
${db_name}:Database 的名称,自定义。在 Database 中创建表,定义表结构和相关配置。以下是一个主键表的非分区表的示例
CREATE TABLE IF NOT EXISTS `${catalog_name}`.`${db_name}`.`${table_name}` ( word varchar, -- 示例字段 cnt bigint, PRIMARY KEY (word) NOT ENFORCED ) WITH ( 'bucket' = '4', -- 控制分桶数量,单个 bucket 推荐存储 1GB 左右数据 'changelog-producer' = 'input', -- 产生 changelog,用于下游流读 );
${table_name}:表的名称,自定义。bucket:分桶数量,推荐单个 bucket 存储 1GB 左右数据。changelog-producer:
input,表示产生根据上游新增数据,用于下游流式读取。具体参考 Changelog 产出机制进行详细选择。如果不需要 changelog,则使用 none选项以节省存储和写入资源。在 Database 中创建表,定义表结构和相关配置。以下是一个主键表的分区表的示例:
CREATE TABLE IF NOT EXISTS paimon_test1.test_db5.test_table5 ( word varchar, -- 示例字段 cnt bigint, dt STRING, hh STRING, PRIMARY KEY (dt, hh, word) NOT ENFORCED -- 一般分区主键表的主键字段必须包含分区字段 ) PARTITIONED BY (dt, hh) WITH ( 'bucket' = '4', -- 控制分桶数量,单个 bucket 推荐存储 1GB 左右数据 'changelog-producer' = 'none', -- 产生 changelog,用于下游流读 'metastore.partitioned-table' = 'true' -- 确定是否将分区信息同步到 Hive 元数据管理 );
metastore.partitioned-table:开启后将分区信息会同步到 Hive 元数据管理,默认 false不开启。以下示例展示了如何使用 Flink SQL 将数据写入 Paimon 表。
首先,创建一个数据源表,用于生成模拟数据。
CREATE TABLE doc_source (word varchar) WITH ( 'connector' = 'datagen', 'rows-per-second' = '5', 'fields.word.length' = '30' );
connector:使用 datagen 连接器生成模拟数据。rows-per-second:每秒生成的行数。fields.word.length:生成字段 word 的长度。将数据源表中的数据写入 Paimon 表。
INSERT INTO `paimon_test`.`default`.`doc_result` select t.word, count(1) from doc_source t GROUP BY t.word;
paimon_test:Catalog 名称。default:Database 名称。doc_result:目标表名称。Paimon 的快照提交机制与 Flink 的 Checkpoint 紧密关联,因此合理配置 Checkpoint 参数对于确保下游流式或批处理任务能够及时感知上游数据变化至关重要。以 Flink 默认的 Checkpoint 间隔为例,若设置为 5 分钟,则意味着每 5 分钟完成一次 Checkpoint 后,下游任务才能观察到最新的数据插入、更新或删除操作。因此,根据业务需求调整 Checkpoint 间隔是优化数据可见性和处理时效性的关键。
注意:可以根据数据实时性要求降低或者提高 Checkpoint 的间隔。一般建议 Checkpoint 时间间隔不低于 1 分钟。如果数据延迟严重、数据更新频繁、数据新鲜度要求不高等场景,建议适当提升 Checkpoint 时间到 5 分钟或者更长。
Checkpoint 开启如下图,在作业开发 - 配置参数 - Checkpoint 配置 - Checkpoint 间隔进行设置。
此时可以点击工具栏中的验证按钮检查一些基本的 SQL 语法问题。请注意:如果遇到 Caused by: org.apache.thrift.transport.TTransportException此类问题,可以参考 8.1 验证 SQL 时报错的描述。可以先暂时忽略此问题。
如果 SQL 没有其他错误之后,可以选择上线 SQL 任务,选择工具栏 - 上线,选择合适的资源池和跳过上线前深度检查后。可以上线任务。
上线成功后,在任务管理页面启动任务,并且检查任务进入运行中状态。
可以进入 TOS 桶管理界面(需要账户有相关访问权限),可以看到已经在数据表的 TOS 目录下,产生了相关的数据文件。则说明数据写入成功。
我们以上已经确认了数据写入成功,以下示例展示了如何使用 Flink SQL 从 Paimon 表中流/批式读取数据。进一步可以确认数据准确性。
创建一个打印表,用于输出读取的数据。
CREATE TABLE `print_table` ( word varchar, cnt bigint ) WITH ( 'connector' = 'print' );
connector:使用 print 连接器将数据打印到控制台。从 Paimon 表中读取数据并写入打印表。
INSERT INTO `print_table` SELECT * FROM `paimon_test`.`default`.`doc_result`;
paimon_test:Catalog 名称。default:Database 名称。doc_result:源表名称。可以通过 Hive 命令行确认数据是否已经正确写入
root@master-1-1(192.168.13.162):~$ hive hive> show databases; OK default ... test_db1 Time taken: 0.583 seconds, Fetched: 5 row(s) hive> use test_db1; OK Time taken: 0.031 seconds hive> show tables; OK test_table Time taken: 0.033 seconds, Fetched: 1 row(s) hive> select * from test_table limit 10; ...
参考 Spark 访问 Paimon ,通过 Spark 引擎对 Paimon 的湖数据进行批式处理。
注意:用Spark创建HMS类型的Catalog的方式如下:
# 注意将 tos 路径修改成你的测试路径 spark-sql ... \ --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \ --conf spark.sql.catalog.paimon.metastore=hive \ --conf spark.sql.catalog.paimon.warehouse=tos://<BUCKET_NAME>/paimon_emr_test \ --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
接着可以首先 use paimon ,之后就可以使用 SQL 访问 Paimon 数据表了:
spark-sql (default)> use paimon; spark-sql (default)> show databases; default ... test_db1 Time taken: 0.53 seconds, Fetched 5 row(s) spark-sql (default)> use test_db1; Time taken: 0.288 seconds spark-sql (test_db1)> select * from test_table limit 1; ...
如果在验证 SQL 的时候(点击验证按钮,或者上线时候自动检查 SQL)报错如下,形如 Caused by: org.apache.thrift.transport.TTransportException此类错误,说明当前连接 Hive 接口不同。请不要慌张,当前版本暂时无法在验证 SQL 阶段访问 Hive 元数据。
org.apache.flink.table.api.ValidationException: Unable to create catalog 'paimon_test1'. Catalog options are: 'hive-conf-dir'='/opt/tiger/workdir' 'metastore'='hive' 'type'='paimon' 'uri'='thrift://lakeformation.xxx.cn-beijing.ivolces.com:48869' 'warehouse'='tos://flink-cwz-paimon/paimon_test1' at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:511) ... Caused by: java.lang.RuntimeException: Failed to determine if database default exists at org.apache.paimon.hive.HiveCatalog.databaseExistsImpl(HiveCatalog.java:223) ... 9 more Caused by: org.apache.thrift.transport.TTransportException at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132) ... 15 more
解决方案:任务上线过程中选择更多设置 - 跳过上线前的深度检查。
在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且在日志中报如下类似错误,这个问题说明 Hive 的接口无法访问
Caused by: org.apache.hadoop.hive.metastore.api.MetaException: Could not connect to meta store using any of the URIs provided. Most recent failure: org.apache.thrift.transport.TTransportException
可能原因
在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且运行事件中发现如下报错,org.apache.flink.table.catalog exceptions.DatabaseNotExistException
解决办法:这个原因是因为在 Flink 任务提交阶段,静态解析 SQL 的时候,当前不会去连接 Hive 获取已有的数据库,所以必须在 SQL 中显式写明建库语句。在 SQL 代码中加入以下语句:
CREATE TABLE IF NOT EXISTS test_db;
重新提交任务之后,就可以恢复正常。
在执行完 6. 上线任务并且启动任务后,如果发现任务失败,并且日志中发现如下报错,com.ctc.wstx.exc.WstxParsingException:
Caused by: java.lang.RuntimeException: com.ctc.wstx.exc.WstxParsingException: Illegal processing instruction target ("xml"); xml (case insensitive) is reserved by the specs. at [row,col {unknown-source}]: [2,5]
解决办法:这个原因是因为对接 Hive 元数据中依赖的 hive-site.xml 的格式可能不正确。需要检查 xml 文件是否符合语法规范。常见 xml 格式问题可以参考:
<?xml ...,在尖括号前方不能包含任何不可见字符、空格、空行等。< 、>、&等特殊字符,如果出现需要用 CDATA 语法标记为普通文本。如果需要删除 Hive 元数据中的库表,需要同时手动删除 Hive 元数据中的库表信息,以及 TOS 目录上的数据库表的文件路径。如果仅仅删除 Hive 元数据或者仅仅删除 Hive 目录的数据都会造成数据不一致。报以下类似的错误,导致任务失败:
解决方案:判断属于哪一种情况,将已有的 Hive 元数据和 TOS 文件数据都删除后才能保证数据库表继续正常写入。
解决方案:Paimon 不会自动将表的分区信息同步到 Hive 元数据管理。如果需要在 Hive 元数据管理看到数据表的分区字段,需要在建表语句中增加如下 WITH 参数:
'metastore.partitioned-table' = 'true' -- 确定是否将分区信息同步到 Hive 元数据管理
需要注意的是:因为分区字段无法动态增加,增加参数后,需要将原有的数据表清掉(包括 Hive 元数据和 TOS 的数据文件),然后重新创建。