本文档旨在提供使用 Flink DataStream API 对接 Paimon 及 LAS Catalog 的标准操作流程和代码示例,帮助开发者快速构建高性能、高可靠的数据入湖与读取任务。
说明
在开始开发前,请确保您的 Maven 项目中已包含以下核心依赖,并正确配置了火山引擎内部镜像仓库。
在 pom.xml 中添加 LAS 专用仓库地址:
<!-- Volcano Engine LAS artifact repository; snapshot artifacts are disabled, only releases are resolved. -->
<repositories>
    <repository>
        <id>bytedance-las-repo</id>
        <url>https://artifact.bytedance.com/repository/data_compute_engine_service</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>
主要涉及 Paimon Flink Connector、LAS Client 及 Flink 运行依赖。
<dependencies>
    <!-- Paimon Flink connector (built for Flink 1.17) -->
    <dependency>
        <groupId>org.apache.paimon</groupId>
        <artifactId>paimon-flink-1.17</artifactId>
        <version>1.1.1</version>
    </dependency>
    <!-- LAS Catalog core components -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>lf-client-3</artifactId>
        <version>1.6.0-RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-metastore</artifactId>
        <version>3.1.2</version>
    </dependency>
    <!-- Flink Streaming & Table API: scope is "provided" because the cluster ships these jars -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.17.2</version>
        <scope>provided</scope>
    </dependency>
    <!-- Required to compile the examples: org.apache.flink.table.api.DataTypes and
         org.apache.flink.table.types.DataType live in flink-table-common -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>1.17.2</version>
        <scope>provided</scope>
    </dependency>
    <!-- Flink client, used to submit/execute the job -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.17.2</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
<?xml version="1.0" encoding="UTF-8"?>
<!-- XML declaration: version 1.0, UTF-8 encoding (standard boilerplate). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Maven POM model version; always 4.0.0. -->
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.apache.paimon.demo</groupId>
    <artifactId>flink-paimon-las-catalog-job</artifactId>
    <!-- Project version; change as needed. -->
    <version>1.0-SNAPSHOT</version>
    <!-- Packaged as a jar that is submitted to Flink. -->
    <packaging>jar</packaging>

    <!-- ==================== Shared properties (central version management) ==================== -->
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Paimon version -->
        <paimon.version>1.1.1</paimon.version>
        <!-- Flink version -->
        <flink.version>1.17.2</flink.version>
    </properties>

    <!-- ==================== Repositories ==================== -->
    <!-- Resolves the Volcano Engine LAS artifacts. -->
    <repositories>
        <repository>
            <id>bytedance-las-repo</id>
            <url>https://artifact.bytedance.com/repository/data_compute_engine_service</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <!-- ==================== Dependencies ==================== -->
    <dependencies>
        <!-- Paimon integration for Flink 1.17 -->
        <dependency>
            <groupId>org.apache.paimon</groupId>
            <artifactId>paimon-flink-1.17</artifactId>
            <version>${paimon.version}</version>
        </dependency>
        <!-- LAS Catalog client (Volcano Engine metadata service) -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>lf-client-3</artifactId>
            <version>1.6.0-RELEASE</version>
        </dependency>
        <!-- Hive metastore client, required by the LAS Catalog -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-metastore</artifactId>
            <version>3.1.2</version>
        </dependency>
        <!-- Flink streaming core. "provided": available at compile time but not bundled,
             because the Flink cluster already ships it. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Flink table types (DataTypes / DataType) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Flink client, used to submit the job -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <!-- ==================== Build / packaging ==================== -->
    <build>
        <plugins>
            <!-- 1. Jar plugin: sets the program entry point in the manifest. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>org.apache.paimon.demo.WriteToLASPaimonTable</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <!-- 2. Shade plugin: bundles the non-provided dependencies into a single runnable jar. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!-- Drop jar signature files; they are invalid inside a merged jar. -->
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.apache.paimon.demo.WriteToLASPaimonTable</mainClass>
                                </transformer>
                                <!-- Merge META-INF/services files so service-loaded factories keep working. -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
- Flink 核心依赖(如 flink-streaming-java)应设置为 scope: provided,以避免与集群运行环境冲突。
- bucket:表的 Bucket 数,通常建议单个 Bucket 处理 100MB-1GB 数据。
- hive.hms.client.is.public.cloud:在公有云环境下必须设置为 true。
- Paimon 流式写入在 Checkpoint 完成时才提交数据,流作业必须开启 Checkpoint,否则写入的数据不可见。

