You need to enable JavaScript to run this app.
文档中心
流式计算 Flink版

流式计算 Flink版

复制全文
下载 pdf
Paimon Catalog
JAR 作业集成 Paimon LAS Catalog 指南
复制全文
下载 pdf
JAR 作业集成 Paimon LAS Catalog 指南

本文档旨在提供使用 Flink DataStream API 对接 Paimon 及 LAS Catalog 的标准操作流程和代码示例,帮助开发者快速构建高性能、高可靠的数据入湖与读取任务。

准备工作
  • 您已注册火山引擎账号并完成实名认证,具体步骤请参见账号注册和实名认证。
  • 如果要使用火山引擎 Java SDK 访问指定服务的 API,请确认您已在火山引擎控制台开通对应服务。
  • 您已获取账号的 AccessKey 和 SecretKey,具体步骤请参见获取 AccessKey、SecretKey。
  • 火山引擎 Java SDK 支持 Java JDK 1.8 及以上版本。

说明

在开始开发前,请确保您的 Maven 项目中已包含以下核心依赖,并正确配置了 LAS 专用 Maven 仓库。

Maven 仓库配置

在 pom.xml 中添加 LAS 专用仓库地址:

<repositories>
    <!-- Repository that hosts the Volcano Engine LAS dependencies (release versions only) -->
    <repository>
        <id>bytedance-las-repo</id>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
        <url>https://artifact.bytedance.com/repository/data_compute_engine_service</url>
    </repository>
</repositories>

核心依赖清单

主要涉及 Paimon Flink Connector、LAS Client 及 Flink 运行依赖。

<dependencies>
    <!-- Paimon Flink connector -->
    <dependency>
        <groupId>org.apache.paimon</groupId>
        <artifactId>paimon-flink-1.17</artifactId>
        <version>1.1.1</version>
    </dependency>

    <!-- LAS Catalog core components -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>lf-client-3</artifactId>
        <version>1.6.0-RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-metastore</artifactId>
        <version>3.1.2</version>
    </dependency>

    <!-- Flink Streaming & Table API (scope: provided, supplied by the Flink cluster) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.17.2</version>
        <scope>provided</scope>
    </dependency>
    <!-- Provides org.apache.flink.table.api.DataTypes / org.apache.flink.table.types.DataType
         used by the write example below -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>1.17.2</version>
        <scope>provided</scope>
    </dependency>
    <!-- Flink client, used to submit the job -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.17.2</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

完整 pom.xml 示例

<?xml version="1.0" encoding="UTF-8"?>
<!-- XML declaration: version 1.0, UTF-8 encoding (boilerplate) -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Maven POM model version; always 4.0.0 -->
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.apache.paimon.demo</groupId>
    <artifactId>flink-paimon-las-catalog-job</artifactId>
    <!-- Project version; adjust to your own needs -->
    <version>1.0-SNAPSHOT</version>
    <!-- Packaged as a jar that is submitted to Flink -->
    <packaging>jar</packaging>

    <!-- ==================== Global properties (central version management) ==================== -->
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Paimon version -->
        <paimon.version>1.1.1</paimon.version>
        <!-- Flink version -->
        <flink.version>1.17.2</flink.version>
        <!-- Job entry point; shared by the jar and shade plugins so it is defined only once -->
        <main.class>org.apache.paimon.demo.WriteToLASPaimonTable</main.class>
    </properties>

    <!-- ==================== Repositories ==================== -->
    <!-- Used to download the Volcano Engine LAS dependencies -->
    <repositories>
        <repository>
            <id>bytedance-las-repo</id>
            <url>https://artifact.bytedance.com/repository/data_compute_engine_service</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <!-- ==================== Dependencies ==================== -->
    <dependencies>
        <!-- Core Paimon integration for Flink 1.17 -->
        <dependency>
            <groupId>org.apache.paimon</groupId>
            <artifactId>paimon-flink-1.17</artifactId>
            <version>${paimon.version}</version>
        </dependency>

        <!-- LAS Catalog client (Volcano Engine metadata service) -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>lf-client-3</artifactId>
            <version>1.6.0-RELEASE</version>
        </dependency>

        <!-- Hive metastore client, required by the LAS Catalog -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-metastore</artifactId>
            <version>3.1.2</version>
        </dependency>

        <!-- Flink streaming core -->
        <!-- provided: available at compile time but not bundled (the Flink cluster ships it) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink table types (DataTypes / DataType used by the examples) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink client, used to submit the job -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <!-- ==================== Build / packaging ==================== -->
    <build>
        <plugins>
            <!-- 1. Jar plugin: sets the entry point in the manifest -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>${main.class}</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>

            <!-- 2. Shade plugin: bundles the non-provided dependencies into one executable jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!-- Keep Flink-internal artifacts and logging libraries out of the fat jar:
                                 the Flink runtime provides its own logging setup, and bundled copies
                                 (e.g. pulled in transitively by hive-metastore) can conflict with it -->
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Strip signature files; they are invalid once classes are merged -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>${main.class}</mainClass>
                                </transformer>
                                <!-- Merge META-INF/services files so SPI-based factories keep working -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

