You need to enable JavaScript to run this app.
导航

读取 Kafka 数据写入 TOS 再映射到 LAS 外表

最近更新时间2024.01.29 14:12:20

首次发布时间2023.12.26 14:03:37

Flink 是一个兼容 Apache Flink 的全托管流式计算平台,支持对海量实时数据的高效处理。LAS 是湖仓一体架构的 Serverless 数据平台,提供海量数据存储、管理、计算和交互分析功能。本文通过一个示例场景模拟 Flink 与 LAS 的联动,从而体验跨源查询分析、元数据自动发现等能力。

场景介绍

本文模拟场景主要实现:读取消息队列 Kafka 数据写入对象存储 TOS,并映射为湖仓一体分析服务 LAS 外表进行数据分析。
在 Flink 控制台通过开发 Flink SQL 任务,实现 Datagen -> Kafka -> TOS 的数据流转链路,然后在 LAS 控制台创建外表,从 TOS 数据源读取文件并映射到新建的外表中。

注意事项

  • 通过 Flink 任务往 TOS 写入文件时,使用 filesystem 连接器。为确保数据的一致性和容错性,需要在 Flink 参数配置中开启 Checkpoint
    如果不启用 Checkpoint,TOS Bucket 中只会写入临时文件,此时将无法映射数据到外表。
  • LAS 外表数据更新,但是元数据不会自动更新,您可以配置元数据发现任务来定时更新元数据。但是元数据发现任务对 TOS 路径格式有强制要求,需要路径格式为bucket/库/表/文件或者bucket/库/表/分区/文件
    在本文中将 TOS 路径格式定义为:tos://doc_bucket/las_db/las_table_1
  • 为保证网络访问安全,本文所使用的云产品服务均使用内网访问方式,因此要求 Flink 资源池、Kafka 实例、TOS Bucket、LAS 外表均处于相同地域、相同 VPC。

