data-pipelines

📁 kylelundstedt/dotfiles 📅 10 days ago
Install command
npx skills add https://github.com/kylelundstedt/dotfiles --skill data-pipelines

Skill documentation

You are building data pipelines. The general pattern is ingest (get data in) → transform (clean, model, join) → query (analyze) → explore (notebooks, apps, visualizations).
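
As a rough illustration of the first three stages, the sketch below chains them with standard-library Python only. The sample CSV, the function names, and the column names are all hypothetical; in a real project each function would be replaced by the preferred tool for that step.

```python
import csv
import io
from collections import defaultdict

# Hypothetical raw input standing in for an API response or file drop.
RAW_CSV = """user_id,amount,event_date
1,10.50,2024-01-01
2,,2024-01-01
1,4.50,2024-01-02
"""


def ingest(raw: str) -> list[dict]:
    """Ingest: get the data in, untouched (all values are still strings)."""
    return list(csv.DictReader(io.StringIO(raw)))


def transform(rows: list[dict]) -> list[dict]:
    """Transform: drop rows with a missing amount and cast types."""
    return [
        {
            "user_id": int(r["user_id"]),
            "amount": float(r["amount"]),
            "event_date": r["event_date"],
        }
        for r in rows
        if r["amount"]
    ]


def query(rows: list[dict]) -> dict[int, float]:
    """Query: total spend per user."""
    totals: dict[int, float] = defaultdict(float)
    for r in rows:
        totals[r["user_id"]] += r["amount"]
    return dict(totals)


totals = query(transform(ingest(RAW_CSV)))
print(totals)
```

Keeping each stage a separate function mirrors the layout used later: the stages can be swapped out or rerun independently.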

The specific tools for each step depend on the project. Preferred defaults:

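| Step         | Preferred Tool      | Alternatives                                          |
|--------------|---------------------|-------------------------------------------------------|
| Ingest       | dlt                 | Plain Python scripts, shell + curl, custom connectors |
| Transform    | sqlmesh             | Plain SQL scripts, dbt, Python scripts                |
| Query engine | DuckDB / MotherDuck | (none)                                                |
| DataFrames   | polars              | (none)                                                |
| Notebooks    | marimo              | (none)                                                |
| Project mgmt | uv                  | (none)                                                |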

Language Preference

SQL first (DuckDB dialect), then Python, then bash. Use the simplest language that gets the job done.

Project Layout

Data projects should follow this structure:

project/
├── ingest/              # Extraction and loading scripts
│   ├── <source>.py      # One file per data source
│   └── .dlt/            # dlt config (if using dlt)
├── transform/           # Transformation logic
│   ├── models/          # SQL models (sqlmesh or plain SQL)
│   └── config.yaml      # sqlmesh config (if using sqlmesh)
├── notebooks/           # Exploration and analysis
│   └── *.py             # marimo notebooks (plain .py files)
├── data/                # Local data files (gitignored)
├── pyproject.toml       # Dependencies
├── uv.lock              # Locked dependencies (committed)
└── *.duckdb             # Local database (gitignored)

Not every project needs all directories — a simple analysis might only have notebooks/ and a DuckDB file. Scale up as needed.
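
One way to create this skeleton is a short script. The sketch below is an illustration using only pathlib; the `scaffold` helper and the `.gitignore` contents are assumptions derived from the tree above, and pyproject.toml and uv.lock are left for uv to create.

```python
import tempfile
from pathlib import Path

# Directories from the layout above; trim the list for smaller projects.
DIRS = ["ingest/.dlt", "transform/models", "notebooks", "data"]
# Paths the layout marks as gitignored.
GITIGNORE = ["data/", "*.duckdb"]


def scaffold(root: Path) -> None:
    """Create the directory skeleton and a .gitignore under root."""
    for d in DIRS:
        (root / d).mkdir(parents=True, exist_ok=True)
    (root / ".gitignore").write_text("\n".join(GITIGNORE) + "\n")


# Demo in a throwaway directory so nothing is written to a real project.
with tempfile.TemporaryDirectory() as tmp:
    project = Path(tmp) / "project"
    scaffold(project)
    created = sorted(p.relative_to(project).as_posix() for p in project.rglob("*"))
    print(created)
```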

uv — Project Management

Never use pip directly. All Python work goes through uv.

uv init my-project                    # New project
uv add polars duckdb                  # Add dependencies
uv sync                               # Install into .venv
uv run python script.py               # Run in project venv
uv run --with requests script.py      # Ad-hoc dependency

Inline script dependencies (PEP 723) for standalone scripts:

# /// script
# dependencies = ["dlt[duckdb]", "polars"]
# requires-python = ">=3.12"
# ///

Run with uv run script.py — deps are resolved automatically.

Always commit uv.lock. Use pyproject.toml for dependency declarations, never requirements.txt.

DuckDB — Query Engine

DuckDB is the shared SQL engine across the entire stack. Use DuckDB-specific syntax freely.

CLI

duckdb                              # In-memory
duckdb my_data.db                   # Persistent local
duckdb md:my_db                     # MotherDuck
duckdb -c "SELECT 42"              # One-shot

DuckDB SQL Syntax

Friendly SQL:

FROM my_table;                                          -- Implicit SELECT *
FROM my_table SELECT col1, col2 WHERE col3 > 5;        -- FROM-first
SELECT * EXCLUDE (internal_id) FROM events;             -- Drop columns
SELECT * REPLACE (amount / 100.0 AS amount) FROM txns;  -- Transform in-place
SELECT category, SUM(amount) FROM sales GROUP BY ALL;    -- Infer GROUP BY

Read files directly (no import step):

SELECT * FROM 'data.parquet';
SELECT * FROM read_csv('data.csv', header=true);
SELECT * FROM 's3://bucket/path/*.parquet';
COPY (SELECT * FROM events) TO 'output.parquet' (FORMAT PARQUET);

Nested types:

SELECT {'name': 'Alice', 'age': 30} AS person;
SELECT [1, 2, 3] AS nums;
SELECT list_filter([1, 2, 3, 4], x -> x > 2);

Useful commands:

DESCRIBE SELECT * FROM events;
SUMMARIZE events;

MotherDuck

ATTACH 'md:';              -- All databases
ATTACH 'md:my_db';         -- Specific database

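Auth via the motherduck_token env var. Cross-database queries work once both databases are attached; reference the cloud database by its name, without the md: prefix: SELECT * FROM local_db.main.t1 JOIN cloud_db.main.t2 USING (id).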

polars — DataFrames

Use polars when Python logic is needed — complex string transforms, ML features, row-level conditionals. For joins, aggregations, and window functions, prefer SQL.

Key Patterns

import polars as pl

# Lazy evaluation (always prefer for production)
lf = pl.scan_parquet("events/*.parquet")
result = (
    # Compare against a date literal; assumes event_date is a Date column
    lf.filter(pl.col("event_date") >= pl.date(2024, 1, 1))
    .group_by("user_id")
    .agg(pl.col("amount").sum().alias("total_spend"))
    .sort("total_spend", descending=True)
    .collect()
)

# Three contexts
df.select(...)         # Pick/transform columns (output has ONLY these)
df.with_columns(...)   # Add/overwrite columns (keeps all originals)
df.filter(...)         # Keep rows matching condition

DuckDB interop (zero-copy via Arrow):

import duckdb
result = duckdb.sql("SELECT * FROM df WHERE amount > 100").pl()

marimo — Notebooks

Reactive Python notebooks stored as plain .py files. Cells auto-re-execute when dependencies change.
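
The reactive behavior can be pictured as a dependency graph over cells. The toy model below is not marimo's implementation: the cell names and the hand-written `defs`/`refs` sets are hypothetical, whereas a real notebook derives them from the code in each cell.

```python
# Each hypothetical cell lists the variables it defines and the ones it reads.
CELLS = {
    "load": {"defs": {"events"}, "refs": set()},
    "picker": {"defs": {"start_date"}, "refs": set()},
    "filter": {"defs": {"recent"}, "refs": {"events", "start_date"}},
    "chart": {"defs": {"fig"}, "refs": {"recent"}},
}


def cells_to_rerun(changed: str) -> list[str]:
    """Return the cells downstream of `changed`, in the order they are discovered."""
    order: list[str] = []
    frontier = [changed]
    while frontier:
        current = frontier.pop(0)
        for name, cell in CELLS.items():
            # A cell is stale if it reads any variable the current cell defines.
            if cell["refs"] & CELLS[current]["defs"] and name not in order:
                order.append(name)
                frontier.append(name)
    return order


print(cells_to_rerun("picker"))  # cells affected by changing start_date
print(cells_to_rerun("chart"))   # nothing reads fig, so nothing reruns
```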

marimo edit notebook.py              # Create/edit
marimo run notebook.py               # Serve as app
marimo convert notebook.ipynb -o out.py  # From Jupyter

SQL cells use DuckDB by default and return polars DataFrames:

result = mo.sql(f"""
    SELECT * FROM events
    WHERE event_date >= '{start_date}'
""")

Python variables and polars DataFrames are queryable from SQL cells and vice versa.

dlt — Ingestion

Use this section when a project ingests data with dlt. dlt handles API calls, pagination, schema inference, incremental loading, and state management.

Scaffold and Run

dlt init rest_api duckdb             # Scaffold pipeline
uv run python pipeline.py           # Run extraction
dlt pipeline <name> info             # Inspect state
dlt pipeline <name> schema           # View inferred schema

Pipeline Patterns

Minimal pipeline:

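import dlt

# Illustrative sample rows; in practice `data` is any iterable of dicts,
# a generator, or a dlt resource/source.
data = [{"id": 1, "event_type": "signup"}, {"id": 2, "event_type": "login"}]

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="duckdb",
    dataset_name="raw",
)
info = pipeline.run(data, table_name="events")
print(info)  # Summary of the completed load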

Incremental loading:

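# fetch_users is a placeholder for your own API client call.
@dlt.resource(write_disposition="merge", primary_key="id")
def users(updated_at=dlt.sources.incremental("updated_at")):
    # last_value is the highest updated_at seen in earlier runs (None on the first run)
    yield from fetch_users(since=updated_at.last_value)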
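
The cursor idea behind incremental loading can be sketched without dlt. The toy model below is a simplification, not dlt's implementation: the `SOURCE` rows, the `state` dict, and `run_once` are hypothetical stand-ins for a real API, persisted pipeline state, and a pipeline run.

```python
# Hypothetical source rows; ISO date strings compare correctly as text.
SOURCE = [
    {"id": 1, "updated_at": "2024-01-01"},
    {"id": 2, "updated_at": "2024-01-02"},
]

# Stands in for cursor state that survives between runs.
state = {"last_value": None}


def run_once() -> list[dict]:
    """Fetch only rows newer than the stored cursor, then advance the cursor."""
    last = state["last_value"]
    new_rows = [r for r in SOURCE if last is None or r["updated_at"] > last]
    if new_rows:
        state["last_value"] = max(r["updated_at"] for r in new_rows)
    return new_rows


first = run_once()   # initial load: every row
second = run_once()  # nothing changed since the last run
SOURCE.append({"id": 1, "updated_at": "2024-01-03"})  # user 1 is updated upstream
third = run_once()   # only the changed row
print(len(first), len(second), len(third))
```

Combined with a merge write disposition, the row returned by the third run would overwrite the earlier version of user 1 rather than duplicate it.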

REST API source (declarative):

from dlt.sources.rest_api import rest_api_source

source = rest_api_source({
    "client": {"base_url": "https://api.example.com/v1"},
    "resource_defaults": {"primary_key": "id", "write_disposition": "merge"},
    "resources": [
        "users",
        {
            "name": "events",
            "write_disposition": "append",
            "endpoint": {
                "path": "events",
                "incremental": {"cursor_path": "created_at", "initial_value": "2024-01-01"},
            },
        },
    ],
})

Write dispositions:

| Disposition | Behavior              | Use For                |
|-------------|-----------------------|------------------------|
| append      | Insert rows (default) | Immutable events, logs |
| replace     | Drop and recreate     | Small lookup tables    |
| merge       | Upsert by primary_key | Mutable records        |
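
The difference between the three dispositions is easiest to see on a tiny in-memory table. The sketch below only models the semantics listed above with plain lists of dicts; the `load` function and the sample rows are hypothetical and say nothing about how dlt performs the load.

```python
def load(
    table: list[dict],
    batch: list[dict],
    disposition: str,
    primary_key: str = "id",
) -> list[dict]:
    """Return the table contents after loading `batch` with the given disposition."""
    if disposition == "append":
        # Keep everything already there and add the new rows.
        return table + batch
    if disposition == "replace":
        # Discard the old contents entirely.
        return list(batch)
    if disposition == "merge":
        # Upsert: rows in the batch win over existing rows with the same key.
        merged = {row[primary_key]: row for row in table}
        merged.update({row[primary_key]: row for row in batch})
        return list(merged.values())
    raise ValueError(f"unknown disposition: {disposition}")


existing = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
batch = [{"id": 2, "name": "Robert"}, {"id": 3, "name": "Cara"}]

for disposition in ("append", "replace", "merge"):
    print(disposition, load(existing, batch, disposition))
```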

Destinations: duckdb (local file), motherduck (cloud). Set motherduck_token env var or configure in .dlt/secrets.toml.

sqlmesh — Transformation

Use this section when a project runs its transformations with sqlmesh. sqlmesh is SQL-first and uses a plan/apply workflow, so changes are previewed before they reach production.

Scaffold and Run

sqlmesh init duckdb                              # New project
sqlmesh init -t dlt --dlt-pipeline <name> duckdb # From dlt schema (dialect goes last)
sqlmesh plan dev                                 # Preview + apply to a dev environment
sqlmesh plan prod                                # Promote to production (prod is also the default target)
sqlmesh fetchdf "SELECT * FROM analytics.users"  # Ad-hoc query
sqlmesh test                                     # Run unit tests
sqlmesh ui                                       # Web interface

Model Kinds

| Kind                      | Behavior                   | Use For                |
|---------------------------|----------------------------|------------------------|
| FULL                      | Rewrite entire table       | Small dimension tables |
| INCREMENTAL_BY_TIME_RANGE | Process new time intervals | Facts, events, logs    |
| INCREMENTAL_BY_UNIQUE_KEY | Upsert by key              | Mutable dimensions     |
| SEED                      | Static CSV data            | Reference/lookup data  |
| VIEW                      | SQL view                   | Simple pass-throughs   |
| SCD_TYPE_2                | Slowly changing dimensions | Historical tracking    |

Model Example

MODEL (
    name analytics.stg_events,
    kind INCREMENTAL_BY_TIME_RANGE (time_column event_date),
    cron '@daily',
    grain (event_id),
    audits (not_null(columns := (event_id)))
);

SELECT
    event_id,
    user_id,
    event_type,
    event_date
FROM raw.events
WHERE event_date BETWEEN @start_date AND @end_date
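
To see what "process new time intervals" means for a daily model like this one, the sketch below works out which days are still missing and fills the date bounds into the WHERE clause. It is a toy model with hypothetical helper names, rendering one query per missing day for simplicity; sqlmesh tracks intervals itself and may batch several of them into a single run.

```python
from datetime import date, timedelta


def missing_days(start: date, end: date, processed: set[date]) -> list[date]:
    """Daily intervals in [start, end] that have not been processed yet."""
    days = [start + timedelta(days=i) for i in range((end - start).days + 1)]
    return [d for d in days if d not in processed]


def render(day: date) -> str:
    """Substitute one day's bounds for the start and end date macros."""
    return f"WHERE event_date BETWEEN '{day.isoformat()}' AND '{day.isoformat()}'"


# Hypothetical history: the first two days of January were already loaded.
processed = {date(2024, 1, 1), date(2024, 1, 2)}
todo = missing_days(date(2024, 1, 1), date(2024, 1, 4), processed)

for day in todo:
    print(render(day))
```

Because only the missing days are queried, rerunning after a successful load does no extra work, which is the main appeal of this kind over FULL for large fact tables.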

Config (config.yaml)

gateways:
  local:
    connection:
      type: duckdb
      database: db.duckdb
default_gateway: local
model_defaults:
  dialect: duckdb

dlt Integration

sqlmesh init -t dlt auto-generates external models and incremental staging models from dlt’s inferred schema. Schema changes from dlt are detected by sqlmesh plan.