Stephen A. Adebowale / Knowledge Base / MLOps Blueprint

Complete MLOps Blueprint

A Comprehensive Guide for Production Machine Learning Systems

Based on the Daily Dose of Data Science MLOps Crash Course

1. MLOps Foundations and Background

What is MLOps?

Machine Learning Operations (MLOps) is the discipline of integrating ML models into real-world software systems. It encompasses Continuous Integration (CI), Continuous Delivery (CD), and Continuous Training (CT) — combining machine learning, software engineering, DevOps, and data engineering practices. CT is what most distinguishes MLOps from traditional DevOps: models must be periodically retrained as data distributions evolve, not just redeployed.

Key Insight: The ML model itself is only a small part of a production ML system. The vast surrounding infrastructure is much larger and more complex.

Production ML System Architecture

(After Sculley et al., 2015 — "Hidden Technical Debt in Machine Learning Systems")
The ML model code is a small central component; surrounding infrastructure is vastly larger.

User Interface & APIs (top / user-facing)
Model Serving & Inference
⬡ ML Model Code (small core)
Data Processing & Feature Engineering
Infrastructure · Monitoring · Security · Configuration (foundation)

⬇ Infrastructure is the foundation layer. Each outer ring represents a larger operational surface than the one above it.

Why MLOps Matters

Model Decay Over Time

ML models face changing real-world conditions. Without proper operations, accurate models can quickly become unreliable or harmful.

Manual Processes

Without MLOps, teams end up with manual, brittle processes that are slow, error-prone, and don't scale.

Performance Requirements

Production systems must handle evolving data and usage patterns while meeting latency, throughput, and quality requirements.

Risk Management

Inadequate MLOps can lead to stale models making bad predictions that hurt the business and users.

MLOps vs DevOps

Aspect Traditional DevOps MLOps
Development Process Deterministic, code-focused Experimental, data-driven
Testing Complexity Unit tests, integration tests + Data quality tests, model performance tests, drift detection
Deployment Code deployment via CI/CD + Continuous training (CT), model retraining pipelines
Versioning Code versioning + Data versioning, model versioning, experiment tracking
Monitoring System metrics, logs + Model performance, data drift, concept drift

Key MLOps Challenges

Data Drift

When the distribution of input data changes over time, model performance can degrade without obvious system failures.

Training/Serving Skew

Arises when feature transformation logic in the training pipeline (e.g. Python/pandas) diverges from production serving code (e.g. Java, Go, SQL) — including different rounding, missing value handling, or encoding. Also caused by distribution shifts in the underlying input data over time.

Feedback Loops

ML predictions can influence the data used to train future models, creating complex dependencies.

2. Machine Learning System Lifecycle

1. Scoping

Define project goals and success metrics

2. Data

Collect, clean, and prepare data

3. Modeling

Train and validate models

4. Deployment

Deploy to production

5. Monitoring

Monitor and maintain

⟳ This is an iterative cycle, not a linear flow. Monitoring outcomes feed back into Scoping and Data (drift triggers retraining). Model error analysis drives new data collection. Deployment baselines initialise monitoring thresholds. Real ML systems continuously loop through these stages throughout their lifetime.

Data Pipelines

In ML, data quality and management are often more important than the specific modeling algorithm. Production ML systems need robust data pipelines for training and inference.

Data Ingestion

  • Batch processing (periodic imports)
  • Streaming (real-time events)
  • API integrations
  • Database connections

Data Storage

  • Cloud object stores (S3, GCS, ADLS) — preferred for cloud-native workloads
  • HDFS — on-premises / hybrid environments (legacy)
  • Data warehouses (BigQuery, Snowflake, Redshift)
  • Lakehouse table formats: Delta Lake, Apache Iceberg, Apache Hudi
  • Feature stores (Feast, Tecton, Hopsworks)
  • Relational databases (PostgreSQL, MySQL)

Data Processing (ETL)

  • Cleaning and validation
  • Feature engineering
  • Data joining and aggregation
  • Format standardization

Data Labeling

  • Automated labeling
  • Human annotation
  • Active learning
  • Quality control

Model Training and Experimentation

Training Pipeline Components

Experiment Tracking
  • Parameter logging
  • Metrics recording
  • Artifact storage
  • Reproducibility
Model Validation
  • Cross-validation
  • Hold-out testing
  • A/B testing
  • Performance metrics
Resource Management
  • Compute allocation
  • GPU scheduling
  • Memory optimization
  • Cost monitoring

Deployment and Inference

Deployment Patterns

Batch Inference: Process large datasets offline, suitable for non-real-time predictions
Online Inference: Real-time predictions via REST APIs or streaming
Edge Deployment: Models deployed on edge devices for low-latency inference

Model Serving

Model Packaging: Containerization, serialization, dependency management
Load Balancing: Distribute requests across multiple model instances
Auto-scaling: Dynamic resource allocation based on demand

Monitoring and Observability

Operational Monitoring

  • System metrics (CPU, memory)
  • Latency and throughput
  • Error rates
  • Service availability

Model Performance

  • Accuracy tracking
  • Prediction quality
  • Business metrics
  • A/B test results

Data Monitoring

  • Data drift detection
  • Concept drift
  • Feature distribution
  • Data quality checks

3. Reproducibility and Versioning

Why Reproducibility Matters

