etl-patterns

📁 majesticlabs-dev/majestic-marketplace 📅 8 days ago
12 total installs
2 weekly installs
#26800 site-wide rank

Install command
npx skills add https://github.com/majesticlabs-dev/majestic-marketplace --skill etl-patterns

Agent install distribution

opencode 2
claude-code 2
replit 2
openhands 1
zencoder 1

Skill Documentation

ETL Patterns

Orchestrator for production-grade Extract-Transform-Load patterns.

Skill Routing

| Need | Skill | Content |
| --- | --- | --- |
| Reliability patterns | etl-core-patterns | Idempotency, checkpointing, error handling, chunking, retry, logging |
| Load strategies | etl-incremental-patterns | Backfill, timestamp-based, CDC, pipeline orchestration |

Pattern Selection Guide

By Reliability Need

| Need | Pattern | Skill |
| --- | --- | --- |
| Repeatable runs | Idempotency | etl-core-patterns |
| Resume after failure | Checkpointing | etl-core-patterns |
| Handle bad records | Error handling + DLQ | etl-core-patterns |
| Memory management | Chunked processing | etl-core-patterns |
| Network resilience | Retry with backoff | etl-core-patterns |
| Observability | Structured logging | etl-core-patterns |
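The retry-with-backoff row above can be sketched as a small helper. This is illustrative only (the function name and parameters are assumptions, not part of etl-core-patterns): each failed attempt doubles the wait, capped at a maximum, with a little jitter to avoid thundering herds.

```python
import random
import time


def retry_with_backoff(fn, max_attempts=5, base_delay=1.0, max_delay=30.0):
    """Retry a flaky callable with exponential backoff and jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise  # exhausted: surface the last error to the caller
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            time.sleep(delay + random.uniform(0, delay * 0.1))
```

Wrap only the network-facing call (extract or load), not the whole pipeline, so a transform bug fails fast instead of being retried.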

By Load Strategy

| Scenario | Pattern | Skill |
| --- | --- | --- |
| Small tables (<100K rows) | Full refresh | etl-incremental-patterns |
| Large tables | Timestamp incremental | etl-incremental-patterns |
| Real-time sync | CDC events | etl-incremental-patterns |
| Historical migration | Parallel backfill | etl-incremental-patterns |
| Zero-downtime refresh | Swap pattern | etl-incremental-patterns |
| Multi-step pipelines | Pipeline orchestration | etl-incremental-patterns |
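Parallel backfill usually means splitting the history into date ranges that independent workers can process. A minimal sketch of that chunking step (the function name and chunk size are assumptions for illustration):

```python
from datetime import date, timedelta


def date_ranges(start, end, days=7):
    """Split [start, end) into half-open chunks for parallel backfill workers."""
    cur = start
    while cur < end:
        nxt = min(cur + timedelta(days=days), end)
        yield cur, nxt  # each (lo, hi) pair becomes one worker's slice
        cur = nxt
```

Each chunk can then be extracted with a `WHERE ts >= lo AND ts < hi` filter, so reruns of a single failed chunk are safe.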

Quick Reference

Idempotency Options

```python
# Small datasets:   delete-then-insert
# Large datasets:   UPSERT on conflict
# Change detection: row hash comparison
```
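The UPSERT option can be sketched with SQLite's `INSERT ... ON CONFLICT` clause (the table schema and `upsert_records` signature here are assumptions for illustration): loading the same batch twice leaves the table in the same state.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, updated_at TEXT)"
)


def upsert_records(conn, rows):
    """Idempotent load: insert new rows, update existing ones on key conflict."""
    conn.executemany(
        """INSERT INTO users (id, name, updated_at)
           VALUES (:id, :name, :updated_at)
           ON CONFLICT(id) DO UPDATE SET
               name = excluded.name,
               updated_at = excluded.updated_at""",
        rows,
    )
    conn.commit()
```

The same pattern maps to `INSERT ... ON CONFLICT DO UPDATE` in PostgreSQL and `MERGE` in warehouses like BigQuery and Snowflake.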

Load Strategy Decision

Is table < 100K rows?
  → Full refresh

Has reliable timestamp column?
  → Timestamp incremental

Source supports CDC?
  → CDC event processing

Need zero downtime?
  → Swap pattern (temp table → rename)

One-time historical load?
  → Parallel backfill with date ranges
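The timestamp-incremental branch of the decision tree can be sketched as a single filtered query. This assumes a SQLite source and treats `table`/`ts_column` as trusted identifiers; only the watermark is parameterized:

```python
import sqlite3


def incremental_by_timestamp(conn, table, ts_column, last_seen):
    """Extract only rows whose timestamp is newer than the last checkpoint."""
    cur = conn.execute(
        f"SELECT * FROM {table} WHERE {ts_column} > ? ORDER BY {ts_column}",
        (last_seen,),
    )
    return cur.fetchall()
```

After a successful load, persist the max timestamp you saw so the next run picks up where this one stopped.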

Common Pipeline Structure

```python
# Helper names (Checkpoint, ETLProcessor, incremental_by_timestamp,
# upsert_records) come from the routed skills above.
import pandas as pd

# 1. Setup
checkpoint = Checkpoint('.etl_checkpoint.json')
processor = ETLProcessor()

# 2. Extract (incremental)
df = incremental_by_timestamp(source_table, 'updated_at')

# 3. Transform (with error handling)
transformed = processor.process_batch(df.to_dict('records'))

# 4. Load (idempotent)
upsert_records(pd.DataFrame(transformed))

# 5. Advance the checkpoint
checkpoint.set_last_processed('sync', df['updated_at'].max())

# 6. Persist failed records for replay
processor.save_failures('failures/')
```

Related Skills

  • data-validation – Validate data quality during ETL
  • data-quality – Monitor data quality metrics
  • pandas-coder – DataFrame transformations