使用 DataStream API 将数据流写入 LAS Catalog Paimon 表。
package org.apache.paimon.demo; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.FlinkCatalogFactory; import org.apache.paimon.flink.sink.FlinkSinkBuilder; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.Table; import org.apache.paimon.types.DataTypes; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import java.util.Random; public class WriteToLASPaimonTable { private static final String DATABASE = "my_db"; private static final String TABLE_NAME = "T_20260227"; public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 1. create las catalog catalog Options catalogOptions = new Options(); catalogOptions.set("metastore", "hive"); catalogOptions.set("hive.client.las.region.name", "cn-beijing"); // region catalogOptions.set("hive.metastore.uris", "thrift://lakeformation.las.cn-beijing.ivolces.com:48869"); // endpoint catalogOptions.set("hive.hms.client.is.public.cloud", "true"); catalogOptions.set("hive.client.las.ak", "xxxx"); // ak catalogOptions.set("hive.client.las.sk", "xxxx"); // sk catalogOptions.set("metastore.catalog.default", "weibin_paimon_1211"); // las catalog catalogOptions.set("warehouse", "tos://weibin-tos/warehouse"); // tos warehouse Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); // 2. 
create if not exist db/table Identifier tableId = Identifier.create(DATABASE, TABLE_NAME); catalog.createDatabase(DATABASE, true); Schema schema = Schema.newBuilder() .column("name", DataTypes.STRING()) .column("age", DataTypes.INT()) .primaryKey("name") .option("bucket", "2") .build(); catalog.createTable(tableId, schema, true); // 3. get table from las catalog Table table = catalog.getTable(tableId); // 4. wrire to las catalog paimon table DataStream<Row> input = env.addSource(new UnboundedSource(1000L)) .returns(Types.ROW_NAMED(new String[]{"name", "age"}, Types.STRING, Types.INT)); DataType inputType = org.apache.flink.table.api.DataTypes.ROW( org.apache.flink.table.api.DataTypes.FIELD( "name", org.apache.flink.table.api.DataTypes.STRING()), org.apache.flink.table.api.DataTypes.FIELD( "age", org.apache.flink.table.api.DataTypes.INT())); new FlinkSinkBuilder(table) .forRow(input, inputType) .build(); env.execute("WriteToLASPaimonTable"); } private static class UnboundedSource implements SourceFunction<Row> { private static final String[] NAMES = {"Alice", "Bob", "Charlie", "Diana", "Eve"}; private final long intervalMs; private volatile boolean running = true; UnboundedSource(long intervalMs) { this.intervalMs = intervalMs; } @Override public void run(SourceContext<Row> ctx) throws Exception { Random random = new Random(); while (running) { ctx.collect(Row.ofKind(RowKind.INSERT, NAMES[random.nextInt(NAMES.length)], random.nextInt(100) + 1)); Thread.sleep(intervalMs); } } @Override public void cancel() { running = false; } } }
使用 DataStream API 从 LAS Catalog Paimon 表读取数据。
package org.apache.paimon.demo; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.FlinkCatalogFactory; import org.apache.paimon.flink.source.FlinkSourceBuilder; import org.apache.paimon.options.Options; import org.apache.paimon.table.Table; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; public class ReadFromLASPaimonTable { private static final String DATABASE = "my_db"; private static final String TABLE_NAME = "T_20260227"; public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 1. create las catalog Options catalogOptions = new Options(); catalogOptions.set("metastore", "hive"); catalogOptions.set("hive.client.las.region.name", "cn-beijing"); // region catalogOptions.set("hive.metastore.uris", "thrift://lakeformation.las.cn-beijing.ivolces.com:48869"); // endpoint catalogOptions.set("hive.hms.client.is.public.cloud", "true"); catalogOptions.set("hive.client.las.ak", "xxxx"); // ak catalogOptions.set("hive.client.las.sk", "xxxx"); // sk catalogOptions.set("metastore.catalog.default", "weibin_paimon_1211"); // las catalog catalogOptions.set("warehouse", "tos://weibin-tos/warehouse"); // tos warehouse Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); // 2. get table Table table = catalog.getTable(Identifier.create(DATABASE, TABLE_NAME)); // 3. streaming read from paimon table DataStream<Row> stream = new FlinkSourceBuilder(table) .env(env) .buildForRow(); stream.map(row -> "name: " + row.getField("name") + ", age: " + (Integer) row.getField("age")).print(); env.execute("ReadFromLASPaimonTable"); } }