You need to enable JavaScript to run this app.
流式计算 Flink版

流式计算 Flink版

复制全文
Third Party
SeaTunnel Flink 使用方法
复制全文
SeaTunnel Flink 使用方法

背景介绍

SeaTunnel 是一个开源、易用、多模态、超高性能的分布式数据集成平台,支持实时海量数据同步。具体项目介绍和使用方法可以参考官方文档。本文将介绍如何在流式计算 Flink 版中部署并使用 SeaTunnel 产品。

注意:SeaTunnel 属于开源产品。流式计算 Flink 版不会对 SeaTunnel 开源项目本身的问题提供任何支持和 SLA 保障。本文档仅供部署参考,请用户使用前仔细学习并理解社区文档。

前提条件

在使用产品前,需要注意以下几点

  1. 需要开通流式计算 Flink 版,并购买资源池资源
  2. 需要下载 SeaTunnel 产品,并且熟悉产品的使用方法

使用方法

SeaTunnel 部署,我们主要包含以下几个步骤:

  1. 下载并且安装 SeaTunnel 插件
  2. 按照 SeaTunnel 官方文档进行参数配置
  3. 创建 Flink JAR 作业,并且上传相关资源文件、配置自定义参数
  4. 启动任务,观察任务状态

准备 SeaTunnel

参考官方文档,下载 SeaTunnel 安装包,本文以 2.3.12 版本为例

export version="2.3.12"
wget "https://archive.apache.org/dist/seatunnel/${version}/apache-seatunnel-${version}-bin.tar.gz"
tar -xzvf "apache-seatunnel-${version}-bin.tar.gz"

在 SeaTunnel 项目目录下,安装插件(如果只依赖 Faker 和 Console 这两个测试 Connector,本步骤可以跳过):

sh bin/install-plugin.sh 2.3.12

将生成目录文件压缩打包:

export version="2.3.12"
cd ..
tar czf apache-seatunnel-${version}-all.tar.gz apache-seatunnel-${version}

因为该压缩包较大,所以我们提前通过 TOS 控制台上传到 TOS 桶上,以供后续 Flink 服务使用:
Image

上传完成后,记录下 tos 的存储路径,例如 tos://<bucket_name>/path/to/apache-seatunnel-${version}-all.tar.gz

任务配置

步骤一:在流式计算 Flink 版控制台创建 Flink JAR 作业,因为 SeaTunnel 版本支持情况,建议选择 Flink 1.16 版本测试使用。

注意:本步骤以 Flink 1.16 的流式同步任务为例。如果 SeaTunnel 任务类型为批式作业,请结合实际情况调整

Image

步骤二:上传 JAR 包并设置相关启动参数:

Image

其中几个需要设置的参数

  1. Jar URI:需要上传 SeaTunnel 安装包 starter 目录下的 seatunnel-flink-15-starter.jar 作业
  2. Entrypoint Class: 填写主类全路径名 org.apache.seatunnel.core.starter.flink.SeaTunnelFlink
  3. Entrypoint main args:填写命令参数 --config /opt/tiger/workdir/v2.streaming.conf。其中
    1. /opt/tiger/workdir/是容器内固定的文件加载路径,无需修改
    2. v2.streaming.conf 是 SeaTunnel 的配置文件名,可以根据您的配置文件名调整

步骤三:准备并上传 SeaTunnel 的配置文件, 本文以 v2.streaming.conf 为例,本地生成如下配置文件。其中配置的含义是生成一个流式 Flink 作业,通过 Fake Source 生成数据,写入 TOS 的 Paimon 数据表(具体参数含义请参考 SeaTunnel 的官方文档)

env {
  parallelism = 1
  job.mode = "STREAMING"
}

source {
  FakeSource {
    plugin_output = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

transform {
  FieldMapper {
    plugin_input = "fake"
    plugin_output = "fake1"
    field_mapper = {
      age = age
      name = new_name
    }
  }
}

sink {
  Paimon {
    warehouse = "tos://<Bucket_Name>/<Path>/"
    database = "st"
    table = "st_test_sink"
  }
}

步骤四:上传配置文件,以及引用上一步准备的 SeaTunnel 依赖文件包

  1. 配置文件:通过文件上传并选择上一步准备的 v2.streaming.conf 的配置文件
  2. SeaTunnel 依赖文件:通过填写 TOS 地址,设置为第一步 TOS 桶中的指定路径 tos://<bucket_name>/path/to/apache-seatunnel-${version}-all.tar.gz

Image

自定义参数

在自定义参数中设置如下参数

containerized.master.env.SEATUNNEL_HOME: /opt/tiger/workdir/apache-seatunnel-2.3.12
containerized.taskmanager.env.SEATUNNEL_HOME: /opt/tiger/workdir/apache-seatunnel-2.3.12
kubernetes.container-start-command-prefix: tar -zvxf /opt/tiger/workdir/apache-seatunnel-2.3.12-all.tar.gz -C /opt/tiger/workdir

设置参数的示意如图:

Image

如果写入 Paimon 或者 TOS 等路径可以选择额外设置以下参数,加速 TOS 写入能力:

flink.plugins.filesystem.tos.proton.enabled: true

任务启动

检查无误后,保存并发布任务
Image
进入任务运维界面,选择全新启动/重启任务,后续更新任务的话可以从最新状态启动:
Image
等待一个 Checkpoint 的周期,可以从 TOS 的桶文件,确认数据已经写入 Paimon:

Image

常见问题

问题一:SeaTunnel 文件过大,上传文件管理的时候会超时。启动 Flink 容器的时候会有些慢。
主要原因:SeaTunnel 包含了大量的 Connector 文件,会大大增加
解决方案:

  1. 当前 Flink 的文件管理模块不建议管理超过 500MB 以上的 JAR 包,可以参考步骤一,将 SeaTunnel 的压缩包提前上传到 TOS。
  2. SeaTunnel 中 connectors 目录可以进行裁剪,将不需要的 connector 从安装包中删除,减小 tar 包体积大小。

问题二:SeaTunnel 是否支持所有的火山引擎云产品?
答案:SeaTunnel 属于开源产品,是否已经和火山引擎的云产品完成对接需要咨询相关云产品和 SeaTunnel 社区维护者。请在使用 Flink 产品前自行调研相关能力。

最近更新时间:2025.12.29 11:42:32
这个页面对您有帮助吗?
有用
有用
无用
无用