implement-cqrs-handler

📁 dawiddutoit/custom-claude 📅 Jan 26, 2026
Install command
npx skills add https://github.com/dawiddutoit/custom-claude --skill implement-cqrs-handler

Skill Documentation

Works with Python handlers in application/commands/ and application/queries/.

Implement CQRS Handler

Purpose

Create command or query handlers following the project’s CQRS pattern with proper separation of write (command) and read (query) operations, ServiceResult pattern, and dependency injection.
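
The ServiceResult type is used throughout this skill but is not defined here. As a rough orientation, a minimal sketch of such a result type might look like the following. The field names success, data, and error match how the handlers below read results; the metadata field, the keyword-argument form of ok(), and the parse_limit helper are assumptions for illustration, and the project's real class in domain/common may differ.

```python
from dataclasses import dataclass, field
from typing import Any, Generic, Optional, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class ServiceResult(Generic[T]):
    """Hypothetical stand-in for the project's ServiceResult."""

    success: bool
    data: Optional[T] = None
    error: Optional[str] = None
    metadata: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def ok(cls, data: T, **metadata: Any) -> "ServiceResult[T]":
        # Extra keyword arguments are kept as metadata (an assumption).
        return cls(success=True, data=data, metadata=metadata)

    @classmethod
    def fail(cls, error: str) -> "ServiceResult[T]":
        return cls(success=False, error=error)


def parse_limit(raw: str) -> ServiceResult[int]:
    """Hypothetical helper: report bad input as a failed result, not an exception."""
    if not raw.isdigit():
        return ServiceResult.fail(f"Expected digits only, got {raw!r}")
    return ServiceResult.ok(int(raw), source="user_input")
```

Callers branch on result.success rather than catching exceptions, which is the shape every handler in this document follows.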

When to Use

Use this skill when:

  • Implementing new use cases – Creating new application layer operations
  • Adding features – Building new functionality that modifies or queries data
  • Creating API endpoints – Wiring business logic to interface layer
  • Building application layer logic – Orchestrating domain services and repositories

Trigger phrases:

  • “Create a command handler for X”
  • “Implement a query to retrieve Y”
  • “Add a new use case for Z”
  • “Build handler for feature X”

Quick Start

Command Handler (writes, returns ServiceResult[None]):

# src/{{PROJECT_NAME}}/application/commands/my_command.py
from {{PROJECT_NAME}}.application.commands.base import CommandHandler
from {{PROJECT_NAME}}.domain.common import ServiceResult

class MyCommandHandler(CommandHandler[MyCommand]):
    def __init__(self, repository: MyRepository):
        self.repository = repository

    async def handle(self, command: MyCommand) -> ServiceResult[None]:
        # Implementation
        return ServiceResult.ok(None)

Query Handler (reads, returns ServiceResult[TResult]):

# src/{{PROJECT_NAME}}/application/queries/my_query.py
from {{PROJECT_NAME}}.application.queries.base import QueryHandler
from {{PROJECT_NAME}}.application.dto.my_dto import MyDTO
from {{PROJECT_NAME}}.domain.common import ServiceResult

class MyQueryHandler(QueryHandler[MyQuery, list[MyDTO]]):
    def __init__(self, repository: MyRepository):
        self.repository = repository

    async def handle(self, query: MyQuery) -> ServiceResult[list[MyDTO]]:
        # Implementation: fetch via self.repository and map entities to DTOs
        results: list[MyDTO] = []
        return ServiceResult.ok(results)
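
Both Quick Start handlers inherit from generic base classes that this document imports but does not show. The sketch below is one plausible shape for them, assuming they are abstract generics with a single async handle method. The ServiceResult stub, CountWords, CountWordsHandler, and IncompleteHandler are hypothetical names used only for the demonstration.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

T = TypeVar("T")
TCommand = TypeVar("TCommand")
TQuery = TypeVar("TQuery")
TResult = TypeVar("TResult")


@dataclass
class ServiceResult(Generic[T]):
    """Hypothetical minimal result type; the project's version may differ."""

    success: bool
    data: Optional[T] = None
    error: Optional[str] = None


class CommandHandler(ABC, Generic[TCommand]):
    """Assumed shape of the base class in application/commands/base.py."""

    @abstractmethod
    async def handle(self, command: TCommand) -> ServiceResult[None]:
        """Perform a write and report success or failure."""


class QueryHandler(ABC, Generic[TQuery, TResult]):
    """Assumed shape of the base class in application/queries/base.py."""

    @abstractmethod
    async def handle(self, query: TQuery) -> ServiceResult[TResult]:
        """Read data and return it wrapped in a ServiceResult."""


@dataclass
class CountWords:
    """Hypothetical query object."""

    text: str


class CountWordsHandler(QueryHandler[CountWords, int]):
    async def handle(self, query: CountWords) -> ServiceResult[int]:
        return ServiceResult(success=True, data=len(query.text.split()))


class IncompleteHandler(QueryHandler[CountWords, int]):
    """Forgets to implement handle(), so it cannot be instantiated."""


result = asyncio.run(CountWordsHandler().handle(CountWords("read side only")))
```

With abstract bases like these, a handler that omits handle() fails at construction time with a TypeError instead of failing later at the call site.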

Related Documentation

  • ARCHITECTURE.md – System architecture overview
  • CLAUDE.md – Core rules and patterns

Instructions

Step 1: Decide Command vs Query

Use Command when:

  • Writing/modifying data
  • Changing system state
  • Side effects required
  • Examples: IndexFile, DeleteFile, InitializeProject

Use Query when:

  • Reading data only
  • No state changes
  • Retrieving information
  • Examples: SearchCode, GetStats, FindRelated
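
The distinction can be illustrated with a small in-memory sketch. IndexFile and GetStats are taken from the example names above; InMemoryIndex and the two handler functions are hypothetical and skip ServiceResult for brevity.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class IndexFile:
    """Command: asks the system to change state."""

    path: str


@dataclass
class GetStats:
    """Query: asks for information and changes nothing."""


class InMemoryIndex:
    """Hypothetical store standing in for a real repository."""

    def __init__(self) -> None:
        self.paths: set[str] = set()


async def handle_index_file(index: InMemoryIndex, command: IndexFile) -> None:
    # Write side: mutates the store and returns no data.
    index.paths.add(command.path)


async def handle_get_stats(index: InMemoryIndex, query: GetStats) -> dict[str, int]:
    # Read side: returns data and leaves the store untouched.
    return {"indexed_files": len(index.paths)}


async def demo() -> tuple[dict[str, int], dict[str, int], dict[str, int]]:
    index = InMemoryIndex()
    before = await handle_get_stats(index, GetStats())
    await handle_index_file(index, IndexFile(path="a.py"))
    after = await handle_get_stats(index, GetStats())
    again = await handle_get_stats(index, GetStats())
    return before, after, again
```

Running the query twice in a row gives the same answer, while the command is the only call that changes what later queries see.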

Step 2: Create Request Object

For Commands:

# src/{{PROJECT_NAME}}/application/commands/my_command.py
from dataclasses import dataclass
from {{PROJECT_NAME}}.application.commands.base import Command

@dataclass
class MyCommand(Command):
    """Command to perform write operation.

    Contains all data needed for the operation.
    """
    param1: str
    param2: int
    force: bool = False

For Queries:

# src/{{PROJECT_NAME}}/application/queries/my_query.py
from dataclasses import dataclass
from {{PROJECT_NAME}}.application.queries.base import Query

@dataclass
class MyQuery(Query):
    """Query to retrieve data.

    Contains parameters for data retrieval.
    """
    filter_by: str
    limit: int = 10
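
As a usage sketch, the request objects are plain data and can be built and compared directly. The empty Command and Query marker classes below are assumptions about the base modules, and frozen=True is an optional variation not used in the templates above; it simply prevents a handler from mutating the request it receives.

```python
from dataclasses import FrozenInstanceError, dataclass


class Command:
    """Hypothetical marker base; the project's Command may carry more."""


class Query:
    """Hypothetical marker base; the project's Query may carry more."""


@dataclass(frozen=True)
class MyCommand(Command):
    param1: str
    param2: int
    force: bool = False


@dataclass(frozen=True)
class MyQuery(Query):
    filter_by: str
    limit: int = 10


def try_mutate(command: MyCommand) -> bool:
    """Return True if the request object could be modified after creation."""
    try:
        command.force = True  # type: ignore[misc]
    except FrozenInstanceError:
        return False
    return True


command = MyCommand(param1="src/app.py", param2=3)
query = MyQuery(filter_by="python")
```

Fields without defaults must be supplied by the caller, so a request object that exists is at least structurally complete before it reaches the handler.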

Step 3: Create Handler

Command Handler Template:

from {{PROJECT_NAME}}.application.commands.base import CommandHandler
from {{PROJECT_NAME}}.core.monitoring import get_logger, traced
from {{PROJECT_NAME}}.domain.common import ServiceResult

logger = get_logger(__name__)

class MyCommandHandler(CommandHandler[MyCommand]):
    """Handler for MyCommand.

    Orchestrates the operation using domain services.
    """

    def __init__(
        self,
        repository: MyRepository,
        service: MyService,
    ):
        """Initialize with required dependencies.

        Args:
            repository: Repository for persistence
            service: Service for business logic
        """
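        # Compare against None: a truthiness check would also reject a valid
        # dependency that happens to be falsy (e.g. one defining __len__).
        if repository is None:
            raise ValueError("Repository required")
        if service is None:
            raise ValueError("Service required")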
        self.repository = repository
        self.service = service

    @traced
    async def handle(self, command: MyCommand) -> ServiceResult[None]:
        """Execute the command.

        Args:
            command: The command to execute

        Returns:
            ServiceResult indicating success or failure
        """
        try:
            # 1. Validate inputs
            validation = self._validate(command)
            if not validation.success:
                return validation

            # 2. Execute business logic
            result = await self.service.do_work(command.param1)
            if not result.success:
                return ServiceResult.fail(result.error)

            # 3. Persist changes
            save_result = await self.repository.save(result.data)
            if not save_result.success:
                return ServiceResult.fail(f"Save failed: {save_result.error}")

            logger.info(f"Command completed: {command.param1}")
            return ServiceResult.ok(
                None,
                items_processed=1,
                operation="my_command"
            )

        except Exception as e:
            logger.exception(f"Command failed: {e}")
            return ServiceResult.fail(f"Unexpected error: {e}")

    def _validate(self, command: MyCommand) -> ServiceResult[None]:
        """Validate command parameters."""
        if not command.param1:
            return ServiceResult.fail("param1 is required")
        return ServiceResult.ok(None)
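
Because every dependency arrives through the constructor, the command flow can be exercised with in-memory fakes. The sketch below condenses the template's validate, work, persist sequence and runs three scenarios. FakeService, FakeRepository, and the simplified ServiceResult are hypothetical stand-ins, not project classes.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ServiceResult:
    """Simplified, hypothetical result type."""

    success: bool
    data: Any = None
    error: Optional[str] = None

    @classmethod
    def ok(cls, data: Any = None) -> "ServiceResult":
        return cls(success=True, data=data)

    @classmethod
    def fail(cls, error: str) -> "ServiceResult":
        return cls(success=False, error=error)


@dataclass
class MyCommand:
    param1: str
    param2: int
    force: bool = False


class FakeService:
    async def do_work(self, param1: str) -> ServiceResult:
        return ServiceResult.ok(param1.upper())


@dataclass
class FakeRepository:
    fail_on_save: bool = False
    saved: list[Any] = field(default_factory=list)

    async def save(self, item: Any) -> ServiceResult:
        if self.fail_on_save:
            return ServiceResult.fail("disk full")
        self.saved.append(item)
        return ServiceResult.ok()


class MyCommandHandler:
    """Condensed copy of the template's validate -> work -> persist flow."""

    def __init__(self, repository: FakeRepository, service: FakeService) -> None:
        self.repository = repository
        self.service = service

    async def handle(self, command: MyCommand) -> ServiceResult:
        if not command.param1:
            return ServiceResult.fail("param1 is required")
        result = await self.service.do_work(command.param1)
        if not result.success:
            return ServiceResult.fail(result.error or "work failed")
        save_result = await self.repository.save(result.data)
        if not save_result.success:
            return ServiceResult.fail(f"Save failed: {save_result.error}")
        return ServiceResult.ok()


async def run_scenarios() -> tuple[list[Any], ServiceResult, ServiceResult, ServiceResult]:
    repo = FakeRepository()
    handler = MyCommandHandler(repo, FakeService())
    happy = await handler.handle(MyCommand("abc", 1))
    invalid = await handler.handle(MyCommand("", 1))
    failing = MyCommandHandler(FakeRepository(fail_on_save=True), FakeService())
    broken = await failing.handle(MyCommand("abc", 1))
    return repo.saved, happy, invalid, broken
```

Each failure path returns a failed result with a specific message, so callers never need to catch exceptions from the handler.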

Query Handler Template:

from {{PROJECT_NAME}}.application.queries.base import QueryHandler
from {{PROJECT_NAME}}.application.dto.my_dto import MyDTO
from {{PROJECT_NAME}}.core.monitoring import get_logger, traced
from {{PROJECT_NAME}}.domain.common import ServiceResult

logger = get_logger(__name__)

class MyQueryHandler(QueryHandler[MyQuery, list[MyDTO]]):
    """Handler for MyQuery.

    Retrieves data without modifying system state.
    """

    def __init__(
        self,
        repository: MyRepository,
    ):
        """Initialize with required dependencies.

        Args:
            repository: Repository for data retrieval
        """
        # Compare against None so a valid but falsy repository is not rejected.
        if repository is None:
            raise ValueError("Repository required")
        self.repository = repository

    @traced
    async def handle(self, query: MyQuery) -> ServiceResult[list[MyDTO]]:
        """Execute the query.

        Args:
            query: The query to execute

        Returns:
            ServiceResult containing the requested data or error
        """
        try:
            # 1. Validate query
            if query.limit < 1:
                return ServiceResult.fail("Limit must be at least 1")

            # 2. Fetch data
            results = await self.repository.find_by_filter(
                filter_by=query.filter_by,
                limit=query.limit
            )
            if not results.success:
                return ServiceResult.fail(results.error)

            # 3. Convert to DTOs
            dtos = [MyDTO.from_entity(entity) for entity in results.data]

            logger.info(f"Query returned {len(dtos)} results")
            return ServiceResult.ok(
                dtos,
                metadata={"total_results": len(dtos), "filter": query.filter_by}
            )

        except Exception as e:
            logger.exception(f"Query failed: {e}")
            return ServiceResult.fail(f"Unexpected error: {e}")

Step 4: Create DTO (Queries Only)

If the query returns complex data, create a DTO:

# src/{{PROJECT_NAME}}/application/dto/my_dto.py
from dataclasses import dataclass
from typing import Any

@dataclass
class MyDTO:
    """Data transfer object for query results.

    Carries data from application layer to interface layer.
    Contains only data, no business logic.
    """
    id: str
    name: str
    value: int
    metadata: dict[str, Any]

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for serialization."""
        return {
            "id": self.id,
            "name": self.name,
            "value": self.value,
            "metadata": self.metadata,
        }

    @classmethod
    def from_entity(cls, entity) -> "MyDTO":
        """Create from domain entity."""
        return cls(
            id=entity.id,
            name=entity.name,
            value=entity.value,
            metadata=entity.metadata,
        )
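
A short round trip shows how the DTO is meant to travel from entity to serialized payload. This is a sketch under assumptions: the entity is faked with SimpleNamespace, dataclasses.asdict is used as an alternative to the hand-written to_dict, and copying metadata in from_entity is a defensive variation that the template above does not include.

```python
import json
from dataclasses import asdict, dataclass, field
from types import SimpleNamespace
from typing import Any


@dataclass
class MyDTO:
    id: str
    name: str
    value: int
    metadata: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_entity(cls, entity: Any) -> "MyDTO":
        """Create from a domain entity, copying metadata to avoid aliasing."""
        return cls(
            id=entity.id,
            name=entity.name,
            value=entity.value,
            metadata=dict(entity.metadata),
        )


# Hypothetical entity; a real one would come from the domain layer.
entity = SimpleNamespace(id="42", name="parser", value=7, metadata={"lang": "python"})

dto = MyDTO.from_entity(entity)
payload = json.dumps(asdict(dto), sort_keys=True)
```

Copying the metadata dictionary means that changes made to the DTO in the interface layer cannot leak back into the domain entity.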

Step 5: Register in Container

Add the handler to src/{{PROJECT_NAME}}/interfaces/mcp/container.py:

# 1. Import handler at top
from {{PROJECT_NAME}}.application.commands.my_command import MyCommandHandler

# 2. Add provider in Container class
class Container(containers.DeclarativeContainer):
    # ... existing providers ...

    # Command Handlers
    my_command_handler = providers.Singleton(
        MyCommandHandler,
        repository=code_repository,  # Inject dependencies
        service=my_service,
    )
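
Registering a handler with providers.Singleton implies that one instance serves every request. The plain-Python sketch below imitates that behavior so it can be run without a DI library. The Singleton class here is a hypothetical stand-in, not the container library's implementation, and CodeRepository is a placeholder.

```python
from typing import Any, Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class Singleton(Generic[T]):
    """Hypothetical stand-in for a DI singleton provider: build once, then reuse."""

    def __init__(self, factory: Callable[..., T], **dependencies: Any) -> None:
        self._factory = factory
        self._dependencies = dependencies
        self._instance: Optional[T] = None

    def __call__(self) -> T:
        if self._instance is None:
            # Resolve nested providers first, pass plain values through unchanged.
            resolved = {
                name: dep() if isinstance(dep, Singleton) else dep
                for name, dep in self._dependencies.items()
            }
            self._instance = self._factory(**resolved)
        return self._instance


class CodeRepository:
    """Placeholder repository."""


class MyCommandHandler:
    def __init__(self, repository: CodeRepository) -> None:
        self.repository = repository


code_repository = Singleton(CodeRepository)
my_command_handler = Singleton(MyCommandHandler, repository=code_repository)

first = my_command_handler()
second = my_command_handler()
```

Since the same handler object is shared across calls, per-request data belongs in the command or query object rather than in attributes on the handler.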

Step 6: Wire to MCP Tool

Add an MCP tool in src/{{PROJECT_NAME}}/interfaces/mcp/server.py:

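from typing import Any

from {{PROJECT_NAME}}.application.commands.my_command import MyCommand

# `mcp` and `container` are assumed to be defined earlier in server.py.
@mcp.tool()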
async def my_operation(param1: str, param2: int) -> dict[str, Any]:
    """Perform my operation.

    Args:
        param1: Description
        param2: Description

    Returns:
        Result dictionary
    """
    command = MyCommand(param1=param1, param2=param2)
    result = await container.my_command_handler().handle(command)

    if not result.success:
        return {"success": False, "error": result.error}

    return {"success": True, "message": "Operation completed"}
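
The tool body is a thin translation from ServiceResult to a response dictionary, which makes it easy to check in isolation. In the sketch below the handler is passed in as a parameter instead of being resolved from the container, and StubHandler, the simplified ServiceResult, and the dictionary used in place of MyCommand are all hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ServiceResult:
    """Simplified, hypothetical result type."""

    success: bool
    data: Any = None
    error: Optional[str] = None


class StubHandler:
    """Hypothetical handler that records its input and returns a canned result."""

    def __init__(self, result: ServiceResult) -> None:
        self._result = result
        self.received: list[Any] = []

    async def handle(self, command: Any) -> ServiceResult:
        self.received.append(command)
        return self._result


async def my_operation(handler: StubHandler, param1: str, param2: int) -> dict[str, Any]:
    """Same mapping as the tool above, with the handler injected for testing."""
    command = {"param1": param1, "param2": param2}  # stands in for MyCommand
    result = await handler.handle(command)

    if not result.success:
        return {"success": False, "error": result.error}

    return {"success": True, "message": "Operation completed"}


ok_handler = StubHandler(ServiceResult(success=True))
ok_response = asyncio.run(my_operation(ok_handler, "a", 1))
failed_response = asyncio.run(
    my_operation(StubHandler(ServiceResult(success=False, error="boom")), "a", 1)
)
```

Keeping the tool free of business logic means the interface layer only needs tests for this mapping, while the handler tests cover the behavior.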

Pattern Examples

The Quick Start and Instructions sections above provide templates for each part of the pattern:

  • Command handler with validation (Step 3)
  • Query handler with DTO conversion (Step 3)
  • DTO creation pattern (Step 4)
  • Container registration (Step 5)
  • MCP tool wiring (Step 6)

Requirements

  • Handler extends CommandHandler[TCommand] or QueryHandler[TQuery, TResult]
  • Request object extends Command or Query
  • Return type is always ServiceResult[T]
  • Commands return ServiceResult[None]
  • Queries return ServiceResult[TResult] (often DTOs)
  • All dependencies injected via constructor
  • Constructor validates dependencies (fail-fast)
  • Use @traced decorator for OpenTelemetry tracing
  • Use get_logger(__name__) for logging
  • No optional dependencies – all required
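
The @traced decorator comes from the project's core.monitoring module and is not shown in this document. To illustrate only the decorator mechanics on an async handle method, here is a stand-in that logs elapsed time instead of emitting OpenTelemetry spans. The implementation and the PingHandler class are assumptions, not the project's code.

```python
import asyncio
import functools
import logging
import time
from typing import Any, Awaitable, Callable, TypeVar

R = TypeVar("R")
logger = logging.getLogger(__name__)


def traced(func: Callable[..., Awaitable[R]]) -> Callable[..., Awaitable[R]]:
    """Hypothetical stand-in: time an async call and log the duration."""

    @functools.wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> R:
        start = time.perf_counter()
        try:
            return await func(*args, **kwargs)
        finally:
            # Runs on success and on failure, so every call is measured.
            elapsed_ms = (time.perf_counter() - start) * 1000
            logger.debug("%s finished in %.2f ms", func.__qualname__, elapsed_ms)

    return wrapper


class PingHandler:
    """Hypothetical handler used only to exercise the decorator."""

    @traced
    async def handle(self, query: str) -> str:
        return f"pong:{query}"


response = asyncio.run(PingHandler().handle("x"))
```

functools.wraps keeps the wrapped method's name and docstring, which matters when logs or traces are labeled by function name.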

See Also