You need to enable JavaScript to run this app.
导航
Fluent Bit
最近更新时间:2025.10.30 16:33:23首次发布时间:2025.10.30 16:33:23
复制全文
我的收藏
有用
有用
无用
无用

Fluent Bit 是一个开源的日志处理器和转发器,用于从各种数据源收集日志和指标,支持自定义输出插件将数据写入存储系统。本文将介绍如何通过 Fluent Bit 将数据写入 ByteHouse,适用于需要将系统日志、应用日志汇总到 ByteHouse 进行分析的场景。

概述

本文主要介绍如何通过 Fluent Bit 将日志数据写入 ByteHouse,演示了基本的操作流程。您可以基于该演示,根据您的实际需求写入数据以及调优参数。
数据写入流程与本示例的数据和相关文件脚本的说明如下。

数据写入流程

本示例中涉及的文件/脚本说明

注意事项

1

准备日志数据写入 ByteHouse 的目标库表

不涉及,本示例创建的库表为demo_db.demo_table,您在实际操作时根据情况创建目标库表即可。

建议将各脚本、配置文件放置在同一目录下,方便后续操作。

2

准备 Fluent Bit 环境

不涉及,为了减少环境因素影响并简化安装过程,本文将使用 Docker 来进行演示。

3

准备 Fluent Bit 中待写入的日志数据

本示例准备了一个日志文件:demo.log 文件,并使用 Python 脚本 gen.py 构造日志数据写入日志文件 demo.log 中,模拟业务日志数据写入的过程。

4

解析 Fluent Bit 中的日志数据并写入 ByteHouse

  • 本示例准备了一个日志数据解析配置文件:parsers.conf,用于将日志文件中 JSON 格式的数据解析为结构化数据,便于后续写入 ByteHouse 库表中。
  • 本示例通过配置文件 fluent-bit.conf,用于配置 Fluent Bit 的输入、输出等相关配置,通过执行命令调用此配置文件,实现将日志数据写入 ByteHouse 库表中。

5

查看写入 ByteHouse 中的结果

不涉及,执行数据写入 ByteHouse 命令后,可在 ByteHouse 中查询并验证写入的结果数据。

准备工作
  • 为了减少环境因素影响并简化安装过程,本文将使用 Docker 来进行演示,因此您需要先下载并安装 Docker,详情请参见官方文档
  • 本文使用 Python 3 执行脚本文件,建议您提前安装 Python 3 环境。

操作步骤

步骤一:创建 ByteHouse 库表

使用以下 SQL 在 ByteHouse 中创建测试数据库、表。假设我们要将日志输入写入 demo_db.demo_table 表,该表中的三个字段分别对应步骤二中创建的日志文件中 JSON 数据行中的三个字段。

# 创建数据库
CREATE DATABASE IF NOT EXISTS demo_db

# 创建表
CREATE TABLE demo_db.demo_table (
    `id` Int32 NULL,
    `level` String NULL,
    `content` String NULL
)
ENGINE = CnchMergeTree
ORDER BY id
UNIQUE KEY id; 

步骤二:下载 Fluent Bit 镜像

docker pull cr.fluentbit.io/fluent/fluent-bit

