data-engineering-core
Total installs: 0 · Weekly installs: 3
Install command:
npx skills add https://github.com/legout/data-platform-agent-skills --skill data-engineering-core
Agent install distribution: pi (3), mcpjam (1), openhands (1), zencoder (1), crush (1)
Skill documentation
Core Data Engineering
Use this skill for day-to-day Python data engineering decisions: dataframe processing, embedded OLAP SQL, Arrow interchange, ETL structure, and resilience patterns.
When to use this skill
Use this skill when the task involves one or more of:
- Polars transformations and lazy query optimization
- DuckDB SQL analytics, MERGE/upsert, or Parquet querying
- PyArrow table/dataset interchange and Parquet scans
- Python ETL pipeline structure (extract/transform/load, logging, retries, idempotency)
- PostgreSQL-to-lake/warehouse ingestion patterns
If the task is primarily about cloud storage auth/connectors, use:
@data-engineering-storage-remote-access or @data-engineering-storage-authentication
If the task is primarily about ACID lakehouse table behavior, use:
@data-engineering-storage-lakehouse
Quick tool selection
| Need | Default choice | Why |
|---|---|---|
| DataFrame transformation in Python | Polars | Fast lazy engine, strong expression API |
| SQL over files/DataFrames | DuckDB | Embedded OLAP + Parquet/Arrow native |
| Interchange format between systems | PyArrow | Zero-copy table/batch ecosystem |
| OLTP source extraction | psycopg2 / postgres driver | Stable DB connectivity |
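For the OLTP extraction row, a minimal sketch using psycopg2 to pull a batch and land it as Parquet at the boundary (the DSN, table, columns, watermark value, and output path are illustrative):

```python
import polars as pl
import psycopg2

watermark = "2024-01-01 00:00:00"  # hypothetical checkpoint from the previous run

conn = psycopg2.connect("dbname=app user=etl")  # hypothetical DSN
with conn, conn.cursor() as cur:
    # Parameterized query: only rows newer than the watermark are extracted.
    cur.execute(
        "SELECT id, event_ts, value FROM events WHERE event_ts > %s",
        (watermark,),
    )
    columns = [desc[0] for desc in cur.description]
    df = pl.DataFrame(cur.fetchall(), schema=columns, orient="row")

# Land the batch as Parquet so downstream tools (Polars, DuckDB) read it natively.
df.write_parquet("landing/events.parquet")  # hypothetical landing path
```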
Rule of thumb:
- Start transformations in Polars lazy.
- Use DuckDB for heavy SQL/windowing/joins over files.
- Keep boundaries in Arrow/Parquet.
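A minimal sketch of that flow, with illustrative file and column names: transform in Polars lazy, hand off as Arrow, then let DuckDB run the windowed SQL.

```python
import duckdb
import polars as pl

# 1) Start in Polars lazy: filters/projections are planned before any data is read.
lazy = (
    pl.scan_parquet("events.parquet")  # hypothetical input file
    .filter(pl.col("amount") > 0)
    .select(["id", "event_ts", "amount"])
)

# 2) Cross the boundary as Arrow: Polars -> Arrow is a cheap conversion.
arrow_table = lazy.collect().to_arrow()

# 3) Use DuckDB for the heavy SQL (window functions, large joins) over the Arrow table.
con = duckdb.connect()
result = con.sql(
    """
    SELECT id,
           amount,
           SUM(amount) OVER (PARTITION BY id ORDER BY event_ts) AS running_total
    FROM arrow_table
    """
).arrow()  # keep the result in Arrow for the next stage
```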
Core implementation rules
1) Prefer lazy execution
- Use `pl.scan_*` over `pl.read_*` for large inputs.
- Chain filters/projections before `collect()`.
- Avoid row-wise loops.
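For example (paths and columns are illustrative), keep the stage as one lazy expression chain and replace row-wise logic with expressions:

```python
import polars as pl

# scan_parquet is lazy: nothing is read until collect(), so Polars can prune
# columns and push the filter into the Parquet scan.
out = (
    pl.scan_parquet("sales/*.parquet")  # hypothetical glob
    .filter(pl.col("status") == "complete")
    .with_columns(
        # Expression instead of a Python loop or map_elements over rows.
        (pl.col("qty") * pl.col("unit_price")).alias("revenue")
    )
    .select(["order_id", "revenue"])
    .collect()
)
```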
2) Push work down
- Push filtering into file scans (predicate pushdown).
- Select only needed columns (column pruning).
- Partition data by query dimensions (typically date/tenant/region).
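A sketch combining all three, assuming a Hive-partitioned Parquet layout (partition keys, paths, and columns are illustrative):

```python
from datetime import date

import polars as pl

# Layout like events/region=eu/event_date=2024-01-01/part-0.parquet:
# the filter below prunes whole partitions instead of reading and discarding them.
lazy = (
    pl.scan_parquet("events/**/*.parquet", hive_partitioning=True)
    .filter(
        (pl.col("region") == "eu")                    # partition pruning
        & (pl.col("event_date") >= date(2024, 1, 1))  # predicate pushdown
    )
    .select(["id", "event_ts", "value"])              # column pruning
)
df = lazy.collect()
```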
3) Keep writes idempotent
- Prefer MERGE/upsert semantics where possible.
- If append-only, track watermark/checkpoints.
- Ensure retries do not duplicate side effects.
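One way to get upsert semantics in DuckDB is INSERT ... ON CONFLICT, which requires a primary key or unique constraint on the conflict target. Table and column names below are illustrative, and a staging_customer relation is assumed to exist from an earlier stage:

```python
import duckdb

con = duckdb.connect("analytics.db")

# Primary key makes re-running the same batch safe: it cannot duplicate rows.
con.sql("""
    CREATE TABLE IF NOT EXISTS dim_customer (
        id BIGINT PRIMARY KEY,
        name TEXT,
        updated_at TIMESTAMP
    )
""")

# Idempotent load: applying the same staging batch twice leaves the same result.
con.sql("""
    INSERT INTO dim_customer
    SELECT id, name, updated_at FROM staging_customer
    ON CONFLICT (id) DO UPDATE SET
        name = excluded.name,
        updated_at = excluded.updated_at
""")
```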
4) Protect boundaries
- Use parameterized SQL for values.
- Treat dynamic identifiers (table/column names) separately and validate.
- Never hardcode secrets.
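A sketch of both rules (the allow-list, function, and column names are hypothetical): values travel as bound parameters, while identifiers are checked against a known set before interpolation.

```python
import duckdb

ALLOWED_TABLES = {"events", "events_backfill"}  # hypothetical allow-list


def read_after(con: duckdb.DuckDBPyConnection, table: str, watermark: str) -> list[tuple]:
    # Identifiers cannot be bound as parameters, so validate them explicitly.
    if table not in ALLOWED_TABLES:
        raise ValueError(f"unexpected table name: {table!r}")
    # Values go through parameter binding, never f-string interpolation.
    return con.execute(
        f"SELECT * FROM {table} WHERE event_ts > ?",
        [watermark],
    ).fetchall()
```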
5) Instrument and validate
- Log row counts and stage durations.
- Validate schema/required columns at stage boundaries.
- Record checkpoints/watermarks for incremental flows.
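A minimal helper along those lines (required columns and the stage name are illustrative):

```python
import logging
import time

import polars as pl

logger = logging.getLogger("etl")
REQUIRED_COLUMNS = {"id", "event_ts", "value"}  # hypothetical stage contract


def validate_and_log(df: pl.DataFrame, stage: str, started: float) -> pl.DataFrame:
    # Fail fast at the stage boundary if the schema contract is broken.
    missing = REQUIRED_COLUMNS - set(df.columns)
    if missing:
        raise ValueError(f"{stage}: missing required columns {sorted(missing)}")
    # Row counts and stage durations are the cheapest useful signals to log.
    logger.info("%s: %d rows in %.2fs", stage, df.height, time.monotonic() - started)
    return df
```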
Minimal safe ETL shape
```python
import polars as pl
import duckdb


def run_etl(source_path: str, target_table: str) -> None:
    lazy = (
        pl.scan_parquet(source_path)
        .filter(pl.col("value").is_not_null())
        .select(["id", "event_ts", "value", "category"])
    )
    df = lazy.collect()
    with duckdb.connect("analytics.db") as con:
        con.sql("CREATE OR REPLACE TABLE staging AS SELECT * FROM df")
        con.sql(f"""
            CREATE TABLE IF NOT EXISTS {target_table} AS
            SELECT * FROM staging WHERE 1=0
        """)
        con.sql(f"INSERT INTO {target_table} SELECT * FROM staging")
```
For production-grade structure, use:
templates/complete_etl_pipeline.py
Progressive disclosure (read next as needed)
- patterns/etl.md: canonical ETL pipeline structure
- patterns/incremental.md: watermark/CDC/incremental loading patterns
- templates/complete_etl_pipeline.py: full template with logging and checkpoints
- core-detailed.md: comprehensive reference (extended examples)
Related skills
- @data-engineering-storage-lakehouse: Delta/Iceberg/Hudi behavior
- @data-engineering-storage-remote-access: fsspec/pyarrow.fs/obstore cloud access
- @data-engineering-orchestration: Prefect/Dagster/dbt orchestration
- @data-engineering-quality: Pandera / Great Expectations validation
- @data-engineering-observability: OTel + Prometheus monitoring
- @data-engineering-ai-ml: embedding/vector/RAG pipelines