data-infrastructure-at-scale

📁 sunnypatneedi/claude-starter-kit 📅 Jan 27, 2026
3
总安装量
2
周安装量
#58803
全站排名
安装命令
npx skills add https://github.com/sunnypatneedi/claude-starter-kit --skill data-infrastructure-at-scale

Agent 安装分布

mcpjam 2
neovate 2
antigravity 2
qwen-code 2
windsurf 2
zencoder 2

Skill 文档

Data Infrastructure at Scale

Build data infrastructure that grows from MVP to millions of users without rewrites.

When to Use

Use this skill when:

  • Architecting a new data platform
  • Current infrastructure can’t handle load (slow queries, timeouts)
  • Planning to scale from thousands to millions of records/requests
  • Choosing between databases, caches, message queues
  • Designing ETL/ELT pipelines
  • Building real-time analytics or streaming systems

Scaling Stages

Stage 1: Prototype (< 10K users)

Architecture:

┌─────────┐
│   App   │
└────┬────┘
     │
┌────▼────┐
│   DB    │
└─────────┘

Characteristics:

  • Single database server
  • No caching
  • Synchronous processing
  • Simple backups

When to graduate: Response time > 200ms, database CPU > 60%


Stage 2: Caching (10K – 100K users)

Architecture:

┌─────────┐
│   App   │
└─┬────┬──┘
  │    │
  │  ┌─▼──────┐
  │  │ Cache  │
  │  └────────┘
  │
┌─▼────┐
│  DB  │
└──────┘

Add:

  • Redis/Memcached for hot data
  • CDN for static assets
  • Cache invalidation strategy

Pattern: Cache-Aside

def get_user(user_id):
    # Try cache first
    user = cache.get(f"user:{user_id}")
    if user:
        return user

    # Miss: fetch from DB
    user = db.query("SELECT * FROM users WHERE id = ?", user_id)

    # Populate cache
    cache.set(f"user:{user_id}", user, ttl=300)
    return user

When to graduate: Database read load > 70%, cache hit rate < 80%


Stage 3: Read Replicas (100K – 1M users)

Architecture:

┌─────────┐
│   App   │
└─┬────┬──┘
  │    │
  │  ┌─▼──────┐
  │  │ Cache  │
  │  └────────┘
  │
  │  ┌─────────┐
  ├──► Primary │ (writes)
  │  └────┬────┘
  │       │ replication
  │  ┌────▼────┐
  └──► Replica │ (reads)
     └─────────┘

Add:

  • Read replicas for read-heavy workloads
  • Write to primary, read from replicas
  • Replication lag monitoring

Pattern: Write-Primary, Read-Replica

def get_user(user_id):
    return db_replica.query("SELECT * FROM users WHERE id = ?", user_id)

def update_user(user_id, data):
    # Must use primary for writes
    db_primary.execute("UPDATE users SET ... WHERE id = ?", data, user_id)
    # Invalidate cache
    cache.delete(f"user:{user_id}")

Gotcha: Replication Lag

# Problem: Read-your-own-writes fails
def create_post(user_id, content):
    post_id = db_primary.insert("INSERT INTO posts ...", user_id, content)

    # Reads from replica, but replication hasn't caught up yet!
    post = db_replica.query("SELECT * FROM posts WHERE id = ?", post_id)
    # post might be None!

# Fix: Read from primary immediately after write
def create_post(user_id, content):
    post_id = db_primary.insert("INSERT INTO posts ...", user_id, content)
    post = db_primary.query("SELECT * FROM posts WHERE id = ?", post_id)
    return post

When to graduate: Single database can’t handle write load, need horizontal scaling


Stage 4: Sharding (1M – 10M users)

Architecture:

┌─────────┐
│   App   │
└────┬────┘
     │
┌────▼─────────┐
│ Shard Router │
└─┬──────────┬─┘
  │          │
┌─▼──────┐ ┌─▼──────┐
│Shard 1 │ │Shard 2 │  (each shard has replicas)
│user 0-5M │Shard 5M-10M│
└────────┘ └────────┘

Sharding Strategies:

1. Hash-based (even distribution):

def get_shard(user_id):
    return user_id % NUM_SHARDS

# User 123 → Shard 3
# User 456 → Shard 0
  • ✓ Even distribution
  • ✗ Range queries hard (need to query all shards)
  • ✗ Resharding difficult

2. Range-based (logical grouping):

def get_shard(user_id):
    if user_id < 5_000_000:
        return 'shard_1'
    elif user_id < 10_000_000:
        return 'shard_2'
    else:
        return 'shard_3'
  • ✓ Range queries fast
  • ✗ Uneven distribution (newer users more active)
  • ✓ Easy to add shards

