Event-Driven Architecture on GCP: Mastering Cloud Pub/Sub for Real-Time Systems

Executive Summary: Google Cloud Pub/Sub provides the foundation for event-driven architectures at any scale, offering globally distributed messaging with exactly-once delivery semantics and sub-second latency. This comprehensive guide explores Pub/Sub’s enterprise capabilities, from dead letter queues and message ordering to BigQuery subscriptions and schema enforcement. After building event-driven systems across multiple cloud platforms, I’ve found Pub/Sub delivers the most operationally simple yet powerful messaging service available. Organizations should leverage Pub/Sub for decoupling microservices, streaming data pipelines, and real-time event processing while implementing proper message schema governance and monitoring from the start.

Pub/Sub Architecture: Topics, Subscriptions, and Delivery Guarantees

Pub/Sub implements a publish-subscribe pattern where publishers send messages to topics and subscribers receive messages through subscriptions. This decoupling enables independent scaling of producers and consumers while providing durable message storage. Messages persist for up to 31 days (configurable), ensuring subscribers can recover from extended outages without data loss.

The service offers two subscription types optimized for different consumption patterns. Pull subscriptions give subscribers control over message retrieval rate, ideal for batch processing and workloads with variable processing capacity. Push subscriptions deliver messages to HTTP endpoints automatically, simplifying integration with Cloud Run, Cloud Functions, and external services. Push subscriptions handle retry logic and backoff automatically, reducing operational complexity.

Delivery guarantees vary by configuration. By default, Pub/Sub provides at-least-once delivery—messages may be delivered multiple times during retries or redeliveries. Exactly-once delivery, available with specific configurations, eliminates duplicates but requires careful subscriber implementation. Message ordering guarantees in-order delivery for messages with the same ordering key, essential for event sourcing and state machine patterns where sequence matters.

Advanced Features for Enterprise Workloads

Dead letter topics capture messages that fail processing after configurable retry attempts. This prevents poison messages from blocking subscription progress while preserving failed messages for analysis and reprocessing. Configure dead letter policies with appropriate max delivery attempts—I recommend 5-10 attempts with exponential backoff before routing to dead letter topics.

Schema enforcement validates message structure at publish time, preventing malformed messages from entering the system. Pub/Sub supports Avro, Protocol Buffers, and JSON schemas with schema registry integration. Schema evolution with compatibility checking ensures backward and forward compatibility as your message formats evolve. This is critical for large organizations where multiple teams publish to shared topics.

BigQuery subscriptions write messages directly to BigQuery tables without intermediate processing, ideal for analytics pipelines and audit logging. Messages are batched and written efficiently, with automatic schema mapping for JSON payloads. This eliminates the need for custom Dataflow pipelines for simple message archival use cases, reducing both complexity and cost.

Production Terraform Configuration

Here’s a comprehensive Terraform configuration for Pub/Sub topics and subscriptions with enterprise features enabled:

# Pub/Sub Enterprise Configuration
terraform {
  required_version = ">= 1.5.0"
  required_providers {
    google = { source = "hashicorp/google", version = "~> 5.0" }
  }
}

variable "project_id" { type = string }

# Schema for message validation
resource "google_pubsub_schema" "events" {
  name       = "events-schema"
  type       = "AVRO"
  definition = jsonencode({
    type = "record"
    name = "Event"
    fields = [
      { name = "event_id", type = "string" },
      { name = "event_type", type = "string" },
      { name = "timestamp", type = "long" },
      { name = "payload", type = "string" }
    ]
  })
}

# Main events topic with schema enforcement
resource "google_pubsub_topic" "events" {
  name = "events"
  
  schema_settings {
    schema   = google_pubsub_schema.events.id
    encoding = "JSON"
  }

  message_retention_duration = "604800s"  # 7 days

  labels = {
    environment = "production"
    team        = "platform"
  }
}

# Dead letter topic for failed messages
resource "google_pubsub_topic" "events_dlq" {
  name = "events-dlq"
  message_retention_duration = "2592000s"  # 30 days
}

