You need to enable JavaScript to run this app.
导航

通过 ConsumeLogs 消费日志

最近更新时间2023.11.13 16:31:31

首次发布时间2022.08.27 17:07:48

日志服务提供消费日志的 API 接口 ConsumeLogs,支持实时消费采集到服务端的日志数据。本文档介绍如何通过日志服务 SDK 调用 ConsumeLogs 接口,实现日志全量数据的顺序读写。

背景信息

日志服务可作为日志数据的传输中转,提供类似 Kafka 的日志数据中转功能。通过日志服务的消费日志接口和多语言 SDK,您可以将 Go 等语言的应用作为消费者实时消费日志服务数据。关于消费日志的服务端接口,请查看 ConsumeLogs,关于日志服务提供的多语言 SDK,请参考 SDK 文档

说明

日志服务通过 SDK 提供了消费组(ConsumerGroup)功能,支持通过消费组消费日志数据,通过消费组消费时,日志服务会自动均衡各个消费者的消费能力与进度,自动分配 Shard。详细说明请参考 通过消费组消费数据

示例代码

参考以下示例代码,创建一个 ConsumeLogs.go 文件,调用接口 ConsumeLogs 读取日志数据,并实时消费日志数据。

import (
        "fmt"
        "time"
        . "github.com/volcengine/volc-sdk-golang/service/tls"
        "github.com/volcengine/volc-sdk-golang/service/tls/pb"
)
func consumeLogs(topicID string, shardID int) {
        client := NewClient(os.Getenv("LOG_TEST_ENDPOINT"), os.Getenv("LOG_TEST_ACCESS_KEY_ID"),
        //LOG_TEST_SECURITY_TOKEN为通过IAM的STS机制获取的临时安全令牌,使用临时Token时也应传入临时的AK、SK。详细说明请参考https://www.volcengine.com/docs/6470/160162。不使用临时Token时LOG_TEST_SECURITY_TOKEN传空即可。
                os.Getenv("LOG_TEST_ACCESS_KEY_SECRET"), os.Getenv("LOG_TEST_SECURITY_TOKEN"),
                os.Getenv("LOG_TEST_REGION"))
        // 获取最初的日志游标
        beginCursorResp, err := client.DescribeCursor(&DescribeCursorRequest{
                TopicID: topicID,
                ShardID: shardID,
                From:    "begin",
        })
        if err != nil {
                panic(err.Error())
        }
        // 获取最后的日志游标
        endCursorResp, err := client.DescribeCursor(&DescribeCursorRequest{
                TopicID: topicID,
                ShardID: shardID,
                From:    "end",
        })
        if err != nil {
                panic(err.Error())
        }
        // 获取某个时间点的日志游标
        timestampCursorResp, err := client.DescribeCursor(&DescribeCursorRequest{
                TopicID: topicID,
                ShardID: shardID,
                From:    "1661418000",
        })
        if err != nil {
                panic(err.Error())
        }
        var logGroupCount = 10
        // 消费十组日志
        consumedLogs, err := client.ConsumeLogs(&ConsumeLogsRequest{
                TopicID:       topicID,
                ShardID:       shardID,
                Cursor:        timestampCursorResp.Cursor,
                LogGroupCount: &logGroupCount,
        })
        if err != nil {
                panic(err.Error())
        }
        fmt.Println(consumedLogs)
        // 消费十组日志,且指定压缩格式为lz4(目前支持lz4)
        var compressionType = "lz4"
        consumedLogs, err = client.ConsumeLogs(&ConsumeLogsRequest{
                TopicID:       topicID,
                ShardID:       shardID,
                Cursor:        timestampCursorResp.Cursor,
                LogGroupCount: &logGroupCount,
                Compression:   &compressionType,
        })
        if err != nil {
                panic(err.Error())
        }
        // 指定起始、结尾消费日志
        consumedLogs, err = client.ConsumeLogs(&ConsumeLogsRequest{
                TopicID:       topicID,
                ShardID:       shardID,
                Cursor:        beginCursorResp.Cursor,
                EndCursor:     &endCursorResp.Cursor,
                LogGroupCount: &logGroupCount,
                Compression:   &compressionType,
        })
        if err != nil {
                panic(err.Error())
        }
}