# dagster-conventions

```shell
npx skills add https://github.com/dagster-io/dagster-claude-plugins --skill dagster-conventions
```
# Dagster Development Expert

## When to Use This Skill vs. Others
| If User Says… | Use This Skill/Command | Why |
|---|---|---|
| "what's the best way to X" | `/dagster-conventions` | Need patterns/best practices |
| "how do I structure assets" | `/dagster-conventions` | Asset design guidance |
| "which integration should I use" | `/dagster-integrations` | Integration discovery needed |
| "implement X pipeline" | `/dg:prototype` | Ready to build, not just learn |
| "is this pythonic" | `/dignified-python` | Python code review needed |
| "create new project" | `/dg:create-project` | Project initialization needed |
| "how do I test assets" | `/dagster-conventions` (testing section) | Testing patterns guidance |
| "schedule patterns" | `/dagster-conventions` (automation section) | Scheduling/automation guidance |
| "dbt best practices" | `/dagster-conventions` (dbt section) | dbt-specific patterns |
## Core Philosophy

**Think in Assets**: Dagster is built around the asset abstraction: persistent objects like tables, files, or models that your pipeline produces. Assets provide:

- **Clear Lineage**: Explicit dependencies define data flow
- **Better Observability**: Track what data exists and how it was created
- **Improved Testability**: Assets are just Python functions that can be tested directly
- **Declarative Pipelines**: Focus on what to produce, not how to execute

**Assets over Ops**: For most data pipelines, prefer assets over ops. Use ops only when the asset abstraction doesn't fit (non-data workflows, complex execution patterns).
**Environment Separation**: Use resources and `EnvVar` to maintain separate configurations for dev, staging, and production without code changes.
## Quick Reference

| If you're writing… | Check this section/reference |
|---|---|
| `@dg.asset` | Assets or `references/assets.md` |
| `ConfigurableResource` | Resources or `references/resources.md` |
| `AutomationCondition` | Declarative Automation or `references/automation.md` |
| `@dg.schedule` or `ScheduleDefinition` | Automation or `references/automation.md` |
| `@dg.sensor` | Sensors or `references/automation.md` |
| `PartitionsDefinition` | Partitions or `references/automation.md` |
| Tests with `dg.materialize()` | Testing or `references/testing.md` |
| `@asset_check` | `references/testing.md#asset-checks` |
| `@dlt_assets` or `@sling_assets` | `references/etl-patterns.md` |
| `@dbt_assets` | dbt Integration or dbt-development skill |
| `Definitions` or code locations | `references/project-structure.md` |
| Components (`defs.yaml`) | `references/project-structure.md#components` |
## Core Concepts

- **Asset**: A persistent object (table, file, model) that your pipeline produces. Define with `@dg.asset`.
- **Resource**: External services/tools (databases, APIs) shared across assets. Define with `ConfigurableResource`.
- **Job**: A selection of assets to execute together. Create with `dg.define_asset_job()`.
- **Schedule**: Time-based automation for jobs. Create with `dg.ScheduleDefinition`.
- **Sensor**: Event-driven automation that watches for changes. Define with `@dg.sensor`.
- **Partition**: Logical divisions of data (by date, category). Define with a `PartitionsDefinition`.
- **Definitions**: The container for all Dagster objects in a code location.
- **Component**: Reusable, declarative building blocks that generate `Definitions` from configuration (YAML). Use for standardized patterns.
- **Declarative Automation**: A modern automation framework where you set conditions on assets rather than scheduling jobs.
## Assets Quick Reference

### Basic Asset

```python
import dagster as dg

@dg.asset
def my_asset() -> None:
    """Asset description appears in the UI."""
    # Your computation logic here
    pass
```
### Asset with Dependencies

```python
@dg.asset
def downstream_asset(upstream_asset) -> dict:
    """Depends on upstream_asset by naming it as a parameter."""
    return {"processed": upstream_asset}
```
### Asset with Metadata

```python
@dg.asset(
    group_name="analytics",
    key_prefix=["warehouse", "staging"],
    description="Cleaned customer data",
    owners=["team:data-engineering", "alice@example.com"],
    tags={"priority": "high", "domain": "sales"},
    code_version="1.2.0",
)
def customers() -> None:
    pass
```
**Best Practices:**

- **Naming**: Use nouns describing what is produced (`customers`, `daily_revenue`), not verbs (`load_customers`)
- **Tags**: Primary mechanism for organization (use liberally)
- **Owners**: Specify team or individual owners for accountability
- **code_version**: Track when asset logic changes for lineage
## Resources Quick Reference

