tasks-module

📁 psincraian/myfy 📅 1 day ago
0
总安装量
1
周安装量
安装命令
npx skills add https://github.com/psincraian/myfy --skill tasks-module

Agent 安装分布

amp 1
cline 1
opencode 1
cursor 1
continue 1
kimi-cli 1

Skill 文档

TasksModule – Background Jobs

TasksModule provides SQL-based async task processing with DI injection and automatic retries.

Quick Start

from myfy.core import Application
from myfy.data import DataModule
from myfy.tasks import TasksModule, task

app = Application()
app.add_module(DataModule())
app.add_module(TasksModule(auto_create_tables=True))

# Define a task
@task
async def send_email(to: str, subject: str, body: str) -> None:
    await email_service.send(to, subject, body)

# Dispatch from a route
@route.post("/notifications")
async def notify_user(body: NotifyRequest) -> dict:
    task_id = await send_email.send(
        to=body.email,
        subject="Welcome!",
        body="Thanks for signing up.",
    )
    return {"task_id": task_id}

Configuration

Environment variables use the MYFY_TASKS_ prefix:

Variable Default Description
MYFY_TASKS_DEFAULT_MAX_RETRIES 3 Default retry attempts
MYFY_TASKS_RETRY_DELAY_SECONDS 60.0 Seconds between retries
MYFY_TASKS_WORKER_CONCURRENCY 4 Concurrent tasks per worker
MYFY_TASKS_POLL_INTERVAL 1.0 Seconds between queue polls
MYFY_TASKS_TASK_TIMEOUT 300.0 Max seconds per task

Defining Tasks

Basic Task

from myfy.tasks import task

@task
async def process_order(order_id: int) -> str:
    # Process the order
    return f"Processed order {order_id}"

Task with DI Injection

Services are automatically injected at runtime:

from myfy.tasks import task
from myfy.data import AsyncSession

@task
async def sync_user_data(user_id: int, session: AsyncSession) -> None:
    # session is TASK-scoped (injected per task execution)
    user = await session.get(User, user_id)
    await sync_to_external_service(user)

Task with Custom Options

@task(max_retries=5, retry_on=[ConnectionError, TimeoutError])
async def upload_file(file_path: str) -> str:
    # Retries up to 5 times on connection/timeout errors
    return await s3.upload(file_path)

Dispatching Tasks

Basic Dispatch

# Returns immediately with task_id
task_id = await send_email.send(to="user@example.com", subject="Hi")

Dispatch Options

task_id = await send_email.send(
    to="user@example.com",
    subject="Hi",
    _priority=10,      # Higher priority = executes first
    _delay=60,         # Wait 60 seconds before executing
    _max_retries=5,    # Override default retries
)

Getting Results

result = await send_email.get_result(task_id, timeout=60)

if result.is_completed:
    print(f"Success: {result.value}")
elif result.is_failed:
    print(f"Error: {result.error}")
elif result.is_pending:
    print("Still processing...")

TaskContext for Progress

Report progress from long-running tasks:

from myfy.tasks import task, TaskContext

@task
async def import_users(file_path: str, ctx: TaskContext) -> int:
    users = load_users_from_file(file_path)
    total = len(users)

    for i, user in enumerate(users):
        await create_user(user)
        await ctx.update_progress(
            current=i + 1,
            total=total,
            message=f"Importing user {i + 1}/{total}",
        )

    return total

Check progress from caller:

result = await import_users.get_result(task_id)
if result.progress:
    current, total = result.progress
    print(f"Progress: {current}/{total} - {result.progress_message}")

Running Workers

Start a worker process:

myfy tasks worker

With options:

myfy tasks worker --concurrency 8 --poll-interval 0.5

Workers:

  • Poll the database for pending tasks
  • Execute tasks with full DI injection
  • Handle retries automatically
  • Report progress and results
  • Gracefully shutdown on SIGTERM

Task States

Status Description
pending Queued, waiting for worker
running Being executed by worker
completed Finished successfully
failed Failed after all retries
cancelled Manually cancelled

Error Handling

Tasks automatically retry on failure:

@task(max_retries=3, retry_on=[APIError])
async def call_api(url: str) -> dict:
    response = await http.get(url)
    if response.status >= 500:
        raise APIError("Server error")  # Will retry
    return response.json()

After all retries fail:

  • Task status becomes failed
  • Error message and traceback are stored
  • Can be retrieved via get_result()

Parameter Classification

Type Behavior
Primitives (str, int, float, bool) Serialized as task args
Lists, dicts Serialized as task args
TaskContext Injected by worker
Services (other types) DI injected at runtime
@task
async def complex_task(
    order_id: int,           # Serialized (primitive)
    items: list[str],        # Serialized (list)
    ctx: TaskContext,        # Injected (context)
    session: AsyncSession,   # DI injected (service)
    settings: AppSettings,   # DI injected (service)
) -> None:
    ...

Best Practices

  1. Keep tasks idempotent – Safe to retry on failure
  2. Serialize only primitives – Complex objects should be loaded in task
  3. Use TaskContext – Report progress for long tasks
  4. Set appropriate timeouts – Prevent zombie tasks
  5. Monitor worker logs – Watch for repeated failures
  6. Use priorities – Critical tasks get processed first
  7. Handle cleanup – TaskContext supports cleanup callbacks