Log in to a server in the LAS cluster over SSH and submit Flink jobs from the command line. A Flink deployment on YARN supports operating jobs in Application mode, Session mode, and Per-Job mode.
(Figure: Flink deployment modes on YARN; image from the official Flink documentation.)
| Mode | Description |
|---|---|
| Application mode | Application mode creates a cluster for each submitted application. This cluster can be viewed as a Session cluster shared only among the jobs of that one application, and it shuts down once all of its jobs have finished. At the granularity of a whole application, Application mode therefore provides the same resource-isolation and load-balancing guarantees as Per-Job mode. The application's main() method runs on the cluster side rather than on the client. |
| Per-Job mode | In Per-Job mode, every job starts its own dedicated Flink cluster, so jobs do not affect one another and resource isolation is stronger; the cluster shuts down when the job completes. |
| Session mode | Session mode starts a Flink cluster up front and runs multiple jobs in it; the cluster is not released automatically when jobs finish. Isolation between jobs is weak: if one job fails in a way that makes a Task Manager exit, every other job running on that Task Manager fails as well. |
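The CLI commands on this page select the deployment mode through the `-t` target option. As a small sketch, the mapping can be expressed as a helper function (the function name is illustrative; the target names are the ones used in the commands below):

```shell
# Map a deployment mode to the -t target understood by the Flink CLI.
# (Illustrative helper; target names come from the commands on this page.)
target_for_mode() {
  case "$1" in
    application) echo "yarn-application" ;;
    session)     echo "yarn-session" ;;
    per-job)     echo "yarn-per-job" ;;
    *)           echo "unknown mode: $1" >&2; return 1 ;;
  esac
}

target_for_mode application   # -> yarn-application
```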
Run the following commands to submit the example job in Application mode:

```shell
source /etc/emr/flink/flink-env.sh
flink run-application -t yarn-application -j /usr/lib/emr/current/flink/examples/streaming/WordCount.jar
```
After a successful submission, the YARN Application ID of the submitted Flink job is returned in the client output.
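For scripting, the Application ID can be captured from the client output instead of copied by hand. A minimal sketch (the sample log line and its format are assumptions, not real client output):

```shell
# Pull the first YARN application ID (pattern application_<timestamp>_<seq>)
# out of Flink client output.
extract_app_id() {
  grep -oE 'application_[0-9]+_[0-9]+' | head -n 1
}

# Parse a captured submission log rather than a live run
# (the sample line is illustrative).
sample_log='... Submitted application application_1700000000000_0042'
APP_ID=$(printf '%s\n' "$sample_log" | extract_app_id)
echo "$APP_ID"   # -> application_1700000000000_0042
```

The captured `$APP_ID` can then stand in for the `application_XXXX_YY` placeholder in the `flink list` and `flink cancel` commands.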
To list the jobs running in that application:

```shell
flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
```
You can also open the YARN ResourceManager UI, search for the YARN Application ID, and jump to the Flink Web UI from there.
To cancel a job, pass its job ID:

```shell
flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
```
Run the following commands to start a detached Flink Session cluster on YARN:

```shell
source /etc/emr/flink/flink-env.sh
/usr/lib/emr/current/flink/bin/yarn-session.sh --detached
```
After the Session cluster is created successfully, its YARN Application ID is returned in the client output.
Submit a job to the Session cluster by its Application ID:

```shell
flink run -t yarn-session --detached -j /usr/lib/emr/current/flink/examples/streaming/TopSpeedWindowing.jar -Dyarn.application.id=application_XXXX_YY
```
To list the jobs running in the Session cluster:

```shell
flink list -t yarn-session -Dyarn.application.id=application_XXXX_YY
```
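`flink list` prints one line per job, including the 32-character hexadecimal job ID that `flink cancel` expects. Extracting it can be sketched as follows (the sample line mimics the client's usual format and is an assumption):

```shell
# Extract 32-hex-character Flink job IDs from `flink list` output.
extract_job_ids() {
  grep -oE '[0-9a-f]{32}'
}

# Illustrative sample line, not real output.
sample='04.07.2024 10:00:00 : ea1b2c3d4e5f60718293a4b5c6d7e8f9 : CarTopSpeedWindowingExample (RUNNING)'
printf '%s\n' "$sample" | extract_job_ids   # -> ea1b2c3d4e5f60718293a4b5c6d7e8f9
```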
You can also open the YARN ResourceManager UI, search for the YARN Application ID, and jump to the Flink Web UI from there.
To cancel a job in the session, pass its job ID:

```shell
flink cancel -t yarn-session -Dyarn.application.id=application_XXXX_YY <jobId>
```
To stop the Session cluster itself:

```shell
echo "stop" | /usr/lib/emr/current/flink/bin/yarn-session.sh -id application_XXXXX_XXX
```
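When stopping sessions from scripts, it is easy to pipe `stop` into the wrong (or an empty) application ID. A small guard around the command above can be sketched as (the helper name is hypothetical):

```shell
# Hypothetical wrapper around the yarn-session.sh stop pipeline above:
# refuse to run without an explicit application ID.
stop_session() {
  app_id="$1"
  if [ -z "$app_id" ]; then
    echo "usage: stop_session <yarn-application-id>" >&2
    return 1
  fi
  echo "stop" | /usr/lib/emr/current/flink/bin/yarn-session.sh -id "$app_id"
}
```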
Run the following commands to submit the example job in Per-Job mode:

```shell
source /etc/emr/flink/flink-env.sh
flink run -t yarn-per-job --detached -j /usr/lib/emr/current/flink/examples/streaming/TopSpeedWindowing.jar
```
After a successful submission, the YARN Application ID and the Job ID of the submitted Flink job are returned in the client output.
To list the jobs in that Per-Job cluster:

```shell
flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
```
You can also open the YARN ResourceManager UI, search for the YARN Application ID, and jump to the Flink Web UI from there.
To cancel the job, pass its job ID:

```shell
flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
```
Flink's core dependencies must not be bundled into the final JAR (the cluster provides them at runtime), while the maven-shade-plugin is used to build the fat JAR. A demo follows; for more information, see: Flink packaging POM dependency example.
```xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.16.1</flink.version>
</properties>

<dependencies>
    <!-- Flink core dependencies (provided by the cluster at runtime) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Kafka connector dependency (bundled into the fat JAR) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

<build>
    <finalName>kafka-2-kafka-job</finalName>
    <plugins>
        <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
        <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class Kafka2KafkaJob {

    private static final String BROKERS = "ip:port,ip1:port1,ip2:port2";

    public static void main(final String[] args) {
        new Kafka2KafkaJob().run();
    }

    private void run() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setParallelism(1);

        // Kafka source: read strings from the input topic.
        final KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(BROKERS)
                .setTopics("emr-topic-test")
                .setGroupId("emr-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        final DataStreamSource<String> kafkaSource =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Print sink: echo records to stdout for debugging.
        kafkaSource.addSink(new PrintSinkFunction<>());

        // Kafka sink: write the same records to the output topic.
        final KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers(BROKERS)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("emr-topic-test-1")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
        kafkaSource.sinkTo(kafkaSink);

        try {
            env.execute("kafka2KafkaJob");
        } catch (final Exception e) {
            e.printStackTrace();
        }
    }
}
```
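Assuming the demo is built with `mvn package` (producing `target/kafka-2-kafka-job.jar` per `<finalName>` above), the submission command can be composed like this; the `-c` entry-point flag and the jar path are assumptions about your project layout:

```shell
# Compose (but do not run) the Application-mode submission for the demo.
JAR="target/kafka-2-kafka-job.jar"    # from <finalName> in the pom above
MAIN_CLASS="Kafka2KafkaJob"           # the demo's entry point
CMD="flink run-application -t yarn-application -c $MAIN_CLASS -j $JAR"
echo "$CMD"
```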
If the job's own dependencies conflict with the classes shipped with Flink, you can switch the class-loading order to parent-first when submitting the job:

```shell
-Dclassloader.resolve-order=parent-first
```