dagster-development
```shell
npx skills add https://github.com/c00ldudenoonan/economic-data-project --skill dagster-development
```
Dagster Development Expert
Quick Reference
| If you're writing… | Check this section/reference |
|---|---|
| `@dg.asset` | Assets or `references/assets.md` |
| `ConfigurableResource` | Resources or `references/resources.md` |
| `@dg.schedule` or `ScheduleDefinition` | Automation or `references/automation.md` |
| `@dg.sensor` | Sensors or `references/automation.md` |
| `PartitionsDefinition` | Partitions or `references/automation.md` |
| Tests with `dg.materialize()` | Testing or `references/testing.md` |
| `@asset_check` | `references/testing.md#asset-checks` |
| `@dlt_assets` or `@sling_assets` | `references/etl-patterns.md` |
| `@dbt_assets` | dbt Integration or the dbt-development skill |
| `Definitions` or code locations | `references/project-structure.md` |
Core Concepts
- **Asset**: A persistent object (table, file, model) that your pipeline produces. Define with `@dg.asset`.
- **Resource**: External services/tools (databases, APIs) shared across assets. Define with `ConfigurableResource`.
- **Job**: A selection of assets to execute together. Create with `dg.define_asset_job()`.
- **Schedule**: Time-based automation for jobs. Create with `dg.ScheduleDefinition`.
- **Sensor**: Event-driven automation that watches for changes. Define with `@dg.sensor`.
- **Partition**: Logical divisions of data (by date, category). Define with a `PartitionsDefinition`.
- **Definitions**: The container for all Dagster objects in a code location.
Assets Quick Reference
Basic Asset
```python
import dagster as dg


@dg.asset
def my_asset() -> None:
    """Asset description appears in the UI."""
    # Your computation logic here
    pass
```
Asset with Dependencies
```python
@dg.asset
def downstream_asset(upstream_asset) -> dict:
    """Depends on upstream_asset by naming it as a parameter."""
    return {"processed": upstream_asset}
```
Asset with Metadata
```python
@dg.asset(
    group_name="analytics",
    key_prefix=["warehouse", "staging"],
    description="Cleaned customer data",
)
def customers() -> None:
    pass
```
**Naming:** Use nouns describing what is produced (`customers`, `daily_revenue`), not verbs (`load_customers`).
Resources Quick Reference
Define a Resource
```python
from dagster import ConfigurableResource


class DatabaseResource(ConfigurableResource):
    connection_string: str

    def query(self, sql: str) -> list:
        # Implementation here
        pass
```
Use in Assets
```python
@dg.asset
def my_asset(database: DatabaseResource) -> None:
    results = database.query("SELECT * FROM table")
```
Register in Definitions
```python
dg.Definitions(
    assets=[my_asset],
    resources={"database": DatabaseResource(connection_string="...")},
)
```
Automation Quick Reference
Schedule
```python
import dagster as dg

from my_project.defs.jobs import my_job

my_schedule = dg.ScheduleDefinition(
    job=my_job,
    cron_schedule="0 0 * * *",  # Daily at midnight
)
```
Common Cron Patterns
| Pattern | Meaning |
|---|---|
| `0 * * * *` | Every hour |
| `0 0 * * *` | Daily at midnight |
| `0 0 * * 1` | Weekly on Monday |
| `0 0 1 * *` | Monthly on the 1st |
| `0 0 5 * *` | Monthly on the 5th |
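The five cron fields are read as minute, hour, day-of-month, month, day-of-week. As a reading aid for the table above, a tiny helper (not a Dagster API) that names the fields of a pattern:

```python
# Illustrative helper: split a 5-field cron pattern into named fields.
# Purely a reading aid -- Dagster just takes the pattern string as-is.
def cron_fields(pattern: str) -> dict:
    names = ["minute", "hour", "day_of_month", "month", "day_of_week"]
    return dict(zip(names, pattern.split()))


print(cron_fields("0 0 * * 1"))
# {'minute': '0', 'hour': '0', 'day_of_month': '*', 'month': '*', 'day_of_week': '1'}
```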
Sensors Quick Reference
Basic Sensor Pattern
```python
import json

import dagster as dg

# my_job and get_items_to_check() are placeholders defined elsewhere in your project.


@dg.sensor(job=my_job)
def my_sensor(context: dg.SensorEvaluationContext):
    # 1. Read cursor (previous state)
    previous_state = json.loads(context.cursor) if context.cursor else {}
    current_state = {}
    runs_to_request = []

    # 2. Check for changes
    for item in get_items_to_check():
        current_state[item.id] = item.modified_at
        if item.id not in previous_state or previous_state[item.id] != item.modified_at:
            runs_to_request.append(
                dg.RunRequest(
                    run_key=f"run_{item.id}_{item.modified_at}",
                    run_config={...},
                )
            )

    # 3. Return result with updated cursor
    return dg.SensorResult(
        run_requests=runs_to_request,
        cursor=json.dumps(current_state),
    )
```
**Key:** Use cursors to track state between sensor evaluations.
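The cursor-diff logic in the sensor above is ordinary Python, so it can be factored out and unit-tested without Dagster. A sketch, where the state dicts map item id to last-modified timestamp (mirroring the hypothetical `get_items_to_check()` items):

```python
import json


def items_needing_runs(cursor, current_state):
    """Return (ids that changed since the cursor, new cursor JSON)."""
    previous_state = json.loads(cursor) if cursor else {}
    changed = [
        item_id
        for item_id, modified_at in current_state.items()
        if previous_state.get(item_id) != modified_at
    ]
    return changed, json.dumps(current_state)


# First evaluation: no cursor yet, so every item counts as changed.
changed, cursor = items_needing_runs(None, {"a": 1, "b": 2})
print(changed)  # ['a', 'b']

# Second evaluation: only "b" was modified since the last cursor.
changed, cursor = items_needing_runs(cursor, {"a": 1, "b": 3})
print(changed)  # ['b']
```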
Partitions Quick Reference
Time-Based Partition
```python
weekly_partition = dg.WeeklyPartitionsDefinition(start_date="2023-01-01")


@dg.asset(partitions_def=weekly_partition)
def weekly_data(context: dg.AssetExecutionContext) -> None:
    partition_key = context.partition_key  # e.g., "2023-01-01"
    # Process data for this partition
```
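Inside the asset, the partition key is just a date string, and the typical move is to turn it into a `[start, end)` window to filter data by. A sketch of that step in plain `datetime` arithmetic (not a Dagster API):

```python
from datetime import date, timedelta


def weekly_window(partition_key: str):
    # A weekly partition key like "2023-01-01" marks the start of its week.
    start = date.fromisoformat(partition_key)
    return start, start + timedelta(weeks=1)


start, end = weekly_window("2023-01-01")
print(start, end)  # 2023-01-01 2023-01-08
```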
Static Partition
```python
region_partition = dg.StaticPartitionsDefinition(["us-east", "us-west", "eu"])


@dg.asset(partitions_def=region_partition)
def regional_data(context: dg.AssetExecutionContext) -> None:
    region = context.partition_key
```
Partition Types
| Type | Use Case |
|---|---|
| `DailyPartitionsDefinition` | One partition per day |
| `WeeklyPartitionsDefinition` | One partition per week |
| `MonthlyPartitionsDefinition` | One partition per month |
| `StaticPartitionsDefinition` | Fixed set of partitions |
| `MultiPartitionsDefinition` | Combine multiple partition dimensions |
Testing Quick Reference
Direct Function Testing
```python
def test_my_asset():
    result = my_asset()
    assert result == expected_value
```
Testing with Materialization
```python
def test_asset_graph():
    result = dg.materialize(
        assets=[asset_a, asset_b],
        resources={"database": mock_database},
    )
    assert result.success
    assert result.output_for_node("asset_b") == expected
```
Mocking Resources
```python
from unittest.mock import Mock


def test_with_mocked_resource():
    mocked_resource = Mock()
    mocked_resource.query.return_value = [{"id": 1}]
    result = dg.materialize(
        assets=[my_asset],
        resources={"database": mocked_resource},
    )
    assert result.success
```
Asset Checks
```python
@dg.asset_check(asset=my_asset)
def validate_non_empty(my_asset):
    return dg.AssetCheckResult(
        passed=len(my_asset) > 0,
        metadata={"row_count": len(my_asset)},
    )
```
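The validation logic inside a check is plain Python, so it can be tested without materializing anything. A sketch that mirrors the `passed`/`metadata` fields of `validate_non_empty` above as a bare function:

```python
def non_empty_check(rows: list) -> dict:
    # Mirrors the AssetCheckResult fields from the example above,
    # returned as a plain dict so it is testable without Dagster.
    return {"passed": len(rows) > 0, "metadata": {"row_count": len(rows)}}


print(non_empty_check([{"id": 1}]))  # {'passed': True, 'metadata': {'row_count': 1}}
print(non_empty_check([])["passed"])  # False
```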
dbt Integration
For dbt integration, use the minimal pattern below. For comprehensive dbt patterns, see the dbt-development skill.
Basic dbt Assets
```python
from pathlib import Path

import dagster as dg
from dagster_dbt import DbtCliResource, dbt_assets

dbt_project_dir = Path(__file__).parent / "dbt_project"


@dbt_assets(manifest=dbt_project_dir / "target" / "manifest.json")
def my_dbt_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
```
dbt Resource
```python
dg.Definitions(
    assets=[my_dbt_assets],
    resources={"dbt": DbtCliResource(project_dir=dbt_project_dir)},
)
```
Full patterns: See Dagster dbt docs
When to Load References
Load `references/assets.md` when:
- Defining complex asset dependencies
- Adding metadata, groups, or key prefixes
- Working with asset factories
- Understanding asset materialization patterns

Load `references/resources.md` when:
- Creating custom `ConfigurableResource` classes
- Integrating with databases, APIs, or cloud services
- Understanding resource scoping and lifecycle

Load `references/automation.md` when:
- Creating schedules with complex cron patterns
- Building sensors with cursors and state management
- Implementing partitions and backfills
- Automating dbt or other integration runs

Load `references/testing.md` when:
- Writing unit tests for assets
- Mocking resources and dependencies
- Using `dg.materialize()` for integration tests
- Creating asset checks for data validation

Load `references/etl-patterns.md` when:
- Using dlt for embedded ETL
- Using Sling for database replication
- Loading data from files or APIs
- Integrating external ETL tools

Load `references/project-structure.md` when:
- Setting up a new Dagster project
- Configuring `Definitions` and code locations
- Using the `dg` CLI for scaffolding
- Organizing large projects with Components
Project Structure
Recommended Layout
```
my_project/
├── pyproject.toml
├── src/
│   └── my_project/
│       ├── definitions.py        # Main Definitions
│       └── defs/
│           ├── assets/
│           │   ├── __init__.py
│           │   └── my_assets.py
│           ├── jobs.py
│           ├── schedules.py
│           ├── sensors.py
│           └── resources.py
└── tests/
    └── test_assets.py
```
Definitions Pattern (Modern)
```python
# src/my_project/definitions.py
from pathlib import Path

from dagster import definitions, load_from_defs_folder


@definitions
def defs():
    return load_from_defs_folder(project_root=Path(__file__).parent.parent.parent)
```
Scaffolding with dg CLI
```bash
# Create new project
uvx create-dagster my_project

# Scaffold new asset file
dg scaffold defs dagster.asset assets/new_asset.py

# Scaffold schedule
dg scaffold defs dagster.schedule schedules.py

# Scaffold sensor
dg scaffold defs dagster.sensor sensors.py

# Validate definitions
dg check defs
```
Common Patterns
Job Definition
```python
trip_update_job = dg.define_asset_job(
    name="trip_update_job",
    selection=["taxi_trips", "taxi_zones"],
)
```
Run Configuration
```python
from dagster import Config


class MyAssetConfig(Config):
    filename: str
    limit: int = 100


@dg.asset
def configurable_asset(config: MyAssetConfig) -> None:
    print(f"Processing {config.filename} with limit {config.limit}")
```
Asset Dependencies with External Sources
```python
@dg.asset(deps=["external_table"])
def derived_asset() -> None:
    """Depends on external_table, which isn't managed by Dagster."""
    pass
```
Anti-Patterns to Avoid
| Anti-Pattern | Better Approach |
|---|---|
| Hardcoding credentials in assets | Use ConfigurableResource with env vars |
| Giant assets that do everything | Split into focused, composable assets |
| Ignoring asset return types | Use type annotations for clarity |
| Skipping tests for assets | Test assets like regular Python functions |
| Not using partitions for time-series | Use DailyPartitionsDefinition etc. |
| Putting all assets in one file | Organize by domain in separate modules |
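On the first anti-pattern: the usual fix is to read secrets from the environment at runtime. Dagster provides `dg.EnvVar("...")` for passing environment variables into `ConfigurableResource` fields; the plain-Python mechanism underneath is just `os.environ`, sketched here with a hypothetical `DATABASE_URL` variable and dummy value:

```python
import os

# In a real deployment the orchestrator or shell sets this variable;
# we set a dummy value here only so the demo is self-contained.
os.environ["DATABASE_URL"] = "postgres://user:secret@host/db"

# The resource reads the secret at runtime instead of hardcoding it in source.
connection_string = os.environ["DATABASE_URL"]
print(connection_string.startswith("postgres://"))  # True
```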
CLI Quick Reference
```bash
# Development
dg dev                                   # Start Dagster UI
dg check defs                            # Validate definitions

# Scaffolding
dg scaffold defs dagster.asset assets/file.py
dg scaffold defs dagster.schedule schedules.py
dg scaffold defs dagster.sensor sensors.py

# Production
dagster job execute -j my_job            # Execute a job
dagster asset materialize -a my_asset    # Materialize an asset
```
References
- Assets: `references/assets.md` – Detailed asset patterns
- Resources: `references/resources.md` – Resource configuration
- Automation: `references/automation.md` – Schedules, sensors, partitions
- Testing: `references/testing.md` – Testing patterns and asset checks
- ETL Patterns: `references/etl-patterns.md` – dlt, Sling, file/API ingestion
- Project Structure: `references/project-structure.md` – Definitions, Components
- Official Docs: https://docs.dagster.io
- API Reference: https://docs.dagster.io/api/dagster