TLS 日志服务提供 Kafka 协议消费功能,可以将一个日志主题当作一个 Kafka Topic 来消费,每条日志对应一条 Kafka 消息。您可以使用 Flink kafka 连接器连接日志服务,通过 Flink 任务将日志服务中采集的日志数据消费到下游的大数据组件或者数据仓库。
CREATE TABLE kafka_source (
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
-- 消费 topic 需要加前缀 out-
'topic' = 'out-${TLS_TOPIC_ID}',
-- 参考 TLS 文档站通过私网域名进行访问,9093 端口
'properties.bootstrap.servers' = 'tls-cn-beijing.ivolces.com:9093',
-- 配置安全协议为 SASL_SSL,验证机制为 PLAIN
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'PLAIN',
-- 配置 JAAS。用户名密码分别填写 TLS 项目 ID 和 AK 、SK 内容
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="${TLS_PROJECT_ID}" password="${ACCESS_KEY}#${SECRET_KEY}";',
-- 指定消费者组,和消费模式以及解析格式
'properties.group.id' = 'test_topic_01',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE kafka_sink (
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
-- 结合具体地域选择相应的私网域名 9094 端口
'properties.bootstrap.servers' = 'tls-cn-beijing.ivolces.com:9094',
--关闭幂等。
'properties.enable.idempotence' = 'false',
-- 配置安全协议为 SASL_SSL,机制为 PLAIN
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'PLAIN',
-- 配置 JAAS。用户名密码分别填写项目id AK 、SK 内容
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="${TLS_PROJECT_ID}" password="${ACCESS_KEY}#${SECRET_KEY}";',
-- 注意写入的时候 topic 为 TLS 的 Topic id
'topic' = '${TLS_TOPIC_ID}',
'format' = 'json'
);
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,TLS 当前支持 Kafka Connector 上传、消费 数据。 注意 Kafka-0.10 和 Kafka-0.11 两个版本的连接器使用的 Kafka 客户端有缺陷,在某些情况下可能无法自动提交 Kafka offset 信息。 |
topic | 是 | (none) | String | 指定 TLS 日志 Topic ID。注意:
|
properties.bootstrap.servers | 是 | (none) | String | 指定 TLS 的访问地址,可以参考 TLS 服务地址。注意:
|
properties.group.id | 是 | (none) | String | 指定 TLS 消费组的 ID。 |
format | 是 | (none) | String | 用来反序列化 TLS 返回消息体(value)时使用的格式。常见的日志格式如下:
|
scan.startup.mode | 否 | group-offsets | String | 读取数据时的启动模式。 取值如下:
|
scan.startup.specific-offsets | 否 | (none) | String | 在 specific-offsets 启动模式下,指定每个分区的启动偏移量。如 |
scan.startup.timestamp-millis | 否 | (none) | Long | 在 timestamp 启动模式下,指定启动位点时间戳,单位毫秒。 |
scan.parallelism | 否 | (none) | Integer | 单独设置 Source 并发。如果不设置,则并行度为作业的默认并发数。 |
另外作为生产者参数,TLS Kafka 支持如下参数类型:
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
properties.enable.idempotence | 否 | true | Boolean | 如果是写入 TLS 服务,需要关闭幂等。设置为 false 。 注意 如果您通过 Kafka 连接器写入 TLS 日志服务,那么必须将 |
properties.compression.type | 否 | (none) | string | 用于指定消息在发送到 Kafka 之前的压缩算法。推荐使用 lz4 算法。 |
properties.batch.size | 否 | 16 | string | 单个 Partition 对应的 Batch 中支持写入的最大字节数,默认值为 16 KB。
推荐值
|
properties.linger.ms | 否 | 0 | string | 消息在 Batch 中的停留时间,即发送消息前的等待时长。默认为 0 毫秒,表示“立即发送消息”。
|
properties.max.request.size | 否 | (none) | Integer | 用于指定单个请求(一个请求中可能有多个分区)的最大大小。单位:字节。
|
properties.buffer.memory | 否 | 32M | string | 缓存消息的总可用 Memory 空间,如果 Memory 用完,则会立即发送缓存中的消息。
说明 如果 buffer.memory 较小,可能会造成 Batch 失效,从而导致 QPS 升高被下游限流等问题。 |
sink.partitioner | 否 | fixed | String | Flink 分区到 TLS 分区的映射关系。取值如下:
|
sink.parallelism | 否 | (none) | Integer | 单独设置 TLS Sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。 |
以下是 TLS 作为源表的消费方式:
CREATE TABLE tls_source (
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'out-${TLS_TOPIC_ID}',
'properties.bootstrap.servers' = 'tls-cn-beijing.ivolces.com:9093',
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="${TLS_PROJECT_ID}" password="${ACCESS_KEY}#${SECRET_KEY}";',
'properties.group.id' = 'test_topic_01',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE print_sink (
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'print'
);
insert into print_sink
select * from tls_source;
以下是 TLS 作为生产者的结果表示例代码:
CREATE TABLE datagen_source (
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '5'
);
CREATE TABLE tls_sink (
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = 'tls-cn-beijing.ivolces.com:9094',
'properties.enable.idempotence' = 'false',
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="${TLS_PROJECT_ID}" password="${ACCESS_KEY}#${SECRET_KEY}";',
'topic' = '${TLS_TOPIC_ID}',
'format' = 'json'
);
insert into tls_sink
select * from datagen_source;
POM 文件依赖,其中 Flink 版本当前支持 1.11、1.16 和 1.17 等选项。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
TLS 消费代码示例:
package com.example;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KafkaConsumerJob {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule" +
" required username=\"${TLS_PROJECT_ID}\"" +
" password=\"${ACCESS_KEY}#${SECRET_KEY}\";";
// 配置 Kafka source
KafkaSource<String> source = KafkaSource.<String>builder()
// 设置消费
.setBootstrapServers("tls-cn-beijing.ivolces.com:9093")
.setTopics("out-${TLS_TOPIC_ID}")
.setGroupId("flink-consumer-group")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.setProperty("security.protocol", "SASL_SSL")
.setProperty("sasl.mechanism", "PLAIN")
.setProperty("sasl.jaas.config", jaas)
.build();
// 添加 source 并处理数据
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 添加数据处理逻辑
stream.map(value -> {
System.out.println("Received message: " + value);
return value;
});
// 执行任务
env.execute("Kafka Consumer Job");
}
}
TLS 写入代码示例:
package com.example;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import java.time.LocalDateTime;
import java.util.Properties;
public class KafkaProducerJob {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建数据源
DataStream<String> sourceStream = env.addSource(new CustomDataSource());
// 认证信息中注意填写 project-id、AK、SK
String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule" +
" required username=\"${project-id}\"" +
" password=\"${ACCESS_KEY}#${SECRET_KEY}\";";
// 配置 Kafka sink
KafkaSink<String> sink = KafkaSink.<String>builder()
// 结合具体地域修改私网访问地址
.setBootstrapServers("tls-cn-beijing.ivolces.com:9094")
// 注意配置 topic-id
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("${topic-d}")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
// 配置认证、关闭幂等特性等配置
.setProperty("security.protocol", "SASL_SSL")
.setProperty("enable.idempotence", "false")
.setProperty("sasl.mechanism", "PLAIN")
.setProperty("sasl.jaas.config", jaas)
.build();
// 将数据写入 Kafka
sourceStream.sinkTo(sink);
// 执行任务
env.execute("Kafka Producer Job");
}
// 自定义数据源
public static class CustomDataSource implements SourceFunction<String> {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (isRunning) {
// 模拟 json 数据写入
String message = "{\"a\": 1, \"b\": 2}";
ctx.collect(message);
Thread.sleep(500); // 每秒生成一条数据
}
}
@Override
public void cancel() {
isRunning = false;
}
}
}
如果修改了作业的拓扑结构,新增了一些算子,则不能从历史状态恢复,此时可以丢状态,并从 timestamp 或者 group-offsets 模式启动:
适用于之前的作业没开过 checkpoint。需要从数据曲线-业务延迟监控里,找到当前消费延迟,反推出当前消费的时间点,从这个时间点恢复即可。
比如业务延迟是 1h,当前时刻是 12 点,则反推出已消费时间点是 11 点,为了避免丢数,则可以根据情况往前多消费一点,比如往前 30min,那么要设置的恢复的时刻就是 10:30。
参数如下:
'properties.scan.startup.mode' = 'timestamp',
'properties.scan.startup.timestamp-millis' = 'xxxx' -- 毫秒时间戳,可以从 https://tool.chinaz.com/tools/unixtime.aspx 获取
适用于之前的作业开过 checkpoint,并完成过 checkpoint。由于 checkpoint 时,kafka source 会向 kafka 实例提交 offset,因此 offset 会保存一份到 kafka 实例。
此时需要把 kafka source 里的 startup mode 改为 group-offsets,参数如下:
'properties.scan.startup.mode' = 'group-offsets'
注意,如果这个消费者组之前没有 offset 信息,第一次启动的时候会报错无法获取 partition 的 offset 信息,此时需要设置如下 kafka 参数:
'properties.auto.offset.reset' = 'earliest'
此参数意义为如果没有 group-offset 则从最老的位置进行消费
目前一般使用以下两种方式自动提交 Kafka Offsets。
注意
依赖 Flink 任务 Checkpoint 来管理 Kafka Offsets 时,如果上游数据量很大,很可能会触发上游的 LAG 告警。
enable.auto.commit
和auto.commit.interval.ms
两个参数来控制位点定时自动提交。-- 是否自动提交 Offsets。取值为 true 表示自动提交 Offsets;取值为 false,表示手动同步或异步提交。
'enable.auto.commit' = 'true',
-- 自动提交 Offsets 的时间间隔,单位为 ms。
'auto.commit.interval.ms' = '500',