You need to enable JavaScript to run this app.
日志服务

日志服务

复制全文
数据采集
使用 Kafka 协议上传日志
复制全文
使用 Kafka 协议上传日志

日志服务支持通过 Kafka 协议上传日志数据到服务端,即可以使用 Kafka Producer SDK 来采集日志数据,并通过 Kafka 协议上传到日志服务。本文介绍通过 Kafka 协议将日志上传到日志服务的操作步骤。

背景信息

Kafka 作为高吞吐量的消息中间件,在多种自建场景的日志采集方案中被用于消息管道。例如在日志源服务器中的开源采集工具采集日志,或通过 Producer 直接写入日志数据,再通过消费管道供下游应用进行消费。日志服务支持通过 Kafka 协议上传和消费日志数据,基于 Kafka 数据管道提供完整的数据上下行服务。
使用 Kafka 协议上传日志功能,无需手动开启功能,无需在数据源侧安装数据采集工具,基于简单的配置即可实现 Kafka Producer 采集并上传日志信息到日志服务。日志服务提供基于 Java 和 Go 语言的示例项目供您参考,详细信息请参考示例
通过 Kafka 协议采集日志时,对于合法的 JSON 格式日志,日志服务会正常解析为 Key-Value 对;对于不合法的 JSON 格式,部分字段可能出现会解析错乱的情况;对于其他格式的日志数据,原始日志全文会以字符串格式被统一封装在字段 __content__ 中。

说明

通过 Kafka 协议解析 JSON 格式日志时,最多支持一层扩展,包含多层嵌套的日志字段将被作为一个字符串进行采集和保存。

限制说明

  • 支持的 Kafka 协议版本为 0.11.x~2.0.x。
  • 支持压缩方式包括 gzip、snappy 和 lz4。
  • 为保证日志传输的安全性,必须使用 SASL_SSL 连接协议。对应的用户名为日志服务项目 ID,密码为火山引擎账号密钥,详细信息请参考示例
  • 如果日志主题中有多个 Shard,日志服务不保证数据的有序性,建议使用负载均衡模式上传日志。
  • 当使用 Kafka Producer Batch 打包发送数据的时候,一次 Batch 数据的大小不能超过 5MiB,一条消息的大小上限是 5MiB,一个 Batch 请求中消息条数不能超过 10000 条,服务端会对每次 Producer 请求写入的日志数据进行长度检查,如果超出限制则整个请求失败且无任何日志数据成功写入。

前提条件

  • 已开通日志服务,创建日志项目与日志主题,并成功采集到日志数据。详细说明请参考快速入门
  • 确保当前操作账号拥有开通 Kafka 协议上传日志的权限,即具备 Action PutLogs 的权限。详细信息请参考可授权的操作

参数说明

使用 Kafka 协议上传日志时,您需要配置以下参数。

参数

示例

说明

连接类型

SASL_SSL

为保证日志传输的安全性,必须使用 SASL_SSL 连接协议。对应的用户名为日志服务项目 ID,密码为火山引擎账号密钥。

username

c8f20efe-405f-4d57-98cf-8c58d890****

Kafka SASL 用户名。应配置为日志服务的日志项目 ID。

password

AKLTYmQzOWUzMWx*******#WVRnM05UWTRaVGhrTUdFNE5EazNZV0kyTjJRME********

Kafka SASL 用户密码。应配置为火山引擎账户密钥。
格式为 ${access-key-id}#${access-key-secret},其中:

  • ${access-key-id} 应替换为您的 AccessKey ID。
  • ${access-key-secret} 应替换为您的 AccessKey Secret。

说明

建议使用 IAM 用户的 AK,且 IAM 用户应具备 Action PutLogs 的权限。详细信息请参考可授权的操作

hosts

  • 公网:tls-cn-beijing.volces.com:9094
  • 私网:tls-cn-beijing.ivolces.com:9094

