Can I use Python's boto3 to get AWS CloudTrail logs/events?

Short Answer

Yes. boto3 can fetch AWS CloudTrail logs and events, and there are two main approaches depending on your needs: querying recent events directly through the CloudTrail API, or parsing the long-term log files that CloudTrail delivers to an S3 bucket. Both are walked through below with practical examples.

Prerequisites First

Before jumping into code, make sure you have:

  • The boto3 library installed: run pip install boto3 in your terminal
  • AWS credentials configured (via the ~/.aws/credentials file, environment variables, or an IAM role if running on AWS services like EC2/EKS)
  • Proper IAM permissions:
    • For direct event lookup: cloudtrail:LookupEvents
    • For accessing S3-stored logs: s3:GetObject and s3:ListBucket on your CloudTrail bucket
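Before running either method, it can help to confirm which identity boto3 actually resolved from the credentials above. A minimal sketch, assuming an STS client created with boto3.client('sts'); describe_caller is a hypothetical helper name, not part of boto3:

```python
def describe_caller(sts_client):
    """Return (account_id, arn) for the credentials the client resolved.

    sts_client is assumed to be boto3.client('sts'); get_caller_identity
    is the STS call that reports the calling identity.
    """
    identity = sts_client.get_caller_identity()
    return identity['Account'], identity['Arn']
```

With real credentials this would be called as describe_caller(boto3.client('sts')). The account ID it returns is also the one that appears in the S3 log prefix used in Method 2.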

Method 1: Fetch Recent Events with lookup_events

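This is the quickest way to get CloudTrail events from the last 90 days. Note that lookup_events covers management events (the event history), not data events such as S3 object-level activity. The API returns structured event data you can process directly: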

from datetime import datetime, timezone
import json
import boto3

# Initialize the CloudTrail client (specify your region)
cloudtrail_client = boto3.client('cloudtrail', region_name='us-east-1')

# Define your time range as timezone-aware UTC datetimes
start_time = datetime(2024, 5, 1, tzinfo=timezone.utc)
end_time = datetime(2024, 5, 10, tzinfo=timezone.utc)

# Request parameters (MaxResults is capped at 50 per request)
lookup_kwargs = {
    'StartTime': start_time,
    'EndTime': end_time,
    'MaxResults': 50,
}

# Fetch page after page until the API stops returning a NextToken
while True:
    response = cloudtrail_client.lookup_events(**lookup_kwargs)

    # Process each event
    for event in response['Events']:
        print(f"📅 Event Time: {event['EventTime']}")
        print(f"🔍 Event Name: {event['EventName']}")
        print(f"👤 Username: {event.get('Username', 'N/A')}")
        # Parse the full event details (stored as a JSON string)
        event_details = json.loads(event['CloudTrailEvent'])
        # 'resources' can be missing or empty, so fall back to an empty dict
        first_resource = (event_details.get('resources') or [{}])[0]
        print(f"🔗 Resource ARN: {first_resource.get('ARN', 'N/A')}")
        print("---")

    # Stop when there are no more pages; otherwise request the next one
    if 'NextToken' not in response:
        break
    lookup_kwargs['NextToken'] = response['NextToken']
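If you would rather not track NextToken yourself, boto3's built-in paginator for lookup_events can do it. The following is a rough sketch rather than a definitive implementation: iter_lookup_events and its event_name parameter are hypothetical names introduced here, and the client is passed in so the helper works with any boto3.client('cloudtrail', ...) instance.

```python
from datetime import datetime, timezone


def iter_lookup_events(cloudtrail_client, start_time, end_time, event_name=None):
    """Yield every event in the time range, letting boto3 follow pagination.

    event_name is an optional filter on the CloudTrail event name
    (a hypothetical convenience parameter of this helper).
    """
    kwargs = {'StartTime': start_time, 'EndTime': end_time}
    if event_name is not None:
        # lookup_events accepts only one lookup attribute per request
        kwargs['LookupAttributes'] = [
            {'AttributeKey': 'EventName', 'AttributeValue': event_name}
        ]

    paginator = cloudtrail_client.get_paginator('lookup_events')
    for page in paginator.paginate(**kwargs):
        for event in page.get('Events', []):
            yield event
```

With a real client, usage would look like: for event in iter_lookup_events(boto3.client('cloudtrail', region_name='us-east-1'), datetime(2024, 5, 1, tzinfo=timezone.utc), datetime(2024, 5, 10, tzinfo=timezone.utc), event_name='ConsoleLogin'). Filtering on the server side this way reduces the number of pages you have to pull.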

Method 2: Parse CloudTrail Logs from S3

If you've configured CloudTrail to deliver logs to an S3 bucket (the recommended long-term storage option), you can download and parse those compressed log files. Here's how:

import boto3
import json
from gzip import GzipFile
from io import BytesIO

# Initialize the S3 client
s3_client = boto3.client('s3', region_name='us-east-1')

# Replace with your bucket name and log prefix
bucket_name = 'your-cloudtrail-log-bucket'
# Prefix follows the pattern: AWSLogs/ACCOUNT_ID/CloudTrail/REGION/YEAR/MONTH/DAY/
log_prefix = 'AWSLogs/123456789012/CloudTrail/us-east-1/2024/05/10/'

# List all log files under the prefix (each list call returns at most 1,000 keys, so paginate)
paginator = s3_client.get_paginator('list_objects_v2')

for page in paginator.paginate(Bucket=bucket_name, Prefix=log_prefix):
    for obj in page.get('Contents', []):
        # Skip anything that is not a CloudTrail log file (e.g. directory markers)
        if not obj['Key'].endswith('.json.gz'):
            continue

        # Download and decompress the gzipped log file
        file_response = s3_client.get_object(Bucket=bucket_name, Key=obj['Key'])
        with GzipFile(fileobj=BytesIO(file_response['Body'].read()), mode='rb') as gz_file:
            log_content = gz_file.read().decode('utf-8')

        # Each log file is a single JSON object whose "Records" array holds the events
        try:
            records = json.loads(log_content).get('Records', [])
        except json.JSONDecodeError:
            # Skip rare malformed files
            continue

        for event in records:
            print(f"📅 Event Time: {event['eventTime']}")
            print(f"🔍 Event Name: {event['eventName']}")
            # userIdentity.arn is absent for some identity types (e.g. AWS service events)
            print(f"👤 User ARN: {event.get('userIdentity', {}).get('arn', 'N/A')}")
            print("---")
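The prefix construction and the decompress-and-parse step can be pulled out of the S3 loop so they are easy to test without touching AWS. A minimal sketch, assuming each delivered log file is a gzip-compressed JSON document with a top-level Records array and that the trail uses the default key layout shown in the prefix comment above (no custom S3 key prefix); cloudtrail_prefix and parse_log_file are hypothetical helper names:

```python
import gzip
import json
from datetime import date


def cloudtrail_prefix(account_id, region, day):
    """Build the S3 key prefix for one account, region, and day."""
    return f"AWSLogs/{account_id}/CloudTrail/{region}/{day:%Y/%m/%d}/"


def parse_log_file(gz_bytes):
    """Decompress one CloudTrail log file and return its list of event records."""
    document = json.loads(gzip.decompress(gz_bytes).decode('utf-8'))
    # Fall back to an empty list if the document has no "Records" key
    return document.get('Records', [])
```

In the S3 loop, parse_log_file would receive file_response['Body'].read(), and cloudtrail_prefix('123456789012', 'us-east-1', date(2024, 5, 10)) reproduces the log_prefix value used in the example.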

Key Notes

  • Time Zones: All CloudTrail timestamps use UTC, so ensure your StartTime/EndTime are in UTC to avoid missing events.
  • Pagination: The lookup_events API caps results at 50 per request, so follow the NextToken value to fetch additional pages.
  • Log Structure: S3-stored logs are gzipped, and each file is a single JSON document with a top-level Records array, so decompress first, then iterate over Records.
  • Permissions: Double-check your IAM policies to confirm you have access to the CloudTrail API and target S3 bucket.
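To make the time zone note concrete, here is a small sketch for normalizing a time range to UTC before passing it as StartTime/EndTime. The helper names to_utc and last_n_days are hypothetical, and treating naive datetimes as already being in UTC is an assumption of this sketch:

```python
from datetime import datetime, timedelta, timezone


def to_utc(moment):
    """Return a timezone-aware UTC datetime.

    Naive datetimes are assumed to already be in UTC (an assumption
    of this sketch); aware datetimes are converted.
    """
    if moment.tzinfo is None:
        return moment.replace(tzinfo=timezone.utc)
    return moment.astimezone(timezone.utc)


def last_n_days(days, now=None):
    """Return a (start, end) pair of UTC datetimes covering the last `days` days."""
    end = to_utc(now) if now is not None else datetime.now(timezone.utc)
    return end - timedelta(days=days), end
```

For example, start_time, end_time = last_n_days(7) gives a range that can be handed straight to lookup_events; keep days at 90 or below, since that is as far back as the event lookup goes.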

This question comes from Stack Exchange; asked by FCoding.