Critical Benefits

  • Debugging: Reproduce training to pinpoint performance drops or discrepancies
  • Collaboration: Team members can verify and build upon each other's work
  • Compliance: Regulatory requirements in healthcare, finance, autonomous vehicles
  • Rollback: Ability to revert to previous model versions when needed

Common Challenges

  • Randomness in ML (initial weights, data shuffling)
  • Complex dependencies (code + data + environment)
  • Large data versioning challenges
  • Environment-specific behaviors
  • Untracked manual experiments

Best Practices for Reproducibility

1. Ensure Deterministic Processes

Python
# Set random seeds for reproducibility
import numpy as np
import random
import torch

np.random.seed(42)
random.seed(42)
torch.manual_seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed(42)
    torch.cuda.manual_seed_all(42)
    # REQUIRED for full CUDA reproducibility — without these, cuDNN
    # selects non-deterministic convolution algorithms even with seeds set
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False  # disable auto-tuner

# PyTorch >= 1.8: raise errors for ops with no deterministic implementation
# Use warn_only=True to log warnings instead of raising exceptions
torch.use_deterministic_algorithms(True, warn_only=True)

Note: Setting cudnn.deterministic = True and cudnn.benchmark = False is required for full GPU reproducibility. benchmark=False prevents cuDNN from selecting non-deterministic fast-path algorithms. There may be a small performance cost. Set CUBLAS_WORKSPACE_CONFIG=:4096:8 as an environment variable for additional determinism in CUDA 10.2+.

2. Version Control for Code

Python
# Include Git commit hash in model metadata
import subprocess

def get_git_hash():
    return subprocess.check_output(['git', 'rev-parse', 'HEAD']).decode('ascii').strip()

model_metadata = {
    'git_hash': get_git_hash(),
    'timestamp': datetime.now().isoformat(),
    'parameters': hyperparameters
}

3. Data Versioning

Use tools like DVC (Data Version Control) to track data alongside code:

Shell
# Initialize DVC in your project
dvc init

# Add data to DVC tracking
dvc add data/train_dataset.csv

# Commit both code and data references
git add data/train_dataset.csv.dvc .gitignore
git commit -m "Add training dataset v1.0"

# Create version tags
git tag -a v1.0 -m "Model version 1.0 with dataset v1.0"

4. Experiment Tracking

Use MLflow, Weights & Biases, or similar tools:

Python
import mlflow
import mlflow.sklearn

with mlflow.start_run():
    # Log parameters
    mlflow.log_param("n_estimators", 100)
    mlflow.log_param("max_depth", 5)
    
    # Train model
    model = RandomForestClassifier(n_estimators=100, max_depth=5, random_state=42)
    model.fit(X_train, y_train)
    
    # Log metrics
    accuracy = model.score(X_test, y_test)
    mlflow.log_metric("accuracy", accuracy)
    
    # Log model
    mlflow.sklearn.log_model(model, "random_forest_model")

5. Model Versioning

Register models with versions and stages:

Python
# Register model in MLflow Model Registry
model_name = "customer_churn_predictor"
mlflow.register_model(
    f"runs:/{run_id}/random_forest_model",
    model_name
)

# (transition_model_version_stage() was deprecated in MLflow 2.9
#  and removed in MLflow 3.x)
client = mlflow.MlflowClient()

# Assign a 'staging' alias (any string is valid: staging, production, champion, etc.)
client.set_registered_model_alias(
    name=model_name,
    alias="staging",
    version=1
)

# Load model by alias in serving code
model_uri = f"models:/{model_name}@staging"
model = mlflow.pyfunc.load_model(model_uri)

# Promote to production alias when validated
client.set_registered_model_alias(name=model_name, alias="champion", version=1)
client.delete_registered_model_alias(name=model_name, alias="staging")

6. Environment Management

Requirements.txt
# ⚠️ Do NOT manually pin versions for production. # Use pip-compile (pip-tools) or uv lock to generate # a fully-resolved, hash-verified lockfile. # Example current stable versions (Q1 2026 — verify before use): numpy>=1.26,<2.0 pandas>=2.1,<3.0 scikit-learn>=1.4,<2.0 torch>=2.3,<3.0 mlflow>=2.12,<3.0 # Generate locked requirements: # pip-compile requirements.in --generate-hashes -o requirements.txt
Dockerfile (multi-stage)
# Stage 1: builder — install deps in isolated layer FROM python:3.11-slim AS builder WORKDIR /build COPY requirements.txt . RUN pip install --no-cache-dir --prefix=/install -r requirements.txt # Stage 2: runtime — minimal attack surface FROM python:3.11-slim WORKDIR /app # Copy only installed packages, not build tools COPY --from=builder /install /usr/local COPY src/ ./src/ COPY params.yaml . # NOTE: Do NOT embed model artifacts here. # Load model at runtime from a model registry / object store. # Inject MODEL_URI via env var at container startup. RUN adduser --disabled-password --gecos '' mluser USER mluser CMD ["python", "src/api/serve.py"]

Tools Comparison

Tool Strengths Best For Limitations
MLflow Open-source, flexible, self-hosted Local/enterprise setups, custom infrastructure Basic UI, requires setup and maintenance
Weights & Biases Rich visualizations, team collaboration, managed service Fast setup, visualization-heavy workflows Cloud-dependent, subscription costs
DVC Git-like data versioning, lightweight Data versioning, reproducible pipelines Limited experiment tracking UI
Kubeflow Kubernetes-native, scalable pipelines Large-scale, cloud-native deployments Complex setup, steep learning curve