初始连接的集群地址,格式为服务地址:端口号,例如 tls-cn-beijing.ivolces.com:9094,其中:

  • 服务地址为当前地域下日志服务的服务入口。请根据地域和网络类型选择正确的服务入口,详细信息请参见服务地址
  • 端口号固定为 9094。

说明

hosts 中的服务地址部分无需指定 https://

topic

20a50a35-304a-4c01-88d2-23349c30****

配置为日志服务的日志主题 ID。

为了获得更好的性能,建议添加以下 Kafka producer 参数。

参数

推荐值

说明

batch.size

  • 设置了压缩:262144
  • 未设置压缩:2621440

用于控制生产者发送单个分区消息时的批处理大小。单位:字节。

说明

批次中的消息总大小达到 batch.size 或者生产者发送消息的等待时间达到 linger.ms,生产者都会立即发送当前批次消息。

max.request.size

  • 设置了压缩:1046576
  • 未设置压缩:10465760

用于指定单个请求(一个请求中可能有多个分区)的最大大小。单位:字节。

linger.ms

100

用于指定生产者在发送当前批次消息前的最长等待时间,单位:毫秒。如果不配置,表示立即发送。

compression.type

lz4

用于指定消息在发送到 Kafka 之前的压缩算法。

配置步骤

日志服务支持自动生成 Kafka 协议上传日志数据的相关参数配置。根据以下操作步骤填写上传日志的各项基础配置和数据源信息后,控制台页面将自动生成对应的参数配置列表供您参考。

  1. 登录日志服务控制台

  2. 在顶部导航栏中,选择日志服务所在的地域。

  3. 在左侧导航栏中,选择常用功能 > 日志接入

  4. 数据导入页签中,单击创建 Kafka 协议写入配置

  5. 选择用于存储 Kafka 数据的日志项目和日志主题。

  6. 填写数据源配置信息。

    配置

    说明

    密钥

    火山引擎账户密钥,包括 AccessKey ID 和 AccessKey Secret。您可以参考页面提示获取密钥。

    说明

    • 建议使用 IAM 用户的 AK,且 IAM 用户应具备 Action PutLogs 的权限。详细信息请参考可授权的操作
    • 日志服务不会保存上述配置,将仅用于为您生成 Kafka 生产端输出配置。

    网络连接方式

    选择通过内网公网连接日志服务并上传日志。
    若选择内网,请确认数据源可通过火山引擎内网正常访问日志服务。

    日志压缩方式

    原始日志的压缩方式。目前支持的压缩方式包括无压缩、gzip、snappy 和 lz4。

    Kafka生产端

    日志数据源类型。目前支持 Kafka 开源 SDK 或 Logstash 通过 Kafka 协议上传日志。

    结果预览

    日志服务根据以上参数配置自动生成对应的示例供您参考。

    • Kafka 开源 SDK:直接复制各个配置项的取值,并将其填写在 Kafka 开源 SDK 的对应参数中。完整的代码示例请参考示例代码
    • Logstash:日志服务自动生成 Logstash 的 Kafka 插件配置,测试插件连通性。详细说明请参考通过 Logstash 上传日志

    结果预览示例如下:

    • Kafka 开源 SDK
      Image
    • Logstash
      Image
  7. 单击确定,系统弹出复制参数配置页面。
    系统不会直接创建写入任务,且不会出现在任务列表中。
    要创建写入任务,请在 producer 中贴入相关配置,并检查目标日志主题的索引配置。

    说明

    设置索引后,采集到服务端的日志数据才能被检索分析。设置索引的详细说明请参考配置索引

示例

通过 Logstash 上传日志

Logstash 内置 Kafka 输出插件(logstash_output_kafka),可以通过 Kafka 协议采集数据。在通过 ELK 搭建日志采集分析系统的场景下,只需修改 Logstash 配置文件,实现通过 Kafka 协议上传日志数据到日志服务。
通过日志服务控制台自动生成 Logstash 的 Kafka 插件配置示例后,可以通过该示例测试插件连通性。其中,${hosts} 等参数说明请参考配置步骤中的结果预览