步骤三:准备日志文件并构造日志数据

  1. 准备了一个演示用日志文件 demo.log,该日志文件中的内容大致如下:

    ..
    {"id": 995, "level": "info", "content": "hYb7yQT3qrIiMo7cd"}
    {"id": 996, "level": "debug", "content": "Ln"}
    ...
    
  2. 准备日志内容追加 Python 脚本 gen.py,用于向该日志文件追加写入内容。

    1. 创建 Python 脚本 gen.py

      import json
      import time
      import random
      import string
      import sys
      # 文件名
      filename = 'demo.log'
      def generate_random_content(max_length=20):
          """生成随机内容,长度小于 max_length"""
          length = random.randint(1, max_length)
          return ''.join(random.choices(string.ascii_letters + string.digits + ' ', k=length))
      # 初始 id
      current_id = 1
      max_id = 1000
      try:
          with open(filename, 'a', buffering=1) as file:
              while True:
                  # 随机选择 level
                  level = random.choice(['info', 'debug'])
                  # 生成随机 content
                  content = generate_random_content()
                  # 定义要写入的 JSON 数据
                  log_entry = {
                      "id": current_id,
                      "level": level,
                      "content": content
                  }
                  # 将字典转换为 JSON 字符串
                  json_line = json.dumps(log_entry)
      
                  # 写入文件并换行
                  file.write(json_line + '\n')
      
                  # 输出日志到控制台(可选)
                  print(f'Written to file: {json_line}')
      
                  # 增加 id
                  current_id += 1
                  if current_id >= max_id:
                    sys.exit(0)
      
                  # 等待 5 秒
                  time.sleep(5)
      except KeyboardInterrupt:
          print("Logging stopped.")
      

      参数说明

      参数

      配置说明

      filename

      自定义生成的日志文件名称。

      generate_random_content(max_length=value)

      设置 max_length 的值,用于限制随机生成的日志内容的最大长度。

      current_id

      日志条目起始 ID,推荐设置为 1。

      max_id

      最大日志 ID 限制,当日志 ID 达到此值时,脚本会自动退出。

      level

      推荐使用示例值。random.choice 表示随机为每条日志分配日志级别,可选值 info、debug,用于区分不同重要程度或用途的日志信息。

      • info:信息级别,用于记录系统正常运行过程中的关键信息,比如 "服务启动成功"、"数据处理完成" 等。
      • debug:调试级别,主要用于开发和调试阶段,记录更详细的过程信息。

      content

      推荐使用示例值。该参数表示日志的具体内容,通过 generate_random_content() 函数生成,是由字母、数字和空格组成的随机字符串。

      log_entry

      根据您的日志内容定义需写入的 JSON 数据。

      json_line

      推荐使用示例值。该参数表示 JSON 格式的日志字符串,通过 json.dumps(log_entry) 将字典对象转换为 JSON 字符串,将日志数据转换为通用的 JSON 格式,便于日志的存储和解析。

      file.write

      推荐使用示例值。该参数定义了向文件写入内容的操作。json_line + '\n' 表示在每条 JSON 日志后添加换行符,保证每条日志单独占一行。

      print()

      可选配置,推荐使用示例值。f'Written to file: {json_line}' 是格式化字符串,用于输出日志到控制台。

      time.sleep(n)

      日志生成的时间间隔,单位为秒。推荐值: 5 秒。

    2. 执行以下命令启动脚本,启动后,脚本将每隔 5s 向 demo.log 文件中写入一行 JSON 行。
      需确保该脚本一直处于运行状态,确保其持续生成日志内容。

      python3 gen.py
      

步骤四:解析日志数据并写入 ByteHouse

注意

