Log in to a server in the LAS cluster over SSH and submit Flink jobs from the command line.
Flink deployed on YARN supports running jobs in Application mode, Session mode, and Per-Job mode.
(Figure: deployment-mode diagram, from the official Flink documentation.)
Mode | Description |
---|---|
Application mode | Application mode creates a dedicated cluster for every submitted application. That cluster can be viewed as a Session cluster shared only among the jobs of this particular application, and it is shut down when all of them finish. At the granularity of a whole application, Application mode therefore provides the same resource-isolation and load-balancing guarantees as Per-Job mode. The application's main() method also runs on the cluster rather than on the client. |
Per-Job mode | In Per-Job mode, each job starts its own dedicated Flink cluster. Jobs do not interfere with one another, which gives stronger resource-isolation guarantees; the cluster is shut down once the job completes. |
Session mode | Session mode starts a Flink cluster in advance, and multiple jobs can run in that cluster; the cluster is not released automatically when jobs finish. Isolation between jobs is weak: if one job fails in a way that brings down a Task Manager, every other job running on that Task Manager fails as well. |
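The three modes map onto the deployment targets passed to the Flink CLI through its `-t` option, which the commands in the rest of this section use:

```shell
# -t yarn-application : Application mode (submitted with `flink run-application`)
# -t yarn-per-job     : Per-Job mode     (submitted with `flink run`)
# -t yarn-session     : Session mode     (submitted with `flink run`, after
#                       yarn-session.sh has started the Session cluster)
```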
Submit a job in Application mode:

```shell
source /etc/emr/flink/flink-env.sh
flink run-application -t yarn-application -j /usr/lib/emr/current/flink/examples/streaming/WordCount.jar
```
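If needed, cluster resources can be tuned at submission time through generic `-D` options; the keys below are standard Flink configuration options, and the values are purely illustrative:

```shell
# Illustrative values; adjust them to your workload.
flink run-application -t yarn-application \
  -Djobmanager.memory.process.size=2048m \
  -Dtaskmanager.memory.process.size=4096m \
  -Dtaskmanager.numberOfTaskSlots=2 \
  -Dparallelism.default=2 \
  -j /usr/lib/emr/current/flink/examples/streaming/WordCount.jar
```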
After a successful submission, the client returns the YARN Application ID of the submitted Flink job. To list the jobs running in that application:

```shell
flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
```
You can also open the YARN ResourceManager UI and search by the YARN Application ID to reach the Flink Web UI.
To cancel a job, pass its Job ID:

```shell
flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
```
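As a sketch of a related option: if you want to take a savepoint and shut the job down gracefully instead of cancelling it, the standard `flink stop` command accepts the same target options (the savepoint directory below is a placeholder):

```shell
# <jobId> and the savepoint directory are placeholders.
flink stop -t yarn-application \
  -Dyarn.application.id=application_XXXX_YY \
  --savepointPath hdfs:///flink/savepoints \
  <jobId>
```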
Start a Session cluster:

```shell
source /etc/emr/flink/flink-env.sh
/usr/lib/emr/current/flink/bin/yarn-session.sh --detached
```
After the cluster is created, the client returns the YARN Application ID of the Session cluster.
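If you need to recover that ID later, it can be looked up with the standard YARN CLI, for example:

```shell
# List running YARN applications; the Flink session appears among them.
yarn application -list | grep -i flink
```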
Submit a job to the Session cluster, addressing it by its YARN Application ID:

```shell
flink run -t yarn-session --detached -j /usr/lib/emr/current/flink/examples/streaming/TopSpeedWindowing.jar -Dyarn.application.id=application_XXXX_YY
```
List the jobs running in the session:

```shell
flink list -t yarn-session -Dyarn.application.id=application_XXXX_YY
```
You can also open the YARN ResourceManager UI and search by the YARN Application ID to reach the Flink Web UI.
Cancel a job:

```shell
flink cancel -t yarn-session -Dyarn.application.id=application_XXXX_YY <jobId>
```
echo "stop" | /usr/lib/emr/current/flink/bin/yarn-session.sh -id application_XXXXX_XXX
Submit a job in Per-Job mode:

```shell
source /etc/emr/flink/flink-env.sh
flink run -t yarn-per-job --detached -j /usr/lib/emr/current/flink/examples/streaming/TopSpeedWindowing.jar
```
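As a small sketch, the YARN application can also be given a recognizable name at submission time via the standard `yarn.application.name` option (the name below is illustrative):

```shell
flink run -t yarn-per-job --detached \
  -Dyarn.application.name=top-speed-windowing \
  -j /usr/lib/emr/current/flink/examples/streaming/TopSpeedWindowing.jar
```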
After a successful submission, the client returns both the YARN Application ID and the Job ID of the submitted Flink job. To list the jobs:

```shell
flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
```
You can also open the YARN ResourceManager UI and search by the YARN Application ID to reach the Flink Web UI.
Cancel the job:

```shell
flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
```
The Flink core dependencies must not be bundled into the final jar (they are marked as provided below); the maven-shade-plugin is used to build the fat jar. A demo pom is shown below; for more information, see: Flink packaging pom dependency example.
```xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.16.1</flink.version>
</properties>
<dependencies>
    <!-- Flink core dependencies, provided by the cluster at runtime. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- Kafka connector dependency, bundled into the fat jar. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
<build>
    <finalName>kafka-2-kafka-job</finalName>
    <plugins>
        <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
        <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
The following sample job consumes from one Kafka topic and writes each record both to stdout and to another Kafka topic:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class Kafka2KafkaJob {

    // Replace with the addresses of your Kafka brokers.
    private static final String BROKERS = "ip:port,ip1:port1,ip2:port2";

    public static void main(final String[] args) {
        new Kafka2KafkaJob().run();
    }

    private void run() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Source: consume strings from the input topic, starting from the earliest offset.
        final KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(BROKERS)
                .setTopics("emr-topic-test")
                .setGroupId("emr-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        final DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(),
                "Kafka Source");

        // Print sink: writes every record to stdout, useful for debugging.
        kafkaSource.addSink(new PrintSinkFunction<>());

        // Kafka sink: writes records to the output topic with at-least-once delivery.
        final KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers(BROKERS)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("emr-topic-test-1")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
        kafkaSource.sinkTo(kafkaSink);

        try {
            env.execute("kafka2KafkaJob");
        } catch (final Exception e) {
            e.printStackTrace();
        }
    }
}
```
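To try the demo, build the fat jar and pass it to the Flink CLI; the jar path follows from the finalName configured in the pom above, and Application mode is used here as an example:

```shell
mvn clean package
source /etc/emr/flink/flink-env.sh
flink run-application -t yarn-application -j target/kafka-2-kafka-job.jar
```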
If the job runs into class-loading conflicts between user code and the classes shipped with Flink, the class-loading order can be switched from the default child-first to parent-first by adding the following option at submission time:

```shell
-Dclassloader.resolve-order=parent-first
```
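For example (a sketch; the jar path refers to the demo jar built above):

```shell
flink run-application -t yarn-application \
  -Dclassloader.resolve-order=parent-first \
  -j target/kafka-2-kafka-job.jar
```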