saga-orchestration
26
总安装量
6
周安装量
#14280
全站排名
安装命令
npx skills add https://github.com/eyadsibai/ltk --skill saga-orchestration
Agent 安装分布
gemini-cli
5
antigravity
5
claude-code
5
cursor
4
qwen-code
4
windsurf
4
Skill 文档
Saga Orchestration
Patterns for managing distributed transactions and long-running business processes.
Saga Types
Choreography
âââââââ âââââââ âââââââ
âSvc AâââºâSvc BâââºâSvc Câ
âââââââ âââââââ âââââââ
â â â
â¼ â¼ â¼
Event Event Event
- Services react to events
- Decentralized control
- Good for simple flows
Orchestration
âââââââââââââââ
â Orchestratorâ
ââââââââ¬âââââââ
â
âââââââ¼ââââââ
â¼ â¼ â¼
ââââââââââââââââââ
âSvc1ââSvc2ââSvc3â
ââââââââââââââââââ
- Central coordinator
- Explicit control flow
- Better for complex flows
Saga States
| State | Description |
|---|---|
| Started | Saga initiated |
| Pending | Waiting for step |
| Compensating | Rolling back |
| Completed | All steps succeeded |
| Failed | Failed after compensation |
Orchestrator Implementation
@dataclass
class SagaStep:
name: str
action: str
compensation: str
status: str = "pending"
class SagaOrchestrator:
async def execute(self, data: dict) -> SagaResult:
completed_steps = []
context = {"data": data}
for step in self.steps:
result = await step.action(context)
if not result.success:
await self.compensate(completed_steps, context)
return SagaResult(status="failed", error=result.error)
completed_steps.append(step)
context.update(result.data)
return SagaResult(status="completed", data=context)
async def compensate(self, completed_steps, context):
for step in reversed(completed_steps):
await step.compensation(context)
Order Fulfillment Saga Example
class OrderFulfillmentSaga(SagaOrchestrator):
def define_steps(self, data):
return [
SagaStep("reserve_inventory",
action=self.reserve_inventory,
compensation=self.release_inventory),
SagaStep("process_payment",
action=self.process_payment,
compensation=self.refund_payment),
SagaStep("create_shipment",
action=self.create_shipment,
compensation=self.cancel_shipment),
]
Choreography Example
class OrderChoreographySaga:
def __init__(self, event_bus):
self.event_bus = event_bus
event_bus.subscribe("OrderCreated", self._on_order_created)
event_bus.subscribe("InventoryReserved", self._on_inventory_reserved)
event_bus.subscribe("PaymentFailed", self._on_payment_failed)
async def _on_order_created(self, event):
await self.event_bus.publish("ReserveInventory", {
"order_id": event["order_id"],
"items": event["items"]
})
async def _on_payment_failed(self, event):
# Compensation
await self.event_bus.publish("ReleaseInventory", {
"reservation_id": event["reservation_id"]
})
Best Practices
- Make steps idempotent – Safe to retry
- Design compensations carefully – Must always work
- Use correlation IDs – For tracing across services
- Implement timeouts – Don’t wait forever
- Log everything – For debugging failures
When to Use
Orchestration:
- Complex multi-step workflows
- Need visibility into saga state
- Central error handling
Choreography:
- Simple event flows
- Loose coupling required
- Independent service teams