tasks-module
Install command:

```shell
npx skills add https://github.com/psincraian/myfy --skill tasks-module
```
Skill Documentation
TasksModule – Background Jobs
TasksModule provides SQL-based async task processing with DI injection and automatic retries.
Quick Start
```python
from myfy.core import Application
from myfy.data import DataModule
from myfy.tasks import TasksModule, task

app = Application()
app.add_module(DataModule())
app.add_module(TasksModule(auto_create_tables=True))

# Define a task
@task
async def send_email(to: str, subject: str, body: str) -> None:
    await email_service.send(to, subject, body)

# Dispatch from a route
@route.post("/notifications")
async def notify_user(body: NotifyRequest) -> dict:
    task_id = await send_email.send(
        to=body.email,
        subject="Welcome!",
        body="Thanks for signing up.",
    )
    return {"task_id": task_id}
```
Configuration
Environment variables use the MYFY_TASKS_ prefix:
| Variable | Default | Description |
|---|---|---|
| `MYFY_TASKS_DEFAULT_MAX_RETRIES` | `3` | Default retry attempts |
| `MYFY_TASKS_RETRY_DELAY_SECONDS` | `60.0` | Seconds between retries |
| `MYFY_TASKS_WORKER_CONCURRENCY` | `4` | Concurrent tasks per worker |
| `MYFY_TASKS_POLL_INTERVAL` | `1.0` | Seconds between queue polls |
| `MYFY_TASKS_TASK_TIMEOUT` | `300.0` | Max seconds per task |
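For example, to run a worker with higher concurrency and a shorter per-task timeout, export the variables before starting the worker (the values here are illustrative, not recommendations):

```shell
export MYFY_TASKS_WORKER_CONCURRENCY=8
export MYFY_TASKS_TASK_TIMEOUT=120.0
export MYFY_TASKS_POLL_INTERVAL=0.5

myfy tasks worker
```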
Defining Tasks
Basic Task
```python
from myfy.tasks import task

@task
async def process_order(order_id: int) -> str:
    # Process the order
    return f"Processed order {order_id}"
```
Task with DI Injection
Services are automatically injected at runtime:
```python
from myfy.tasks import task
from myfy.data import AsyncSession

@task
async def sync_user_data(user_id: int, session: AsyncSession) -> None:
    # session is TASK-scoped (injected per task execution)
    user = await session.get(User, user_id)
    await sync_to_external_service(user)
```
Task with Custom Options
```python
@task(max_retries=5, retry_on=[ConnectionError, TimeoutError])
async def upload_file(file_path: str) -> str:
    # Retries up to 5 times on connection/timeout errors
    return await s3.upload(file_path)
```
Dispatching Tasks
Basic Dispatch
```python
# Returns immediately with task_id
task_id = await send_email.send(to="user@example.com", subject="Hi")
```
Dispatch Options
```python
task_id = await send_email.send(
    to="user@example.com",
    subject="Hi",
    _priority=10,    # Higher priority = executes first
    _delay=60,       # Wait 60 seconds before executing
    _max_retries=5,  # Override default retries
)
```
Getting Results
```python
result = await send_email.get_result(task_id, timeout=60)

if result.is_completed:
    print(f"Success: {result.value}")
elif result.is_failed:
    print(f"Error: {result.error}")
elif result.is_pending:
    print("Still processing...")
```
TaskContext for Progress
Report progress from long-running tasks:
```python
from myfy.tasks import task, TaskContext

@task
async def import_users(file_path: str, ctx: TaskContext) -> int:
    users = load_users_from_file(file_path)
    total = len(users)

    for i, user in enumerate(users):
        await create_user(user)
        await ctx.update_progress(
            current=i + 1,
            total=total,
            message=f"Importing user {i + 1}/{total}",
        )

    return total
```
Check progress from caller:
```python
result = await import_users.get_result(task_id)

if result.progress:
    current, total = result.progress
    print(f"Progress: {current}/{total} - {result.progress_message}")
```
Running Workers
Start a worker process:
```shell
myfy tasks worker
```
With options:
```shell
myfy tasks worker --concurrency 8 --poll-interval 0.5
```
Workers:
- Poll the database for pending tasks
- Execute tasks with full DI injection
- Handle retries automatically
- Report progress and results
- Shut down gracefully on SIGTERM
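The claim step of that poll loop can be sketched in plain Python with SQLite. This is not myfy's internal implementation, and the `tasks` table schema below is purely illustrative; the point is that claiming must be atomic so two workers never pick up the same row:

```python
import sqlite3

def claim_next_task(conn: sqlite3.Connection):
    """Atomically claim the highest-priority pending task, or return None."""
    conn.execute("BEGIN IMMEDIATE")  # write lock: concurrent workers can't claim the same row
    row = conn.execute(
        "SELECT id, name FROM tasks WHERE status = 'pending' "
        "ORDER BY priority DESC, id LIMIT 1"
    ).fetchone()
    if row is not None:
        conn.execute("UPDATE tasks SET status = 'running' WHERE id = ?", (row[0],))
    conn.commit()
    return row

# Demo with an in-memory queue table (illustrative schema).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tasks (id INTEGER PRIMARY KEY, name TEXT, "
    "priority INTEGER DEFAULT 0, status TEXT DEFAULT 'pending')"
)
conn.execute("INSERT INTO tasks (name, priority) VALUES ('low', 0), ('high', 10)")
conn.commit()

task_row = claim_next_task(conn)
print(task_row)  # (2, 'high') -- the higher-priority task is claimed first
```

On PostgreSQL the same pattern is usually expressed with `SELECT ... FOR UPDATE SKIP LOCKED` instead of a whole-database lock.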
Task States
| Status | Description |
|---|---|
| `pending` | Queued, waiting for worker |
| `running` | Being executed by worker |
| `completed` | Finished successfully |
| `failed` | Failed after all retries |
| `cancelled` | Manually cancelled |
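The table implies a lifecycle roughly like the following. The exact transitions (for example, whether a `running` task can still be cancelled) are an assumption here, not documented behavior:

```python
# Assumed lifecycle derived from the status table; myfy may permit
# additional transitions (e.g. cancelling a running task).
TRANSITIONS: dict[str, set[str]] = {
    "pending": {"running", "cancelled"},
    "running": {"completed", "failed"},
    "completed": set(),   # terminal
    "failed": set(),      # terminal
    "cancelled": set(),   # terminal
}

def can_transition(src: str, dst: str) -> bool:
    return dst in TRANSITIONS.get(src, set())

print(can_transition("pending", "running"))    # True
print(can_transition("completed", "running"))  # False: terminal states don't restart
```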
Error Handling
Tasks automatically retry on failure:
```python
@task(max_retries=3, retry_on=[APIError])
async def call_api(url: str) -> dict:
    response = await http.get(url)
    if response.status >= 500:
        raise APIError("Server error")  # Will retry
    return response.json()
```
After all retries fail:
- Task status becomes `failed`
- Error message and traceback are stored
- Can be retrieved via `get_result()`
Parameter Classification
| Type | Behavior |
|---|---|
| Primitives (`str`, `int`, `float`, `bool`) | Serialized as task args |
| Lists, dicts | Serialized as task args |
| `TaskContext` | Injected by worker |
| Services (other types) | DI injected at runtime |
```python
@task
async def complex_task(
    order_id: int,           # Serialized (primitive)
    items: list[str],        # Serialized (list)
    ctx: TaskContext,        # Injected (context)
    session: AsyncSession,   # DI injected (service)
    settings: AppSettings,   # DI injected (service)
) -> None:
    ...
```
Best Practices
- Keep tasks idempotent – Safe to retry on failure
- Serialize only primitives – Complex objects should be loaded in task
- Use TaskContext – Report progress for long tasks
- Set appropriate timeouts – Prevent zombie tasks
- Monitor worker logs – Watch for repeated failures
- Use priorities – Critical tasks get processed first
- Handle cleanup – TaskContext supports cleanup callbacks
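The first point, idempotency, can be illustrated without the framework; in myfy the function would additionally carry the `@task` decorator. The in-memory `processed` set here stands in for a durable uniqueness check such as a database constraint:

```python
import asyncio

processed: set[int] = set()  # stand-in for a durable uniqueness check (e.g. a DB constraint)

async def charge_customer(order_id: int) -> str:
    # In myfy this would be an @task function; the idempotency
    # pattern itself is framework-independent.
    if order_id in processed:
        return "skipped"  # a retry is a safe no-op
    processed.add(order_id)
    return "charged"

async def main() -> tuple[str, str]:
    first = await charge_customer(42)
    retry = await charge_customer(42)  # simulated automatic retry
    return first, retry

outcome = asyncio.run(main())
print(outcome)  # ('charged', 'skipped')
```

Because the second invocation is a no-op, the worker can safely re-run the task after a crash or timeout without double-charging.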