4. Data and Pipeline Engineering

Understanding the Data Landscape

In production ML, success depends not just on models but on the data pipelines that feed those models. Data is the raw material, and pipeline choices have profound downstream consequences.

Data Sources

User Input Data: Text, images, form submissions - requires heavy validation
System Logs: Application events, system states, model predictions
Internal Databases: CRM, inventory, financial transactions - highly structured
Third-party Data: External APIs, demographic data, social media

Data Formats

Text vs Binary:
  • JSON/CSV: Human-readable, debugging-friendly, storage-heavy
  • Parquet/Avro: Compact, efficient processing, machine-only
Row-major vs Column-major:
  • CSV: Row-major, write-heavy workloads
  • Parquet: Column-major, analytical queries, ML training

ETL vs ELT Pipelines

ETL (Extract-Transform-Load)

Extract
Pull data from sources
Transform
Clean, process, engineer features
Load
Store in target system
Best for: Structured processes, data quality control, smaller datasets

ELT (Extract-Load-Transform)

Extract
Pull data from sources
Load
Store raw data in data lake
Transform
Process when needed
Best for: Large datasets, flexible schemas, cloud-native architectures

Pipeline Implementation Patterns

Batch Processing Pipeline

Python
# Example batch processing pipeline with Apache Airflow 2.x / 3.x
from airflow.decorators import dag, task
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

@dag(
    dag_id='ml_data_pipeline',
    default_args=default_args,
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['ml', 'data']
)
def ml_data_pipeline():

    @task
    def extract_data():
        # Extract data from source systems
        pass

    @task
    def transform_data():
        # Clean and transform data
        pass

    @task
    def load_data():
        # Load into target system
        pass

    extract_data() >> transform_data() >> load_data()

dag_instance = ml_data_pipeline()

Streaming Pipeline

Python
# Example streaming pipeline with Apache Kafka and Python
from kafka import KafkaConsumer, KafkaProducer
import json

# Consumer for raw events
consumer = KafkaConsumer(
    'raw_events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# Producer for processed features
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def process_event(event):
    # Feature engineering logic
    features = {
        'user_id': event['user_id'],
        'timestamp': event['timestamp'],
        'processed_features': extract_features(event)
    }
    return features

for message in consumer:
    event = message.value
    processed = process_event(event)
    producer.send('ml_features', processed)

Feature Store Integration

Python
# feature_store.py (registry — version-controlled with your repo)
# Define FeatureService here for registration, NOT in serving code
from feast import FeatureStore, FeatureService

# feature_service is defined in the feature repo and applied via `feast apply`
user_features_service = FeatureService(
    name="user_recommendation_features",
    features=[user_stats_view[["avg_rating", "total_purchases"]],
               user_demographics_view[["age_group"]]]
)

# ─────────────────────────────────────────────────────────────────
# serving_code.py — online inference path (real-time)
# ─────────────────────────────────────────────────────────────────
from feast import FeatureStore

fs = FeatureStore(repo_path="feature_repo/")

# (event_timestamp is only for get_historical_features / offline retrieval)
entity_rows = [
    {"user_id": 1001},
    {"user_id": 1002},
    {"user_id": 1003},
]

# Pass feature service by name (string) for online retrieval
features = fs.get_online_features(
    features="user_recommendation_features",
    entity_rows=entity_rows,
).to_df()

# ─────────────────────────────────────────────────────────────────
# For HISTORICAL / TRAINING retrieval use get_historical_features:
# ─────────────────────────────────────────────────────────────────
import pandas as pd
from datetime import datetime

entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003],
    "event_timestamp": [datetime.now()] * 3   # required here, not for online
})
training_features = fs.get_historical_features(
    entity_df=entity_df,
    features="user_recommendation_features",
).to_df()

Data Quality and Validation

Schema Validation

  • Data type validation
  • Required field checks
  • Range constraints
  • Format validation

Statistical Validation

  • Distribution monitoring
  • Outlier detection
  • Missing value rates
  • Correlation changes

Business Logic

  • Business rule checks
  • Referential integrity
  • Temporal consistency
  • Cross-field validation

Data Quality Framework

Python
import great_expectations as gx

# Initialize or load a Data Context (file-based or ephemeral)
context = gx.get_context()

# Define a Pandas Data Source
datasource = context.sources.add_pandas("my_datasource")
data_asset = datasource.add_dataframe_asset("raw_data")
batch_request = data_asset.build_batch_request(dataframe=raw_data)

# Create or update an Expectation Suite
suite = context.add_or_update_expectation_suite("data_quality_suite")

# Build a Validator
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite=suite
)

# Define expectations
validator.expect_column_to_exist("user_id")
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=120)
validator.expect_column_values_to_be_in_set("status", ["active", "inactive"])
validator.save_expectation_suite()

# Run validation via a Checkpoint (CI/CD-friendly)
checkpoint = context.add_or_update_checkpoint(
    name="data_quality_checkpoint",
    validations=[{
        "batch_request": batch_request,
        "expectation_suite_name": "data_quality_suite"
    }]
)
result = checkpoint.run()

if not result.success:
    raise ValueError("Data quality checks failed — pipeline halted.")
    for validation in result.run_results.values():
        for r in validation["results"]:
            if not r["success"]:
                print(f"FAILED: {r['expectation_config']['expectation_type']}")

5. MLOps Best Practices

