You need to enable JavaScript to run this app.
导航
基础使用
最近更新时间:2025.04.01 20:13:41首次发布时间:2024.11.12 16:54:08
我的收藏
有用
有用
无用
无用

背景信息

通过 SSH 方式登录 LAS 集群的服务器,通过命令行提交 Flink 作业。
基于 YARN 模式部署的 Flink 支持 Application 模式、Session 模式以及 Per-Job 模式运维作业。
Image
图片来自 Flink 官方文档

模式

描述

优缺点

Application模式

Application 模式会为每个提交的应用程序创建一个集群,这个集群可以被视为一个仅在特定应用程序的作业之间共享的 Session 集群,并且会在所有作业完成时关闭。从整个应用程序的粒度来看,Application 模式提供了与 Per-Job 模式相同的资源隔离和负载均衡保障。作业的 main() 方法在 JobManager 上执行,这能够降低提交端的 CPU 压力,同时节省本地下载依赖所需的带宽。

  • 优点
    • 提供较好的资源隔离保证
    • 降低提交端提交作业时的压力
  • 缺点:作业启动时间以及作业资源开销会大一些

Per-Job 模式

在 Per-Job 模式下,每一个作业都会启动一个专属的 Flink 集群,各个作业之间不会相互影响,能够提供更好的资源隔离保障,作业完成后集群关闭。

  • 优点:提供较好的资源隔离保证
  • 缺点:作业启动时间以及作业资源开销会大一些

Session 模式

Session 模式会预先启动一个 Flink 集群,能在该集群中运行多个作业,不过该集群在作业运行结束后不会自动释放。作业之间的隔离性欠佳,倘若某个作业异常致使 Task Manager 退出,那么其他所有运行在该 Task Manager 上的作业都会失败。

  • 优点: 因为集群预先启动,可以更快的提交作业
  • 缺点: 资源隔离差,作业间会互相影响

使用前提

  1. 已创建包含 Flink 组件服务的 LAS 集群。详情参见:创建集群
  2. 集群的访问链接需要保证客户端和集群网络互相连通。

基础使用

Application 模式

  1. 通过 SSH 方式,使用命令行终端连接集群。
  2. 执行以下命令,提交作业。
source /etc/emr/flink/flink-env.sh
flink run-application -t yarn-application -j /usr/lib/emr/current/flink/examples/streaming/WordCount.jar

提交成功后,会返回已提交的 Flink 作业的 YARN Application ID。返回如下类似信息。
Image

  1. 执行以下命令,查看作业状态。
flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY

Image
也可以通过访问 YARN ResourceManager UI,根据 YARN Application ID 搜索并查看 Flink Web UI

  1. 执行以下命令,停止作业
flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>

Image

Session 模式

  1. 通过 SSH 方式,使用命令行终端连接集群。
  2. 执行以下命令,创建 Session 集群。
source /etc/emr/flink/flink-env.sh
/usr/lib/emr/current/flink/bin/yarn-session.sh --detached

创建成功后,会返回 Session 集群的 YARN Application ID。返回如下类似信息。
Image

  1. 执行以下命令,提交作业。
flink run -t yarn-session --detached -j /usr/lib/emr/current/flink/examples/streaming/TopSpeedWindowing.jar -Dyarn.application.id=application_XXXX_YY
  1. 提交成功后,会返回已提交的 Flink 作业的 Job ID。返回如下类似信息

Image

  1. 执行以下命令,查看作业状态。
flink list -t yarn-session -Dyarn.application.id=application_XXXX_YY

Image
也可以通过访问 YARN ResourceManager UI,根据 YARN Application ID 搜索并查看 Flink Web UI

  1. 执行以下命令,停止作业。
flink cancel -t yarn-session -Dyarn.application.id=application_XXXX_YY <jobId>

Image

  1. 执行以下命令,停止集群
echo "stop" | /usr/lib/emr/current/flink/bin/yarn-session.sh -id application_XXXXX_XXX

Image

Per-job 模式

  1. 通过 SSH 方式,使用命令行终端连接集群。
  2. 执行以下命令,提交作业。
source /etc/emr/flink/flink-env.sh
flink run -t yarn-per-job --detached -j /usr/lib/emr/current/flink/examples/streaming/TopSpeedWindowing.jar

提交成功后,会返回已提交的 Flink 作业的 YARN Application ID 以及 Job ID。返回如下类似信息。
Image

  1. 执行以下命令,查看作业状态。
flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY

Image
也可以通过访问 YARN ResourceManager UI,根据 YARN Application ID 搜索并查看 Flink Web UI。

  1. 执行以下命令,停止作业。
flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>

Image

DataStream 开发指南

依赖

Flink 核心依赖无需打入最终的 jar 包,maven-shade-plugin 插件用于构建 fat jar。下面提供了 Demo,如需获取更多信息,可参考:Flink 打包 Pom 依赖示例

<properties>
  <maven.compiler.source>8</maven.compiler.source>
  <maven.compiler.target>8</maven.compiler.target>
  <flink.version>1.16.1</flink.version>
</properties>

<dependencies>
  <!--flink 核心依赖-->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>

  <!-- kafka connector 依赖-->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>

<build>
  <finalName>kafka-2-kafka-job</finalName>
  <plugins>
    <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
    <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.0.0</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

Java 实现

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class Kafka2KafkaJob {

  private static final String BROKERS = "ip:port,ip1:port1,ip2:port2";

  public static void main(final String[] args) {
    new Kafka2KafkaJob().run();
  }

  private void run() {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setParallelism(1);
    final KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(BROKERS)
        .setTopics("emr-topic-test")
        .setGroupId("emr-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
    final DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(),
        "Kafka Source");

    /*
     * print sink.
     */
    kafkaSource.addSink(new PrintSinkFunction<>());

    /*
     * kafka sink.
     */
    final KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers(BROKERS)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("emr-topic-test-1")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
        )
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
    kafkaSource.sinkTo(kafkaSink);

    try {
      env.execute("kafka2KafkaJob");
    } catch (final Exception e) {
      e.printStackTrace();
    }
  }
}

作业 UI 图

Image

注意事项

  1. 代码中不可使用匿名类,可采用 lambda 或显式声明的方式。
  2. 提交 Kafka 作业时需指定父类优先加载 class,以规避依赖冲突,参数为-D classloader.resolve-order=parent-first