1. MLOps Foundations and Background
What is MLOps?
Machine Learning Operations (MLOps) is the discipline of integrating ML models into real-world software systems. It encompasses Continuous Integration (CI), Continuous Delivery (CD), and Continuous Training (CT) — combining machine learning, software engineering, DevOps, and data engineering practices. CT is what most distinguishes MLOps from traditional DevOps: models must be periodically retrained as data distributions evolve, not just redeployed.
Key Insight: The ML model itself is only a small part of a production ML system; the surrounding infrastructure is far larger and more complex.
Production ML System Architecture
Figure (after Sculley et al., 2015, "Hidden Technical Debt in Machine Learning Systems"): the ML model code is a small central component, and the surrounding infrastructure is vastly larger.
Why MLOps Matters
Model Decay Over Time
ML models face changing real-world conditions. Without proper operations, accurate models can quickly become unreliable or harmful.
Manual Processes
Without MLOps, teams end up with manual, brittle processes that are slow, error-prone, and don't scale.
Performance Requirements
Production systems must handle evolving data and usage patterns while meeting latency, throughput, and quality requirements.
Risk Management
Inadequate MLOps can lead to stale models making bad predictions that hurt the business and users.
MLOps vs DevOps
| Aspect | Traditional DevOps | MLOps |
|---|---|---|
| Development Process | Deterministic, code-focused | Experimental, data-driven |
| Testing Complexity | Unit tests, integration tests | + Data quality tests, model performance tests, drift detection |
| Deployment | Code deployment via CI/CD | + Continuous training (CT), model retraining pipelines |
| Versioning | Code versioning | + Data versioning, model versioning, experiment tracking |
| Monitoring | System metrics, logs | + Model performance, data drift, concept drift |
Key MLOps Challenges
Data Drift
When the distribution of input data changes over time, model performance can degrade without obvious system failures.
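One common way to put a number on such a shift is the Population Stability Index (PSI), which compares the binned proportions of a feature in a reference window against a recent window. The sketch below is plain Python; the equal-width binning, the bin count, the small epsilon, and the often-quoted rule of thumb (below 0.1 little change, above 0.25 a significant shift) are assumptions to tune, not fixed standards.

```python
import math
from typing import List, Sequence

def population_stability_index(reference: Sequence[float],
                               current: Sequence[float],
                               n_bins: int = 10,
                               eps: float = 1e-6) -> float:
    """PSI between two samples, using equal-width bins over the reference range."""
    lo, hi = min(reference), max(reference)
    width = (hi - lo) / n_bins or 1.0  # guard against a constant reference feature

    def proportions(values: Sequence[float]) -> List[float]:
        counts = [0] * n_bins
        for v in values:
            # Values outside the reference range are clamped into the first/last bin
            idx = int((v - lo) / width)
            counts[min(max(idx, 0), n_bins - 1)] += 1
        # eps avoids log(0) for empty bins
        return [max(c / len(values), eps) for c in counts]

    expected = proportions(reference)
    actual = proportions(current)
    # PSI = sum over bins of (actual - expected) * ln(actual / expected)
    return sum((a - e) * math.log(a / e) for a, e in zip(actual, expected))
```

Identical samples give a PSI of 0.0; a scheduled job would compute this per feature and alert when the score crosses the chosen threshold.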
Training/Serving Skew
Arises when feature transformation logic in the training pipeline (e.g. Python/pandas) diverges from production serving code (e.g. Java, Go, SQL), for example through different rounding, missing-value handling, or encoding. It can also be caused by the input data shifting between training time and serving time.
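Rounding is a small but concrete source of such skew. Python's built-in `round()` rounds halves to the nearest even integer, while Java's `Math.round` rounds halves up, so a "direct port" of the same bucketing logic can disagree exactly at the boundaries. The sketch below emulates a half-up serving implementation in Python to expose the mismatch; the function names are hypothetical.

```python
import math
from typing import List, Sequence

def bucket_training(x: float) -> int:
    """Training-side transform: Python's round() uses round-half-to-even."""
    return round(x)

def bucket_serving(x: float) -> int:
    """Hypothetical serving-side port that rounds halves up, like Java's Math.round."""
    return math.floor(x + 0.5)

def find_skew(values: Sequence[float]) -> List[float]:
    """Return the inputs on which the training and serving transforms disagree."""
    return [v for v in values if bucket_training(v) != bucket_serving(v)]

mismatches = find_skew([0.5, 1.2, 1.5, 2.5, 3.7])  # 0.5 and 2.5 disagree
```

Running a parity check like `find_skew` over a sample of real inputs in CI catches this class of bug before it reaches production.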
Feedback Loops
ML predictions can influence the data used to train future models, creating complex dependencies.
2. Machine Learning System Lifecycle
1. Scoping
Define project goals and success metrics
2. Data
Collect, clean, and prepare data
3. Modeling
Train and validate models
4. Deployment
Deploy to production
5. Monitoring
Monitor and maintain
⟳ This is an iterative cycle, not a linear flow. Monitoring outcomes feed back into Scoping and Data (drift triggers retraining). Model error analysis drives new data collection. Deployment baselines initialise monitoring thresholds. Real ML systems continuously loop through these stages throughout their lifetime.
Data Pipelines
In ML, data quality and management are often more important than the specific modeling algorithm. Production ML systems need robust data pipelines for training and inference.
Data Ingestion
- Batch processing (periodic imports)
- Streaming (real-time events)
- API integrations
- Database connections
Data Storage
- Cloud object stores (S3, GCS, ADLS) — preferred for cloud-native workloads
- HDFS — on-premises / hybrid environments (legacy)
- Data warehouses (BigQuery, Snowflake, Redshift)
- Lakehouse table formats: Delta Lake, Apache Iceberg, Apache Hudi
- Feature stores (Feast, Tecton, Hopsworks)
- Relational databases (PostgreSQL, MySQL)
Data Processing (ETL)
- Cleaning and validation
- Feature engineering
- Data joining and aggregation
- Format standardization
Data Labeling
- Automated labeling
- Human annotation
- Active learning
- Quality control
Model Training and Experimentation
Training Pipeline Components
Experiment Tracking
- Parameter logging
- Metrics recording
- Artifact storage
- Reproducibility
Model Validation
- Cross-validation
- Hold-out testing
- A/B testing
- Performance metrics
Resource Management
- Compute allocation
- GPU scheduling
- Memory optimization
- Cost monitoring
Deployment and Inference
Deployment Patterns
Model Serving
Monitoring and Observability
Operational Monitoring
- System metrics (CPU, memory)
- Latency and throughput
- Error rates
- Service availability
Model Performance
- Accuracy tracking
- Prediction quality
- Business metrics
- A/B test results
Data Monitoring
- Data drift detection
- Concept drift
- Feature distribution
- Data quality checks
3. Reproducibility and Versioning
Why Reproducibility Matters
Critical Benefits
- Debugging: Reproduce training to pinpoint performance drops or discrepancies
- Collaboration: Team members can verify and build upon each other's work
- Compliance: Regulatory requirements in healthcare, finance, autonomous vehicles
- Rollback: Ability to revert to previous model versions when needed
Common Challenges
- Randomness in ML (initial weights, data shuffling)
- Complex dependencies (code + data + environment)
- Large data versioning challenges
- Environment-specific behaviors
- Untracked manual experiments
Best Practices for Reproducibility
1. Ensure Deterministic Processes
# Set random seeds for reproducibility
import os
import random

# cuBLAS (CUDA 10.2+) reads this when it initialises; set it before any CUDA work
os.environ.setdefault("CUBLAS_WORKSPACE_CONFIG", ":4096:8")

import numpy as np
import torch

SEED = 42
np.random.seed(SEED)
random.seed(SEED)
torch.manual_seed(SEED)  # seeds the CPU and all CUDA devices
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)  # explicit, though covered by manual_seed above

# Restrict cuDNN to deterministic convolution algorithms
torch.backends.cudnn.deterministic = True
# Disable the cuDNN auto-tuner, whose algorithm choice can differ between runs
torch.backends.cudnn.benchmark = False

# PyTorch >= 1.8: raise errors for ops with no deterministic implementation.
# warn_only=True (PyTorch >= 1.11) logs warnings instead of raising exceptions
torch.use_deterministic_algorithms(True, warn_only=True)
Note: cudnn.deterministic = True restricts cuDNN to deterministic algorithms, and cudnn.benchmark = False stops the auto-tuner from picking a possibly different algorithm on each run; both are needed for reproducible GPU results and may cost some performance. On CUDA 10.2+, CUBLAS_WORKSPACE_CONFIG=:4096:8 (or :16:8) must also be set before any CUDA work, otherwise deterministic mode flags some cuBLAS operations as non-deterministic.
2. Version Control for Code
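# Include Git commit hash in model metadata
import subprocess
from datetime import datetime

def get_git_hash():
    """Return the commit hash of HEAD (must run inside a Git repository)."""
    return subprocess.check_output(['git', 'rev-parse', 'HEAD']).decode('ascii').strip()

hyperparameters = {'n_estimators': 100, 'max_depth': 5}  # example training config

model_metadata = {
    'git_hash': get_git_hash(),
    'timestamp': datetime.now().isoformat(),
    'parameters': hyperparameters
}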
3. Data Versioning
Use tools like DVC (Data Version Control) to track data alongside code:
# Initialize DVC in your project
dvc init
# Add data to DVC tracking
dvc add data/train_dataset.csv
# Commit both code and data references
git add data/train_dataset.csv.dvc data/.gitignore
git commit -m "Add training dataset v1.0"
# Create version tags
git tag -a v1.0 -m "Model version 1.0 with dataset v1.0"
4. Experiment Tracking
Use MLflow, Weights & Biases, or similar tools:
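import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier

# Assumes X_train, X_test, y_train, y_test already exist (e.g. from train_test_split)
with mlflow.start_run() as run:
    # Log parameters
    mlflow.log_param("n_estimators", 100)
    mlflow.log_param("max_depth", 5)

    # Train model
    model = RandomForestClassifier(n_estimators=100, max_depth=5, random_state=42)
    model.fit(X_train, y_train)

    # Log metrics
    accuracy = model.score(X_test, y_test)
    mlflow.log_metric("accuracy", accuracy)

    # Log model as a run artifact
    mlflow.sklearn.log_model(model, "random_forest_model")
    run_id = run.info.run_id  # needed later to register the model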
5. Model Versioning
Register models with versions and stages:
# Register model in MLflow Model Registry
import mlflow
from mlflow import MlflowClient

model_name = "customer_churn_predictor"
# run_id: ID of the run that logged the model (run.info.run_id)
registered = mlflow.register_model(
    f"runs:/{run_id}/random_forest_model",
    model_name
)

# Stages (transition_model_version_stage()) are deprecated since MLflow 2.9;
# use aliases to mark which version is in staging or production
client = MlflowClient()

# Assign a 'staging' alias to the newly registered version
client.set_registered_model_alias(
    name=model_name,
    alias="staging",
    version=registered.version
)

# Load model by alias in serving code
model_uri = f"models:/{model_name}@staging"
model = mlflow.pyfunc.load_model(model_uri)

# Promote once validated: point 'champion' at this version, then drop 'staging'
client.set_registered_model_alias(name=model_name, alias="champion", version=registered.version)
client.delete_registered_model_alias(name=model_name, alias="staging")
6. Environment Management
Requirements.txt
Dockerfile (multi-stage)
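Whichever packaging approach is used, recording the exact package versions a model was trained with makes later rebuilds much easier. A minimal sketch using the standard library's `importlib.metadata` (Python 3.8+); the function name is hypothetical, and the output is a pip-style pin list, not a full lock file with hashes.

```python
from importlib import metadata
from typing import List

def installed_requirements() -> List[str]:
    """Return sorted 'name==version' pins for every distribution in this environment."""
    pins = set()
    for dist in metadata.distributions():
        meta = dist.metadata
        name = meta.get("Name") if meta is not None else None
        if name:  # skip broken installs that lack a Name field
            pins.add(f"{name}=={dist.version}")
    return sorted(pins, key=str.lower)
```

The resulting list can be written next to the model artifact or logged as a run artifact in the experiment tracker.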
Tools Comparison
| Tool | Strengths | Best For | Limitations |
|---|---|---|---|
| MLflow | Open-source, flexible, self-hosted | Local/enterprise setups, custom infrastructure | Basic UI, requires setup and maintenance |
| Weights & Biases | Rich visualizations, team collaboration, managed service | Fast setup, visualization-heavy workflows | Cloud-dependent, subscription costs |
| DVC | Git-like data versioning, lightweight | Data versioning, reproducible pipelines | Limited experiment tracking UI |
| Kubeflow | Kubernetes-native, scalable pipelines | Large-scale, cloud-native deployments | Complex setup, steep learning curve |
4. Data and Pipeline Engineering
Understanding the Data Landscape
In production ML, success depends not just on models but on the data pipelines that feed those models. Data is the raw material, and pipeline choices have profound downstream consequences.
Data Sources
Data Formats
- JSON/CSV: Human-readable, debugging-friendly, storage-heavy
- Parquet/Avro: Compact binary formats, efficient to process, not human-readable
- CSV: Row-major, write-heavy workloads
- Parquet: Column-major, analytical queries, ML training
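The row-major versus column-major distinction is why columnar formats suit training reads: a query that needs one feature only touches that feature's values. A toy pure-Python illustration of the two layouts (this is not how Parquet is actually encoded on disk; the records are made up):

```python
from typing import Any, Dict, List

# Row-major: one record per entry, the way CSV lines are laid out
rows: List[Dict[str, Any]] = [
    {"user_id": 1, "age": 34, "country": "DE"},
    {"user_id": 2, "age": 27, "country": "FR"},
    {"user_id": 3, "age": 45, "country": "DE"},
]

def to_columnar(records: List[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Pivot row-major records into one contiguous list per column."""
    columns: Dict[str, List[Any]] = {key: [] for key in records[0]}
    for record in records:
        for key, value in record.items():
            columns[key].append(value)
    return columns

columns = to_columnar(rows)
# Averaging one feature reads a single list instead of scanning every record
mean_age = sum(columns["age"]) / len(columns["age"])
```

Appending a new record, by contrast, touches every column list, which is why row-oriented layouts remain the better fit for write-heavy workloads.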
ETL vs ELT Pipelines
ETL (Extract-Transform-Load)
ELT (Extract-Load-Transform)
Pipeline Implementation Patterns
Batch Processing Pipeline
# Example batch processing pipeline with the Apache Airflow TaskFlow API (Airflow 2.4+)
# (Airflow 3 also exposes these decorators as airflow.sdk.dag / airflow.sdk.task)
from airflow.decorators import dag, task
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

@dag(
    dag_id='ml_data_pipeline',
    default_args=default_args,
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['ml', 'data']
)
def ml_data_pipeline():
    @task
    def extract_data():
        # Extract data from source systems
        pass

    @task
    def transform_data():
        # Clean and transform data
        pass

    @task
    def load_data():
        # Load into target system
        pass

    extract_data() >> transform_data() >> load_data()

dag_instance = ml_data_pipeline()
Streaming Pipeline
# Example streaming pipeline with Apache Kafka and Python (kafka-python client)
from kafka import KafkaConsumer, KafkaProducer
import json

# Consumer for raw events
consumer = KafkaConsumer(
    'raw_events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# Producer for processed features
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def extract_features(event):
    # Placeholder: replace with your real feature engineering logic
    return {'event_type': event.get('event_type')}

def process_event(event):
    # Feature engineering logic
    features = {
        'user_id': event['user_id'],
        'timestamp': event['timestamp'],
        'processed_features': extract_features(event)
    }
    return features

for message in consumer:
    event = message.value
    processed = process_event(event)
    # send() is asynchronous; call producer.flush() before shutting down
    producer.send('ml_features', processed)
Feature Store Integration
# feature_store.py (registry, version-controlled with your repo)
# Define the FeatureService here and register it via `feast apply`, NOT in serving code
from feast import FeatureService

# user_stats_view and user_demographics_view are FeatureViews defined elsewhere in the repo
user_features_service = FeatureService(
    name="user_recommendation_features",
    features=[
        user_stats_view[["avg_rating", "total_purchases"]],
        user_demographics_view[["age_group"]],
    ],
)

# ─────────────────────────────────────────────────────────────────
# serving_code.py: online inference path (real-time)
# ─────────────────────────────────────────────────────────────────
from feast import FeatureStore

fs = FeatureStore(repo_path="feature_repo/")

# Online retrieval needs only entity keys
# (event_timestamp is only for get_historical_features / offline retrieval)
entity_rows = [
    {"user_id": 1001},
    {"user_id": 1002},
    {"user_id": 1003},
]

# Look up the registered FeatureService by name, then pass the object itself
feature_service = fs.get_feature_service("user_recommendation_features")
features = fs.get_online_features(
    features=feature_service,
    entity_rows=entity_rows,
).to_df()

# ─────────────────────────────────────────────────────────────────
# For HISTORICAL / TRAINING retrieval use get_historical_features:
# ─────────────────────────────────────────────────────────────────
import pandas as pd
from datetime import datetime, timezone

entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003],
    # Required here for the point-in-time join, unlike online retrieval
    "event_timestamp": [datetime.now(timezone.utc)] * 3,
})

training_features = fs.get_historical_features(
    entity_df=entity_df,
    features=feature_service,
).to_df()
Data Quality and Validation
Schema Validation
- Data type validation
- Required field checks
- Range constraints
- Format validation
Statistical Validation
- Distribution monitoring
- Outlier detection
- Missing value rates
- Correlation changes
Business Logic
- Business rule checks
- Referential integrity
- Temporal consistency
- Cross-field validation
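Before adopting a framework, the three categories above can be sketched as plain functions over records. The field names (`user_id`, `age`, `status`, `signup_date`, `last_login`) and the rules are hypothetical examples chosen to mirror the lists.

```python
from typing import Any, Dict, List

# Schema validation: required fields and their expected types
SCHEMA = {"user_id": int, "age": int, "status": str}

def validate_record(record: Dict[str, Any]) -> List[str]:
    """Return human-readable violations for one record (an empty list means valid)."""
    errors = []
    for field, expected_type in SCHEMA.items():
        if record.get(field) is None:
            errors.append(f"missing required field: {field}")
        elif not isinstance(record[field], expected_type):
            errors.append(f"{field}: expected {expected_type.__name__}")
    # Range constraint
    age = record.get("age")
    if isinstance(age, int) and not 0 <= age <= 120:
        errors.append("age out of range [0, 120]")
    # Business rule: allowed status values
    if record.get("status") not in (None, "active", "inactive"):
        errors.append("status must be 'active' or 'inactive'")
    # Temporal consistency across fields
    signup, last_login = record.get("signup_date"), record.get("last_login")
    if signup and last_login and last_login < signup:
        errors.append("last_login precedes signup_date")
    return errors

def missing_rate(records: List[Dict[str, Any]], field: str) -> float:
    """Statistical check: share of records where `field` is missing or None."""
    return sum(1 for r in records if r.get(field) is None) / len(records)
```

Batch-level checks such as `missing_rate` are usually compared against a threshold taken from a reference dataset rather than a hard-coded value.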
Data Quality Framework
import great_expectations as gx
# Initialize or load a Data Context (file-based or ephemeral)
context = gx.get_context()
# Define a Pandas Data Source
datasource = context.sources.add_pandas("my_datasource")
data_asset = datasource.add_dataframe_asset("raw_data")
batch_request = data_asset.build_batch_request(dataframe=raw_data)
# Create or update an Expectation Suite
suite = context.add_or_update_expectation_suite("data_quality_suite")
# Build a Validator
validator = context.get_validator(
batch_request=batch_request,
expectation_suite=suite
)
# Define expectations
validator.expect_column_to_exist("user_id")
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=120)
validator.expect_column_values_to_be_in_set("status", ["active", "inactive"])
validator.save_expectation_suite()
# Run validation via a Checkpoint (CI/CD-friendly)
checkpoint = context.add_or_update_checkpoint(
name="data_quality_checkpoint",
validations=[{
"batch_request": batch_request,
"expectation_suite_name": "data_quality_suite"
}]
)
result = checkpoint.run()
if not result.success:
raise ValueError("Data quality checks failed — pipeline halted.")
for validation in result.run_results.values():
for r in validation["results"]:
if not r["success"]:
print(f"FAILED: {r['expectation_config']['expectation_type']}")
5. MLOps Best Practices
Development and Testing
Code Quality
- Use version control for all code
- Write unit tests for data processing functions
- Implement integration tests for pipelines
- Use linting and formatting tools (black, flake8)
- Document APIs and functions thoroughly
- Follow consistent naming conventions
ML-Specific Testing
- Test data preprocessing steps
- Validate model inputs and outputs
- Check for data leakage
- Test model performance thresholds
- Validate feature engineering logic
- Test training/serving parity
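Several of these checks fit naturally into an ordinary unit-test suite. For example, a basic leakage check asserts that no entity ID appears in both the training and evaluation splits; the helper name and the example IDs are illustrative assumptions.

```python
from typing import Hashable, Iterable, Set

def overlapping_ids(train_ids: Iterable[Hashable], test_ids: Iterable[Hashable]) -> Set[Hashable]:
    """IDs present in both splits; a non-empty result signals potential leakage."""
    return set(train_ids) & set(test_ids)

def test_no_entity_leakage():
    # In a real suite, load these IDs from the actual split artifacts
    train_ids = [101, 102, 103]
    test_ids = [201, 202]
    assert not overlapping_ids(train_ids, test_ids)
```

Random row-level splits can leak when the same user contributes rows to both sets; splitting by entity (for example with scikit-learn's `GroupShuffleSplit`) avoids that.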
Production Deployment
Deployment Strategies
- Blue-green: Run two identical production environments and switch traffic between them instantly
- Canary: Gradually roll out to a small percentage of users and monitor performance
- A/B testing: Compare the new model against the existing one with split traffic
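Canary and A/B splits are often implemented by hashing a stable identifier, so each user consistently sees the same model version across requests. A minimal sketch, assuming string user IDs, a hypothetical 10% canary share, and made-up model names:

```python
import hashlib

def route_to_canary(user_id: str, canary_percent: float = 10.0) -> bool:
    """Deterministically assign a user to the canary bucket from a hash of their ID."""
    digest = hashlib.sha256(user_id.encode("utf-8")).hexdigest()
    # Map the first 32 bits of the hash to a bucket in [0, 100) with 0.01 resolution
    bucket = int(digest[:8], 16) % 10000 / 100
    return bucket < canary_percent

def choose_model(user_id: str) -> str:
    # Hypothetical model names for the two deployment targets
    return "model_v2_canary" if route_to_canary(user_id) else "model_v1_stable"
```

Raising `canary_percent` step by step widens the rollout, and users already in the canary stay there because their bucket never changes.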
Safety Measures
import time
import threading

class ModelCircuitBreaker:
    """Stops calling a failing model and serves a fallback until a cool-down has passed."""

    def __init__(self, model, failure_threshold=5, timeout=60):
        self.model = model  # any object with a .predict() method
        self.failure_threshold = failure_threshold
        self.timeout = timeout  # seconds to stay OPEN before probing again
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
        self._lock = threading.Lock()

    def call_model(self, input_data):
        with self._lock:
            if self.state == 'OPEN':
                if (self.last_failure_time is not None and
                        time.monotonic() - self.last_failure_time > self.timeout):
                    self.state = 'HALF_OPEN'  # let probe requests through
                else:
                    return self.fallback_response()
        try:
            result = self.model.predict(input_data)
        except Exception:
            with self._lock:
                self.failure_count += 1
                self.last_failure_time = time.monotonic()
                # A failed probe, or too many consecutive failures, opens the circuit
                if (self.state == 'HALF_OPEN' or
                        self.failure_count >= self.failure_threshold):
                    self.state = 'OPEN'
            return self.fallback_response()
        with self._lock:
            # Success: close the circuit and reset the consecutive-failure count
            self.state = 'CLOSED'
            self.failure_count = 0
        return result

    def fallback_response(self):
        # Return a safe default/cached response when model is unavailable
        return {"prediction": "default_value", "confidence": 0.0, "fallback": True}
Monitoring and Alerting
Key Metrics to Monitor
- Response latency (p50, p95, p99)
- Throughput (requests per second)
- Error rates and status codes
- Resource utilization (CPU, memory, GPU)
- Prediction accuracy/performance
- Confidence score distributions
- Feature importance changes
- Model version performance comparison
- Input data distribution drift
- Missing or null value rates
- Feature correlation changes
- Outlier detection rates
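Latency percentiles such as p50/p95/p99 are usually computed by the monitoring backend (for example from histogram buckets), but the definition is easy to show directly. This sketch applies the nearest-rank method to raw samples; other interpolation schemes give slightly different values, and the function name is hypothetical.

```python
import math
from typing import Dict, Sequence

def latency_percentiles(samples_ms: Sequence[float]) -> Dict[str, float]:
    """Nearest-rank p50/p95/p99 over a non-empty list of latency samples (ms)."""
    ordered = sorted(samples_ms)
    n = len(ordered)

    def nearest_rank(p: int) -> float:
        # Smallest sample such that at least p% of all samples are <= it
        rank = max(1, math.ceil(p * n / 100))
        return ordered[rank - 1]

    return {f"p{p}": nearest_rank(p) for p in (50, 95, 99)}
```

Tail percentiles matter because averages hide them: a few very slow requests can leave the mean almost unchanged while p99 degrades sharply.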
Alerting Framework
# Example monitoring setup with Prometheus (metrics scraped by Prometheus, visualised in Grafana)
import logging
import time

from prometheus_client import Counter, Gauge, Histogram, start_http_server

logger = logging.getLogger(__name__)

# Define metrics
prediction_counter = Counter('ml_predictions_total', 'Total predictions made')
prediction_errors = Counter('ml_prediction_errors_total', 'Failed predictions')
prediction_latency = Histogram('ml_prediction_duration_seconds', 'Prediction latency')
model_accuracy = Gauge('ml_model_accuracy', 'Current model accuracy')
data_drift_score = Gauge('ml_data_drift_score', 'Data drift detection score')

# Expose /metrics on port 8000 for Prometheus to scrape
start_http_server(8000)

def make_prediction(input_data):
    start_time = time.perf_counter()
    try:
        prediction = model.predict(input_data)  # `model` is loaded elsewhere
        prediction_counter.inc()
        return prediction
    except Exception as e:
        # Log error and increment error counter
        prediction_errors.inc()
        logger.error(f"Prediction failed: {e}")
        raise
    finally:
        # Record latency for successful and failed calls alike
        prediction_latency.observe(time.perf_counter() - start_time)

# Update model metrics periodically (e.g. from a scheduled job)
def update_model_metrics():
    # evaluate_model, detect_data_drift, recent_data and reference_data are
    # placeholders for your own evaluation and drift-detection code
    current_accuracy = evaluate_model()
    model_accuracy.set(current_accuracy)
    drift_score = detect_data_drift(recent_data, reference_data)
    data_drift_score.set(drift_score)
Security and Compliance
Data Security
- Encrypt data at rest and in transit (TLS 1.3+)
- Implement RBAC and least-privilege access controls
- Regular security audits and penetration testing
- Secure model artifacts: sign and verify checksums
Compliance Frameworks
- GDPR / UK GDPR — EU personal data
- EU AI Act — risk-tier conformity assessments
- CCPA — California consumer data
- HIPAA — US healthcare data
- NIST AI RMF — GOVERN / MAP / MEASURE / MANAGE
- Model explainability, audit trails, data retention policies
Privacy
- Data anonymisation and pseudonymisation
- Differential privacy techniques
- Federated learning for sensitive data
- Regular privacy impact assessments (PIAs)
ML Supply Chain Security
- Verify model weight checksums/signatures before loading
- Scan base Docker images with Trivy or Grype for CVEs
- Use signed container images (Cosign / Sigstore)
- Pin dependencies with hash verification (pip install --require-hashes)
- Audit third-party datasets for poisoning or licence issues
- Use private model registries — do not pull weights from public sources without verification
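Checksum verification can be as simple as comparing a SHA-256 digest of the downloaded artifact against a value recorded at publish time (for example in the model registry). A minimal sketch; the function names and the source of the expected digest are assumptions.

```python
import hashlib
import hmac
from pathlib import Path

def sha256_file(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash the file in chunks so large weight files are not read into memory at once."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

def verify_artifact(path: Path, expected_sha256: str) -> None:
    """Raise before deserialising if the artifact does not match the recorded digest."""
    actual = sha256_file(path)
    if not hmac.compare_digest(actual, expected_sha256.lower()):
        raise ValueError(f"Checksum mismatch for {path}: got {actual}")
```

Call `verify_artifact` before `joblib.load` or `torch.load`: pickle-based formats can execute code while loading, so verification has to come first. A checksum detects tampering or corruption but does not prove who produced the file; signatures (e.g. Sigstore) cover that.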
Model & Inference Security
- Input validation and sanitisation before model inference
- Rate limiting and authentication on model endpoints
- Adversarial input detection / robustness testing
- Mitigate model inversion and membership inference attacks
- Store secrets in vault (HashiCorp Vault, AWS Secrets Manager) — never in env vars or images
- Enforce NetworkPolicy in Kubernetes to restrict model pod egress
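Rate limiting is often handled by an API gateway, but the underlying token-bucket idea is small enough to sketch. This in-process version assumes a single server instance (multiple replicas would need shared state, for example in Redis), keys buckets by a hypothetical API key, and takes an injectable clock so the logic can be tested deterministically.

```python
import time
from typing import Callable, Dict

class TokenBucket:
    """Allow bursts of up to `capacity` requests, refilled at `rate` tokens per second."""

    def __init__(self, capacity: float, rate: float,
                 clock: Callable[[], float] = time.monotonic):
        self.capacity = capacity
        self.rate = rate
        self.clock = clock
        self.tokens = capacity
        self.updated = clock()

    def allow(self) -> bool:
        now = self.clock()
        # Refill in proportion to elapsed time, never beyond capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# One bucket per API key (hypothetical keying scheme)
buckets: Dict[str, TokenBucket] = {}

def is_allowed(api_key: str) -> bool:
    if api_key not in buckets:
        buckets[api_key] = TokenBucket(capacity=5, rate=1.0)
    return buckets[api_key].allow()
```

An endpoint would typically return HTTP 429 when `is_allowed` is False.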
Team Organization and Culture
Cross-functional Teams
- Data Scientists: Model development, experimentation
- ML Engineers: Pipeline development, model deployment
- Data Engineers: Data infrastructure, ETL pipelines
- DevOps Engineers: Infrastructure, monitoring, scaling
- Product Managers: Requirements, business metrics
Best Practices
- Regular code reviews and pair programming
- Knowledge sharing sessions and documentation
- Standardized project templates and tooling
- Continuous learning and skill development
- Post-mortem analysis for production issues
- "You build it, you run it" mentality
6. MLOps Maturity Levels
Why maturity levels matter: Most teams should NOT start with full pipeline automation. Understanding where you are today — and what the next pragmatic step is — prevents over-engineering and enables incremental, sustainable adoption. This framework is based on Google's MLOps white paper (2021) and Microsoft's MLOps maturity model.
Level 0 — Manual Process (most teams start here)
Characteristics
- Manual, script-driven training
- Models deployed as static artefacts
- No experiment tracking
- Ad-hoc retraining when performance degrades
- Jupyter notebooks in production
Pain Points
- Results not reproducible
- No visibility into model drift
- Deployment is a manual, risky event
- Knowledge siloed in individuals
Key Actions to Advance
- Add experiment tracking (MLflow/W&B)
- Refactor notebooks into Python scripts
- Version code in Git
- Containerise training and serving
Level 1 — ML Pipeline Automation
Characteristics
- Automated training pipelines
- Continuous Training (CT) on new data
- Experiment tracking and model registry
- Data and model validation gates
- Feature store in use
Tools Typical at This Level
- MLflow / W&B for tracking
- DVC or LakeFS for data versioning
- Airflow / Prefect for orchestration
- Docker + basic K8s for serving
Key Actions to Advance
- Add full CI/CD for pipeline code
- Automate model evaluation and promotion
- Implement drift monitoring and alerting
- Standardise project templates
Level 2 — CI/CD Pipeline Automation (full MLOps)
Characteristics
- Automated CI/CD for all pipeline components
- Auto-triggered retraining on drift detection
- Canary and shadow deployments
- Full observability stack
- Self-service ML platform for data scientists
Requirements
- Dedicated ML Platform team
- Mature DevOps/SRE culture
- Significant infrastructure investment
- Strong data governance framework
⚠️ Reality Check
Most organisations operate between Level 0 and Level 1. Level 2 is appropriate for teams with 10+ ML models in production and dedicated platform engineering capacity. Do not attempt to jump directly to Level 2 from Level 0.
7. LLMOps — Operating Large Language Models
LLMOps extends classical MLOps with patterns specific to large language models (LLMs) and generative AI systems. Key differences include non-deterministic outputs, prompt-based interfaces, RAG architectures, and new evaluation paradigms. If you are deploying GPT-family models, open-source LLMs (Llama, Mistral, Gemma), or embedding-based systems, these patterns apply.
Prompt Engineering & Versioning
- Store prompts in a prompt registry (version-controlled)
- A/B test prompt variants with metrics
- Log prompts and completions for auditability
- Use structured output enforcement (Pydantic, Instructor)
- Tools: LangSmith, PromptLayer, Agenta
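A prompt registry can start as little more than immutable, content-addressed prompt versions plus movable labels, mirroring the alias pattern used for the model registry. The class below is a hypothetical in-memory sketch, not the API of LangSmith, PromptLayer, or Agenta.

```python
import hashlib
from dataclasses import dataclass, field
from typing import Dict, Tuple

@dataclass
class PromptRegistry:
    versions: Dict[str, str] = field(default_factory=dict)  # version id -> template
    aliases: Dict[str, str] = field(default_factory=dict)   # alias -> version id

    def register(self, template: str) -> str:
        """Store a template under a short content hash; identical text gets the same id."""
        version_id = hashlib.sha256(template.encode("utf-8")).hexdigest()[:12]
        self.versions[version_id] = template
        return version_id

    def set_alias(self, alias: str, version_id: str) -> None:
        if version_id not in self.versions:
            raise KeyError(f"unknown prompt version {version_id}")
        self.aliases[alias] = version_id

    def render(self, alias: str, **variables: str) -> Tuple[str, str]:
        """Return (version id, rendered prompt) so the version can be logged with the completion."""
        version_id = self.aliases[alias]
        return version_id, self.versions[version_id].format(**variables)
```

Returning the version id alongside the rendered prompt makes it straightforward to attribute quality changes in A/B tests and audit logs to a specific prompt version.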
RAG Pipeline Management
- Version your vector store indices alongside model versions
- Monitor retrieval quality (precision@k, recall@k)
- Evaluate end-to-end RAG quality with RAGAS framework
- Chunk strategy versioning (size, overlap, method)
- Re-index on knowledge base updates
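Retrieval metrics such as precision@k and recall@k need a labelled set of queries with known relevant documents. For a single query they reduce to simple set arithmetic; the document IDs below are made up, and in practice the scores are averaged over the whole evaluation query set.

```python
from typing import Dict, Sequence, Set

def precision_recall_at_k(retrieved: Sequence[str], relevant: Set[str], k: int) -> Dict[str, float]:
    """Precision@k: share of the top-k results that are relevant.
    Recall@k: share of all relevant documents that appear in the top k."""
    top_k = retrieved[:k]
    hits = sum(1 for doc_id in top_k if doc_id in relevant)
    return {
        "precision_at_k": hits / k,
        "recall_at_k": hits / len(relevant) if relevant else 0.0,
    }

# Top 3 results are d3, d7, d1; two of them (d3, d1) are relevant
scores = precision_recall_at_k(["d3", "d7", "d1", "d9"], relevant={"d1", "d3", "d5"}, k=3)
```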
LLM Evaluation
- LLM-as-judge for automated quality scoring
- Hallucination detection metrics (FActScore, TruthfulQA-style evaluations)
- Task-specific benchmarks (HumanEval, MMLU, etc.)
- Human eval for high-stakes use cases
- Tools: DeepEval, RAGAS, Braintrust, Langfuse
LLM Safety & Guardrails
- Input validation: detect prompt injection, jailbreaks
- Output filtering: PII redaction, toxicity classifiers
- Implement guardrail layers (NeMo Guardrails, Guardrails AI)
- Rate limiting per user/session
- Log all interactions for compliance audit
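Output filtering often starts with pattern-based redaction before heavier classifiers run. The regular expressions below are deliberately simple assumptions (email addresses and US-style phone numbers only) and will miss many forms of PII; dedicated tools such as Microsoft Presidio cover far more cases.

```python
import re

# Simplified patterns; real PII detection needs much broader coverage
PII_PATTERNS = {
    "EMAIL": re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"),
    "PHONE": re.compile(r"\b\d{3}[-.\s]\d{3}[-.\s]\d{4}\b"),
}

def redact_pii(text: str) -> str:
    """Replace each detected PII span with a typed placeholder such as [EMAIL]."""
    for label, pattern in PII_PATTERNS.items():
        text = pattern.sub(f"[{label}]", text)
    return text
```

Applying the same function to prompts before logging keeps the audit trail itself free of the redacted data.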
LLM Inference Optimisation
- Quantisation: INT4/INT8 (AWQ, GPTQ, bitsandbytes)
- Speculative decoding for throughput
- Continuous batching (vLLM, TGI)
- KV cache management for long contexts
- Monitor tokens/sec, TTFT (Time To First Token), total latency
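TTFT and throughput can be measured client-side by timing a streaming response. The sketch below wraps any iterator of generated tokens; `fake_stream` is a hypothetical stand-in for a real streaming client (for example one talking to a vLLM or TGI server), and the metric names are illustrative.

```python
import time
from typing import Dict, Iterable, Iterator

def measure_stream(tokens: Iterable[str]) -> Dict[str, float]:
    """Consume a token stream and report TTFT, total latency and decode throughput."""
    start = time.perf_counter()
    first_token_at = None
    count = 0
    for _ in tokens:
        if first_token_at is None:
            first_token_at = time.perf_counter()
        count += 1
    end = time.perf_counter()
    ttft = first_token_at - start if first_token_at is not None else float("nan")
    decode_time = end - first_token_at if first_token_at is not None else 0.0
    return {
        "ttft_s": ttft,
        "total_latency_s": end - start,
        "tokens": count,
        # Tokens after the first, divided by the time spent generating them
        "decode_tokens_per_s": (count - 1) / decode_time if count > 1 and decode_time > 0 else 0.0,
    }

def fake_stream(n: int = 5, delay_s: float = 0.01) -> Iterator[str]:
    """Hypothetical stand-in for a streaming LLM client."""
    for i in range(n):
        time.sleep(delay_s)
        yield f"tok{i}"

stats = measure_stream(fake_stream())
```

Separating TTFT from decode throughput matters because they respond to different optimisations: prompt length and queueing mostly affect the former, while batching and quantisation mostly affect the latter.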
Fine-Tuning vs. Prompting
- Start with prompting; fine-tune only when prompting plateaus
- Use parameter-efficient methods: LoRA, QLoRA, PEFT
- Track fine-tune runs like any ML experiment (MLflow/W&B)
- Evaluate fine-tuned models against base model on held-out set
- Monitor for catastrophic forgetting
Key Differences from Classical MLOps
| Aspect | Classical MLOps | LLMOps |
|---|---|---|
| Output | Deterministic (same input → same output) | Stochastic (temperature-dependent) |
| Versioning unit | Model weights + hyperparams | Model + prompt + system message + RAG index |
| Evaluation | Numerical metrics (accuracy, F1) | LLM-as-judge, human eval, task-specific benchmarks |
| Retraining trigger | Data drift, performance drop | Knowledge cutoff, prompt degradation, new capabilities |
| Primary security concern | Model inversion, data leakage | Prompt injection, jailbreaks, PII in completions |
8. Implementation Guide and Templates
Project Structure Template
ml-project/
├── README.md
├── requirements.txt
├── Dockerfile
├── .gitignore
├── .dvcignore
├── dvc.yaml # DVC pipeline definition
├── params.yaml # Hyperparameters and config
├── data/
│ ├── raw/ # Original, immutable data
│ ├── processed/ # Cleaned and processed data
│ └── external/ # Third-party datasets
├── notebooks/
│ ├── 01-exploratory-analysis.ipynb
│ ├── 02-feature-engineering.ipynb
│ └── 03-model-experiments.ipynb
├── src/
│ ├── __init__.py
│ ├── data/
│ │ ├── __init__.py
│ │ ├── extract.py # Data extraction
│ │ ├── transform.py # Data transformation
│ │ └── validate.py # Data validation
│ ├── features/
│ │ ├── __init__.py
│ │ └── build_features.py # Feature engineering
│ ├── models/
│ │ ├── __init__.py
│ │ ├── train.py # Model training
│ │ ├── predict.py # Model prediction
│ │ └── evaluate.py # Model evaluation
│ └── utils/
│ ├── __init__.py
│ └── helpers.py # Utility functions
├── tests/
│ ├── __init__.py
│ ├── test_data.py
│ ├── test_features.py
│ └── test_models.py
├── models/ # Trained model artifacts
├── reports/ # Analysis reports and figures
├── deployment/
│ ├── docker/
│ ├── kubernetes/
│ └── terraform/
└── mlflow_tracking/ # MLflow tracking server data
Configuration Management
params.yaml
# Data processing parameters
data:
  raw_data_path: "data/raw/dataset.csv"
  processed_data_path: "data/processed/clean_dataset.csv"
  test_size: 0.2
  random_state: 42

# Feature engineering
features:
  categorical_features:
    - "category"
    - "region"
  numerical_features:
    - "age"
    - "income"
  target: "target_variable"

# Model parameters
model:
  algorithm: "random_forest"
  n_estimators: 100
  max_depth: 10
  min_samples_split: 5
  random_state: 42

# Training parameters
training:
  validation_split: 0.2
  cross_validation_folds: 5
  metrics:
    - "accuracy"
    - "precision"
    - "recall"
    - "f1"

# MLflow tracking
mlflow:
  experiment_name: "customer_churn_prediction"
  tracking_uri: "http://localhost:5000"
Configuration Loader
import os
from pathlib import Path

import yaml

class Config:
    def __init__(self, config_path="params.yaml"):
        self.config_path = Path(config_path)
        self.config = self._load_config()

    def _load_config(self):
        """Load configuration from YAML file"""
        with open(self.config_path, 'r') as file:
            config = yaml.safe_load(file) or {}  # an empty file yields None
        # Override with environment variables if present
        self._override_with_env(config)
        return config

    def _override_with_env(self, config):
        """Override config values with environment variables"""
        # Example: MLFLOW_TRACKING_URI overrides mlflow.tracking_uri
        if 'MLFLOW_TRACKING_URI' in os.environ:
            config.setdefault('mlflow', {})['tracking_uri'] = os.environ['MLFLOW_TRACKING_URI']

    def get(self, key_path, default=None):
        """Get configuration value using dot notation, e.g. 'mlflow.tracking_uri'"""
        value = self.config
        for key in key_path.split('.'):
            if isinstance(value, dict) and key in value:
                value = value[key]
            else:
                return default
        return value

# Usage
config = Config()
model_params = config.get('model')
tracking_uri = config.get('mlflow.tracking_uri')
Training Pipeline Template
import argparse
from datetime import datetime
from pathlib import Path

import joblib
import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder

# Config is the configuration loader class shown above; import it from
# wherever it lives in your project

class ModelTrainer:
    def __init__(self, config):
        self.config = config
        self.model = None
        self.categorical_cols = []
        self.numerical_cols = []
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None

    def load_data(self):
        """Load and split data"""
        data_path = self.config.get('data.processed_data_path')
        df = pd.read_csv(data_path)
        self.categorical_cols = self.config.get('features.categorical_features', [])
        self.numerical_cols = self.config.get('features.numerical_features', [])
        target_col = self.config.get('features.target')
        X = df[self.categorical_cols + self.numerical_cols]
        y = df[target_col]
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X, y,
            test_size=self.config.get('data.test_size', 0.2),
            random_state=self.config.get('data.random_state', 42),
            stratify=y
        )

    def train_model(self):
        """Train the model; preprocessing is bundled so serving reuses it exactly"""
        model_params = self.config.get('model')
        if model_params['algorithm'] != 'random_forest':
            raise ValueError(f"Unsupported algorithm: {model_params['algorithm']}")
        # scikit-learn forests need numeric input: one-hot encode categorical
        # columns and pass numerical columns through unchanged
        preprocess = ColumnTransformer(
            [('categorical', OneHotEncoder(handle_unknown='ignore'), self.categorical_cols)],
            remainder='passthrough'
        )
        self.model = Pipeline([
            ('preprocess', preprocess),
            ('classifier', RandomForestClassifier(
                n_estimators=model_params['n_estimators'],
                max_depth=model_params['max_depth'],
                min_samples_split=model_params['min_samples_split'],
                random_state=model_params['random_state']
            ))
        ])
        self.model.fit(self.X_train, self.y_train)

    def evaluate_model(self):
        """Evaluate model performance"""
        y_pred = self.model.predict(self.X_test)
        metrics = {
            'accuracy': accuracy_score(self.y_test, y_pred),
            'precision': precision_score(self.y_test, y_pred, average='weighted'),
            'recall': recall_score(self.y_test, y_pred, average='weighted'),
            'f1': f1_score(self.y_test, y_pred, average='weighted')
        }
        # Cross-validation on the training split (refits clones of the pipeline)
        cv_scores = cross_val_score(
            self.model, self.X_train, self.y_train,
            cv=self.config.get('training.cross_validation_folds', 5)
        )
        metrics['cv_accuracy_mean'] = cv_scores.mean()
        metrics['cv_accuracy_std'] = cv_scores.std()
        return metrics

    def save_model(self, model_path):
        """Save trained model"""
        Path(model_path).parent.mkdir(parents=True, exist_ok=True)
        joblib.dump(self.model, model_path)

    def run_training(self):
        """Execute complete training pipeline"""
        tracking_uri = self.config.get('mlflow.tracking_uri')
        if tracking_uri:
            mlflow.set_tracking_uri(tracking_uri)
        mlflow.set_experiment(self.config.get('mlflow.experiment_name'))
        with mlflow.start_run():
            # Log parameters; prefixes keep keys unique ('model' and 'data' both define random_state)
            mlflow.log_params({f"model.{k}": v for k, v in self.config.get('model').items()})
            mlflow.log_params({f"data.{k}": v for k, v in self.config.get('data').items()})

            # Load data and train
            self.load_data()
            self.train_model()

            # Evaluate
            metrics = self.evaluate_model()
            mlflow.log_metrics(metrics)

            # Save model
            model_path = f"models/model_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pkl"
            self.save_model(model_path)
            mlflow.log_artifact(model_path)

            # Log model to MLflow Model Registry
            mlflow.sklearn.log_model(
                self.model,
                "model",
                registered_model_name=self.config.get('mlflow.experiment_name')
            )

            print(f"Training completed. Metrics: {metrics}")
            return metrics

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", default="params.yaml", help="Configuration file path")
    args = parser.parse_args()
    config = Config(args.config)
    trainer = ModelTrainer(config)
    trainer.run_training()
Model Serving Template
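from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import pandas as pd
from typing import List, Dict
import logging
import time
import os

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

model = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: load the model from MODEL_URI (injected at runtime), not from a file baked into the image
    global model
    model_uri = os.environ.get("MODEL_URI", "models/production_model.pkl")
    try:
        if "://" in model_uri or model_uri.startswith(("models:/", "runs:/")):
            # MLflow URI (models:/<name>/<version>, runs:/<run_id>/model, or an s3:// path to an
            # MLflow model directory), as produced by mlflow.sklearn.log_model() during training.
            # Imported lazily so deployments that load a local file do not need mlflow installed.
            import mlflow.sklearn
            model = mlflow.sklearn.load_model(model_uri)
        else:
            # Local pickle written by joblib.dump(); joblib.load only reads local paths
            model = joblib.load(model_uri)
        logger.info(f"Model loaded successfully from {model_uri}")
    except Exception as e:
        logger.error(f"Failed to load model: {e}")
        raise
    yield
    # Shutdown: release resources
    model = None
    logger.info("Model unloaded")


# Initialize FastAPI app
app = FastAPI(
    title="ML Model API",
    description="Production ML model serving API",
    version="1.0.0",
    lifespan=lifespan,
)


class PredictionRequest(BaseModel):
    features: Dict[str, float]  # keys must match training column names


class PredictionResponse(BaseModel):
    prediction: int
    probability: List[float]
    model_version: str
    processing_time: float


class HealthResponse(BaseModel):
    status: str
    model_loaded: bool
    timestamp: str


def to_model_frame(rows: List[Dict[str, float]]) -> pd.DataFrame:
    """Build a DataFrame whose columns follow the training-time feature order.

    scikit-learn (1.2+) raises an error when column names or their order differ from
    those seen in fit(), and clients cannot be relied on to send keys in that order.
    """
    input_data = pd.DataFrame(rows)
    expected = getattr(model, "feature_names_in_", None)  # set when the model was fit on a DataFrame
    if expected is not None:
        missing = [name for name in expected if name not in input_data.columns]
        if missing:
            raise ValueError(f"Missing features: {missing}")
        input_data = input_data[list(expected)]  # reorder columns; unknown keys are dropped
    return input_data


# Health check endpoint
@app.get("/health", response_model=HealthResponse)
async def health_check():
    return HealthResponse(
        status="healthy" if model is not None else "unhealthy",
        model_loaded=model is not None,
        timestamp=pd.Timestamp.now().isoformat(),
    )


# Prediction endpoint. Declared with plain `def` so FastAPI runs it in a thread pool;
# CPU-bound inference inside `async def` would block the event loop for every request.
@app.post("/predict", response_model=PredictionResponse)
def predict(request: PredictionRequest):
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    start_time = time.perf_counter()
    try:
        input_data = to_model_frame([request.features])
        prediction = model.predict(input_data)[0]
        probability = model.predict_proba(input_data)[0].tolist()
    except ValueError as e:
        # Bad input (missing features, wrong shape): a client error
        logger.warning(f"Invalid prediction input: {e}")
        raise HTTPException(status_code=400, detail=f"Invalid input: {e}")
    except Exception:
        # Anything else is a server-side failure: log the traceback, do not leak it to the client
        logger.exception("Prediction failed")
        raise HTTPException(status_code=500, detail="Prediction failed")
    processing_time = time.perf_counter() - start_time
    # Log prediction
    logger.info(f"Prediction made: {prediction}, Processing time: {processing_time:.3f}s")
    return PredictionResponse(
        prediction=int(prediction),
        probability=probability,
        model_version="1.0.0",  # placeholder: report the registry version actually loaded
        processing_time=processing_time,
    )


# Batch prediction endpoint (request body: a JSON array of PredictionRequest objects)
@app.post("/predict/batch")
def predict_batch(requests: List[PredictionRequest]):
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    try:
        input_data = to_model_frame([req.features for req in requests])
        predictions = model.predict(input_data)
        probabilities = model.predict_proba(input_data)
    except ValueError as e:
        logger.warning(f"Invalid batch input: {e}")
        raise HTTPException(status_code=400, detail=f"Invalid input: {e}")
    except Exception:
        logger.exception("Batch prediction failed")
        raise HTTPException(status_code=500, detail="Batch prediction failed")
    # Format responses
    return [
        {"id": i, "prediction": int(pred), "probability": prob.tolist()}
        for i, (pred, prob) in enumerate(zip(predictions, probabilities))
    ]


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)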
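/predict/batch declares a single List[PredictionRequest] body parameter. FastAPI therefore expects the request body to be a top-level JSON array in which each element wraps its feature dict in a "features" key. Below is a minimal standard-library client sketch. It assumes the API is reachable at a hypothetical http://localhost:8000, and the feature names used here are made up.

```python
import json
import urllib.request
from typing import Any, Dict, List

API_URL = "http://localhost:8000"  # assumption: the serving template running locally


def build_batch_request(rows: List[Dict[str, float]], base_url: str = API_URL) -> urllib.request.Request:
    """Build a POST to /predict/batch whose body is a JSON array of {"features": {...}} objects."""
    body = json.dumps([{"features": row} for row in rows]).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/predict/batch",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def predict_batch(rows: List[Dict[str, float]], timeout: float = 5.0) -> List[Dict[str, Any]]:
    """Send the batch and return the decoded list of {"id", "prediction", "probability"} results."""
    with urllib.request.urlopen(build_batch_request(rows), timeout=timeout) as resp:
        return json.loads(resp.read())
```

For example, `predict_batch([{"age": 42.0, "income": 55000.0}])` returns one result per row, in request order. Error statuses from the API (400, 500, 503) surface as `urllib.error.HTTPError`.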
Docker and Kubernetes Deployment
Dockerfile
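FROM python:3.11-slim
# Match the CI Python version; skip .pyc writes (the root filesystem is read-only
# in Kubernetes) and flush stdout/stderr immediately so logs are not buffered
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1
# Set working directory
WORKDIR /app
# Install system dependencies (compilers for Python packages that build from source)
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*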
# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY src/ ./src/
COPY params.yaml .
# Models are loaded at runtime from a model registry or object store, not copied into the image.
# Inject MODEL_URI (plus MLFLOW_TRACKING_URI when it is a models:/ registry URI) at container startup.
# This keeps images small, immutable, and free of potentially sensitive weights.
# Example: docker run -e MODEL_URI=models:/<registered-model-name>/1 \
#          -e MLFLOW_TRACKING_URI=<tracking-server-url> ml-model-api:latest
# Create a non-root user with a fixed UID that matches runAsUser: 1000 in the Kubernetes manifest
RUN adduser --disabled-password --gecos '' --uid 1000 mluser
USER mluser
# Expose port
EXPOSE 8000
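# Health check (used by Docker/Compose; Kubernetes ignores HEALTHCHECK and uses its own probes).
# python:*-slim images do not include curl, so probe with the standard library instead.
HEALTHCHECK --interval=30s --timeout=5s --start-period=30s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=4)" || exit 1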
# Run application
CMD ["python", "src/api/serve.py"]
Kubernetes Deployment
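apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-model-api
  labels:
    app: ml-model-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-model-api
  template:
    metadata:
      labels:
        app: ml-model-api
    spec:
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 1000
        seccompProfile:
          type: RuntimeDefault
      containers:
        - name: ml-model-api
          # Prefer an immutable tag (e.g. the commit SHA built by CI) over :latest
          # so rollouts and rollbacks are reproducible
          image: ml-model-api:latest
          imagePullPolicy: Always
          ports:
            - containerPort: 8000
          env:
            - name: MODEL_URI
              valueFrom:
                configMapKeyRef:
                  name: ml-model-config
                  key: model_uri
          securityContext:
            allowPrivilegeEscalation: false
            readOnlyRootFilesystem: true
            capabilities:
              drop: ["ALL"]
          # The root filesystem is read-only, so temp files (e.g. downloaded
          # model artifacts) need a writable scratch volume
          volumeMounts:
            - name: tmp
              mountPath: /tmp
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "2Gi"  # profile your model; 512Mi will OOM many models
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
      volumes:
        - name: tmp
          emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
  name: ml-model-api-service
spec:
  selector:
    app: ml-model-api
  ports:
    - port: 80
      targetPort: 8000
  type: ClusterIP
---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: ml-model-api-netpol
spec:
  podSelector:
    matchLabels:
      app: ml-model-api
  policyTypes:
    - Egress
  egress:
    # DNS: once egress is restricted, name lookups are blocked unless explicitly allowed
    # (the k8s-app: kube-dns label is common but varies by distribution)
    - to:
        - namespaceSelector: {}
          podSelector:
            matchLabels:
              k8s-app: kube-dns
      ports:
        - port: 53
          protocol: UDP
        - port: 53
          protocol: TCP
    # HTTPS to the model registry / object store
    - to:
        - namespaceSelector: {}  # in-cluster endpoints
        - ipBlock:
            cidr: 0.0.0.0/0      # external endpoints such as S3; narrow to known ranges where possible
      ports:
        - port: 443
          protocol: TCP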
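Memory limits should be based on measurement, not guesswork. One rough way to estimate what loading a model costs is Python's standard tracemalloc module. Below is a minimal sketch with a hypothetical measure_peak_memory helper. Treat the result as a lower bound. tracemalloc only sees allocations made through Python's allocators: NumPy registers its buffers, but some native libraries do not. The container also pays for the interpreter, the imported libraries, and per-request copies of the input, so leave generous headroom.

```python
import tracemalloc
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")


def measure_peak_memory(load: Callable[[], T]) -> Tuple[T, int]:
    """Run `load()` and return (result, peak bytes traced by tracemalloc during the call)."""
    tracemalloc.start()
    try:
        result = load()
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return result, peak
```

For a joblib artifact, this could be called as `model, peak = measure_peak_memory(lambda: joblib.load("models/production_model.pkl"))`, then `print(f"{peak / 2**20:.0f} MiB")`.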
CI/CD Pipeline Template
# .github/workflows/ml-pipeline.yml
name: ML Pipeline
on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install pytest pytest-cov flake8
      - name: Lint with flake8
        run: |
          flake8 src/ --count --select=E9,F63,F7,F82 --show-source --statistics
          flake8 src/ --count --max-complexity=10 --max-line-length=100 --statistics
      - name: Run tests
        run: |
          pytest tests/ --cov=src/ --cov-report=xml
      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v4
        with:
          token: ${{ secrets.CODECOV_TOKEN }}  # v4 needs an upload token in most setups
          files: ./coverage.xml
  train:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install pytest  # needed by the "Run model tests" step
      - name: Setup DVC
        uses: iterative/setup-dvc@v2
      - name: Pull data with DVC
        run: dvc pull
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
      - name: Train model
        run: python src/models/train.py
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
      - name: Run model tests
        run: pytest tests/test_models.py
  deploy:
    needs: [test, train]
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Install test dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt pytest
      - name: Build Docker image
        run: |
          docker build -t ml-model-api:${{ github.sha }} .
          docker tag ml-model-api:${{ github.sha }} ml-model-api:latest
      - name: Scan image for vulnerabilities
        run: |
          docker run --rm -v /var/run/docker.sock:/var/run/docker.sock \
            aquasec/trivy:latest image --exit-code 1 --severity CRITICAL \
            ml-model-api:${{ github.sha }}
      - name: Deploy to staging
        run: |
          # Deploy to staging environment. Assumes the scanned image has been pushed to a
          # registry the cluster can pull from and that kubectl credentials are configured;
          # neither step is shown in this template.
          kubectl apply -f deployment/kubernetes/staging/
      - name: Run integration tests
        run: |
          # Run integration tests against staging
          pytest tests/integration/
      - name: Deploy to production
        if: success()
        run: |
          # Deploy to production environment
          kubectl apply -f deployment/kubernetes/production/
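The train job runs tests/test_models.py, whose contents are not shown here. A common pattern for that step is a quality gate: it fails the pipeline when the newly trained model misses agreed thresholds, so a regression never reaches the deploy job. Below is a minimal sketch. It assumes the metrics dict returned by run_training() (cv_accuracy_mean, cv_accuracy_std), and the threshold values are purely hypothetical.

```python
from typing import Dict, List, Mapping

# Hypothetical thresholds: derive real ones from the model currently in production
MIN_METRICS: Dict[str, float] = {"cv_accuracy_mean": 0.80}
MAX_METRICS: Dict[str, float] = {"cv_accuracy_std": 0.05}


def quality_gate_failures(
    metrics: Mapping[str, float],
    minimums: Mapping[str, float] = MIN_METRICS,
    maximums: Mapping[str, float] = MAX_METRICS,
) -> List[str]:
    """Return a readable reason for every violated threshold; an empty list means the gate passes."""
    failures = []
    for name, floor in minimums.items():
        value = metrics.get(name)
        if value is None or value < floor:  # a missing metric also fails the gate
            failures.append(f"{name}={value} is below the minimum {floor}")
    for name, ceiling in maximums.items():
        value = metrics.get(name)
        if value is None or value > ceiling:
            failures.append(f"{name}={value} is above the maximum {ceiling}")
    return failures
```

In tests/test_models.py, a test would obtain the new model's metrics (for example from its MLflow run) and `assert quality_gate_failures(metrics) == []`. If the gate fails, pytest prints the listed reasons and the job stops before deploy.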
Monitoring and Observability Setup
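# monitoring/prometheus.yml
global:
  scrape_interval: 15s
scrape_configs:
  - job_name: 'ml-model-api'
    # Discover every pod behind the Service. Scraping the ClusterIP Service itself would hit a
    # random replica on each scrape and mix per-pod counters. Requires Prometheus to run
    # in-cluster with RBAC permission to list endpoints.
    kubernetes_sd_configs:
      - role: endpoints
    relabel_configs:
      - source_labels: [__meta_kubernetes_service_name]
        action: keep
        regex: ml-model-api-service
    # Assumes the API exposes ml_* metrics at /metrics (for example via the prometheus_client library)
    metrics_path: '/metrics'
    scrape_interval: 30s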
# monitoring/grafana-dashboard.json
{
  "dashboard": {
    "title": "ML Model Monitoring",
    "panels": [
      {
        "title": "Prediction Rate",
        "type": "timeseries",
        "datasource": { "type": "prometheus" },
        "targets": [
          {
            "expr": "sum(rate(ml_predictions_total[5m]))",
            "legendFormat": "Predictions/sec"
          }
        ]
      },
      {
        "title": "Prediction Latency",
        "type": "timeseries",
        "datasource": { "type": "prometheus" },
        "targets": [
          {
            "expr": "histogram_quantile(0.95, sum by (le) (rate(ml_prediction_duration_seconds_bucket[5m])))",
            "legendFormat": "p95 latency"
          },
          {
            "expr": "histogram_quantile(0.50, sum by (le) (rate(ml_prediction_duration_seconds_bucket[5m])))",
            "legendFormat": "p50 (median)"
          }
        ]
      },
      {
        "title": "Model Accuracy",
        "type": "stat",
        "datasource": { "type": "prometheus" },
        "targets": [
          {
            "expr": "ml_model_accuracy",
            "legendFormat": "Current Accuracy"
          }
        ]
      },
      {
        "title": "Data Drift Score",
        "type": "timeseries",
        "datasource": { "type": "prometheus" },
        "targets": [
          {
            "expr": "ml_data_drift_score",
            "legendFormat": "Drift Score"
          }
        ]
      }
    ]
  }
}
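The dashboard charts ml_data_drift_score, but the template does not say how that score is computed. The Population Stability Index (PSI) is one common choice, used here as an assumption rather than as the document's prescribed method. It works by binning a feature using a reference sample, such as the training data, and then comparing the share of recent production values that fall into each bin. Below is a minimal standard-library sketch with equal-width bins.

```python
import math
from typing import List, Sequence


def psi(reference: Sequence[float], current: Sequence[float], bins: int = 10, eps: float = 1e-6) -> float:
    """Population Stability Index: sum over bins of (a_i - e_i) * ln(a_i / e_i).

    e_i and a_i are the shares of the reference and current samples in bin i. Bins are
    equal-width over the reference range, and `eps` keeps empty bins from producing log(0).
    """
    lo, hi = min(reference), max(reference)
    width = (hi - lo) / bins or 1.0  # a constant reference feature would otherwise give zero width

    def bin_shares(values: Sequence[float]) -> List[float]:
        counts = [0] * bins
        for v in values:
            # Clamp values outside the reference range into the first or last bin
            idx = min(max(int((v - lo) / width), 0), bins - 1)
            counts[idx] += 1
        return [max(c / len(values), eps) for c in counts]

    expected, actual = bin_shares(reference), bin_shares(current)
    return sum((a - e) * math.log(a / e) for e, a in zip(expected, actual))
```

A scheduled job could compute this per feature over a recent window of requests and publish the result as the ml_data_drift_score gauge. A common rule of thumb interprets PSI values as follows: below 0.1 is stable, 0.1 to 0.25 is a moderate shift, and above 0.25 is a significant shift worth investigating or retraining on.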
Conclusion and Next Steps
Key Takeaways
- MLOps is not just about models—it's about building reliable, scalable systems that deliver business value
- Reproducibility and versioning are fundamental to trustworthy ML systems
- Data quality and pipeline engineering are often more important than model sophistication
- Monitoring and observability are critical for maintaining model performance over time
- Successful MLOps requires both technical excellence and strong team collaboration
Implementation Roadmap
- Phase 1: Establish version control and experiment tracking
- Phase 2: Build robust data pipelines and validation
- Phase 3: Implement CI/CD for model deployment
- Phase 4: Set up comprehensive monitoring and alerting
- Phase 5: Scale with advanced orchestration and automation
Recommended Tool Stack
- Experiment Tracking: MLflow, Weights & Biases, Comet ML
- Data Versioning: DVC, lakeFS (data lake versioning)
- Pipeline Orchestration: Apache Airflow, Prefect, ZenML
- Model Serving: FastAPI, BentoML, Ray Serve, vLLM (LLMs)
- Feature Stores: Feast, Tecton, Hopsworks
- Monitoring: Prometheus, Grafana, Evidently AI (drift), Datadog
- Infrastructure: Docker, Kubernetes, Terraform, Helm
- Security: Trivy (scanning), OPA/Kyverno (policy), HashiCorp Vault (secrets), Cosign (signing)
This blueprint provides the foundation for building production-ready ML systems. Remember that MLOps is a journey, not a destination—continuously iterate and improve your processes as your team and systems mature.
Pro Tip: Start small, automate incrementally, and always prioritize reliability over complexity.