airflow
npx skills add https://github.com/astronomer/agents --skill airflow
Airflow Operations
Use af commands to query, manage, and troubleshoot Airflow workflows.
Running the CLI
Run all af commands using uvx (no installation required):
uvx --from astro-airflow-mcp af <command>
Throughout this document, af is shorthand for uvx --from astro-airflow-mcp af.
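One way to make the shorthand literal in an interactive shell is a small wrapper function. This is a minimal sketch, assuming uvx is on your PATH; the function is a convenience for illustration, not something the tool installs for you.

```shell
# Convenience wrapper so that `af <command>` works as written in this document.
# Assumes uvx is available; nothing is installed permanently.
af() {
  uvx --from astro-airflow-mcp af "$@"
}
```

With this defined in the current shell session, `af health` expands to `uvx --from astro-airflow-mcp af health`.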
Instance Configuration
Manage multiple Airflow instances with persistent configuration:
# Add a new instance
af instance add prod --url https://airflow.example.com --token "$API_TOKEN"
af instance add staging --url https://staging.example.com --username admin --password admin
# List and switch instances
af instance list # Shows all instances in a table
af instance use prod # Switch to prod instance
af instance current # Show current instance
af instance delete old-instance
# Auto-discover instances (use --dry-run to preview first)
af instance discover --dry-run # Preview all discoverable instances
af instance discover # Discover from all backends (astro, local)
af instance discover astro # Discover Astro deployments only
af instance discover astro --all-workspaces # Include all accessible workspaces
af instance discover local # Scan common local Airflow ports
af instance discover local --scan # Deep scan all ports 1024-65535
# IMPORTANT: Always run with --dry-run first and ask for user consent before
# running discover without it. The non-dry-run mode creates API tokens in
# Astro Cloud, which is a sensitive action that requires explicit approval.
# Override instance for a single command
af --instance staging dags list
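The dry-run-then-consent rule for discovery above could be wrapped in a helper like the following. This is a sketch under stated assumptions: `discover_with_consent` is a hypothetical name, `af` is assumed to be available as a command or shell function, and `--dry-run` is assumed to be accepted after a backend name such as `astro`.

```shell
# Hypothetical helper: preview discovery, then require explicit consent
# before the real run (which creates API tokens in Astro Cloud).
discover_with_consent() (
  # Step 1: preview only.
  af instance discover "$@" --dry-run || exit 1

  # Step 2: ask before doing anything that creates tokens.
  printf 'Run discovery for real (may create API tokens)? [y/N] ' >&2
  read -r answer || answer=""
  case "$answer" in
    y|Y|yes|YES) af instance discover "$@" ;;
    *) echo "Aborted: no instances were registered." >&2; exit 1 ;;
  esac
)
```

For example, `discover_with_consent astro` shows the preview first and only proceeds on an explicit "y".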
Config file: ~/.af/config.yaml (override with --config or AF_CONFIG env var)
Tokens in config can reference environment variables using ${VAR} syntax:
instances:
  - name: prod
    url: https://airflow.example.com
    auth:
      token: ${AIRFLOW_API_TOKEN}
Or use environment variables directly (no config file needed):
export AIRFLOW_API_URL=http://localhost:8080
export AIRFLOW_AUTH_TOKEN=your-token-here
# Or username/password:
export AIRFLOW_USERNAME=admin
export AIRFLOW_PASSWORD=admin
Or CLI flags: af --airflow-url http://localhost:8080 --token "$TOKEN" <command>
Quick Reference
| Command | Description |
|---|---|
| af health | System health check |
| af dags list | List all DAGs |
| af dags get <dag_id> | Get DAG details |
| af dags explore <dag_id> | Full DAG investigation |
| af dags source <dag_id> | Get DAG source code |
| af dags pause <dag_id> | Pause DAG scheduling |
| af dags unpause <dag_id> | Resume DAG scheduling |
| af dags errors | List import errors |
| af dags warnings | List DAG warnings |
| af dags stats | DAG run statistics |
| af runs list | List DAG runs |
| af runs get <dag_id> <run_id> | Get run details |
| af runs trigger <dag_id> | Trigger a DAG run |
| af runs trigger-wait <dag_id> | Trigger and wait for completion |
| af runs diagnose <dag_id> <run_id> | Diagnose failed run |
| af tasks list <dag_id> | List tasks in DAG |
| af tasks get <dag_id> <task_id> | Get task definition |
| af tasks instance <dag_id> <run_id> <task_id> | Get task instance |
| af tasks logs <dag_id> <run_id> <task_id> | Get task logs |
| af config version | Airflow version |
| af config show | Full configuration |
| af config connections | List connections |
| af config variables | List variables |
| af config variable <key> | Get specific variable |
| af config pools | List pools |
| af config pool <name> | Get pool details |
| af config plugins | List plugins |
| af config providers | List providers |
| af config assets | List assets/datasets |
| af api <endpoint> | Direct REST API access |
| af api ls | List available API endpoints |
| af api ls --filter X | List endpoints matching pattern |
User Intent Patterns
DAG Operations
- “What DAGs exist?” / “List all DAGs” -> af dags list
- “Tell me about DAG X” / “What is DAG Y?” -> af dags explore <dag_id>
- “What’s the schedule for DAG X?” -> af dags get <dag_id>
- “Show me the code for DAG X” -> af dags source <dag_id>
- “Stop DAG X” / “Pause this workflow” -> af dags pause <dag_id>
- “Resume DAG X” -> af dags unpause <dag_id>
- “Are there any DAG errors?” -> af dags errors
- “Create a new DAG” / “Write a pipeline” -> use the authoring-dags skill
Run Operations
- “What runs have executed?” -> af runs list
- “Run DAG X” / “Trigger the pipeline” -> af runs trigger <dag_id>
- “Run DAG X and wait” -> af runs trigger-wait <dag_id>
- “Why did this run fail?” -> af runs diagnose <dag_id> <run_id>
- “Test this DAG and fix if it fails” -> use the testing-dags skill
Task Operations
- “What tasks are in DAG X?” -> af tasks list <dag_id>
- “Get task logs” / “Why did task fail?” -> af tasks logs <dag_id> <run_id> <task_id>
- “Full root cause analysis” / “Diagnose and fix” -> use the debugging-dags skill
Data Operations
- “Is the data fresh?” / “When was this table last updated?” -> use the checking-freshness skill
- “Where does this data come from?” -> use the tracing-upstream-lineage skill
- “What depends on this table?” / “What breaks if I change this?” -> use the tracing-downstream-lineage skill
System Operations
- “What version of Airflow?” -> af config version
- “What connections exist?” -> af config connections
- “Are pools full?” -> af config pools
- “Is Airflow healthy?” -> af health
API Exploration
- “What API endpoints are available?” -> af api ls
- “Find variable endpoints” -> af api ls --filter variable
- “Access XCom values” / “Get XCom” -> af api xcom-entries -F dag_id=X -F task_id=Y
- “Get event logs” / “Audit trail” -> af api event-logs -F dag_id=X
- “Create connection via API” -> af api connections -X POST --body '{...}'
- “Create variable via API” -> af api variables -X POST -F key=name -f value=val
Common Workflows
Investigate a Failed Run
# 1. List recent runs to find failure
af runs list --dag-id my_dag
# 2. Diagnose the specific run
af runs diagnose my_dag manual__2024-01-15T10:00:00+00:00
# 3. Get logs for failed task (from diagnose output)
af tasks logs my_dag manual__2024-01-15T10:00:00+00:00 extract_data
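The first two steps above can be chained so the run ID does not have to be copied by hand. This is a sketch, not part of the tool: `diagnose_first_failure` is a hypothetical helper, jq is assumed to be installed, and the `dag_run_id` field name is an assumption about the JSON that `af runs list` returns (adjust it to match the actual output).

```shell
# Hypothetical helper: diagnose the first failed run listed for a DAG.
# Assumes each run object has `state` and `dag_run_id` fields.
diagnose_first_failure() (
  dag_id=$1
  run_id=$(af runs list --dag-id "$dag_id" |
    jq -r '[.dag_runs[] | select(.state == "failed")][0].dag_run_id // empty')
  if [ -z "$run_id" ]; then
    echo "No failed runs found for $dag_id" >&2
    exit 1
  fi
  af runs diagnose "$dag_id" "$run_id"
)
```

Calling `diagnose_first_failure my_dag` picks whichever failed run appears first in the list output; the ordering of that list is not specified here, so it may not be the most recent failure.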
Morning Health Check
# 1. Overall system health
af health
# 2. Check for broken DAGs
af dags errors
# 3. Check pool utilization
af config pools
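When several instances are configured, the same three checks can be repeated per instance with the `--instance` override. The helper below is a sketch: `morning_check` is a hypothetical name, and it assumes `af` exits with a non-zero status when a command fails.

```shell
# Hypothetical helper: run the morning checks against each named instance.
# Assumes a non-zero exit status from af signals a failed check.
morning_check() (
  status=0
  for inst in "$@"; do
    echo "=== $inst ==="
    af --instance "$inst" health || status=1
    af --instance "$inst" dags errors || status=1
    af --instance "$inst" config pools || status=1
  done
  exit "$status"
)
```

For example, `morning_check prod staging` prints the output for both instances and returns non-zero if any command failed.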
Understand a DAG
# Get comprehensive overview (metadata + tasks + source)
af dags explore my_dag
Check Why DAG Isn’t Running
# Check if paused
af dags get my_dag
# Check for import errors
af dags errors
# Check recent runs
af runs list --dag-id my_dag
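The paused check can also be scripted. The sketch below uses the `dags`, `dag_id`, and `is_paused` fields that appear in the jq examples in this document; `dag_is_paused` is a hypothetical helper and jq is assumed to be installed.

```shell
# Hypothetical helper: succeed (exit 0) if the DAG is paused.
# Exit 1 means the DAG was found and is not paused; jq exits 4 when
# the DAG ID does not appear in the list output at all.
dag_is_paused() (
  af dags list | jq -e --arg id "$1" \
    '.dags[] | select(.dag_id == $id) | .is_paused == true' >/dev/null
)
```

A typical use would be `if dag_is_paused my_dag; then af dags unpause my_dag; fi`. If the list output is truncated, a DAG outside the returned set would be reported as not found.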
Trigger and Monitor
# Option 1: Trigger and wait (blocking)
af runs trigger-wait my_dag --timeout 1800
# Option 2: Trigger and check later
af runs trigger my_dag
af runs get my_dag <run_id>
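Option 2 can be turned into a simple polling loop. This is a sketch under assumptions: `trigger_and_poll` is a hypothetical helper, jq is assumed to be installed, and the `dag_run_id` and `state` field names (and the `success` / `failed` state values) are assumptions about the JSON returned by `af runs trigger` and `af runs get`.

```shell
# Hypothetical helper: trigger a DAG, then poll until the run finishes.
# Usage: trigger_and_poll <dag_id> [interval_seconds] [max_attempts]
trigger_and_poll() (
  dag_id=$1
  interval=${2:-30}
  attempts=${3:-60}

  run_id=$(af runs trigger "$dag_id" | jq -r '.dag_run_id // empty')
  [ -n "$run_id" ] || { echo "Could not read run ID from trigger output" >&2; exit 2; }

  i=0
  while [ "$i" -lt "$attempts" ]; do
    state=$(af runs get "$dag_id" "$run_id" | jq -r '.state // empty')
    echo "$run_id: ${state:-unknown}" >&2
    case "$state" in
      success) exit 0 ;;
      failed) exit 1 ;;
    esac
    i=$((i + 1))
    sleep "$interval"
  done
  echo "Gave up after $attempts polls" >&2
  exit 3
)
```

When blocking is acceptable, `af runs trigger-wait` covers the same need without a custom loop; the sketch is mainly useful when other work should happen between polls.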
Output Format
All commands output JSON, except instance commands, which print human-readable tables:
af dags list
# {
# "total_dags": 5,
# "returned_count": 5,
# "dags": [...]
# }
Use jq for filtering:
# Find failed runs
af runs list | jq '.dag_runs[] | select(.state == "failed")'
# Get DAG IDs only
af dags list | jq '.dags[].dag_id'
# Find paused DAGs
af dags list | jq '[.dags[] | select(.is_paused == true)]'
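The same approach extends to small summaries. The sketch below counts runs per state using only the `dag_runs` and `state` fields from the filter above; `runs_by_state` is a hypothetical helper and jq is assumed to be installed.

```shell
# Hypothetical helper: summarize listed DAG runs as a count per state.
runs_by_state() {
  af runs list |
    jq -c '.dag_runs | group_by(.state) | map({state: .[0].state, count: length})'
}
```

The result is a compact JSON array with one object per state, each carrying `state` and `count` keys, which is easy to pipe into further jq filters.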
Task Logs Options
# Get logs for specific retry attempt
af tasks logs my_dag run_id task_id --try 2
# Get logs for mapped task index
af tasks logs my_dag run_id task_id --map-index 5
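To compare attempts side by side, the `--try` option can be looped. This is a sketch: `logs_for_tries` is a hypothetical helper, and it assumes try numbers start at 1, as they do in Airflow.

```shell
# Hypothetical helper: print logs for tries 1..max_try of one task instance.
# Usage: logs_for_tries <dag_id> <run_id> <task_id> <max_try>
logs_for_tries() (
  dag_id=$1
  run_id=$2
  task_id=$3
  max_try=$4
  try=1
  while [ "$try" -le "$max_try" ]; do
    echo "=== try $try ==="
    af tasks logs "$dag_id" "$run_id" "$task_id" --try "$try"
    try=$((try + 1))
  done
)
```

For example, `logs_for_tries my_dag run_id task_id 3` prints the logs for the first three attempts, each under its own header line.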
Direct API Access with af api
Use af api for endpoints not covered by the high-level commands (XCom, event-logs, backfills, etc.).
# Discover available endpoints
af api ls
af api ls --filter variable
# Basic usage
af api dags
af api dags -F limit=10 -F only_active=true
af api variables -X POST -F key=my_var -f value="my value"
af api variables/old_var -X DELETE
Field syntax: -F key=value auto-converts the value's type; -f key=value keeps the value as a string.
Full reference: See api-reference.md for all options, common endpoints (XCom, event-logs, backfills), and examples.
Related Skills
| Skill | Use when… |
|---|---|
| authoring-dags | Creating or editing DAG files with best practices |
| testing-dags | Iterative test -> debug -> fix -> retest cycles |
| debugging-dags | Deep root cause analysis and failure diagnosis |
| checking-freshness | Checking if data is up to date or stale |
| tracing-upstream-lineage | Finding where data comes from |
| tracing-downstream-lineage | Impact analysis — what breaks if something changes |
| migrating-airflow-2-to-3 | Upgrading DAGs from Airflow 2.x to 3.x |
| managing-astro-local-env | Starting, stopping, or troubleshooting local Airflow |
| setting-up-astro-project | Initializing a new Astro/Airflow project |