开始操作前,请再次确认 fluent-bit.confparsers.confdemo.log 三个文件放置在同一目录下,并进入该目录执行命令。

  1. 准备自定义 parser 配置文件 parsers.conf,用于解析日志文件中的字段,将日志文件中的 JSON 字段解析为结构化数据。

    [PARSER]
      Name json_log
      Format json
    
  2. 准备 Fluent Bit 配置文件 fluent-bit.conf,配置 Fluent Bit 的输入和输出。
    使用时,请使用实际的参数值替换命令中的占位符,配置说明请参见下文参数说明表格。

    [SERVICE]
        Flush 20
        Log_Level debug
        Parsers_File /fluent-bit/etc/parsers.conf
    [INPUT]
        Name         tail
        Path         /fluent-bit/demo.log
        Refresh_Interval 60
        Tag          my_logs
        DB           /var/log/flb_kv.db
        Mem_Buf_Limit 5MB
        Skip_Long_Lines Off
        Parser json_log
    
    [OUTPUT]
        Name http
        tls on
        Match *
        Host <bytehouse-host>
        Port 8123
        URI /?user=bytehouse&password=<apitoken>&query=INSERT+INTO+<db.table>+FORMAT+JSONEachRow&input_format_skip_unknown_fields=1
        Format json_stream
        Retry_Limit 5
    

    参数说明

    参数

    配置说明

    SERVICE

    Flush

    设置数据刷新间隔(单位:秒),Fluent Bit 会按照此间隔将缓存的数据发送到输出目标。

    Log_Level

    配置 Fluent Bit 的日志级别,可选值包括 debug、info、warn、error 等。推荐使用示例中设置的 debug,会输出详细调试信息,便于问题排查。

    Parsers_File

    这里指定了一个 parser 配置文件,实际将要使用的就是上述步骤中准备的 parsers.conf,用于自定义解析日志文件,将日志文件中的 JSON 字段解析为结构化数据。
    参数值配置示例:/fluent-bit/etc/parsers.conf

    INPUT

    Name

    指定输入插件类型,推荐使用示例中设置的 tail 插件,用于监控文件并读取新增内容。

    Path

    指定要监控的日志文件路径。参数值配置示例:/fluent-bit/demo.log

    Refresh_Interval

    设置检查文件变化的时间间隔(单位:秒)。示例中设置为 60 秒,即每隔 60 秒检查一次文件是否有新增或变化。

    Tag

    为收集的日志添加标签,用于后续输出时的过滤或匹配。示例中设置为 my_logs

    DB

    指定存储文件位置和偏移量的数据库文件路径,用于记录已读取的位置,避免重复读取日志。参数值配置示例:/var/log/flb_kv.db

    Mem_Buf_Limit

    设置内存缓冲区的上限。示例中设置为 5MB,当缓存数据超过此限制时会触发处理机制。

    Skip_Long_Lines

    配置是否跳过过长的日志行。设置为 Off 时不跳过;设置为 On 时会跳过超过缓冲区大小的行。推荐设置为 Off

    Parser

    指定用于解析日志内容的解析器名称,需与 parsers.conf 中定义的 [PARSER] 名称一致。示例中使用 json_log 解析器解析 JSON 格式日志。

    OUTPUT

    Name

    指定输出插件类型,示例中使用 http 插件,用于通过 HTTP 协议将数据发送到目标服务。

    tls

    配置是否启用 TLS 加密传输。设置为 on 时启用加密,增强数据传输安全性。

    Match

    设置日志匹配规则,用于筛选需要发送的日志。示例中设置为 *,表示匹配所有标签的日志。

    Host

    指定输出目标的主机地址,此处为 ByteHouse 的访问域名。获取方式如下:
    您可登录 ByteHouse 控制台,通过租户管理 > 基本信息 > 网络信息路径查看并复制 ByteHouse 的公网/私网域名,详情请参见步骤二:配置网络信息

    Port

    指定输出目标的端口号,固定配置为 8123。

    URI

    包含认证信息和数据插入命令的 URL 参数。其中:

    • user:固定配置为 bytehouse。
    • <apitoken>:ByteHouse API Key。您可登录 ByteHouse 控制台,通过租户管理 > 基本信息中获取 API Key,详情请参见获取 API Key
    • query:指定数据插入的目标库表和数据格式,其中 ,
      • <db.table>:用于指定需写入的 ByteHouse 数据库、表名称,如 demo_db.demo_table
      • input_format_skip_unknown_fields=1:设置忽略日志中不存在于目标表的字段。

    Format

    设置输出数据的格式。示例中设置为 json_stream,表示以 JSON 流格式发送数据。

  3. 进入配置文件所在的工作路径,执行如下命令,启动 Fluent Bit,访问日志文件,将日志数据解析后写入ByteHouse库表中。

    注意

    执行命令时,需要保证此时 Python 脚本gen.py 也在运行,它将持续生成日志内容。

    docker run -ti -v ./fluent-bit.conf:/fluent-bit/etc/fluent-bit.conf \
      -v ./parsers.conf:/fluent-bit/etc/parsers.conf \
      -v ./demo.log:/fluent-bit/demo.log \
      cr.fluentbit.io/fluent/fluent-bit
    

    参数说明

    参数项

    说明

    -v ./fluent-bit.conf:/fluent-bit/etc/fluent-bit.conf

    挂载 Fluent Bit 主配置文件:将当前目录下的 fluent-bit.conf 文件映射到容器内的 fluent-bit/etc/fluent-bit.conf 路径,使容器使用本地自定义配置而非默认配置。

    -v ./parsers.conf:/fluent-bit/etc/parsers.conf

    挂载日志解析器配置文件:将当前目录下的 parsers.conf 文件映射到容器内的 /fluent-bit/etc/parsers.conf 路径,与主配置中 Parsers_File 参数指定的路径对应,确保日志解析规则生效。

    -v ./demo.log:/fluent-bit/demo.log

    挂载日志文件:将当前目录下的 demo.log 文件映射到容器内的 /fluent-bit/demo.log 路径,与输入配置中 Path 参数指定的监控路径对应,使 Fluent Bit 能够读取到 Python 脚本 gen.py 生成的日志内容。

    cr.fluentbit.io/fluent/fluent-bit

    指定使用的 Docker 镜像:这是 Fluent Bit 官方提供的镜像,确保容器运行的是标准的 Fluent Bit 服务。

  4. 观察屏幕日志是否有报错,如果无报错,则表示成功写入增量的日志数据。

步骤五:查看写入结果

写入成功后,可在 ByteHouse 中执行以下命令,查看具体的数据。

SELECT count() FROM demo_db.demo_table;
SELECT * FROM demo_db.demo_table LIMIT 10;