Help: How to call the Apache Airflow REST API from AWS Lambda to trigger a DAG run
Hey there, let's break down the common issues that might be preventing your Python 3.6 Lambda function from triggering Airflow DAG runs via the Experimental API, along with actionable fixes:
1. Verify Network Connectivity Between Lambda and Airflow
- If your Airflow instance is hosted in a VPC: Ensure your Lambda function is deployed to the same VPC (or a VPC with peering access), and that the Airflow security group allows inbound traffic on port 8080 (default Airflow port) from Lambda's security group.
- If Airflow is publicly accessible: Confirm Lambda has internet access (either deployed without a VPC, or via a NAT gateway if it runs in a VPC). Test connectivity from Lambda by calling requests.get against the Airflow UI endpoint first to rule out network blocks.
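To separate a network block from an HTTP-level problem, a plain TCP probe can be run from inside the Lambda before requests is involved at all. This is a minimal sketch using only the standard library; the name can_reach and the 3-second timeout are illustrative choices, not part of Airflow or Lambda.

```python
import socket
from urllib.parse import urlparse


def can_reach(url, timeout=3.0):
    """Return True if a TCP connection to the URL's host and port succeeds."""
    parsed = urlparse(url)
    # Fall back to the scheme's default port when the URL does not name one.
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    try:
        with socket.create_connection((parsed.hostname, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, DNS failures, and timeouts alike.
        return False
```

If this returns False for your AIRFLOW_BASE_URL, the problem is most likely in the VPC, NAT, or security group setup. If it returns True, move on to authentication and the endpoint itself.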
2. Fix Authentication Issues
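Whether the API asks for credentials depends on the auth backend configured under [api] in airflow.cfg. If a password (Basic Auth) backend is enabled, a request without credentials fails with 401 Unauthorized:
- Encode your Airflow username and password (joined as username:password) in Base64 and add an Authorization header to your POST request.
- Example code snippet for auth: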
    import base64

    auth_str = f"{airflow_username}:{airflow_password}"
    auth_base64 = base64.b64encode(auth_str.encode('utf-8')).decode('utf-8')
    headers = {'Authorization': f'Basic {auth_base64}', 'Content-Type': 'application/json'}
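Before triggering anything, it can help to check the credentials on their own. The Experimental API exposes a GET /api/experimental/test endpoint that sits behind the same authentication check. The sketch below only builds such a request with the standard library; the helper name build_auth_check is made up for illustration, and it assumes your Airflow version still ships this endpoint.

```python
import base64
import urllib.request


def build_auth_check(base_url, username, password):
    """Build a GET request for the Experimental API's /test endpoint with Basic Auth."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/api/experimental/test",
        headers={"Authorization": f"Basic {token}"},
        method="GET",
    )


# Sending it is one call; a 200 response means the credentials were accepted:
#     with urllib.request.urlopen(build_auth_check(url, user, pw), timeout=5) as resp:
#         print(resp.status, resp.read())
```

A 401 here means the username, password, or header is wrong, regardless of which DAG you try to trigger.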
3. Validate the API Endpoint and Payload
The Experimental API's DAG trigger endpoint is picky about a few details, so double-check these:
- Correct endpoint structure: POST {AIRFLOW_BASE_URL}/api/experimental/dags/{DAG_ID}/dag_runs
- DAG IDs are case-sensitive, so make sure yours matches exactly what's listed in your Airflow UI.
- The request body must be valid JSON. Runtime config goes in the conf field; sending it empty is a safe default:
    payload = {"conf": {}}  # Add custom key-value pairs here if you need to pass runtime config
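The endpoint and payload rules above can be sketched as one small helper. This is an illustration rather than anything Airflow provides: build_trigger_request is a hypothetical name, and percent-encoding the DAG ID is a defensive assumption for IDs with unusual characters.

```python
import json
from urllib.parse import quote


def build_trigger_request(base_url, dag_id, conf=None):
    """Return (url, body) for the Experimental API trigger endpoint."""
    # Strip a trailing slash so the path never contains "//api".
    root = base_url.rstrip("/")
    # Percent-encode the DAG ID in case it holds characters unsafe in a URL path.
    url = f"{root}/api/experimental/dags/{quote(dag_id, safe='')}/dag_runs"
    # Always send a JSON object; a missing conf becomes an empty one.
    body = json.dumps({"conf": conf if conf is not None else {}})
    return url, body
```

The result plugs straight into the request, for example requests.post(url, data=body, headers=headers), with the Content-Type header set to application/json.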
4. Check Airflow Configuration
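Make sure the Experimental API is reachable according to your airflow.cfg file. On Airflow 1.10.x it is always on, and access is governed by the auth_backend option in the [api] section (later 1.10 releases default to a backend that denies every request). On Airflow 2.x it is disabled by default and has to be switched on:
    [api]
    enable_experimental_api = True
Restart the Airflow webserver after updating airflow.cfg so the change takes effect.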
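To see what a given airflow.cfg actually says, the [api] section can be read with Python's configparser. This is a rough sketch: read_api_settings is a hypothetical helper, and the option names auth_backend and enable_experimental_api are the ones I know from Airflow 1.10.x and 2.x, so check them against the configuration reference for your version.

```python
import configparser


def read_api_settings(cfg_text):
    """Extract the [api] options relevant to the Experimental API from airflow.cfg text."""
    # interpolation=None: airflow.cfg may contain bare "%" characters elsewhere.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return {
        "auth_backend": parser.get("api", "auth_backend", fallback=None),
        "enable_experimental_api": parser.getboolean(
            "api", "enable_experimental_api", fallback=None
        ),
    }
```

A value of None means the option is not set in the file, in which case Airflow falls back to its built-in default for that version.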
5. Complete Lambda Function Example
Here's a Python 3.6 Lambda function that handles all the above points. Use environment variables for sensitive configs, and note that requests is not part of the Lambda runtime, so bundle it in your deployment package or a layer:
    import base64
    import os

    import requests  # third-party: package it with the function or in a layer


    def lambda_handler(event, context):
        # Load configs from Lambda environment variables
        airflow_base_url = os.getenv("AIRFLOW_BASE_URL")
        dag_id = os.getenv("DAG_ID")
        airflow_user = os.getenv("AIRFLOW_USER")
        airflow_pass = os.getenv("AIRFLOW_PASS")

        # Validate required configs are present
        if not all([airflow_base_url, dag_id, airflow_user, airflow_pass]):
            return {
                "statusCode": 400,
                "body": "Missing required environment variables (AIRFLOW_BASE_URL, DAG_ID, AIRFLOW_USER, AIRFLOW_PASS)"
            }

        # Build the trigger endpoint (strip a trailing slash to avoid "//api")
        trigger_endpoint = f"{airflow_base_url.rstrip('/')}/api/experimental/dags/{dag_id}/dag_runs"

        # Setup authentication headers
        auth_encoded = base64.b64encode(f"{airflow_user}:{airflow_pass}".encode()).decode()
        headers = {
            "Authorization": f"Basic {auth_encoded}",
            "Content-Type": "application/json"
        }

        # Prepare request payload
        payload = {"conf": {}}

        try:
            # The timeout keeps a blocked network path from hanging until Lambda's own timeout
            response = requests.post(trigger_endpoint, json=payload, headers=headers, timeout=10)
            response.raise_for_status()  # Raise exception for HTTP 4xx/5xx errors
            return {
                "statusCode": 200,
                "body": f"Successfully triggered DAG: {dag_id}. API Response: {response.text}"
            }
        except requests.exceptions.RequestException as e:
            error_msg = f"Failed to trigger DAG: {str(e)}"
            # Capture detailed response if available. Compare against None,
            # because a 4xx/5xx Response object evaluates as False.
            if getattr(e, "response", None) is not None:
                error_msg += f" | Airflow API Response: {e.response.text}"
            print(error_msg)
            return {
                "statusCode": 500,
                "body": error_msg
            }
6. Use CloudWatch Logs to Diagnose
Check your Lambda function's CloudWatch Logs for specific error clues:
- ConnectTimeout or ConnectionError from requests, or Lambda's own "Task timed out" message: Network issue (VPC/NAT/Security Group misconfiguration)
- 401 Unauthorized: Incorrect credentials or missing auth header
- 404 Not Found: Wrong endpoint URL or invalid DAG ID
- 403 Forbidden: Airflow RBAC permissions or the API auth backend are blocking the request (ensure your user has DAG trigger permissions)
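The checklist above can be folded into the handler so each log line already names the likely cause. A minimal sketch, with diagnose as a hypothetical helper and the hint texts taken from the list above:

```python
def diagnose(status_code=None, timed_out=False):
    """Map a trigger failure to the most likely cause from the checklist above."""
    if timed_out or status_code is None:
        # No HTTP status at all means the request never reached Airflow.
        return "network: check VPC, NAT gateway, and security groups"
    hints = {
        401: "auth: incorrect credentials or missing Authorization header",
        403: "permissions: the user or auth backend is not allowed to trigger DAGs",
        404: "endpoint: wrong URL or invalid DAG ID",
    }
    return hints.get(status_code, f"unexpected HTTP {status_code}: inspect the response body")
```

In the except branch of the handler, something like print(diagnose(e.response.status_code if e.response is not None else None)) would put the hint next to the raw error in CloudWatch.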
The question comes from Stack Exchange; asked by BenAhm.




