cqrs-patterns
11
总安装量
11
周安装量
#27140
全站排名
安装命令
npx skills add https://github.com/yonatangross/orchestkit --skill cqrs-patterns
Agent 安装分布
claude-code
8
antigravity
6
gemini-cli
6
opencode
6
windsurf
5
github-copilot
5
Skill 文档
CQRS Patterns
Separate read and write concerns for optimized data access.
Overview
- Read-heavy workloads with complex queries
- Different scaling requirements for reads vs writes
- Event sourcing implementations
- Multiple read model representations of same data
- Complex domain models with simple read requirements
When NOT to Use
- Simple CRUD applications
- Strong consistency requirements everywhere
- Small datasets with simple queries
Architecture Overview
┌─────────────────┐         ┌─────────────────┐
│   Write Side    │         │   Read Side     │
├─────────────────┤         ├─────────────────┤
│  ┌───────────┐  │         │  ┌───────────┐  │
│  │ Commands  │  │         │  │  Queries  │  │
│  └─────┬─────┘  │         │  └─────┬─────┘  │
│  ┌─────▼─────┐  │         │  ┌─────▼─────┐  │
│  │ Aggregate │  │         │  │Read Model │  │
│  └─────┬─────┘  │         │  └───────────┘  │
│  ┌─────▼─────┐  │         │        ▲        │
│  │  Events   │──┼─────────┼────────┘        │
│  └───────────┘  │ Publish │    Project      │
└─────────────────┘         └─────────────────┘
Command Side (Write Model)
Command and Handler
from pydantic import BaseModel, Field
from uuid import UUID, uuid4
from datetime import datetime, timezone
from abc import ABC, abstractmethod
class Command(BaseModel):
    """Base command with metadata."""

    # Fresh UUID generated for every command instance.
    command_id: UUID = Field(default_factory=uuid4)

    # Timezone-aware (UTC) instant at which the command object was built.
    timestamp: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc),
    )

    # Optional; presumably the user issuing the command -- confirm with callers.
    user_id: UUID | None = None
class CreateOrder(Command):
    # Command payload for placing a new order. Documented with comments rather
    # than a docstring so the generated model schema is unchanged.

    customer_id: UUID

    # Each item is read for `product_id` and `quantity` by CreateOrderHandler.
    items: list[OrderItem]

    shipping_address: Address
class CommandHandler(ABC):
    """Abstract interface implemented by every command handler."""

    @abstractmethod
    async def handle(self, command: Command) -> list["DomainEvent"]:
        """Process ``command`` and return the domain events it produced."""
class CreateOrderHandler(CommandHandler):
    """Handle ``CreateOrder``: check inventory, build the order, save it."""

    def __init__(self, order_repo, inventory_service):
        self.order_repo = order_repo
        self.inventory = inventory_service

    async def handle(self, command: CreateOrder) -> list[DomainEvent]:
        """Create and save an order, returning the events it raised.

        Raises:
            InsufficientInventoryError: for the first item whose availability
                check returns a falsy result.
        """
        # Availability is checked one item at a time, in order; the first
        # unavailable item aborts the command before anything is saved.
        for line in command.items:
            available = await self.inventory.check_availability(
                line.product_id, line.quantity
            )
            if not available:
                raise InsufficientInventoryError(line.product_id)

        new_order = Order.create(
            customer_id=command.customer_id,
            items=command.items,
            shipping_address=command.shipping_address,
        )
        await self.order_repo.save(new_order)
        return new_order.pending_events
class CommandBus:
    """Route commands to their registered handlers and publish the events.

    Handlers are looked up by the exact type of the dispatched command, so a
    subclass of a registered command type is not matched.
    """

    def __init__(self, event_publisher=None):
        """Create a bus with no handlers registered.

        Args:
            event_publisher: Object exposing ``async publish(event)``.
                Optional so that ``CommandBus()`` keeps working; when it is
                ``None`` the events are returned to the caller but are not
                published anywhere.
        """
        self._handlers: dict[type, "CommandHandler"] = {}
        # Previously never assigned, which made dispatch() fail with
        # AttributeError as soon as a handler returned at least one event.
        self.event_publisher = event_publisher

    def register(self, command_type: type, handler: "CommandHandler"):
        """Associate ``handler`` with ``command_type``, replacing any earlier one."""
        self._handlers[command_type] = handler

    async def dispatch(self, command: "Command") -> list["DomainEvent"]:
        """Run the handler registered for ``type(command)``.

        Returns:
            The events returned by the handler, after each one has been
            passed to the publisher (if one is configured).

        Raises:
            NoHandlerFoundError: if no handler is registered for the type.
        """
        handler = self._handlers.get(type(command))
        # Identity check: a registered handler object that happens to be
        # falsy must not be treated as missing.
        if handler is None:
            raise NoHandlerFoundError(type(command))

        events = await handler.handle(command)

        publisher = self.event_publisher
        if publisher is not None:
            # Publish sequentially, in the order the handler returned them.
            for event in events:
                await publisher.publish(event)
        return events
