You need to enable JavaScript to run this app.
文档中心
流式计算 Flink版

流式计算 Flink版

复制全文
下载 pdf
Third Party
SeaTunnel Flink 使用方法
复制全文
下载 pdf
SeaTunnel Flink 使用方法

背景介绍

SeaTunnel 是一个开源、易用、多模态、超高性能的分布式数据集成平台,支持实时海量数据同步。具体项目介绍和使用方法可以参考官方文档。本文将介绍如何在流式计算 Flink 版中部署并使用 SeaTunnel 产品。

注意:SeaTunnel 属于开源产品。流式计算 Flink 版不会对 SeaTunnel 开源项目本身的问题提供任何支持和 SLA 保障。本文档仅供部署参考,请用户使用前仔细学习并理解社区文档。

前提条件

在使用产品前,需要注意以下几点

  1. 需要开通流式计算 Flink 版,并购买资源池资源
  2. 需要下载 SeaTunnel 产品,并且熟悉产品的使用方法

使用方法

SeaTunnel 部署,我们主要包含以下几个步骤:

  1. 下载并且安装 SeaTunnel 插件
  2. 按照 SeaTunnel 官方文档进行参数配置
  3. 创建 Flink JAR 作业,并且上传相关资源文件、配置自定义参数
  4. 启动任务,观察任务状态

准备 SeaTunnel

参考官方文档,下载 SeaTunnel 安装包,本文以 2.3.12 版本为例

export version="2.3.12"
wget "https://archive.apache.org/dist/seatunnel/${version}/apache-seatunnel-${version}-bin.tar.gz"
tar -xzvf "apache-seatunnel-${version}-bin.tar.gz"

在 SeaTunnel 项目目录下,安装插件(如果只依赖 Faker 和 Console 这两个测试 Connector,本步骤可以跳过):

sh bin/install-plugin.sh 2.3.12

将生成目录文件压缩打包:

export version="2.3.12"
cd ..
tar czf apache-seatunnel-${version}-all.tar.gz apache-seatunnel-${version}

因为该压缩包较大,所以我们提前通过 TOS 控制台上传到 TOS 桶上,以供后续 Flink 服务使用:
Image

上传完成后,记录下 tos 的存储路径,例如 tos://<bucket_name>/path/to/apache-seatunnel-${version}-all.tar.gz

任务配置

步骤一:在流式计算 Flink 版控制台创建 Flink JAR 作业,因为 SeaTunnel 版本支持情况,建议选择 Flink 1.16 版本测试使用。

注意:本步骤以 Flink 1.16 的流式同步任务为例。如果 SeaTunnel 任务类型为批式作业,请结合实际情况调整

Image

步骤二:上传 JAR 包并设置相关启动参数:

Image

其中几个需要设置的参数

  1. Jar URI:需要上传 SeaTunnel 安装包 starter 目录下的 seatunnel-flink-15-starter.jar 作业
  2. Entrypoint Class: 填写主类全路径名 org.apache.seatunnel.core.starter.flink.SeaTunnelFlink
  3. Entrypoint main args:填写命令参数 --config /opt/tiger/workdir/v2.streaming.conf。其中
    1. /opt/tiger/workdir/是容器内固定的文件加载路径,无需修改
    2. v2.streaming.conf 是 SeaTunnel 的配置文件名,可以根据您的配置文件名调整

步骤三:准备并上传 SeaTunnel 的配置文件, 本文以 v2.streaming.conf 为例,本地生成如下配置文件。其中配置的含义是生成一个流式 Flink 作业,通过 Fake Source 生成数据,写入 TOS 的 Paimon 数据表(具体参数含义请参考 SeaTunnel 的官方文档)

env {
  parallelism = 1
  job.mode = "STREAMING"
}

source {
  FakeSource {
    plugin_output = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

transform {
  FieldMapper {
    plugin_input = "fake"
    plugin_output = "fake1"
    field_mapper = {
      age = age
      name = new_name
    }
  }
}

sink {
  Paimon {
    warehouse = "tos://<Bucket_Name>/<Path>/"
    database = "st"
    table = "st_test_sink"
  }
}

步骤四:上传配置文件,以及引用上一步准备的 SeaTunnel 依赖文件包

  1. 配置文件:通过文件上传并选择上一步准备的 v2.streaming.conf 的配置文件
  2. SeaTunnel 依赖文件:通过填写 TOS 地址,设置为第一步 TOS 桶中的指定路径 tos://<bucket_name>/path/to/apache-seatunnel-${version}-all.tar.gz

Image

自定义参数

在自定义参数中设置如下参数

containerized.master.env.SEATUNNEL_HOME: /opt/tiger/workdir/apache-seatunnel-2.3.12
containerized.taskmanager.env.SEATUNNEL_HOME: /opt/tiger/workdir/apache-seatunnel-2.3.12
kubernetes.container-start-command-prefix: tar -zvxf /opt/tiger/workdir/apache-seatunnel-2.3.12-all.tar.gz -C /opt/tiger/workdir

设置参数的示意如图:

Image

如果写入 Paimon 或者 TOS 等路径可以选择额外设置以下参数,加速 TOS 写入能力:

flink.plugins.filesystem.tos.proton.enabled: true

任务启动

检查无误后,保存并发布任务
Image
进入任务运维界面,选择全新启动/重启任务,后续更新任务的话可以从最新状态启动:
Image
等待一个 Checkpoint 的周期,可以从 TOS 的桶文件,确认数据已经写入 Paimon:

Image

常见问题

问题一:SeaTunnel 文件过大,上传文件管理的时候会超时。启动 Flink 容器的时候会有些慢。
主要原因:SeaTunnel 包含了大量的 Connector 文件,会大大增加
解决方案:

  1. 当前 Flink 的文件管理模块不建议管理超过 500MB 以上的 JAR 包,可以参考步骤一,将 SeaTunnel 的压缩包提前上传到 TOS。
  2. SeaTunnel 中 connectors 目录可以进行裁剪,将不需要的 connector 从安装包中删除,减小 tar 包体积大小。

问题二:SeaTunnel 是否支持所有的火山引擎云产品?
答案:SeaTunnel 属于开源产品,是否已经和火山引擎的云产品完成对接需要咨询相关云产品和 SeaTunnel 社区维护者。请在使用 Flink 产品前自行调研相关能力。

最近更新时间:2025.12.29 11:42:32
这个页面对您有帮助吗?
有用
有用
无用
无用