gcp-pubsub

📁 dawiddutoit/custom-claude 📅 Jan 26, 2026
Install command
npx skills add https://github.com/dawiddutoit/custom-claude --skill gcp-pubsub

Skill Documentation

Google Cloud Pub/Sub

Purpose

Build robust, production-ready event-driven systems using Google Cloud Pub/Sub with Python. Covers setup, publishing, subscribing, error handling, dead letter queues, and local development with the emulator.

When to Use

Use this skill when you need to:

  • Build event-driven architectures with message-based communication
  • Implement reliable message queuing between services
  • Handle at-least-once message delivery guarantees
  • Manage high-throughput message systems (1000+ msgs/sec)
  • Configure local development with Pub/Sub emulator
  • Implement dead letter queues for failed message handling

Quick Start

Install and authenticate:

pip install google-cloud-pubsub
gcloud auth application-default login
python -c "from google.cloud import pubsub_v1; print('Ready')"

Publish a message:

from google.api_core.exceptions import AlreadyExists
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

# Create the topic; ignore the error if it already exists
try:
    publisher.create_topic(request={"name": topic_path})
except AlreadyExists:
    pass

# Publish; result() blocks until the server returns the message ID
future = publisher.publish(topic_path, b"Hello, World!")
print(f"Published: {future.result()}")

Subscribe to messages:

from concurrent.futures import TimeoutError

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def callback(message):
    print(f"Received: {message.data.decode()}")
    message.ack()

future = subscriber.subscribe(subscription_path, callback=callback)

# Listen for 30 seconds, then shut down cleanly
with subscriber:
    try:
        future.result(timeout=30)
    except TimeoutError:
        future.cancel()  # Stop pulling messages
        future.result()  # Block until shutdown completes

Instructions

Step 1: Set Up Development Environment

Install dependencies and configure authentication:

pip install google-cloud-pubsub
gcloud auth application-default login

For production, use a service account:

export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account-key.json"

Step 2: Create Topics and Subscriptions

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

# Create topic
topic_path = publisher.topic_path("my-project", "my-topic")
publisher.create_topic(request={"name": topic_path})

# Create subscription
subscription_path = subscriber.subscription_path("my-project", "my-subscription")
subscription_config = {
    "name": subscription_path,
    "topic": topic_path,
    "ack_deadline_seconds": 60,
}
subscriber.create_subscription(request=subscription_config)

See references/detailed-guide.md for advanced configuration options.
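Re-running the creation code above fails once the topic or subscription exists, because the create calls raise an already-exists error. One way to make setup repeatable is a small wrapper like the sketch below. `create_if_missing` is a hypothetical helper, not part of the client library, and `FakeAlreadyExists` and `fake_create_topic` are stand-ins so the example runs without credentials. With the real client you would pass `google.api_core.exceptions.AlreadyExists` instead.

```python
from typing import Callable, Tuple, Type


def create_if_missing(
    create: Callable[[], object],
    already_exists: Tuple[Type[BaseException], ...],
) -> bool:
    """Call create(); return True if it created the resource, False if it existed."""
    try:
        create()
    except already_exists:
        return False
    return True


class FakeAlreadyExists(Exception):
    """Stand-in for google.api_core.exceptions.AlreadyExists (demo only)."""


_existing = set()


def fake_create_topic(name: str) -> None:
    """Stand-in for publisher.create_topic (demo only)."""
    if name in _existing:
        raise FakeAlreadyExists(name)
    _existing.add(name)


topic = "projects/my-project/topics/my-topic"
first = create_if_missing(lambda: fake_create_topic(topic), (FakeAlreadyExists,))
second = create_if_missing(lambda: fake_create_topic(topic), (FakeAlreadyExists,))
print(first, second)

# With the real client (assumes publisher and topic_path from the step above):
#   from google.api_core.exceptions import AlreadyExists
#   create_if_missing(
#       lambda: publisher.create_topic(request={"name": topic_path}),
#       (AlreadyExists,),
#   )
```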

Step 3: Publish Messages

Simple publishing:

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

future = publisher.publish(topic_path, b"Message data")
message_id = future.result()

Publish with attributes:

import json

data = json.dumps({"event": "user.created", "user_id": "123"}).encode()
future = publisher.publish(
    topic_path,
    data,
    event_type="user.created",
    timestamp="2024-01-15T10:30:00Z"
)

See references/detailed-guide.md for production-ready publisher with batching and error handling.
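Message data must be bytes and attribute values must be strings, so structured events usually need a small conversion step before publishing. A minimal sketch of that step, assuming a hypothetical helper named `build_message` (it is not part of the client library):

```python
import json
from typing import Any, Dict, Tuple


def build_message(payload: Dict[str, Any], **attributes: Any) -> Tuple[bytes, Dict[str, str]]:
    """Return (data, attributes) in the shape publisher.publish() expects.

    Hypothetical helper: serializes the payload to JSON bytes and converts
    every attribute value to a string.
    """
    data = json.dumps(payload, sort_keys=True).encode("utf-8")
    attrs = {key: str(value) for key, value in attributes.items()}
    return data, attrs


data, attrs = build_message(
    {"event": "user.created", "user_id": "123"},
    event_type="user.created",
    schema_version=2,  # Non-string value, converted to "2"
)
print(data, attrs)

# Usage with the publisher and topic_path from the step above:
#   future = publisher.publish(topic_path, data, **attrs)
```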

Step 4: Subscribe to Messages

Basic subscriber:

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def callback(message):
    try:
        print(f"Received: {message.data.decode()}")
        # Process message
        message.ack()
    except Exception as e:
        print(f"Error: {e}")
        message.nack()  # Will be redelivered

future = subscriber.subscribe(subscription_path, callback=callback)

try:
    future.result()  # Block indefinitely
except KeyboardInterrupt:
    future.cancel()  # Stop pulling messages
    future.result()  # Wait for the shutdown to complete

With flow control:

future = subscriber.subscribe(
    subscription_path,
    callback=callback,
    flow_control=pubsub_v1.types.FlowControl(
        max_messages=100,
        max_bytes=100 * 1024 * 1024,  # 100 MB
    ),
)

See references/detailed-guide.md for production subscriber with monitoring.
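The ack-on-success, nack-on-failure pattern in the basic subscriber can be separated from the business logic so that handlers are testable without a live subscription. The sketch below assumes a hypothetical `make_callback` wrapper and a `FakeMessage` stand-in that mimics only the `data`, `message_id`, `ack()` and `nack()` members used here.

```python
import logging
from typing import Callable

logger = logging.getLogger(__name__)


def make_callback(handler: Callable[[bytes], None]) -> Callable:
    """Wrap a plain handler so the message is acked on success, nacked on failure."""

    def callback(message) -> None:
        try:
            handler(message.data)
        except Exception:
            logger.exception("Handler failed for message %s", message.message_id)
            message.nack()  # Ask Pub/Sub to redeliver
        else:
            message.ack()  # Acknowledge only after the handler succeeded

    return callback


class FakeMessage:
    """Stand-in for a received message, for local testing only."""

    def __init__(self, data: bytes, message_id: str = "1"):
        self.data = data
        self.message_id = message_id
        self.outcome = None

    def ack(self) -> None:
        self.outcome = "ack"

    def nack(self) -> None:
        self.outcome = "nack"


def handle(data: bytes) -> None:
    """Example handler (hypothetical): rejects empty payloads."""
    if not data:
        raise ValueError("empty payload")


callback = make_callback(handle)
ok, bad = FakeMessage(b"hello"), FakeMessage(b"", message_id="2")
callback(ok)
callback(bad)
print(ok.outcome, bad.outcome)
```

The returned `callback` has the same one-argument shape as the callbacks above, so it can be passed to `subscriber.subscribe(subscription_path, callback=callback)`.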

Step 5: Configure Dead Letter Queue

from google.cloud import pubsub_v1
from google.protobuf.duration_pb2 import Duration

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

# Create dead letter topic
dlq_topic_path = publisher.topic_path("my-project", "my-topic-dlq")
publisher.create_topic(request={"name": dlq_topic_path})

# Create subscription with DLQ
subscription_path = subscriber.subscription_path("my-project", "my-subscription")
subscription = pubsub_v1.types.Subscription(
    name=subscription_path,
    topic=publisher.topic_path("my-project", "my-topic"),
    dead_letter_policy=pubsub_v1.types.DeadLetterPolicy(
        dead_letter_topic=dlq_topic_path,
        max_delivery_attempts=5,
    ),
    retry_policy=pubsub_v1.types.RetryPolicy(
        minimum_backoff=Duration(seconds=10),
        maximum_backoff=Duration(seconds=600),
    ),
)
subscriber.create_subscription(request=subscription)

See references/detailed-guide.md for complete DLQ setup with monitoring.
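When a dead letter policy is attached, each received message exposes a `delivery_attempt` counter, which is `None` when no policy is set. A callback can use it to log or alert before a message is dead-lettered. The sketch below assumes a hypothetical helper `attempts_remaining` and uses `SimpleNamespace` as a stand-in for a received message. The counter is best treated as a hint rather than an exact figure.

```python
from types import SimpleNamespace
from typing import Optional

MAX_DELIVERY_ATTEMPTS = 5  # Assumed to match max_delivery_attempts on the subscription


def attempts_remaining(
    delivery_attempt: Optional[int],
    max_attempts: int = MAX_DELIVERY_ATTEMPTS,
) -> Optional[int]:
    """Return how many deliveries are left before dead-lettering.

    Returns None when delivery_attempt is None (no dead letter policy).
    """
    if delivery_attempt is None:
        return None
    return max(max_attempts - delivery_attempt, 0)


# SimpleNamespace stands in for a received message in this local demo
for attempt in (None, 1, 5):
    message = SimpleNamespace(delivery_attempt=attempt)
    print(attempt, attempts_remaining(message.delivery_attempt))
```

Inside a real callback, a result of `0` would be a natural point to log the payload at warning level before calling `message.nack()`.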

Step 6: Implement Idempotency

Track processed messages to avoid duplicate processing:

class IdempotentProcessor:
    def __init__(self):
        self.processed_ids = set()

    def process(self, message):
        msg_id = message.message_id

        if msg_id in self.processed_ids:
            print(f"Already processed: {msg_id}")
            message.ack()
            return

        try:
            # Process message
            print(f"Processing: {message.data.decode()}")
            self.processed_ids.add(msg_id)
            message.ack()
        except Exception as e:
            print(f"Failed: {e}")
            message.nack()

See references/detailed-guide.md for production-ready idempotency patterns.
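The in-memory set above grows without bound, and its check-then-add sequence can race when callbacks run concurrently on the subscriber's worker threads. A minimal sketch of a bounded, lock-protected alternative follows. `BoundedDedupStore` is a hypothetical class, and like the original it only deduplicates within a single process. Sharing state across processes would need an external store.

```python
import threading
from collections import OrderedDict


class BoundedDedupStore:
    """Thread-safe record of recently seen IDs with a size cap (oldest evicted first)."""

    def __init__(self, max_size: int = 10_000):
        self._max_size = max_size
        self._seen = OrderedDict()
        self._lock = threading.Lock()

    def mark_if_new(self, msg_id: str) -> bool:
        """Record msg_id and return True if unseen; return False for a duplicate."""
        with self._lock:
            if msg_id in self._seen:
                self._seen.move_to_end(msg_id)  # Refresh recency
                return False
            self._seen[msg_id] = None
            if len(self._seen) > self._max_size:
                self._seen.popitem(last=False)  # Evict the oldest ID
            return True

    def forget(self, msg_id: str) -> None:
        """Remove msg_id so a redelivery is processed again (call after a failure)."""
        with self._lock:
            self._seen.pop(msg_id, None)


store = BoundedDedupStore(max_size=2)
results = [store.mark_if_new(i) for i in ["a", "b", "a", "c", "b"]]
print(results)
```

In a callback, call `mark_if_new(message.message_id)` first: ack and return on `False`, process on `True`, and call `forget` before `nack()` if processing fails so the redelivery is not skipped.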

Step 7: Local Development with Emulator

# Install and start emulator
gcloud components install pubsub-emulator
gcloud beta emulators pubsub start

# In another terminal
export PUBSUB_EMULATOR_HOST=localhost:8085
python your_script.py  # Uses emulator automatically

See references/detailed-guide.md for emulator configuration patterns.
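Because the client switches to the emulator purely on the presence of `PUBSUB_EMULATOR_HOST`, a script started in a shell where the variable is missing will talk to the real project instead. A small guard at the top of test scripts can prevent that. The sketch below uses hypothetical helper names (`emulator_host`, `require_emulator`) and accepts the environment as a parameter so it can be tested without changing the process environment.

```python
import os
from typing import Mapping, Optional


def emulator_host(environ: Mapping[str, str] = os.environ) -> Optional[str]:
    """Return the emulator address from PUBSUB_EMULATOR_HOST, or None if unset."""
    host = environ.get("PUBSUB_EMULATOR_HOST", "").strip()
    return host or None


def require_emulator(environ: Mapping[str, str] = os.environ) -> str:
    """Return the emulator address, or raise if the variable is not set."""
    host = emulator_host(environ)
    if host is None:
        raise RuntimeError(
            "PUBSUB_EMULATOR_HOST is not set; refusing to run against a real project"
        )
    return host


print(require_emulator({"PUBSUB_EMULATOR_HOST": "localhost:8085"}))
```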

Step 8: Monitor Operations

Enable logging and inspect the subscription's configuration:

import logging

from google.cloud import pubsub_v1

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Read the subscription's configuration (get_subscription returns settings,
# not runtime metrics such as backlog size)
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")
subscription = subscriber.get_subscription(request={"subscription": subscription_path})

print(f"Topic: {subscription.topic}")
print(f"Ack deadline: {subscription.ack_deadline_seconds}s")

See references/detailed-guide.md for comprehensive monitoring patterns.
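For application-level metrics, a subscriber can count its own outcomes and processing time. The sketch below is one possible shape, assuming a hypothetical `CallbackMetrics` class that a callback would update after each `ack()` or `nack()`. It uses a lock because callbacks may run on several threads.

```python
import threading
from collections import Counter
from typing import Dict


class CallbackMetrics:
    """Thread-safe counters for message outcomes and processing time."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._counts = Counter()
        self._total_seconds = 0.0

    def record(self, outcome: str, seconds: float) -> None:
        """Record one processed message; outcome is e.g. "acked" or "nacked"."""
        with self._lock:
            self._counts[outcome] += 1
            self._total_seconds += seconds

    def summary(self) -> Dict[str, float]:
        """Return totals and the average processing time per message."""
        with self._lock:
            total = sum(self._counts.values())
            average = self._total_seconds / total if total else 0.0
            return {
                "acked": self._counts["acked"],
                "nacked": self._counts["nacked"],
                "avg_seconds": average,
            }


metrics = CallbackMetrics()
metrics.record("acked", 0.25)
metrics.record("acked", 0.25)
metrics.record("nacked", 1.0)
print(metrics.summary())
```

In a callback, wrap the processing in `time.monotonic()` readings and pass the difference to `record`. Logging `summary()` periodically gives a rough view of throughput and failure rate.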

Requirements

  • Python: 3.7+
  • Dependencies:
    pip install "google-cloud-pubsub>=2.18.0"
    
  • GCP Project: Active project with Pub/Sub API enabled
  • Authentication: Application Default Credentials or service account key
  • IAM Permissions:
    • roles/pubsub.publisher – Publish messages
    • roles/pubsub.subscriber – Subscribe to messages
    • roles/pubsub.admin – Create/delete topics and subscriptions
  • For Local Development:
    gcloud components install pubsub-emulator
    

See Also