# Pull subscription with dead letter policy
resource "google_pubsub_subscription" "events_processor" {
  name  = "events-processor"
  topic = google_pubsub_topic.events.name

  ack_deadline_seconds       = 60
  message_retention_duration = "604800s"
  retain_acked_messages      = false
  
  enable_exactly_once_delivery = true

  expiration_policy {
    ttl = ""  # Never expire
  }

  retry_policy {
    minimum_backoff = "10s"
    maximum_backoff = "600s"
  }

  dead_letter_policy {
    dead_letter_topic     = google_pubsub_topic.events_dlq.id
    max_delivery_attempts = 10
  }

  labels = {
    consumer = "event-processor"
  }
}

# Push subscription to Cloud Run
resource "google_pubsub_subscription" "events_webhook" {
  name  = "events-webhook"
  topic = google_pubsub_topic.events.name

  push_config {
    push_endpoint = "https://event-handler-xxx.run.app/events"
    
    oidc_token {
      service_account_email = google_service_account.pubsub_invoker.email
    }

    attributes = {
      x-goog-version = "v1"
    }
  }

  ack_deadline_seconds = 30
  
  retry_policy {
    minimum_backoff = "10s"
    maximum_backoff = "300s"
  }
}

# BigQuery subscription for analytics
resource "google_pubsub_subscription" "events_bigquery" {
  name  = "events-bigquery"
  topic = google_pubsub_topic.events.name

  bigquery_config {
    table            = "${var.project_id}.analytics.events_raw"
    use_topic_schema = true
    write_metadata   = true
  }
}

# Service account for push authentication
resource "google_service_account" "pubsub_invoker" {
  account_id   = "pubsub-invoker"
  display_name = "Pub/Sub Push Invoker"
}

# IAM for dead letter publishing
resource "google_pubsub_topic_iam_member" "dlq_publisher" {
  topic  = google_pubsub_topic.events_dlq.name
  role   = "roles/pubsub.publisher"
  member = "serviceAccount:service-${data.google_project.current.number}@gcp-sa-pubsub.iam.gserviceaccount.com"
}

data "google_project" "current" {}

Python SDK for Pub/Sub Operations

This Python implementation demonstrates enterprise patterns for Pub/Sub publishing and subscribing with proper error handling and batching:

"""Pub/Sub Manager - Enterprise Python Implementation"""
from dataclasses import dataclass
from typing import Callable, Dict, Any, List
from concurrent import futures
from google.cloud import pubsub_v1
from google.api_core import retry
import json
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class PublishResult:
    message_id: str
    success: bool
    error: str = None

class PubSubPublisher:
    """High-throughput publisher with batching and error handling."""
    
    def __init__(self, project_id: str):
        self.project_id = project_id
        
        # Configure batching for high throughput
        batch_settings = pubsub_v1.types.BatchSettings(
            max_messages=100,
            max_bytes=1024 * 1024,  # 1MB
            max_latency=0.1  # 100ms
        )
        
        self.publisher = pubsub_v1.PublisherClient(
            batch_settings=batch_settings
        )
        self.futures: Dict[str, futures.Future] = {}
    
    def publish(self, topic: str, data: Dict[str, Any], 
                ordering_key: str = None, 
                attributes: Dict[str, str] = None) -> futures.Future:
        """Publish message with optional ordering and attributes."""
        topic_path = self.publisher.topic_path(self.project_id, topic)
        
        message_data = json.dumps(data).encode('utf-8')
        
        kwargs = {'data': message_data}
        if ordering_key:
            kwargs['ordering_key'] = ordering_key
        if attributes:
            kwargs.update(attributes)
        
        future = self.publisher.publish(topic_path, **kwargs)
        
        # Add callback for logging
        future.add_done_callback(
            lambda f: self._on_publish_complete(f, data.get('event_id'))
        )
        
        return future
    
    def _on_publish_complete(self, future: futures.Future, event_id: str):
        try:
            message_id = future.result()
            logger.debug(f"Published {event_id} as {message_id}")
        except Exception as e:
            logger.error(f"Failed to publish {event_id}: {e}")
    
    def publish_batch(self, topic: str, messages: List[Dict]) -> List[PublishResult]:
        """Publish multiple messages and wait for all to complete."""
        publish_futures = []
        
        for msg in messages:
            future = self.publish(topic, msg)
            publish_futures.append((msg.get('event_id'), future))
        
        results = []
        for event_id, future in publish_futures:
            try:
                message_id = future.result(timeout=30)
                results.append(PublishResult(message_id, True))
            except Exception as e:
                results.append(PublishResult(None, False, str(e)))
        
        return results

