python-async-workers

📁 lct1407/sidcorp-skills 📅 Jan 19, 2026
Install command
npx skills add https://github.com/lct1407/sidcorp-skills --skill python-async-workers

Skill documentation

Python Async Workers

Background processing patterns for FastAPI applications.

Architecture Overview

┌─────────────────────────────────────────────────────────────┐
│                      FastAPI Application                     │
├─────────────────────────────────────────────────────────────┤
│  API Request  │  Cron Scheduler  │  Queue Consumer          │
│      ↓        │        ↓         │        ↓                 │
│  Enqueue Job  │  Trigger Task    │  Process Message         │
│      ↓        │        ↓         │        ↓                 │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                   Service Layer                      │   │
│  │              (Shared Business Logic)                 │   │
│  └─────────────────────────────────────────────────────┘   │
│      ↓                   ↓                  ↓               │
│  Message Queue      Database            External APIs       │
│  (RabbitMQ/Redis)   (PostgreSQL)                            │
└─────────────────────────────────────────────────────────────┘

Components

| Component | Purpose | Location |
|---|---|---|
| Cron Jobs | Scheduled recurring tasks | app/jobs/ |
| Celery Tasks | Distributed task queue | app/tasks/ |
| Queue Consumers | Message processing | app/consumers/ |
| Workers | Background processors | app/workers/ |

When to Use What

| Use Case | Solution |
|---|---|
| Run every X minutes/hours | Cron (APScheduler) |
| Async processing after API call | Celery Task |
| Process messages from external system | Queue Consumer |
| Long-running background process | Worker with asyncio |
| Distributed across multiple servers | Celery + RabbitMQ |
| Simple in-process background | FastAPI BackgroundTasks |
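As an illustration of the "Worker with asyncio" row, here is a minimal, standard-library-only sketch of a long-running worker draining an in-process queue. The names `handle`, `worker`, and `run_jobs` are hypothetical and not part of this skill; a real worker would call a service instead of doubling a number.

```python
import asyncio


async def handle(job: int) -> int:
    """Stand-in for real work; a real worker would call a service here."""
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    return job * 2


async def worker(queue: asyncio.Queue, results: list) -> None:
    """Pull jobs forever (until cancelled), marking each one done."""
    while True:
        job = await queue.get()
        try:
            results.append(await handle(job))
        finally:
            queue.task_done()  # always mark done so queue.join() can return


async def run_jobs(jobs: list[int], concurrency: int = 2) -> list[int]:
    """Start `concurrency` workers, feed them jobs, and wait for completion."""
    queue: asyncio.Queue = asyncio.Queue()
    results: list[int] = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(concurrency)]
    for job in jobs:
        queue.put_nowait(job)
    await queue.join()      # wait until every job has been processed
    for task in workers:    # then shut the workers down
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results
```

Calling `asyncio.run(run_jobs([1, 2, 3]))` processes all three jobs. Result order depends on scheduling, so callers should not rely on it.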

Key Principles

1. Reuse Service Layer

Workers should call existing services, not duplicate logic:

# GOOD: Reuse service
async def process_order(order_id: int):
    async with get_session() as session:
        service = OrderService(session)
        await service.process(order_id)

# BAD: Duplicate logic in worker
async def process_order(order_id: int):
    # Don't copy-paste service code here!
    pass
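To make the principle concrete, the following sketch shows two entry points (an API handler and a worker task) sharing one service. Everything here is an assumption for illustration: `FakeSession`, the in-memory `STORE`, and this simplified `OrderService` stand in for the application's real session and service classes.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for a database session."""

    def __init__(self, store: dict):
        self.store = store


class OrderService:
    """Single home for the business rule; every entry point calls this."""

    def __init__(self, session: FakeSession):
        self.session = session

    async def process(self, order_id: int) -> str:
        self.session.store[order_id] = "processed"
        return self.session.store[order_id]


STORE: dict[int, str] = {}


@asynccontextmanager
async def get_session():
    # A real implementation would open and close a database session here.
    yield FakeSession(STORE)


async def api_handler(order_id: int) -> str:
    """Entry point 1: called from an HTTP route."""
    async with get_session() as session:
        return await OrderService(session).process(order_id)


async def worker_task(order_id: int) -> str:
    """Entry point 2: called from a background worker."""
    async with get_session() as session:
        return await OrderService(session).process(order_id)
```

Both entry points are thin wrappers, so a change to the processing rule is made once, in `OrderService.process`.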

2. Idempotency

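All background tasks must be idempotent: a retry or duplicate delivery must have the same effect as a single run. Check whether the work was already done before doing it: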

# GOOD: Check before processing
async def send_email(user_id: int, email_type: str):
    if await was_email_sent(user_id, email_type):
        return  # Already sent, skip
    await do_send_email(user_id, email_type)
    await mark_email_sent(user_id, email_type)
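The check-then-send pattern above can still send twice if two attempts run between the check and the mark. One possible variation, sketched here with an in-memory set, is to claim the key first and release it only if the send fails. The names `claim`, `_sent`, and `outbox` are hypothetical; in production the claim would need a persistent atomic operation (for example a unique database constraint), which this sketch does not implement.

```python
import asyncio

_sent: set[tuple[int, str]] = set()  # hypothetical stand-in for a persistent store
outbox: list[tuple[int, str]] = []   # records every actual send, for demonstration


async def claim(user_id: int, email_type: str) -> bool:
    """Return True only for the first caller to claim this key."""
    key = (user_id, email_type)
    if key in _sent:
        return False
    # No await between the check and the add, so within a single
    # event loop no other coroutine can interleave here.
    _sent.add(key)
    return True


async def send_email(user_id: int, email_type: str) -> bool:
    """Send at most once per (user_id, email_type); return True if sent."""
    if not await claim(user_id, email_type):
        return False  # already claimed by an earlier attempt, skip
    try:
        outbox.append((user_id, email_type))  # stand-in for the real send
    except Exception:
        _sent.discard((user_id, email_type))  # release the claim so a retry can send
        raise
    return True
```

Running two concurrent attempts for the same user and email type results in exactly one entry in `outbox`.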

3. Error Handling

Retry transient failures a bounded number of times; log permanent failures and do not retry them:

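import logging

logger = logging.getLogger(__name__)

# `celery` is the application's Celery instance; TransientError and
# PermanentError are application-defined exception classes.
@celery.task(bind=True, max_retries=3)
def process_payment(self, payment_id: int):
    try:
        # Process
        pass
    except TransientError as e:
        # Retry after 60 seconds, up to max_retries times
        raise self.retry(exc=e, countdown=60)
    except PermanentError as e:
        # Log and don't retry
        logger.error("Payment %s failed permanently: %s", payment_id, e)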
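The same retry policy can be sketched without Celery, which may help when testing the logic in isolation. This is a library-free analogue, not Celery's API: `run_with_retries` and `flaky_payment` are hypothetical helpers, and the two exception classes are defined here only so the example is self-contained.

```python
import asyncio


class TransientError(Exception):
    """Hypothetical: a failure worth retrying (timeout, dropped connection)."""


class PermanentError(Exception):
    """Hypothetical: a failure that retrying cannot fix (invalid input)."""


async def run_with_retries(func, *args, max_retries: int = 3, delay: float = 0.0):
    """Call func, retrying on TransientError up to max_retries times."""
    for attempt in range(max_retries + 1):
        try:
            return await func(*args)
        except TransientError:
            if attempt == max_retries:
                raise  # retries exhausted, surface the error
            await asyncio.sleep(delay)  # fixed wait, like a countdown
        # PermanentError (and anything else) propagates immediately, with no retry


calls = {"n": 0}


async def flaky_payment(payment_id: int) -> str:
    """Fails twice with a transient error, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("gateway timeout")
    return f"payment {payment_id} ok"
```

With the default `max_retries=3`, `flaky_payment` succeeds on its third call; a function raising `PermanentError` is called once and the error reaches the caller.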

Reference Navigation

Scheduling:

Task Queues:

Message Queues:

Integration with python-backend-development

This skill extends python-backend-development:

  • Workers call Services from app/services/
  • Use the same Repository pattern for database access
  • Follow the same async patterns (async def, await)
  • Use the same error handling (custom exceptions)
  • Share configuration from app/core/config.py