You need to enable JavaScript to run this app.
导航

通过消费组消费数据

最近更新时间2023.11.13 16:31:40

首次发布时间2023.09.27 12:24:24

日志服务支持通过消费组消费数据,并提供一系列 SDK 供您管理消费组。本文档介绍通过日志服务消费组消费数据的相关操作。

背景信息

日志服务提供消费日志的 OpenAPI 接口 ConsumeLogs,支持实时消费采集到服务端的日志数据。在使用 ConsumeLogs 接口时,需要按照日志分区维度消费日志数据,消费时自行指定日志主题 ID、Shard ID 和起始结束游标(Cursor),所以消费日志的进度受限于单个 Shard 的读写能力,还需要自行维护消费进度,在 Shard 自动分裂的场景下消费逻辑与流程繁琐。
日志服务通过 SDK 提供了消费组(ConsumerGroup)功能,支持通过消费组消费日志数据,通过消费组消费时,日志服务会自动均衡各个消费者的消费能力与进度,自动分配 Shard,您无需关注消费组的内部调度细节及消费者之间的负载均衡、故障转移等,只需要专注于业务逻辑。

基本概念

概念

说明

消费组(ConsumerGroup)

多个消费者组成的虚拟集合。以消费组维度消费日志数据时,消费组中的所有消费者订阅同一个日志主题,共同消费一个日志主题中的数据。每个消费者消费日志主题中一个或多个 Shard 的数据,各个消费者间不会重复消费数据。

消费者(Consumer)

一个从日志服务中消费数据的客户端,是消费组的组成部分。

  • 同一个消费组中的消费者名称唯一。
  • 同一时刻,日志主题的一个分区将会分配给消费组中的某一个消费者,一个消费者可能负责多个 Shard。在消费组的运行过程中,消费者和日志分区的分配关系可能发生变更。

消费位点(Checkpoint)

一个 Shard 在被一个消费者消费的过程中,会随时记录当前 Shard 的消费位点(即游标进度)并上报服务端,以此来作为程序重启时的起始消费游标,从而保证数据不会被重复消费。

限制说明

  • 每个日志项目可配置 150 个消费组。
  • 一个消费组可以消费同一个日志项目中的多个日志主题,但不支持跨日志项目进行消费。

前提条件

  • 已创建并获取火山引擎密钥 AccessKey。
  • 火山引擎账号的访问密钥 AccessKey 拥有所有 API 的全部权限。建议您通过 IAM 用户进行 API 相关操作和日常运维。使用 IAM 用户前,主账号应为 IAM 用户授予消费组相关的权限。授权示例请参考访问策略模板
  • 已安装日志服务 SDK。目前支持消费组的日志服务 SDK 包括 Go SDKJava SDKPython SDK

通过消费组消费数据

日志服务提供了 Consumer 异步日志消费库,支持消费同一个日志项目下多个日志主题,具有异步消费、高性能、失败重试、优雅关闭等特性。您可以通过日志服务 Go SDKJava SDKPython SDK 管理消费组,并消费日志数据。

示例代码

以下代码以 Go SDK 为例,演示通过 SDK 创建消费组和消费者,并消费日志的整体流程。

package tls

import (
    "context"
    "fmt"
    "os"
    "time"

    "github.com/pkg/errors"
    log_consumer "github.com/volcengine/volc-sdk-golang/service/tls/consumer"
    "github.com/volcengine/volc-sdk-golang/service/tls/pb"
)

func launchConsumer() error {
    // 获取消费组的默认配置
    consumerCfg := log_consumer.GetDefaultConsumerConfig()
    // 请配置您的Endpoint、Region、AccessKeyID、AccessKeySecret等基本信息
    consumerCfg.Endpoint = os.Getenv("LOG_SERVICE_ENDPOINT")
    consumerCfg.Region = os.Getenv("LOG_SERVICE_REGION")
    consumerCfg.AccessKeyID = os.Getenv("LOG_SERVICE_AK")
    consumerCfg.AccessKeySecret = os.Getenv("LOG_SERVICE_SK")
    // 请配置您的日志项目ID和日志主题ID列表
    consumerCfg.ProjectID = "<YOUR-PROJECT-ID>"
    consumerCfg.TopicIDList = []string{"<YOUR-TOPIC-ID>"}
    // 请配置您的消费组名称(若您未创建过消费组,SDK将默认为您创建指定名称的消费组)
    consumerCfg.ConsumerGroupName = "<CONSUMER-GROUP-NAME>"
    // 请配置消费者名称(同一个消费组的不同消费者需要保证不同名)
    consumerCfg.ConsumerName = "<CONSUMER_NAME>"

    // 定义日志消费函数,您可根据业务需要,自行实现处理LogGroupList的日志消费函数
    // 下面展示了逐个打印消费到的每条日志的每个键值对的代码实现示例
    var handleLogs = func(topicID string, shardID int, l *pb.LogGroupList) {
       fmt.Printf("received new logs from topic: %s, shard: %d\n", topicID, shardID)
       for _, logGroup := range l.LogGroups {
          for _, log := range logGroup.Logs {
             for _, content := range log.Contents {
                fmt.Printf("%s: %s\n", content.Key, content.Value)
             }
          }
       }
    }
    
    // 创建消费者
    consumer, err := log_consumer.NewConsumer(context.TODO(), consumerCfg, handleLogs)
    if err != nil {
       return errors.Wrap(err, "get new consumer failed: ")
    }

    // 启动消费者消费
    if err := consumer.Start(); err != nil {
       return errors.Wrap(err, "start consumer failed: ")
    }

    // 等待消费
    <-time.After(time.Second * 60)

    // 停止消费
    consumer.Stop()

    return nil
}

