You need to enable JavaScript to run this app.
导航

使用 Kafka 协议上传日志

最近更新时间2024.04.09 21:17:09

首次发布时间2022.10.14 11:41:16

日志服务支持通过 Kafka 协议上传日志数据到服务端,即可以使用 Kafka Producer SDK 来采集日志数据,并通过 Kafka 协议上传到日志服务。本文介绍通过 Kafka 协议将日志上传到日志服务的操作步骤。

背景信息

Kafka 作为高吞吐量的消息中间件,在多种自建场景的日志采集方案中被用于消息管道。例如在日志源服务器中的开源采集工具采集日志,或通过 Producer 直接写入日志数据,再通过消费管道供下游应用进行消费。日志服务支持通过 Kafka 协议上传和消费日志数据,基于 Kafka 数据管道提供完整的数据上下行服务。
使用 Kafka 协议上传日志功能,无需手动开启功能,无需在数据源侧安装数据采集工具,基于简单的配置即可实现 Kafka Producer 采集并上传日志信息到日志服务。日志服务提供基于 Java 和 Go 语言的示例项目供您参考,详细信息请参考示例
通过 Kafka 协议采集日志时,对于合法的 JSON 格式日志,日志服务会正常解析为 Key-Value 对;对于不合法的 JSON 格式,部分字段可能出现会解析错乱的情况;对于其他格式的日志数据,原始日志全文会以字符串格式被统一封装在字段 __content__ 中。

说明

通过 Kafka 协议解析 JSON 格式日志时,最多支持一层扩展,包含多层嵌套的日志字段将被作为一个字符串进行采集和保存。

限制说明

  • 支持的 Kafka 协议版本为 0.11.x~2.0.x。
  • 支持压缩方式包括 gzip、snappy 和 lz4。
  • 为保证日志传输的安全性,必须使用 SASL_SSL 连接协议。对应的用户名为日志服务项目 ID,密码为火山引擎账号密钥,详细信息请参考示例
  • 如果日志主题中有多个 Shard,日志服务不保证数据的有序性,建议使用负载均衡模式上传日志。
  • 当使用 Kafka Producer Batch 打包发送数据的时候,一次 Batch 数据的大小不能超过 5MiB,一条消息的大小上限是 5MiB,一个 Batch 请求中消息条数不能超过 10000 条,服务端会对每次 Producer 请求写入的日志数据进行长度检查,如果超出限制则整个请求失败且无任何日志数据成功写入。

前提条件

  • 已开通日志服务,创建日志项目与日志主题,并成功采集到日志数据。详细说明请参考快速入门
  • 确保当前操作账号拥有开通 Kafka 协议上传日志的权限,即具备 Action PutLogs 的权限。详细信息请参考可授权的操作

参数说明

使用 Kafka 协议上传日志时,您需要配置以下参数。

参数

示例

说明

连接类型

SASL_SSL

为保证日志传输的安全性,必须使用 SASL_SSL 连接协议。对应的用户名为日志服务项目 ID,密码为火山引擎账号密钥。

username

c8f20efe-405f-4d57-98cf-8c58d890****

Kafka SASL 用户名。应配置为日志服务的日志项目 ID。

password

AKLTYmQzOWUzMWx*******#WVRnM05UWTRaVGhrTUdFNE5EazNZV0kyTjJRME********

Kafka SASL 用户密码。应配置为火山引擎账户密钥。
格式为 ${access-key-id}#${access-key-secret},其中:

  • ${access-key-id} 应替换为您的 AccessKey ID。
  • ${access-key-secret} 应替换为您的 AccessKey Secret。

说明

建议使用 IAM 用户的 AK,且 IAM 用户应具备 Action PutLogs 的权限。详细信息请参考可授权的操作

hosts

  • 公网:tls-cn-beijing.volces.com:9094
  • 私网:tls-cn-beijing.ivolces.com:9094

初始连接的集群地址,格式为服务地址:端口号,例如 tls-cn-beijing.ivolces.com:9094,其中:

  • 服务地址为当前地域下日志服务的服务入口。请根据地域和网络类型选择正确的服务入口,详细信息请参见服务地址
  • 端口号固定为 9094。

说明

hosts 中的服务地址部分无需指定 https://

topic

20a50a35-304a-4c01-88d2-23349c30****

配置为日志服务的日志主题 ID。

配置步骤

