You need to enable JavaScript to run this app.
湖仓一体分析服务 LAS 私有化

湖仓一体分析服务 LAS 私有化

复制全文
Flink
基础使用
复制全文
基础使用

背景信息

通过 SSH 方式登录 LAS 集群的服务器,通过命令行提交 Flink 作业。
基于 YARN 模式部署的 Flink 支持 Application 模式、Session 模式以及 Per-Job 模式运维作业。
Image
图片来自 Flink 官方文档

模式

描述

优缺点

Application模式

Application 模式会为每个提交的应用程序创建一个集群,这个集群可以被视为一个仅在特定应用程序的作业之间共享的 Session 集群,并且会在所有作业完成时关闭。从整个应用程序的粒度来看,Application 模式提供了与 Per-Job 模式相同的资源隔离和负载均衡保障。作业的 main() 方法在 JobManager 上执行,这能够降低提交端的 CPU 压力,同时节省本地下载依赖所需的带宽。

  • 优点
    • 提供较好的资源隔离保证
    • 降低提交端提交作业时的压力
  • 缺点:作业启动时间以及作业资源开销会大一些

Per-Job 模式

在 Per-Job 模式下,每一个作业都会启动一个专属的 Flink 集群,各个作业之间不会相互影响,能够提供更好的资源隔离保障,作业完成后集群关闭。

  • 优点:提供较好的资源隔离保证
  • 缺点:作业启动时间以及作业资源开销会大一些

Session 模式

Session 模式会预先启动一个 Flink 集群,能在该集群中运行多个作业,不过该集群在作业运行结束后不会自动释放。作业之间的隔离性欠佳,倘若某个作业异常致使 Task Manager 退出,那么其他所有运行在该 Task Manager 上的作业都会失败。

  • 优点: 因为集群预先启动,可以更快的提交作业
  • 缺点: 资源隔离差,作业间会互相影响

使用前提

  1. 已创建包含 Flink 组件服务的 LAS 集群。详情参见:创建集群
  2. 集群的访问链接需要保证客户端和集群网络互相连通。

基础使用

Application 模式

  1. 通过 SSH 方式,使用命令行终端连接集群。
  2. 执行以下命令,提交作业。
source /etc/emr/flink/flink-env.sh
flink run-application -t yarn-application -j /usr/lib/emr/current/flink/examples/streaming/WordCount.jar

提交成功后,会返回已提交的 Flink 作业的 YARN Application ID。返回如下类似信息。
Image

  1. 执行以下命令,查看作业状态。
flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY

Image
也可以通过访问 YARN ResourceManager UI,根据 YARN Application ID 搜索并查看 Flink Web UI

  1. 执行以下命令,停止作业
flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>

Image

Session 模式

  1. 通过 SSH 方式,使用命令行终端连接集群。
  2. 执行以下命令,创建 Session 集群。
source /etc/emr/flink/flink-env.sh
/usr/lib/emr/current/flink/bin/yarn-session.sh --detached

创建成功后,会返回 Session 集群的 YARN Application ID。返回如下类似信息。
Image

  1. 执行以下命令,提交作业。
flink run -t yarn-session --detached -j /usr/lib/emr/current/flink/examples/streaming/TopSpeedWindowing.jar -Dyarn.application.id=application_XXXX_YY
  1. 提交成功后,会返回已提交的 Flink 作业的 Job ID。返回如下类似信息

Image

  1. 执行以下命令,查看作业状态。
flink list -t yarn-session -Dyarn.application.id=application_XXXX_YY

Image
也可以通过访问 YARN ResourceManager UI,根据 YARN Application ID 搜索并查看 Flink Web UI

  1. 执行以下命令,停止作业。
flink cancel -t yarn-session -Dyarn.application.id=application_XXXX_YY <jobId>

Image

  1. 执行以下命令,停止集群
echo "stop" | /usr/lib/emr/current/flink/bin/yarn-session.sh -id application_XXXXX_XXX

Image

Per-job 模式

  1. 通过 SSH 方式,使用命令行终端连接集群。
  2. 执行以下命令,提交作业。
source /etc/emr/flink/flink-env.sh
flink run -t yarn-per-job --detached -j /usr/lib/emr/current/flink/examples/streaming/TopSpeedWindowing.jar

提交成功后,会返回已提交的 Flink 作业的 YARN Application ID 以及 Job ID。返回如下类似信息。
Image

  1. 执行以下命令,查看作业状态。
flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY

Image
也可以通过访问 YARN ResourceManager UI,根据 YARN Application ID 搜索并查看 Flink Web UI。

  1. 执行以下命令,停止作业。
flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>

Image

DataStream 开发指南

依赖

Flink 核心依赖无需打入最终的 jar 包,maven-shade-plugin 插件用于构建 fat jar。下面提供了 Demo,如需获取更多信息,可参考:Flink 打包 Pom 依赖示例

<properties>
  <maven.compiler.source>8</maven.compiler.source>
  <maven.compiler.target>8</maven.compiler.target>
  <flink.version>1.16.1</flink.version>
</properties>

<dependencies>
  <!--flink 核心依赖-->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>

  <!-- kafka connector 依赖-->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>

<build>
  <finalName>kafka-2-kafka-job</finalName>
  <plugins>
    <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
    <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.0.0</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

Java 实现

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class Kafka2KafkaJob {

  private static final String BROKERS = "ip:port,ip1:port1,ip2:port2";

  public static void main(final String[] args) {
    new Kafka2KafkaJob().run();
  }

  private void run() {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setParallelism(1);
    final KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(BROKERS)
        .setTopics("emr-topic-test")
        .setGroupId("emr-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
    final DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(),
        "Kafka Source");

    /*
     * print sink.
     */
    kafkaSource.addSink(new PrintSinkFunction<>());

    /*
     * kafka sink.
     */
    final KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers(BROKERS)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("emr-topic-test-1")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
        )
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
    kafkaSource.sinkTo(kafkaSink);

    try {
      env.execute("kafka2KafkaJob");
    } catch (final Exception e) {
      e.printStackTrace();
    }
  }
}

作业 UI 图

Image

注意事项

  1. 代码中不可使用匿名类,可采用 lambda 或显式声明的方式。
  2. 提交 Kafka 作业时需指定父类优先加载 class,以规避依赖冲突,参数为-D classloader.resolve-order=parent-first
最近更新时间:2025.04.01 20:13:41
这个页面对您有帮助吗?
有用
有用
无用
无用