MLflow: ML Lifecycle Management
MLflow is an open-source platform for managing the complete machine learning lifecycle, including experimentation, reproducibility, deployment, and model registry management.
- Track: experiments
- Registry: model versions
- Deploy: production models
- Pipeline: end-to-end ML
MLflow Implementation Guide
Experiment Tracking
Python
import time  # used for timestamped run names

import mlflow
import mlflow.sklearn
import mlflow.pytorch
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score
import pandas as pd
import numpy as np
class MLflowExperimentTracker:
    """Comprehensive experiment tracking with MLflow"""

    def __init__(self, experiment_name: str, tracking_uri: str = None):
        self.experiment_name = experiment_name

        # Set tracking URI (local, remote, or cloud)
        if tracking_uri:
            mlflow.set_tracking_uri(tracking_uri)

        # Create or get experiment
        experiment = mlflow.get_experiment_by_name(experiment_name)
        if experiment is None:
            experiment_id = mlflow.create_experiment(experiment_name)
        else:
            experiment_id = experiment.experiment_id

        mlflow.set_experiment(experiment_name)
        self.experiment_id = experiment_id
    def train_and_track_sklearn_model(self, X, y, model_params: dict = None):
        """Train and track scikit-learn model with comprehensive logging"""
        with mlflow.start_run(run_name=f"random_forest_{int(time.time())}") as run:
            # Split data
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=0.2, random_state=42
            )

            # Set default parameters
            params = {
                'n_estimators': 100,
                'max_depth': 10,
                'random_state': 42,
                'n_jobs': -1
            }
            if model_params:
                params.update(model_params)

            # Log parameters
            mlflow.log_params(params)

            # Log dataset info
            mlflow.log_params({
                'total_samples': len(X),
                'train_samples': len(X_train),
                'test_samples': len(X_test),
                'features': X.shape[1]
            })

            # Train model
            model = RandomForestClassifier(**params)
            model.fit(X_train, y_train)

            # Make predictions
            y_pred = model.predict(X_test)
            y_pred_proba = model.predict_proba(X_test)

            # Calculate metrics
            accuracy = accuracy_score(y_test, y_pred)
            precision = precision_score(y_test, y_pred, average='weighted')
            recall = recall_score(y_test, y_pred, average='weighted')

            # Log metrics
            mlflow.log_metrics({
                'accuracy': accuracy,
                'precision': precision,
                'recall': recall,
                'feature_importances_std': np.std(model.feature_importances_)
            })

            # Log model
            mlflow.sklearn.log_model(
                model,
                "model",
                registered_model_name=f"{self.experiment_name}_best_model"
            )

            # Log feature importances plot
            import matplotlib.pyplot as plt
            plt.figure(figsize=(10, 6))
            plt.bar(range(len(model.feature_importances_)), model.feature_importances_)
            plt.title('Feature Importances')
            plt.xlabel('Feature Index')
            plt.ylabel('Importance')
            plt.savefig('feature_importances.png')
            mlflow.log_artifact('feature_importances.png')
            plt.close()

            # Log confusion matrix
            from sklearn.metrics import confusion_matrix
            import seaborn as sns
            cm = confusion_matrix(y_test, y_pred)
            plt.figure(figsize=(8, 6))
            sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
            plt.title('Confusion Matrix')
            plt.ylabel('True Label')
            plt.xlabel('Predicted Label')
            plt.savefig('confusion_matrix.png')
            mlflow.log_artifact('confusion_matrix.png')
            plt.close()

            return {
                'run_id': run.info.run_id,
                'model': model,
                'metrics': {'accuracy': accuracy, 'precision': precision, 'recall': recall}
            }
    def hyperparameter_search(self, X, y, param_grid: dict):
        """Perform hyperparameter search with MLflow tracking"""
        from sklearn.model_selection import ParameterGrid

        best_score = 0
        best_run_id = None
        for params in ParameterGrid(param_grid):
            result = self.train_and_track_sklearn_model(X, y, params)
            if result['metrics']['accuracy'] > best_score:
                best_score = result['metrics']['accuracy']
                best_run_id = result['run_id']

        return {
            'best_run_id': best_run_id,
            'best_score': best_score
        }
    def compare_runs(self, metric_name: str = 'accuracy'):
        """Compare runs within the experiment"""
        experiment = mlflow.get_experiment_by_name(self.experiment_name)
        runs = mlflow.search_runs(experiment_ids=[experiment.experiment_id])

        # Sort by metric
        if f'metrics.{metric_name}' in runs.columns:
            runs_sorted = runs.sort_values(f'metrics.{metric_name}', ascending=False)
            return {
                'best_run': {
                    'run_id': runs_sorted.iloc[0]['run_id'],
                    'metric_value': runs_sorted.iloc[0][f'metrics.{metric_name}'],
                    'params': {col.replace('params.', ''): runs_sorted.iloc[0][col]
                               for col in runs_sorted.columns if col.startswith('params.')}
                },
                'comparison_df': runs_sorted[['run_id', f'metrics.{metric_name}'] +
                                             [col for col in runs_sorted.columns if col.startswith('params.')]]
            }
        return runs
# Example usage
def track_ml_experiments():
    # Generate sample data
    from sklearn.datasets import make_classification
    X, y = make_classification(n_samples=1000, n_features=20, n_classes=2, random_state=42)

    # Initialize tracker
    tracker = MLflowExperimentTracker("customer_churn_prediction")

    # Single model training
    result = tracker.train_and_track_sklearn_model(X, y, {
        'n_estimators': 150,
        'max_depth': 12
    })

    # Hyperparameter search
    param_grid = {
        'n_estimators': [50, 100, 200],
        'max_depth': [5, 10, 15],
        'min_samples_split': [2, 5, 10]
    }
    best_result = tracker.hyperparameter_search(X, y, param_grid)
    print(f"Best run: {best_result['best_run_id']} with score: {best_result['best_score']}")

    # Compare all runs
    comparison = tracker.compare_runs('accuracy')
    print(comparison['comparison_df'].head())

    return tracker
MLflow Components
MLflow Tracking
- Log parameters, metrics, and artifacts
- Compare experiments and runs
- Store model lineage and metadata
- Search and filter experiments
Model Registry
- Centralized model store
- Version control for models
- Stage management (staging, production); see the registration sketch after this list
- Model lineage and annotations
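The tracking code above stops at registering a model via registered_model_name; a minimal sketch of explicitly registering a specific run and promoting it through stages might look like the following. The run ID is a placeholder, and the model name is assumed to follow the registered_model_name pattern used earlier.

Python
import mlflow
from mlflow import MlflowClient

# Placeholder run ID; the model name matches the registered_model_name
# pattern from the tracking example above (an assumption).
run_id = "<run_id from a tracked run>"
model_name = "customer_churn_prediction_best_model"

# Register the model logged under artifact path "model" in that run
version = mlflow.register_model(f"runs:/{run_id}/model", model_name)

# Promote the new version to Staging
client = MlflowClient()
client.transition_model_version_stage(
    name=model_name,
    version=version.version,
    stage="Staging",
)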
MLflow Models
- Deploy models to various platforms
- REST API serving (see the loading/serving sketch below)
- Batch and real-time inference
- Docker and Kubernetes deployment
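On the serving side, a rough sketch: a registered model can be loaded in its generic pyfunc flavor for batch scoring, or exposed as a REST endpoint from the CLI. The model name and stage below are assumptions carried over from the registry sketch above.

Python
import mlflow.pyfunc
import numpy as np

# Assumed registry name/stage from the registry sketch above
model_uri = "models:/customer_churn_prediction_best_model/Staging"

# Load the model in its generic pyfunc flavor and run batch inference
model = mlflow.pyfunc.load_model(model_uri)
sample = np.random.rand(5, 20)  # 20 features, matching the training data above
print(model.predict(sample))

# For a REST endpoint, the same URI can be served from the CLI, e.g.:
#   mlflow models serve -m "models:/customer_churn_prediction_best_model/Staging" -p 5001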
MLflow Projects
- Reproducible ML code packaging
- Environment management
- Parameter specification
- Remote execution capabilities (see the sketch after this list)
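As a short, hedged example of reproducible execution through the Python API: the URI below points at MLflow's public example project, and the alpha parameter is specific to that project rather than to this guide.

Python
import mlflow

# Run a packaged MLflow Project directly from a Git repository
submitted_run = mlflow.projects.run(
    uri="https://github.com/mlflow/mlflow-example",
    parameters={"alpha": 0.5},
)
print(submitted_run.run_id, submitted_run.get_status())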
Deployment & Infrastructure Options
| Platform | Tracking | Registry | Serving | Scalability |
|---|---|---|---|---|
| Local SQLite | Full | Full | Basic | Limited |
| PostgreSQL + S3 | Full | Full | Full | High |
| Databricks MLflow | Enhanced | Enhanced | Enterprise | Auto-scale |
| Azure ML + MLflow | Integrated | Integrated | Cloud-native | Managed |
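For the PostgreSQL + S3 row, a typical setup is to run a tracking server backed by those stores and point clients at it; the hostnames, credentials, and bucket names below are placeholders, not a prescribed configuration.

Python
import mlflow

# Point clients at a remote tracking server instead of the local ./mlruns store.
# The server itself would be launched separately, for example with:
#   mlflow server \
#     --backend-store-uri postgresql://mlflow:password@db-host:5432/mlflow \
#     --default-artifact-root s3://my-mlflow-artifacts/ \
#     --host 0.0.0.0 --port 5000
# (hostnames, credentials, and bucket names are placeholders)
mlflow.set_tracking_uri("http://mlflow.internal.example:5000")
mlflow.set_experiment("customer_churn_prediction")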
MLflow Best Practices
Experiment Tracking
- Use descriptive experiment and run names
- Log all relevant parameters and hyperparameters
- Track both training and validation metrics (see the step-based logging sketch after this list)
- Save model artifacts and important plots
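A small sketch of the tagging and step-based metric logging these points describe; the run name, tag values, and loss numbers are illustrative only.

Python
import mlflow

with mlflow.start_run(run_name="rf_baseline_2024_06"):
    # Tags make runs easy to search and filter later
    mlflow.set_tags({"team": "growth", "dataset_version": "v3"})

    # Illustrative loop; logging with an explicit step keeps the
    # training and validation curves aligned in the MLflow UI
    for epoch, (train_loss, val_loss) in enumerate(
        [(0.62, 0.65), (0.48, 0.55), (0.41, 0.52)]
    ):
        mlflow.log_metric("train_loss", train_loss, step=epoch)
        mlflow.log_metric("val_loss", val_loss, step=epoch)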
Model Management
- Use semantic versioning for models
- Add detailed descriptions and tags (see the sketch after this list)
- Implement proper stage transitions
- Archive old models regularly
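A sketch of the annotation and archiving calls via MlflowClient; the model name, version numbers, description text, and tag values below are placeholders.

Python
from mlflow import MlflowClient

client = MlflowClient()
model_name = "customer_churn_prediction_best_model"  # name assumed from the examples above

# Attach a description and a searchable tag to a specific version
client.update_model_version(
    name=model_name,
    version=3,  # placeholder version number
    description="RandomForest churn model, latest retrain",
)
client.set_model_version_tag(model_name, 3, "validated_by", "ml-platform-team")

# Archive a superseded version so only the current one stays active
client.transition_model_version_stage(name=model_name, version=2, stage="Archived")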
Production Deployment
- Test models thoroughly before promotion (see the promotion-gating sketch after this list)
- Implement health checks and monitoring
- Use containers for consistent environments
- Plan for model rollback scenarios
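One way to gate promotion, sketched under the assumption that candidate models sit in the Staging stage of the registry name used earlier; the smoke test here is deliberately minimal.

Python
import mlflow.pyfunc
import numpy as np
from mlflow import MlflowClient

model_name = "customer_churn_prediction_best_model"  # assumed registry name
candidate_uri = f"models:/{model_name}/Staging"

# Minimal smoke test: the candidate must return one prediction per input row
model = mlflow.pyfunc.load_model(candidate_uri)
sample = np.random.rand(10, 20)
preds = model.predict(sample)
assert len(preds) == 10, "unexpected prediction shape"

# Only promote once the checks pass; archiving the old Production version
# keeps a clear rollback target in the registry
client = MlflowClient()
staging_version = client.get_latest_versions(model_name, stages=["Staging"])[0]
client.transition_model_version_stage(
    name=model_name,
    version=staging_version.version,
    stage="Production",
    archive_existing_versions=True,
)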
Infrastructure
- Use a remote tracking store for teams
- Implement proper access controls
- Regular backups of experiments and models
- Monitor storage usage and costs
Related Technologies
- PyTorch: Deep learning framework for model development
- Transformers: Core architecture for modern AI models
- Docker: Containerization for deployment
- Kubernetes: Container orchestration platform
- Prometheus: Monitoring and alerting toolkit
- Apache Spark: Large-scale data processing engine