Development and Testing

Code Quality

  • Use version control for all code
  • Write unit tests for data processing functions
  • Implement integration tests for pipelines
  • Use linting and formatting tools (black, flake8)
  • Document APIs and functions thoroughly
  • Follow consistent naming conventions

ML-Specific Testing

  • Test data preprocessing steps
  • Validate model inputs and outputs
  • Check for data leakage
  • Test model performance thresholds
  • Validate feature engineering logic
  • Test training/serving parity

Production Deployment

Deployment Strategies

Blue-Green Deployment:

Run two identical production environments, switch traffic instantly

Canary Deployment:

Gradually roll out to small percentage of users, monitor performance

A/B Testing:

Compare new model against existing one with split traffic

Safety Measures

Python
import time
import threading

class ModelCircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
        self._lock = threading.Lock()      
    def call_model(self, input_data):
        with self._lock:
            if self.state == 'OPEN':
                if (self.last_failure_time and
                        time.time() - self.last_failure_time > self.timeout):
                    self.state = 'HALF_OPEN'
                else:
                    return self.fallback_response()
        
        try:
            result = model.predict(input_data)
            with self._lock:
                if self.state == 'HALF_OPEN':
                    # Probe succeeded — close the circuit
                    self.state = 'CLOSED'
                    self.failure_count = 0
            return result
        except Exception as e:
            with self._lock:
                self.failure_count += 1
                self.last_failure_time = time.time()
                if self.state == 'HALF_OPEN':
                                        self.state = 'OPEN'
                elif self.failure_count >= self.failure_threshold:
                    self.state = 'OPEN'
            return self.fallback_response()
    
    def fallback_response(self):
        # Return a safe default/cached response when model is unavailable
        return {"prediction": "default_value", "confidence": 0.0, "fallback": True}

Monitoring and Alerting

Key Metrics to Monitor

System Metrics:
  • Response latency (p50, p95, p99)
  • Throughput (requests per second)
  • Error rates and status codes
  • Resource utilization (CPU, memory, GPU)
Model Metrics:
  • Prediction accuracy/performance
  • Confidence score distributions
  • Feature importance changes
  • Model version performance comparison
Data Metrics:
  • Input data distribution drift
  • Missing or null value rates
  • Feature correlation changes
  • Outlier detection rates

Alerting Framework

Python
# Example monitoring setup with Prometheus and Grafana
from prometheus_client import Counter, Histogram, Gauge
import time

# Define metrics
prediction_counter = Counter('ml_predictions_total', 'Total predictions made')
prediction_latency = Histogram('ml_prediction_duration_seconds', 'Prediction latency')
model_accuracy = Gauge('ml_model_accuracy', 'Current model accuracy')
data_drift_score = Gauge('ml_data_drift_score', 'Data drift detection score')

def make_prediction(input_data):
    start_time = time.time()
    
    try:
        prediction = model.predict(input_data)
        prediction_counter.inc()
        
        # Record latency
        duration = time.time() - start_time
        prediction_latency.observe(duration)
        
        return prediction
        
    except Exception as e:
        # Log error and increment error counter
        logger.error(f"Prediction failed: {e}")
        raise

# Update model metrics periodically
def update_model_metrics():
    # Calculate current accuracy on validation set
    current_accuracy = evaluate_model()
    model_accuracy.set(current_accuracy)
    
    # Calculate data drift
    drift_score = detect_data_drift(recent_data, reference_data)
    data_drift_score.set(drift_score)

Security and Compliance

Data Security

  • Encrypt data at rest and in transit (TLS 1.3+)
  • Implement RBAC and least-privilege access controls
  • Regular security audits and penetration testing
  • Secure model artifacts: sign and verify checksums

Compliance Frameworks

  • GDPR / UK GDPR — EU personal data
  • EU AI Act — risk-tier conformity assessments
  • CCPA — California consumer data
  • HIPAA — US healthcare data
  • NIST AI RMF — GOVERN / MAP / MEASURE / MANAGE
  • Model explainability, audit trails, data retention policies

Privacy

  • Data anonymisation and pseudonymisation
  • Differential privacy techniques
  • Federated learning for sensitive data
  • Regular privacy impact assessments (PIAs)

ML Supply Chain Security

  • Verify model weight checksums/signatures before loading
  • Scan base Docker images with Trivy or Grype for CVEs
  • Use signed container images (Cosign / Sigstore)
  • Pin dependencies with hash verification (pip install --require-hashes)
  • Audit third-party datasets for poisoning or licence issues
  • Use private model registries — do not pull weights from public sources without verification

Model & Inference Security

  • Input validation and sanitisation before model inference
  • Rate limiting and authentication on model endpoints
  • Adversarial input detection / robustness testing
  • Mitigate model inversion and membership inference attacks
  • Store secrets in vault (HashiCorp Vault, AWS Secrets Manager) — never in env vars or images
  • Enforce NetworkPolicy in Kubernetes to restrict model pod egress

Team Organization and Culture

Cross-functional Teams

  • Data Scientists: Model development, experimentation
  • ML Engineers: Pipeline development, model deployment
  • Data Engineers: Data infrastructure, ETL pipelines
  • DevOps Engineers: Infrastructure, monitoring, scaling
  • Product Managers: Requirements, business metrics

Best Practices

  • Regular code reviews and pair programming
  • Knowledge sharing sessions and documentation
  • Standardized project templates and tooling
  • Continuous learning and skill development
  • Post-mortem analysis for production issues
  • "You build it, you run it" mentality

