implement-cqrs-handler
npx skills add https://github.com/dawiddutoit/custom-claude --skill implement-cqrs-handler
Agent 安装分布
Skill 文档
Works with Python handlers in application/commands/ and application/queries/.
Implement CQRS Handler
Purpose
Create command or query handlers following the project’s CQRS pattern with proper separation of write (command) and read (query) operations, ServiceResult pattern, and dependency injection.
When to Use
Use this skill when:
- Implementing new use cases – Creating new application layer operations
- Adding features – Building new functionality that modifies or queries data
- Creating API endpoints – Wiring business logic to interface layer
- Building application layer logic – Orchestrating domain services and repositories
Trigger phrases:
- “Create a command handler for X”
- “Implement a query to retrieve Y”
- “Add a new use case for Z”
- “Build handler for feature X”
Quick Start
Command Handler (writes, returns ServiceResult[None]):
# src/{{PROJECT_NAME}}/application/commands/my_command.py
from {{PROJECT_NAME}}.application.commands.base import CommandHandler
from {{PROJECT_NAME}}.domain.common import ServiceResult
class MyCommandHandler(CommandHandler[MyCommand]):
def __init__(self, repository: MyRepository):
self.repository = repository
async def handle(self, command: MyCommand) -> ServiceResult[None]:
# Implementation
return ServiceResult.ok(None)
Query Handler (reads, returns ServiceResult[TResult]):
# src/{{PROJECT_NAME}}/application/queries/my_query.py
from {{PROJECT_NAME}}.application.queries.base import QueryHandler
from {{PROJECT_NAME}}.application.dto.my_dto import MyDTO
from {{PROJECT_NAME}}.domain.common import ServiceResult
class MyQueryHandler(QueryHandler[MyQuery, list[MyDTO]]):
def __init__(self, repository: MyRepository):
self.repository = repository
async def handle(self, query: MyQuery) -> ServiceResult[list[MyDTO]]:
# Implementation
return ServiceResult.ok(results)
Table of Contents
Core Sections
- Purpose – What this skill helps you build
- Quick Start – Immediate working examples for commands and queries
- Instructions – Complete implementation guide
- Step 1: Decide Command vs Query – When to use each pattern
- Step 2: Create Request Object – Define command/query data structures
- Step 3: Create Handler – Implement handler logic with templates
- Step 4: Create DTO – Data transfer objects for queries
- Step 5: Register in Container – Dependency injection setup
- Step 6: Wire to MCP Tool – Expose handler via MCP interface
Supporting Resources
- Templates – Handler skeletons and boilerplate code
- templates/command_handler.py – Command handler skeleton
- templates/query_handler.py – Query handler skeleton
- References – CQRS pattern deep dive and architecture
- Requirements – Pattern compliance checklist
Utility Scripts
- Generate Handler – Auto-generate CQRS handler boilerplate code
- List Handlers – List and inventory all CQRS handlers in the project
- Validate CQRS Separation – Validate CQRS pattern separation in handlers
Related Documentation
- ARCHITECTURE.md – System architecture overview
- CLAUDE.md – Core rules and patterns
Instructions
Step 1: Decide Command vs Query
Use Command when:
- Writing/modifying data
- Changing system state
- Side effects required
- Examples: IndexFile, DeleteFile, InitializeProject
Use Query when:
- Reading data only
- No state changes
- Retrieving information
- Examples: SearchCode, GetStats, FindRelated
Step 2: Create Request Object
For Commands:
# src/{{PROJECT_NAME}}/application/commands/my_command_command.py
from dataclasses import dataclass
from {{PROJECT_NAME}}.application.commands.base import Command
@dataclass
class MyCommand(Command):
"""Command to perform write operation.
Contains all data needed for the operation.
"""
param1: str
param2: int
force: bool = False
For Queries:
# src/{{PROJECT_NAME}}/application/queries/my_query.py
from dataclasses import dataclass
from {{PROJECT_NAME}}.application.queries.base import Query
@dataclass
class MyQuery(Query):
"""Query to retrieve data.
Contains parameters for data retrieval.
"""
filter_by: str
limit: int = 10
Step 3: Create Handler
Command Handler Template:
from {{PROJECT_NAME}}.application.commands.base import CommandHandler
from {{PROJECT_NAME}}.core.monitoring import get_logger, traced
from {{PROJECT_NAME}}.domain.common import ServiceResult
logger = get_logger(__name__)
class MyCommandHandler(CommandHandler[MyCommand]):
"""Handler for MyCommand.
Orchestrates the operation using domain services.
"""
def __init__(
self,
repository: MyRepository,
service: MyService,
):
"""Initialize with required dependencies.
Args:
repository: Repository for persistence
service: Service for business logic
"""
if not repository:
raise ValueError("Repository required")
if not service:
raise ValueError("Service required")
self.repository = repository
self.service = service
@traced
async def handle(self, command: MyCommand) -> ServiceResult[None]:
"""Execute the command.
Args:
command: The command to execute
Returns:
ServiceResult indicating success or failure
"""
try:
# 1. Validate inputs
validation = self._validate(command)
if not validation.success:
return validation
# 2. Execute business logic
result = await self.service.do_work(command.param1)
if not result.success:
return ServiceResult.fail(result.error)
# 3. Persist changes
save_result = await self.repository.save(result.data)
if not save_result.success:
return ServiceResult.fail(f"Save failed: {save_result.error}")
logger.info(f"Command completed: {command.param1}")
return ServiceResult.ok(
None,
items_processed=1,
operation="my_command"
)
except Exception as e:
logger.exception(f"Command failed: {str(e)}")
return ServiceResult.fail(f"Unexpected error: {str(e)}")
def _validate(self, command: MyCommand) -> ServiceResult[None]:
"""Validate command parameters."""
if not command.param1:
return ServiceResult.fail("param1 is required")
return ServiceResult.ok(None)
Query Handler Template:
from {{PROJECT_NAME}}.application.queries.base import QueryHandler
from {{PROJECT_NAME}}.application.dto.my_dto import MyDTO
from {{PROJECT_NAME}}.core.monitoring import get_logger, traced
from {{PROJECT_NAME}}.domain.common import ServiceResult
logger = get_logger(__name__)
class MyQueryHandler(QueryHandler[MyQuery, list[MyDTO]]):
"""Handler for MyQuery.
Retrieves data without modifying system state.
"""
def __init__(
self,
repository: MyRepository,
):
"""Initialize with required dependencies.
Args:
repository: Repository for data retrieval
"""
if not repository:
raise ValueError("Repository required")
self.repository = repository
@traced
async def handle(self, query: MyQuery) -> ServiceResult[list[MyDTO]]:
"""Execute the query.
Args:
query: The query to execute
Returns:
ServiceResult containing the requested data or error
"""
try:
# 1. Validate query
if query.limit < 1:
return ServiceResult.fail("Limit must be at least 1")
# 2. Fetch data
results = await self.repository.find_by_filter(
filter_by=query.filter_by,
limit=query.limit
)
if not results.success:
return ServiceResult.fail(results.error)
# 3. Convert to DTOs
dtos = [MyDTO.from_entity(entity) for entity in results.data]
logger.info(f"Query returned {len(dtos)} results")
return ServiceResult.ok(
dtos,
metadata={"total_results": len(dtos), "filter": query.filter_by}
)
except Exception as e:
logger.exception(f"Query failed: {str(e)}")
return ServiceResult.fail(f"Unexpected error: {str(e)}")
Step 4: Create DTO (Queries Only)
If query returns complex data, create a DTO:
# src/{{PROJECT_NAME}}/application/dto/my_dto.py
from dataclasses import dataclass
from typing import Any
@dataclass
class MyDTO:
"""Data transfer object for query results.
Carries data from application layer to interface layer.
Contains only data, no business logic.
"""
id: str
name: str
value: int
metadata: dict[str, Any]
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"id": self.id,
"name": self.name,
"value": self.value,
"metadata": self.metadata,
}
@classmethod
def from_entity(cls, entity) -> "MyDTO":
"""Create from domain entity."""
return cls(
id=entity.id,
name=entity.name,
value=entity.value,
metadata=entity.metadata,
)
Step 5: Register in Container
Add handler to src/{{PROJECT_NAME}}/interfaces/mcp/container.py:
# 1. Import handler at top
from {{PROJECT_NAME}}.application.commands.my_command import MyCommandHandler
# 2. Add provider in Container class
class Container(containers.DeclarativeContainer):
# ... existing providers ...
# Command Handlers
my_command_handler = providers.Singleton(
MyCommandHandler,
repository=code_repository, # Inject dependencies
service=my_service,
)
Step 6: Wire to MCP Tool
Add MCP tool in src/{{PROJECT_NAME}}/interfaces/mcp/server.py:
@mcp.tool()
async def my_operation(param1: str, param2: int) -> dict[str, Any]:
"""Perform my operation.
Args:
param1: Description
param2: Description
Returns:
Result dictionary
"""
command = MyCommand(param1=param1, param2=param2)
result = await container.my_command_handler().handle(command)
if not result.success:
return {"success": False, "error": result.error}
return {"success": True, "message": "Operation completed"}
Pattern Examples
The Quick Start and Instructions sections above provide complete working examples:
- Command handler with validation (Step 3)
- Query handler with DTO conversion (Step 3)
- DTO creation pattern (Step 4)
- Container registration (Step 5)
- MCP tool wiring (Step 6)
Requirements
- Handler extends
CommandHandler[TCommand]orQueryHandler[TQuery, TResult] - Request object extends
CommandorQuery - Return type is always
ServiceResult[T] - Commands return
ServiceResult[None] - Queries return
ServiceResult[TResult](often DTOs) - All dependencies injected via constructor
- Constructor validates dependencies (fail-fast)
- Use
@traceddecorator for OpenTelemetry tracing - Use
get_logger(__name__)for logging - No optional dependencies – all required
See Also
- templates/command_handler.py – Command handler skeleton
- templates/query_handler.py – Query handler skeleton
- references/reference.md – CQRS pattern deep dive
- ARCHITECTURE.md – System architecture overview
- CLAUDE.md – Core rules and patterns