### Define a Resource

```python
from dagster import ConfigurableResource

class DatabaseResource(ConfigurableResource):
    connection_string: str

    def query(self, sql: str) -> list:
        # Implementation here
        pass
```
### Use in Assets

```python
@dg.asset
def my_asset(database: DatabaseResource) -> None:
    results = database.query("SELECT * FROM table")
```
### Register in Definitions

```python
dg.Definitions(
    assets=[my_asset],
    resources={"database": DatabaseResource(connection_string="...")},
)
```
## Automation Quick Reference

### Schedule

```python
import dagster as dg
from my_project.defs.jobs import my_job

my_schedule = dg.ScheduleDefinition(
    job=my_job,
    cron_schedule="0 0 * * *",  # Daily at midnight
)
```
### Common Cron Patterns

| Pattern | Meaning |
|---|---|
| `0 * * * *` | Every hour |
| `0 0 * * *` | Daily at midnight |
| `0 0 * * 1` | Weekly on Monday |
| `0 0 1 * *` | Monthly on the 1st |
| `0 0 5 * *` | Monthly on the 5th |
## Declarative Automation Quick Reference

**Modern automation pattern**: Set conditions on assets instead of scheduling jobs.

### AutomationCondition Examples

```python
import dagster as dg
from dagster import AutomationCondition

# Materialize when the asset (or a partition of it) is missing
@dg.asset(
    automation_condition=AutomationCondition.on_missing()
)
def my_asset() -> None:
    pass

# Update daily at a specific time
@dg.asset(
    automation_condition=AutomationCondition.on_cron("0 9 * * *")
)
def daily_report() -> None:
    pass

# Combine conditions
@dg.asset(
    automation_condition=(
        AutomationCondition.on_missing()
        | AutomationCondition.on_cron("0 0 * * *")
    )
)
def flexible_asset() -> None:
    pass
```
**Benefits over Schedules:**
- More expressive condition logic
- Asset-native (no separate job definitions needed)
- Automatic dependency-aware execution
- Better for complex automation scenarios

**When to Use:**
- Asset-centric pipelines with complex update logic
- Condition-based triggers (data availability, freshness)
- Prefer over schedules for new projects
## Sensors Quick Reference

### Basic Sensor Pattern

```python
import json

import dagster as dg

@dg.sensor(job=my_job)
def my_sensor(context: dg.SensorEvaluationContext):
    # 1. Read cursor (previous state)
    previous_state = json.loads(context.cursor) if context.cursor else {}
    current_state = {}
    runs_to_request = []

    # 2. Check for changes
    for item in get_items_to_check():
        current_state[item.id] = item.modified_at
        if item.id not in previous_state or previous_state[item.id] != item.modified_at:
            runs_to_request.append(dg.RunRequest(
                run_key=f"run_{item.id}_{item.modified_at}",
                run_config={...}
            ))

    # 3. Return result with updated cursor
    return dg.SensorResult(
        run_requests=runs_to_request,
        cursor=json.dumps(current_state)
    )
```
**Key**: Use cursors to track state between sensor evaluations.
## Partitions Quick Reference

### Time-Based Partition

```python
weekly_partition = dg.WeeklyPartitionsDefinition(start_date="2023-01-01")

@dg.asset(partitions_def=weekly_partition)
def weekly_data(context: dg.AssetExecutionContext) -> None:
    partition_key = context.partition_key  # e.g., "2023-01-01"
    # Process data for this partition
```

### Static Partition

```python
region_partition = dg.StaticPartitionsDefinition(["us-east", "us-west", "eu"])

@dg.asset(partitions_def=region_partition)
def regional_data(context: dg.AssetExecutionContext) -> None:
    region = context.partition_key
```
### Partition Types

| Type | Use Case |
|---|---|
| `DailyPartitionsDefinition` | One partition per day |
| `WeeklyPartitionsDefinition` | One partition per week |
| `MonthlyPartitionsDefinition` | One partition per month |
| `HourlyPartitionsDefinition` | One partition per hour |
| `StaticPartitionsDefinition` | Fixed set of partitions |
| `DynamicPartitionsDefinition` | Partitions created at runtime |
| `MultiPartitionsDefinition` | Combine multiple partition dimensions |
**Best Practice**: Limit partitions to 100,000 or fewer per asset for optimal UI performance.
## Testing Quick Reference

### Direct Function Testing