3. Geographic (data locality):

def get_shard(user_region):
    if user_region in ['US', 'CA', 'MX']:
        return 'shard_americas'
    elif user_region in ['UK', 'DE', 'FR']:
        return 'shard_europe'
    else:
        return 'shard_asia'
  • ✓ Low latency (data near users)
  • ✓ Compliance (GDPR data residency)
  • ✗ Uneven load

Challenges:

  • Cross-shard JOINs: Avoid or do in application layer
  • Distributed transactions: Use sagas or eventual consistency
  • Shard key choice: Hard to change later!

When to graduate: Need multi-datacenter, global scale


Stage 5: Distributed (10M+ users)

Architecture:

┌──────────────────────────────────┐
│       Load Balancer (Global)     │
└────────┬────────────────┬────────┘
         │                │
    ┌────▼────┐      ┌────▼────┐
    │  US-DC  │      │  EU-DC  │
    └────┬────┘      └────┬────┘
         │                │
    ┌────▼────────────────▼────┐
    │   Distributed Database   │
    │  (CockroachDB, Spanner)  │
    └──────────────────────────┘

Add:

  • Multi-region deployment
  • Distributed consensus (Raft, Paxos)
  • Global load balancing
  • Edge caching (Cloudflare, Fastly)

Databases for this stage:

  • CockroachDB: PostgreSQL-compatible, multi-region
  • YugabyteDB: PostgreSQL-compatible, Cassandra API
  • Google Spanner: Globally distributed, strong consistency
  • DynamoDB Global Tables: Multi-region key-value

Data Storage Selection

Decision Tree

Start here:
│
├─ Need ACID transactions?
│  ├─ Yes → SQL (PostgreSQL, MySQL)
│  └─ No → Continue
│
├─ Need complex queries (JOINs, aggregations)?
│  ├─ Yes → SQL
│  └─ No → Continue
│
├─ Need flexible schema?
│  ├─ Yes → Document DB (MongoDB, Firestore)
│  └─ No → Continue
│
├─ Write-heavy, time-series?
│  ├─ Yes → Wide-column (Cassandra, ClickHouse)
│  └─ No → Continue
│
├─ Need ultra-low latency?
│  ├─ Yes → In-memory (Redis, Memcached)
│  └─ No → Key-Value (DynamoDB, RocksDB)

Storage Comparison

Database Best For Throughput Consistency Cost
PostgreSQL Transactional, complex queries Medium Strong Low
MySQL Read-heavy, replication High reads Strong Low
MongoDB Flexible schema, rapid iteration Medium Eventual Medium
Cassandra Write-heavy, multi-DC Very high writes Tunable Medium
Redis Caching, sessions, pub/sub Very high Eventually Low (memory)
DynamoDB Serverless, predictable latency High Strong/Eventual Pay-per-request
ClickHouse Analytics, OLAP Very high (columnar) Eventually Medium
Elasticsearch Full-text search, logs High Eventually Medium-High

Caching Strategies

1. Cache-Aside (Lazy Loading)

Pattern: Application manages cache

def get_product(product_id):
    # Check cache
    product = cache.get(f"product:{product_id}")
    if product:
        return product

    # Cache miss: Load from DB
    product = db.query("SELECT * FROM products WHERE id = ?", product_id)

    # Store in cache
    cache.set(f"product:{product_id}", product, ttl=3600)
    return product

Pros: Simple, cache failures don’t break app Cons: Cache miss penalty, stale data possible

2. Write-Through

Pattern: Update cache and database together

def update_product(product_id, data):
    # Update DB
    db.execute("UPDATE products SET ... WHERE id = ?", data, product_id)

    # Update cache
    cache.set(f"product:{product_id}", data, ttl=3600)

Pros: Cache always fresh Cons: Write latency (two operations)

3. Write-Behind (Write-Back)

Pattern: Update cache first, DB asynchronously

def update_product(product_id, data):
    # Update cache immediately
    cache.set(f"product:{product_id}", data)

    # Queue DB update for later
    queue.enqueue('update_product_db', product_id, data)

Pros: Low write latency Cons: Data loss risk if cache fails before DB write

4. Refresh-Ahead

Pattern: Refresh cache before expiration

def get_product(product_id):
    product = cache.get(f"product:{product_id}")
    ttl = cache.ttl(f"product:{product_id}")

    # Refresh if TTL < 10% of original
    if ttl < 360:  # 10% of 3600s
        queue.enqueue('refresh_product_cache', product_id)

    return product

Pros: Reduces cache misses Cons: More complex, refresh overhead

Cache Invalidation

Problem: “There are only two hard things in Computer Science: cache invalidation and naming things.”

Strategies:

1. TTL-based (simplest):

