Real-time ML Inference Systems
Build high-performance real-time ML systems with streaming features and low-latency serving
50 min read • Advanced
⚡ Real-time Inference Performance Calculator
Example inputs: 10,000 RPS, 500MB, 1000 dimensions
Performance Metrics
- Cached Latency: 9ms
- Uncached Latency: 11ms
- Latency Improvement: 20% faster
- Effective Throughput: 3,523 RPS
- Memory Usage: 750MB
- Cache Hit Rate: 85%
- Cost Efficiency: 47/10
Strategy: In-memory key-value cache
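The headline latency figure can be approximated as a hit-rate weighted average of the cached and uncached paths. A minimal sketch, assuming that is roughly how the calculator derives its effective latency (the widget's exact formula is not shown):
def effective_latency_ms(cached_ms: float, uncached_ms: float, hit_rate: float) -> float:
    """Expected per-request latency for a cache with the given hit rate."""
    return hit_rate * cached_ms + (1.0 - hit_rate) * uncached_ms

# With the example values above: 0.85 * 9ms + 0.15 * 11ms ≈ 9.3ms expected latency
print(effective_latency_ms(9.0, 11.0, 0.85))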
🏗️ Real-time ML Architecture Patterns
Streaming Architecture
Process data streams in real time with continuous model updates
- Latency Target: < 10ms
- Throughput: Very High
- Consistency: Eventual
- Complexity: High
Request-Response
Synchronous inference serving with immediate results
- Latency Target: < 100ms
- Throughput: High
- Consistency: Strong
- Complexity: Medium
Micro-batch Streaming
Small batches that balance latency against throughput
- Latency Target: < 50ms
- Throughput: Very High
- Consistency: Near Real-time
- Complexity: Medium
Edge Caching
Pre-computed results with real-time cache updates
- Latency Target: < 5ms
- Throughput: Extreme
- Consistency: Eventual
- Complexity: High
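To make the micro-batch streaming pattern concrete, here is a minimal asyncio sketch (the MicroBatcher class, infer_fn callable, and thresholds are illustrative, not part of the examples below): concurrent requests are queued, collected for at most a few milliseconds, and answered from a single batched forward pass.
import asyncio
import time

class MicroBatcher:
    """Collect concurrent requests into small batches before calling the model."""
    def __init__(self, infer_fn, max_batch=32, max_wait_ms=10):
        self.infer_fn = infer_fn              # callable: list of inputs -> list of outputs
        self.max_batch = max_batch            # flush when this many requests are queued
        self.max_wait = max_wait_ms / 1000.0  # or when the oldest request has waited this long
        self.queue = None

    async def start(self):
        self.queue = asyncio.Queue()
        asyncio.create_task(self._worker())

    async def predict(self, x):
        # Each caller gets a future that the worker resolves with its result
        fut = asyncio.get_running_loop().create_future()
        await self.queue.put((x, fut))
        return await fut

    async def _worker(self):
        while True:
            batch = [await self.queue.get()]
            deadline = time.monotonic() + self.max_wait
            while len(batch) < self.max_batch:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                try:
                    batch.append(await asyncio.wait_for(self.queue.get(), remaining))
                except asyncio.TimeoutError:
                    break
            inputs = [x for x, _ in batch]
            outputs = self.infer_fn(inputs)   # one forward pass over the whole micro-batch
            for (_, fut), out in zip(batch, outputs):
                fut.set_result(out)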
💻 Implementation Examples
1. High-Throughput Streaming Inference
import asyncio
import json
import numpy as np
from kafka import KafkaConsumer, KafkaProducer
import redis
import torch
from concurrent.futures import ThreadPoolExecutor
import time
class StreamingMLPipeline:
def __init__(self, model_path, redis_host='localhost'):
# Load optimized model (TorchScript, ONNX, etc.)
self.model = torch.jit.load(model_path)
self.model.eval()
# Feature cache for low-latency serving
self.redis_client = redis.Redis(host=redis_host, decode_responses=True)
# Async processing pools
self.feature_executor = ThreadPoolExecutor(max_workers=4)
self.inference_executor = ThreadPoolExecutor(max_workers=8)
# Kafka setup for streaming
self.consumer = KafkaConsumer(
'ml-features',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
self.producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
    async def process_stream(self):
        """Process streaming data with batching for efficiency (kafka-python's
        consumer iterator is blocking; a fully async setup would use aiokafka)."""
batch = []
batch_size = 32
last_batch_time = time.time()
for message in self.consumer:
data = message.value
batch.append(data)
# Process batch when full or timeout reached
should_process = (
len(batch) >= batch_size or
time.time() - last_batch_time > 0.050 # 50ms timeout
)
if should_process:
await self.process_batch(batch)
batch = []
last_batch_time = time.time()
async def process_batch(self, batch):
"""Parallel feature extraction and inference"""
# Extract features in parallel
feature_tasks = []
for item in batch:
task = asyncio.create_task(
self.get_features_async(item['user_id'], item['context'])
)
feature_tasks.append(task)
features = await asyncio.gather(*feature_tasks)
        # Batch inference (only items whose features were successfully retrieved)
        valid_indices = [i for i, f in enumerate(features) if f is not None]
        if not valid_indices:
            return
        feature_tensor = torch.stack([features[i] for i in valid_indices])
        with torch.no_grad():
            predictions = self.model(feature_tensor)
            scores = torch.softmax(predictions, dim=1)
        # Send results back (row j of scores corresponds to batch item valid_indices[j])
        for row, i in enumerate(valid_indices):
            item = batch[i]
            result = {
                'user_id': item['user_id'],
                'prediction': scores[row].tolist(),
                'timestamp': time.time(),
                'latency_ms': (time.time() - item['timestamp']) * 1000
            }
            self.producer.send('ml-predictions', value=result)
async def get_features_async(self, user_id, context):
"""Async feature retrieval with caching"""
cache_key = f"features:{user_id}:{hash(str(context))}"
# Try cache first
cached = self.redis_client.get(cache_key)
if cached:
return torch.tensor(json.loads(cached))
# Compute features if not cached
features = await asyncio.get_event_loop().run_in_executor(
self.feature_executor,
self.compute_features,
user_id, context
)
# Cache for future requests (TTL: 5 minutes)
self.redis_client.setex(cache_key, 300, json.dumps(features.tolist()))
return features
def compute_features(self, user_id, context):
"""Feature computation (can be expensive)"""
# Real feature engineering would be more complex
user_features = self.get_user_profile(user_id)
context_features = self.extract_context_features(context)
# Combine and normalize
combined = np.concatenate([user_features, context_features])
return torch.tensor(combined, dtype=torch.float32)
# Usage
async def main():
pipeline = StreamingMLPipeline('optimized_model.pt')
await pipeline.process_stream()
if __name__ == "__main__":
    asyncio.run(main())
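The pipeline above expects JSON messages on the ml-features topic containing user_id, context, and timestamp fields. For testing, a minimal producer sketch (broker address and topic match the example configuration; the context payload is illustrative):
import json
import time
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
producer.send('ml-features', value={
    'user_id': 'user_42',
    'context': {'page': 'home', 'device': 'mobile'},  # illustrative context payload
    'timestamp': time.time()                          # used downstream to compute end-to-end latency
})
producer.flush()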
2. Multi-Model Serving with Dynamic Routing
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import torch
import numpy as np
from typing import Dict, List, Optional
import asyncio
import time
from functools import lru_cache
class PredictionRequest(BaseModel):
user_id: str
features: List[float]
model_variant: Optional[str] = "default"
latency_budget_ms: Optional[int] = 100
class MultiModelServer:
def __init__(self):
# Load multiple model variants
self.models = {
"fast": torch.jit.load("models/fast_model.pt"), # 10ms, 85% accuracy
"default": torch.jit.load("models/default_model.pt"), # 50ms, 92% accuracy
"accurate": torch.jit.load("models/accurate_model.pt") # 200ms, 95% accuracy
}
# Model performance profiles
self.model_profiles = {
"fast": {"latency_ms": 10, "accuracy": 0.85, "memory_mb": 50},
"default": {"latency_ms": 50, "accuracy": 0.92, "memory_mb": 200},
"accurate": {"latency_ms": 200, "accuracy": 0.95, "memory_mb": 800}
}
# Performance monitoring
self.performance_stats = {model: [] for model in self.models.keys()}
for model in self.models.values():
model.eval()
def select_optimal_model(self, latency_budget_ms: int, user_tier: str = "standard") -> str:
"""Dynamically select best model based on constraints"""
# Premium users get better models
if user_tier == "premium":
latency_budget_ms *= 2
# Find best model within latency budget
best_model = "fast" # fallback
best_accuracy = 0
for model_name, profile in self.model_profiles.items():
if profile["latency_ms"] <= latency_budget_ms:
if profile["accuracy"] > best_accuracy:
best_accuracy = profile["accuracy"]
best_model = model_name
return best_model
async def predict_with_model(self, model_name: str, features: np.ndarray) -> Dict:
"""Execute prediction with specific model"""
start_time = time.time()
model = self.models[model_name]
feature_tensor = torch.tensor(features, dtype=torch.float32).unsqueeze(0)
with torch.no_grad():
logits = model(feature_tensor)
probabilities = torch.softmax(logits, dim=1)
prediction = torch.argmax(probabilities, dim=1)
latency_ms = (time.time() - start_time) * 1000
# Update performance stats
self.performance_stats[model_name].append(latency_ms)
if len(self.performance_stats[model_name]) > 1000:
self.performance_stats[model_name].pop(0) # Keep recent 1000
return {
"prediction": prediction.item(),
"confidence": probabilities.max().item(),
"probabilities": probabilities.squeeze().tolist(),
"model_used": model_name,
"latency_ms": round(latency_ms, 2)
}
@lru_cache(maxsize=10000)
def get_user_tier(self, user_id: str) -> str:
"""Cache user tier lookup"""
# In practice, this would query user database
return "premium" if user_id.endswith("_premium") else "standard"
app = FastAPI()
ml_server = MultiModelServer()
@app.post("/predict")
async def predict(request: PredictionRequest):
try:
# Determine user tier and optimal model
user_tier = ml_server.get_user_tier(request.user_id)
if request.model_variant == "auto":
model_name = ml_server.select_optimal_model(
request.latency_budget_ms, user_tier
)
else:
model_name = request.model_variant
if model_name not in ml_server.models:
raise HTTPException(status_code=400, detail=f"Model {model_name} not available")
# Execute prediction
features = np.array(request.features)
result = await ml_server.predict_with_model(model_name, features)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health():
"""Health check with model performance stats"""
stats = {}
for model_name, latencies in ml_server.performance_stats.items():
if latencies:
stats[model_name] = {
"avg_latency_ms": round(np.mean(latencies), 2),
"p95_latency_ms": round(np.percentile(latencies, 95), 2),
"request_count": len(latencies)
}
return {"status": "healthy", "model_stats": stats}
# Run with: uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4
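For reference, a client can call the /predict endpoint like this (a minimal sketch using the requests library; the field values are placeholders that match the PredictionRequest schema above):
import requests

payload = {
    "user_id": "user_123_premium",   # the "_premium" suffix triggers the premium tier lookup
    "features": [0.1] * 128,         # feature vector length is application-specific
    "model_variant": "auto",         # let the server pick a model within the latency budget
    "latency_budget_ms": 50
}
response = requests.post("http://localhost:8000/predict", json=payload, timeout=1.0)
print(response.json())               # prediction, confidence, model_used, latency_ms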
3. Online Learning with Streaming Updates
import numpy as np
from sklearn.linear_model import SGDRegressor
from sklearn.preprocessing import StandardScaler
from typing import Dict
import pickle
import threading
import time
from collections import deque
import logging
class OnlineLearningSystem:
def __init__(self, initial_model_path=None, learning_rate=0.01):
# Initialize or load existing model
if initial_model_path:
self.load_model(initial_model_path)
else:
self.model = SGDRegressor(
learning_rate='adaptive',
eta0=learning_rate,
random_state=42
)
self.scaler = StandardScaler()
self.is_fitted = False
# Online learning configuration
self.batch_size = 100
self.update_frequency = 300 # seconds
self.performance_window = 1000
# Data buffers for streaming updates
self.feature_buffer = deque(maxlen=10000)
self.target_buffer = deque(maxlen=10000)
self.prediction_errors = deque(maxlen=self.performance_window)
# Thread-safe model access
self.model_lock = threading.RLock()
# Performance tracking
self.model_version = 1
self.last_update_time = time.time()
self.training_metrics = {
'samples_processed': 0,
'avg_error': 0.0,
'model_drift_score': 0.0
}
# Start background update thread
self.update_thread = threading.Thread(target=self._update_loop, daemon=True)
self.update_thread.start()
def predict(self, features: np.ndarray) -> Dict:
"""Thread-safe prediction with performance tracking"""
start_time = time.time()
with self.model_lock:
if not self.is_fitted:
return {"error": "Model not yet trained"}
# Normalize features
features_scaled = self.scaler.transform(features.reshape(1, -1))
# Make prediction
prediction = self.model.predict(features_scaled)[0]
            # Estimate prediction uncertainty; SGDRegressor exposes no decision_function,
            # so this typically falls back to the default value (SGD classifiers provide one)
            decision_function = getattr(self.model, 'decision_function', None)
            uncertainty = 0.1  # Default uncertainty
if decision_function:
decision_score = decision_function(features_scaled)[0]
uncertainty = 1.0 / (1.0 + abs(decision_score)) # Higher uncertainty for scores near 0
latency_ms = (time.time() - start_time) * 1000
return {
"prediction": prediction,
"uncertainty": uncertainty,
"model_version": self.model_version,
"latency_ms": round(latency_ms, 2)
}
def add_feedback(self, features: np.ndarray, true_value: float):
"""Add new training sample for online learning"""
self.feature_buffer.append(features)
self.target_buffer.append(true_value)
# Track prediction error if we have a prediction
if self.is_fitted:
with self.model_lock:
features_scaled = self.scaler.transform(features.reshape(1, -1))
predicted_value = self.model.predict(features_scaled)[0]
error = abs(predicted_value - true_value)
self.prediction_errors.append(error)
def _update_loop(self):
"""Background thread for periodic model updates"""
while True:
try:
time.sleep(self.update_frequency)
if len(self.feature_buffer) >= self.batch_size:
self._incremental_update()
except Exception as e:
logging.error(f"Error in update loop: {e}")
def _incremental_update(self):
"""Perform incremental model update"""
logging.info(f"Starting incremental update with {len(self.feature_buffer)} samples")
# Prepare batch data
batch_features = np.array(list(self.feature_buffer)[-self.batch_size:])
batch_targets = np.array(list(self.target_buffer)[-self.batch_size:])
with self.model_lock:
# Update scaler incrementally
if self.is_fitted:
# Partial fit for online scaling
self.scaler.partial_fit(batch_features)
else:
# Initial fit
self.scaler.fit(batch_features)
# Scale features
batch_features_scaled = self.scaler.transform(batch_features)
# Update model incrementally
if self.is_fitted:
self.model.partial_fit(batch_features_scaled, batch_targets)
else:
self.model.fit(batch_features_scaled, batch_targets)
self.is_fitted = True
# Update model version and metrics
self.model_version += 1
self.training_metrics['samples_processed'] += len(batch_features)
if self.prediction_errors:
self.training_metrics['avg_error'] = np.mean(self.prediction_errors)
# Detect model drift (increasing error trend)
if len(self.prediction_errors) >= self.performance_window:
recent_errors = list(self.prediction_errors)[-100:]
older_errors = list(self.prediction_errors)[-200:-100]
if older_errors: # Avoid division by zero
drift_score = np.mean(recent_errors) / np.mean(older_errors)
self.training_metrics['model_drift_score'] = drift_score
if drift_score > 1.5: # 50% increase in error
logging.warning(f"Model drift detected: {drift_score:.2f}")
self.last_update_time = time.time()
logging.info(f"Model updated to version {self.model_version}")
def get_model_stats(self) -> Dict:
"""Get current model performance statistics"""
return {
"model_version": self.model_version,
"is_fitted": self.is_fitted,
"samples_in_buffer": len(self.feature_buffer),
"last_update_time": self.last_update_time,
"training_metrics": self.training_metrics.copy(),
"time_since_update": time.time() - self.last_update_time
}
def save_model(self, path: str):
"""Save current model state"""
with self.model_lock:
model_data = {
'model': self.model,
'scaler': self.scaler,
'is_fitted': self.is_fitted,
'model_version': self.model_version
}
with open(path, 'wb') as f:
pickle.dump(model_data, f)
def load_model(self, path: str):
"""Load model state"""
with open(path, 'rb') as f:
model_data = pickle.load(f)
self.model = model_data['model']
self.scaler = model_data['scaler']
self.is_fitted = model_data['is_fitted']
self.model_version = model_data.get('model_version', 1)
# Usage example (note: the background thread retrains only every `update_frequency`
# seconds, so a short simulation may finish before the first incremental update runs)
online_ml = OnlineLearningSystem()
# Simulate streaming predictions and feedback
for i in range(1000):
# Generate sample data
features = np.random.randn(10) # 10-dimensional features
# Make prediction
result = online_ml.predict(features)
# Simulate getting true value and providing feedback
true_value = np.sum(features) + np.random.normal(0, 0.1) # Simple linear relationship
online_ml.add_feedback(features, true_value)
if i % 100 == 0:
stats = online_ml.get_model_stats()
print(f"Step {i}: Avg Error = {stats['training_metrics']['avg_error']:.4f}")
# Save final model
online_ml.save_model("online_model.pkl")
🏭 Production Real-time ML Examples
Uber Real-time ETA
Dynamic route optimization
- Latency Target: < 100ms
- Updates/Second: 1M+ predictions
- Features: Traffic, weather, events
- Architecture: Streaming + caching
Spotify Recommendations
Real-time music suggestions
- Latency Target: < 50ms
- Requests/Second: 500K+ queries
- Models: 100+ recommendation models
- Updates: Real-time learning
Netflix Content Ranking
Personalized content feeds
- Latency Target: < 200ms
- Personalization: 300M+ users
- A/B Testing: 1000+ experiments
- Features: Viewing history, time, device
Amazon Product Search
Real-time search ranking
- Latency Target: < 100ms
- Queries/Second: 100K+ searches
- Catalog Size: 500M+ products
- Personalization: Real-time user context
✅ Real-time ML Best Practices
✅ Do's
- Implement multi-tier caching - Memory, Redis, and CDN layers for different latency needs
- Use async processing - Non-blocking I/O for feature retrieval and model serving
- Monitor feature freshness - Track feature lag and implement SLA monitoring
- Implement circuit breakers - Graceful degradation when dependencies fail (see the sketch after this list)
- Batch when possible - Group requests for higher throughput efficiency
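To illustrate the circuit-breaker recommendation, a minimal sketch (the class name, thresholds, and fallback wiring are illustrative): after a run of consecutive failures the breaker stops calling the dependency and serves a fallback until a cooldown expires.
import time

class CircuitBreaker:
    """Open after N consecutive failures, then short-circuit to a fallback until a cooldown passes."""
    def __init__(self, failure_threshold=5, reset_timeout_s=30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout_s = reset_timeout_s
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed (dependency considered healthy)

    def call(self, fn, fallback, *args, **kwargs):
        # While open and still cooling down, skip the dependency entirely
        if self.opened_at is not None:
            if time.time() - self.opened_at < self.reset_timeout_s:
                return fallback(*args, **kwargs)
            self.opened_at = None  # cooldown expired: allow one trial call (half-open)
        try:
            result = fn(*args, **kwargs)
            self.failures = 0      # success closes the circuit again
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.time()
            return fallback(*args, **kwargs)

# Usage: wrap a flaky feature-store lookup and degrade to default features
# breaker = CircuitBreaker()
# features = breaker.call(fetch_features, lambda user_id: DEFAULT_FEATURES, user_id)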
❌ Don'ts
- Don't ignore cold start latency - Model loading can take seconds; implement warm pools (see the warm-up sketch after this list)
- Avoid synchronous database calls - Use async queries and connection pooling
- Don't over-optimize prematurely - Profile first to identify actual bottlenecks
- Avoid single points of failure - Replicate critical components and data
- Don't ignore monitoring - Real-time systems need comprehensive observability
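As a concrete example of avoiding cold-start latency, a sketch of warming a TorchScript model before traffic arrives (the model path, input dimension, and warm-up count are placeholders):
import torch

def load_and_warm_model(model_path: str, input_dim: int, warmup_iters: int = 5):
    """Load a TorchScript model and run a few dummy batches so the first real request is fast."""
    model = torch.jit.load(model_path)
    model.eval()
    dummy = torch.zeros(1, input_dim, dtype=torch.float32)
    with torch.no_grad():
        for _ in range(warmup_iters):
            model(dummy)  # triggers JIT optimization, memory allocation, and kernel selection
    return model

# Call this in a startup hook before the server accepts traffic,
# so model loading and first-inference overhead never land on a user request.
# model = load_and_warm_model("models/default_model.pt", input_dim=128)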