日志服务支持自动生成 Kafka 协议上传日志数据的相关参数配置。根据以下操作步骤填写上传日志的各项基础配置和数据源信息后,控制台页面将自动生成对应的参数配置列表供您参考。

  1. 登录日志服务控制台

  2. 在顶部导航栏中选择日志服务所在的地域。

  3. 在左侧导航栏中选择日志服务 > 日志项目管理,并单击指定的日志项目名称。

  4. 在左侧导航栏中单击日志接入

  5. 数据导入区域选择Kafka协议写入

  6. 填写基础信息配置,并单击下一步

    配置

    说明

    导入类型

    默认为Kafka协议写入

    日志主题名称

    选择通过 Kafka 协议上传的日志数据所保存的日志主题。

  7. 填写数据源配置,并单击下一步

    配置

    说明

    密钥

    火山引擎账户密钥,包括 AccessKey ID 和 AccessKey Secret。您可以参考页面提示获取密钥。

    说明

    建议使用 IAM 用户的 AK,且 IAM 用户应具备 Action PutLogs 的权限。详细信息请参考可授权的操作

    网络连接方式

    选择通过内网公网连接日志服务并上传日志。若选择内网,请确认数据源可通过火山引擎内网正常访问日志服务。

    日志压缩方式

    原始日志的压缩方式。目前支持的压缩方式包括 gzip、snappy 和 lz4。

    Kafka生产端

    日志数据源类型。目前支持 Kafka 开源 SDK 或 Logstash 通过 Kafka 协议上传日志。

    结果预览

    日志服务根据以上参数配置自动生成对应的示例供您参考。

    • Kafka 开源 SDK:直接复制各个配置项的取值,并将其填写在 Kafka 开源 SDK 的对应参数中。完整的代码示例请参考示例代码
    • Logstash:日志服务自动生成 Logstash 的 Kafka 插件配置,测试插件连通性。详细说明请参考通过 Logstash 上传日志

    结果预览示例如下:

    图片

  8. 设置索引,并单击提交
    设置索引后,采集到服务端的日志数据才能被检索分析。设置索引的详细说明请参考配置索引

示例

通过 Logstash 上传日志

Logstash 内置 Kafka 输出插件(logstash_output_kafka),可以通过 Kafka 协议采集数据。在通过 ELK 搭建日志采集分析系统的场景下,只需修改 Logstash 配置文件,实现通过 Kafka 协议上传日志数据到日志服务。
通过日志服务控制台自动生成 Logstash 的 Kafka 插件配置示例后,可以通过该示例测试插件连通性。其中,${hosts} 等参数说明请参考配置步骤中的结果预览

说明

  • 日志服务使用 SASL_SSL 连接协议,因此 Kakfa 需要通过 SASL_SSL 接入点实现消息的收发机制。
  • 支持的 Logstash 版本为 7.12~8.8.1。如果您需要使用其他 Logstash 版本,可以通过工单系统联系技术支持沟通业务需求。
  • 建议在测试阶段通过以下配置测试插件连通性,生产环境中需要删除其中 stdout 相关的输出配置。
  • 对于 JSON 格式的日志,建议在配置中将日志输出格式设置为 JSON(codec => json),否则会导致上传的数据不完整。日志服务会自动解析并结构化 JSON 格式日志数据。

通过 Kafka Java SDK 上传日志

通过简单的参数配置,即可使用各类 Kafka Producer SDK 采集日志数据,并通过 Kafka 协议上传到日志服务。通过 Kafka Java SDK 上传日志的相关依赖及示例代码如下:

  1. 添加依赖。
    在 pom 文件中添加 kafka-clients 的相关依赖。

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.2</version>
    </dependency>
    
  2. 上传日志。
    参考以下示例代码通过 Kafka Java SDK 上传日志。

    说明

    执行以下示例代码之前请参考配置步骤中的结果预览正确填写 userName 等参数配置。

    package org.kafka;
    
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.config.SaslConfigs;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    public class ProducerBatchDemo {
        public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
            Properties props = new Properties();
            String userName = "${projectId}";
            //火山引擎账号的密钥,或具备对应权限的子账号密钥。不支持STS临时安全令牌。
            String passWord = "${AK}#${SK}";
            props.put("bootstrap.servers", "${hosts}");
            props.put("acks", "1"); // NoResponse 0;WaitForLocal 1;WaitForAll -1
            props.put("retries", 5);
            props.put("linger.ms", 100);
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // 配置压缩方式
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.put(SaslConfigs.SASL_JAAS_CONFIG,
                    "org.apache.kafka.common.security.plain.PlainLoginModule " +
                            "required username=\"" + userName + "\" password=\"" + passWord + "\";");
            // 1.创建一个生产者对象
            Producer<String, String> producer = new KafkaProducer<String, String>(props);
    
            // 2.调用send方法
            for (int i = 0; i < 1000; i++) {
                // java sdk不区分发送一条消息还是批量发送消息,通过配置文件做区分,主要是通过batch.size, linger.ms, buffer.memory
                Future<RecordMetadata> meta = producer.send(new ProducerRecord<String, String>("${topicId}", "testMessageValue-" + i));
                RecordMetadata recordMetadata = meta.get(100, TimeUnit.SECONDS);
                // 默认返回offset为-1
                System.out.println("partition=" + recordMetadata.partition() + "offset = " + recordMetadata.offset());
            }
            // 3.关闭生产者
            System.out.println("Kafka Producer 是异步发送数据,建议等待 10s 后再关闭进程,以确保所有的数据都成功发送,否则会导致上传的数据不完整。");
            Thread.sleep(10000);
            producer.close();
        }
    }
    