前提条件

  1. 登录流式计算 Flink 版控制台
  2. 在顶部菜单栏选择目标地域,然后从项目管理页面进入项目。
  3. 创建 Flink SQL 任务。
    1. 在项目左侧导航栏选择任务开发 > Jupyter lab,然后单击加号按钮创建任务,也可以单击 Launcher 页签下的 Flink Stream SQL 区块。
      图片

    2. 创建任务对话框,设置任务名称、类型、文件夹和引擎版本,然后单击确定
      图片

      配置

      说明

      任务名称

      自定义设置任务的名称,如“datagen-kafka-tos”。
      名称的字符长度限制在 1~48,支持数字、大小写英文字母、下划线(_)、短横线(-)和英文句号(.),且首尾只能是数字或字母。

      任务类型

      选择 Flink 任务 > Flink Stream > SQL

      所属文件夹

      系统提供文件夹管理功能,用于分类管理任务。
      您可以直接选择系统默认存在的数据开发文件夹,也可以使用自定义创建的文件夹。

      引擎版本

      按需选择引擎版本,本文选择引擎版本为 Flink 1.16-volcano 版本。

      任务描述

      输入任务的描述语句,一般描述任务实现的功能。

    3. 在任务编辑区编写 SQL 任务的业务逻辑代码。
      示例代码含义为:将 Datagen 连接器实时生成的随机数写入 Kafka Topic 中;然后读取 Kafka Topic 数据并输出到 TOS Bucket。

      注意

      • 往 TOS 写入文件时,使用 filesystem 连接器。如果需要尽快在 TOS Bucket 中看到写入的文件和保证数据一致性,需要增加部分配置。您可以设置连接器的 sink.rolling-policy.file-sizesink.rolling-policy.rollover-interval 参数,以及在 Flink 参数配置中开启 Checkpoint
        如需详细了解 Filesystem 连接器的滚动策略,请参见开源文档Filesystem-Rolling Policy
      • 一个任务中,如果存在一个表同时作为 source 和 sink,建议您直接验证 SQL 正确性,确保无误后可直接在线上进行测试。如果执行调试操作,可能会出现类似Table:xxx should not be both source and sink.的报错信息。
      create table orders (
          order_id bigint,
          order_product_id bigint,
          order_customer_id bigint,
          order_status varchar,
          order_update_time as localtimestamp
      ) WITH (
       'connector' = 'datagen',
       'rows-per-second'='1',
       'fields.order_status.length' = '3',
       'fields.order_id.min' = '1',
       'fields.order_id.max' = '10000',
       'fields.order_product_id.min' = '1',
       'fields.order_product_id.max' = '100',
       'fields.order_customer_id.min' = '1',
       'fields.order_customer_id.max' = '1000'
      );
      
      create table kafka_table (
          order_id bigint,
          order_product_id bigint,
          order_customer_id bigint,
          order_status varchar,
          order_update_time timestamp
      ) WITH (
          'connector' = 'kafka', 
          --安全协议设置为SASL_PLAINTEXT。
          'properties.security.protocol' = 'SASL_PLAINTEXT',  
          --SASL 机制为  PLAIN。
          'properties.sasl.mechanism' = 'PLAIN', 
          -- 配置JAAS。
          'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="doc-user" password="qaP***6";',
          --Kafka实例的SASL_PLAINTEXT接入点。
          'properties.bootstrap.servers' = 'kafka-***hd8md.kafka.ivolces.com:9093',
          --Group和Topic。
          'topic' = 'topic-b', 
          'properties.group.id' = 'group-b', 
          --读取数据的启动模式,“earliest-offset”表示从最早分区开始读取。
          'scan.startup.mode' = 'earliest-offset',
          --定期扫描并发现新的Topic和Partition的时间间隔。
          'scan.topic-partition-discovery.interval' = '120s', 
          'format' = 'json',
          --关闭幂等性。
          'properties.enable.idempotence' = 'false'
      );
      
      
      insert into kafka_table 
      select * from orders;
      
      
      CREATE TABLE tos_sink (
          order_id bigint,
          order_product_id bigint,
          order_customer_id bigint,
          order_status varchar,
          order_update_time timestamp,
          dt STRING,
          `hour` STRING
        ) PARTITIONED BY (dt, `hour`)
      WITH (
        'connector' = 'filesystem',   --访问 TOS 资源时使用 filesystem 连接器。
        'path' = 'tos://wuch-doc-tos/las_db/las_table_1',  --tos 路径,由 Bucket 名称和文件夹名称组成,文件夹名称为 LAS DB 和 Table 名称。
        'sink.rolling-policy.file-size' = '1M',  --文件内存最大限制,达到该值关闭文件并新打开一个文件写入。
        'sink.rolling-policy.rollover-interval' = '5 min',  --文件持续写入时间,达到该值关闭文件并打开一个新文件写入。
        'format' = 'parquet'
      );
      
      
      insert into tos_sink
      select   
        order_id,
        order_product_id,
        order_customer_id,
        order_status,
        order_update_time,
        DATE_FORMAT (order_update_time, 'yyyy-MM-dd') as dt,
        DATE_FORMAT (order_update_time, 'HH') as `hour` 
        from kafka_table;
      
    4. 在代码编辑区上方,单击验证按钮。
      系统会自动校验您的 SQL 语句正确性,如果报错,请根据提示自主完成 SQL 语句修改。检验通过后,系统提示success
      图片

  4. 启用 Checkpoint。
    在代码编辑区上方,单击参数配置,然后开启 Checkpoint。
    图片
  5. 上线任务。
    1. 设置执行方式引擎版本,然后单击上线
      本文场景中执行方式设置为 STREAMING,引擎版本设置为 Flink 1.16-volcano
      图片

    2. 任务上线设置对话框,选择运行资源池、设置任务优先级和调度策略,然后单击确定
      图片

      配置

      说明

      运行资源池

      从下拉列表中选择任务运行的 Flink 资源池。

      任务优先级

      系统默认预置的优先级为 L3,您可以按需设置任务优先级,数字越小优先级越高。
      任务优先级决定了任务内部的调度顺序,优先级高的任务先被调度,即 L3 先于 L4 被调度。

      调度策略

      根据需求配置任务调度策略:

      • GANG:保证任务的所有实例被一起调度,即当剩余资源满足任务正常运行所需资源时才进行分配;不满足所需资源则不分配。
        该策略不会出现分配资源后,任务却不能启动的现象,解决了资源死锁问题。
      • DRF:从多维资源考虑,更为合理地将资源公平分配给资源池内的各个任务,从而提升利用率。
        例如:剩余 10 核 40 GB 的资源,A 任务需要10 核 20 GB 资源;B 任务需要 2 核 8 GB 的资源。如果分配给 A,剩余 0 核 20 GB 资源无法被利用;DRF 策略会选择分配给 B,剩下 8 核 32 GB 可以继续给后来任务使用。

      调度时长

      设置为 GANG 调度策略时,需要设置调度时长。
      如果超过调度时长,任务就会调度失败。如果设置为 0,则会一直重试。

  6. 启动任务。
    1. 在项目左侧导航栏选择任务运维 > 任务管理,然后单击目标任务后方的启动按钮。
      图片
    2. 启动任务对话框,选择全新启动,然后单击确定
      图片
      任务启动需要一定时长,请耐心等待,启动成功后状态为运行中。如果您想了解其他启动方式和启动时的参数配置,请参见启动任务

