patterns-concurrency-dev
Install command
npx skills add https://github.com/arustydev/ai --skill patterns-concurrency-dev
Skill Documentation
Concurrency Patterns
Cross-language reference for concurrency mechanisms including async/await, goroutines, channels, threads, and synchronization primitives. This skill helps translate concurrent code between languages during code conversion.
Overview
This skill covers:
- Async/await pattern comparison
- Goroutines, tasks, and green threads
- Channel and message passing patterns
- Synchronization primitives
- Cancellation and timeout patterns
This skill does NOT cover:
- Building applications with async frameworks (see lang-*-dev skills)
- Distributed systems patterns (see dedicated skills)
- Database connection pooling (see database skills)
Concurrency Model Comparison
| Language | Primary Model | Runtime | Threading | Channels |
|---|---|---|---|---|
| TypeScript | async/await | V8 event loop | Workers (limited) | N/A |
| Python | async/await | asyncio | threading/multiprocessing | Queue |
| Rust | async/await | tokio/async-std | std::thread | mpsc, crossbeam |
| Go | Goroutines | Go scheduler | Built-in | chan (first-class) |
| Java | Virtual Threads | JVM | Thread, ExecutorService | BlockingQueue |
| Elixir | Processes | BEAM | N/A (processes) | Built-in messaging |
Model Characteristics
Event Loop (JS/TS, Python asyncio)
├── Single-threaded by default
├── Non-blocking I/O
├── Cooperative scheduling
└── Cannot utilize multiple cores directly
Goroutines (Go)
├── Multiplexed onto OS threads
├── Preemptive scheduling
├── Built-in channel communication
└── Automatic multi-core utilization
Tokio/async-std (Rust)
├── Multi-threaded runtime
├── Work-stealing scheduler
├── Zero-cost futures
└── Explicit spawning for parallelism
BEAM Processes (Elixir/Erlang)
├── Lightweight isolated processes
├── Message passing only
├── Preemptive scheduling
└── Fault tolerance built-in
Async/Await Translation
Basic Async Function
TypeScript:
async function fetchUser(id: string): Promise<User> {
  const response = await fetch(`/users/${id}`);
  return response.json();
}
Python:
import httpx

async def fetch_user(id: str) -> User:
    async with httpx.AsyncClient() as client:
        response = await client.get(f"/users/{id}")
        return User(**response.json())
Rust:
async fn fetch_user(id: &str) -> Result<User, Error> {
    let response = reqwest::get(format!("/users/{}", id)).await?;
    let user: User = response.json().await?;
    Ok(user)
}
Go:
// Go doesn't have async/await - use goroutines + channels
func fetchUser(id string) (*User, error) {
    resp, err := http.Get(fmt.Sprintf("/users/%s", id))
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    var user User
    err = json.NewDecoder(resp.Body).Decode(&user)
    return &user, err
}
Parallel Execution
Promise.all / join!
TypeScript:
const [users, orders] = await Promise.all([
  fetchUsers(),
  fetchOrders()
]);
Python:
import asyncio
users, orders = await asyncio.gather(
    fetch_users(),
    fetch_orders()
)
Rust:
let (users, orders) = tokio::join!(
    fetch_users(),
    fetch_orders()
);
// Or with try_join for Result types
let (users, orders) = tokio::try_join!(
    fetch_users(),
    fetch_orders()
)?;
Go:
var wg sync.WaitGroup
var users []User
var orders []Order
var usersErr, ordersErr error

wg.Add(2)
go func() {
    defer wg.Done()
    users, usersErr = fetchUsers()
}()
go func() {
    defer wg.Done()
    orders, ordersErr = fetchOrders()
}()
wg.Wait()
Race / select
TypeScript:
const result = await Promise.race([
  fetchFromPrimary(),
  fetchFromBackup()
]);
Python:
primary = asyncio.create_task(fetch_from_primary())
backup = asyncio.create_task(fetch_from_backup())
done, pending = await asyncio.wait(
    {primary, backup},  # asyncio.wait requires tasks, not bare coroutines
    return_when=asyncio.FIRST_COMPLETED
)
result = done.pop().result()
for task in pending:
    task.cancel()
Rust:
tokio::select! {
    result = fetch_from_primary() => result,
    result = fetch_from_backup() => result,
}
Go:
select {
case result := <-primaryCh:
    return result
case result := <-backupCh:
    return result
}
Channel Patterns
Basic Channel Usage
Go (native channels):
// Unbuffered channel
ch := make(chan int)
// Send
go func() {
    ch <- 42
}()
// Receive
value := <-ch
// Buffered channel
buffered := make(chan int, 10)
Rust (mpsc):
use tokio::sync::mpsc;
// Create channel
let (tx, mut rx) = mpsc::channel(32);
// Send
tokio::spawn(async move {
    tx.send(42).await.unwrap();
});
// Receive
while let Some(value) = rx.recv().await {
    println!("Received: {}", value);
}
Python (asyncio.Queue):
import asyncio
queue = asyncio.Queue()
# Send
await queue.put(42)
# Receive
value = await queue.get()
TypeScript (no native channels):
// Use a library or implement with EventEmitter/streams
import { Channel } from './channel';
const ch = new Channel<number>();
await ch.send(42);
const value = await ch.receive();
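A runnable Python sketch of the buffered-channel pattern above, using `asyncio.Queue`. `asyncio.Queue` has no `close()`, so the `None` sentinel used here is a common convention rather than part of the API:

```python
import asyncio

async def producer(queue: asyncio.Queue) -> None:
    for i in range(3):
        await queue.put(i)    # blocks when the bounded queue is full
    await queue.put(None)     # sentinel: no more items

async def consumer(queue: asyncio.Queue) -> list:
    received = []
    while True:
        item = await queue.get()
        if item is None:      # sentinel seen: stop consuming
            break
        received.append(item)
    return received

async def main() -> list:
    # maxsize makes the queue behave like a buffered channel
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    prod = asyncio.create_task(producer(queue))
    result = await consumer(queue)
    await prod
    return result

print(asyncio.run(main()))  # [0, 1, 2]
```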
Fan-out / Fan-in
Go:
func fanOut(input <-chan int, workers int) []<-chan int {
    outputs := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        outputs[i] = worker(input)
    }
    return outputs
}

func fanIn(inputs ...<-chan int) <-chan int {
    output := make(chan int)
    var wg sync.WaitGroup
    for _, input := range inputs {
        wg.Add(1)
        go func(ch <-chan int) {
            defer wg.Done()
            for v := range ch {
                output <- v
            }
        }(input)
    }
    go func() {
        wg.Wait()
        close(output)
    }()
    return output
}
Rust:
use tokio::sync::mpsc;

async fn fan_out<T: Send + 'static>(
    mut input: mpsc::Receiver<T>,
    workers: usize,
) -> Vec<mpsc::Receiver<T>> {
    let (senders, receivers): (Vec<_>, Vec<_>) =
        (0..workers).map(|_| mpsc::channel(32)).unzip();
    tokio::spawn(async move {
        // Distribute incoming items round-robin across worker channels
        let mut next = 0;
        while let Some(item) = input.recv().await {
            if senders[next].send(item).await.is_err() {
                break;
            }
            next = (next + 1) % workers;
        }
    });
    receivers
}
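The same fan-out/fan-in shape can be sketched in Python with `asyncio`. The squaring done by `worker` is a hypothetical stand-in for real work, and one sentinel per worker replaces Go's `close()`:

```python
import asyncio

async def worker(inbox: asyncio.Queue, outbox: asyncio.Queue) -> None:
    # Fan-out: every worker competes for items on the shared input queue
    while True:
        item = await inbox.get()
        if item is None:                # sentinel: this worker is done
            break
        await outbox.put(item * item)   # squaring stands in for real work

async def fan_out_fan_in(items: list, workers: int = 3) -> list:
    inbox: asyncio.Queue = asyncio.Queue()
    outbox: asyncio.Queue = asyncio.Queue()
    tasks = [asyncio.create_task(worker(inbox, outbox)) for _ in range(workers)]
    for item in items:
        await inbox.put(item)
    for _ in range(workers):            # one sentinel per worker
        await inbox.put(None)
    await asyncio.gather(*tasks)
    # Fan-in: drain the shared output queue once all workers finished
    results = []
    while not outbox.empty():
        results.append(outbox.get_nowait())
    return sorted(results)              # completion order is nondeterministic

print(asyncio.run(fan_out_fan_in([1, 2, 3, 4])))  # [1, 4, 9, 16]
```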
Cancellation Patterns
Timeout
TypeScript:
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), 5000);
try {
  const result = await fetch(url, { signal: controller.signal });
  clearTimeout(timeout);
  return result;
} catch (err) {
  if (err.name === 'AbortError') {
    throw new Error('Request timed out');
  }
  throw err;
}
Python:
import asyncio
try:
    result = await asyncio.wait_for(fetch_data(), timeout=5.0)
except asyncio.TimeoutError:
    raise Exception("Request timed out")
Rust:
use tokio::time::{timeout, Duration};
match timeout(Duration::from_secs(5), fetch_data()).await {
    Ok(result) => result?,
    Err(_) => return Err(Error::Timeout),
}
Go:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
result, err := fetchData(ctx)
if errors.Is(err, context.DeadlineExceeded) {
    return nil, errors.New("request timed out")
}
Cancellation Token / Context
Go (Context):
func worker(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            // Do work
        }
    }
}

// Usage
ctx, cancel := context.WithCancel(context.Background())
go worker(ctx)
// Later...
cancel()
Rust (CancellationToken):
use tokio_util::sync::CancellationToken;
async fn worker(token: CancellationToken) {
    loop {
        tokio::select! {
            _ = token.cancelled() => {
                return;
            }
            _ = do_work() => {}
        }
    }
}

// Usage
let token = CancellationToken::new();
tokio::spawn(worker(token.clone()));
// Later...
token.cancel();
TypeScript (AbortController):
async function worker(signal: AbortSignal): Promise<void> {
  while (!signal.aborted) {
    await doWork();
  }
}
// Usage
const controller = new AbortController();
worker(controller.signal);
// Later...
controller.abort();
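Python (task cancellation): Python has no dedicated token type; the closest analog is `Task.cancel()`, which raises `asyncio.CancelledError` at the worker's next `await`. A minimal sketch, where the `done` list exists only to observe the cleanup:

```python
import asyncio

async def worker(done: list) -> None:
    try:
        while True:
            await asyncio.sleep(0.01)   # cancellation lands at await points
    except asyncio.CancelledError:
        done.append("cleaned up")       # shutdown logic goes here
        raise                           # re-raise so cancellation propagates

async def main() -> list:
    done: list = []
    task = asyncio.create_task(worker(done))
    await asyncio.sleep(0.03)
    task.cancel()                       # the analog of cancel() / abort()
    try:
        await task
    except asyncio.CancelledError:
        pass                            # expected: the task was cancelled
    return done

print(asyncio.run(main()))  # ['cleaned up']
```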
Synchronization Primitives
Mutex
| Language | Type | Usage |
|---|---|---|
| TypeScript | N/A (single-threaded) | Use for async coordination |
| Python | asyncio.Lock | async with lock: |
| Rust | tokio::sync::Mutex | let guard = mutex.lock().await |
| Go | sync.Mutex | mu.Lock(); defer mu.Unlock() |
Rust (async mutex):
use tokio::sync::Mutex;
use std::sync::Arc;
let data = Arc::new(Mutex::new(0));
let data_clone = data.clone();
tokio::spawn(async move {
    let mut guard = data_clone.lock().await;
    *guard += 1;
});
Go:
var mu sync.Mutex
var count int
go func() {
    mu.Lock()
    defer mu.Unlock()
    count++
}()
Semaphore
Rust:
use tokio::sync::Semaphore;
use std::sync::Arc;
let semaphore = Arc::new(Semaphore::new(10)); // Max 10 concurrent
async fn limited_task(sem: Arc<Semaphore>) {
    let _permit = sem.acquire().await.unwrap();
    // Do work - permit released on drop
}
Go:
// Using a buffered channel as a semaphore
var sem = make(chan struct{}, 10)

func limitedTask() {
    sem <- struct{}{}        // Acquire
    defer func() { <-sem }() // Release
    // Do work
}
Python:
import asyncio
semaphore = asyncio.Semaphore(10)
async def limited_task():
    async with semaphore:
        # Do work
        pass
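To see the limiting in action, here is a small self-contained sketch: six hypothetical `fetch` tasks share a semaphore of size 2, and `peak` records the highest number running at once:

```python
import asyncio

async def fetch(i: int, sem: asyncio.Semaphore, active: list, peak: list) -> int:
    async with sem:                  # at most 2 coroutines hold the semaphore
        active[0] += 1
        peak[0] = max(peak[0], active[0])
        await asyncio.sleep(0.01)    # stand-in for an I/O call
        active[0] -= 1
        return i

async def main():
    sem = asyncio.Semaphore(2)
    active, peak = [0], [0]
    results = await asyncio.gather(*(fetch(i, sem, active, peak) for i in range(6)))
    return results, peak[0]

results, peak = asyncio.run(main())
print(results, peak)  # [0, 1, 2, 3, 4, 5] 2
```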
Translation Patterns
Goroutine → Tokio Task
// Go
go func() {
    result := doWork()
    resultCh <- result
}()

// Rust
tokio::spawn(async move {
    let result = do_work().await;
    tx.send(result).await.unwrap();
});
Promise → Future
// TypeScript
function fetchData(): Promise<Data> {
  return new Promise((resolve, reject) => {
    // ...
  });
}

// Rust
async fn fetch_data() -> Result<Data, Error> {
    // async fn returns impl Future automatically
}
// Or explicitly
fn fetch_data() -> impl Future<Output = Result<Data, Error>> {
    async {
        // ...
    }
}
Callback → Async/Await
// JavaScript callback
function fetchData(callback) {
  http.get(url, (res) => {
    callback(null, res);
  }).on('error', callback);
}

// TypeScript async
async function fetchData(): Promise<Response> {
  return new Promise((resolve, reject) => {
    http.get(url, resolve).on('error', reject);
  });
}
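The same bridging trick works in Python: resolve an `asyncio.Future` from inside the callback, then `await` the future. `fetch_with_callback` below is a hypothetical stand-in for a real callback-style API:

```python
import asyncio

def fetch_with_callback(callback) -> None:
    # Hypothetical callback-style API standing in for a real one;
    # it invokes callback(err, result) on the next loop iteration
    loop = asyncio.get_running_loop()
    loop.call_soon(callback, None, "data")

async def fetch_data() -> str:
    loop = asyncio.get_running_loop()
    future: asyncio.Future = loop.create_future()

    def on_done(err, result):
        # Bridge: translate the (err, result) callback into a Future
        if err is not None:
            future.set_exception(err)
        else:
            future.set_result(result)

    fetch_with_callback(on_done)
    return await future

print(asyncio.run(fetch_data()))  # data
```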
Common Pitfalls
1. Blocking in Async Context
// ❌ Blocks the async runtime
async fn bad() {
    std::thread::sleep(Duration::from_secs(1)); // Blocks!
}

// ✅ Use async sleep
async fn good() {
    tokio::time::sleep(Duration::from_secs(1)).await;
}

// ✅ Or spawn_blocking for CPU-bound work
async fn cpu_bound() {
    tokio::task::spawn_blocking(|| {
        heavy_computation()
    }).await.unwrap();
}
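The same pitfall exists in Python: `time.sleep` stalls the whole event loop, `asyncio.sleep` yields while waiting, and `asyncio.to_thread` (Python 3.9+) plays the role of `spawn_blocking`:

```python
import asyncio
import time

async def bad() -> None:
    time.sleep(0.1)              # ❌ blocks the entire event loop

async def good() -> None:
    await asyncio.sleep(0.1)     # ✅ yields to the loop while waiting

async def cpu_bound() -> int:
    # ✅ offload blocking/CPU-bound work to a thread
    return await asyncio.to_thread(sum, range(1000))

print(asyncio.run(cpu_bound()))  # 499500
```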
2. Deadlock with Channels
// ❌ Deadlock - unbuffered channel, same goroutine
ch := make(chan int)
ch <- 42 // Blocks forever - no receiver
val := <-ch

// ✅ Use a goroutine
ch := make(chan int)
go func() { ch <- 42 }()
val := <-ch
3. Forgetting to Close Channels
// ❌ Receiver blocks forever
ch := make(chan int)
go func() {
    for i := 0; i < 10; i++ {
        ch <- i
    }
    // Forgot to close!
}()
for v := range ch { // Blocks after 10 values
    fmt.Println(v)
}

// ✅ Close when done
go func() {
    defer close(ch)
    for i := 0; i < 10; i++ {
        ch <- i
    }
}()
4. Shared State Without Synchronization
// ❌ Data race
let mut data = vec![];
for i in 0..10 {
    tokio::spawn(async move {
        data.push(i); // Cannot borrow mutably!
    });
}

// ✅ Use Arc<Mutex<T>>
let data = Arc::new(Mutex::new(vec![]));
for i in 0..10 {
    let data = data.clone();
    tokio::spawn(async move {
        data.lock().await.push(i);
    });
}
Best Practices
- Prefer message passing over shared state when possible
- Use structured concurrency – parent tasks own child tasks
- Always handle cancellation – provide clean shutdown paths
- Avoid blocking in async contexts
- Limit concurrency with semaphores for resource-intensive operations
- Close channels when done sending
- Use timeouts for all external operations
- Test concurrent code with race detectors (go test -race, ThreadSanitizer)
Related Skills
- meta-convert-dev – Code conversion patterns
- patterns-metaprogramming-dev – Async decorators/macros
- lang-*-dev skills – Language-specific concurrency details