6. MLOps Maturity Levels

Why maturity levels matter: Most teams should NOT start with full pipeline automation. Understanding where you are today — and what the next pragmatic step is — prevents over-engineering and enables incremental, sustainable adoption. This framework is based on Google's MLOps white paper (2021) and Microsoft's MLOps maturity model.

Level 0 — Manual Process (most teams start here)

Characteristics
  • Manual, script-driven training
  • Models deployed as static artefacts
  • No experiment tracking
  • Ad-hoc retraining when performance degrades
  • Jupyter notebooks in production
Pain Points
  • Results not reproducible
  • No visibility into model drift
  • Deployment is a manual, risky event
  • Knowledge siloed in individuals
Key Actions to Advance
  • Add experiment tracking (MLflow/W&B)
  • Refactor notebooks into Python scripts
  • Version code in Git
  • Containerise training and serving

Level 1 — ML Pipeline Automation

Characteristics
  • Automated training pipelines
  • Continuous Training (CT) on new data
  • Experiment tracking and model registry
  • Data and model validation gates
  • Feature store in use
Tools Typical at This Level
  • MLflow / W&B for tracking
  • DVC or LakeFS for data versioning
  • Airflow / Prefect for orchestration
  • Docker + basic K8s for serving
Key Actions to Advance
  • Add full CI/CD for pipeline code
  • Automate model evaluation and promotion
  • Implement drift monitoring and alerting
  • Standardise project templates

Level 2 — CI/CD Pipeline Automation (full MLOps)

Characteristics
  • Automated CI/CD for all pipeline components
  • Auto-triggered retraining on drift detection
  • Canary and shadow deployments
  • Full observability stack
  • Self-service ML platform for data scientists
Requirements
  • Dedicated ML Platform team
  • Mature DevOps/SRE culture
  • Significant infrastructure investment
  • Strong data governance framework
⚠️ Reality Check

Most organisations operate between Level 0 and Level 1. Level 2 is appropriate for teams with 10+ ML models in production and dedicated platform engineering capacity. Do not attempt to jump directly to Level 2 from Level 0.

7. LLMOps — Operating Large Language Models

LLMOps extends classical MLOps with patterns specific to large language models (LLMs) and generative AI systems. Key differences include non-deterministic outputs, prompt-based interfaces, RAG architectures, and new evaluation paradigms. If you are deploying GPT-family models, open-source LLMs (Llama, Mistral, Gemma), or embedding-based systems, these patterns apply.

Prompt Engineering & Versioning

  • Store prompts in a prompt registry (version-controlled)
  • A/B test prompt variants with metrics
  • Log prompts and completions for auditability
  • Use structured output enforcement (Pydantic, Instructor)
  • Tools: LangSmith, PromptLayer, Agenta

RAG Pipeline Management

  • Version your vector store indices alongside model versions
  • Monitor retrieval quality (precision@k, recall@k)
  • Evaluate end-to-end RAG quality with RAGAS framework
  • Chunk strategy versioning (size, overlap, method)
  • Re-index on knowledge base updates

LLM Evaluation

  • LLM-as-judge for automated quality scoring
  • Hallucination detection metrics (FactScore, TruthfulQA style)
  • Task-specific benchmarks (HumanEval, MMLU, etc.)
  • Human eval for high-stakes use cases
  • Tools: DeepEval, RAGAS, Braintrust, Langfuse

LLM Safety & Guardrails

  • Input validation: detect prompt injection, jailbreaks
  • Output filtering: PII redaction, toxicity classifiers
  • Implement guardrail layers (NeMo Guardrails, Guardrails AI)
  • Rate limiting per user/session
  • Log all interactions for compliance audit

LLM Inference Optimisation

  • Quantisation: INT4/INT8 (AWQ, GPTQ, bitsandbytes)
  • Speculative decoding for throughput
  • Continuous batching (vLLM, TGI)
  • KV cache management for long contexts
  • Monitor tokens/sec, TTFT (Time To First Token), total latency

Fine-Tuning vs. Prompting

  • Start with prompting; fine-tune only when prompting plateaus
  • Use parameter-efficient methods: LoRA, QLoRA, PEFT
  • Track fine-tune runs like any ML experiment (MLflow/W&B)
  • Evaluate fine-tuned models against base model on held-out set
  • Monitor for catastrophic forgetting

Key Differences from Classical MLOps

AspectClassical MLOpsLLMOps
OutputDeterministic (same input → same output)Stochastic (temperature-dependent)
Versioning unitModel weights + hyperparamsModel + prompt + system message + RAG index
EvaluationNumerical metrics (accuracy, F1)LLM-as-judge, human eval, task-specific benchmarks
Retraining triggerData drift, performance dropKnowledge cutoff, prompt degradation, new capabilities
Primary security concernModel inversion, data leakagePrompt injection, jailbreaks, PII in completions

8. Implementation Guide and Templates

Project Structure Template

