Help: How to call the Apache Airflow REST API from AWS Lambda to trigger a DAG run

Troubleshooting Airflow Experimental API Trigger from AWS Lambda

Hey there, let's break down the common issues that might be preventing your Python 3.6 Lambda function from triggering Airflow DAG runs via the Experimental API, along with actionable fixes:

1. Verify Network Connectivity Between Lambda and Airflow

  • If your Airflow instance is hosted in a VPC: Ensure your Lambda function is deployed to the same VPC (or a VPC with peering access), and that the Airflow security group allows inbound traffic on port 8080 (the default Airflow webserver port) from Lambda's security group.
  • If Airflow is publicly accessible: Confirm Lambda has internet access (either via a NAT gateway if in a VPC, or deployed without a VPC). Test connectivity from Lambda using requests.get to the Airflow UI endpoint first to rule out network blocks.
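Before debugging auth or payloads, it can help to confirm from inside the Lambda that a plain TCP connection to Airflow succeeds at all. The sketch below uses only the standard library, so it works even before `requests` is packaged. `tcp_reachable` is a hypothetical helper name, and it assumes `AIRFLOW_BASE_URL` includes the scheme and, if non-standard, the port (e.g. `http://10.0.1.25:8080`).

```python
import socket
from urllib.parse import urlparse


def tcp_reachable(base_url, timeout=3.0):
    """Hypothetical helper: True if a TCP connection to base_url's host/port succeeds.

    A refused or timed-out connection here usually points at VPC routing,
    NAT gateway, or security-group rules rather than at Airflow itself.
    """
    parsed = urlparse(base_url)
    if not parsed.hostname:
        raise ValueError(f"URL needs a scheme and host, got {base_url!r}")
    # Fall back to the scheme's default port when the URL has none.
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    try:
        with socket.create_connection((parsed.hostname, port), timeout=timeout):
            return True
    except OSError:  # covers refused connections, timeouts and DNS failures
        return False
```

Printing `tcp_reachable(os.environ["AIRFLOW_BASE_URL"])` at the start of the handler shows in CloudWatch Logs whether the problem is the network or something further up the stack.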

2. Fix Authentication Issues

If your Airflow instance protects the API with Basic Auth, a request without valid credentials is rejected with 401 Unauthorized:

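  • Encode your Airflow username and password in Base64 and add an Authorization header to your POST request (with requests, passing auth=(username, password) builds the same header for you).
  • Example code snippet for auth (the credential values are placeholders):
    import base64
    airflow_username = "your-airflow-user"      # placeholder
    airflow_password = "your-airflow-password"  # placeholder
    auth_str = f"{airflow_username}:{airflow_password}"
    auth_base64 = base64.b64encode(auth_str.encode('utf-8')).decode('utf-8')
    headers = {'Authorization': f'Basic {auth_base64}', 'Content-Type': 'application/json'}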
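Basic Auth only encodes the credentials; it does not encrypt them. Anyone who can read the header can decode it, so the Airflow endpoint should be served over HTTPS whenever Lambda reaches it across the internet. The small sketch below builds the header and shows that it round-trips. Both helper names are hypothetical.

```python
import base64


def basic_auth_header(username, password):
    """Hypothetical helper: build an HTTP Basic Authorization header value."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"


def decode_basic_auth(header_value):
    """Hypothetical helper: recover (username, password), as any server or eavesdropper can."""
    scheme, _, token = header_value.partition(" ")
    if scheme != "Basic":
        raise ValueError("not a Basic Authorization header")
    # Split on the first colon only: usernames may not contain ':', passwords may.
    username, _, password = base64.b64decode(token).decode("utf-8").partition(":")
    return username, password


header = basic_auth_header("admin", "secret")
print(header)                     # Basic YWRtaW46c2VjcmV0
print(decode_basic_auth(header))  # ('admin', 'secret')
```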
    

3. Validate the API Endpoint and Payload

The Experimental API's DAG trigger endpoint has strict requirements, so double-check these details:

  • Correct endpoint structure: POST {AIRFLOW_BASE_URL}/api/experimental/dags/{DAG_ID}/dag_runs
  • DAG IDs are case-sensitive, so make sure yours matches exactly what's listed in your Airflow UI.
  • Always send a JSON body (an empty body is not valid JSON); a conf field, even an empty one, is where runtime config goes:
    payload = {"conf": {}}  # Add custom key-value pairs here if you need to pass runtime config
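The checks above can be folded into one place that builds the URL and body. This is a sketch under a few assumptions. `build_trigger_call` is a hypothetical name and `airflow.example.com` is a placeholder host. The whitespace check guards against a common copy-paste slip in environment variables; it cannot catch a case mismatch, which you still need to compare against the UI.

```python
import json


def build_trigger_call(base_url, dag_id, conf=None):
    """Hypothetical helper: return (url, json_body) for POST .../dags/{dag_id}/dag_runs.

    conf defaults to an empty dict so the body is always valid JSON.
    """
    # Reject empty IDs or IDs with stray whitespace (e.g. a trailing newline from an env var).
    if not dag_id or dag_id != dag_id.strip():
        raise ValueError(f"suspicious DAG ID: {dag_id!r}")
    # Strip a trailing slash so the URL never contains '//api/...'.
    url = f"{base_url.rstrip('/')}/api/experimental/dags/{dag_id}/dag_runs"
    body = json.dumps({"conf": conf if conf is not None else {}})
    return url, body


url, body = build_trigger_call("https://airflow.example.com/", "daily_etl", {"date": "2019-01-01"})
print(url)   # https://airflow.example.com/api/experimental/dags/daily_etl/dag_runs
print(body)  # {"conf": {"date": "2019-01-01"}}
```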
    

4. Check Airflow Configuration

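Make sure the Experimental API is enabled. In Airflow 2.x it is off by default and is switched on in the [api] section of airflow.cfg:

[api]
enable_experimental_api = True

Airflow 1.10.x has no such switch; there, the auth_backend option in the same [api] section decides who may call the API. Restart the Airflow webserver, which serves the API, after changing either setting.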
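Airflow also reads AIRFLOW__{SECTION}__{KEY} environment variables, and these take precedence over airflow.cfg. A setting can therefore look enabled in the file yet be overridden at runtime, which is common in Docker or managed deployments. The hedged sketch below checks the effective value. `effective_api_setting` is hypothetical and only mirrors that precedence rule; it is not Airflow's full configuration loader, which also applies built-in defaults.

```python
import configparser
import os


def effective_api_setting(cfg_text, key, section="api", environ=None):
    """Hypothetical helper: return (value, source) for an Airflow option.

    Checks the AIRFLOW__{SECTION}__{KEY} env var first, then airflow.cfg text.
    Returns (None, "airflow.cfg") if the option is set in neither place.
    """
    environ = os.environ if environ is None else environ
    env_name = f"AIRFLOW__{section.upper()}__{key.upper()}"
    if env_name in environ:
        return environ[env_name], env_name
    # interpolation=None: airflow.cfg contains '%' characters (e.g. log formats)
    # that the default interpolation would reject when those options are read.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return parser.get(section, key, fallback=None), "airflow.cfg"
```

For example, `effective_api_setting(open(cfg_path).read(), "enable_experimental_api")` (with `cfg_path` pointing at your airflow.cfg) reports both the value and where it came from. This assumes the Airflow 2.x option name.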

5. Complete Lambda Function Example

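Here's a complete Python 3.6 Lambda function that handles all the above points, reading sensitive settings from environment variables as a best practice. Note that requests is not part of the Lambda Python runtime, so include it in your deployment package or a Lambda layer: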

import requests
import base64
import os

def lambda_handler(event, context):
    # Load configs from Lambda environment variables
    airflow_base_url = os.getenv("AIRFLOW_BASE_URL")
    dag_id = os.getenv("DAG_ID")
    airflow_user = os.getenv("AIRFLOW_USER")
    airflow_pass = os.getenv("AIRFLOW_PASS")

    # Validate required configs are present
    if not all([airflow_base_url, dag_id, airflow_user, airflow_pass]):
        return {
            "statusCode": 400,
            "body": "Missing required environment variables (AIRFLOW_BASE_URL, DAG_ID, AIRFLOW_USER, AIRFLOW_PASS)"
        }

    # Build the trigger endpoint (tolerate a trailing slash in AIRFLOW_BASE_URL)
    trigger_endpoint = f"{airflow_base_url.rstrip('/')}/api/experimental/dags/{dag_id}/dag_runs"

    # Setup authentication headers
    auth_encoded = base64.b64encode(f"{airflow_user}:{airflow_pass}".encode()).decode()
    headers = {
        "Authorization": f"Basic {auth_encoded}",
        "Content-Type": "application/json"
    }

    # Prepare request payload
    payload = {"conf": {}}

    try:
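        # Set a timeout so an unreachable Airflow fails fast instead of using up the Lambda's time limit
        response = requests.post(trigger_endpoint, json=payload, headers=headers, timeout=10)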
        response.raise_for_status()  # Raise exception for HTTP 4xx/5xx errors
        return {
            "statusCode": 200,
            "body": f"Successfully triggered DAG: {dag_id}. API Response: {response.text}"
        }
    except requests.exceptions.RequestException as e:
        error_msg = f"Failed to trigger DAG: {str(e)}"
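        # Capture Airflow's response body if one was received. Compare with None:
        # a Response object is falsy for 4xx/5xx, so a plain truthiness check would skip it
        if e.response is not None:
            error_msg += f" | Airflow API Response: {e.response.text}"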
        print(error_msg)
        return {
            "statusCode": 500,
            "body": error_msg
        }
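If you would rather not package requests, the same call can be made with urllib from the standard library. This is a sketch, not a drop-in replacement. The function names are hypothetical, HTTP errors are returned as (status, body) rather than raised so the handler can log Airflow's error message, and network failures (urllib.error.URLError, timeouts) still propagate.

```python
import base64
import json
import urllib.error
import urllib.request


def make_trigger_request(base_url, dag_id, user, password, conf=None):
    """Hypothetical helper: build (but do not send) the POST that triggers a DAG run."""
    url = f"{base_url.rstrip('/')}/api/experimental/dags/{dag_id}/dag_runs"
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    body = json.dumps({"conf": conf if conf is not None else {}}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Authorization": f"Basic {token}", "Content-Type": "application/json"},
        method="POST",
    )


def trigger_dag_stdlib(base_url, dag_id, user, password, conf=None, timeout=10):
    """Hypothetical helper: send the request; return (status_code, response_text)."""
    req = make_trigger_request(base_url, dag_id, user, password, conf)
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.status, resp.read().decode("utf-8")
    except urllib.error.HTTPError as err:
        # 4xx/5xx: Airflow's error body is the most useful thing to log.
        return err.code, err.read().decode("utf-8")
```

Splitting construction from sending lets you inspect the exact URL and headers, for example in a unit test, without touching the network.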

6. Use CloudWatch Logs to Diagnose

Check your Lambda function's CloudWatch Logs for specific error clues:

  • requests.exceptions.ConnectTimeout or ConnectionError (or the Lambda invocation itself timing out): Network issue (VPC/NAT/Security Group misconfiguration)
  • 401 Unauthorized: Incorrect credentials or missing auth header
  • 404 Not Found: Wrong endpoint URL or invalid DAG ID
  • 403 Forbidden: Airflow RBAC permissions or the API auth_backend setting are blocking the request (ensure your user has DAG trigger permissions)
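To make these clues show up directly in CloudWatch Logs, the error branch can log a hint next to the raw error. The sketch below simply encodes the list above. `diagnose` and `HINTS` are hypothetical names, and `None` stands for "no HTTP response was received".

```python
# Hypothetical mapping of Airflow API status codes to likely causes (from the list above).
HINTS = {
    401: "Incorrect credentials or missing Authorization header",
    403: "Airflow permissions (RBAC or API auth_backend) are blocking the request",
    404: "Wrong endpoint URL or DAG ID (IDs are case-sensitive)",
}


def diagnose(status_code):
    """Hypothetical helper: map a status code (None = no HTTP response) to a likely cause."""
    if status_code is None:
        return "No HTTP response: check VPC routing, NAT gateway and security groups"
    if 200 <= status_code < 300:
        return "OK"
    return HINTS.get(status_code, f"Unexpected status {status_code}; check the Airflow webserver logs")
```

In the handler's except branch, something like `print(diagnose(e.response.status_code if e.response is not None else None))` would add the hint beside the error message.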

The question comes from Stack Exchange and was asked by BenAhm.
