cosmos-dbt-core
npx skills add https://github.com/astronomer/agents --skill cosmos-dbt-core
Skill Documentation
Cosmos + dbt Core: Implementation Checklist
Execute steps in order. Prefer the simplest configuration that meets the user’s constraints.
Version note: This skill targets Cosmos 1.11+ and Airflow 3.x. If the user is on Airflow 2.x, adjust imports accordingly (see Appendix A).
Reference: Latest stable: https://pypi.org/project/astronomer-cosmos/
Before starting, confirm: (1) dbt engine = Core (not Fusion → use cosmos-dbt-fusion), (2) warehouse type, (3) Airflow version, (4) execution environment (Airflow env / venv / container), (5) DbtDag vs DbtTaskGroup, (6) manifest availability.
1. Choose Parsing Strategy (RenderConfig)
Pick ONE load mode based on constraints:
| Load mode | When to use | Required inputs | Constraints |
|---|---|---|---|
| dbt_manifest | Large projects; containerized execution; fastest | ProjectConfig.manifest_path | Remote manifest needs manifest_conn_id |
| dbt_ls | Complex selectors; need dbt-native selection | dbt installed OR dbt_executable_path | Cannot use with containerized execution |
| dbt_ls_file | dbt_ls selection without running dbt_ls every parse | RenderConfig.dbt_ls_path | select/exclude won’t work |
| automatic | Simple setups; let Cosmos pick | (none) | Falls back: manifest → dbt_ls → custom |
CRITICAL: Containerized execution (DOCKER/KUBERNETES/etc.) → MUST use the dbt_manifest load mode.
from cosmos import RenderConfig, LoadMode
_render_config = RenderConfig(
load_method=LoadMode.DBT_MANIFEST, # or DBT_LS, DBT_LS_FILE, AUTOMATIC
)
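If the dbt ls output is pre-computed (e.g., in CI), a minimal sketch of the dbt_ls_file mode might look like the following; the file path is a placeholder, and since select/exclude are not applied in this mode, bake any selection into the file itself:
from cosmos import RenderConfig, LoadMode
# Sketch: parse the project from a pre-generated `dbt ls` output file (placeholder path).
_render_config = RenderConfig(
    load_method=LoadMode.DBT_LS_FILE,
    dbt_ls_path="/usr/local/airflow/dbt/my_project/dbt_ls_output.txt",
)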
2. Choose Execution Mode (ExecutionConfig)
Reference: See reference/cosmos-config.md for detailed configuration examples per mode.
Pick ONE execution mode:
| Execution mode | When to use | Speed | Required setup |
|---|---|---|---|
| WATCHER | Fastest; single dbt build visibility | Fastest | dbt adapter in env OR dbt_executable_path |
| LOCAL + DBT_RUNNER | dbt + adapter in Airflow env | Fast | dbt 1.5+ in requirements.txt |
| LOCAL + SUBPROCESS | dbt in venv baked into image | Medium | dbt_executable_path |
| AIRFLOW_ASYNC | BigQuery + long-running transforms | Varies | Airflow ≥2.8; provider deps |
| VIRTUALENV | Can’t modify image; runtime venv | Slower | py_requirements in operator_args |
| Containerized | Full isolation per task | Slowest | manifest required; container config |
CRITICAL: Containerized execution (DOCKER/KUBERNETES/etc.) → MUST use the dbt_manifest load mode.
from cosmos import ExecutionConfig, ExecutionMode
_execution_config = ExecutionConfig(
execution_mode=ExecutionMode.LOCAL, # or WATCHER, VIRTUALENV, AIRFLOW_ASYNC, KUBERNETES, etc.
)
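For the LOCAL + SUBPROCESS and VIRTUALENV rows above, the sketches below show typical shapes; the venv path and adapter pin are placeholders, and it is assumed that InvocationMode is exported from the top-level cosmos package (otherwise import it from cosmos.constants):
from cosmos import ExecutionConfig, ExecutionMode, InvocationMode
# Sketch: LOCAL + SUBPROCESS — dbt lives in a venv baked into the image (placeholder path).
_execution_config = ExecutionConfig(
    execution_mode=ExecutionMode.LOCAL,
    invocation_mode=InvocationMode.SUBPROCESS,
    dbt_executable_path="/usr/local/airflow/dbt_venv/bin/dbt",
)
# Sketch: VIRTUALENV — dbt is installed at runtime; pass py_requirements via operator_args.
_execution_config = ExecutionConfig(execution_mode=ExecutionMode.VIRTUALENV)
_operator_args = {"py_requirements": ["dbt-snowflake==1.8.0"]}  # adapter/version are illustrative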
3. Configure Warehouse Connection (ProfileConfig)
Reference: See reference/cosmos-config.md for detailed ProfileConfig options and all ProfileMapping classes.
Option A: Airflow Connection + ProfileMapping (Recommended)
from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
_profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=SnowflakeUserPasswordProfileMapping(
conn_id="snowflake_default",
profile_args={"schema": "my_schema"},
),
)
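The same pattern applies to other warehouses; as a sketch (connection id and schema are placeholders; class names are listed in the ProfileMapping reference above), a Postgres variant would be:
from cosmos import ProfileConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping
# Sketch: map a Postgres Airflow connection into a dbt profile (placeholder conn_id/schema).
_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="postgres_default",
        profile_args={"schema": "public"},
    ),
)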
Option B: Existing profiles.yml
CRITICAL: Do not hardcode secrets; use environment variables.
from cosmos import ProfileConfig
_profile_config = ProfileConfig(
profile_name="my_profile",
target_name="dev",
profiles_yml_filepath="/path/to/profiles.yml",
)
4. Configure Project (ProjectConfig)
| Approach | When to use | Required param |
|---|---|---|
| Project path | Files available locally | dbt_project_path |
| Manifest only | dbt_manifest load; containerized |
manifest_path + project_name |
from cosmos import ProjectConfig
_project_config = ProjectConfig(
dbt_project_path="/path/to/dbt/project",
# manifest_path="/path/to/manifest.json", # for dbt_manifest load mode
# project_name="my_project", # if using manifest_path without dbt_project_path
# install_dbt_deps=False, # if deps precomputed in CI
)
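For the manifest-only row (required for containerized execution), a sketch might look like this; the bucket path and connection id are placeholders, and manifest_conn_id is the parameter referenced in the load-mode table for remote manifests:
from cosmos import ProjectConfig
# Sketch: manifest-only project for containerized execution (placeholder path/conn_id).
_project_config = ProjectConfig(
    manifest_path="s3://my-bucket/dbt/manifest.json",
    project_name="my_project",
    manifest_conn_id="aws_default",  # only needed when the manifest is remote
)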
5. Configure Testing Behavior (RenderConfig)
Reference: See reference/cosmos-config.md for detailed testing options.
| TestBehavior | Behavior |
|---|---|
AFTER_EACH |
Tests run immediately after each model (default) |
BUILD |
Combine run + test into single dbt build |
AFTER_ALL |
All tests after all models complete |
NONE |
Skip tests |
from cosmos import RenderConfig, TestBehavior
_render_config = RenderConfig(
test_behavior=TestBehavior.AFTER_EACH,
)
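If you want the BUILD behavior from the table (run + test combined into a single dbt build per model), only the enum changes; the tag selector below is illustrative:
from cosmos import RenderConfig, TestBehavior
# Sketch: combine run + test into one `dbt build` per model; selector is illustrative.
_render_config = RenderConfig(
    test_behavior=TestBehavior.BUILD,
    select=["tag:daily"],
)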
6. Configure operator_args
Reference: See reference/cosmos-config.md for detailed operator_args options.
_operator_args = {
# BaseOperator params
"retries": 3,
# Cosmos-specific params
"install_deps": False,
"full_refresh": False,
"quiet": True,
# Runtime dbt vars (XCom / params)
"vars": '{"my_var": "{{ ti.xcom_pull(task_ids=\'pre_dbt\') }}"}',
}
7. Assemble DAG / TaskGroup
Option A: DbtDag (Standalone)
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime
_project_config = ProjectConfig(
dbt_project_path="/usr/local/airflow/dbt/my_project",
)
_profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=SnowflakeUserPasswordProfileMapping(
conn_id="snowflake_default",
),
)
_execution_config = ExecutionConfig()
_render_config = RenderConfig()
my_cosmos_dag = DbtDag(
dag_id="my_cosmos_dag",
project_config=_project_config,
profile_config=_profile_config,
execution_config=_execution_config,
render_config=_render_config,
operator_args={},
start_date=datetime(2025, 1, 1),
schedule="@daily",
)
Option B: DbtTaskGroup (Inside Existing DAG)
from airflow.sdk import dag, task, chain  # Airflow 3.x
# from airflow.decorators import dag, task  # Airflow 2.x
# from airflow.models.baseoperator import chain  # Airflow 2.x
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime
_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(conn_id="snowflake_default"),
)
_execution_config = ExecutionConfig()
_render_config = RenderConfig()
@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
@task
def pre_dbt():
return "some_value"
dbt = DbtTaskGroup(
group_id="dbt_project",
project_config=_project_config,
profile_config=_profile_config,
execution_config=_execution_config,
render_config=_render_config,
)
@task
def post_dbt():
pass
chain(pre_dbt(), dbt, post_dbt())
my_dag()
Setting Dependencies on Individual Cosmos Tasks
from cosmos import DbtDag, DbtResourceType
from airflow.sdk import task, chain
with DbtDag(...) as dag:
@task
def upstream_task():
pass
_upstream = upstream_task()
for unique_id, dbt_node in dag.dbt_graph.filtered_nodes.items():
if dbt_node.resource_type == DbtResourceType.SEED:
my_dbt_task = dag.tasks_map[unique_id]
chain(_upstream, my_dbt_task)
8. Safety Checks
Before finalizing, verify:
- Execution mode matches constraints (AIRFLOW_ASYNC → BigQuery only; containerized → manifest required)
- Warehouse adapter installed for chosen execution mode
- Secrets via Airflow connections or env vars, NOT plaintext
- Load mode matches execution (containerized → manifest; complex selectors → dbt_ls)
- Airflow 3 asset URIs if downstream DAGs scheduled on Cosmos assets (see Appendix A)
Appendix A: Airflow 3 Compatibility
Import Differences
| Airflow 3.x | Airflow 2.x |
|---|---|
| from airflow.sdk import dag, task | from airflow.decorators import dag, task |
| from airflow.sdk import chain | from airflow.models.baseoperator import chain |
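If one DAG file must run on both Airflow versions, a hedged pattern based on the table above is a try/except import:
# Sketch: support both Airflow 3.x and 2.x import paths in a single DAG file.
try:
    from airflow.sdk import dag, task, chain  # Airflow 3.x
except ImportError:
    from airflow.decorators import dag, task  # Airflow 2.x
    from airflow.models.baseoperator import chain  # Airflow 2.x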
Asset/Dataset URI Format Change
Cosmos ≤1.9 (Airflow 2 Datasets):
postgres://0.0.0.0:5434/postgres.public.orders
Cosmos ≥1.10 (Airflow 3 Assets):
postgres://0.0.0.0:5434/postgres/public/orders
CRITICAL: Update asset URIs when upgrading to Airflow 3.
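As a sketch of what this means for scheduling (assuming Asset is importable from airflow.sdk in Airflow 3, and that the URI matches what Cosmos emits for your connection and model), a downstream DAG might look like:
from airflow.sdk import Asset, dag  # Airflow 3.x
from pendulum import datetime
# Sketch: schedule a downstream DAG on the Airflow 3 asset URI shown above.
@dag(
    schedule=[Asset("postgres://0.0.0.0:5434/postgres/public/orders")],
    start_date=datetime(2025, 1, 1),
)
def downstream_of_orders():
    ...
downstream_of_orders()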
Appendix B: Operational Extras
Caching
Cosmos caches artifacts to speed up parsing; caching is enabled by default.
Reference: https://astronomer.github.io/astronomer-cosmos/configuration/caching.html
Memory-Optimized Imports
AIRFLOW__COSMOS__ENABLE_MEMORY_OPTIMISED_IMPORTS=True
When enabled:
from cosmos.airflow.dag import DbtDag # instead of: from cosmos import DbtDag
Artifact Upload to Object Storage
AIRFLOW__COSMOS__REMOTE_TARGET_PATH=s3://bucket/target_dir/
AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID=aws_default
from cosmos.io import upload_to_cloud_storage
my_dag = DbtDag(
# ...
operator_args={"callback": upload_to_cloud_storage},
)
dbt Docs Hosting (Airflow 3.1+ / Cosmos 1.11+)
AIRFLOW__COSMOS__DBT_DOCS_PROJECTS='{
"my_project": {
"dir": "s3://bucket/docs/",
"index": "index.html",
"conn_id": "aws_default",
"name": "My Project"
}
}'
Reference: https://astronomer.github.io/astronomer-cosmos/configuration/hosting-docs.html
Related Skills
- cosmos-dbt-fusion: For dbt Fusion projects (not dbt Core)
- authoring-dags: General DAG authoring patterns
- testing-dags: Testing DAGs after creation