Write Model (Aggregate)
from dataclasses import dataclass, field
@dataclass
class Order:
    """Order aggregate (write model).

    State changes go through methods that record a domain event in
    ``_pending_events``; callers collect them through ``pending_events``.
    """

    id: UUID
    customer_id: UUID
    items: list[OrderItem]
    status: OrderStatus
    _pending_events: list[DomainEvent] = field(default_factory=list)

    @classmethod
    def create(cls, customer_id: UUID, items: list, shipping_address: Address) -> "Order":
        """Build a new PENDING order and record its creation events.

        ``OrderCreated`` is raised before the per-item ``OrderItemAdded``
        events. A consumer that inserts its row on ``OrderCreated`` and
        updates it on ``OrderItemAdded`` would otherwise apply the item
        updates to a row that does not exist yet and lose them.
        """
        # NOTE(review): shipping_address is accepted but not stored on the
        # aggregate or attached to any event -- confirm whether intended.
        order = cls(id=uuid4(), customer_id=customer_id, items=[], status=OrderStatus.PENDING)
        order._raise_event(OrderCreated(order_id=order.id, customer_id=customer_id))
        for item in items:
            order.items.append(item)
            # NOTE(review): only order_id and product_id are set here; verify
            # the event carries whatever fields its consumers read.
            order._raise_event(OrderItemAdded(order_id=order.id, product_id=item.product_id))
        return order

    def cancel(self, reason: str):
        """Mark the order cancelled and record ``OrderCancelled``.

        Raises:
            InvalidOperationError: if the order has already shipped.
        """
        if self.status == OrderStatus.SHIPPED:
            raise InvalidOperationError("Cannot cancel shipped order")
        self.status = OrderStatus.CANCELLED
        self._raise_event(OrderCancelled(order_id=self.id, reason=reason))

    def _raise_event(self, event: DomainEvent):
        """Queue ``event`` until it is collected via ``pending_events``."""
        self._pending_events.append(event)

    @property
    def pending_events(self) -> list[DomainEvent]:
        """Return the queued events and empty the queue.

        Reading this property is destructive: a second read returns an empty
        list until new events are raised.
        """
        events = self._pending_events.copy()
        self._pending_events.clear()
        return events
Query Side (Read Model)
Query Handler
class Query(BaseModel):
    # Common base type for all read-side queries; declares no fields.
    ...
class GetOrderById(Query):
    # Look up one order; GetOrderByIdHandler matches this against the `id`
    # column of `order_summary`.
    order_id: UUID
class GetOrdersByCustomer(Query):
    # Query for one customer's orders, with an optional status and paging
    # values.
    customer_id: UUID

    # Optional; defaults to None. Presumably None means "any status" --
    # confirm against the handler.
    status: OrderStatus | None = None

    # Defaults start at 1; presumably 1-based -- confirm against the handler.
    page: int = 1
    page_size: int = 20
class GetOrderByIdHandler:
    """Answer ``GetOrderById`` from the ``order_summary`` read table."""

    def __init__(self, read_db):
        """Store the read-side connection; it must provide ``async fetchrow``."""
        self.db = read_db

    # String annotations: OrderView is defined after this class, so an
    # unquoted ``-> OrderView | None`` raises NameError when the ``def`` is
    # evaluated.
    async def handle(self, query: "GetOrderById") -> "OrderView | None":
        """Return the matching order as an ``OrderView``.

        Returns:
            ``None`` when ``fetchrow`` yields no (or a falsy) row, otherwise
            an ``OrderView`` built from the row's columns.
        """
        row = await self.db.fetchrow(
            "SELECT * FROM order_summary WHERE id = $1", query.order_id
        )
        if not row:
            return None
        return OrderView(**row)
class OrderView(BaseModel):
    """Denormalized read model for orders."""

    # Field names match the columns OrderProjection writes to `order_summary`.
    id: UUID
    customer_id: UUID
    customer_name: str  # Denormalized
    status: str
    total_amount: float
    item_count: int
    created_at: datetime
