You need to enable JavaScript to run this app.
流式计算 Flink版

流式计算 Flink版

复制全文
Connector 参考
TOS
复制全文
TOS

火山引擎对象存储 TOS(Torch Object Storage)是火山引擎提供的海量、安全、低成本、易用、高可靠、高可用的分布式云存储服务。通过 Flink 内置的 Filesytem Connector,您可以轻松访问和管理火山引擎 TOS 上的数据。

DDL 定义

用作数据源(Source)

将 TOS 文件/目录作为数据源非常简单,只要用如下建表语句,指定 filesystem connector,以及 tos://开头的文件路径,以及文件格式即可:

CREATE TABLE filesystem_source (
    name String,
    score INT
 ) WITH (
     'connector' = 'filesystem',
     'path' = 'tos://<bucket>/<path>',
     'format' = 'json'
 );

用作数据目的(Sink)

将 TOS 文件/目录作为数据下游,用如下建表语句,指定 filesystem connector,以及 tos://开头的文件路径、文件格式,以及文件生成规则:

CREATE TABLE filesystem_sink (
    name String,
    score INT
 ) WITH (
     'connector' = 'filesystem',
     'path' = 'tos://<bucket>/<path>',
     'sink.rolling-policy.file-size' = '1M',
     'sink.rolling-policy.rollover-interval' = '10 min',
     'format' = 'json' 
 );

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 Filesystem 连接器。

path

(none)

String

文件路径。
在使用对象存储 TOS 时,路径格式为tos://bucket_name/file_path/

sink.rolling-policy.file-size

128MB

MemorySize

文件内存最大限制。
当前写入的文件大小达到阈值时,写入的文件将被关闭,并打开一个新的文件进行写入。

注意

写入文件时,如果需要尽快在文件系统中看到写入的文件,需要增加部分配置。您可以设置连接器的 sink.rolling-policy.file-sizesink.rolling-policy.rollover-interval 参数,以及在 Flink 参数中开启 Checkpoint。
如需详细了解 Filesystem 连接器的滚动策略,请参见开源文档Filesystem-Rolling Policy

sink.rolling-policy.rollover-interval

30min

Duration

文件最大持续写入时间。
当前写入的时间超过了阈值时,写入的文件将被关闭,并打开一个新的文件进行写入。

注意

写入文件时,如果需要尽快在文件系统中看到写入的文件,需要增加部分配置。您可以设置连接器的 sink.rolling-policy.file-sizesink.rolling-policy.rollover-interval 参数,以及在 Flink 参数中开启 Checkpoint。
如需详细了解 Filesystem 连接器的滚动策略,请参见开源文档Filesystem-Rolling Policy

sink.rolling-policy.check-interval

1min

Duration

文件检查间隔。
FileSystem 按照这个间隔检查文件的写入时间是否已经满足了关闭条件,并将满足条件的文件进行关闭。

sink.partition-commit.trigger

process-time

String

分区关闭策略。取值如下:

  • process-time:当分区创建超过一定时间后将这个分区关闭,分区创建时间为分区创建时的物理时间。
  • partition-time:当分区创建超过一定时间之后将这个分区关闭,分区创建时间从分区中抽取出来。
    partition-time 依赖于 watermark 生成,需要配合 wartermark 才能支持自动分区发现。当 watermark 时间超过了从分区抽取的时间delay 参数配置时间之和后会提交分区。

sink.partition-commit.delay

0s

Duration

分区关闭延迟。
当分区在创建超过一定时间之后将被关闭。

partition.time-extractor.kind

default

String

分区时间抽取方式。
这个配置仅当 sink.partition-commit.trigger 配置为 partition-time 时生效。如果用户有自定义的分区时间抽取方法,配置为 custom

partition.time-extractor.class

(none)

String

分区时间抽取类。

partition.time-extractor.timestamp-pattern

(none)

String

分区时间戳的抽取格式。需要写成 yyyy-MM-dd HH:mm:ss 的形式,并用 Hive 表中相应的分区字段做占位符替换。默认支持第一个字段为 yyyy-mm-dd hh:mm:ss。

  • 如果时间戳应该从单个分区字段 'dt' 提取,可以配置 '$dt'。
  • 如果时间戳应该从多个分区字段中提取,例如 'year'、'month'、'day' 和 'hour',可以配置 '$year-$month-$day $hour:00:00'。
  • 如果时间戳应该从两个分区字段中提取,例如 'dt' 和 'hour' ,可以配置 '$dt $hour:00:00'。

sink.partition-commit.policy.kind

(none)

String

用于提交分区的策略。取值如下:

  • success-file:当分区关闭时将在分区对应的目录下生成一个 _success 的文件。
  • custom:用户自定义分区提交策略。

sink.partition-commit.policy.class

(none)

String

分区提交类。
这个类必须实现 PartitionCommitPolicy。

format

(none)

String

必须要指定文件格式。当前支持文件格式可以参考 概览

示例代码

  • 结果表
CREATE TABLE datagen_source (
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar
  )
WITH
  (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.order_status.length' = '3',
    'fields.order_id.min' = '1',
    'fields.order_id.max' = '1000',
    'fields.order_product_id.min' = '1',
    'fields.order_product_id.max' = '100',
    'fields.order_customer_id.min' = '1',
    'fields.order_customer_id.max' = '1000'
  );

CREATE TABLE file_sink (
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar
  )
WITH
  (
    'connector' = 'filesystem',
    'path' = 'tos://doc-test/test_write_tos/',
    'format' = 'json'
  );

INSERT INTO file_sink
SELECT * FROM datagen_source;
  • 源表
CREATE TABLE file_source(
  order_id bigint, 
  order_product_id bigint,
  order_customer_id bigint,
  order_status varchar
) WITH (
    'connector' = 'filesystem',
    'path' = 'tos://doc-test/test_write_tos/',
    'format' = 'json'
);

create table print_sink (
  order_id bigint, 
  order_product_id bigint,
  order_customer_id bigint,
  order_status varchar
  )
with
  ('connector' = 'print');

insert into print_sink
SELECT * FROM file_source;
最近更新时间:2025.03.28 15:32:06
这个页面对您有帮助吗?
有用
有用
无用
无用