message_queues
npx skills add https://github.com/vuralserhat86/antigravity-agentic-skills --skill message_queues
Agent 安装分布
Skill 文档
Message Queues
Implement asynchronous communication patterns for event-driven architectures, background job processing, and service decoupling.
When to Use This Skill
Use message queues when:
- Long-running operations block HTTP requests (report generation, video processing)
- Service decoupling required (microservices, event-driven architecture)
- Guaranteed delivery needed (payment processing, order fulfillment)
- Event streaming for analytics (log aggregation, metrics pipelines)
- Workflow orchestration for complex processes (multi-step sagas, human-in-the-loop)
- Background job processing (email sending, image resizing)
Broker Selection Decision Tree
Choose message broker based on primary need:
Event Streaming / Log Aggregation
â Apache Kafka
- Throughput: 500K-1M msg/s
- Replay events (event sourcing)
- Exactly-once semantics
- Long-term retention
- Use: Analytics pipelines, CQRS, event sourcing
Simple Background Jobs
â Task Queues
- Python â Celery + Redis
- TypeScript â BullMQ + Redis
- Go â Asynq + Redis
- Use: Email sending, report generation, webhooks
Complex Workflows / Sagas
â Temporal
- Durable execution (survives restarts)
- Saga pattern support
- Human-in-the-loop workflows
- Use: Order processing, AI agent orchestration
Request-Reply / RPC Patterns
â NATS
- Built-in request-reply
- Sub-millisecond latency
- Cloud-native, simple operations
- Use: Microservices RPC, IoT command/control
Complex Message Routing
â RabbitMQ
- Exchanges (direct, topic, fanout, headers)
- Dead letter exchanges
- Message TTL, priorities
- Use: Multi-consumer patterns, pub/sub
Already Using Redis
â Redis Streams
- No new infrastructure
- Simple consumer groups
- Moderate throughput (100K+ msg/s)
- Use: Notification queues, simple job queues
Performance Comparison
| Broker | Throughput | Latency (p99) | Best For |
|---|---|---|---|
| Kafka | 500K-1M msg/s | 10-50ms | Event streaming |
| NATS JetStream | 200K-400K msg/s | Sub-ms to 5ms | Cloud-native microservices |
| RabbitMQ | 50K-100K msg/s | 5-20ms | Task queues, complex routing |
| Redis Streams | 100K+ msg/s | Sub-ms | Simple queues, caching |
Quick Start Examples
Kafka Producer/Consumer (Python)
See examples/kafka-python/ for working code.
from confluent_kafka import Producer, Consumer
# Producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('orders', key='order_123', value='{"status": "created"}')
producer.flush()
# Consumer
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processors',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['orders'])
while True:
msg = consumer.poll(1.0)
if msg is not None:
process_order(msg.value())
Celery Background Jobs (Python)
See examples/celery-image-processing/ for full implementation.
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379')
@app.task(bind=True, max_retries=3)
def process_image(self, image_url: str):
try:
result = expensive_image_processing(image_url)
return result
except RecoverableError as e:
raise self.retry(exc=e, countdown=60)
BullMQ Job Processing (TypeScript)
See examples/bullmq-webhook-processor/ for full implementation.
import { Queue, Worker } from 'bullmq'
const queue = new Queue('webhooks', {
connection: { host: 'localhost', port: 6379 }
})
// Enqueue job
await queue.add('send-webhook', {
url: 'https://example.com/webhook',
payload: { event: 'order.created' }
})
// Process jobs
const worker = new Worker('webhooks', async job => {
await fetch(job.data.url, {
method: 'POST',
body: JSON.stringify(job.data.payload)
})
}, { connection: { host: 'localhost', port: 6379 } })
Temporal Workflow Orchestration
See examples/temporal-order-saga/ for saga pattern implementation.
from temporalio import workflow, activity
from datetime import timedelta
@workflow.defn
class OrderSagaWorkflow:
@workflow.run
async def run(self, order_id: str) -> str:
# Step 1: Reserve inventory
inventory_id = await workflow.execute_activity(
reserve_inventory,
order_id,
start_to_close_timeout=timedelta(seconds=10),
)
# Step 2: Charge payment
payment_id = await workflow.execute_activity(
charge_payment,
order_id,
start_to_close_timeout=timedelta(seconds=30),
)
return f"Order {order_id} completed"
Core Patterns
Event Naming Convention
Use: Domain.Entity.Action.Version
Examples:
order.created.v1user.profile.updated.v2payment.failed.v1
Event Schema Structure
{
"event_type": "order.created.v2",
"event_id": "uuid-here",
"timestamp": "2025-12-02T10:00:00Z",
"version": "2.0",
"data": {
"order_id": "ord_123",
"customer_id": "cus_456"
},
"metadata": {
"producer": "order-service",
"trace_id": "abc123",
"correlation_id": "xyz789"
}
}
Dead Letter Queue Pattern
Route failed messages to dead letter queue (DLQ) after max retries:
@app.task(bind=True, max_retries=3)
def process_order(self, order_id: str):
try:
result = perform_processing(order_id)
return result
except UnrecoverableError as e:
send_to_dlq(order_id, str(e))
raise Reject(e, requeue=False)
Idempotency for Exactly-Once Processing
@app.post("/process")
async def process_payment(
payment_data: dict,
idempotency_key: str = Header(None)
):
# Check if already processed
cached_result = redis_client.get(f"idempotency:{idempotency_key}")
if cached_result:
return {"status": "already_processed"}
result = process_payment_logic(payment_data)
redis_client.setex(f"idempotency:{idempotency_key}", 86400, result)
return {"status": "processed", "result": result}
Frontend Integration
Job Status Updates via SSE
# FastAPI endpoint for real-time job status
@app.get("/status/{task_id}")
async def task_status_stream(task_id: str):
async def event_generator():
while True:
task = celery_app.AsyncResult(task_id)
if task.state == 'PROGRESS':
yield {"event": "progress", "data": task.info.get('progress', 0)}
elif task.state == 'SUCCESS':
yield {"event": "complete", "data": task.result}
break
await asyncio.sleep(0.5)
return EventSourceResponse(event_generator())
React Component
export function JobStatus({ jobId }: { jobId: string }) {
const [progress, setProgress] = useState(0)
useEffect(() => {
const eventSource = new EventSource(`/api/status/${jobId}`)
eventSource.addEventListener('progress', (e) => {
setProgress(JSON.parse(e.data))
})
eventSource.addEventListener('complete', (e) => {
toast({ title: 'Job complete', description: JSON.parse(e.data) })
eventSource.close()
})
return () => eventSource.close()
}, [jobId])
return <ProgressBar value={progress} />
}
Detailed Guides
For comprehensive documentation, see reference files:
Broker-Specific Guides
- Kafka: See
references/kafka.mdfor partitioning, consumer groups, exactly-once semantics - RabbitMQ: See
references/rabbitmq.mdfor exchanges, bindings, routing patterns - NATS: See
references/nats.mdfor JetStream, request-reply patterns - Redis Streams: See
references/redis-streams.mdfor consumer groups, acknowledgments
Task Queue Guides
- Celery: See
references/celery.mdfor periodic tasks, canvas (workflows), monitoring - BullMQ: See
references/bullmq.mdfor job prioritization, flows, Bull Board monitoring - Temporal: See
references/temporal-workflows.mdfor saga patterns, signals, queries
Pattern Guides
- Event Patterns: See
references/event-patterns.mdfor event sourcing, CQRS, outbox pattern
Common Anti-Patterns to Avoid
1. Synchronous API for Long Operations
# â BAD: Blocks request thread
@app.post("/generate-report")
def generate_report(user_id: str):
report = expensive_computation(user_id) # 5 minutes!
return report
# â
GOOD: Enqueue background job
@app.post("/generate-report")
async def generate_report(user_id: str):
task = generate_report_task.delay(user_id)
return {"task_id": task.id}
2. Non-Idempotent Consumers
# â BAD: Processes duplicates
@app.task
def send_email(email: str):
send_email_service(email) # Sends twice if retried!
# â
GOOD: Idempotent with deduplication
@app.task
def send_email(email: str, idempotency_key: str):
if redis.exists(f"sent:{idempotency_key}"):
return "already_sent"
send_email_service(email)
redis.setex(f"sent:{idempotency_key}", 86400, "1")
3. Ignoring Dead Letter Queues
# â BAD: Failed messages lost forever
@app.task(max_retries=3)
def risky_task(data):
process(data) # If all retries fail, data disappears
# â
GOOD: DLQ for manual inspection
@app.task(max_retries=3)
def risky_task(data):
try:
process(data)
except Exception as e:
if self.request.retries >= 3:
send_to_dlq(data, str(e))
raise
4. Using Kafka for Request-Reply
# â BAD: Kafka is not designed for RPC
def get_user_profile(user_id: str):
kafka_producer.send("user_requests", {"user_id": user_id})
# How to correlate response? Kafka is asynchronous!
# â
GOOD: Use NATS request-reply or HTTP/gRPC
response = await nats.request("user.profile", user_id.encode())
Library Recommendations
Context7 Research
Confluent Kafka (Python)
- Context7 ID:
/confluentinc/confluent-kafka-python - Trust Score: 68.8/100
- Code Snippets: 192+
- Production-ready Python Kafka client
Temporal
- Context7 ID:
/websites/temporal_io - Trust Score: 80.9/100
- Code Snippets: 3,769+
- Workflow orchestration for durable execution
Installation
Python:
pip install confluent-kafka celery[redis] temporalio aio-pika redis
TypeScript/Node.js:
npm install kafkajs bullmq @temporalio/client amqplib ioredis
Rust:
cargo add rdkafka lapin async-nats redis
Go:
go get github.com/confluentinc/confluent-kafka-go
go get github.com/hibiken/asynq
go get go.temporal.io/sdk
Utilities
Use scripts for setup automation:
- Kafka setup: Run
python scripts/kafka_producer_consumer.pyfor test utilities - Schema validation: Run
python scripts/validate_message_schema.pyto validate event schemas
Related Skills
- api-patterns: API design for async job submission
- realtime-sync: WebSocket/SSE for job status updates
- feedback: Toast notifications for job completion
- databases-*: Persistent storage for event logs Message Queues v1.1 – Enhanced
ð Workflow
Kaynak: Enterprise Integration Patterns & Confluent Kafka Guide
AÅama 1: Design Phase
- Pattern Selection: Point-to-Point (Queue) mi Pub-Sub (Topic) mi karar ver.
- Schema Registry: Mesaj formatını (Avro/Protobuf) ve versiyonlamayı baÅtan yap.
- Partitioning: Veri daÄılımını (Ordering garantisi için Key seçimi) planla.
AÅama 2: Implementation Checklist
- Idempotency: Consumer tarafında “Exactly-Once” veya “At-Least-Once” stratejisini kur.
- DLQ: İÅlenemeyen mesajlar için Dead Letter Queue ve Alarm kur.
- Backpressure: Consumer yavaÅlarsa Producer’ı yavaÅlatacak mekanizmayı düÅün.
AÅama 3: Operations
- Lag Monitoring: Consumer Lag (üretim hızı vs tüketim hızı) metriÄini izle.
- Retention: Disk doluluÄunu önlemek için retention policy (süre veya boyut) ayarla.
Kontrol Noktaları
| AÅama | DoÄrulama |
|---|---|
| 1 | Mesaj sırasında (ordering) bozulma iÅ mantıÄını bozuyor mu? |
| 2 | Sistem 24 saatlik log kaybına dayanıklı mı (Durability)? |
| 3 | Poison message (formatı bozuk mesaj) sistemi kilitliyor mu? |