You need to enable JavaScript to run this app.
导航

通过 Java SDK 消费日志数据

最近更新时间2023.11.27 15:29:23

首次发布时间2023.11.23 21:39:11

日志服务支持通过 SDK 消费采集到服务端的日志数据。本文档通过示例代码演示如何通过 Java SDK 消费日志。

前提条件

  • 已安装日志服务 Java SDK。更多信息,请参见安装 Java SDK
  • 已添加 VOLCENGINE_ACCESS_KEY_ID 等环境变量。环境变量的配置方式请参考配置身份认证信息

    注意

    推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey 硬编码引发数据安全风险。

消费日志

场景说明

本文档通过示例代码演示如何通过 SDK 消费日志数据。Java SDK 支持通过以下方式写入日志:

写入方式

说明

Consumer

推荐。
在实际生产环境中,为了提高数据消费效率,建议通过 Java Consumer 方式消费日志数据。Consumer 支持负载均衡地消费日志主题下所有分区的数据,具有异步消费、高性能、失败重试、优雅关闭等特性。示例代码请参考Consumer 消费日志数据,通过消费组消费日志的详细说明请参考通过消费组消费数据通过 Java SDK 消费组消费日志

ConsumeLogs

不推荐。
日志服务支持通过 ConsumeLogs 接口同步请求的方式上传日志。消费日志的进度受限于单个 Shard 的读写能力,还需要自行维护消费进度,在 Shard 自动分裂的场景下消费逻辑与流程繁琐。
如果您在调用 PutLogs 时选择了 HashKey 路由 Shard 模式,日志数据将有序写入到指定分区中。在这种场景下,您可调用 ConsumeLogs 接口针对性地消费某个分区的日志数据。示例代码请参考ConsumeLogs 同步接口消费日志

Consumer 消费日志数据

通过 Java Consumer 消费日志数据的示例代码如下。

package com.volcengine.example.tls.demo;

import java.util.ArrayList;
import java.util.List;

import com.volcengine.model.tls.consumer.ConsumerConfig;
import com.volcengine.model.tls.exception.LogException;
import com.volcengine.model.tls.pb.PutLogRequest;
import com.volcengine.service.tls.consumer.Consumer;
import com.volcengine.service.tls.consumer.ConsumerImpl;
import com.volcengine.service.tls.consumer.LogProcessor;


// 您需要定义一个实现LogProcessor接口的类
public class ConsumerDemo implements LogProcessor {
    public static void main(String[] args) throws LogException, InterruptedException {
        // 初始化客户端,推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey 硬编码引发数据安全风险。详细说明请参考https://www.volcengine.com/docs/6470/1166455
        // 使用 STS 时,ak 和 sk 均使用临时密钥,且设置 VOLCENGINE_TOKEN;不使用 STS 时,VOLCENGINE_TOKEN 部分传空 
        ConsumerConfig config = new ConsumerConfig(System.getenv("VOLCENGINE_ENDPOINT"), System.getenv("VOLCENGINE_REGION"),
                System.getenv("VOLCENGINE_ACCESS_KEY_ID"), System.getenv("VOLCENGINE_ACCESS_KEY_SECRET"), System.getenv("VOLCENGINE_TOKEN"));
        // 请配置您的日志项目ID
        config.setProjectID("your-project-id");
        // 请配置您待消费的日志主题ID列表
        config.setTopicIDList(new ArrayList<String>(){{
            add("your-topic-id");
        }});
        // 请配置您的消费组名称
        config.setConsumerGroupName("java-consumer-group");
        // 请配置消费者名称
        config.setConsumerName("java-consumer");

        // 实例化ConsumerImpl,调用consumer.start()开始持续消费
        Consumer consumer = new ConsumerImpl(config, new ConsumerDemo());
        consumer.start();

        // 可通过调用consumer.stop()来结束消费组消费
        Thread.sleep(10000);
        consumer.stop();
    }