在 Flink 控制台通过开发 Flink SQL 任务,实现 Datagen -> Kafka -> TOS 的数据流转链路。您可以通过以下三种方式验证任务结果:

  • 方式一:查看 Flink UI
  • 方式二:查询 Kafka Topic 消息
  • 方式三:查看 TOS Bucket 文件

Flink SQL 任务正常运行后,您可以进入 Flink UI 页面,查看任务运行情况。

  1. 在 Flink 控制台左侧导航栏选择任务运维 >任务管理。,然后单击单击目标任务后方的 Flink UI。
    图片
  2. 浏览器将会自动打开 Flink UI 页面,可查看任务详情。

方式二:查询 Kafka Topic 消息

Flink SQL 任务正常运行后,您可以在 Kafka 控制台查看目标 Topic 中的数据。

  1. 登录消息队列 Kafka 控制台
  2. 实例列表页面单击实例名称,进入 Kafka 实例详情页面。
  3. 消息查询页签下,选择按位点查询按时间查询,查询 Topic 中的数据。
    如需了解 Kafka Topic 消息查询的详细信息,请参见消息查询
    图片

方式三:查看 TOS Bucket 文件

Flink SQL 任务正常运行后,您可以在 TOS Bucket 目标路径下查看文件。

  1. 登录对象存储控制台
  2. 桶列表页面,单击目标存储桶,进入存储通详情页面。
  3. 在存储桶的文件列表页面,进入目标文件夹,查看写入的文件详情。
    图片

基于 TOS 创建 LAS 外部表

当 Flink SQL 任务正常运行后,您可以在 LAS 控制台创建 TOS 外表,然后便可以从 TOS 读取文件数据并映射到 LAS 外表。

  1. 登录湖仓一体分析服务 LAS 控制台

  2. 创建 LAS 数据库。

    1. 在左侧导航栏选择数据管理,然后在 Schema 库管理页签下单击创建 Schema
      图片
    2. 创建 Schema 对话框,设置库名、环境等信息,然后单击确定
      图片
  3. 创建 LAS 外表。

    1. 表管理页签下,单击创建表
      图片
    2. 选择创建 TOS 外表的方式。
      本文选择 DDL 建表方式,需要设置库表名称、表格字段信息、分区信息、文件格式、TOS 数据源信息。
      如果选择可视化建表方式,请参见创建 TOS 外部表
      图片
      CREATE EXTERNAL TABLE IF NOT EXISTS `las_db`.`las_table_1` (
          order_id bigint,
          order_product_id bigint,
          order_customer_id bigint,
          order_status string,
          order_update_time timestamp
      ) 
      PARTITIONED BY (dt STRING, hour STRING)
      STORED AS PARQUET
      location 'tos://wuch-doc-tos/las_db/las_table_1'
      
  4. 刷新外表分区。
    由于创建的 TOS 外表是分区表,需要手动刷新分区以加载分区信息。请先选择 Schema,然后单击表后方的刷新分区
    刷新分区需要一定时长,请耐心等待。
    图片

  5. 预览外表数据。
    分区刷新成功后,您可以在表格的数据预览页签下查看数据。LAS 外部有数据后,您便可以按需进行查询分析。
    图片

  6. (可选)配置元数据发现任务。

    1. 在左侧导航栏选择生态连接,然后在元数据发现页签下单击创建元数据发现
    2. 设置任务名称、TOS 数据源、LAS 元数据、调度频率等信息,然后单击提交
      如需详细了解创建元数据发现任务,请参见创建元数据发现
      图片
    3. 查看元数据发现任务执行情况。
      元数据发现任务创建后,将根据调度频率配置自动进行元数据更新。
      图片