cache.set("user:123", user_data, ttl=300)  # 5 minutes
# Pros: Simple, automatic cleanup
# Cons: May serve stale data for up to 5 minutes

2. Event-based:

def update_user(user_id, data):
    db.execute("UPDATE users SET ... WHERE id = ?", data, user_id)
    cache.delete(f"user:{user_id}")  # Invalidate immediately

    # Also invalidate derived caches
    cache.delete(f"user:{user_id}:posts")
    cache.delete(f"user:{user_id}:followers")

3. Version-based:

def get_user(user_id):
    version = db.query("SELECT version FROM users WHERE id = ?", user_id)
    cache_key = f"user:{user_id}:v{version}"

    user = cache.get(cache_key)
    if not user:
        user = db.query("SELECT * FROM users WHERE id = ?", user_id)
        cache.set(cache_key, user, ttl=3600)
    return user

def update_user(user_id, data):
    db.execute("UPDATE users SET version = version + 1, ... WHERE id = ?", data, user_id)
    # Old cache keys naturally expire, no need to delete

Message Queues & Async Processing

When to Use Queues

Use queues for:

  • Decoupling: Producer and consumer run independently
  • Load leveling: Handle traffic spikes without overload
  • Reliability: Retry failed jobs automatically
  • Async processing: Don’t block user requests

Queue Comparison

Queue Best For Guarantees Throughput
Redis Simple queues, fast At-most-once Very high
RabbitMQ Complex routing, reliability At-least-once High
Apache Kafka Event streaming, replay Exactly-once Very high
AWS SQS Serverless, simple At-least-once High
Google Pub/Sub GCP, fan-out At-least-once High

Pattern: Task Queue

# Producer (web app)
@app.post('/orders')
def create_order(order_data):
    # Save to DB
    order_id = db.insert("INSERT INTO orders ...", order_data)

    # Enqueue async tasks
    queue.enqueue('send_confirmation_email', order_id)
    queue.enqueue('update_inventory', order_id)
    queue.enqueue('notify_warehouse', order_id)

    # Return immediately
    return {'order_id': order_id, 'status': 'processing'}

# Consumer (worker)
def send_confirmation_email(order_id):
    order = db.query("SELECT * FROM orders WHERE id = ?", order_id)
    email_service.send(order.email, "Order confirmed", template)

Benefits:

  • User gets instant response
  • Email failures don’t fail order creation
  • Can scale workers independently

Pattern: Event Bus (Pub/Sub)

# Multiple consumers for same event
event_bus.publish('order.created', {
    'order_id': 123,
    'user_id': 456,
    'total': 99.99
})

# Subscriber 1: Email service
@event_bus.subscribe('order.created')
def send_email(event):
    email_service.send(...)

# Subscriber 2: Analytics
@event_bus.subscribe('order.created')
def track_analytics(event):
    analytics.track('Order Created', event)

# Subscriber 3: Inventory
@event_bus.subscribe('order.created')
def update_inventory(event):
    inventory.decrement(...)

Benefits:

  • Loose coupling (services don’t know about each other)
  • Easy to add new subscribers
  • Each service can fail independently

Retry Strategy

# Exponential backoff with max retries
@queue.job(retry=5, backoff='exponential')
def send_email(order_id):
    # Retry delays: 1s, 2s, 4s, 8s, 16s
    email_service.send(...)

# Dead letter queue for failed jobs
@queue.job(retry=3, on_failure='move_to_dlq')
def process_payment(order_id):
    # After 3 failures, move to DLQ for manual review
    payment_service.charge(...)

Data Pipeline Patterns

ETL (Extract, Transform, Load)

Use when: Batch processing, data warehouse

┌─────────┐     ┌───────────┐     ┌────────────┐
│ Sources │────►│ Transform │────►│   Target   │
│(DB, API)│     │  (Python) │     │(Warehouse) │
└─────────┘     └───────────┘     └────────────┘

Example: Daily sales report

# Extract
orders = db.query("SELECT * FROM orders WHERE created_at >= ?", yesterday)
users = db.query("SELECT * FROM users WHERE id IN (?)", order_user_ids)

# Transform
sales_data = []
for order in orders:
    user = users[order.user_id]
    sales_data.append({
        'date': order.created_at.date(),
        'user_region': user.region,
        'total': order.total,
        'category': categorize(order.items)
    })

# Load
warehouse.bulk_insert('daily_sales', sales_data)

Tools: Apache Airflow, dbt, Luigi

ELT (Extract, Load, Transform)

Use when: Cloud data warehouses with compute power

┌─────────┐     ┌────────────┐     ┌───────────┐
│ Sources │────►│   Target   │────►│ Transform │
│(DB, API)│     │(Warehouse) │     │   (SQL)   │
└─────────┘     └────────────┘     └───────────┘

