benchflow-ai-python-parallelization

Total installs: 1
Weekly installs: 1
Site-wide rank: #52781

Install command:
npx skills add https://smithery.ai

Agent install distribution:
- codex: 1
- claude-code: 1

Skill Documentation
Python Parallelization Skill
Transform sequential Python code to leverage parallel and concurrent execution patterns.
Workflow
- Analyze the code to identify parallelization candidates
- Classify the workload type (CPU-bound, I/O-bound, or data-parallel)
- Select the appropriate parallelization strategy
- Transform the code with proper synchronization and error handling
- Verify correctness and measure expected speedup
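The last two steps, verifying correctness and measuring speedup, can be sketched as a small harness. `work` here is a hypothetical stand-in for the function being parallelized (I/O-bound in this sketch, so a thread pool suffices):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def work(x):
    # Stand-in for the function being parallelized (I/O-bound here).
    time.sleep(0.01)
    return x * x

items = list(range(20))

# Sequential baseline: establishes the correct output and a timing reference.
t0 = time.perf_counter()
expected = [work(x) for x in items]
seq_time = time.perf_counter() - t0

# Parallel version: same inputs, then compare output and measure speedup.
t0 = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as executor:
    actual = list(executor.map(work, items))
par_time = time.perf_counter() - t0

assert actual == expected          # step 5: verify correctness
speedup = seq_time / par_time      # step 5: measure speedup
```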
Parallelization Decision Tree
Is the bottleneck CPU-bound or I/O-bound?

```
CPU-bound (computation-heavy):
├── Independent iterations? → multiprocessing.Pool / ProcessPoolExecutor
├── Shared state needed? → multiprocessing with Manager or shared memory
├── NumPy/Pandas operations? → Vectorization first, then consider numba/dask
└── Large data chunks? → chunked processing with Pool.map

I/O-bound (network, disk, database):
├── Many independent requests? → asyncio with aiohttp/aiofiles
├── Legacy sync code? → ThreadPoolExecutor
├── Mixed sync/async? → asyncio.to_thread()
└── Database queries? → Connection pooling + async drivers

Data-parallel (array/matrix ops):
├── NumPy arrays? → Vectorize, avoid Python loops
├── Pandas DataFrames? → Use built-in vectorized methods
├── Large datasets? → Dask for out-of-core parallelism
└── GPU available? → Consider CuPy or JAX
```
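The `asyncio.to_thread()` branch is the least obvious, so a minimal sketch may help; the blocking function and file names are stand-ins:

```python
import asyncio
import time

def blocking_read(path):
    # Stand-in for legacy sync I/O that can't easily be rewritten as async.
    time.sleep(0.05)
    return f"contents of {path}"

async def main():
    # Run the blocking calls in worker threads so the event loop stays free.
    results = await asyncio.gather(
        asyncio.to_thread(blocking_read, "a.txt"),
        asyncio.to_thread(blocking_read, "b.txt"),
    )
    return results

results = asyncio.run(main())
```

`asyncio.to_thread()` requires Python 3.9+; on older versions, `loop.run_in_executor(None, fn, *args)` is the equivalent.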
Transformation Patterns
Pattern 1: Loop to ProcessPoolExecutor (CPU-bound)
Before:

```python
results = []
for item in items:
    results.append(expensive_computation(item))
```

After:

```python
from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
    results = list(executor.map(expensive_computation, items))
```
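A fully runnable version of this pattern needs two extra details on spawn-based platforms (Windows, macOS): the worker function must live at module level so it can be pickled, and pool creation belongs under a `__main__` guard. The function body here is illustrative:

```python
from concurrent.futures import ProcessPoolExecutor

def expensive_computation(n):
    # Must be defined at module level so worker processes can unpickle it.
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    items = [10_000, 20_000, 30_000]
    with ProcessPoolExecutor(max_workers=4) as executor:
        # chunksize batches items per worker task, cutting IPC overhead
        # when there are many small items.
        results = list(executor.map(expensive_computation, items, chunksize=1))
    print(results)
```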
Pattern 2: Sequential I/O to Async (I/O-bound)
Before:

```python
import requests

def fetch_all(urls):
    return [requests.get(url).json() for url in urls]
```

After:

```python
import asyncio
import aiohttp

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks)

async def fetch_one(session, url):
    async with session.get(url) as response:
        return await response.json()
```
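One caveat worth adding: `gather` over thousands of URLs opens every connection at once, which can exhaust sockets or trip rate limits. A bounded variant with `asyncio.Semaphore`, with the network call stubbed by `asyncio.sleep` so the sketch is self-contained:

```python
import asyncio

async def fetch_one(url, sem):
    async with sem:                 # at most `limit` requests in flight
        await asyncio.sleep(0.01)   # stand-in for session.get(url)
        return {"url": url}

async def fetch_all(urls, limit=10):
    sem = asyncio.Semaphore(limit)
    tasks = [fetch_one(url, sem) for url in urls]
    # gather preserves input order regardless of completion order
    return await asyncio.gather(*tasks)

results = asyncio.run(fetch_all([f"https://example.com/{i}" for i in range(25)]))
```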
Pattern 3: Nested Loops to Vectorization
Before:

```python
result = []
for i in range(len(a)):
    row = []
    for j in range(len(b)):
        row.append(a[i] * b[j])
    result.append(row)
```

After:

```python
import numpy as np

result = np.outer(a, b)
```
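A quick equivalence check confirms the transform preserves output, assuming NumPy is installed:

```python
import numpy as np

a = np.array([1.0, 2.0, 3.0])
b = np.array([4.0, 5.0])

# Nested-loop version (what the "before" code computes)
looped = [[a[i] * b[j] for j in range(len(b))] for i in range(len(a))]

# Vectorized version: one C-level call, no Python-level loop
vectorized = np.outer(a, b)

assert np.array_equal(vectorized, np.array(looped))
```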
Pattern 4: Mixed CPU/IO with asyncio
```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

async def hybrid_pipeline(data, urls):
    loop = asyncio.get_running_loop()
    # CPU-bound work in a process pool
    with ProcessPoolExecutor() as pool:
        processed = await loop.run_in_executor(pool, cpu_heavy_fn, data)
    # I/O-bound work with async
    results = await asyncio.gather(*[fetch(url) for url in urls])
    return processed, results
```
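To make the pattern runnable end-to-end, here is a self-contained sketch with `cpu_heavy_fn` and `fetch` stubbed out and a thread pool swapped in for the process pool (so nothing needs to be picklable); `asyncio.run()` drives the pipeline:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def cpu_heavy_fn(data):
    # Stand-in for real CPU work; a process pool would be used for truly
    # CPU-bound code, but threads keep this sketch runnable anywhere.
    return sum(data)

async def fetch(url):
    await asyncio.sleep(0.01)       # stand-in for a network call
    return url

async def hybrid_pipeline(data, urls):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        processed = await loop.run_in_executor(pool, cpu_heavy_fn, data)
    results = await asyncio.gather(*(fetch(u) for u in urls))
    return processed, results

processed, results = asyncio.run(hybrid_pipeline([1, 2, 3], ["u1", "u2"]))
```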
Parallelization Candidates
Look for these patterns in code:
| Pattern | Indicator | Strategy |
|---|---|---|
| `for item in collection` with independent iterations | No shared mutation | `Pool.map` / `executor.map` |
| Multiple `requests.get()` or file reads | Sequential I/O | `asyncio.gather()` |
| Nested loops over arrays | Numerical computation | NumPy vectorization |
| `time.sleep()` or blocking waits | Waiting on external | Threading or async |
| Large list comprehensions | Independent transforms | `Pool.map` with chunking |
Safety Requirements
Always preserve correctness when parallelizing:
- Identify shared state – variables modified across iterations break parallelism
- Check dependencies – iteration N depending on N-1 requires sequential execution
- Handle exceptions – wrap parallel code in try/except; use `executor.submit()` for granular error handling
- Manage resources – use context managers, limit worker count to avoid exhaustion
- Preserve ordering – use `map()` over `submit()` when order matters
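The `executor.submit()` point can be made concrete. This sketch uses threads so it runs anywhere, and `risky` is a hypothetical task that fails on one input; each item gets its own Future, so one failure doesn't discard the other results:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def risky(x):
    # Hypothetical task that fails for one particular input.
    if x == 3:
        raise ValueError(f"bad input: {x}")
    return x * 2

items = [1, 2, 3, 4]
results, errors = {}, {}

with ThreadPoolExecutor(max_workers=4) as executor:
    # submit() returns one Future per item, isolating failures.
    futures = {executor.submit(risky, x): x for x in items}
    for future in as_completed(futures):
        x = futures[future]
        try:
            results[x] = future.result()   # re-raises the worker's exception
        except ValueError as exc:
            errors[x] = str(exc)
```

Note that `as_completed()` yields futures in completion order, not submission order; the dict keyed by input keeps results attributable either way.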
Common Pitfalls
- GIL trap: Threading doesn't help CPU-bound Python code; use multiprocessing
- Pickle failures: Lambda functions and nested classes can't be pickled for multiprocessing
- Memory explosion: ProcessPoolExecutor copies data to each process; use shared memory for large data
- Async in sync: Can't just add `async` to existing code; it requires restructuring the call chain
- Over-parallelization: Parallel overhead exceeds gains for small workloads (typically <1000 items)
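The pickle pitfall can be checked up front, before handing work to a process pool:

```python
import pickle

# Worker processes receive their tasks by pickling; lambdas (and functions
# nested inside other functions) can't be pickled, so passing one to
# ProcessPoolExecutor.map fails at submission time.
try:
    pickle.dumps(lambda x: x + 1)
    lambda_picklable = True
except (pickle.PicklingError, AttributeError):
    lambda_picklable = False

assert not lambda_picklable  # use a module-level def instead
```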
Verification Checklist
Before finalizing transformed code:
- Output matches sequential version for test inputs
- No race conditions (shared mutable state properly synchronized)
- Exceptions are caught and handled appropriately
- Resources are properly cleaned up (pools closed, connections released)
- Worker count is bounded (default or explicit limit)
- All required imports are added