Projections
class OrderProjection:
    """Apply domain events to the ``order_summary`` read table."""

    def __init__(self, read_db, customer_service):
        self.db = read_db
        self.customers = customer_service

    async def handle(self, event: DomainEvent):
        """Forward ``event`` to its handler; other event types are ignored."""
        if isinstance(event, OrderCreated):
            await self._on_order_created(event)
        elif isinstance(event, OrderItemAdded):
            await self._on_item_added(event)
        elif isinstance(event, OrderCancelled):
            await self._on_order_cancelled(event)

    async def _on_order_created(self, event: OrderCreated):
        """Upsert the summary row with zeroed totals and the customer's name."""
        owner = await self.customers.get(event.customer_id)
        # On conflict only customer_name is overwritten, so replaying this
        # event leaves the other columns of an existing row untouched.
        upsert_sql = (
            "INSERT INTO order_summary (id, customer_id, customer_name, status, total_amount, item_count, created_at)\n"
            "VALUES ($1, $2, $3, 'pending', 0.0, 0, $4)\n"
            "ON CONFLICT (id) DO UPDATE SET customer_name = $3"
        )
        await self.db.execute(
            upsert_sql,
            event.order_id, event.customer_id, owner.name, event.timestamp,
        )

    async def _on_item_added(self, event: OrderItemAdded):
        """Add one item's subtotal and bump the item count."""
        # The update is incremental, so applying the same event twice counts
        # the item twice.
        line_total = event.quantity * event.unit_price
        increment_sql = "UPDATE order_summary SET total_amount = total_amount + $1, item_count = item_count + 1 WHERE id = $2"
        await self.db.execute(increment_sql, line_total, event.order_id)

    async def _on_order_cancelled(self, event: OrderCancelled):
        """Set the summary row's status to 'cancelled'."""
        cancel_sql = "UPDATE order_summary SET status = 'cancelled' WHERE id = $1"
        await self.db.execute(cancel_sql, event.order_id)
FastAPI Integration
from fastapi import FastAPI, Depends, HTTPException
# Application instance that the route decorators in this snippet attach to.
app = FastAPI()
@app.post("/api/v1/orders", status_code=201)
async def create_order(request: CreateOrderRequest, bus: CommandBus = Depends(get_command_bus)):
    # Write endpoint: turn the request into a CreateOrder command, dispatch it
    # and answer with the new order's id. Documented with comments rather than
    # a docstring so the generated OpenAPI description is unchanged.
    command = CreateOrder(
        customer_id=request.customer_id,
        items=request.items,
        shipping_address=request.shipping_address,
    )
    # Keep the try body to the one call that can raise the handled error.
    try:
        events = await bus.dispatch(command)
    except InsufficientInventoryError as e:
        # Chain the cause so the original error stays visible in tracebacks.
        raise HTTPException(400, f"Insufficient inventory: {e}") from e
    # The order id is read from the first returned event; an empty list would
    # otherwise surface as an unexplained IndexError.
    if not events:
        raise HTTPException(500, "Order creation produced no events")
    return {"order_id": events[0].order_id}
@app.get("/api/v1/orders/{order_id}")
async def get_order(order_id: UUID, bus: QueryBus = Depends(get_query_bus)):
    # Read endpoint: resolve the order through the query bus and answer 404
    # when the result is falsy.
    lookup = GetOrderById(order_id=order_id)
    view = await bus.dispatch(lookup)
    if view:
        return view
    raise HTTPException(404, "Order not found")
Key Decisions
| Decision | Recommendation |
|---|---|
| Consistency | Eventual consistency between write and read models |
| Event storage | Event store for write side, denormalized tables for read |
| Projection lag | Monitor and alert on projection delay |
| Read model count | Start with one, add more for specific query needs |
| Rebuild strategy | Ability to rebuild projections from events |
Anti-Patterns (FORBIDDEN)
# NEVER query the write model (aggregate repository) to serve reads
order = await aggregate_repo.get(order_id) # WRONG
# CORRECT: Serve reads from the read model through the query bus
order = await query_bus.dispatch(GetOrderById(order_id=order_id))
# NEVER modify the read model directly
await read_db.execute("UPDATE orders SET status = $1", status) # WRONG
# CORRECT: Dispatch a command and let the projection update the read model
await bus.dispatch(UpdateOrderStatus(order_id=order_id, status=status))
# NEVER skip projection idempotency - write projections as UPSERTs
await self.db.execute("INSERT INTO ... ON CONFLICT (id) DO UPDATE SET ...")
Related Skills
- event-sourcing – Event-sourced write models
- saga-patterns – Cross-aggregate transactions
- database-schema-designer – Read model schema design