
Real-time ML Inference Systems

Build high-performance real-time ML systems with streaming features and low-latency serving

50 min read · Advanced

⚡ Real-time Inference Performance Calculator

Example inputs: 10,000 RPS target load, 500 MB model, 1,000-dimensional feature vectors

Performance Metrics

Cached latency: 9 ms
Uncached latency: 11 ms
Latency improvement: 20% faster
Effective throughput: 3,523 RPS
Memory usage: 750 MB
Cache hit rate: 85%
Cost efficiency: 47/10

Strategy: In-memory key-value cache
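
For a quick sanity check on numbers like these, the expected per-request latency is simply a weighted average over the cache-hit and cache-miss paths. The sketch below assumes that simple model (not necessarily the calculator's exact formula) and plugs in the values above.

# Back-of-envelope estimate of how a cache layer shifts serving latency.
# The weighted-average formula is an assumption, not the calculator's exact
# logic; the input values are the illustrative ones shown above.

def effective_latency_ms(cached_ms: float, uncached_ms: float, hit_rate: float) -> float:
    """Expected per-request latency for a given cache hit rate."""
    return hit_rate * cached_ms + (1.0 - hit_rate) * uncached_ms

if __name__ == "__main__":
    cached, uncached, hit_rate = 9.0, 11.0, 0.85
    expected = effective_latency_ms(cached, uncached, hit_rate)
    print(f"Expected latency: {expected:.2f} ms")                                  # 9.30 ms
    print(f"Saving vs. always-uncached: {100 * (1 - expected / uncached):.1f}%")   # 15.5%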

🏗️ Real-time ML Architecture Patterns

Streaming Architecture

Process data streams in real time with continuous model updates.

Latency target: < 10 ms
Throughput: Very high
Consistency: Eventual
Complexity: High

Request-Response

Synchronous inference serving with immediate results.

Latency target: < 100 ms
Throughput: High
Consistency: Strong
Complexity: Medium

Micro-batch Streaming

Small-batch processing that balances latency and throughput.

Latency target: < 50 ms
Throughput: Very high
Consistency: Near real-time
Complexity: Medium

Edge Caching

Pre-computed results with real-time cache updates.

Latency target: < 5 ms
Throughput: Extreme
Consistency: Eventual
Complexity: High
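
The four patterns above trade latency against throughput, consistency, and operational complexity. As a rough illustration of how a team might encode that comparison, here is a small, hypothetical selection helper; the thresholds and "complexity rank" values are assumptions derived from the comparison above, not any standard API.

from dataclasses import dataclass

# Hypothetical helper: encode the comparison above as data and pick the
# simplest pattern that still meets a latency SLA. Thresholds and complexity
# ranks are illustrative assumptions.

@dataclass
class ServingPattern:
    name: str
    max_latency_ms: float   # latency target from the comparison above
    complexity_rank: int    # 1 = simplest to operate

PATTERNS = [
    ServingPattern("edge-caching", 5, 4),
    ServingPattern("streaming", 10, 3),
    ServingPattern("micro-batch", 50, 2),
    ServingPattern("request-response", 100, 1),
]

def choose_pattern(latency_sla_ms: float) -> ServingPattern:
    """Return the lowest-complexity pattern whose latency target meets the SLA."""
    feasible = [p for p in PATTERNS if p.max_latency_ms <= latency_sla_ms]
    if not feasible:
        raise ValueError(f"No pattern meets a {latency_sla_ms} ms SLA")
    return min(feasible, key=lambda p: p.complexity_rank)

print(choose_pattern(75).name)    # "micro-batch": simplest option within a 75 ms budget
print(choose_pattern(150).name)   # "request-response": a relaxed SLA allows the simplest pattern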

💻 Implementation Examples

1. High-Throughput Streaming Inference

import asyncio
import json
import numpy as np
from kafka import KafkaConsumer, KafkaProducer
import redis
import torch
from concurrent.futures import ThreadPoolExecutor
import time

class StreamingMLPipeline:
    def __init__(self, model_path, redis_host='localhost'):
        # Load optimized model (TorchScript, ONNX, etc.)
        self.model = torch.jit.load(model_path)
        self.model.eval()
        
        # Feature cache for low-latency serving
        self.redis_client = redis.Redis(host=redis_host, decode_responses=True)
        
        # Async processing pools
        self.feature_executor = ThreadPoolExecutor(max_workers=4)
        self.inference_executor = ThreadPoolExecutor(max_workers=8)
        
        # Kafka setup for streaming
        self.consumer = KafkaConsumer(
            'ml-features',
            bootstrap_servers=['localhost:9092'],
            value_deserializer=lambda x: json.loads(x.decode('utf-8'))
        )
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8')
        )
        
    async def process_stream(self):
        """Process streaming data with batching for efficiency"""
        batch = []
        batch_size = 32
        last_batch_time = time.time()
        
        for message in self.consumer:
            data = message.value
            batch.append(data)
            
            # Process batch when full or timeout reached
            should_process = (
                len(batch) >= batch_size or 
                time.time() - last_batch_time > 0.050  # 50ms timeout
            )
            
            if should_process:
                await self.process_batch(batch)
                batch = []
                last_batch_time = time.time()
    
    async def process_batch(self, batch):
        """Parallel feature extraction and inference"""
        # Extract features in parallel
        feature_tasks = []
        for item in batch:
            task = asyncio.create_task(
                self.get_features_async(item['user_id'], item['context'])
            )
            feature_tasks.append(task)
        
        features = await asyncio.gather(*feature_tasks)
        
        # Keep only items whose features resolved, remembering their original
        # batch index so predictions can be matched back to the right request
        valid = [(i, f) for i, f in enumerate(features) if f is not None]
        if not valid:
            return
        
        # Batch inference
        feature_tensor = torch.stack([f for _, f in valid])
        
        with torch.no_grad():
            predictions = self.model(feature_tensor)
            scores = torch.softmax(predictions, dim=1)
        
        # Send results back, indexing scores by row in the filtered tensor
        for row, (i, _) in enumerate(valid):
            item = batch[i]
            result = {
                'user_id': item['user_id'],
                'prediction': scores[row].tolist(),
                'timestamp': time.time(),
                'latency_ms': (time.time() - item['timestamp']) * 1000
            }
            self.producer.send('ml-predictions', value=result)
    
    async def get_features_async(self, user_id, context):
        """Async feature retrieval with caching"""
        cache_key = f"features:{user_id}:{hash(str(context))}"
        
        # Try cache first
        cached = self.redis_client.get(cache_key)
        if cached:
            return torch.tensor(json.loads(cached))
        
        # Compute features if not cached
        features = await asyncio.get_event_loop().run_in_executor(
            self.feature_executor,
            self.compute_features,
            user_id, context
        )
        
        # Cache for future requests (TTL: 5 minutes)
        self.redis_client.setex(cache_key, 300, json.dumps(features.tolist()))
        
        return features
    
    def compute_features(self, user_id, context):
        """Feature computation (can be expensive)"""
        # Placeholder feature engineering: get_user_profile and
        # extract_context_features are assumed to be implemented elsewhere;
        # real feature engineering would be more complex
        user_features = self.get_user_profile(user_id)
        context_features = self.extract_context_features(context)
        
        # Combine and normalize
        combined = np.concatenate([user_features, context_features])
        return torch.tensor(combined, dtype=torch.float32)

# Usage
async def main():
    pipeline = StreamingMLPipeline('optimized_model.pt')
    await pipeline.process_stream()

if __name__ == "__main__":
    asyncio.run(main())
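
One caveat on the code above: kafka-python's KafkaConsumer iteration is blocking, so the `for message in self.consumer` loop will stall the asyncio event loop even though process_stream is declared async. If fully asynchronous consumption matters, one option is aiokafka; the sketch below assumes that library is installed and reuses the same topic and broker names (handle_batch would be something like pipeline.process_batch).

# Minimal sketch of an asyncio-native consumer loop using aiokafka.
# Assumptions: aiokafka is installed, and topic/broker names match the pipeline above.
import asyncio
import json
from aiokafka import AIOKafkaConsumer

async def consume_async(handle_batch, batch_size=32, timeout_ms=50):
    consumer = AIOKafkaConsumer(
        'ml-features',
        bootstrap_servers='localhost:9092',
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    )
    await consumer.start()
    try:
        while True:
            # getmany() returns up to batch_size records or whatever arrived within
            # timeout_ms, reproducing the size/timeout batching policy without blocking
            records = await consumer.getmany(timeout_ms=timeout_ms, max_records=batch_size)
            batch = [msg.value for msgs in records.values() for msg in msgs]
            if batch:
                await handle_batch(batch)
    finally:
        await consumer.stop()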

2. Multi-Model Serving with Dynamic Routing

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import torch
import numpy as np
from typing import Dict, List, Optional
import asyncio
import time
from functools import lru_cache

class PredictionRequest(BaseModel):
    user_id: str
    features: List[float]
    model_variant: Optional[str] = "default"  # pass "auto" to let the server pick within the latency budget
    latency_budget_ms: Optional[int] = 100

class MultiModelServer:
    def __init__(self):
        # Load multiple model variants
        self.models = {
            "fast": torch.jit.load("models/fast_model.pt"),      # 10ms, 85% accuracy
            "default": torch.jit.load("models/default_model.pt"), # 50ms, 92% accuracy
            "accurate": torch.jit.load("models/accurate_model.pt") # 200ms, 95% accuracy
        }
        
        # Model performance profiles
        self.model_profiles = {
            "fast": {"latency_ms": 10, "accuracy": 0.85, "memory_mb": 50},
            "default": {"latency_ms": 50, "accuracy": 0.92, "memory_mb": 200},
            "accurate": {"latency_ms": 200, "accuracy": 0.95, "memory_mb": 800}
        }
        
        # Performance monitoring
        self.performance_stats = {model: [] for model in self.models.keys()}
        
        for model in self.models.values():
            model.eval()
    
    def select_optimal_model(self, latency_budget_ms: int, user_tier: str = "standard") -> str:
        """Dynamically select best model based on constraints"""
        
        # Premium users get better models
        if user_tier == "premium":
            latency_budget_ms *= 2
        
        # Find best model within latency budget
        best_model = "fast"  # fallback
        best_accuracy = 0
        
        for model_name, profile in self.model_profiles.items():
            if profile["latency_ms"] <= latency_budget_ms:
                if profile["accuracy"] > best_accuracy:
                    best_accuracy = profile["accuracy"]
                    best_model = model_name
        
        return best_model
    
    async def predict_with_model(self, model_name: str, features: np.ndarray) -> Dict:
        """Execute prediction with specific model"""
        start_time = time.time()
        
        model = self.models[model_name]
        feature_tensor = torch.tensor(features, dtype=torch.float32).unsqueeze(0)
        
        with torch.no_grad():
            logits = model(feature_tensor)
            probabilities = torch.softmax(logits, dim=1)
            prediction = torch.argmax(probabilities, dim=1)
        
        latency_ms = (time.time() - start_time) * 1000
        
        # Update performance stats
        self.performance_stats[model_name].append(latency_ms)
        if len(self.performance_stats[model_name]) > 1000:
            self.performance_stats[model_name].pop(0)  # Keep recent 1000
        
        return {
            "prediction": prediction.item(),
            "confidence": probabilities.max().item(),
            "probabilities": probabilities.squeeze().tolist(),
            "model_used": model_name,
            "latency_ms": round(latency_ms, 2)
        }
    
    @lru_cache(maxsize=10000)
    def get_user_tier(self, user_id: str) -> str:
        """Cache user tier lookup"""
        # In practice, this would query user database
        return "premium" if user_id.endswith("_premium") else "standard"

app = FastAPI()
ml_server = MultiModelServer()

@app.post("/predict")
async def predict(request: PredictionRequest):
    try:
        # Determine user tier and optimal model
        user_tier = ml_server.get_user_tier(request.user_id)
        
        if request.model_variant == "auto":
            model_name = ml_server.select_optimal_model(
                request.latency_budget_ms, user_tier
            )
        else:
            model_name = request.model_variant
            
        if model_name not in ml_server.models:
            raise HTTPException(status_code=400, detail=f"Model {model_name} not available")
        
        # Execute prediction
        features = np.array(request.features)
        result = await ml_server.predict_with_model(model_name, features)
        
        return result
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health():
    """Health check with model performance stats"""
    stats = {}
    for model_name, latencies in ml_server.performance_stats.items():
        if latencies:
            stats[model_name] = {
                "avg_latency_ms": round(np.mean(latencies), 2),
                "p95_latency_ms": round(np.percentile(latencies, 95), 2),
                "request_count": len(latencies)
            }
    
    return {"status": "healthy", "model_stats": stats}

# Run with: uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4
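
A quick client-side smoke test of the endpoints above might look like this (assuming the service is running locally on port 8000 as in the uvicorn command; the payload values are illustrative, and the `requests` library is used only for brevity):

# Hypothetical smoke test against the /predict and /health endpoints above.
import requests

payload = {
    "user_id": "user_123_premium",
    "features": [0.1] * 64,          # dimensionality must match the loaded models
    "model_variant": "auto",         # let the server pick within the latency budget
    "latency_budget_ms": 60,
}

resp = requests.post("http://localhost:8000/predict", json=payload, timeout=2)
resp.raise_for_status()
print(resp.json())   # e.g. {"prediction": ..., "model_used": "default", "latency_ms": ...}

print(requests.get("http://localhost:8000/health", timeout=2).json())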

3. Online Learning with Streaming Updates

import numpy as np
from sklearn.linear_model import SGDRegressor
from sklearn.preprocessing import StandardScaler
from typing import Dict
import pickle
import threading
import time
from collections import deque
import logging

class OnlineLearningSystem:
    def __init__(self, initial_model_path=None, learning_rate=0.01):
        # Initialize or load existing model
        if initial_model_path:
            self.load_model(initial_model_path)
        else:
            self.model = SGDRegressor(
                learning_rate='adaptive',
                eta0=learning_rate,
                random_state=42
            )
            self.scaler = StandardScaler()
            self.is_fitted = False
        
        # Online learning configuration
        self.batch_size = 100
        self.update_frequency = 300  # seconds
        self.performance_window = 1000
        
        # Data buffers for streaming updates
        self.feature_buffer = deque(maxlen=10000)
        self.target_buffer = deque(maxlen=10000)
        self.prediction_errors = deque(maxlen=self.performance_window)
        
        # Thread-safe model access
        self.model_lock = threading.RLock()
        
        # Performance tracking
        self.model_version = 1
        self.last_update_time = time.time()
        self.training_metrics = {
            'samples_processed': 0,
            'avg_error': 0.0,
            'model_drift_score': 0.0
        }
        
        # Start background update thread
        self.update_thread = threading.Thread(target=self._update_loop, daemon=True)
        self.update_thread.start()
    
    def predict(self, features: np.ndarray) -> Dict:
        """Thread-safe prediction with performance tracking"""
        start_time = time.time()
        
        with self.model_lock:
            if not self.is_fitted:
                return {"error": "Model not yet trained"}
            
            # Normalize features
            features_scaled = self.scaler.transform(features.reshape(1, -1))
            
            # Make prediction
            prediction = self.model.predict(features_scaled)[0]
            
            # Calculate prediction uncertainty (for SGD models)
            decision_function = getattr(self.model, 'decision_function', None)
            uncertainty = 0.1  # Default uncertainty
            
            if decision_function:
                decision_score = decision_function(features_scaled)[0]
                uncertainty = 1.0 / (1.0 + abs(decision_score))  # Higher uncertainty for scores near 0
        
        latency_ms = (time.time() - start_time) * 1000
        
        return {
            "prediction": prediction,
            "uncertainty": uncertainty,
            "model_version": self.model_version,
            "latency_ms": round(latency_ms, 2)
        }
    
    def add_feedback(self, features: np.ndarray, true_value: float):
        """Add new training sample for online learning"""
        self.feature_buffer.append(features)
        self.target_buffer.append(true_value)
        
        # Track prediction error if we have a prediction
        if self.is_fitted:
            with self.model_lock:
                features_scaled = self.scaler.transform(features.reshape(1, -1))
                predicted_value = self.model.predict(features_scaled)[0]
                error = abs(predicted_value - true_value)
                self.prediction_errors.append(error)
    
    def _update_loop(self):
        """Background thread for periodic model updates"""
        while True:
            try:
                time.sleep(self.update_frequency)
                
                if len(self.feature_buffer) >= self.batch_size:
                    self._incremental_update()
                    
            except Exception as e:
                logging.error(f"Error in update loop: {e}")
    
    def _incremental_update(self):
        """Perform incremental model update"""
        logging.info(f"Starting incremental update with {len(self.feature_buffer)} samples")
        
        # Prepare batch data
        batch_features = np.array(list(self.feature_buffer)[-self.batch_size:])
        batch_targets = np.array(list(self.target_buffer)[-self.batch_size:])
        
        with self.model_lock:
            # Update scaler incrementally
            if self.is_fitted:
                # Partial fit for online scaling
                self.scaler.partial_fit(batch_features)
            else:
                # Initial fit
                self.scaler.fit(batch_features)
            
            # Scale features
            batch_features_scaled = self.scaler.transform(batch_features)
            
            # Update model incrementally
            if self.is_fitted:
                self.model.partial_fit(batch_features_scaled, batch_targets)
            else:
                self.model.fit(batch_features_scaled, batch_targets)
                self.is_fitted = True
            
            # Update model version and metrics
            self.model_version += 1
            self.training_metrics['samples_processed'] += len(batch_features)
            
            if self.prediction_errors:
                self.training_metrics['avg_error'] = np.mean(self.prediction_errors)
                
                # Detect model drift (increasing error trend)
                if len(self.prediction_errors) >= self.performance_window:
                    recent_errors = list(self.prediction_errors)[-100:]
                    older_errors = list(self.prediction_errors)[-200:-100]
                    
                    if older_errors:  # Avoid division by zero
                        drift_score = np.mean(recent_errors) / np.mean(older_errors)
                        self.training_metrics['model_drift_score'] = drift_score
                        
                        if drift_score > 1.5:  # 50% increase in error
                            logging.warning(f"Model drift detected: {drift_score:.2f}")
        
        self.last_update_time = time.time()
        logging.info(f"Model updated to version {self.model_version}")
    
    def get_model_stats(self) -> Dict:
        """Get current model performance statistics"""
        return {
            "model_version": self.model_version,
            "is_fitted": self.is_fitted,
            "samples_in_buffer": len(self.feature_buffer),
            "last_update_time": self.last_update_time,
            "training_metrics": self.training_metrics.copy(),
            "time_since_update": time.time() - self.last_update_time
        }
    
    def save_model(self, path: str):
        """Save current model state"""
        with self.model_lock:
            model_data = {
                'model': self.model,
                'scaler': self.scaler,
                'is_fitted': self.is_fitted,
                'model_version': self.model_version
            }
            
            with open(path, 'wb') as f:
                pickle.dump(model_data, f)
    
    def load_model(self, path: str):
        """Load model state"""
        with open(path, 'rb') as f:
            model_data = pickle.load(f)
        
        self.model = model_data['model']
        self.scaler = model_data['scaler']
        self.is_fitted = model_data['is_fitted']
        self.model_version = model_data.get('model_version', 1)

# Usage example
# Note: with the default 300-second update interval, the background trainer will
# not fire during a quick loop like this; feedback simply accumulates in the
# buffers and predictions return "Model not yet trained" until the first update.
online_ml = OnlineLearningSystem()

# Simulate streaming predictions and feedback
for i in range(1000):
    # Generate sample data
    features = np.random.randn(10)  # 10-dimensional features
    
    # Make prediction
    result = online_ml.predict(features)
    
    # Simulate getting true value and providing feedback
    true_value = np.sum(features) + np.random.normal(0, 0.1)  # Simple linear relationship
    online_ml.add_feedback(features, true_value)
    
    if i % 100 == 0:
        stats = online_ml.get_model_stats()
        print(f"Step {i}: Avg Error = {stats['training_metrics']['avg_error']:.4f}")

# Save final model
online_ml.save_model("online_model.pkl")
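
In a serving context, the online learner would typically sit behind the same kind of HTTP layer as in example 2. The sketch below is one hypothetical wiring; the endpoint names and payload shapes are illustrative, not a fixed contract.

# Hypothetical wiring of OnlineLearningSystem (defined above) behind a FastAPI service.
from typing import List

import numpy as np
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()
learner = OnlineLearningSystem()   # the class defined above

class PredictPayload(BaseModel):
    features: List[float]

class FeedbackPayload(BaseModel):
    features: List[float]
    true_value: float

@app.post("/predict")
def predict(payload: PredictPayload):
    return learner.predict(np.array(payload.features, dtype=np.float64))

@app.post("/feedback")
def feedback(payload: FeedbackPayload):
    # Ground-truth labels arrive asynchronously and feed the background updater
    learner.add_feedback(np.array(payload.features, dtype=np.float64), payload.true_value)
    return {"buffered_samples": len(learner.feature_buffer)}

@app.get("/model/stats")
def stats():
    return learner.get_model_stats()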

🏭 Production Real-time ML Examples

Uber Real-time ETA

Dynamic route optimization

  • Latency target: < 100 ms
  • Updates/second: 1M+ predictions
  • Features: Traffic, weather, events
  • Architecture: Streaming + caching

Spotify Recommendations

Real-time music suggestions

  • Latency target: < 50 ms
  • Requests/second: 500K+ queries
  • Models: 100+ recommendation models
  • Updates: Real-time learning

Netflix Content Ranking

Personalized content feeds

  • Latency target: < 200 ms
  • Personalization: 300M+ users
  • A/B testing: 1000+ experiments
  • Features: Viewing history, time, device

Amazon Product Search

Real-time search ranking

  • Latency target: < 100 ms
  • Queries/second: 100K+ searches
  • Catalog size: 500M+ products
  • Personalization: Real-time user context

✅ Real-time ML Best Practices

✅ Do's

  • Implement multi-tier caching - Memory, Redis, and CDN layers for different latency needs
  • Use async processing - Non-blocking I/O for feature retrieval and model serving
  • Monitor feature freshness - Track feature lag and implement SLA monitoring
  • Implement circuit breakers - Graceful degradation when dependencies fail (a minimal sketch follows this list)
  • Batch when possible - Group requests for higher throughput efficiency
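
A minimal circuit-breaker sketch for the point above; the thresholds, states, and fallback are illustrative assumptions (production systems often use a library such as pybreaker instead of hand-rolling this):

# Minimal circuit breaker: after enough consecutive failures the breaker "opens"
# and callers fall back immediately instead of waiting on a failing dependency.
# Thresholds and the fallback path are illustrative assumptions.
import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout_s=30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout_s = reset_timeout_s
        self.failures = 0
        self.opened_at = None   # None means closed (healthy)

    def call(self, fn, fallback):
        # While open, short-circuit to the fallback until the reset timeout passes
        if self.opened_at is not None:
            if time.time() - self.opened_at < self.reset_timeout_s:
                return fallback()
            self.opened_at = None   # half-open: allow one trial call
            self.failures = 0
        try:
            result = fn()
            self.failures = 0
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.time()
            return fallback()

# Example: degrade to cached defaults when a (simulated) feature store is down
def flaky_lookup():
    raise TimeoutError("feature store unavailable")   # simulated outage

breaker = CircuitBreaker()
for _ in range(7):
    print(breaker.call(flaky_lookup, lambda: {"features": "cached_defaults"}))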

❌ Don'ts

  • Don't ignore cold start latency - Model loading can take seconds; implement warm pools (a warm-up sketch follows this list)
  • Avoid synchronous database calls - Use async queries and connection pooling
  • Don't over-optimize prematurely - Profile first to identify actual bottlenecks
  • Avoid single points of failure - Replicate critical components and data
  • Don't ignore monitoring - Real-time systems need comprehensive observability
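
For the cold-start point, one common mitigation is to load and warm the model at process startup rather than on the first request. A brief sketch, assuming a TorchScript model file as in the earlier examples (the path and input shape are illustrative):

# Warm-start sketch: load the model once at startup and run a few dummy forward
# passes so the first real request does not pay for lazy initialization.
# Model path and input dimensionality are illustrative assumptions.
import torch

def load_and_warm(model_path: str, input_dim: int = 128, warmup_iters: int = 5):
    model = torch.jit.load(model_path)
    model.eval()
    dummy = torch.zeros(1, input_dim)
    with torch.no_grad():
        for _ in range(warmup_iters):
            model(dummy)   # triggers kernel/graph initialization before real traffic
    return model

# Call once at service startup (e.g., in a FastAPI startup hook), not per request:
# model = load_and_warm("models/default_model.pt", input_dim=128)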