testing-dags
npx skills add https://github.com/astronomer/agents --skill testing-dags
Agent 安装分布
Skill 文档
DAG Testing Skill
Use af commands to test, debug, and fix DAGs in iterative cycles.
Running the CLI
Run all af commands using uvx (no installation required):
uvx --from astro-airflow-mcp af <command>
Throughout this document, af is shorthand for uvx --from astro-airflow-mcp af.
FIRST ACTION: Just Trigger the DAG
When the user asks to test a DAG, your FIRST AND ONLY action should be:
af runs trigger-wait <dag_id>
DO NOT:
- Call
af dags listfirst - Call
af dags getfirst - Call
af dags errorsfirst - Use
greporlsor any other bash command - Do any “pre-flight checks”
Just trigger the DAG. If it fails, THEN debug.
Testing Workflow Overview
âââââââââââââââââââââââââââââââââââââââ
â 1. TRIGGER AND WAIT â
â Run DAG, wait for completion â
âââââââââââââââââââââââââââââââââââââââ
â
âââââââââ´ââââââââ
â â
âââââââââââ ââââââââââââ
â SUCCESS â â FAILED â
â Done! â â Debug... â
âââââââââââ ââââââââââââ
â
âââââââââââââââââââââââââââââââââââââââ
â 2. DEBUG (only if failed) â
â Get logs, identify root cause â
âââââââââââââââââââââââââââââââââââââââ
â
âââââââââââââââââââââââââââââââââââââââ
â 3. FIX AND RETEST â
â Apply fix, restart from step 1 â
âââââââââââââââââââââââââââââââââââââââ
Philosophy: Try first, debug on failure. Don’t waste time on pre-flight checks â just run the DAG and diagnose if something goes wrong.
Phase 1: Trigger and Wait
Use af runs trigger-wait to test the DAG:
Primary Method: Trigger and Wait
af runs trigger-wait <dag_id> --timeout 300
Example:
af runs trigger-wait my_dag --timeout 300
Why this is the preferred method:
- Single command handles trigger + monitoring
- Returns immediately when DAG completes (success or failure)
- Includes failed task details if run fails
- No manual polling required
Response Interpretation
Success:
{
"dag_run": {
"dag_id": "my_dag",
"dag_run_id": "manual__2025-01-14T...",
"state": "success",
"start_date": "...",
"end_date": "..."
},
"timed_out": false,
"elapsed_seconds": 45.2
}
Failure:
{
"dag_run": {
"state": "failed"
},
"timed_out": false,
"elapsed_seconds": 30.1,
"failed_tasks": [
{
"task_id": "extract_data",
"state": "failed",
"try_number": 2
}
]
}
Timeout:
{
"dag_id": "my_dag",
"dag_run_id": "manual__...",
"state": "running",
"timed_out": true,
"elapsed_seconds": 300.0,
"message": "Timed out after 300 seconds. DAG run is still running."
}
Alternative: Trigger and Monitor Separately
Use this only when you need more control:
# Step 1: Trigger
af runs trigger my_dag
# Returns: {"dag_run_id": "manual__...", "state": "queued"}
# Step 2: Check status
af runs get my_dag manual__2025-01-14T...
# Returns current state
Handling Results
If Success
The DAG ran successfully. Summarize for the user:
- Total elapsed time
- Number of tasks completed
- Any notable outputs (if visible in logs)
You’re done!
If Timed Out
The DAG is still running. Options:
- Check current status:
af runs get <dag_id> <dag_run_id> - Ask user if they want to continue waiting
- Increase timeout and try again
If Failed
Move to Phase 2 (Debug) to identify the root cause.
Phase 2: Debug Failures (Only If Needed)
When a DAG run fails, use these commands to diagnose:
Get Comprehensive Diagnosis
af runs diagnose <dag_id> <dag_run_id>
Returns in one call:
- Run metadata (state, timing)
- All task instances with states
- Summary of failed tasks
- State counts (success, failed, skipped, etc.)
Get Task Logs
af tasks logs <dag_id> <dag_run_id> <task_id>
Example:
af tasks logs my_dag manual__2025-01-14T... extract_data
For specific retry attempt:
af tasks logs my_dag manual__2025-01-14T... extract_data --try 2
Look for:
- Exception messages and stack traces
- Connection errors (database, API, S3)
- Permission errors
- Timeout errors
- Missing dependencies
Check Upstream Tasks
If a task shows upstream_failed, the root cause is in an upstream task. Use af runs diagnose to find which task actually failed.
Check Import Errors (If DAG Didn’t Run)
If the trigger failed because the DAG doesn’t exist:
af dags errors
This reveals syntax errors or missing dependencies that prevented the DAG from loading.
Phase 3: Fix and Retest
Once you identify the issue:
Common Fixes
| Issue | Fix |
|---|---|
| Missing import | Add to DAG file |
| Missing package | Add to requirements.txt |
| Connection error | Check af config connections, verify credentials |
| Variable missing | Check af config variables, create if needed |
| Timeout | Increase task timeout or optimize query |
| Permission error | Check credentials in connection |
After Fixing
- Save the file
- Retest:
af runs trigger-wait <dag_id>
Repeat the test â debug â fix loop until the DAG succeeds.
CLI Quick Reference
| Phase | Command | Purpose |
|---|---|---|
| Test | af runs trigger-wait <dag_id> |
Primary test method â start here |
| Test | af runs trigger <dag_id> |
Start run (alternative) |
| Test | af runs get <dag_id> <run_id> |
Check run status |
| Debug | af runs diagnose <dag_id> <run_id> |
Comprehensive failure diagnosis |
| Debug | af tasks logs <dag_id> <run_id> <task_id> |
Get task output/errors |
| Debug | af dags errors |
Check for parse errors (if DAG won’t load) |
| Debug | af dags get <dag_id> |
Verify DAG config |
| Debug | af dags explore <dag_id> |
Full DAG inspection |
| Config | af config connections |
List connections |
| Config | af config variables |
List variables |
Testing Scenarios
Scenario 1: Test a DAG (Happy Path)
af runs trigger-wait my_dag
# Success! Done.
Scenario 2: Test a DAG (With Failure)
# 1. Run and wait
af runs trigger-wait my_dag
# Failed...
# 2. Find failed tasks
af runs diagnose my_dag manual__2025-01-14T...
# 3. Get error details
af tasks logs my_dag manual__2025-01-14T... extract_data
# 4. [Fix the issue in DAG code]
# 5. Retest
af runs trigger-wait my_dag
Scenario 3: DAG Doesn’t Exist / Won’t Load
# 1. Trigger fails - DAG not found
af runs trigger-wait my_dag
# Error: DAG not found
# 2. Find parse error
af dags errors
# 3. [Fix the issue in DAG code]
# 4. Retest
af runs trigger-wait my_dag
Scenario 4: Debug a Failed Scheduled Run
# 1. Get failure summary
af runs diagnose my_dag scheduled__2025-01-14T...
# 2. Get error from failed task
af tasks logs my_dag scheduled__2025-01-14T... failed_task_id
# 3. [Fix the issue]
# 4. Retest
af runs trigger-wait my_dag
Scenario 5: Test with Custom Configuration
af runs trigger-wait my_dag --conf '{"env": "staging", "batch_size": 100}' --timeout 600
Scenario 6: Long-Running DAG
# Wait up to 1 hour
af runs trigger-wait my_dag --timeout 3600
# If timed out, check current state
af runs get my_dag manual__2025-01-14T...
Debugging Tips
Common Error Patterns
Connection Refused / Timeout:
- Check
af config connectionsfor correct host/port - Verify network connectivity to external system
- Check if connection credentials are correct
ModuleNotFoundError:
- Package missing from
requirements.txt - After adding, may need environment restart
PermissionError:
- Check IAM roles, database grants, API keys
- Verify connection has correct credentials
Task Timeout:
- Query or operation taking too long
- Consider adding timeout parameter to task
- Optimize underlying query/operation
Reading Task Logs
Task logs typically show:
- Task start timestamp
- Any print/log statements from task code
- Return value (for @task decorated functions)
- Exception + full stack trace (if failed)
- Task end timestamp and duration
Focus on the exception at the bottom of failed task logs.
Related Skills
- authoring-dags: For creating new DAGs (includes validation before testing)
- debugging-dags: For general Airflow troubleshooting