说明

  • 日志服务使用 SASL_SSL 连接协议,因此 Kakfa 需要通过 SASL_SSL 接入点实现消息的收发机制。
  • 支持的 Logstash 版本为 7.12~8.8.1。如果您需要使用其他 Logstash 版本,可以通过工单系统联系技术支持沟通业务需求。
  • 建议在测试阶段通过以下配置测试插件连通性,生产环境中需要删除其中 stdout 相关的输出配置。
  • 对于 JSON 格式的日志,建议在配置中将日志输出格式设置为 JSON(codec => json),否则会导致上传的数据不完整。日志服务会自动解析并结构化 JSON 格式日志数据。

通过 Kafka Java SDK 上传日志

通过简单的参数配置,即可使用各类 Kafka Producer SDK 采集日志数据,并通过 Kafka 协议上传到日志服务。通过 Kafka Java SDK 上传日志的相关依赖及示例代码如下:

  1. 添加依赖。
    在 pom 文件中添加 kafka-clients 的相关依赖。

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.2</version>
    </dependency>
    
  2. 上传日志。
    参考以下示例代码通过 Kafka Java SDK 上传日志。

    说明

    执行以下示例代码之前请参考配置步骤中的结果预览正确填写 userName 等参数配置。

    package bytedance;
    
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.config.SaslConfigs;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    //    pom.xml依赖
    //    <dependency>
    //      <groupId>org.apache.kafka</groupId>
    //      <artifactId>kafka-clients</artifactId>
    //      <version>2.2.2</version> 
    //    </dependency>
    
    public class KafkaProducer {
        public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
            Properties props = new Properties();
    
            // TLS 连接以及鉴权信息配置。
            String userName = "${projectId}";
            // 火山引擎账号的密钥,或具备对应权限的 IAM 账号密钥。不支持STS临时安全令牌。
            String passWord = "${AK}#${SK}";
            props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "${hosts}");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.put(SaslConfigs.SASL_JAAS_CONFIG,
                    "org.apache.kafka.common.security.plain.PlainLoginModule " +
                            "required username=\"" + userName + "\" password=\"" + passWord + "\";");
    
            // producer 参数配置。
            props.put(ProducerConfig.ACKS_CONFIG, "1"); // NoResponse 0;WaitForLocal 1;WaitForAll -1
            props.put(CommonClientConfigs.RETRIES_CONFIG, 5);
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // 配置压缩方式。
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, "262144");
            props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1046576");
            // 1.创建一个生产者对象。
            Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);
            // 2.调用 send 方法。
            Future<RecordMetadata> meta = producer.send(new ProducerRecord<String, String>("${topicId}", "testMessageValue"));
            RecordMetadata recordMetadata = meta.get(100, TimeUnit.SECONDS);
            // 默认返回的 offset 为 -1。
            System.out.println("partition=" + recordMetadata.partition() + "offset = " + recordMetadata.offset());
            // 3.关闭生产者。
            System.out.println("Kafka Producer 是异步发送数据,建议等待 10s 后再关闭进程,以确保所有的数据都成功发送,否则会导致上传的数据不完整。");
            Thread.sleep(10000);
            // 在生产环境中,producer 应该常驻在进程中。只有在进程退出时,才调用 close 方法关闭 producer。
            producer.close();
        }
    }
    

通过 Kafka Go SDK 上传日志

