You need to enable JavaScript to run this app.
导航

快速开始

最近更新时间2024.01.03 15:59:10

首次发布时间2022.09.23 16:22:42

本文介绍如何快速使用日志服务 Python SDK 实现基础的日志采集流程,包括创建日志项目、创建日志主题、写入日志和查询日志等操作。

前提条件

注意事项

日志服务 Python SDK 在调用 PutLogs 接口时默认使用 lz4 压缩,但出于跨平台兼容性考虑,lz4a 库未包含在日志服务 Python SDK 的安装脚本中。如果您需要在上传日志过程中使用 lz4 压缩,则需要在环境中手动安装 lz4a 库。

pip install lz4a==0.7.0

如果您使用的是 Windows 系统或 Python 3.10 及后续版本,则 SDK 无法兼容 lz4 压缩,请您跳过 lz4a 库安装并在 PutLogsV2Request 中指定 compression=zlib
PutLogsV2Request 的使用示例如下:

  • 使用 lz4 压缩

    tls_service.put_logs_v2(PutLogsV2Request(topic_id, logs))
    
  • 使用 zlib 压缩

    tls_service.put_logs_v2(PutLogsV2Request(topic_id, logs, compression="zlib"))
    

初始化客户端

初始化 Client 实例之后,才可以向 TLS 服务发送请求。初始化时推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey 硬编码引发数据安全风险。
初始化代码如下:

from volcengine.tls.TLSService import TLSService

# 注意,环境变量中的endpoint不包含协议头(https://或http://),例如 tls-cn-beijing.ivolces.com。
endpoint = os.environ["VOLCENGINE_ENDPOINT"]
region = os.environ["VOLCENGINE_REGION"]
access_key_id = os.environ["VOLCENGINE_ACCESS_KEY_ID"]
access_key_secret = os.environ["VOLCENGINE_ACCESS_KEY_SECRET"]

tls_service = TLSService(endpoint, access_key_id, access_key_secret, region)

示例代码

本示例中,创建一个 example_tls.py 文件,并调用接口分别完成创建项目、创建主题、创建索引、写入日志数据、消费日志和查询日志数据。
代码示例如下:

# coding=utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import time

from volcengine.tls.TLSService import TLSService
from volcengine.tls.tls_requests import *


if __name__ == "__main__":
    # 初始化客户端,推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey 硬编码引发数据安全风险。详细说明请参考https://www.volcengine.com/docs/6470/1166455 
    # 使用 STS 时,ak 和 sk 均使用临时密钥,且设置 VOLCENGINE_TOKEN;不使用 STS 时,VOLCENGINE_TOKEN 部分传空
    endpoint = os.environ["VOLCENGINE_ENDPOINT"]
    region = os.environ["VOLCENGINE_REGION"]
    access_key_id = os.environ["VOLCENGINE_ACCESS_KEY_ID"]
    access_key_secret = os.environ["VOLCENGINE_ACCESS_KEY_SECRET"]

    # 实例化TLS客户端
    tls_service = TLSService(endpoint, access_key_id, access_key_secret, region)

    # 创建日志项目
    create_project_request = CreateProjectRequest(project_name="project-name", region=region,
                                                  description="project-description")
    create_project_response = tls_service.create_project(create_project_request)
    project_id = create_project_response.project_id

    # 创建日志主题
    create_topic_request = CreateTopicRequest(topic_name="topic-name", project_id=project_id,
                                              ttl=3650, description="topic-description", shard_count=2)
    create_topic_response = tls_service.create_topic(create_topic_request)
    topic_id = create_topic_response.topic_id

    # 创建索引
    full_text = FullTextInfo(case_sensitive=False, delimiter=",-;", include_chinese=False)
    value_info_a = ValueInfo(value_type="text", delimiter="", case_sensitive=True,
                             include_chinese=False, sql_flag=False)
    value_info_b = ValueInfo(value_type="long", delimiter="", case_sensitive=False,
                             include_chinese=False, sql_flag=True)
    key_value_info_a = KeyValueInfo(key="key1", value=value_info_a)
    key_value_info_b = KeyValueInfo(key="key2", value=value_info_b)
    key_value = [key_value_info_a, key_value_info_b]
    create_index_request = CreateIndexRequest(topic_id, full_text, key_value)
    create_index_response = tls_service.create_index(create_index_request)

    # 写入日志数据
    logs = PutLogsV2Logs(source="192.168.1.1", filename="sys.log")
    for i in range(100):
        logs.add_log(contents={"key1": "value1-" + str(i + 1), "key2": "value2-" + str(i + 1)},
                     log_time=int(round(time.time())))
    tls_service.put_logs_v2(PutLogsV2Request(topic_id, logs))
    time.sleep(30)

    # 查询消费游标
    describe_cursor_request = DescribeCursorRequest(topic_id, shard_id=0, from_time="begin")
    describe_cursor_response = tls_service.describe_cursor(describe_cursor_request)

    # 消费日志数据
    consume_logs_request = ConsumeLogsRequest(topic_id, shard_id=0, cursor=describe_cursor_response.cursor)
    consume_logs_response = tls_service.consume_logs(consume_logs_request)

    # 当您需要检索和分析日志时,推荐您使用Python SDK提供的search_logs_v2方法,下面的代码提供了具体的调用示例
    # 查询日志数据(全文检索)
    search_logs_request = SearchLogsRequest(topic_id, query="error", limit=10,
                                            start_time=1346457600000, end_time=1630454400000)
    search_logs_response = tls_service.search_logs_v2(search_logs_request)

    # 查询日志数据(键值检索)
    search_logs_request = SearchLogsRequest(topic_id, query="key1:error", limit=10,
                                            start_time=1346457600000, end_time=1630454400000)
    search_logs_response = tls_service.search_logs_v2(search_logs_request)

    # 查询日志数据(SQL分析)
    search_logs_request = SearchLogsRequest(topic_id, query="* | select key1, key2", limit=10,
                                            start_time=1346457600000, end_time=1630454400000)
    search_logs_response = tls_service.search_logs_v2(search_logs_request)

    # 查询日志数据(SQL分析)
    search_logs_request = SearchLogsRequest(topic_id, query="* | select key1, key2", limit=10,
                                            start_time=1346457600000, end_time=1630454400000)
    search_logs_response = tls_service.search_logs(search_logs_request)