通过 Kafka Go SDK 上传日志

通过简单的参数配置,即可使用各类 Kafka Producer SDK 采集日志数据,并通过 Kafka 协议上传到日志服务。通过 Kafka Go SDK 上传日志的相关依赖和示例代码如下:

  1. 执行以下命令安装 Sarama。

    go get github.com/Shopify/sarama@v1.38.1
    
  2. 导入 Sarama 等必要的依赖包,并上传日志。
    参考以下示例代码通过 Kafka Go SDK 上传日志。

    说明

    执行以下示例代码之前请参考配置步骤中的结果预览正确填写 User 等参数配置。

    package kafka_produce_example
    
    import (
        "fmt"
        "time"
    
        "github.com/Shopify/sarama"
    )
    
    func ExampleProduce() {
        config := sarama.NewConfig()
        config.Version = sarama.V2_0_0_0 // specify appropriate version
        config.ApiVersionsRequest = true
        config.Net.SASL.Mechanism = "PLAIN"
        config.Producer.Return.Successes = true
        config.Net.SASL.Version = int16(0)
        config.Net.SASL.Enable = true
        // projectId
        config.Net.SASL.User = "${projectId}"
        // 火山引擎账号的密钥,或具备对应权限的子账号密钥。不支持STS临时安全令牌。
        config.Net.SASL.Password = "${access-key-id}#${access-key-secret}"
        config.Producer.RequiredAcks = sarama.WaitForLocal
        config.Producer.Compression = sarama.CompressionLZ4
    
        config.Net.TLS.Enable = true
    
        // hosts为生产的地址,具体见文档
        asyncProducer, err := sarama.NewAsyncProducer([]string{"${hosts}"}, config)
        if err != nil {
           fmt.Println("new producer error:" + err.Error())
           panic(err)
        }
    
        msg := &sarama.ProducerMessage{
           // ${topicID}为tls的日志主题ID,例如"0fdaa6b6-3c9f-424c-8664-fc0d222c****"。
           Topic: "${topicID}",
           Value: sarama.StringEncoder("test"),
        }
        asyncProducer.Input() <- msg
        go func() {
           for {
              select {
              case msg := <-asyncProducer.Successes():
                 if msg != nil {
                    // offset 默认返回为-1
                    fmt.Printf("send response; partition:%d, offset:%d\n", msg.Partition, msg.Offset)
                 }
              case msg := <-asyncProducer.Errors():
                 if msg != nil {
                    fmt.Println("producer send error:" + msg.Error())
                 }
              }
           }
        }()
    
        time.Sleep(time.Minute)
        _ = asyncProducer.Close()
    }
    

错误信息

使用 Kafka 协议上传日志失败时,会按照 Kafka 的错误码返回对应的错误信息,请参考 Kafka error list获取更多信息。
除此之外,日志服务还在 Java 语言的 Kafka 错误码 SASLAuthenticationException 中封装了鉴权、配置相关参数的错误信息,详细说明如下:

错误信息

说明

invalid SASL/PLAIN request: expected 3 tokens

未配置 user 或者 password。请参考配置步骤中的结果预览正确填写配置。

invalid SASL/PLAIN request: empty projectId

未配置 user 字段。请参考配置步骤中的结果预览正确填写配置。

invalid SASL/PLAIN request: Password format wrong

password 字段配置错误。请参考配置步骤中的结果预览正确填写配置。

invalid SASL/PLAIN request: empty AccessKey

未配置 AK。请参考配置步骤中的结果预览正确填写配置。

invalid SASL/PLAIN request: empty SecretKey

未配置 SK。请参考配置步骤中的结果预览正确填写配置。

invalid SASL/PLAIN request: Invalid projectId

指定的 Project ID 不存在。请检查 Project ID 是否输入正确。

Access Denied. You are not authorized to perform this operation.

当前用户不具备操作权限。建议检查用户权限。