You need to enable JavaScript to run this app.
导航
快速开始
最近更新时间:2025.02.28 16:12:27首次发布时间:2022.09.23 16:22:42
复制全文
我的收藏
有用
有用
无用
无用

本文介绍如何快速使用日志服务 Python SDK 实现基础的日志采集流程,包括创建日志项目、创建日志主题、写入日志和查询日志等操作。

前提条件

注意事项

日志服务 Python SDK 在调用 PutLogs 接口时默认使用 lz4 压缩,但出于跨平台兼容性考虑,lz4a 库未包含在日志服务 Python SDK 的安装脚本中。因此如果您需要在上传日志过程中使用 lz4 压缩,则需要在环境中手动安装 lz4a 库。

pip install lz4a==0.7.0

如果您使用的是 Windows 系统或 Python 3.10 及后续版本,则 SDK 无法兼容 lz4 压缩,请您跳过 lz4a 库安装并在 PutLogsV2Request 中指定 compression=zlib
PutLogsV2Request 的使用示例如下:

  • 使用 lz4 压缩

    tls_service.put_logs_v2(PutLogsV2Request(topic_id, logs))
    
  • 使用 zlib 压缩

    tls_service.put_logs_v2(PutLogsV2Request(topic_id, logs, compression="zlib"))
    

初始化客户端

初始化 Client 实例之后,才可以向 TLS 服务发送请求。初始化时推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey 硬编码引发数据安全风险。
初始化代码如下:

from volcengine.tls.TLSService import TLSService

endpoint = os.environ["VOLCENGINE_ENDPOINT"]
region = os.environ["VOLCENGINE_REGION"]
access_key_id = os.environ["VOLCENGINE_ACCESS_KEY_ID"]
access_key_secret = os.environ["VOLCENGINE_ACCESS_KEY_SECRET"]

tls_service = TLSService(endpoint, access_key_id, access_key_secret, region)

示例代码

本示例中,创建一个 example_tls.py 文件,并调用接口分别完成创建项目、创建主题、创建索引、写入日志数据、消费日志和查询日志数据。
代码示例如下:

# coding=utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import time

from volcengine.tls.TLSService import TLSService
from volcengine.tls.tls_requests import *


if __name__ == "__main__":
    # 初始化客户端,推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey 硬编码引发数据安全风险。详细说明请参考https://www.volcengine.com/docs/6470/1166455 
    # 使用 STS 时,ak 和 sk 均使用临时密钥,且设置 VOLCENGINE_TOKEN;不使用 STS 时,VOLCENGINE_TOKEN 部分传空
    endpoint = os.environ["VOLCENGINE_ENDPOINT"]
    region = os.environ["VOLCENGINE_REGION"]
    access_key_id = os.environ["VOLCENGINE_ACCESS_KEY_ID"]
    access_key_secret = os.environ["VOLCENGINE_ACCESS_KEY_SECRET"]

    # 实例化TLS客户端
    tls_service = TLSService(endpoint, access_key_id, access_key_secret, region)

    # 创建日志项目
    create_project_request = CreateProjectRequest(project_name="project-name", region=region,
                                                  description="project-description")
    create_project_response = tls_service.create_project(create_project_request)
    project_id = create_project_response.project_id

    # 创建日志主题
    create_topic_request = CreateTopicRequest(topic_name="topic-name", project_id=project_id,
                                              ttl=3650, description="topic-description", shard_count=2)
    create_topic_response = tls_service.create_topic(create_topic_request)
    topic_id = create_topic_response.topic_id

    # 创建索引
    full_text = FullTextInfo(case_sensitive=False, delimiter=",-;", include_chinese=False)
    value_info_a = ValueInfo(value_type="text", delimiter="", case_sensitive=True,
                             include_chinese=False, sql_flag=False)
    value_info_b = ValueInfo(value_type="long", delimiter="", case_sensitive=False,
                             include_chinese=False, sql_flag=True)
    key_value_info_a = KeyValueInfo(key="key1", value=value_info_a)
    key_value_info_b = KeyValueInfo(key="key2", value=value_info_b)
    key_value = [key_value_info_a, key_value_info_b]
    create_index_request = CreateIndexRequest(topic_id, full_text, key_value)
    create_index_response = tls_service.create_index(create_index_request)

    # 写入日志数据
    logs = PutLogsV2Logs(source="192.168.1.1", filename="sys.log")
    for i in range(100):
        logs.add_log(contents={"key1": "value1-" + str(i + 1), "key2": "value2-" + str(i + 1)},
                     log_time=int(round(time.time())))
    tls_service.put_logs_v2(PutLogsV2Request(topic_id, logs))
    time.sleep(30)

    # 查询消费游标
    describe_cursor_request = DescribeCursorRequest(topic_id, shard_id=0, from_time="begin")
    describe_cursor_response = tls_service.describe_cursor(describe_cursor_request)

    # 消费日志数据
    consume_logs_request = ConsumeLogsRequest(topic_id, shard_id=0, cursor=describe_cursor_response.cursor)
    consume_logs_response = tls_service.consume_logs(consume_logs_request)

    # 当您需要检索和分析日志时,推荐您使用Python SDK提供的search_logs_v2方法,下面的代码提供了具体的调用示例
    # 查询日志数据(全文检索)
    search_logs_request = SearchLogsRequest(topic_id, query="error", limit=10,
                                            start_time=1346457600000, end_time=1630454400000)
    search_logs_response = tls_service.search_logs_v2(search_logs_request)

    # 查询日志数据(键值检索)
    search_logs_request = SearchLogsRequest(topic_id, query="key1:error", limit=10,
                                            start_time=1346457600000, end_time=1630454400000)
    search_logs_response = tls_service.search_logs_v2(search_logs_request)

    # 查询日志数据(SQL分析)
    search_logs_request = SearchLogsRequest(topic_id, query="* | select key1, key2", limit=10,
                                            start_time=1346457600000, end_time=1630454400000)
    search_logs_response = tls_service.search_logs_v2(search_logs_request)

    # 查询日志数据(SQL分析)
    search_logs_request = SearchLogsRequest(topic_id, query="* | select key1, key2", limit=10,
                                            start_time=1346457600000, end_time=1630454400000)
    search_logs_response = tls_service.search_logs(search_logs_request)