python-async-patterns

📁 clostaunau/holiday-card 📅 Jan 24, 2026
Install command
npx skills add https://github.com/clostaunau/holiday-card --skill python-async-patterns

Skill documentation

Python Async/Await Patterns and Best Practices

Purpose

This skill provides comprehensive guidance on Python asynchronous programming patterns for reviewing and writing async code. It covers async/await syntax, best practices, common patterns, anti-patterns, and framework-specific considerations.

Use this skill when:

  • Reviewing async Python code
  • Writing new async functions or services
  • Refactoring sync code to async
  • Troubleshooting async performance issues
  • Evaluating async library usage
  • Implementing async endpoints in web frameworks

Context

Asynchronous programming in Python enables efficient I/O-bound operations by allowing the event loop to switch between tasks while waiting for I/O operations to complete. However, async code introduces complexity and has specific patterns and pitfalls that must be understood for correct implementation.

Key Principles:

  • Async is beneficial for I/O-bound operations, not CPU-bound tasks
  • The event loop must never be blocked with synchronous operations
  • Coroutines must be properly awaited
  • Resources must be cleaned up correctly in async contexts
  • Mixing sync and async code requires careful consideration

Python Version Requirements:

  • async/await: Python 3.5+
  • asyncio.run(): Python 3.7+
  • asyncio.create_task(): Python 3.7+
  • Task Groups (asyncio.TaskGroup): Python 3.11+
  • Exception Groups: Python 3.11+

Async Fundamentals

1. Async/Await Syntax

Defining Async Functions:

# Async function (coroutine function)
async def fetch_data(url: str) -> dict:
    """Coroutine that fetches data asynchronously."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

# Regular function (synchronous)
def process_data(data: dict) -> list:
    """Regular synchronous function."""
    return [item['value'] for item in data['items']]

Key Differences:

  • async def creates a coroutine function
  • Calling an async function returns a coroutine object (not the result)
  • Must use await to execute the coroutine and get the result
  • Can only use await inside async def functions
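
A minimal sketch of these differences, using only the standard library. The double coroutine is a hypothetical stand-in for a real I/O call:

```python
import asyncio
import inspect

async def double(x: int) -> int:
    """Hypothetical coroutine standing in for an I/O call."""
    await asyncio.sleep(0)  # yield to the event loop once
    return x * 2

async def main() -> int:
    coro = double(21)                 # creates a coroutine object; the body has not run
    assert inspect.iscoroutine(coro)
    return await coro                 # awaiting runs the body and yields the result

result = asyncio.run(main())
print(result)  # 42
```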

2. Coroutines vs Regular Functions

# Coroutine function
async def async_operation():
    await asyncio.sleep(1)
    return "done"

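# Calling the function only creates a coroutine object; nothing runs yet.
# If the object is never awaited, Python emits
# "RuntimeWarning: coroutine 'async_operation' was never awaited".
coro = async_operation()

# Inside another coroutine: await it to run it and get the result
result = await async_operation()  # Correct

# From synchronous code: let asyncio.run() drive it
result = asyncio.run(async_operation())  # Correct (Python 3.7+)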

Coroutine Characteristics:

  • Lazy evaluation: not executed until awaited
  • Can be suspended and resumed
  • Return values through await
  • Must be awaited or scheduled for execution

3. Event Loop Fundamentals

The event loop is the core of async execution:

import asyncio

# High-level API (Python 3.7+) - PREFERRED
async def main():
    result = await some_async_operation()
    return result

# Run the event loop
if __name__ == "__main__":
    result = asyncio.run(main())  # Creates, runs, and closes loop

Event Loop Responsibilities:

  • Schedules and executes coroutines
  • Manages I/O operations
  • Handles callbacks and tasks
  • Coordinates concurrent execution
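
As a rough illustration of the scheduling described above, the sketch below runs two hypothetical workers on one loop. Each await asyncio.sleep() hands control back to the loop, which is why the faster worker finishes first even though it was started second:

```python
import asyncio

async def worker(name: str, delay: float, log: list) -> None:
    log.append(f"{name} start")
    await asyncio.sleep(delay)   # suspends here; the loop runs other tasks
    log.append(f"{name} end")

async def main() -> list:
    log = []
    await asyncio.gather(
        worker("slow", 0.05, log),
        worker("fast", 0.01, log),
    )
    return log

events = asyncio.run(main())
print(events)  # ['slow start', 'fast start', 'fast end', 'slow end']
```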

4. Using asyncio.run()

# GOOD: Modern approach (Python 3.7+)
async def main():
    result = await fetch_data()
    processed = process_data(result)  # process_data is synchronous, so no await
    return processed

if __name__ == "__main__":
    result = asyncio.run(main())

# AVOID: Manual loop management (legacy, pre-3.7)
# Newer Python versions deprecate get_event_loop() when no loop is running
loop = asyncio.get_event_loop()
try:
    result = loop.run_until_complete(main())
finally:
    loop.close()

Why prefer asyncio.run():

  • Automatically creates and closes the loop
  • Handles cleanup properly
  • Simpler and less error-prone
  • Cancels remaining tasks on shutdown

Common Async Patterns

1. Async Context Managers

Async context managers use async with for resource management:

class AsyncDatabaseConnection:
    """Example async context manager."""

    async def __aenter__(self):
        """Called when entering async with block."""
        self.connection = await self._connect()
        return self.connection

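    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Called when exiting async with block."""
        await self._disconnect()
        return False  # Don't suppress exceptions

    async def _connect(self):
        """Establish connection."""
        await asyncio.sleep(0.1)  # Simulated async connection
        return {"connected": True}

    async def _disconnect(self):
        """Close connection (simulated; a real driver object would expose close())."""
        await asyncio.sleep(0)
        self.connection["connected"] = False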

# Usage
async def query_database():
    async with AsyncDatabaseConnection() as conn:
        # Connection automatically closed after block
        result = await execute_query(conn)
    return result

Common Async Context Managers:

  • aiohttp.ClientSession(): HTTP client sessions
  • aiofiles.open(): Async file operations
  • Database connection pools (asyncpg, motor)
  • Lock primitives (asyncio.Lock())

2. Async Iterators and Generators

class AsyncDataStream:
    """Async iterator for streaming data."""

    def __init__(self, count: int):
        self.count = count
        self.index = 0

    def __aiter__(self):
        """Return the async iterator."""
        return self

    async def __anext__(self):
        """Get next item asynchronously."""
        if self.index >= self.count:
            raise StopAsyncIteration

        await asyncio.sleep(0.1)  # Simulated async operation
        value = f"item_{self.index}"
        self.index += 1
        return value

# Usage
async def process_stream():
    async for item in AsyncDataStream(5):
        print(item)

# Async generator (simpler approach)
async def async_range(count: int):
    """Async generator function."""
    for i in range(count):
        await asyncio.sleep(0.1)
        yield i

# Usage
async def use_generator():
    async for value in async_range(5):
        print(value)

3. Async Comprehensions

# Async list comprehension
async def fetch_all_urls(urls: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        # Each request is awaited in turn, so this runs sequentially.
        # Use asyncio.gather() or tasks when the requests should overlap.
        results = [
            await fetch_one(session, url)
            for url in urls
        ]
        return results

# Async generator expression
async def process_stream_data(stream):
    processed = (
        transform(item)
        async for item in stream
        if await validate(item)
    )
    return processed

# Async dict comprehension
async def fetch_user_data(user_ids: list[int]) -> dict[int, dict]:
    return {
        user_id: await fetch_user(user_id)
        for user_id in user_ids
    }
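
Note that await inside a comprehension runs one item at a time. The sketch below compares that with asyncio.gather(); the fetch_user here is a hypothetical stand-in whose sleep simulates network latency, and the timings will vary by machine:

```python
import asyncio
import time

async def fetch_user(user_id: int) -> dict:
    """Hypothetical fetch; the sleep stands in for network latency."""
    await asyncio.sleep(0.05)
    return {"id": user_id}

async def sequential(ids: list) -> list:
    # Each await finishes before the next call starts.
    return [await fetch_user(i) for i in ids]

async def concurrent(ids: list) -> list:
    # All coroutines are scheduled together; results keep input order.
    return await asyncio.gather(*(fetch_user(i) for i in ids))

async def timed(func, ids: list):
    start = time.perf_counter()
    result = await func(ids)
    return result, time.perf_counter() - start

async def main():
    ids = [1, 2, 3, 4]
    seq, seq_time = await timed(sequential, ids)
    con, con_time = await timed(concurrent, ids)
    return seq, seq_time, con, con_time

seq, seq_time, con, con_time = asyncio.run(main())
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```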

4. Concurrent Task Execution

Using asyncio.gather()

# Run multiple coroutines concurrently
async def fetch_multiple_resources():
    # All tasks run concurrently
    results = await asyncio.gather(
        fetch_user(1),
        fetch_posts(1),
        fetch_comments(1)
    )
    user, posts, comments = results
    return user, posts, comments

# With error handling
async def fetch_with_error_handling():
    results = await asyncio.gather(
        fetch_user(1),
        fetch_posts(1),
        fetch_comments(1),
        return_exceptions=True  # Return exceptions instead of raising
    )

    # Check for exceptions
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")

Using asyncio.create_task()

# Create tasks explicitly for more control
async def fetch_with_tasks():
    # Create tasks (they start executing immediately)
    user_task = asyncio.create_task(fetch_user(1))
    posts_task = asyncio.create_task(fetch_posts(1))
    comments_task = asyncio.create_task(fetch_comments(1))

    # Wait for all tasks
    user = await user_task
    posts = await posts_task
    comments = await comments_task

    return user, posts, comments

# Cancel tasks if needed
async def fetch_with_cancellation():
    task = asyncio.create_task(long_running_operation())

    try:
        result = await asyncio.wait_for(task, timeout=5.0)
        return result
    except asyncio.TimeoutError:
        # wait_for() already cancels the task on timeout;
        # the explicit cancel() is a harmless safeguard
        task.cancel()
        raise
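
asyncio.wait_for() cancels the awaited task itself when the timeout expires. A small sketch that checks this, with a hypothetical long_running_operation that simply sleeps:

```python
import asyncio

async def long_running_operation() -> str:
    await asyncio.sleep(10)
    return "finished"

async def main() -> bool:
    task = asyncio.create_task(long_running_operation())
    try:
        await asyncio.wait_for(task, timeout=0.01)
    except asyncio.TimeoutError:
        # By the time TimeoutError is raised, the task has been cancelled.
        return task.cancelled()
    return False

was_cancelled = asyncio.run(main())
print(was_cancelled)  # True
```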

Using Task Groups (Python 3.11+)

# Task groups provide better error handling and cancellation
async def fetch_with_task_group():
    async with asyncio.TaskGroup() as tg:
        user_task = tg.create_task(fetch_user(1))
        posts_task = tg.create_task(fetch_posts(1))
        comments_task = tg.create_task(fetch_comments(1))

    # All tasks complete here (or exception raised)
    return user_task.result(), posts_task.result(), comments_task.result()

# If any task fails, all tasks are cancelled
async def task_group_error_handling():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(task_that_fails())
            tg.create_task(task_that_succeeds())
    except ExceptionGroup as eg:
        # All exceptions collected in ExceptionGroup
        for exc in eg.exceptions:
            print(f"Task failed: {exc}")

5. Task Coordination Patterns

Wait for first completion

# Wait for first task to complete
async def fetch_from_multiple_sources(urls: list[str]):
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.create_task(fetch_one(session, url))
            for url in urls
        ]

        # Wait for first completion
        done, pending = await asyncio.wait(
            tasks,
            return_when=asyncio.FIRST_COMPLETED
        )

        # Cancel remaining tasks
        for task in pending:
            task.cancel()

        # Return first result
        return done.pop().result()
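
The same pattern can be tried without a network. In this sketch fetch_from and the mirror names are hypothetical, and the delays simulate response times. It also waits for the cancelled tasks to unwind, which the version above leaves to the event loop:

```python
import asyncio

async def fetch_from(source: str, delay: float) -> str:
    """Hypothetical mirror; delay simulates response time."""
    await asyncio.sleep(delay)
    return source

async def first_response() -> tuple:
    tasks = [
        asyncio.create_task(fetch_from("mirror-a", 0.2)),
        asyncio.create_task(fetch_from("mirror-b", 0.01)),
        asyncio.create_task(fetch_from("mirror-c", 0.3)),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Let the cancelled tasks finish unwinding before returning.
    await asyncio.gather(*pending, return_exceptions=True)
    winner = done.pop().result()   # re-raises if the first finisher failed
    return winner, all(t.cancelled() for t in pending)

winner, losers_cancelled = asyncio.run(first_response())
print(winner, losers_cancelled)  # mirror-b True
```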

Semaphore for limiting concurrency

# Limit how many operations are in flight at once
# (this caps concurrency; it does not cap requests per second)
async def fetch_with_rate_limit(urls: list[str], max_concurrent: int = 5):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_with_sem(url: str):
        async with semaphore:
            return await fetch_one(url)

    results = await asyncio.gather(*[
        fetch_with_sem(url) for url in urls
    ])
    return results
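
To see the cap in action, the sketch below counts how many jobs are inside the semaphore at the same time. The job body is a hypothetical placeholder for real I/O:

```python
import asyncio

async def run_limited(n_jobs: int, max_concurrent: int) -> int:
    semaphore = asyncio.Semaphore(max_concurrent)
    active = 0
    peak = 0

    async def job() -> None:
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)   # simulated I/O
            active -= 1

    await asyncio.gather(*(job() for _ in range(n_jobs)))
    return peak

peak = asyncio.run(run_limited(n_jobs=20, max_concurrent=5))
print(peak)  # 5
```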

Best Practices

1. Avoid Blocking the Event Loop

# BAD: Blocking operations in async function
async def bad_async_function():
    # These block the event loop!
    time.sleep(1)  # WRONG
    requests.get(url)  # WRONG
    open('file.txt').read()  # WRONG
    some_cpu_intensive_task()  # WRONG

# GOOD: Use async alternatives
async def good_async_function():
    await asyncio.sleep(1)  # Non-blocking sleep
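    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:  # Async HTTP
            await response.read()
    async with aiofiles.open('file.txt') as f:
        await f.read()  # Async file I/O
    # Blocking calls: asyncio.to_thread() keeps the loop responsive (Python 3.9+).
    # Heavy CPU-bound work is better sent to a process pool, because threads share the GIL.
    result = await asyncio.to_thread(cpu_intensive_task)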

2. Using asyncio.to_thread for Blocking I/O

import asyncio
from functools import partial

# For blocking sync operations (Python 3.9+)
async def async_wrapper_for_blocking():
    # Run blocking operation in thread pool
    result = await asyncio.to_thread(
        blocking_operation,
        arg1,
        arg2
    )
    return result

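# For CPU-bound tasks
async def process_data_async(data: list):
    # A thread keeps the event loop responsive, but the GIL prevents a real
    # speedup for pure-Python CPU work; for that, prefer
    # loop.run_in_executor() with a ProcessPoolExecutor
    processed = await asyncio.to_thread(
        cpu_intensive_processing,
        data
    )
    return processed

# Keyword arguments: to_thread() forwards *args and **kwargs,
# so functools.partial is optional here
async def complex_blocking_call():
    func = partial(
        blocking_function,
        timeout=30,
        retry=3
    )
    result = await asyncio.to_thread(func)
    return result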
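
A quick way to check that a blocking call is no longer stalling the loop is to run a heartbeat task next to it. In this sketch (Python 3.9+), blocking_operation and heartbeat are hypothetical helpers; the heartbeat keeps ticking while time.sleep() runs in a worker thread:

```python
import asyncio
import time

def blocking_operation(seconds: float) -> str:
    time.sleep(seconds)          # blocks only the worker thread
    return "done"

async def heartbeat(ticks: list) -> None:
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)

async def main() -> tuple:
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    result = await asyncio.to_thread(blocking_operation, 0.1)
    beat.cancel()
    try:
        await beat
    except asyncio.CancelledError:
        pass
    return result, len(ticks)

result, tick_count = asyncio.run(main())
print(result, tick_count)
```

Calling blocking_operation(0.1) directly inside main() would leave the heartbeat with no chance to run during the sleep.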

3. Proper Exception Handling

# Handle exceptions in individual coroutines
async def fetch_with_exception_handling(url: str):
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.json()
    except aiohttp.ClientError as e:
        # Handle specific async HTTP errors
        print(f"HTTP error: {e}")
        raise
    except asyncio.TimeoutError:
        # Handle timeout
        print(f"Timeout fetching {url}")
        raise
    except Exception as e:
        # Handle unexpected errors
        print(f"Unexpected error: {e}")
        raise

# Handle exceptions in gather()
async def fetch_multiple_with_errors(urls: list[str]):
    results = await asyncio.gather(
        *[fetch_with_exception_handling(url) for url in urls],
        return_exceptions=True
    )

    # Process results and exceptions
    successful = []
    failed = []

    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            failed.append((url, result))
        else:
            successful.append(result)

    return successful, failed

# Exception groups (Python 3.11+)
async def handle_exception_groups():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(task1())
            tg.create_task(task2())
    except* ValueError as eg:
        # Handle all ValueErrors
        print(f"Value errors: {eg.exceptions}")
    except* TypeError as eg:
        # Handle all TypeErrors
        print(f"Type errors: {eg.exceptions}")

4. Cancellation Handling

# Proper cancellation handling
async def cancellable_operation():
    try:
        await long_running_task()
    except asyncio.CancelledError:
        # Clean up resources
        await cleanup()
        # Re-raise to propagate cancellation
        raise

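# Shielding from cancellation
async def critical_operation():
    # important_task() keeps running if this coroutine is cancelled,
    # but the await here still raises CancelledError in the caller
    await asyncio.shield(important_task())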

# Timeout with proper cleanup
async def operation_with_timeout():
    try:
        result = await asyncio.wait_for(
            some_operation(),
            timeout=5.0
        )
        return result
    except asyncio.TimeoutError:
        # Cleanup happens here
        await cleanup()
        raise
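
asyncio.shield() protects the inner work, not the coroutine that awaits it. The sketch below, with a hypothetical important_task, cancels the caller mid-flight and records what happens; keeping a reference to the inner task lets the caller wait for it before re-raising:

```python
import asyncio

async def important_task(log: list) -> None:
    await asyncio.sleep(0.05)
    log.append("committed")

async def critical_operation(log: list) -> None:
    inner = asyncio.create_task(important_task(log))
    try:
        await asyncio.shield(inner)
    except asyncio.CancelledError:
        log.append("caller cancelled")
        await inner              # the shielded work is still running; let it finish
        raise                    # then propagate the cancellation

async def main() -> list:
    log = []
    task = asyncio.create_task(critical_operation(log))
    await asyncio.sleep(0.01)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log

events = asyncio.run(main())
print(events)  # ['caller cancelled', 'committed']
```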

5. Timeout Patterns

# Single operation timeout
async def fetch_with_timeout(url: str, timeout: float = 10.0):
    try:
        async with aiohttp.ClientSession() as session:
            async with asyncio.timeout(timeout):  # Python 3.11+
                async with session.get(url) as response:
                    return await response.json()
    except asyncio.TimeoutError:
        print(f"Timeout after {timeout}s")
        raise

# Legacy timeout (Python < 3.11)
async def fetch_with_timeout_legacy(url: str, timeout: float = 10.0):
    try:
        result = await asyncio.wait_for(
            fetch_one(url),
            timeout=timeout
        )
        return result
    except asyncio.TimeoutError:
        print(f"Timeout after {timeout}s")
        raise

# Multiple timeouts
async def complex_operation_with_timeouts():
    async with asyncio.timeout(30):  # Overall timeout (Python 3.11+)
        # Per-operation timeouts
        result1 = await asyncio.wait_for(op1(), timeout=10)
        result2 = await asyncio.wait_for(op2(), timeout=10)
        result3 = await asyncio.wait_for(op3(), timeout=10)
    return result1, result2, result3

6. Resource Cleanup

# Always use async context managers
async def proper_resource_cleanup():
    # GOOD: Automatic cleanup
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.json()
    # Session and response closed automatically

# For resources without context managers
async def manual_cleanup():
    resource = await acquire_resource()
    try:
        result = await use_resource(resource)
        return result
    finally:
        # Always cleanup
        await resource.close()

# Async cleanup when acquiring several resources
async def cleanup_on_error():
    resources = []
    try:
        for i in range(10):
            resource = await acquire_resource(i)
            resources.append(resource)
            await use_resource(resource)
    finally:
        # Runs on success and on error, so each resource is closed exactly once
        await asyncio.gather(
            *[res.close() for res in resources],
            return_exceptions=True
        )
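
When the number of resources is only known at runtime, contextlib.AsyncExitStack is one way to get the same guarantee without hand-written try/finally. A sketch under the assumption that each resource exposes an async close(); the Resource class is hypothetical:

```python
import asyncio
from contextlib import AsyncExitStack

class Resource:
    """Hypothetical resource with an async close()."""

    def __init__(self, name: str, closed: list):
        self.name = name
        self._closed = closed

    async def close(self) -> None:
        await asyncio.sleep(0)
        self._closed.append(self.name)

async def use_resources(closed: list) -> None:
    async with AsyncExitStack() as stack:
        for i in range(3):
            resource = Resource(f"r{i}", closed)
            stack.push_async_callback(resource.close)  # registered for cleanup
            if i == 2:
                raise RuntimeError("failure while using r2")

async def main() -> list:
    closed = []
    try:
        await use_resources(closed)
    except RuntimeError:
        pass
    return closed

closed = asyncio.run(main())
print(closed)  # ['r2', 'r1', 'r0']
```

Callbacks run in reverse registration order, and each resource is closed once even though the loop failed partway through.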

Common Libraries

1. aiohttp – Async HTTP Client/Server

import aiohttp
import asyncio

# Client usage
async def fetch_data(url: str) -> dict:
    """Fetch data using aiohttp."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.json()

# With custom headers and timeout
async def fetch_with_options(url: str):
    headers = {"Authorization": "Bearer token"}
    timeout = aiohttp.ClientTimeout(total=30)

    async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
        async with session.get(url) as response:
            return await response.text()

# POST request
async def post_data(url: str, data: dict):
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=data) as response:
            return await response.json()

# Concurrent requests with a concurrency limit
async def fetch_multiple(urls: list[str], max_concurrent: int = 5):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_one(session, url):
        async with semaphore:
            async with session.get(url) as response:
                return await response.json()

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks)

2. aiofiles – Async File I/O

import aiofiles

# Read file asynchronously
async def read_file(filepath: str) -> str:
    """Read file contents asynchronously."""
    async with aiofiles.open(filepath, mode='r') as f:
        contents = await f.read()
    return contents

# Write file asynchronously
async def write_file(filepath: str, content: str):
    """Write content to file asynchronously."""
    async with aiofiles.open(filepath, mode='w') as f:
        await f.write(content)

# Read lines
async def read_lines(filepath: str) -> list[str]:
    """Read file line by line."""
    lines = []
    async with aiofiles.open(filepath, mode='r') as f:
        async for line in f:
            lines.append(line.strip())
    return lines

# Process large file
async def process_large_file(filepath: str):
    """Process file line by line without loading all into memory."""
    async with aiofiles.open(filepath, mode='r') as f:
        async for line in f:
            processed = await process_line(line)
            await save_result(processed)

3. asyncpg – Async PostgreSQL Driver

import asyncpg

# Create connection pool
async def create_pool():
    """Create database connection pool."""
    pool = await asyncpg.create_pool(
        host='localhost',
        port=5432,
        user='user',
        password='password',
        database='mydb',
        min_size=10,
        max_size=20
    )
    return pool

# Query with pool
async def fetch_users(pool: asyncpg.Pool):
    """Fetch users from database."""
    async with pool.acquire() as connection:
        rows = await connection.fetch(
            'SELECT id, name, email FROM users WHERE active = $1',
            True
        )
        return [dict(row) for row in rows]

# Transaction
async def create_user_with_profile(pool: asyncpg.Pool, user_data: dict):
    """Create user and profile in transaction."""
    async with pool.acquire() as connection:
        async with connection.transaction():
            user_id = await connection.fetchval(
                'INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id',
                user_data['name'],
                user_data['email']
            )
            await connection.execute(
                'INSERT INTO profiles (user_id, bio) VALUES ($1, $2)',
                user_id,
                user_data['bio']
            )
            return user_id

# Prepared statements for better performance
async def fetch_user_by_id(pool: asyncpg.Pool, user_id: int):
    """Fetch user using prepared statement."""
    async with pool.acquire() as connection:
        stmt = await connection.prepare('SELECT * FROM users WHERE id = $1')
        row = await stmt.fetchrow(user_id)
        return dict(row) if row else None

4. motor – Async MongoDB Driver

from motor.motor_asyncio import AsyncIOMotorClient

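# Create client
# The client manages its own connection pool: create it once and reuse it.
# In real code, pass a shared client into the functions below instead of
# building a new one per call.
async def create_mongo_client():
    """Create MongoDB client."""
    client = AsyncIOMotorClient('mongodb://localhost:27017')
    return client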

# Query documents
async def fetch_documents(collection_name: str):
    """Fetch documents from MongoDB."""
    client = AsyncIOMotorClient('mongodb://localhost:27017')
    db = client.mydb
    collection = db[collection_name]

    cursor = collection.find({'active': True})
    documents = await cursor.to_list(length=100)
    return documents

# Insert document
async def insert_document(collection_name: str, document: dict):
    """Insert document into MongoDB."""
    client = AsyncIOMotorClient('mongodb://localhost:27017')
    db = client.mydb
    collection = db[collection_name]

    result = await collection.insert_one(document)
    return result.inserted_id

# Update document
async def update_document(collection_name: str, filter_dict: dict, update_dict: dict):
    """Update document in MongoDB."""
    client = AsyncIOMotorClient('mongodb://localhost:27017')
    db = client.mydb
    collection = db[collection_name]

    result = await collection.update_one(
        filter_dict,
        {'$set': update_dict}
    )
    return result.modified_count

# Async iteration over cursor
async def process_large_collection(collection_name: str):
    """Process large collection with cursor."""
    client = AsyncIOMotorClient('mongodb://localhost:27017')
    db = client.mydb
    collection = db[collection_name]

    cursor = collection.find({})
    async for document in cursor:
        await process_document(document)

Anti-Patterns to Avoid

1. Not Awaiting Coroutines

# BAD: Coroutine never executed
async def bad_example():
    result = fetch_data()  # RuntimeWarning: coroutine 'fetch_data' was never awaited
    return result

# GOOD: Properly awaited
async def good_example():
    result = await fetch_data()
    return result

2. Mixing Sync and Async Incorrectly

# BAD: Calling async function from sync code without event loop
def sync_function():
    result = await async_function()  # SyntaxError: 'await' outside async function
    return result

# GOOD: Use asyncio.run() to bridge sync/async
def sync_function():
    result = asyncio.run(async_function())
    return result

# BAD: Creating new event loop in async context
async def bad_nested():
    result = asyncio.run(another_async())  # RuntimeError: asyncio.run() cannot be called from a running event loop
    return result

# GOOD: Just await in async context
async def good_nested():
    result = await another_async()
    return result
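
The nested asyncio.run() failure is easy to reproduce. This sketch catches the error instead of crashing; another_async is a hypothetical coroutine, and the explicit coro.close() only suppresses the "never awaited" warning:

```python
import asyncio

async def another_async() -> str:
    return "ok"

async def nested_run() -> str:
    coro = another_async()
    try:
        asyncio.run(coro)          # a loop is already running here
    except RuntimeError as exc:
        coro.close()               # avoid a "never awaited" warning
        return f"RuntimeError: {exc}"
    return "no error"

message = asyncio.run(nested_run())
print(message)
```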

3. Creating Event Loops Manually

# BAD: Manual loop management (unless necessary)
def bad_approach():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        result = loop.run_until_complete(main())
    finally:
        loop.close()
    return result

# GOOD: Use asyncio.run()
def good_approach():
    result = asyncio.run(main())
    return result

# EXCEPTION: Multiple event loops needed (advanced use case)
# Only create manual loops when you specifically need multiple loops

4. Blocking Operations in Async Functions

# BAD: Blocking the event loop
async def bad_blocking():
    time.sleep(5)  # Blocks entire event loop!
    data = requests.get(url).json()  # Blocks event loop!
    with open('file.txt') as f:
        content = f.read()  # Blocks event loop!
    result = compute_fibonacci(1000000)  # Blocks event loop!
    return result

# GOOD: Use async alternatives
async def good_async():
    await asyncio.sleep(5)  # Non-blocking
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.json()  # Non-blocking
    async with aiofiles.open('file.txt') as f:
        content = await f.read()  # Non-blocking
    result = await asyncio.to_thread(compute_fibonacci, 1000000)  # Non-blocking
    return result

5. Improper Exception Handling

# BAD: Bare except swallows CancelledError
async def bad_exception_handling():
    try:
        await some_operation()
    except:  # Catches CancelledError, preventing proper cancellation!
        print("Error occurred")

# GOOD: Specific exception handling
async def good_exception_handling():
    try:
        await some_operation()
    except asyncio.CancelledError:
        # Handle cancellation specifically
        await cleanup()
        raise  # Re-raise to propagate
    except Exception as e:
        # Handle other exceptions
        print(f"Error: {e}")
        raise

# BAD: Silencing errors in gather()
async def bad_gather():
    results = await asyncio.gather(
        task1(),
        task2(),
        task3(),
        return_exceptions=True
    )
    return results  # Might contain exceptions that are ignored

# GOOD: Check for exceptions
async def good_gather():
    results = await asyncio.gather(
        task1(),
        task2(),
        task3(),
        return_exceptions=True
    )

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
            # Handle or raise as appropriate

    return [r for r in results if not isinstance(r, Exception)]

6. Resource Leaks

# BAD: Not closing resources
async def bad_resource_management():
    session = aiohttp.ClientSession()
    response = await session.get(url)
    data = await response.json()
    # Session and response never closed!
    return data

# GOOD: Use context managers
async def good_resource_management():
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.json()
    # Automatically closed
    return data

# BAD: Creating session per request
async def bad_session_usage():
    for url in urls:
        async with aiohttp.ClientSession() as session:
            # Creating new session each iteration is wasteful!
            await session.get(url)

# GOOD: Reuse session
async def good_session_usage():
    async with aiohttp.ClientSession() as session:
        for url in urls:
            # Reuse same session; release each response back to the pool
            async with session.get(url) as response:
                await response.read()

7. Not Using Task Groups for Error Propagation (Python 3.11+)

# BAD: Sibling tasks are not cancelled when one fails
async def bad_error_propagation():
    tasks = [
        asyncio.create_task(task1()),
        asyncio.create_task(task2()),
        asyncio.create_task(task3())
    ]
    # If task2 fails, task1 and task3 keep running
    await asyncio.gather(*tasks)

# GOOD: Use TaskGroup for automatic cancellation
async def good_error_propagation():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(task1())
        tg.create_task(task2())
        tg.create_task(task3())
    # If any task fails, all tasks are cancelled

Framework-Specific Patterns

1. FastAPI Async Endpoints

from fastapi import FastAPI, HTTPException, Depends
import aiohttp
import asyncpg

app = FastAPI()

# Async endpoint
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: asyncpg.Pool = Depends(get_db_pool)):
    """Async endpoint that queries database."""
    async with db.acquire() as connection:
        user = await connection.fetchrow(
            'SELECT * FROM users WHERE id = $1',
            user_id
        )
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        return dict(user)

# Async endpoint with external API call
@app.get("/external-data")
async def fetch_external_data():
    """Fetch data from external API."""
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as response:
            return await response.json()

# Background tasks (async)
from fastapi import BackgroundTasks

@app.post("/send-notification")
async def send_notification(
    email: str,
    background_tasks: BackgroundTasks
):
    """Send notification in background."""
    background_tasks.add_task(send_email_async, email)
    return {"message": "Notification queued"}

async def send_email_async(email: str):
    """Send email asynchronously."""
    async with aiohttp.ClientSession() as session:
        await session.post(
            'https://email-service.com/send',
            json={'to': email}
        )

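# Dependency injection with async
# Note: define this above any endpoint that names it in Depends().
# Opening a pool per request is costly; in practice, hand out a pool
# created once at startup (for example in a lifespan handler).
async def get_db_pool():
    """Dependency that provides database pool."""
    pool = await asyncpg.create_pool(...)
    try:
        yield pool
    finally:
        await pool.close()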

# Lifespan events
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup and shutdown events."""
    # Startup
    app.state.db_pool = await asyncpg.create_pool(...)
    app.state.http_session = aiohttp.ClientSession()
    yield
    # Shutdown
    await app.state.db_pool.close()
    await app.state.http_session.close()

# Create the app once, with the lifespan handler, before registering routes
app = FastAPI(lifespan=lifespan)

2. Django Async Views (Django 3.1+)

from django.http import JsonResponse
from django.views import View
from asgiref.sync import sync_to_async
import aiohttp

# Async function-based view
async def async_user_list(request):
    """Async view for listing users."""
    # Use sync_to_async for Django ORM
    users = await sync_to_async(list)(
        User.objects.filter(active=True)
    )
    return JsonResponse({
        'users': [
            {'id': u.id, 'name': u.name}
            for u in users
        ]
    })

# Async class-based view (async handler methods need Django 4.1+)
class AsyncUserDetailView(View):
    """Async class-based view."""

    async def get(self, request, user_id):
        """Get user details asynchronously."""
        # Wrap ORM calls with sync_to_async
        user = await sync_to_async(User.objects.get)(id=user_id)
        return JsonResponse({
            'id': user.id,
            'name': user.name,
            'email': user.email
        })

# Async view with external API
async def fetch_external_data_view(request):
    """Fetch data from external API."""
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as response:
            data = await response.json()
    return JsonResponse(data)

# Django ORM with async (Django 4.1+)
async def async_orm_view(request):
    """Use async ORM operations."""
    # Django 4.1+ supports async ORM
    users = [user async for user in User.objects.filter(active=True)]
    count = await User.objects.filter(active=True).acount()

    return JsonResponse({
        'count': count,
        'users': [{'id': u.id, 'name': u.name} for u in users]
    })

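# Middleware (async)
from asgiref.sync import iscoroutinefunction, markcoroutinefunction

class AsyncMiddleware:
    """Async middleware example."""

    async_capable = True
    sync_capable = False

    def __init__(self, get_response):
        self.get_response = get_response
        # Mark the instance as async so Django awaits it (asgiref 3.6+)
        if iscoroutinefunction(self.get_response):
            markcoroutinefunction(self)

    async def __call__(self, request):
        # Pre-processing
        await log_request_async(request)

        # Call next middleware/view
        response = await self.get_response(request)

        # Post-processing
        await log_response_async(response)

        return response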

3. Async Database Operations

# asyncpg with connection pool
class AsyncPostgresService:
    """Service for async PostgreSQL operations."""

    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def get_user(self, user_id: int) -> dict | None:
        """Get user by ID."""
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                'SELECT * FROM users WHERE id = $1',
                user_id
            )
            return dict(row) if row else None

    async def create_user(self, name: str, email: str) -> int:
        """Create new user."""
        async with self.pool.acquire() as conn:
            user_id = await conn.fetchval(
                'INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id',
                name,
                email
            )
            return user_id

    async def update_user(self, user_id: int, **kwargs) -> bool:
        """Update user fields."""
        if not kwargs:
            return False

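        # Column names are interpolated into the SQL text (only values are bound
        # as parameters), so kwargs keys must come from trusted code, never
        # directly from user input.
        set_clause = ', '.join(f"{k} = ${i+2}" for i, k in enumerate(kwargs.keys()))
        query = f'UPDATE users SET {set_clause} WHERE id = $1'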

        async with self.pool.acquire() as conn:
            result = await conn.execute(query, user_id, *kwargs.values())
            return result != 'UPDATE 0'

    async def batch_insert_users(self, users: list[tuple[str, str]]) -> int:
        """Batch insert users."""
        async with self.pool.acquire() as conn:
            # executemany() returns None, so report the number of rows submitted
            await conn.executemany(
                'INSERT INTO users (name, email) VALUES ($1, $2)',
                users
            )
            return len(users)

# Motor (MongoDB) service
class AsyncMongoService:
    """Service for async MongoDB operations."""

    def __init__(self, client: AsyncIOMotorClient):
        self.db = client.mydb

    async def get_document(self, collection: str, doc_id: str) -> dict | None:
        """Get document by ID."""
        coll = self.db[collection]
        return await coll.find_one({'_id': doc_id})

    async def insert_document(self, collection: str, document: dict) -> str:
        """Insert document."""
        coll = self.db[collection]
        result = await coll.insert_one(document)
        return str(result.inserted_id)

    async def find_documents(
        self,
        collection: str,
        filter_dict: dict,
        limit: int = 100
    ) -> list[dict]:
        """Find documents matching filter."""
        coll = self.db[collection]
        cursor = coll.find(filter_dict).limit(limit)
        return await cursor.to_list(length=limit)

    async def aggregate(self, collection: str, pipeline: list[dict]) -> list[dict]:
        """Run aggregation pipeline."""
        coll = self.db[collection]
        cursor = coll.aggregate(pipeline)
        return await cursor.to_list(length=None)
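
The dynamic SET clause in update_user places keyword names directly into the SQL text, which is safe only when those names are trusted. A minimal sketch of one way to guard it, assuming a hypothetical `ALLOWED_COLUMNS` whitelist and a hypothetical helper `build_update_query` (neither is part of asyncpg):

```python
# Hypothetical whitelist of columns that callers may update.
ALLOWED_COLUMNS = frozenset({"name", "email"})


def build_update_query(fields: dict) -> tuple:
    """Return a parameterized UPDATE statement and the values to bind.

    Column names cannot be passed as query parameters, so they are checked
    against ALLOWED_COLUMNS. Values are bound as $2, $3, ... ($1 is the id).
    """
    if not fields:
        raise ValueError("no fields to update")
    unknown = set(fields) - ALLOWED_COLUMNS
    if unknown:
        raise ValueError(f"unknown columns: {sorted(unknown)}")
    set_clause = ", ".join(
        f"{column} = ${position}"
        for position, column in enumerate(fields, start=2)
    )
    return f"UPDATE users SET {set_clause} WHERE id = $1", list(fields.values())
```

The returned query and values could then be passed along as `conn.execute(query, user_id, *values)`.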

Performance Considerations

1. When Async Provides Benefits (I/O-Bound)

Async is beneficial for I/O-bound operations:

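# GOOD: I/O-bound operations benefit from async
async def fetch_json(session: aiohttp.ClientSession, url: str) -> dict:
    """Fetch one URL and release the connection when done."""
    async with session.get(url) as response:
        return await response.json()

async def io_bound_operations():
    """These operations benefit from async."""

    # Network requests (bodies are read while the session is still open)
    async with aiohttp.ClientSession() as session:
        responses = await asyncio.gather(
            fetch_json(session, 'https://api1.example.com'),
            fetch_json(session, 'https://api2.example.com'),
            fetch_json(session, 'https://api3.example.com')
        )

    # Database queries: pool.fetch() acquires a separate connection per call,
    # because a single asyncpg connection cannot run queries concurrently
    users, posts, comments = await asyncio.gather(
        pool.fetch('SELECT * FROM users'),
        pool.fetch('SELECT * FROM posts'),
        pool.fetch('SELECT * FROM comments')
    )

    # File I/O
    async with aiofiles.open('large_file.txt') as f:
        content = await f.read()

    return responses, users, posts, comments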

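# Example: 100 HTTP requests
# Illustrative timings, assuming ~2 s per request and no concurrency limit:
#   With async: ~2 seconds (the requests overlap)
#   With sync requests: ~200 seconds (2 s per request × 100)
async def fetch_many_urls(urls: list[str]) -> list[str]:
    """Async is much faster for concurrent I/O."""
    async def fetch(session, url):
        # Read the body inside the context manager so the connection is released
        async with session.get(url) as response:
            return await response.text()

    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(fetch(session, url) for url in urls))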
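
The difference between awaiting I/O one call at a time and overlapping it can be reproduced without a network. The sketch below is illustrative only: `fake_request` is a hypothetical stand-in that sleeps instead of calling a real API, and the delays are arbitrary.

```python
import asyncio
import time


async def fake_request(delay: float) -> float:
    """Stand-in for a network call: waits without blocking the event loop."""
    await asyncio.sleep(delay)
    return delay


async def run_sequential(delays: list) -> float:
    """Await each request in turn; elapsed time is roughly the sum of delays."""
    start = time.perf_counter()
    for delay in delays:
        await fake_request(delay)
    return time.perf_counter() - start


async def run_concurrent(delays: list) -> float:
    """Await all requests together; elapsed time is roughly the longest delay."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_request(d) for d in delays))
    return time.perf_counter() - start


async def main() -> tuple:
    delays = [0.05] * 5
    sequential = await run_sequential(delays)
    concurrent = await run_concurrent(delays)
    print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
    return sequential, concurrent
```

Running `asyncio.run(main())` prints both timings. The concurrent figure should sit near a single delay rather than the sum.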

2. When Async Doesn’t Help (CPU-Bound)

Async provides no benefit for CPU-bound operations:

# BAD: CPU-bound operations don't benefit from async
async def cpu_bound_async():
    """This gains nothing from being async."""
    result = compute_fibonacci(1000000)  # Blocks event loop!
    return result

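# OK: asyncio.to_thread (Python 3.9+) keeps the event loop free,
# but the GIL still prevents a parallel speedup for pure-Python CPU work
async def cpu_bound_with_threads():
    """Offload CPU-bound work to a thread so the event loop is not blocked."""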
    result = await asyncio.to_thread(compute_fibonacci, 1000000)
    return result

# BETTER: Use ProcessPoolExecutor for true parallelism
from concurrent.futures import ProcessPoolExecutor

async def cpu_bound_with_processes():
    """Use processes for CPU-bound parallelism."""
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool,
            compute_fibonacci,
            1000000
        )
    return result

# Example: Multiple CPU-bound tasks
async def parallel_cpu_tasks(numbers: list[int]):
    """Run CPU-bound tasks in parallel using processes."""
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        results = await asyncio.gather(*[
            loop.run_in_executor(pool, compute_fibonacci, n)
            for n in numbers
        ])
    return results
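
Whether a call blocks the event loop can be observed by counting how often another task gets to run in the meantime. This is a minimal sketch, assuming hypothetical helpers `blocking_work`, `heartbeat`, and `measure_ticks`. The blocking call is simulated with `time.sleep` rather than real computation.

```python
import asyncio
import time


def blocking_work(seconds: float) -> None:
    """Stand-in for a blocking call (sleeps instead of computing)."""
    time.sleep(seconds)


async def heartbeat(stop: asyncio.Event, interval: float = 0.01) -> int:
    """Count how many times the event loop gets to resume this task."""
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(interval)
        ticks += 1
    return ticks


async def measure_ticks(offload: bool, seconds: float = 0.2) -> int:
    """Run blocking_work inline or in a thread and report heartbeat ticks."""
    stop = asyncio.Event()
    ticker = asyncio.create_task(heartbeat(stop))
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    if offload:
        await asyncio.to_thread(blocking_work, seconds)  # Python 3.9+
    else:
        blocking_work(seconds)  # blocks the event loop for the whole call
    stop.set()
    return await ticker


async def main() -> tuple:
    inline = await measure_ticks(offload=False)
    offloaded = await measure_ticks(offload=True)
    print(f"ticks while blocking inline: {inline}, offloaded: {offloaded}")
    return inline, offloaded
```

Because `time.sleep` releases the GIL, this models blocking I/O. Pure-Python CPU-bound work in a thread still competes with the event loop for the GIL, which is why the process pool is the better fit for heavy computation.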

3. Measuring Async Performance

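import time
import asyncio
from functools import wraps

import aiohttp   # third-party: async HTTP client used below
import requests  # third-party: sync HTTP client, used only for the comparison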

# Decorator to measure async function performance
def async_timer(func):
    """Measure execution time of async function."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = await func(*args, **kwargs)
        end = time.perf_counter()
        print(f"{func.__name__} took {end - start:.2f} seconds")
        return result
    return wrapper

# Usage
@async_timer
async def fetch_data(url: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

# Context manager for timing blocks
from contextlib import asynccontextmanager

@asynccontextmanager
async def timer(name: str):
    """Time a block of async code."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Report the duration even if the block raises or is cancelled
        end = time.perf_counter()
        print(f"{name} took {end - start:.2f} seconds")

# Usage
async def timed_operations():
    async with timer("Database queries"):
        users = await fetch_users()
        posts = await fetch_posts()

    async with timer("External API calls"):
        data = await fetch_external_data()

# Compare sync vs async performance
async def compare_performance():
    """Compare sync and async performance."""
    urls = ['https://example.com'] * 100

    # Sync version (for comparison only: requests.get() blocks the event loop)
    start = time.perf_counter()
    sync_results = [requests.get(url).text for url in urls]
    sync_time = time.perf_counter() - start
    print(f"Sync: {sync_time:.2f}s")

    # Async version
    async def fetch(session, url):
        # Read the body so both versions do the same work
        async with session.get(url) as response:
            return await response.text()

    start = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        async_results = await asyncio.gather(*(fetch(session, url) for url in urls))
    async_time = time.perf_counter() - start
    print(f"Async: {async_time:.2f}s")
    print(f"Speedup: {sync_time / async_time:.1f}x")
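
A variation on the timing context manager records durations in a dictionary instead of printing them, which makes the measurements easier to aggregate or assert on. This is a sketch only. `record_time` and the `timings` dictionary are hypothetical names introduced for illustration.

```python
import asyncio
import time
from contextlib import asynccontextmanager


@asynccontextmanager
async def record_time(name: str, timings: dict):
    """Store the elapsed time of the wrapped block under `name`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs on success, exception, and cancellation alike.
        timings[name] = time.perf_counter() - start


async def main() -> dict:
    timings = {}
    async with record_time("sleep", timings):
        await asyncio.sleep(0.02)
    try:
        async with record_time("failing", timings):
            raise RuntimeError("boom")
    except RuntimeError:
        pass  # the duration of the failing block was still recorded
    return timings
```

After `asyncio.run(main())`, the returned dictionary holds one entry per timed block, including the block that raised.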

Code Review Checklist

Use this checklist when reviewing async Python code:

Syntax and Structure

  • All async functions defined with async def
  • All coroutines properly awaited
  • asyncio.run() used for top-level entry point (Python 3.7+)
  • No manual event loop creation (unless necessary)
  • Async context managers used with async with
  • Async iterators used with async for

Concurrency Patterns

  • asyncio.gather() used for concurrent operations
  • asyncio.create_task() used for background tasks, with a reference kept to each task so it is not garbage-collected mid-run
  • Task groups used for error propagation (Python 3.11+)
  • Semaphores used for rate limiting
  • Proper task cancellation handling
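
Two of the items above can be sketched with the standard library alone: bounding concurrency with a semaphore, and holding references to background tasks. The names `limited_worker`, `run_limited`, `spawn`, and `background_tasks` are hypothetical, and the sleep stands in for real I/O.

```python
import asyncio

# Strong references to background tasks; the event loop keeps only weak ones.
background_tasks = set()


def spawn(coro) -> asyncio.Task:
    """Start a background task and keep it referenced until it finishes."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return task


async def limited_worker(sem: asyncio.Semaphore, state: dict, item: int) -> int:
    """Do simulated I/O while holding one of the semaphore's slots."""
    async with sem:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # simulated I/O
        state["active"] -= 1
        return item * 2


async def run_limited(items: list, limit: int) -> tuple:
    """Process all items with at most `limit` workers in flight at once."""
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    results = await asyncio.gather(
        *(limited_worker(sem, state, item) for item in items)
    )
    return results, state["peak"]
```

`run_limited` returns the results in input order along with the highest number of workers that were active at once, which should never exceed `limit`.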

Event Loop

  • No blocking operations in async functions
  • asyncio.to_thread() used for blocking I/O (Python 3.9+)
  • No time.sleep() (use asyncio.sleep())
  • No sync HTTP libraries (use aiohttp)
  • No sync file operations (use aiofiles)

Exception Handling

  • Specific exception types caught
  • asyncio.CancelledError handled and re-raised
  • return_exceptions=True used appropriately in gather()
  • Exceptions in tasks not silently ignored
  • Exception groups used for multiple task errors (Python 3.11+)
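
As an illustration of the gather() and CancelledError items above, here is a minimal sketch. It assumes hypothetical coroutines `might_fail` and `cancellable`, and uses only the standard library.

```python
import asyncio


async def might_fail(n: int) -> int:
    """Succeed for even inputs, raise for odd ones."""
    await asyncio.sleep(0.01)
    if n % 2:
        raise ValueError(f"odd input: {n}")
    return n


async def collect(numbers: list) -> tuple:
    """Run every call to completion and split successes from failures."""
    outcomes = await asyncio.gather(
        *(might_fail(n) for n in numbers), return_exceptions=True
    )
    values = [o for o in outcomes if not isinstance(o, BaseException)]
    errors = [o for o in outcomes if isinstance(o, Exception)]
    return values, errors


async def cancellable(log: list) -> None:
    """Clean up on cancellation, then let the cancellation propagate."""
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        log.append("cleanup")
        raise  # swallowing this would hide the cancellation from the caller


async def cancel_demo() -> list:
    log = []
    task = asyncio.create_task(cancellable(log))
    await asyncio.sleep(0)  # let the task reach its first await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log
```

With `return_exceptions=True`, one failure does not discard the other results, but the caller then has to inspect each outcome, as `collect` does.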

Resource Management

  • Async context managers used for resources
  • Resources properly closed in finally blocks
  • No session/connection leaks
  • Sessions reused across multiple requests
  • Connection pools used for databases

Timeouts and Cancellation

  • Timeouts set for external operations
  • asyncio.wait_for() or asyncio.timeout() used
  • Tasks can be cancelled gracefully
  • Critical sections shielded from cancellation if needed
  • Cleanup performed on timeout/cancellation
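
The timeout and cleanup items above might look like this in practice. This is a sketch under stated assumptions: `slow_operation` and `fetch_with_timeout` are hypothetical, and asyncio.wait_for() is used because it is available on older Python versions than asyncio.timeout().

```python
import asyncio


async def slow_operation(state: dict) -> str:
    """Simulated external call that takes far longer than the caller allows."""
    try:
        await asyncio.sleep(10)
        return "finished"
    finally:
        # Runs when wait_for() cancels this coroutine on timeout.
        state["cleaned_up"] = True


async def fetch_with_timeout(timeout: float) -> tuple:
    """Return the result, or a fallback value if the operation times out."""
    state = {"cleaned_up": False}
    try:
        result = await asyncio.wait_for(slow_operation(state), timeout=timeout)
    except asyncio.TimeoutError:
        result = "fallback"
    return result, state["cleaned_up"]
```

On timeout, wait_for() cancels the inner coroutine and waits for that cancellation to finish, so the `finally` block has already run by the time the fallback is returned.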

Library Usage

  • aiohttp used instead of requests
  • aiofiles used for async file I/O
  • Async database drivers used (asyncpg, motor)
  • Library-specific best practices followed

Framework Integration

  • FastAPI async endpoints defined correctly
  • Django ORM calls wrapped with sync_to_async (Django < 4.1)
  • Async ORM operations used (Django 4.1+)
  • Dependency injection works with async

Performance

  • Async used for I/O-bound operations
  • CPU-bound work offloaded to threads/processes
  • Unnecessary async overhead avoided
  • Appropriate concurrency limits set

Type Hints

  • Async functions have return type hints
  • Coroutine types annotated
  • Awaitable, Coroutine types used where appropriate
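
A short sketch of how these annotations tend to look. The helpers `double`, `call_with_retry`, and `make_coroutine` are hypothetical and exist only to show where each type fits.

```python
import asyncio
from collections.abc import Awaitable, Callable, Coroutine
from typing import Any, TypeVar

T = TypeVar("T")


async def double(value: int) -> int:
    """An async def is annotated with the awaited result type (int here)."""
    await asyncio.sleep(0)
    return value * 2


async def call_with_retry(
    factory: Callable[[], Awaitable[T]], attempts: int = 3
) -> T:
    """Accept any zero-argument callable that returns an awaitable."""
    for attempt in range(1, attempts + 1):
        try:
            return await factory()
        except Exception:
            if attempt == attempts:
                raise
    raise ValueError("attempts must be at least 1")


def make_coroutine(value: int) -> Coroutine[Any, Any, int]:
    """A plain function that returns a coroutine object without awaiting it."""
    return double(value)
```

`Awaitable[T]` is the looser parameter type because it also admits tasks and futures. `Coroutine[Any, Any, T]` fits when a coroutine object specifically is returned or required.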

Related Skills

  • python-testing: For testing async code with pytest-asyncio
  • python-type-hints: For properly typing async functions and coroutines
  • python-error-handling: For exception handling patterns
  • fastapi-best-practices: For FastAPI-specific async patterns
  • django-best-practices: For Django async views and ORM


Version: 1.0 | Last Updated: 2025-12-24 | Maintainer: Claude Code Skills