func main() {
    if err := launchConsumer(); err != nil {
       fmt.Println(err.Error())
    }
}

配置说明

日志服务 SDK 支持通过参数配置消费组消费的各种细节配置,例如是否开启顺序消费、Consumer 心跳上报时间间隔等,您可以通过这些配置管理消费组的各种消费逻辑。
例如,在上述 Go SDK 示例代码中,log_consumer.GetDefaultConsumerConfig() 函数返回了消费组的默认配置 consumerCfg,并向您展示了如何配置您的 Endpoint、Region、AccessKeyID、AccessKeySecret 等基本信息、日志项目 ID 和日志主题 ID 列表、消费组名称和消费者名称。除此之外,您还可通过 consumerCfg 的其他字段进行其他自定义配置。consumerCfg 支持的字段如下所示。

参数

类型

示例值

描述

MaxFetchLogGroupCount

Integer

100

消费者单次消费日志时,最大获取 LogGroup 数量,默认为 100,最大为 1000。

HeartbeatIntervalInSecond

Integer

20

Consumer 心跳上报时间间隔,单位为秒。

DataFetchIntervalInMillisecond

Integer

200

Consumer 消费日志时间间隔,单位为毫秒。

FlushCheckpointIntervalSecond

Integer

5

Consumer 上传消费进度的时间间隔,单位为秒。

ConsumeFrom

String

begin

开始消费时的默认消费位点,与 DescribeCursor 的 From 参数一致。仅在该消费者从未上传过消费位点时有效。

OrderedConsume

Boolean

FALSE

是否开启顺序消费。开启顺序消费后,消费者会根据 Shard 分裂的父子关系进行消费。
例如 Shard0 分裂为 Shard1 与 Shard2,而 Shard1 又分裂为 Shard3 与 Shard4。在开启顺序消费之后,会根据 (Shard0) -> (Shard1, Shard2) -> (Shard2, Shard3, Shard4) 的顺序进行消费。

LoggerConfig

LoggerConfig

/

日志相关配置,详细信息请参考下表。

LoggerConfig 配置说明:

参数

类型

示例值

描述

LogLevel

String

info

设置日志输出级别,默认值为 info。consumer 中一共有 4 种日志输出级别,分别为 debug、info、warn 和 error。

LogFileName

String

50

日志文件输出路径。若未设置,则默认输出到 stdout。

IsJsonType

Boolean

true

是否格式化文件输出格式,默认为 false。

LogMaxSize

Integer

10

单个日志存储数量,默认为 10MiB。

LogCompress

Boolean

true

日志是否开启压缩,默认为 false。

异常处理

日志服务 SDK 消费组实现了请求失败自动重试、消费进度消费位点自动上报等机制。因此,您仅需要关注于如何处理每次消费得到的 LogGroupList 的业务逻辑实现即可。

查看或重置消费进度

在使用消费者消费日志数据的过程中,您可以随时查看当前的消费进度。在离线数据处理等场景下,如果需要消费过去某个时段的日志数据,或从指定位置消费数据,可以通过重置消费位点的方式为消费组指定新的 checkpoint,表示从指定位置开始消费数据。
日志服务控制台支持查看消费进度,重置消费组在订阅的 Topic 上的所有消费位点,您也可以通过 API 重置指定分区的消费位点。

说明

重置消费位点之前,应关闭对应的消费进程,如果重置消费组的所有消费位点,则关闭所有消费进程,否则消费位点重置失败。

在控制台相关的操作步骤如下:

  1. 登录日志服务控制台
  2. 在左侧导航栏中选择日志服务 > 日志项目管理
  3. 单击指定日志项目名称。
  4. 在左侧导航栏中选择日志消费
    • 查看消费位点。
      在左侧消费组列表中单击消费组名称,查看消费组各个消费者在各个 Shard 上的最近消费时间。
      图片
    • 重置消费位点。
      1. 在左侧消费组列表中单击消费组名称。

      2. 在页面右上角单击重置消费位点

      3. 选择重置位置

        重置位置

        说明

        最早位置

        从日志主题上最早的一条数据开始消费。

        最新位置

        跳过所有历史数据,直接从日志主题上最近写入的一条数据开始消费。

        指定时间点

        从过去某个指定时间点开始消费,该时间点以日志主题的数据保留时长为准。指定时间点重置消费点时,不支持指定为超出数据保留时长的历史时刻或未来时刻。

      4. 单击确定

相关 API

API

说明

CreateConsumerGroup

调用 CreateConsumerGroup 接口创建日志服务消费组(ConsumerGroup)。

DeleteConsumerGroup

调用 DeleteConsumerGroup 接口删除日志服务消费组。

DescribeConsumerGroups

调用 DescribeConsumerGroups 接口获取一个日志项目下的所有消费组信息。

ModifyConsumerGroup

调用 ModifyConsumerGroup 接口修改日志服务消费组的配置。

ConsumerHeartbeat

调用 ConsumerHeartbeat 接口向日志服务发送消费组中的一个消费者的心跳信息。

DescribeCheckPoint

调用 DescribeCheckPoint 接口查看指定消费组在指定日志分区上的当前消费位点。

ModifyCheckPoint

调用 ModifyCheckPoint 接口为指定消费组重置指定分区的消费位点。

ResetCheckPoint

调用 ResetCheckPoint 接口为指定消费组重置全部消费位点。