data-engineering
29
Total installs
7
Weekly installs
#12501
Site-wide rank
Install command
npx skills add https://github.com/eyadsibai/ltk --skill data-engineering
Install distribution by agent
gemini-cli
6
antigravity
5
claude-code
5
github-copilot
5
codex
5
opencode
4
Skill documentation
Data Engineering Guide
Data pipelines, warehousing, and the modern data stack.
When to Use
- Building data pipelines
- Designing data warehouses
- Implementing ETL/ELT processes
- Setting up data lakes
- Optimizing data infrastructure
Modern Data Stack
Components
Sources → Ingestion → Storage → Transform → Serve → Consume
| Layer | Tools |
|---|---|
| Ingestion | Fivetran, Airbyte, Stitch |
| Storage | S3, GCS, Snowflake, BigQuery |
| Transform | dbt, Spark, Airflow |
| Orchestration | Airflow, Dagster, Prefect |
| Serving | Looker, Tableau, Metabase |
Data Pipeline Patterns
Batch Processing
```python
# Airflow DAG example
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

dag = DAG(
    'daily_etl',
    schedule_interval='0 6 * * *',
    start_date=datetime(2024, 1, 1),
)

def extract():
    # Extract from source
    pass

def transform():
    # Transform data
    pass

def load():
    # Load to warehouse
    pass

extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag,
)
transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag,
)
load_task = PythonOperator(
    task_id='load',
    python_callable=load,
    dag=dag,
)

extract_task >> transform_task >> load_task
```
Streaming Processing
```python
# Kafka consumer example (kafka-python)
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
)

for message in consumer:
    process_event(message.value)  # process_event defined elsewhere
```
dbt Patterns
Model Structure
```
models/
├── staging/          # 1:1 with source
│   ├── stg_orders.sql
│   └── stg_customers.sql
├── intermediate/     # Business logic
│   └── int_order_items.sql
└── marts/            # Final models
    ├── dim_customers.sql
    └── fct_orders.sql
```
Example Model
```sql
-- models/marts/fct_orders.sql
{{
    config(
        materialized='incremental',
        unique_key='order_id'
    )
}}

select
    o.order_id,
    o.customer_id,
    o.order_date,
    sum(oi.quantity * oi.unit_price) as order_total
from {{ ref('stg_orders') }} o
join {{ ref('stg_order_items') }} oi
    on o.order_id = oi.order_id
{% if is_incremental() %}
where o.order_date > (select max(order_date) from {{ this }})
{% endif %}
group by 1, 2, 3
```
Data Modeling
Dimensional Modeling
```
Fact Tables (events/transactions)
├── fct_orders
├── fct_page_views
└── fct_transactions

Dimension Tables (context)
├── dim_customers
├── dim_products
├── dim_dates
└── dim_locations
```
Star Schema
```
              dim_customers
                    │
dim_dates ── fct_orders ── dim_products
                    │
              dim_locations
```
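The star-schema join above can be sketched in miniature: the fact row carries only foreign keys, and the dimensions supply descriptive context at query time. The table contents here are hypothetical illustration data, not from the document.

```python
# Minimal in-memory star-schema sketch: fact rows hold foreign keys,
# dimension tables resolve them to descriptive attributes.
dim_customers = {101: {"name": "Acme"}}
dim_products = {7: {"name": "Widget"}}
fct_orders = [
    {"order_id": 1, "customer_id": 101, "product_id": 7, "total": 42.0},
]

# "Query": join the fact to its dimensions via the foreign keys.
report = [
    {
        "customer": dim_customers[o["customer_id"]]["name"],
        "product": dim_products[o["product_id"]]["name"],
        "total": o["total"],
    }
    for o in fct_orders
]
print(report[0]["customer"])  # Acme
```

In a warehouse the same shape is expressed as `fct_orders` joined to each `dim_*` table on its surrogate key.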
Data Quality
Validation Rules
```yaml
# models/schema.yml (dbt tests)
models:
  - name: fct_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: order_total
        tests:
          - not_null
          - positive_value  # custom generic test, not built into dbt
```
Quality Metrics
| Metric | Description |
|---|---|
| Completeness | % non-null values |
| Uniqueness | % distinct values |
| Timeliness | Data freshness |
| Accuracy | Matches source of truth |
| Consistency | Agreement across systems |
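The first two metrics are simple ratios and can be sketched directly; in practice they would be SQL aggregates over the warehouse table, but the definitions are the same. The sample values below are hypothetical.

```python
# Sketch of two quality metrics from the table above.
def completeness(values):
    # % of values that are non-null
    return sum(v is not None for v in values) / len(values)

def uniqueness(values):
    # % of distinct values among the non-null values
    non_null = [v for v in values if v is not None]
    return len(set(non_null)) / len(non_null)

order_ids = [1, 2, 2, 3, None]  # one null, one duplicate
print(completeness(order_ids))  # 0.8
print(uniqueness(order_ids))    # 0.75
```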
Performance Optimization
Partitioning
```sql
-- BigQuery partitioned table
CREATE TABLE orders
PARTITION BY DATE(order_date)
CLUSTER BY customer_id
AS SELECT * FROM staging.orders;
```
Query Optimization
| Technique | Impact |
|---|---|
| Partitioning | Reduce scanned data |
| Clustering | Improve filter speed |
| Materialization | Pre-compute joins |
| Caching | Reduce repeat queries |
Monitoring
Pipeline Metrics
| Metric | Alert Threshold |
|---|---|
| Runtime | >2x normal |
| Row count | ±20% variance |
| Freshness | >SLA |
| Failures | Any failure |
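The row-count rule in the table can be sketched as a simple check against a trailing baseline; the function name and thresholds are illustrative, not from any specific monitoring tool.

```python
# Sketch: alert when today's row count deviates more than ±20%
# from the average of recent runs.
def row_count_alert(current, history, threshold=0.20):
    baseline = sum(history) / len(history)
    variance = abs(current - baseline) / baseline
    return variance > threshold

history = [1000, 1050, 980]            # recent daily row counts
print(row_count_alert(1500, history))  # True: ~48% above baseline
print(row_count_alert(1010, history))  # False: within ±20%
```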
Data Observability
```yaml
# Monte Carlo / Elementary-style config (illustrative)
monitors:
  - table: fct_orders
    tests:
      - freshness:
          threshold: 6 hours
      - volume:
          threshold: 10%
      - schema_change: true
```
Best Practices
Pipeline Design
- Idempotent operations
- Incremental processing
- Clear data lineage
- Automated testing
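Idempotency, the first principle above, means re-running a load for the same period leaves the warehouse in the same state. A minimal sketch, using an in-memory dict as a stand-in for a date-partitioned table:

```python
# Idempotent daily load: the target partition is replaced, never
# appended to, so re-running the same date is always safe.
def load_partition(warehouse, run_date, rows):
    warehouse[run_date] = list(rows)  # replace, not append
    return warehouse

wh = {}
load_partition(wh, "2024-01-01", [{"order_id": 1}])
load_partition(wh, "2024-01-01", [{"order_id": 1}])  # safe re-run
print(len(wh["2024-01-01"]))  # 1, not 2
```

In SQL warehouses the same pattern is typically a delete-then-insert or `MERGE` scoped to the run's partition.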
Data Governance
- Document all models
- Track data lineage
- Implement access controls
- Version control SQL
Cost Management
- Monitor query costs
- Use partitioning
- Schedule off-peak
- Archive old data