```python
def test_my_asset():
    result = my_asset()
    assert result == expected_value
```
### Testing with Materialization

```python
def test_asset_graph():
    result = dg.materialize(
        assets=[asset_a, asset_b],
        resources={"database": mock_database},
    )
    assert result.success
    assert result.output_for_node("asset_b") == expected
```
### Mocking Resources

```python
from unittest.mock import Mock

def test_with_mocked_resource():
    mocked_resource = Mock()
    mocked_resource.query.return_value = [{"id": 1}]
    result = dg.materialize(
        assets=[my_asset],
        resources={"database": mocked_resource},
    )
    assert result.success
```
### Asset Checks

```python
@dg.asset_check(asset=my_asset)
def validate_non_empty(my_asset):
    return dg.AssetCheckResult(
        passed=len(my_asset) > 0,
        metadata={"row_count": len(my_asset)},
    )
```
## dbt Integration

For dbt integration, prefer the component-based approach for standard dbt projects. Use Pythonic assets only when you need custom logic or fine-grained control.

### Component-Based dbt (Recommended)

Use `DbtProjectComponent` with a remote Git repository:

```yaml
# defs/transform/defs.yaml
type: dagster_dbt.DbtProjectComponent
attributes:
  project:
    repo_url: https://github.com/dagster-io/jaffle-platform.git
    repo_relative_path: jdbt
  dbt:
    target: dev
```
**When to use:**
- Standard dbt transformations
- Remote dbt project in Git repository
- Declarative configuration preferred
- Component reusability desired

**For private repositories:**

```yaml
attributes:
  project:
    repo_url: https://github.com/your-org/dbt-project.git
    repo_relative_path: dbt
    token: "{{ env.GIT_TOKEN }}"
  dbt:
    target: dev
```
### Pythonic dbt Assets

For custom logic or local development:

```python
from pathlib import Path

import dagster as dg
from dagster_dbt import DbtCliResource, dbt_assets

dbt_project_dir = Path(__file__).parent / "dbt_project"

@dbt_assets(manifest=dbt_project_dir / "target" / "manifest.json")
def my_dbt_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

dg.Definitions(
    assets=[my_dbt_assets],
    resources={"dbt": DbtCliResource(project_dir=dbt_project_dir)},
)
```
**When to use:**
- Custom transformation logic needed
- Local development with frequent dbt code changes
- Fine-grained control over dbt execution

**Full patterns**: See the Dagster dbt docs.
## When to Load References

Load `references/assets.md` when:
- Defining complex asset dependencies
- Adding metadata, groups, or key prefixes
- Working with asset factories
- Understanding asset materialization patterns

Load `references/resources.md` when:
- Creating custom `ConfigurableResource` classes
- Integrating with databases, APIs, or cloud services
- Understanding resource scoping and lifecycle

Load `references/automation.md` when:
- Creating schedules with complex cron patterns
- Building sensors with cursors and state management
- Implementing partitions and backfills
- Using declarative automation conditions
- Automating dbt or other integration runs

Load `references/testing.md` when:
- Writing unit tests for assets
- Mocking resources and dependencies
- Using `dg.materialize()` for integration tests
- Creating asset checks for data validation

Load `references/etl-patterns.md` when:
- Using dlt for embedded ETL
- Using Sling for database replication
- Loading data from files or APIs
- Integrating external ETL tools

Load `references/project-structure.md` when:
- Setting up a new Dagster project
- Configuring `Definitions` and code locations
- Using the `dg` CLI for scaffolding
- Organizing large projects with Components
## Project Structure

### Recommended Layout

```
my_project/
├── pyproject.toml
├── src/
│   └── my_project/
│       ├── definitions.py      # Main Definitions
│       └── defs/
│           ├── assets/
│           │   ├── __init__.py
│           │   └── my_assets.py
│           ├── jobs.py
│           ├── schedules.py
│           ├── sensors.py
│           └── resources.py
└── tests/
    └── test_assets.py
```
### Definitions Pattern (Modern)

**Auto-Discovery (Simplest):**

```python
# src/my_project/definitions.py
from dagster import Definitions
from dagster_dg import load_defs

# Automatically discovers all definitions in defs/ folder
defs = Definitions.merge(
    load_defs()
)
```
**Combining Components with Pythonic Assets:**

