azure-servicebus-dotnet
0
总安装量
3
周安装量
#55728
全站排名
安装命令
npx skills add https://github.com/microsoft/agent-skills --skill azure-servicebus-dotnet
Agent 安装分布
opencode
2
claude-code
2
github-copilot
2
mcpjam
1
qwen-code
1
Skill 文档
Azure.Messaging.ServiceBus (.NET)
Enterprise messaging SDK for reliable message delivery with queues, topics, subscriptions, and sessions.
Installation
dotnet add package Azure.Messaging.ServiceBus
dotnet add package Azure.Identity
Current Version: v7.20.1 (stable)
Environment Variables
AZURE_SERVICEBUS_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
# Or connection string (less secure)
AZURE_SERVICEBUS_CONNECTION_STRING=Endpoint=sb://...
Authentication
Microsoft Entra ID (Recommended)
using Azure.Identity;
using Azure.Messaging.ServiceBus;
string fullyQualifiedNamespace = "<namespace>.servicebus.windows.net";
await using ServiceBusClient client = new(fullyQualifiedNamespace, new DefaultAzureCredential());
Connection String
string connectionString = "<connection_string>";
await using ServiceBusClient client = new(connectionString);
ASP.NET Core Dependency Injection
services.AddAzureClients(builder =>
{
builder.AddServiceBusClientWithNamespace("<namespace>.servicebus.windows.net");
builder.UseCredential(new DefaultAzureCredential());
});
Client Hierarchy
ServiceBusClient
âââ CreateSender(queueOrTopicName) â ServiceBusSender
âââ CreateReceiver(queueName) â ServiceBusReceiver
âââ CreateReceiver(topicName, subName) â ServiceBusReceiver
âââ AcceptNextSessionAsync(queueName) â ServiceBusSessionReceiver
âââ CreateProcessor(queueName) â ServiceBusProcessor
âââ CreateSessionProcessor(queueName) â ServiceBusSessionProcessor
ServiceBusAdministrationClient (separate client for CRUD)
Core Workflows
1. Send Messages
await using ServiceBusClient client = new(fullyQualifiedNamespace, new DefaultAzureCredential());
ServiceBusSender sender = client.CreateSender("my-queue");
// Single message
ServiceBusMessage message = new("Hello world!");
await sender.SendMessageAsync(message);
// Safe batching (recommended)
using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync();
if (batch.TryAddMessage(new ServiceBusMessage("Message 1")))
{
// Message added successfully
}
if (batch.TryAddMessage(new ServiceBusMessage("Message 2")))
{
// Message added successfully
}
await sender.SendMessagesAsync(batch);
2. Receive Messages
ServiceBusReceiver receiver = client.CreateReceiver("my-queue");
// Single message
ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync();
string body = message.Body.ToString();
Console.WriteLine(body);
// Complete the message (removes from queue)
await receiver.CompleteMessageAsync(message);
// Batch receive
IReadOnlyList<ServiceBusReceivedMessage> messages = await receiver.ReceiveMessagesAsync(maxMessages: 10);
foreach (var msg in messages)
{
Console.WriteLine(msg.Body.ToString());
await receiver.CompleteMessageAsync(msg);
}
3. Message Settlement
// Complete - removes message from queue
await receiver.CompleteMessageAsync(message);
// Abandon - releases lock, message can be received again
await receiver.AbandonMessageAsync(message);
// Defer - prevents normal receive, use ReceiveDeferredMessageAsync
await receiver.DeferMessageAsync(message);
// Dead Letter - moves to dead letter subqueue
await receiver.DeadLetterMessageAsync(message, "InvalidFormat", "Message body was not valid JSON");
4. Background Processing with Processor
ServiceBusProcessor processor = client.CreateProcessor("my-queue", new ServiceBusProcessorOptions
{
AutoCompleteMessages = false,
MaxConcurrentCalls = 2
});
processor.ProcessMessageAsync += async (args) =>
{
try
{
string body = args.Message.Body.ToString();
Console.WriteLine($"Received: {body}");
await args.CompleteMessageAsync(args.Message);
}
catch (Exception ex)
{
Console.WriteLine($"Error processing: {ex.Message}");
await args.AbandonMessageAsync(args.Message);
}
};
processor.ProcessErrorAsync += (args) =>
{
Console.WriteLine($"Error source: {args.ErrorSource}");
Console.WriteLine($"Entity: {args.EntityPath}");
Console.WriteLine($"Exception: {args.Exception}");
return Task.CompletedTask;
};
await processor.StartProcessingAsync();
// ... application runs
await processor.StopProcessingAsync();
5. Sessions (Ordered Processing)
// Send session message
ServiceBusMessage message = new("Hello")
{
SessionId = "order-123"
};
await sender.SendMessageAsync(message);
// Receive from next available session
ServiceBusSessionReceiver receiver = await client.AcceptNextSessionAsync("my-queue");
// Or receive from specific session
ServiceBusSessionReceiver receiver = await client.AcceptSessionAsync("my-queue", "order-123");
// Session state management
await receiver.SetSessionStateAsync(new BinaryData("processing"));
BinaryData state = await receiver.GetSessionStateAsync();
// Renew session lock
await receiver.RenewSessionLockAsync();
6. Dead Letter Queue
// Receive from dead letter queue
ServiceBusReceiver dlqReceiver = client.CreateReceiver("my-queue", new ServiceBusReceiverOptions
{
SubQueue = SubQueue.DeadLetter
});
ServiceBusReceivedMessage dlqMessage = await dlqReceiver.ReceiveMessageAsync();
// Access dead letter metadata
string reason = dlqMessage.DeadLetterReason;
string description = dlqMessage.DeadLetterErrorDescription;
Console.WriteLine($"Dead letter reason: {reason} - {description}");
7. Topics and Subscriptions
// Send to topic
ServiceBusSender topicSender = client.CreateSender("my-topic");
await topicSender.SendMessageAsync(new ServiceBusMessage("Broadcast message"));
// Receive from subscription
ServiceBusReceiver subReceiver = client.CreateReceiver("my-topic", "my-subscription");
var message = await subReceiver.ReceiveMessageAsync();
8. Administration (CRUD)
var adminClient = new ServiceBusAdministrationClient(
fullyQualifiedNamespace,
new DefaultAzureCredential());
// Create queue
var options = new CreateQueueOptions("my-queue")
{
MaxDeliveryCount = 10,
LockDuration = TimeSpan.FromSeconds(30),
RequiresSession = true,
DeadLetteringOnMessageExpiration = true
};
QueueProperties queue = await adminClient.CreateQueueAsync(options);
// Update queue
queue.LockDuration = TimeSpan.FromSeconds(60);
await adminClient.UpdateQueueAsync(queue);
// Create topic and subscription
await adminClient.CreateTopicAsync(new CreateTopicOptions("my-topic"));
await adminClient.CreateSubscriptionAsync(new CreateSubscriptionOptions("my-topic", "my-subscription"));
// Delete
await adminClient.DeleteQueueAsync("my-queue");
9. Cross-Entity Transactions
var options = new ServiceBusClientOptions { EnableCrossEntityTransactions = true };
await using var client = new ServiceBusClient(connectionString, options);
ServiceBusReceiver receiverA = client.CreateReceiver("queueA");
ServiceBusSender senderB = client.CreateSender("queueB");
ServiceBusReceivedMessage receivedMessage = await receiverA.ReceiveMessageAsync();
using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
await receiverA.CompleteMessageAsync(receivedMessage);
await senderB.SendMessageAsync(new ServiceBusMessage("Forwarded"));
ts.Complete();
}
Key Types Reference
| Type | Purpose |
|---|---|
ServiceBusClient |
Main entry point, manages connection |
ServiceBusSender |
Sends messages to queues/topics |
ServiceBusReceiver |
Receives messages from queues/subscriptions |
ServiceBusSessionReceiver |
Receives session messages |
ServiceBusProcessor |
Background message processing |
ServiceBusSessionProcessor |
Background session processing |
ServiceBusAdministrationClient |
CRUD for queues/topics/subscriptions |
ServiceBusMessage |
Message to send |
ServiceBusReceivedMessage |
Received message with metadata |
ServiceBusMessageBatch |
Batch of messages |
Best Practices
- Use singletons â Clients, senders, receivers, and processors are thread-safe
- Always dispose â Use
await usingor callDisposeAsync() - Dispose order â Close senders/receivers/processors first, then client
- Use DefaultAzureCredential â Prefer over connection strings for production
- Use processors for background work â Handles lock renewal automatically
- Use safe batching â
CreateMessageBatchAsync()andTryAddMessage() - Handle transient errors â Use
ServiceBusException.Reason - Configure transport â Use
AmqpWebSocketsif ports 5671/5672 are blocked - Set appropriate lock duration â Default is 30 seconds
- Use sessions for ordering â FIFO within a session
Error Handling
try
{
await sender.SendMessageAsync(message);
}
catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.ServiceBusy)
{
// Retry with backoff
}
catch (ServiceBusException ex)
{
Console.WriteLine($"Service Bus Error: {ex.Reason} - {ex.Message}");
}
Related SDKs
| SDK | Purpose | Install |
|---|---|---|
Azure.Messaging.ServiceBus |
Service Bus (this SDK) | dotnet add package Azure.Messaging.ServiceBus |
Azure.Messaging.EventHubs |
Event streaming | dotnet add package Azure.Messaging.EventHubs |
Azure.Messaging.EventGrid |
Event routing | dotnet add package Azure.Messaging.EventGrid |