rust-async
Total installs: 3 · Weekly installs: 3 · Site rank: #56100
Install command
npx skills add https://github.com/hwatkins/my-skills --skill rust-async
Agent install distribution: amp 3 · gemini-cli 3 · claude-code 3 · github-copilot 3 · codex 3 · kimi-cli 3
Skill documentation
Async Rust Patterns
Expert guidance for building concurrent, async applications in Rust with Tokio.
Core Concepts
- Rust futures are lazy: they do nothing until `.await`ed or spawned
- Use tokio as the async runtime (the default for most Rust async work)
- Prefer structured concurrency: spawn tasks with clear ownership
- Avoid blocking the async runtime: use spawn_blocking for CPU-heavy or blocking I/O
Runtime Setup
// ✅ Good: Multi-threaded runtime (default)
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Your async code here
Ok(())
}
// For libraries, don't pick a runtime: let the consumer choose
// Just return futures, don't call block_on
Spawning Tasks
- Use tokio::spawn for independent, concurrent work
- Use JoinHandle to await results from spawned tasks
- Use JoinSet to manage groups of tasks
- Always handle errors from spawned tasks (they can panic)
use tokio::task::JoinSet;
// ✅ Good: JoinSet for managing multiple tasks
async fn fetch_all(urls: Vec<String>) -> Vec<Result<String, reqwest::Error>> {
let mut set = JoinSet::new();
for url in urls {
set.spawn(async move {
reqwest::get(&url).await?.text().await
});
}
let mut results = Vec::new();
while let Some(res) = set.join_next().await {
match res {
Ok(result) => results.push(result),
Err(e) => eprintln!("Task panicked: {e}"),
}
}
results
}
// ✅ Good: spawn_blocking for CPU-intensive work
// Error must implement From<tokio::task::JoinError> and From<bcrypt::BcryptError>
async fn hash_password(password: String) -> Result<String, Error> {
    let hash = tokio::task::spawn_blocking(move || {
        bcrypt::hash(&password, bcrypt::DEFAULT_COST)
    })
    .await??; // first ? handles the JoinError, second the bcrypt result
    Ok(hash)
}
// ❌ Bad: Blocking the async runtime
async fn bad_hash(password: &str) -> String {
bcrypt::hash(password, 12).unwrap() // Blocks the executor!
}
Channels
- Use tokio::sync::mpsc for multi-producer, single-consumer
- Use tokio::sync::broadcast for multi-producer, multi-consumer
- Use tokio::sync::oneshot for single-value responses
- Use tokio::sync::watch for latest-value broadcasting
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
// ✅ Good: mpsc for work queues
async fn worker_pool() {
    // Note: mpsc receivers are not Clone, so you can't hand each worker
    // its own `rx.clone()`. Share one receiver behind an async Mutex:
    let (tx, rx) = mpsc::channel::<Task>(100);
    let rx = Arc::new(Mutex::new(rx));
    for _ in 0..4 {
        let rx = Arc::clone(&rx);
        tokio::spawn(async move {
            loop {
                // The guard is held while awaiting recv, so workers take
                // turns receiving; each then processes without the lock
                let task = rx.lock().await.recv().await;
                match task {
                    Some(task) => process(task).await,
                    None => break, // channel closed, all senders dropped
                }
            }
        });
    }
    // ... send work through `tx` ...
}
// ✅ Good: oneshot for request-response
use tokio::sync::oneshot;
struct Request {
data: String,
respond_to: oneshot::Sender<Response>,
}
async fn handle_request(req: Request) {
let result = process(&req.data).await;
let _ = req.respond_to.send(result);
}
Select & Timeouts
- Use tokio::select! to race multiple futures
- Always handle all branches; don't leave futures dangling
- Use tokio::time::timeout for deadline enforcement
use tokio::time::{timeout, Duration};
// ✅ Good: Timeout on operations
async fn fetch_with_timeout(url: &str) -> Result<String, Error> {
timeout(Duration::from_secs(10), reqwest::get(url))
.await
.map_err(|_| Error::Timeout)?
.map_err(Error::Network)?
.text()
.await
.map_err(Error::Network)
}
// ✅ Good: Select for racing futures
use tokio::select;
async fn run(mut shutdown: tokio::sync::broadcast::Receiver<()>) {
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
select! {
_ = interval.tick() => {
do_periodic_work().await;
}
_ = shutdown.recv() => {
tracing::info!("Shutting down gracefully");
break;
}
}
}
}
Shared State
- Use Arc<Mutex<T>> for shared mutable state (prefer tokio::sync::Mutex in async code)
- Use Arc<RwLock<T>> when reads vastly outnumber writes
- Use DashMap for concurrent hash maps without explicit locking
- Minimize lock scope: hold locks for the shortest time possible
use std::sync::Arc;
use tokio::sync::RwLock;
// ✅ Good: Shared state with RwLock
#[derive(Clone)]
struct AppState {
db: Pool<Postgres>,
cache: Arc<RwLock<HashMap<String, CachedItem>>>,
}
async fn get_cached(state: &AppState, key: &str) -> Option<CachedItem> {
// Read lock: multiple readers allowed
state.cache.read().await.get(key).cloned()
}
async fn set_cached(state: &AppState, key: String, value: CachedItem) {
// Write lock: exclusive access
state.cache.write().await.insert(key, value);
}
// ❌ Bad: Holding a lock across await points
async fn bad_update(state: &AppState) {
let mut cache = state.cache.write().await;
let data = fetch_from_db().await; // Lock held during I/O!
cache.insert("key".into(), data);
}
// ✅ Good: Minimize lock scope
async fn good_update(state: &AppState) {
let data = fetch_from_db().await; // No lock held
state.cache.write().await.insert("key".into(), data);
}
Graceful Shutdown
- Use tokio::signal to listen for SIGTERM/SIGINT
- Use broadcast channels or CancellationToken to propagate shutdown
- Drain in-flight work before exiting
use tokio::select;
use tokio_util::sync::CancellationToken;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let token = CancellationToken::new();
let worker_token = token.clone();
let worker = tokio::spawn(async move {
loop {
select! {
_ = worker_token.cancelled() => break,
_ = do_work() => {}
}
}
cleanup().await;
});
// Wait for shutdown signal
tokio::signal::ctrl_c().await?;
tracing::info!("Shutdown signal received");
token.cancel();
worker.await?;
Ok(())
}
Streams
- Use tokio_stream or futures::Stream for async iterators
- Use StreamExt for combinators (map, filter, buffer_unordered)
- Use buffer_unordered for concurrent processing with backpressure
use futures::stream::{self, StreamExt};
// ✅ Good: Process a stream with a concurrency limit
async fn process_urls(urls: Vec<String>) -> Vec<String> {
stream::iter(urls)
.map(|url| async move {
reqwest::get(&url).await?.text().await
})
.buffer_unordered(10) // Max 10 concurrent requests
.filter_map(|r| async { r.ok() })
.collect()
.await
}
Common Mistakes
// ❌ Don't hold std::sync::Mutex across .await
let guard = std_mutex.lock().unwrap();
some_async_fn().await; // DEADLOCK RISK
// ✅ Use tokio::sync::Mutex for async code
let guard = tokio_mutex.lock().await;
// ❌ Don't forget to handle JoinHandle errors
tokio::spawn(async { risky_work().await }); // Panic is silently swallowed
// ✅ Always handle spawn results
let handle = tokio::spawn(async { risky_work().await });
match handle.await {
    Ok(result) => { /* use result */ }
    Err(e) => tracing::error!("Task panicked: {e}"),
}
// ❌ Don't create a runtime inside an async context
async fn bad() {
let rt = tokio::runtime::Runtime::new().unwrap(); // Panics!
}
// ❌ Don't use async when sync is fine
async fn add(a: i32, b: i32) -> i32 { a + b } // Unnecessary async
fn add(a: i32, b: i32) -> i32 { a + b } // Just use sync
Performance Tips
- Use buffer_unordered instead of spawning unbounded tasks
- Batch database operations instead of one-at-a-time queries
- Use connection pooling (bb8, deadpool, sqlx::Pool)
- Profile with tokio-console for runtime introspection
- Set appropriate channel buffer sizes: too small causes backpressure, too large wastes memory