```python
# src/my_project/definitions.py
from dagster import Definitions
from dagster_dg import load_defs

from my_project.assets import custom_assets

# Load component definitions from defs/ folder
component_defs = load_defs()

# Define pythonic assets separately
pythonic_defs = Definitions(
    assets=custom_assets,
    resources={...}
)

# Merge them together
defs = Definitions.merge(component_defs, pythonic_defs)
```
**Traditional (Explicit):**

```python
# src/my_project/definitions.py
from dagster import Definitions

from my_project.defs import assets, jobs, schedules, resources

defs = Definitions(
    assets=assets,
    jobs=jobs,
    schedules=schedules,
    resources=resources,
)
```
### Scaffolding with the dg CLI

```shell
# Create new project
uvx create-dagster my_project

# Scaffold new asset file
dg scaffold defs dagster.asset assets/new_asset.py

# Scaffold schedule
dg scaffold defs dagster.schedule schedules.py

# Scaffold sensor
dg scaffold defs dagster.sensor sensors.py

# Validate definitions
dg check defs
```
## Common Patterns

### Job Definition

```python
trip_update_job = dg.define_asset_job(
    name="trip_update_job",
    selection=["taxi_trips", "taxi_zones"],
)
```
### Run Configuration

```python
from dagster import Config

class MyAssetConfig(Config):
    filename: str
    limit: int = 100

@dg.asset
def configurable_asset(config: MyAssetConfig) -> None:
    print(f"Processing {config.filename} with limit {config.limit}")
```
### Asset Dependencies with External Sources

```python
@dg.asset(deps=["external_table"])
def derived_asset() -> None:
    """Depends on external_table, which isn't managed by Dagster."""
    pass
```
## Anti-Patterns to Avoid

| Anti-Pattern | Better Approach |
|---|---|
| Hardcoding credentials in assets | Use `ConfigurableResource` with env vars |
| Giant assets that do everything | Split into focused, composable assets |
| Ignoring asset return types | Use type annotations for clarity |
| Skipping tests for assets | Test assets like regular Python functions |
| Not using partitions for time-series | Use `DailyPartitionsDefinition` etc. |
| Putting all assets in one file | Organize by domain in separate modules |
## CLI Quick Reference

### dg CLI (Recommended for Modern Projects)

```shell
# Development
dg dev                # Start Dagster UI (port 3000)
dg check defs         # Validate definitions load correctly
dg list defs          # Show all loaded definitions
dg list components    # Show available components

# Scaffolding
dg scaffold defs dagster.asset assets/file.py
dg scaffold defs dagster.schedule schedules.py
dg scaffold defs dagster.sensor sensors.py
dg scaffold defs dagster.resources resources.py

# Execution
dg launch --assets my_asset                  # Materialize specific asset
dg launch --assets asset1 asset2             # Multiple assets
dg launch --assets "*"                       # Materialize all assets
dg launch --assets "tag:priority=high"       # Assets by tag
dg launch --assets "group:sales_analytics"   # Assets by group
dg launch --assets "kind:dbt"                # Assets by kind
dg launch --job my_job                       # Execute a job

# Partitions
dg launch --assets my_asset --partition 2024-01-15                       # Single partition
dg launch --assets my_asset --partition-range "2024-01-01...2024-01-31"  # Backfill range

# Configuration
dg launch --assets my_asset --config-json '{"ops": {"my_asset": {"config": {"param": "value"}}}}'

# Environment variables
uv run dg launch --assets my_asset                         # Auto-loads .env with uv
set -a; source .env; set +a; dg launch --assets my_asset   # Manual .env loading

# See /dg:launch command for comprehensive launch documentation
```
### dagster CLI (Legacy/General Purpose)

```shell
# Use for non-dg projects or advanced scenarios
dagster dev                              # Start Dagster UI
dagster job execute -j my_job            # Execute a job
dagster asset materialize -a my_asset    # Materialize an asset
```

Use the `dg` CLI for projects created with `create-dagster`. It provides auto-discovery, scaffolding, and modern workflow support.
## References

- **Assets**: `references/assets.md` – Detailed asset patterns and launching guidance
- **Resources**: `references/resources.md` – Resource configuration
- **Automation**: `references/automation.md` – Schedules, sensors, partitions
- **Testing**: `references/testing.md` – Testing patterns and asset checks
- **ETL Patterns**: `references/etl-patterns.md` – dlt, Sling, file/API ingestion
- **Project Structure**: `references/project-structure.md` – Definitions, Components
- **Launch Command**: `/dg:launch` – Comprehensive asset launching documentation
- **Official Docs**: https://docs.dagster.io
- **API Reference**: https://docs.dagster.io/api/dagster