class PubSubSubscriber:
    """Subscriber with exactly-once processing and graceful shutdown."""
    
    def __init__(self, project_id: str):
        self.project_id = project_id
        self.subscriber = pubsub_v1.SubscriberClient()
        self.streaming_pull_future = None
    
    def subscribe(self, subscription: str, 
                  callback: Callable[[Dict], bool],
                  max_messages: int = 100) -> None:
        """Subscribe with automatic ack/nack based on callback result."""
        subscription_path = self.subscriber.subscription_path(
            self.project_id, subscription
        )
        
        flow_control = pubsub_v1.types.FlowControl(
            max_messages=max_messages,
            max_bytes=10 * 1024 * 1024  # 10MB
        )
        
        def message_handler(message: pubsub_v1.subscriber.message.Message):
            try:
                data = json.loads(message.data.decode('utf-8'))
                
                # Process message
                success = callback(data)
                
                if success:
                    message.ack()
                    logger.debug(f"Processed message {message.message_id}")
                else:
                    message.nack()
                    logger.warning(f"Nacked message {message.message_id}")
                    
            except Exception as e:
                logger.error(f"Error processing message: {e}")
                message.nack()
        
        self.streaming_pull_future = self.subscriber.subscribe(
            subscription_path,
            callback=message_handler,
            flow_control=flow_control
        )
        
        logger.info(f"Listening on {subscription}")
        
        try:
            self.streaming_pull_future.result()
        except Exception as e:
            logger.error(f"Subscription error: {e}")
            self.streaming_pull_future.cancel()
    
    def shutdown(self):
        """Graceful shutdown of subscriber."""
        if self.streaming_pull_future:
            self.streaming_pull_future.cancel()
            self.streaming_pull_future.result()

# Usage example
if __name__ == "__main__":
    publisher = PubSubPublisher("my-project")
    
    # Publish event
    future = publisher.publish(
        topic="events",
        data={"event_id": "123", "event_type": "user.created"},
        ordering_key="user-123"
    )
    print(f"Published: {future.result()}")

Cost Optimization and Monitoring

Pub/Sub pricing is based on data volume: $40 per TiB for publishing and $40 per TiB for delivery. Seek and snapshot operations incur additional charges. For high-volume workloads, message batching significantly reduces costs by amortizing per-message overhead. The Python SDK’s batch settings (shown above) automatically batch messages for optimal throughput and cost efficiency.

Monitor subscription backlog to detect processing bottlenecks before they impact system reliability. Cloud Monitoring provides metrics for oldest unacked message age, unacked message count, and delivery latency. Set alerts on backlog age exceeding your SLA thresholds—I recommend alerting when the oldest message exceeds 5 minutes for real-time systems. Dead letter queue message counts indicate processing failures requiring investigation.

Message filtering at the subscription level reduces delivery costs by preventing unwanted messages from reaching subscribers. Define filters using CEL expressions on message attributes—subscribers receive only messages matching their filter criteria. This is more cost-effective than filtering in application code, as filtered messages don’t incur delivery charges.

Pub/Sub Architecture - showing topics, subscriptions, and integration patterns
Pub/Sub Enterprise Architecture – Illustrating topic and subscription patterns, dead letter queues, BigQuery integration, and event-driven microservices communication.

Key Takeaways and Best Practices

Pub/Sub excels for event-driven architectures requiring reliable, scalable messaging without infrastructure management. Implement dead letter queues on all production subscriptions to prevent poison messages from blocking processing. Use schema enforcement for topics shared across teams to ensure message compatibility. Enable exactly-once delivery for subscriptions where duplicate processing would cause issues.

For high-throughput scenarios, configure publisher batching and subscriber flow control appropriately. Monitor subscription backlog and set alerts on message age to detect processing issues early. BigQuery subscriptions simplify analytics pipelines by eliminating custom ingestion code. The Terraform and Python examples provided here establish patterns for production-ready Pub/Sub deployments that scale from thousands to billions of messages daily.


Discover more from Code, Cloud & Context

Subscribe to get the latest posts sent to your email.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.