注意事项
  • 依赖管理:在 Flink 作业中,Flink 相关的依赖项(如 flink-streaming-java)应将 Maven scope 设置为 provided(即 <scope>provided</scope>),以避免与集群运行环境冲突。
  • 配置优化
    • 建议根据数据量合理设置 bucket 数(表选项 bucket),通常建议单个 bucket 处理 100MB-1GB 数据。
    • 在公有云环境下,hive.hms.client.is.public.cloud 必须设置为 true。
  • 权限控制:请确保提供的 AK/SK 拥有相应 TOS Bucket 及 LAS Catalog 的读写权限。示例代码中的 AK/SK 仅为占位符,请勿将真实的 AK/SK 硬编码在代码中,建议通过环境变量或作业参数等方式传入。

数据入湖

使用 DataStream API 将数据流写入 LAS Catalog 中的 Paimon 表。Paimon 在 checkpoint 完成时提交数据,因此流式写入作业必须开启 checkpoint,否则写入的数据不会对读取方可见。

代码示例

package org.apache.paimon.demo;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataTypes;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.util.Random;

/**
 * Example DataStream job that writes a synthetic, unbounded stream of {@code (name, age)} rows
 * into a primary-key Paimon table stored in a LAS catalog.
 *
 * <p>The database and the table are created on first run if they do not exist yet.
 */
public class WriteToLASPaimonTable {

    private static final String DATABASE = "my_db";
    private static final String TABLE_NAME = "T_20260227";

    /** Checkpoint interval used only when the environment has no checkpointing configured. */
    private static final long DEFAULT_CHECKPOINT_INTERVAL_MS = 60_000L;

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The Paimon sink commits data when a checkpoint completes; an unbounded job without
        // checkpointing never commits, so nothing it writes becomes visible to readers.
        // An interval that is already configured (e.g. by the platform) is left untouched.
        if (!env.getCheckpointConfig().isCheckpointingEnabled()) {
            env.enableCheckpointing(DEFAULT_CHECKPOINT_INTERVAL_MS);
        }