Example:

-- 1. Load raw data (just copy)
COPY orders FROM 's3://bucket/orders.csv';

-- 2. Transform in warehouse (fast!)
CREATE TABLE sales_summary AS
SELECT
  DATE(created_at) as date,
  u.region,
  SUM(o.total) as total_sales,
  COUNT(*) as order_count
FROM orders o
JOIN users u ON o.user_id = u.id
WHERE created_at >= CURRENT_DATE - 1
GROUP BY 1, 2;

Benefit: Leverage warehouse compute, transform at scale

Tools: Snowflake, BigQuery, Redshift, dbt

Streaming (Real-time)

Use when: Need low-latency processing

┌─────────┐     ┌────────┐     ┌───────────┐
│ Sources │────►│ Kafka  │────►│ Processor │
│(Events) │     │(Buffer)│     │ (Flink)   │
└─────────┘     └────────┘     └───────────┘

Example: Real-time fraud detection

# Stream processor
stream = kafka.subscribe('transactions')

for transaction in stream:
    # Process each transaction in real-time
    risk_score = fraud_model.predict(transaction)

    if risk_score > 0.8:
        # Flag for review
        alert_service.notify('High risk transaction', transaction)
        db.execute("UPDATE transactions SET status = 'review' WHERE id = ?",
                   transaction.id)

Tools: Apache Kafka, Flink, Spark Streaming, AWS Kinesis


Monitoring & Observability

Key Metrics

Database:

  • Query latency (p50, p95, p99)
  • Connection pool utilization
  • Replication lag (for replicas)
  • Cache hit rate
  • Slow query count

Queues:

  • Queue depth (backlog size)
  • Processing rate (messages/sec)
  • Error rate
  • Dead letter queue size

Application:

  • Request throughput (req/sec)
  • Error rate (%)
  • Response time (p95, p99)

Alerting Thresholds

# Example alert rules
alerts:
  - name: HighDatabaseLatency
    condition: db.query.p95 > 200ms for 5 minutes
    severity: warning

  - name: HighReplicationLag
    condition: db.replication_lag > 10s for 2 minutes
    severity: critical

  - name: LowCacheHitRate
    condition: cache.hit_rate < 70% for 10 minutes
    severity: warning

  - name: QueueBacklog
    condition: queue.depth > 10000 for 5 minutes
    severity: warning

Migration Strategies

Database Migration (Zero Downtime)

1. Dual Writes:

# Phase 1: Write to both old and new DB
def create_user(data):
    user_id = old_db.insert("INSERT INTO users ...", data)
    new_db.insert("INSERT INTO users ...", data)  # Also write to new
    return user_id

# Phase 2: Backfill historical data
# (Run in background)
for user in old_db.query("SELECT * FROM users"):
    new_db.insert("INSERT INTO users ...", user)

# Phase 3: Read from new DB, verify against old
def get_user(user_id):
    new_user = new_db.query("SELECT * FROM users WHERE id = ?", user_id)
    old_user = old_db.query("SELECT * FROM users WHERE id = ?", user_id)

    if new_user != old_user:
        logger.error("Data mismatch!", user_id=user_id)

    return new_user

# Phase 4: Stop writing to old DB
# Phase 5: Decommission old DB

Resharding

Problem: Shard capacity full, need to split

Strategy: Virtual shards

# Instead of:
shard = user_id % 4  # Hard to change!

# Use:
virtual_shard = user_id % 1024  # Many virtual shards
physical_shard = shard_map[virtual_shard]  # Map to physical shards

# shard_map initially:
# {0-255: 'shard_1', 256-511: 'shard_2', ...}

# Later, easily rebalance:
# {0-127: 'shard_1', 128-255: 'shard_5', ...}

Output Format

When helping with infrastructure scaling:

## Current Architecture Analysis

### Bottlenecks Identified
- [Specific bottleneck 1]
- [Specific bottleneck 2]

### Recommended Architecture

[Architecture diagram in text]

### Components to Add
1. [Component]: [Why + how]
2. [Component]: [Why + how]

### Migration Plan

Phase 1: [Description]
- Week 1: [Tasks]
- Week 2: [Tasks]

Phase 2: [Description]
...

### Cost Impact
- Current: $[X]/month
- Projected: $[Y]/month
- ROI: [Justification]

### Risk Mitigation
- Risk 1: [Mitigation]
- Risk 2: [Mitigation]

Integration

Works with:

  • scalable-data-schema – Schema design for scaled infrastructure
  • multi-source-data-conflation – Merging data from multiple sources
  • data-provenance – Tracking data lineage in pipelines
  • systems-decompose – Feature-driven infrastructure planning