Shell
ml-project/
├── README.md
├── requirements.txt
├── Dockerfile
├── .gitignore
├── .dvcignore
├── dvc.yaml                    # DVC pipeline definition
├── params.yaml                 # Hyperparameters and config
├── data/
│   ├── raw/                   # Original, immutable data
│   ├── processed/             # Cleaned and processed data
│   └── external/              # Third-party datasets
├── notebooks/
│   ├── 01-exploratory-analysis.ipynb
│   ├── 02-feature-engineering.ipynb
│   └── 03-model-experiments.ipynb
├── src/
│   ├── __init__.py
│   ├── data/
│   │   ├── __init__.py
│   │   ├── extract.py         # Data extraction
│   │   ├── transform.py       # Data transformation
│   │   └── validate.py        # Data validation
│   ├── features/
│   │   ├── __init__.py
│   │   └── build_features.py  # Feature engineering
│   ├── models/
│   │   ├── __init__.py
│   │   ├── train.py          # Model training
│   │   ├── predict.py        # Model prediction
│   │   └── evaluate.py       # Model evaluation
│   └── utils/
│       ├── __init__.py
│       └── helpers.py        # Utility functions
├── tests/
│   ├── __init__.py
│   ├── test_data.py
│   ├── test_features.py
│   └── test_models.py
├── models/                    # Trained model artifacts
├── reports/                   # Analysis reports and figures
├── deployment/
│   ├── docker/
│   ├── kubernetes/
│   └── terraform/
└── mlflow_tracking/          # MLflow tracking server data

Configuration Management

params.yaml

Python
# Data processing parameters
data:
  raw_data_path: "data/raw/dataset.csv"
  processed_data_path: "data/processed/clean_dataset.csv"
  test_size: 0.2
  random_state: 42

# Feature engineering
features:
  categorical_features:
    - "category"
    - "region" 
  numerical_features:
    - "age"
    - "income"
  target: "target_variable"

# Model parameters
model:
  algorithm: "random_forest"
  n_estimators: 100
  max_depth: 10
  min_samples_split: 5
  random_state: 42

# Training parameters
training:
  validation_split: 0.2
  cross_validation_folds: 5
  metrics:
    - "accuracy"
    - "precision"
    - "recall"
    - "f1"

# MLflow tracking
mlflow:
  experiment_name: "customer_churn_prediction"
  tracking_uri: "http://localhost:5000"

Configuration Loader

Python
import yaml
import os
from pathlib import Path

class Config:
    def __init__(self, config_path="params.yaml"):
        self.config_path = Path(config_path)
        self.config = self._load_config()
    
    def _load_config(self):
        """Load configuration from YAML file"""
        with open(self.config_path, 'r') as file:
            config = yaml.safe_load(file)
        
        # Override with environment variables if present
        self._override_with_env(config)
        return config
    
    def _override_with_env(self, config):
        """Override config values with environment variables"""
        # Example: MLFLOW_TRACKING_URI overrides mlflow.tracking_uri
        if 'MLFLOW_TRACKING_URI' in os.environ:
            config['mlflow']['tracking_uri'] = os.environ['MLFLOW_TRACKING_URI']
    
    def get(self, key_path, default=None):
        """Get configuration value using dot notation"""
        keys = key_path.split('.')
        value = self.config
        
        for key in keys:
            if isinstance(value, dict) and key in value:
                value = value[key]
            else:
                return default
        
        return value

# Usage
config = Config()
model_params = config.get('model')
tracking_uri = config.get('mlflow.tracking_uri')

Training Pipeline Template

Python
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import pandas as pd
import joblib
from datetime import datetime
import argparse

class ModelTrainer:
    def __init__(self, config):
        self.config = config
        self.model = None
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None
    
    def load_data(self):
        """Load and split data"""
        data_path = self.config.get('data.processed_data_path')
        df = pd.read_csv(data_path)
        
        feature_cols = (
            self.config.get('features.categorical_features', []) + 
            self.config.get('features.numerical_features', [])
        )
        target_col = self.config.get('features.target')
        
        X = df[feature_cols]
        y = df[target_col]
        
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X, y, 
            test_size=self.config.get('data.test_size', 0.2),
            random_state=self.config.get('data.random_state', 42),
            stratify=y
        )
    
    def train_model(self):
        """Train the model"""
        model_params = self.config.get('model')
        
        if model_params['algorithm'] == 'random_forest':
            self.model = RandomForestClassifier(
                n_estimators=model_params['n_estimators'],
                max_depth=model_params['max_depth'],
                min_samples_split=model_params['min_samples_split'],
                random_state=model_params['random_state']
            )
        
        self.model.fit(self.X_train, self.y_train)
    
    def evaluate_model(self):
        """Evaluate model performance"""
        y_pred = self.model.predict(self.X_test)
        
        metrics = {
            'accuracy': accuracy_score(self.y_test, y_pred),
            'precision': precision_score(self.y_test, y_pred, average='weighted'),
            'recall': recall_score(self.y_test, y_pred, average='weighted'),
            'f1': f1_score(self.y_test, y_pred, average='weighted')
        }
        
        # Cross-validation
        cv_scores = cross_val_score(
            self.model, self.X_train, self.y_train, 
            cv=self.config.get('training.cross_validation_folds', 5)
        )
        metrics['cv_accuracy_mean'] = cv_scores.mean()
        metrics['cv_accuracy_std'] = cv_scores.std()
        
        return metrics
    
    def save_model(self, model_path):
        """Save trained model"""
        joblib.dump(self.model, model_path)
    
    def run_training(self):
        """Execute complete training pipeline"""
        mlflow.set_experiment(self.config.get('mlflow.experiment_name'))
        
        with mlflow.start_run():
            # Log parameters
            mlflow.log_params(self.config.get('model'))
            mlflow.log_params(self.config.get('data'))
            
            # Load data and train
            self.load_data()
            self.train_model()
            
            # Evaluate
            metrics = self.evaluate_model()
            mlflow.log_metrics(metrics)
            
            # Save model
            model_path = f"models/model_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pkl"
            self.save_model(model_path)
            mlflow.log_artifact(model_path)
            
            # Log model to MLflow Model Registry
            mlflow.sklearn.log_model(
                self.model, 
                "model",
                registered_model_name=self.config.get('mlflow.experiment_name')
            )
            
            print(f"Training completed. Metrics: {metrics}")
            return metrics

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", default="params.yaml", help="Configuration file path")
    args = parser.parse_args()
    
    config = Config(args.config)
    trainer = ModelTrainer(config)
    trainer.run_training()