        // 1. create the LAS catalog; try-with-resources closes it once execute() returns or throws
        try (Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(createCatalogOptions())) {

            // 2. create the database / table if they do not exist yet
            Identifier tableId = Identifier.create(DATABASE, TABLE_NAME);
            catalog.createDatabase(DATABASE, true);

            Schema schema = Schema.newBuilder()
                    .column("name", DataTypes.STRING())
                    .column("age", DataTypes.INT())
                    .primaryKey("name")
                    .option("bucket", "2")
                    .build();
            catalog.createTable(tableId, schema, true);

            // 3. get the table from the LAS catalog
            Table table = catalog.getTable(tableId);

            // 4. write to the LAS catalog Paimon table
            DataStream<Row> input = env.addSource(new UnboundedSource(1000L))
                    .returns(Types.ROW_NAMED(new String[]{"name", "age"}, Types.STRING, Types.INT));

            // Flink-side row type of `input`, kept in line with the Paimon schema above.
            DataType inputType = org.apache.flink.table.api.DataTypes.ROW(
                    org.apache.flink.table.api.DataTypes.FIELD(
                            "name", org.apache.flink.table.api.DataTypes.STRING()),
                    org.apache.flink.table.api.DataTypes.FIELD(
                            "age", org.apache.flink.table.api.DataTypes.INT()));

            new FlinkSinkBuilder(table)
                    .forRow(input, inputType)
                    .build();

            env.execute("WriteToLASPaimonTable");
        }
    }

    /**
     * Builds the options of the Paimon catalog backed by the LAS (Hive metastore) service.
     * Replace region, endpoint, credentials, catalog name and warehouse with your own values.
     */
    private static Options createCatalogOptions() {
        Options catalogOptions = new Options();
        catalogOptions.set("metastore", "hive");
        catalogOptions.set("hive.client.las.region.name", "cn-beijing"); // region
        catalogOptions.set("hive.metastore.uris",
                "thrift://lakeformation.las.cn-beijing.ivolces.com:48869"); // endpoint
        catalogOptions.set("hive.hms.client.is.public.cloud", "true"); // must be true on the public cloud
        // Placeholder credentials: do not hard-code real AK/SK in source code.
        catalogOptions.set("hive.client.las.ak", "xxxx"); // ak
        catalogOptions.set("hive.client.las.sk", "xxxx"); // sk
        catalogOptions.set("metastore.catalog.default", "weibin_paimon_1211"); // las catalog
        catalogOptions.set("warehouse", "tos://weibin-tos/warehouse"); // tos warehouse
        return catalogOptions;
    }

    /**
     * Test source that emits an INSERT row with a random name and an age in [1, 100]
     * every {@code intervalMs} milliseconds until cancelled.
     */
    private static class UnboundedSource implements SourceFunction<Row> {
        private static final String[] NAMES = {"Alice", "Bob", "Charlie", "Diana", "Eve"};
        private final long intervalMs;
        // volatile: written by cancel() and read by the run() loop from different threads
        private volatile boolean running = true;

        UnboundedSource(long intervalMs) {
            this.intervalMs = intervalMs;
        }

        @Override
        public void run(SourceContext<Row> ctx) throws Exception {
            Random random = new Random();
            while (running) {
                ctx.collect(Row.ofKind(RowKind.INSERT,
                        NAMES[random.nextInt(NAMES.length)], random.nextInt(100) + 1));
                Thread.sleep(intervalMs);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

数据读取

使用 DataStream API 从 LAS Catalog Paimon 表读取数据。

代码示例

package org.apache.paimon.demo;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

/**
 * Example DataStream job that reads the {@code (name, age)} Paimon table from a LAS catalog
 * and prints every row.
 */
public class ReadFromLASPaimonTable {

    private static final String DATABASE = "my_db";
    private static final String TABLE_NAME = "T_20260227";

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. create the LAS catalog; try-with-resources closes it once execute() returns or throws
        try (Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(createCatalogOptions())) {

            // 2. get the table
            Table table = catalog.getTable(Identifier.create(DATABASE, TABLE_NAME));

            // Resolve column positions once from the table schema. Rows produced by
            // buildForRow() are accessed by position, which does not depend on whether the
            // Row instance carries field names.
            final int nameIndex = requireFieldIndex(table, "name");
            final int ageIndex = requireFieldIndex(table, "age");

            // 3. read from the paimon table (streaming, as in the original example)
            DataStream<Row> stream = new FlinkSourceBuilder(table)
                    .env(env)
                    .buildForRow();

            stream.map(row -> "name: " + row.getField(nameIndex) + ", age: " + row.getField(ageIndex))
                    .print();

            env.execute("ReadFromLASPaimonTable");
        }
    }

    /**
     * Returns the position of {@code fieldName} in the table's row type.
     *
     * @throws IllegalStateException if the table has no such column
     */
    private static int requireFieldIndex(Table table, String fieldName) {
        int index = table.rowType().getFieldIndex(fieldName);
        if (index < 0) {
            throw new IllegalStateException(
                    "Column '" + fieldName + "' not found in table " + table.name());
        }
        return index;
    }

    /**
     * Builds the options of the Paimon catalog backed by the LAS (Hive metastore) service.
     * Replace region, endpoint, credentials, catalog name and warehouse with your own values.
     */
    private static Options createCatalogOptions() {
        Options catalogOptions = new Options();
        catalogOptions.set("metastore", "hive");
        catalogOptions.set("hive.client.las.region.name", "cn-beijing"); // region
        catalogOptions.set("hive.metastore.uris",
                "thrift://lakeformation.las.cn-beijing.ivolces.com:48869"); // endpoint
        catalogOptions.set("hive.hms.client.is.public.cloud", "true"); // must be true on the public cloud
        // Placeholder credentials: do not hard-code real AK/SK in source code.
        catalogOptions.set("hive.client.las.ak", "xxxx"); // ak
        catalogOptions.set("hive.client.las.sk", "xxxx"); // sk
        catalogOptions.set("metastore.catalog.default", "weibin_paimon_1211"); // las catalog
        catalogOptions.set("warehouse", "tos://weibin-tos/warehouse"); // tos warehouse
        return catalogOptions;
    }
}

最近更新时间:2026.06.03 18:42:52
这个页面对您有帮助吗?
有用
有用
无用
无用