You need to enable JavaScript to run this app.
导航

基于 Flume 上传日志

最近更新时间2024.03.28 10:35:47

首次发布时间2024.03.28 10:35:47

Flume 是一个分布式、高可靠、高可用的海量日志采集、聚合和传输系统,支持从各个应用程序中收集和聚合数据,并将其存储到一个数据存储系统中。本文介绍如何通过 Flume 的 Kafka Sink 将数据上传到日志服务。

背景信息

当 Flume 作为数据采集工具时,Flume的 Kafka Sink 支持将 Flume Channel 中的数据发送到 Kafka 中,而日志服务支持通过 Kafka 协议接收数据,因此 Flume 可以通过 Kafka Sink 将数据上传到日志服务的日志主题中。

前提条件

  • 已创建日志项目和日志主题。详细操作步骤请参考创建日志项目创建日志主题
  • 推荐使用 IAM 用户进行访问鉴权。使用 IAM 用户前,需确认已创建 IAM 用户且拥有通过 Kafka 协议上传日志的权限,即具备PutLogs 权限。详细信息请参考可授权的操作

Flume 配置

通过 Flume 上传数据到日志服务时,需要配置 Flume 提供的 Kafka Sink。此处仅罗列了上传数据到日志服务所需要的参数和影响数据上传性能的参数,其他参数说明请参考 Kafka Sink

说明

本文以 Flume 1.9.0 版本为例,不同 Flume 版本的配置参数可能不同,详细信息请参考Flume 官网文档

参数名称

是否必选

示例值

描述

type

org.apache.flume.sink.kafka.KafkaSink。

必须配置为 org.apache.flume.sink.kafka.KafkaSink。

kafka.bootstrap.servers

tls-cn-beijing.volces.com:9094

初始连接的集群地址,格式为服务地址:端口,其中:

  • 服务地址为日志服务的服务入口。请根据地域和网络类型选择正确的服务入口,详细信息请参见服务地址
  • 端口号固定为 9094。

说明

服务地址无需指定 https://

kafka.producer.security.protocol

SASL_SSL

为保证数据传输的安全性,此处必须设置连接协议为 SASL_SSL。

kafka.producer.sasl.mechanism

PLAIN

设置上传数据到日志服务的鉴权方式,此处需设置为 PLAIN。

kafka.producer.sasl.jaas.config

上传数据到日志服务的具体鉴权配置,包括 username 和 password。

  • 设置 username 为日志服务的日志项目 ID。
  • 设置 password 为火山引擎账号访问密钥。您可以在火山引擎控制台密钥管理页面,根据页面提示查看并复制 Access Key ID 和 Secret Access Key。此处的配置格式为 ${access-key-id}#${access-key-secret}
org.apache.kafka.common.security.plain.PlainLoginModule required username="日志服务项目ID" password="火山引擎账号密钥";

kafka.topic

f93fc61xxxxefec8a5

Kafka 主题,此处需配置为日志服务的日志主题 ID。Flume 上传数据时,将上传到日志服务的该日志主题中。

flumeBatchSize

100

每次上传的数据条数。较大的值将提高吞吐量但同时将增加延迟。默认值为100,不建议修改。

kafka.producer.acks

1

Kafka 生产者的消息发送确认机制。默认值为 1 ,表示 Leader broker 写入成功。不建议修改。

kafka.producer.batch.size

1048576

Kafka 生产者批量发送数据的大小,单位为字节。

说明

建议设置为 1048576,即 1MB。

kafka.producer.linger.ms

100

Kafka 生产者在批量发送消息前的等待时间,单位为毫秒。

说明

建议设置为 100。

kafka.producer.compression.type

lz4

Kafka 生产者发送消息的压缩方式,建议设置为 lz4。

配置示例

下述配置表示 Flume 将 /var/log/test1/example.log 日志文件中的数据上传到日志服务。

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log #替换为实际的日志文件。

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = tls-cn-beijing.ivolces.com:9094
a1.sinks.k1.channel = c1
a1.sinks.k1.kafka.topic = 5f93fcxxxxfec8a5
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.batch.size = 1048576
a1.sinks.k1.kafka.producer.linger.ms = 100
a1.sinks.k1.kafka.producer.compression.type = lz4
a1.sinks.k1.kafka.producer.security.protocol = SASL_SSL
a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
a1.sinks.k1.kafka.producer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="20fxxxx8ad7af" password="AKxxxx#T0RNME5EUTFZVxxxx";

a1.channels.c1.type = memory