Model Serving Template

Python
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import pandas as pd
import numpy as np
from typing import List, Dict
import logging
import time
import os

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

model = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: load model from registry / object store — NOT from embedded file
    global model
    try:
        model_uri = os.environ.get("MODEL_URI", "models/production_model.pkl")
        model = joblib.load(model_uri)
        logger.info(f"Model loaded successfully from {model_uri}")
    except Exception as e:
        logger.error(f"Failed to load model: {e}")
        raise
    yield
    # Shutdown: release resources
    model = None
    logger.info("Model unloaded")

# Initialize FastAPI app
app = FastAPI(
    title="ML Model API",
    description="Production ML model serving API",
    version="1.0.0",
    lifespan=lifespan
)

class PredictionRequest(BaseModel):
    features: Dict[str, float]   # keys must match training column names
    
class PredictionResponse(BaseModel):
    prediction: int
    probability: List[float]
    model_version: str
    processing_time: float

class HealthResponse(BaseModel):
    status: str
    model_loaded: bool
    timestamp: str

# Health check endpoint
@app.get("/health", response_model=HealthResponse)
async def health_check():
    return HealthResponse(
        status="healthy" if model is not None else "unhealthy",
        model_loaded=model is not None,
        timestamp=pd.Timestamp.now().isoformat()
    )

# Prediction endpoint
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    
    start_time = time.time()
    
    try:
        # Convert input to DataFrame
        input_data = pd.DataFrame([request.features])
        
        # Make prediction
        prediction = model.predict(input_data)[0]
        probability = model.predict_proba(input_data)[0].tolist()
        
        processing_time = time.time() - start_time
        
        # Log prediction
        logger.info(f"Prediction made: {prediction}, Processing time: {processing_time:.3f}s")
        
        return PredictionResponse(
            prediction=int(prediction),
            probability=probability,
            model_version="1.0.0",
            processing_time=processing_time
        )
        
    except Exception as e:
        logger.error(f"Prediction failed: {e}")
        raise HTTPException(status_code=400, detail=f"Prediction failed: {str(e)}")

# Batch prediction endpoint
@app.post("/predict/batch")
async def predict_batch(requests: List[PredictionRequest]):
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    
    try:
        # Convert all inputs to DataFrame
        features_list = [req.features for req in requests]
        input_data = pd.DataFrame(features_list)
        
        # Make predictions
        predictions = model.predict(input_data)
        probabilities = model.predict_proba(input_data)
        
        # Format responses
        responses = []
        for i, (pred, prob) in enumerate(zip(predictions, probabilities)):
            responses.append({
                "id": i,
                "prediction": int(pred),
                "probability": prob.tolist()
            })
        
        return responses
        
    except Exception as e:
        logger.error(f"Batch prediction failed: {e}")
        raise HTTPException(status_code=400, detail=f"Batch prediction failed: {str(e)}")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Docker and Kubernetes Deployment

Dockerfile

Dockerfile
FROM python:3.9-slim

# Set working directory
WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY src/ ./src/
COPY params.yaml .

# Models should be loaded at runtime from a model registry or object store.
# Inject MODEL_URI as an environment variable at container startup.
# This keeps images small, immutable, and free of potentially sensitive weights.
# Example: docker run -e MODEL_URI=s3://bucket/models/v1.pkl ml-model-api:latest

# Create non-root user
RUN adduser --disabled-password --gecos '' mluser
USER mluser

# Expose port
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Run application
CMD ["python", "src/api/serve.py"]

Kubernetes Deployment

YAML
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-model-api
  labels:
    app: ml-model-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-model-api
  template:
    metadata:
      labels:
        app: ml-model-api
    spec:
            securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 1000
        seccompProfile:
          type: RuntimeDefault
      containers:
      - name: ml-model-api
        image: ml-model-api:latest
        imagePullPolicy: Always           ports:
        - containerPort: 8000
        env:
        - name: MODEL_URI
          valueFrom:
            configMapKeyRef:
              name: ml-model-config
              key: model_uri
                securityContext:
          allowPrivilegeEscalation: false
          readOnlyRootFilesystem: true
          capabilities:
            drop: ["ALL"]
        resources:
          requests:
            memory: "1Gi"                      cpu: "500m"
          limits:
            memory: "2Gi"          # profile your model — 512Mi will OOM most models
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: ml-model-api-service
spec:
  selector:
    app: ml-model-api
  ports:
  - port: 80
    targetPort: 8000
    type: ClusterIP
---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: ml-model-api-netpol
spec:
  podSelector:
    matchLabels:
      app: ml-model-api
  policyTypes:
  - Egress
  egress:
  - to:
    - namespaceSelector: {}
    ports:
    - port: 443   # allow HTTPS to model registry / object store only

