Cloudflare Queues

Install:

```shell
npx skills add https://github.com/null-shot/cloudflare-skills --skill queues
```
Build reliable asynchronous message processing on Cloudflare Workers using Queues for background tasks, batch operations, and retry handling.
When to Use
- Background Tasks – Offload non-critical work from request handlers
- Batch Processing – Accumulate messages and process in batches to reduce upstream API calls
- Retry Handling – Automatic retries with configurable delays for transient failures
- Decoupling – Separate producers from consumers for scalability
- Rate Limiting Upstream – Control the rate of requests to external APIs
- Dead Letter Queues – Capture and inspect failed messages for debugging
Quick Reference
| Task | API |
|---|---|
| Send single message | `env.QUEUE_BINDING.send(payload)` |
| Send batch | `env.QUEUE_BINDING.sendBatch([{ body: msg1 }, { body: msg2 }])` |
| Define consumer | `async queue(batch: MessageBatch, env: Env) { ... }` |
| Access message body | `batch.messages.map(msg => msg.body)` |
| Acknowledge message | `msg.ack()`, or let the batch auto-ack when the handler returns |
| Retry message | `msg.retry()`, or throw from the queue handler to retry the batch |
| Get batch size | `batch.messages.length` |
FIRST: wrangler.jsonc Configuration
Queues require both producer and consumer configuration:
```jsonc
{
  "name": "request-logger-consumer",
  "main": "src/index.ts",
  "compatibility_date": "2025-02-11",
  "queues": {
    "producers": [{
      "queue": "request-queue",
      "binding": "REQUEST_QUEUE"
    }],
    "consumers": [{
      "queue": "request-queue",
      "dead_letter_queue": "request-queue-dlq",
      "retry_delay": 300,
      "max_batch_size": 100,
      "max_batch_timeout": 30,
      "max_retries": 3
    }]
  },
  "vars": {
    "UPSTREAM_API_URL": "https://api.example.com/batch-logs",
    "UPSTREAM_API_KEY": ""
  }
}
```
Consumer Options:
- `dead_letter_queue` – Queue name for failed messages (optional)
- `retry_delay` – Seconds to wait before retry (default: 0)
- `max_batch_size` – Max messages per batch (default: 10, max: 100)
- `max_batch_timeout` – Max seconds to wait for a full batch (default: 5, max: 30)
- `max_retries` – Max retry attempts (default: 3)
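The queue and its dead letter queue must exist before the Worker is deployed. A sketch of the one-time setup with the Wrangler CLI, using the queue names assumed in the config above:

```shell
# Create the main queue and its dead letter queue (one-time setup)
npx wrangler queues create request-queue
npx wrangler queues create request-queue-dlq
```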
Producer and Consumer Pattern
Complete example showing how to produce and consume messages:
```typescript
// src/index.ts
interface Env {
  REQUEST_QUEUE: Queue;
  UPSTREAM_API_URL: string;
  UPSTREAM_API_KEY: string;
}

export default {
  // Producer: send messages to the queue
  async fetch(request: Request, env: Env) {
    const info = {
      timestamp: new Date().toISOString(),
      method: request.method,
      url: request.url,
      headers: Object.fromEntries(request.headers),
    };
    await env.REQUEST_QUEUE.send(info);
    return Response.json({
      message: 'Request logged',
      requestId: crypto.randomUUID(),
    });
  },

  // Consumer: process messages in batches
  async queue(batch: MessageBatch<any>, env: Env) {
    const requests = batch.messages.map(msg => msg.body);
    const response = await fetch(env.UPSTREAM_API_URL, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${env.UPSTREAM_API_KEY}`,
      },
      body: JSON.stringify({
        timestamp: new Date().toISOString(),
        batchSize: requests.length,
        requests,
      }),
    });
    if (!response.ok) {
      // Throwing retries the entire batch
      throw new Error(`Upstream API error: ${response.status}`);
    }
  },
};
```
Batch Message Types
Send messages with different formats:
```typescript
// Send a simple JSON payload
await env.QUEUE.send({ userId: 123, action: "login" });

// Send a batch of messages (sendBatch takes { body } wrappers)
await env.QUEUE.sendBatch([
  { body: { userId: 123, action: "login" } },
  { body: { userId: 456, action: "logout" } },
  { body: { userId: 789, action: "purchase" } },
]);

// Send with a typed body (declare the binding as Queue<UserEvent> in Env)
interface UserEvent {
  userId: number;
  action: string;
  timestamp: string;
}

const event: UserEvent = {
  userId: 123,
  action: "login",
  timestamp: new Date().toISOString(),
};
await env.QUEUE.send(event);
```
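When a payload list can exceed the per-batch message cap, it has to be split before calling `sendBatch`. A minimal sketch of a chunking helper; `chunkForSendBatch` and `MAX_PER_BATCH` are our own names, not part of the Queues API:

```typescript
// Split an arbitrary list of payloads into sendBatch-sized chunks.
// The 100-message cap matches Queues' per-batch limit.
const MAX_PER_BATCH = 100;

function chunkForSendBatch<T>(bodies: T[]): { body: T }[][] {
  const chunks: { body: T }[][] = [];
  for (let i = 0; i < bodies.length; i += MAX_PER_BATCH) {
    // sendBatch expects message objects, i.e. { body } wrappers
    chunks.push(bodies.slice(i, i + MAX_PER_BATCH).map(body => ({ body })));
  }
  return chunks;
}

// Usage inside a Worker (QUEUE is a queue binding):
// for (const chunk of chunkForSendBatch(events)) await env.QUEUE.sendBatch(chunk);
```

Note this only handles the message-count limit; very large payloads can still hit the per-call byte limit and may need the reference pattern described under Message Size Limits below.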
Retry and Dead Letter Queues
Configure automatic retries and capture failed messages:
```jsonc
{
  "queues": {
    "consumers": [{
      "queue": "main-queue",
      "dead_letter_queue": "main-queue-dlq",
      "retry_delay": 300, // 5 minutes
      "max_retries": 3
    }]
  }
}
```
Retry Behavior:
- Handler throws an error → the message is retried after `retry_delay` seconds
- After `max_retries` attempts → the message moves to the dead letter queue
- No DLQ configured → the message is discarded after max retries
- Handler succeeds → the message is acknowledged and removed
Processing Dead Letter Queue:
```typescript
// Main consumer
export default {
  async queue(batch: MessageBatch<any>, env: Env) {
    for (const message of batch.messages) {
      try {
        await processMessage(message.body);
      } catch (error) {
        console.error('Processing failed:', error);
        throw error; // Trigger retry
      }
    }
  },
};
```

```typescript
// Separate Worker consuming the dead letter queue
export default {
  async queue(batch: MessageBatch<any>, env: Env) {
    // Log failed messages for debugging
    for (const message of batch.messages) {
      console.error('Dead letter message:', {
        body: message.body,
        attempts: message.attempts,
        timestamp: message.timestamp,
      });
      // Optionally store in KV/D1 for inspection
      await env.FAILED_MESSAGES.put(
        message.id,
        JSON.stringify(message),
        { expirationTtl: 86400 * 7 } // 7 days
      );
    }
  },
};
```
Batch Processing Patterns
Pattern 1: All-or-Nothing Batch
Process the entire batch as a transaction: if any message fails, retry all of them:

```typescript
async queue(batch: MessageBatch<any>, env: Env) {
  // Throwing retries the entire batch
  const response = await fetch(env.UPSTREAM_API_URL, {
    method: 'POST',
    body: JSON.stringify(batch.messages.map(m => m.body)),
  });
  if (!response.ok) {
    throw new Error(`Batch failed: ${response.status}`);
  }
}
```
Pattern 2: Individual Message Handling
Process messages individually with partial success:
```typescript
async queue(batch: MessageBatch<any>, env: Env) {
  const results = await Promise.allSettled(
    batch.messages.map(msg => processMessage(msg.body))
  );
  const failures = results.filter(r => r.status === 'rejected');
  if (failures.length > 0) {
    console.error(`${failures.length}/${batch.messages.length} messages failed`);
    // Throwing here would retry the entire batch, successes included;
    // consider per-message retry() or a separate retry queue instead
  }
}
```
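The runtime also supports explicit per-message acknowledgement: calling `msg.ack()` on successes and `msg.retry()` on failures redelivers only the failed messages instead of the whole batch. A sketch of that shape; the `Msg` interface below is a minimal stand-in for the runtime's message type so the example is self-contained outside Workers:

```typescript
// Minimal stand-in for the runtime's queue message type (assumption:
// the real type also exposes ack() and retry({ delaySeconds })).
interface Msg<T> {
  body: T;
  ack(): void;
  retry(opts?: { delaySeconds?: number }): void;
}

async function handleIndividually<T>(
  messages: Msg<T>[],
  process: (body: T) => Promise<void>
): Promise<void> {
  for (const msg of messages) {
    try {
      await process(msg.body);
      msg.ack(); // success: remove only this message from the queue
    } catch {
      msg.retry({ delaySeconds: 60 }); // failure: redeliver only this message
    }
  }
  // Don't throw: explicitly acknowledged messages stay acknowledged
}
```

In a real consumer you would pass `batch.messages` directly and skip the `Msg` stand-in.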
Pattern 3: Partial Retry with Requeue
Requeue only failed messages:
```typescript
async queue(batch: MessageBatch<any>, env: Env) {
  const failedBodies: any[] = [];
  for (const message of batch.messages) {
    try {
      await processMessage(message.body);
    } catch (error) {
      failedBodies.push(message.body);
    }
  }
  // Requeue only the failures (sendBatch takes { body } wrappers)
  if (failedBodies.length > 0) {
    await env.RETRY_QUEUE.sendBatch(failedBodies.map(body => ({ body })));
  }
  // Don't throw: successfully processed messages won't be retried
}
```
Message Size Limits
- Max message size: 128 KB per message
- Max batch size: 100 messages per batch (configurable)
- Max total size per `sendBatch` call: 256 KB
```typescript
// Handle large payloads
async function sendLargePayload(data: any, env: Env) {
  const serialized = JSON.stringify(data);
  // Measure encoded bytes, not string length: the 128 KB limit
  // applies to the serialized size, and non-ASCII characters take
  // more than one byte each
  const bytes = new TextEncoder().encode(serialized).length;
  if (bytes > 100_000) { // ~100 KB safety margin
    // Option 1: store in R2/KV, send a reference
    const key = crypto.randomUUID();
    await env.LARGE_PAYLOADS.put(key, serialized);
    await env.QUEUE.send({ type: 'large', key });
  } else {
    await env.QUEUE.send(data);
  }
}
```
Environment Interface
Type your queue bindings:
```typescript
interface Env {
  // Producer bindings
  REQUEST_QUEUE: Queue<RequestInfo>;
  EMAIL_QUEUE: Queue<EmailPayload>;
  // Environment variables
  UPSTREAM_API_URL: string;
  UPSTREAM_API_KEY: string;
  // Other bindings
  KV: KVNamespace;
  DB: D1Database;
}

interface RequestInfo {
  timestamp: string;
  method: string;
  url: string;
  headers: Record<string, string>;
}

interface EmailPayload {
  to: string;
  subject: string;
  body: string;
}
```
Detailed References
- references/patterns.md – Advanced patterns: fan-out, priority queues, rate limiting
- references/error-handling.md – Retry strategies, DLQ management, monitoring
- references/limits.md – Message size, batch limits, retention, CPU constraints
- references/testing.md – Vitest integration, createMessageBatch, getQueueResult, testing handlers
Best Practices
- Use batch processing: Reduce upstream API calls by processing messages in batches
- Configure retry_delay: Set appropriate delays to avoid overwhelming failing services
- Always configure DLQ: Capture failed messages for debugging and replay
- Type your messages: Use generics for type-safe message bodies
- Monitor batch timeouts: Adjust `max_batch_timeout` based on processing time
- Handle partial failures: Don't throw on a single message failure if others succeeded
- Size payloads appropriately: Keep messages under 100KB; use R2/KV for large data
- Use separate queues for priorities: Different queues for high/low priority messages
- Log DLQ messages: Always log or store DLQ messages for later analysis
- Don't block responses on send() in hot paths: Queue sends are fast, but when the response shouldn't wait, pass the promise to `ctx.waitUntil()` instead of awaiting it
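For that last point, a sketch of the fire-and-forget shape with `ctx.waitUntil()`; the `QueueLike` and `Ctx` interfaces are stand-ins for the runtime's binding and `ExecutionContext` types so the example runs anywhere:

```typescript
// Stand-ins for the Workers runtime types (assumption: the real
// ExecutionContext exposes waitUntil with this shape).
interface QueueLike<T> { send(body: T): Promise<void>; }
interface Ctx { waitUntil(p: Promise<unknown>): void; }

function logInBackground(ctx: Ctx, queue: QueueLike<string>, entry: string) {
  // Don't block the response on the enqueue; waitUntil keeps the
  // Worker alive until the send promise settles
  ctx.waitUntil(queue.send(entry));
}
```

A bare un-awaited `send()` can be cancelled when the request event ends; `waitUntil` is what keeps the pending send alive.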