dotnet-channels
npx skills add https://github.com/novotnyllc/dotnet-artisan --skill dotnet-channels
Agent 安装分布
Skill 文档
dotnet-channels
Deep guide to System.Threading.Channels for high-performance, thread-safe producer/consumer communication in .NET. Covers channel creation, backpressure strategies, IAsyncEnumerable integration, and graceful shutdown patterns.
Scope
- Channel creation (bounded and unbounded)
- Backpressure strategies and capacity management
- IAsyncEnumerable integration with channel readers
- Graceful shutdown and drain patterns
Out of scope
- Hosted service lifecycle and BackgroundService registration — see [skill:dotnet-background-services]
- Async/await fundamentals and cancellation token propagation — see [skill:dotnet-csharp-async-patterns]
Cross-references: [skill:dotnet-background-services] for integrating channels with hosted services, [skill:dotnet-csharp-async-patterns] for async patterns used in channel consumers.
Channel Fundamentals
A Channel<T> is a thread-safe data structure with separate ChannelWriter<T> and ChannelReader<T> endpoints. Writers produce items, readers consume them — the channel handles all synchronization.
// Create a channel and separate the endpoints
Channel<WorkItem> channel = Channel.CreateUnbounded<WorkItem>();
ChannelWriter<WorkItem> writer = channel.Writer;
ChannelReader<WorkItem> reader = channel.Reader;
Bounded vs Unbounded
| Aspect | Bounded | Unbounded |
|---|---|---|
| Creation | Channel.CreateBounded<T>(capacity) |
Channel.CreateUnbounded<T>() |
| Back-pressure | Yes — FullMode controls behavior when full |
No — grows without limit |
| Memory safety | Capped at capacity items |
Can exhaust memory under load |
| Use when | Production workloads, untrusted producer rates | Guaranteed-low-volume, prototyping |
// Bounded -- preferred for production
var bounded = Channel.CreateBounded<WorkItem>(new BoundedChannelOptions(capacity: 1000)
{
FullMode = BoundedChannelFullMode.Wait
});
// Unbounded -- use only when you control the producer rate
var unbounded = Channel.CreateUnbounded<WorkItem>();
BoundedChannelFullMode
Controls what happens when a bounded channel is full and a producer attempts to write.
| Mode | Behavior | Use case |
|---|---|---|
Wait |
WriteAsync blocks until space is available |
Default. Reliable delivery with back-pressure |
DropOldest |
Drops the oldest item in the channel to make room | Telemetry, metrics — latest data matters most |
DropNewest |
Drops the item being written (newest) | Rate limiting — discard excess incoming work |
DropWrite |
Drops the item being written and returns false from TryWrite |
Non-blocking fire-and-forget with overflow detection |
// DropOldest -- telemetry pipeline where stale readings are expendable
var telemetryChannel = Channel.CreateBounded<SensorReading>(new BoundedChannelOptions(500)
{
FullMode = BoundedChannelFullMode.DropOldest
});
// DropWrite -- non-blocking enqueue with overflow awareness
var logChannel = Channel.CreateBounded<LogEntry>(new BoundedChannelOptions(10_000)
{
FullMode = BoundedChannelFullMode.DropWrite
});
if (!logChannel.Writer.TryWrite(entry))
{
// Channel full -- item was dropped; track overflow metric
overflowCounter.Add(1);
}
itemDropped Callback (.NET 7+)
Starting in .NET 7, bounded channels with drop modes accept an itemDropped callback that fires whenever an item is discarded. Use this for metrics, logging, or resource cleanup on dropped items.
var channel = Channel.CreateBounded(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.DropOldest
},
itemDropped: (item, writer) =>
{
logger.LogWarning("Dropped item due to channel overflow: {Id}", item.Id);
droppedItemsCounter.Add(1);
// Clean up disposable items if needed
(item as IDisposable)?.Dispose();
});
The callback receives the dropped item and the ChannelWriter<T> (useful if you need to re-route items to a fallback channel).
Producer Patterns
Single Producer
// Write with back-pressure (bounded channels)
await writer.WriteAsync(item, cancellationToken);
// Non-blocking write attempt (returns false if channel is full or completed)
if (!writer.TryWrite(item))
{
// Handle overflow -- log, retry, or discard
}
Multiple Producers
Multiple producers can call WriteAsync or TryWrite concurrently without external locking. The channel is internally thread-safe.
// Multiple API endpoints enqueueing work into a shared channel
app.MapPost("/api/orders/{id}/process", async (
string id,
ChannelWriter<OrderCommand> writer,
CancellationToken ct) =>
{
await writer.WriteAsync(new OrderCommand(id, "process"), ct);
return Results.Accepted();
});
app.MapPost("/api/orders/{id}/cancel", async (
string id,
ChannelWriter<OrderCommand> writer,
CancellationToken ct) =>
{
await writer.WriteAsync(new OrderCommand(id, "cancel"), ct);
return Results.Accepted();
});
Signaling Completion
Call Complete() or TryComplete() when no more items will be produced. This lets consumers detect the end of the stream.
// Signal completion -- no more items will be written
writer.Complete();
// TryComplete is idempotent -- safe to call multiple times
writer.TryComplete();
// Signal completion with an error
writer.TryComplete(new InvalidOperationException("Source failed"));
Consumer Patterns
Single Consumer — ReadAsync Loop
The classic pattern: wait for an item, process it, repeat.
while (await reader.WaitToReadAsync(cancellationToken))
{
while (reader.TryRead(out var item))
{
await ProcessAsync(item, cancellationToken);
}
}
This two-loop pattern is preferred over ReadAsync alone because it drains all available items before awaiting again, reducing async state machine overhead.
Single Consumer — ReadAsync (Simpler)
For simpler cases where per-item overhead is acceptable:
try
{
while (true)
{
var item = await reader.ReadAsync(cancellationToken);
await ProcessAsync(item, cancellationToken);
}
}
catch (ChannelClosedException)
{
// Writer called Complete() -- no more items
}
Multiple Consumers (Fan-Out)
Scale processing by running multiple consumer tasks. The channel ensures each item is read by exactly one consumer.
public sealed class ScaledChannelProcessor(
ChannelReader<WorkItem> reader,
IServiceScopeFactory scopeFactory,
ILogger<ScaledChannelProcessor> logger) : BackgroundService
{
private const int WorkerCount = 3;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var workers = Enumerable.Range(0, WorkerCount)
.Select(i => ConsumeAsync(i, stoppingToken));
await Task.WhenAll(workers);
}
private async Task ConsumeAsync(int workerId, CancellationToken ct)
{
logger.LogDebug("Consumer {WorkerId} started", workerId);
while (await reader.WaitToReadAsync(ct))
{
while (reader.TryRead(out var item))
{
try
{
using var scope = scopeFactory.CreateScope();
var handler = scope.ServiceProvider
.GetRequiredService<IWorkItemHandler>();
await handler.HandleAsync(item, ct);
}
catch (Exception ex)
{
logger.LogError(ex,
"Consumer {WorkerId}: error processing {ItemId}",
workerId, item.Id);
}
}
}
logger.LogDebug("Consumer {WorkerId} stopped", workerId);
}
}
IAsyncEnumerable Integration
ChannelReader<T>.ReadAllAsync() returns an IAsyncEnumerable<T>, enabling await foreach consumption and integration with LINQ async operators.
Basic await foreach
await foreach (var item in reader.ReadAllAsync(cancellationToken))
{
await ProcessAsync(item, cancellationToken);
}
// Loop exits when writer calls Complete() and all items are consumed
ReadAllAsync is the simplest consumption pattern. It handles WaitToReadAsync/TryRead internally and completes when the channel is closed.
Streaming from an API Endpoint
Channels combine naturally with ASP.NET Core streaming responses. Return the IAsyncEnumerable<T> directly — minimal APIs will stream items as JSON array elements:
app.MapGet("/api/events/stream", (
ChannelReader<ServerEvent> reader,
CancellationToken ct) => reader.ReadAllAsync(ct));
LINQ Async Operators
With the System.Linq.Async NuGet package, channel streams compose with familiar LINQ operators:
// NuGet: System.Linq.Async
await foreach (var batch in reader.ReadAllAsync(ct)
.Where(item => item.Priority >= Priority.High)
.Buffer(50) // Collect into batches of 50
.WithCancellation(ct))
{
await BulkProcessAsync(batch, ct);
}
Producing an IAsyncEnumerable from a Channel
async IAsyncEnumerable<PriceUpdate> StreamPricesAsync(
string symbol,
[EnumeratorCancellation] CancellationToken ct = default)
{
var channel = Channel.CreateUnbounded<PriceUpdate>();
// Start producer in background
_ = Task.Run(async () =>
{
try
{
await foreach (var tick in marketFeed.SubscribeAsync(symbol, ct))
{
await channel.Writer.WriteAsync(tick, ct);
}
channel.Writer.TryComplete();
}
catch (Exception ex)
{
// Propagate error to reader -- ReadAllAsync will throw
channel.Writer.TryComplete(ex);
}
}, ct);
await foreach (var update in channel.Reader.ReadAllAsync(ct))
{
yield return update;
}
}
Performance
SingleReader / SingleWriter Flags
Setting SingleReader = true or SingleWriter = true on channel options enables lock-free optimizations. The channel trusts these hints — violating them (multiple concurrent readers when SingleReader = true) causes data corruption.
// Optimal for single-producer, single-consumer pipeline
var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(1000)
{
SingleReader = true, // One consumer task
SingleWriter = true, // One producer task
FullMode = BoundedChannelFullMode.Wait
});
WaitToReadAsync + TryRead Pattern
The most efficient consumer pattern. WaitToReadAsync suspends until data is available, then TryRead drains all buffered items synchronously — avoiding per-item async state machine overhead.
while (await reader.WaitToReadAsync(ct))
{
// Drain all currently buffered items synchronously
while (reader.TryRead(out var item))
{
Process(item);
}
}
TryWrite Fast Path
TryWrite is synchronous and allocation-free when the channel has space. Prefer it over WriteAsync in hot paths where you can handle the false return.
// Hot path -- avoid async overhead when channel has space
if (!writer.TryWrite(item))
{
// Slow path -- wait for space (or handle overflow)
await writer.WriteAsync(item, ct);
}
Bounded Channel Memory Behavior
Bounded channels pre-allocate an internal array of capacity slots. Items are stored by reference (for reference types), so the channel holds references until consumed. For memory-sensitive workloads:
- Choose capacity based on expected item size multiplied by count
- Items are eligible for GC as soon as
TryRead/ReadAsyncreturns them - Drop modes (
DropOldest,DropNewest) keep memory stable but lose data
Cancellation and Graceful Shutdown
Basic Cancellation
Pass a CancellationToken to all async channel operations. When cancelled, operations throw OperationCanceledException.
try
{
await foreach (var item in reader.ReadAllAsync(stoppingToken))
{
await ProcessAsync(item, stoppingToken);
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
// Expected during shutdown
}
Drain Pattern
Complete the writer to signal no more items will arrive, then drain remaining items before stopping. This prevents data loss during shutdown.
public sealed class DrainableProcessor(
Channel<WorkItem> channel,
IServiceScopeFactory scopeFactory,
ILogger<DrainableProcessor> logger) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var reader = channel.Reader;
try
{
while (await reader.WaitToReadAsync(stoppingToken))
{
while (reader.TryRead(out var item))
{
using var scope = scopeFactory.CreateScope();
var handler = scope.ServiceProvider
.GetRequiredService<IWorkItemHandler>();
await handler.HandleAsync(item, stoppingToken);
}
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
// Shutdown requested -- fall through to drain
}
// Signal producers to stop -- any concurrent WriteAsync will throw ChannelClosedException
channel.Writer.TryComplete();
// Drain remaining items with a deadline
logger.LogInformation("Draining remaining work items");
using var drainCts = new CancellationTokenSource(TimeSpan.FromSeconds(25));
while (reader.TryRead(out var remaining))
{
try
{
using var scope = scopeFactory.CreateScope();
var handler = scope.ServiceProvider
.GetRequiredService<IWorkItemHandler>();
await handler.HandleAsync(remaining, drainCts.Token);
}
catch (Exception ex)
{
logger.LogWarning(ex, "Error during drain");
}
}
logger.LogInformation("Drain complete");
}
}
Host Shutdown Timeout
The default host shutdown timeout is 30 seconds. If your drain needs more time, configure it:
builder.Services.Configure<HostOptions>(options =>
{
options.ShutdownTimeout = TimeSpan.FromSeconds(60);
});
Agent Gotchas
- Do not use unbounded channels in production without rate control — they can exhaust memory under sustained producer pressure. Always prefer bounded channels with explicit capacity.
- Do not violate SingleReader/SingleWriter promises — these flags enable lock-free optimizations. Multiple concurrent readers with
SingleReader = truecauses data corruption, not exceptions. - Do not forget to call
Complete()on the writer — without completion, consumers usingReadAllAsync()orWaitToReadAsyncwill wait indefinitely after the last item. - Do not catch
ChannelClosedExceptionglobally — it signals that the writer calledComplete(), possibly with an error. Catch it only aroundReadAsynccalls;WaitToReadAsync/TryReadloops handle completion viafalsereturn. - Do not use
ReadAsyncin hot paths — prefer theWaitToReadAsync+TryReadpattern to drain buffered items synchronously and reduce async state machine allocations. - Do not block in the
itemDroppedcallback — it runs synchronously on the writer’s thread. Keep it fast (increment counter, log) or offload heavy work.