CI/CD Pipeline Template

Python
# .github/workflows/ml-pipeline.yml
name: ML Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
        - uses: actions/checkout@v4
    
    - name: Set up Python
      uses: actions/setup-python@v5         with:
        python-version: "3.11"
    
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
        pip install pytest pytest-cov flake8
    
    - name: Lint with flake8
      run: |
        flake8 src/ --count --select=E9,F63,F7,F82 --show-source --statistics
        flake8 src/ --count --max-complexity=10 --max-line-length=100 --statistics
    
    - name: Run tests
      run: |
        pytest tests/ --cov=src/ --cov-report=xml
    
    - name: Upload coverage to Codecov
      uses: codecov/codecov-action@v4   
  train:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
    - uses: actions/checkout@v4
    
    - name: Set up Python
      uses: actions/setup-python@v5
      with:
        python-version: "3.11"
    
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
    
    - name: Setup DVC
      uses: iterative/setup-dvc@v2       
    - name: Pull data with DVC
      run: dvc pull
      env:
        AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
        AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
    
    - name: Train model
      run: python src/models/train.py
      env:
        MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
    
    - name: Run model tests
      run: pytest tests/test_models.py

  deploy:
    needs: [test, train]
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
    - uses: actions/checkout@v4
    
    - name: Build Docker image
      run: |
        docker build -t ml-model-api:${{ github.sha }} .
        docker tag ml-model-api:${{ github.sha }} ml-model-api:latest
    
    - name: Scan image for vulnerabilities
      run: |
                docker run --rm -v /var/run/docker.sock:/var/run/docker.sock \
          aquasec/trivy:latest image --exit-code 1 --severity CRITICAL \
          ml-model-api:${{ github.sha }}
    
    - name: Deploy to staging
      run: |
        # Deploy to staging environment
        kubectl apply -f deployment/kubernetes/staging/
    
    - name: Run integration tests
      run: |
        # Run integration tests against staging
        pytest tests/integration/
    
    - name: Deploy to production
      if: success()
      run: |
        # Deploy to production environment
        kubectl apply -f deployment/kubernetes/production/

Monitoring and Observability Setup

YAML
# monitoring/prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'ml-model-api'
    static_configs:
      - targets: ['ml-model-api-service:80']
    metrics_path: '/metrics'
    scrape_interval: 30s

# monitoring/grafana-dashboard.json
{
  "dashboard": {
    "title": "ML Model Monitoring",
    "panels": [
      {
        "title": "Prediction Rate",
        "type": "timeseries",
        "datasource": { "type": "prometheus" },
        "targets": [
          {
            "expr": "rate(ml_predictions_total[5m])",
            "legendFormat": "Predictions/sec"
          }
        ]
      },
      {
        "title": "Prediction Latency",
        "type": "timeseries",
        "datasource": { "type": "prometheus" },
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(ml_prediction_duration_seconds_bucket[5m]))",
            "legendFormat": "p95 latency"
          },
          {
            "expr": "histogram_quantile(0.50, rate(ml_prediction_duration_seconds_bucket[5m]))",
            "legendFormat": "p50 (median)"
          }
        ]
      },
      {
        "title": "Model Accuracy",
        "type": "stat",
        "datasource": { "type": "prometheus" },
        "targets": [
          {
            "expr": "ml_model_accuracy",
            "legendFormat": "Current Accuracy"
          }
        ]
      },
      {
        "title": "Data Drift Score",
        "type": "timeseries",
        "datasource": { "type": "prometheus" },
        "targets": [
          {
            "expr": "ml_data_drift_score",
            "legendFormat": "Drift Score"
          }
        ]
      }
    ]
  }
}

Conclusion and Next Steps

Key Takeaways

  • MLOps is not just about models—it's about building reliable, scalable systems that deliver business value
  • Reproducibility and versioning are fundamental to trustworthy ML systems
  • Data quality and pipeline engineering are often more important than model sophistication
  • Monitoring and observability are critical for maintaining model performance over time
  • Successful MLOps requires both technical excellence and strong team collaboration

Implementation Roadmap

  1. Phase 1: Establish version control and experiment tracking
  2. Phase 2: Build robust data pipelines and validation
  3. Phase 3: Implement CI/CD for model deployment
  4. Phase 4: Set up comprehensive monitoring and alerting
  5. Phase 5: Scale with advanced orchestration and automation

Recommended Tool Stack

  • Experiment Tracking: MLflow, Weights & Biases, Comet ML
  • Data Versioning: DVC, LakeFS (data lake versioning)
  • Pipeline Orchestration: Apache Airflow 2.x, Prefect, ZenML
  • Model Serving: FastAPI, BentoML, Ray Serve, vLLM (LLMs)
  • Feature Stores: Feast, Tecton, Hopsworks
  • Monitoring: Prometheus, Grafana, Evidently AI (drift), DataDog
  • Infrastructure: Docker, Kubernetes, Terraform, Helm
  • Security: Trivy (scanning), OPA/Kyverno (policy), HashiCorp Vault (secrets), Cosign (signing)

This blueprint provides the foundation for building production-ready ML systems. Remember that MLOps is a journey, not a destination—continuously iterate and improve your processes as your team and systems mature.

Pro Tip: Start small, automate incrementally, and always prioritize reliability over complexity.