Vertex AI Masterclass: Building Production ML Pipelines on Google Cloud

Executive Summary: Vertex AI represents Google Cloud’s unified machine learning platform, bringing together AutoML, custom training, model deployment, and MLOps capabilities under a single, cohesive experience. This comprehensive guide explores Vertex AI’s enterprise capabilities, from managed training pipelines and feature stores to model monitoring and A/B testing. After building production ML systems across multiple cloud platforms, I’ve found Vertex AI delivers the most integrated and developer-friendly MLOps experience available. Organizations should leverage Vertex AI for end-to-end ML workflows, from experimentation to production deployment, while implementing proper model governance and cost controls from the start.

Vertex AI Architecture: Unified ML Platform

Vertex AI consolidates Google’s ML offerings into a unified platform with consistent APIs and tooling. The architecture spans the entire ML lifecycle: data preparation with Vertex AI Datasets, feature engineering with Feature Store, model training with AutoML or custom containers, deployment with Vertex AI Endpoints, and monitoring with Model Monitoring. This integration eliminates the fragmentation common in DIY ML platforms.

Training infrastructure scales automatically from single GPUs to distributed clusters with thousands of accelerators. Vertex AI supports NVIDIA GPUs (T4, V100, A100) and Google’s TPUs for large-scale training. Custom training jobs run in managed containers, supporting any ML framework—TensorFlow, PyTorch, JAX, or custom implementations. Pre-built containers for popular frameworks simplify deployment while custom containers provide unlimited flexibility.

Vertex AI Pipelines orchestrates ML workflows using Kubeflow Pipelines or TFX, providing reproducibility, versioning, and lineage tracking. Pipelines integrate with Vertex AI’s managed services—training, prediction, feature store—while supporting custom components for specialized processing. This enables GitOps-style ML workflows where pipeline definitions are version-controlled and changes trigger automated retraining.
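
As a minimal sketch of this pattern, here is how a pipeline could be defined with the KFP v2 SDK, compiled, and submitted as a managed pipeline run. The project ID, bucket path, and the trivial train component are illustrative placeholders, not part of any real system.

"""Sketch: define, compile, and submit a Vertex AI pipeline with KFP v2."""
from kfp import dsl, compiler
from google.cloud import aiplatform

@dsl.component(base_image="python:3.10")
def train(learning_rate: float) -> str:
    # Placeholder step; a real component would load data, train a model,
    # and write artifacts to Cloud Storage.
    return f"trained with lr={learning_rate}"

@dsl.pipeline(name="demo-training-pipeline")
def training_pipeline(learning_rate: float = 0.01):
    train(learning_rate=learning_rate)

# Compile the pipeline to a spec file, then run it as a managed PipelineJob.
compiler.Compiler().compile(
    pipeline_func=training_pipeline, package_path="pipeline.json"
)

aiplatform.init(project="my-project", location="us-central1")
job = aiplatform.PipelineJob(
    display_name="demo-training-pipeline",
    template_path="pipeline.json",
    pipeline_root="gs://my-project-ml-artifacts/pipelines",
    parameter_values={"learning_rate": 0.01},
)
job.submit()  # Non-blocking; progress is visible in the Vertex AI Pipelines console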

Feature Store and Data Management

Vertex AI Feature Store provides a centralized repository for ML features, solving the critical challenge of feature consistency between training and serving. Features are defined once and served consistently across batch training and online inference, eliminating training-serving skew that plagues many ML systems. The Feature Store supports both batch ingestion from BigQuery and streaming updates for real-time features.
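
As a short, hedged sketch of that workflow using the aiplatform SDK's Featurestore classes (the project, BigQuery table, and user ID below are illustrative placeholders; the feature store and entity type match the Terraform configuration later in this guide):

"""Sketch: batch-ingest user features from BigQuery, then read them online."""
from google.cloud import aiplatform

aiplatform.init(project="my-project", location="us-central1")

fs = aiplatform.Featurestore(featurestore_name="production_features")
users = fs.get_entity_type(entity_type_id="users")

# Batch ingestion: columns in the BigQuery table map to feature IDs.
users.ingest_from_bq(
    feature_ids=["purchase_count", "avg_order_value"],
    feature_time="feature_timestamp",
    bq_source_uri="bq://my-project.analytics.user_features",
    entity_id_field="user_id",
)

# Online read for serving: returns a pandas DataFrame keyed by entity ID.
df = users.read(
    entity_ids=["user_123"],
    feature_ids=["purchase_count", "avg_order_value"],
)
print(df)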

Feature engineering at scale leverages BigQuery ML and Dataflow for transformation pipelines. Point-in-time correctness ensures training data reflects the feature values available at prediction time, preventing data leakage. Feature monitoring tracks distribution drift, alerting when production features deviate from training distributions—a leading indicator of model degradation.
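
To make point-in-time retrieval concrete, here is a hedged sketch using the Featurestore batch-serve API; the BigQuery tables are placeholders, and the read-instances table is assumed to contain an entity ID column plus a timestamp column:

"""Sketch: point-in-time batch serving of features for training data."""
from google.cloud import aiplatform

aiplatform.init(project="my-project", location="us-central1")

fs = aiplatform.Featurestore(featurestore_name="production_features")

# Feature values are joined as of each instance's timestamp, so the training
# set never sees values that would not have existed at prediction time.
fs.batch_serve_to_bq(
    bq_destination_output_uri="bq://my-project.ml.training_features",
    serving_feature_ids={"users": ["purchase_count", "avg_order_value"]},
    read_instances_uri="bq://my-project.ml.label_events",
)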

Vertex AI Datasets manage training data with versioning and lineage tracking. Datasets support structured data (BigQuery, CSV), images, video, and text, with automatic data validation and statistics. Managed datasets integrate with AutoML for no-code model training and with custom training for full control over data preprocessing.
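
For managed tabular datasets specifically, creation is a one-liner in the Python SDK; in this hedged sketch, the project and BigQuery source table are placeholders:

"""Sketch: create a managed tabular dataset from a BigQuery table."""
from google.cloud import aiplatform

aiplatform.init(project="my-project", location="us-central1")

dataset = aiplatform.TabularDataset.create(
    display_name="user-orders-training",
    bq_source="bq://my-project.analytics.training_examples",
)
# The resulting dataset ID can be passed to AutoML or custom training jobs.
print(dataset.resource_name)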

Production Terraform Configuration

Here’s a comprehensive Terraform configuration for Vertex AI infrastructure, including a Feature Store with feature drift monitoring, a prediction endpoint, IAM, and artifact storage:

# Vertex AI Enterprise Configuration
terraform {
  required_version = ">= 1.5.0"
  required_providers {
    google = { source = "hashicorp/google", version = "~> 5.0" }
  }
}

variable "project_id" { type = string }
variable "region" { type = string, default = "us-central1" }

# Feature Store for ML features
resource "google_vertex_ai_featurestore" "main" {
  name   = "production_features"
  region = var.region
  
  online_serving_config {
    fixed_node_count = 2
  }

  force_destroy = false

  labels = {
    environment = "production"
  }
}

# Feature Store Entity Type
resource "google_vertex_ai_featurestore_entitytype" "users" {
  name         = "users"
  featurestore = google_vertex_ai_featurestore.main.id
  
  monitoring_config {
    snapshot_analysis {
      disabled = false
    }
    numerical_threshold_config {
      value = 0.3
    }
    categorical_threshold_config {
      value = 0.3
    }
  }

  labels = {
    entity = "user"
  }
}

# Feature definitions
resource "google_vertex_ai_featurestore_entitytype_feature" "user_features" {
  for_each   = toset(["purchase_count", "avg_order_value", "days_since_last_order"])
  name       = each.key
  entitytype = google_vertex_ai_featurestore_entitytype.users.id
  value_type = "DOUBLE"
}

# Model endpoint for serving
resource "google_vertex_ai_endpoint" "prediction" {
  name         = "prediction-endpoint"
  display_name = "Production Prediction Endpoint"
  location     = var.region
  
  network = "projects/${var.project_id}/global/networks/default"

  labels = {
    environment = "production"
    model       = "recommendation"
  }
}

# Service account for Vertex AI
resource "google_service_account" "vertex_sa" {
  account_id   = "vertex-ai-sa"
  display_name = "Vertex AI Service Account"
}

# IAM roles for Vertex AI
resource "google_project_iam_member" "vertex_roles" {
  for_each = toset([
    "roles/aiplatform.user",
    "roles/bigquery.dataEditor",
    "roles/storage.objectAdmin"
  ])
  project = var.project_id
  role    = each.value
  member  = "serviceAccount:${google_service_account.vertex_sa.email}"
}

# Cloud Storage bucket for artifacts
resource "google_storage_bucket" "ml_artifacts" {
  name     = "${var.project_id}-ml-artifacts"
  location = var.region
  
  uniform_bucket_level_access = true
  
  versioning {
    enabled = true
  }

  lifecycle_rule {
    condition {
      age = 90
    }
    action {
      type = "Delete"
    }
  }
}

# Artifact Registry for model containers
resource "google_artifact_registry_repository" "ml_models" {
  location      = var.region
  repository_id = "ml-models"
  format        = "DOCKER"
  
  labels = {
    environment = "production"
  }
}

Python SDK for Vertex AI Operations

This Python implementation demonstrates enterprise patterns for Vertex AI including training, deployment, and prediction:

"""Vertex AI Manager - Enterprise Python Implementation"""
from dataclasses import dataclass
from typing import List, Dict, Optional
from google.cloud import aiplatform
from google.cloud.aiplatform import Model, Endpoint, model_monitoring
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class TrainingConfig:
    display_name: str
    container_uri: str
    model_serving_container_uri: str
    machine_type: str = "n1-standard-4"
    accelerator_type: Optional[str] = None
    accelerator_count: int = 0

class VertexAIManager:
    """Enterprise Vertex AI client for ML operations."""
    
    def __init__(self, project_id: str, region: str, staging_bucket: str):
        self.project_id = project_id
        self.region = region
        self.staging_bucket = staging_bucket
        
        aiplatform.init(
            project=project_id,
            location=region,
            staging_bucket=staging_bucket
        )
    
    def train_custom_model(self, config: TrainingConfig,
                           dataset_id: Optional[str] = None,
                           args: Optional[List[str]] = None) -> Model:
        """Launch custom training job and return trained model."""
        job = aiplatform.CustomContainerTrainingJob(
            display_name=config.display_name,
            container_uri=config.container_uri,
            model_serving_container_image_uri=config.model_serving_container_uri
        )
        
        model = job.run(
            dataset=aiplatform.TabularDataset(dataset_id) if dataset_id else None,
            args=args or [],
            replica_count=1,
            machine_type=config.machine_type,
            accelerator_type=config.accelerator_type,
            accelerator_count=config.accelerator_count,
            sync=True
        )
        
        logger.info(f"Training complete: {model.resource_name}")
        return model
    
    def deploy_model(self, model: Model, endpoint_name: str,
                    machine_type: str = "n1-standard-4",
                    min_replicas: int = 1,
                    max_replicas: int = 10) -> Endpoint:
        """Deploy model to endpoint with autoscaling."""
        # Get or create endpoint
        endpoints = aiplatform.Endpoint.list(
            filter=f'display_name="{endpoint_name}"'
        )
        
        if endpoints:
            endpoint = endpoints[0]
        else:
            endpoint = aiplatform.Endpoint.create(display_name=endpoint_name)
        
        # Deploy with traffic split
        model.deploy(
            endpoint=endpoint,
            deployed_model_display_name=f"{model.display_name}-deployed",
            machine_type=machine_type,
            min_replica_count=min_replicas,
            max_replica_count=max_replicas,
            traffic_percentage=100,
            sync=True
        )
        
        logger.info(f"Model deployed to {endpoint.resource_name}")
        return endpoint
    
    def predict(self, endpoint_name: str, 
                instances: List[Dict]) -> List[Dict]:
        """Make predictions using deployed model."""
        endpoints = aiplatform.Endpoint.list(
            filter=f'display_name="{endpoint_name}"'
        )
        
        if not endpoints:
            raise ValueError(f"Endpoint {endpoint_name} not found")
        
        endpoint = endpoints[0]
        predictions = endpoint.predict(instances=instances)
        
        return predictions.predictions
    
    def batch_predict(self, model_name: str, 
                     input_uri: str, 
                     output_uri: str) -> str:
        """Run batch prediction job."""
        model = aiplatform.Model(model_name)
        
        batch_job = model.batch_predict(
            job_display_name=f"batch-{model.display_name}",
            gcs_source=input_uri,
            gcs_destination_prefix=output_uri,
            machine_type="n1-standard-4",
            starting_replica_count=1,
            max_replica_count=10,
            sync=False
        )
        
        logger.info(f"Batch job started: {batch_job.resource_name}")
        return batch_job.resource_name
    
    def setup_model_monitoring(self, endpoint: Endpoint,
                               alert_emails: List[str]) -> None:
        """Configure model monitoring for training-serving skew detection."""
        # Skew detection compares production feature values against the
        # distribution observed in the training data.
        skew_config = model_monitoring.SkewDetectionConfig(
            data_source=f"gs://{self.staging_bucket}/training_data.csv",
            skew_thresholds={"feature1": 0.3},
            target_field="target",
            data_format="csv"
        )
        objective_config = model_monitoring.ObjectiveConfig(
            skew_detection_config=skew_config
        )
        
        job = aiplatform.ModelDeploymentMonitoringJob.create(
            display_name=f"monitoring-{endpoint.display_name}",
            endpoint=endpoint,
            objective_configs=objective_config,
            logging_sampling_strategy=model_monitoring.RandomSampleConfig(
                sample_rate=0.1
            ),
            schedule_config=model_monitoring.ScheduleConfig(monitor_interval=1),
            alert_config=model_monitoring.EmailAlertConfig(
                user_emails=alert_emails
            )
        )
        
        logger.info(f"Monitoring configured: {job.resource_name}")

Cost Management and MLOps Best Practices

Vertex AI pricing varies by service: training costs depend on machine type and duration, prediction costs on requests and compute, and Feature Store on storage and serving nodes. For training, use preemptible VMs for fault-tolerant jobs to reduce costs by up to 80%. Spot instances work well for hyperparameter tuning where individual trial failures are acceptable.

Endpoint autoscaling prevents over-provisioning while maintaining latency SLAs. Configure min replicas based on baseline traffic and max replicas for peak loads. Use traffic splitting for gradual rollouts—deploy new models with 5-10% traffic initially, monitoring prediction quality before full rollout. Model versioning in Vertex AI Model Registry enables instant rollback if issues arise.
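
As a hedged illustration of that canary pattern with the Python SDK (the endpoint and model resource names below are placeholders):

"""Sketch: canary-deploy a new model version with 10% of endpoint traffic."""
from google.cloud import aiplatform

aiplatform.init(project="my-project", location="us-central1")

endpoint = aiplatform.Endpoint("projects/123/locations/us-central1/endpoints/456")
new_model = aiplatform.Model("projects/123/locations/us-central1/models/789")

# traffic_percentage routes 10% of requests to the new deployment; the rest
# stays on the existing deployed model(s) until you promote or roll back.
new_model.deploy(
    endpoint=endpoint,
    deployed_model_display_name="recommendation-canary",
    machine_type="n1-standard-4",
    min_replica_count=1,
    max_replica_count=5,
    traffic_percentage=10,
)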

Implement CI/CD for ML with Vertex AI Pipelines. Pipeline definitions in version control trigger automated retraining when code or data changes. Model evaluation gates prevent deploying models that don’t meet quality thresholds. Metadata tracking provides full lineage from training data through deployed model, essential for debugging and compliance.

Vertex AI Enterprise Architecture – illustrating the unified ML platform with training pipelines, Feature Store, model deployment, and monitoring for production ML systems.

Key Takeaways and Best Practices

Vertex AI excels for organizations seeking a unified ML platform that scales from experimentation to production. Use Feature Store from day one to ensure training-serving consistency and enable feature reuse across models. Implement Vertex AI Pipelines for reproducible, automated ML workflows with proper versioning and lineage tracking.

For production deployments, configure model monitoring to detect drift before it impacts business metrics. Use traffic splitting for safe deployments and maintain model versions for instant rollback capability. The Terraform and Python examples provided here establish patterns for production-ready Vertex AI deployments that support the full ML lifecycle from data preparation through model monitoring.