通过简单的参数配置,即可使用各类 Kafka Producer SDK 采集日志数据,并通过 Kafka 协议上传到日志服务。通过 Kafka Go SDK 上传日志的相关依赖和示例代码如下:

  1. 执行以下命令安装 Sarama。

    go get github.com/Shopify/sarama@v1.38.1
    
  2. 导入 Sarama 等必要的依赖包,并上传日志。
    参考以下示例代码通过 Kafka Go SDK 上传日志。

    说明

    执行以下示例代码之前请参考配置步骤中的结果预览正确填写 User 等参数配置。

    package kafka_produce_example
    
    import (
        "fmt"
        "time"
    
        "github.com/Shopify/sarama"
    )
    
    func ExampleProduce() {
            config := sarama.NewConfig()
            config.Version = sarama.V2_0_0_0 // 指定合适的版本。
            config.ApiVersionsRequest = true
            config.Net.SASL.Mechanism = "PLAIN"
            config.Producer.Return.Successes = true
            config.Net.SASL.Version = int16(0)
            config.Net.SASL.Enable = true
            // 日志项目 ID。
            config.Net.SASL.User = "${projectId}"
            // 火山引擎账号的密钥,或具备对应权限的 IAM 账号密钥。不支持 STS 临时安全令牌。
            config.Net.SASL.Password = "${access-key-id}#${access-key-secret}"
            config.Producer.RequiredAcks = sarama.WaitForLocal
            config.Producer.Compression = sarama.CompressionLZ4
            config.Producer.Flush.Bytes = 262144
            config.Producer.Flush.Frequency = 100 * time.Millisecond
    
            config.Net.TLS.Enable = true
    
            // hosts 为生产的地址。具体说明,请参考本文中的参数说明。
            asyncProducer, err := sarama.NewAsyncProducer([]string{"${hosts}"}, config)
            if err != nil {
                    fmt.Println("new producer error:" + err.Error())
                    panic(err)
            }
    
            msg := &sarama.ProducerMessage{
                    // ${topicID} 为日志服务的日志主题 ID,例如 0fdaa6b6-3c9f-424c-8664-fc0d222c****。
                    Topic: "${topicID}",
                    Value: sarama.StringEncoder("test"),
            }
            asyncProducer.Input() <- msg
            go func() {
                    for {
                            select {
                            case msg := <-asyncProducer.Successes():
                                    if msg != nil {
                                            // offset 默认返回为 -1。
                                            fmt.Printf("send response; partition:%d, offset:%d\n", msg.Partition, msg.Offset)
                                    }
                            case msg := <-asyncProducer.Errors():
                                    if msg != nil {
                                            fmt.Println("producer send error:" + msg.Error())
                                    }
                            }
                    }
            }()
    
            time.Sleep(time.Minute)
            _ = asyncProducer.Close()
    }
    

错误信息

使用 Kafka 协议上传日志失败时,会按照 Kafka 的错误码返回对应的错误信息,请参考 Kafka error list获取更多信息。
除此之外,日志服务还在 Java 语言的 Kafka 错误码 SASLAuthenticationException 中封装了鉴权、配置相关参数的错误信息,详细说明如下:

错误信息

说明

invalid SASL/PLAIN request: expected 3 tokens

未配置 user 或者 password。请参考配置步骤中的结果预览正确填写配置。

invalid SASL/PLAIN request: empty projectId

未配置 user 字段。请参考配置步骤中的结果预览正确填写配置。

invalid SASL/PLAIN request: Password format wrong

password 字段配置错误。请参考配置步骤中的结果预览正确填写配置。

invalid SASL/PLAIN request: empty AccessKey

未配置 AK。请参考配置步骤中的结果预览正确填写配置。

invalid SASL/PLAIN request: empty SecretKey

未配置 SK。请参考配置步骤中的结果预览正确填写配置。

invalid SASL/PLAIN request: Invalid projectId

指定的 Project ID 不存在。请检查 Project ID 是否输入正确。

Access Denied. You are not authorized to perform this operation.

当前用户不具备操作权限。建议检查用户权限。

最近更新时间:2025.08.19 10:43:36
这个页面对您有帮助吗?
有用
有用
无用
无用