    /**
     * 您需要根据业务需要,自行实现这里的process方法,用于处理每次消费得到的LogGroupList
     * 下面给出了逐个打印消费到的日志的代码示例
     */
    @Override
    public void process(String topicID, int shardID, PutLogRequest.LogGroupList logGroupList) {
        System.out.println(topicID + " --- " + shardID);
        System.out.println(logGroupList.getLogGroupsCount());

        int count = 0;

        List<PutLogRequest.LogGroup> logGroups = logGroupList.getLogGroupsList();
        for (PutLogRequest.LogGroup logGroup: logGroups) {
            List<PutLogRequest.Log> logs = logGroup.getLogsList();
            for (PutLogRequest.Log log: logs) {
                count++;
                System.out.println("*** Count = " + count + " ***");
                List<PutLogRequest.LogContent> logContents = log.getContentsList();
                for (PutLogRequest.LogContent logContent: logContents) {
                    System.out.println(logContent.getKey() + ": " + logContent.getValue());
                }
                System.out.println();
            }
        }
    }
}

ConsumeLogs 同步接口消费日志

通过调用 ConsumeLogs 同步接口消费日志数据的示例代码如下。

package com.volcengine.example.tls.demo;

import com.volcengine.model.tls.*;
import com.volcengine.model.tls.exception.LogException;
import com.volcengine.model.tls.pb.PutLogRequest;
import com.volcengine.model.tls.request.*;
import com.volcengine.model.tls.response.*;
import com.volcengine.service.tls.TLSLogClient;

import java.util.ArrayList;
import java.util.List;


public class Demo {
    public static void main(String[] args) throws LogException {
        // 初始化SDK配置,请您根据账号和服务信息配置endpoint、region、acccessKeyID、accessKeySecret和token(token可为null)
        ClientConfig clientConfig = new ClientConfig(System.getenv("VOLCENGINE_ENDPOINT"), System.getenv("VOLCENGINE_REGION"),
            System.getenv("VOLCENGINE_ACCESS_KEY_ID"), System.getenv("VOLCENGINE_ACCESS_KEY_SECRET"), System.getenv("VOLCENGINE_TOKEN"));
        TLSLogClient client = ClientBuilder.newClient(clientConfig);

        // 请填写您希望消费日志的TopicID和shardID
        String topicID = "your-topic-id";
        int shardID = 0;

        // 获取日志消费的起始游标
        // DescribeCursor API的请求参数规范和限制请参阅https://www.volcengine.com/docs/6470/112193
        DescribeCursorRequest describeCursorRequest = new DescribeCursorRequest(topicID, shardID, "begin");
        DescribeCursorResponse describeCursorResponse = client.describeCursor(describeCursorRequest);
        String beginCursor = describeCursorResponse.getCursor();

        // 消费日志数据
        // 请根据您的需要,填写TopicId、ShardId、Cursor、LogGroupCount、Compression等参数,推荐您使用lz4压缩
        // 您可再次调用DescribeCursor接口获取日志消费的结束游标,作为ConsumeLogs接口的EndCursor参数值
        // ConsumeLogs API的请求参数规范和限制请参阅https://www.volcengine.com/docs/6470/112194
        ConsumeLogsRequest consumeLogsRequest = new ConsumeLogsRequest();
        consumeLogsRequest.setTopicId(topicID);
        consumeLogsRequest.setShardId(shardID);
        consumeLogsRequest.setCursor(beginCursor);
        consumeLogsRequest.setLogGroupCount(1000);
        consumeLogsRequest.setCompression("lz4");
        ConsumeLogsResponse consumeLogsResponse = client.consumeLogs(consumeLogsRequest);
        PutLogRequest.LogGroupList logGroupList = consumeLogsResponse.getLogGroupList();
    }
}

相关文档

  • 通过 SDK 发送调用 API 的请求以后,您会收到服务端的响应,如果响应中包含 200 以外的状态码,表示接口调用失败。您可以参考各个 API 的文档查看对应的错误码信息。
  • 关于 Java Consumer 的详细信息,请参考 Java Consumer
  • 关于 Java Consumer 上传日志的完整示例代码,请参考 TLS Java SDK Demo on GitHub