
Advanced Recommender Systems Architecture

Design production-scale recommender systems with deep learning, real-time personalization, and distributed ranking algorithms

50 min read · Advanced

Modern Recommender Systems Architecture

Production recommender systems at scale require sophisticated architectures that combine collaborative filtering, deep learning, real-time feature engineering, and distributed serving infrastructure. Systems at Netflix, Amazon, and Spotify serve billions of recommendations per day under latency budgets of tens of milliseconds.

Key Challenges

  • Cold Start Problem: Recommending to new users with no interaction history
  • Scalability: Handling millions of users and items with real-time updates
  • Diversity vs Accuracy: Balancing recommendation relevance with exploration
  • Real-time Learning: Incorporating user feedback immediately

Recommender Systems Performance Calculator

Example configuration: 10M users · 1M items · 128-dim embeddings · 10,000 requests/s

Performance Metrics

  • Memory Requirements: 5.2 GB
  • Inference Latency: 0.5 ms
  • GPU Utilization: 100.0%
  • Recommendations/sec: 10,000

Optimization: Memory usage within single GPU limits
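
The headline memory figure can be sanity-checked from the embedding tables alone. The snippet below is a back-of-the-envelope sketch (our own arithmetic, not the calculator's internal formula), assuming fp32 weights and ignoring activations and optimizer state:

def embedding_memory_gb(num_rows: int, dim: int, bytes_per_param: int = 4) -> float:
    """Size of one fp32 embedding table in decimal gigabytes."""
    return num_rows * dim * bytes_per_param / 1e9

user_table = embedding_memory_gb(10_000_000, 128)  # 5.12 GB
item_table = embedding_memory_gb(1_000_000, 128)   # 0.51 GB
print(f"user: {user_table:.2f} GB, user+item: {user_table + item_table:.2f} GB")
# The calculator's 5.2 GB falls between the user table alone and user+item combined.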

Production Architecture

Two-Stage Retrieval + Ranking

1. Candidate Retrieval

  • ANN search (Faiss/ScaNN; sketched below)
  • Collaborative filtering
  • Content-based filtering
  • Popular/trending items

2. Ranking

  • Deep neural networks
  • Multi-task learning
  • Real-time features
  • A/B test variants

3. Post-processing

  • Diversity injection
  • Business rules
  • Freshness boosting
  • Deduplication
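
For stage 1, candidate retrieval usually means approximate nearest-neighbor search over precomputed item embeddings. Below is a minimal Faiss sketch; the IVF index type, the 4096-cluster setting, and the nprobe value are illustrative choices rather than a prescription, and the random vectors stand in for real embeddings:

import numpy as np
import faiss  # pip install faiss-cpu

d = 128                                    # embedding dimension
item_vecs = np.random.rand(1_000_000, d).astype('float32')
faiss.normalize_L2(item_vecs)              # cosine similarity via inner product

quantizer = faiss.IndexFlatIP(d)
index = faiss.IndexIVFFlat(quantizer, d, 4096, faiss.METRIC_INNER_PRODUCT)
index.train(item_vecs)                     # learn coarse cluster centroids
index.add(item_vecs)
index.nprobe = 16                          # clusters scanned per query (recall/latency knob)

user_vec = np.random.rand(1, d).astype('float32')
faiss.normalize_L2(user_vec)
scores, candidate_ids = index.search(user_vec, 500)  # top-500 candidates for the ranker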

Deep Learning Ranking Model

Multi-Task Deep CTR Model

import torch
import torch.nn as nn
from typing import Dict, List, Optional, Tuple

class DeepCTRModel(nn.Module):
    """
    Multi-task deep learning model for recommendation ranking.
    Predicts CTR, conversion rate, and user engagement simultaneously.
    """
    def __init__(
        self,
        user_vocab_size: int = 10_000_000,
        item_vocab_size: int = 1_000_000,
        category_vocab_size: int = 10000,
        embedding_dim: int = 128,
        hidden_dims: Tuple[int, ...] = (512, 256, 128),
        dropout_rate: float = 0.1
    ):
        super().__init__()
        
        # Embedding layers
        self.user_embedding = nn.Embedding(user_vocab_size, embedding_dim)
        self.item_embedding = nn.Embedding(item_vocab_size, embedding_dim)
        self.category_embedding = nn.Embedding(category_vocab_size, embedding_dim)
        
        # Feature interaction layers
        self.feature_dim = embedding_dim * 3 + 10  # 3 embeddings + numerical features
        
        # Shared bottom layers
        shared_layers = []
        input_dim = self.feature_dim
        for hidden_dim in hidden_dims:
            shared_layers.extend([
                nn.Linear(input_dim, hidden_dim),
                nn.BatchNorm1d(hidden_dim),  # normalize pre-activation
                nn.ReLU(),
                nn.Dropout(dropout_rate)
            ])
            input_dim = hidden_dim
        
        self.shared_layers = nn.Sequential(*shared_layers)
        
        # Task-specific heads
        self.ctr_head = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid()
        )
        
        self.conversion_head = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(), 
            nn.Linear(64, 1),
            nn.Sigmoid()
        )
        
        self.engagement_head = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.ReLU()  # Predict watch time (positive values)
        )
        
        self._init_weights()
    
    def _init_weights(self):
        """Initialize embeddings with Xavier uniform"""
        for embedding in [self.user_embedding, self.item_embedding, self.category_embedding]:
            nn.init.xavier_uniform_(embedding.weight)
    
    def forward(self, batch: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
        """
        Forward pass for multi-task prediction
        
        Args:
            batch: Dictionary containing:
                - user_ids: [batch_size]
                - item_ids: [batch_size] 
                - category_ids: [batch_size]
                - numerical_features: [batch_size, 10]
        
        Returns:
            Dictionary with ctr, conversion, engagement predictions
        """
        # Get embeddings
        user_emb = self.user_embedding(batch['user_ids'])  # [batch_size, emb_dim]
        item_emb = self.item_embedding(batch['item_ids'])  # [batch_size, emb_dim]
        category_emb = self.category_embedding(batch['category_ids'])  # [batch_size, emb_dim]
        
        # Concatenate all features
        features = torch.cat([
            user_emb,
            item_emb, 
            category_emb,
            batch['numerical_features']
        ], dim=1)  # [batch_size, feature_dim]
        
        # Shared representation
        shared_rep = self.shared_layers(features)  # [batch_size, hidden_dims[-1]]
        
        # Task-specific predictions
        predictions = {
            'ctr': self.ctr_head(shared_rep).squeeze(-1),
            'conversion': self.conversion_head(shared_rep).squeeze(-1),
            'engagement': self.engagement_head(shared_rep).squeeze(-1)
        }
        
        return predictions
    
    def compute_loss(
        self,
        predictions: Dict[str, torch.Tensor],
        targets: Dict[str, torch.Tensor],
        task_weights: Optional[Dict[str, float]] = None
    ) -> Tuple[torch.Tensor, Dict[str, torch.Tensor]]:
        """Multi-task loss computation"""
        if task_weights is None:
            task_weights = {'ctr': 1.0, 'conversion': 1.0, 'engagement': 0.5}
        
        losses = {}
        
        # Binary classification losses
        bce_loss = nn.BCELoss()
        losses['ctr'] = bce_loss(predictions['ctr'], targets['ctr'].float())
        losses['conversion'] = bce_loss(predictions['conversion'], targets['conversion'].float())
        
        # Regression loss for engagement
        mse_loss = nn.MSELoss()
        losses['engagement'] = mse_loss(predictions['engagement'], targets['engagement'].float())
        
        # Weighted total loss
        total_loss = sum(task_weights[task] * loss for task, loss in losses.items())
        
        return total_loss, losses

# Training loop with real-time feature updates
class RecommenderTrainer:
    def __init__(self, model: DeepCTRModel, lr: float = 0.001):
        self.model = model
        self.optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=0.01)
        self.scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
            self.optimizer, T_max=1000, eta_min=1e-6
        )
    
    def train_step(self, batch: Dict[str, torch.Tensor]) -> Dict[str, float]:
        """Single training step with multi-task learning"""
        self.model.train()
        
        # Forward pass
        predictions = self.model(batch)
        
        # Compute loss
        targets = {
            'ctr': batch['clicked'],
            'conversion': batch['converted'], 
            'engagement': batch['watch_time']
        }
        
        total_loss, task_losses = self.model.compute_loss(predictions, targets)
        
        # Backward pass
        self.optimizer.zero_grad()
        total_loss.backward()
        
        # Gradient clipping for stability
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
        
        self.optimizer.step()
        self.scheduler.step()
        
        # Return metrics
        metrics = {
            'total_loss': total_loss.item(),
            'ctr_loss': task_losses['ctr'].item(),
            'conversion_loss': task_losses['conversion'].item(),
            'engagement_loss': task_losses['engagement'].item(),
            'lr': self.scheduler.get_last_lr()[0]
        }
        
        return metrics

# Distributed serving with batching
class RecommenderServing:
    """Production serving system with batching and caching"""
    
    def __init__(self, model_path: str, batch_size: int = 32, max_wait_ms: int = 10):
        self.model = torch.jit.load(model_path)
        self.model.eval()
        self.batch_size = batch_size
        self.max_wait_ms = max_wait_ms
        
        # Request batching state (sketch only; a production server would
        # coalesce concurrent requests into a single forward pass)
        self.pending_requests = []
        self.request_cache = {}  # unbounded here; prefer a TTL/LRU cache in production
    
    async def get_recommendations(
        self, 
        user_id: int, 
        candidate_items: List[int],
        context_features: Dict[str, float]
    ) -> List[Tuple[int, float]]:
        """
        Get ranked recommendations for a user
        
        Returns list of (item_id, score) tuples sorted by score descending
        """
        # Check cache first (note: the key ignores context_features, so cached
        # scores can go stale when context changes)
        cache_key = f"{user_id}_{hash(tuple(candidate_items))}"
        if cache_key in self.request_cache:
            return self.request_cache[cache_key]
        
        # Prepare batch input
        batch = self._prepare_batch(user_id, candidate_items, context_features)
        
        # Model inference
        with torch.no_grad():
            predictions = self.model(batch)
        
        # Compute final scores (weighted combination)
        final_scores = (
            0.7 * predictions['ctr'] + 
            0.2 * predictions['conversion'] + 
            0.1 * (predictions['engagement'] / 100)  # Normalize engagement
        )
        
        # Rank items by score
        ranked_items = [
            (candidate_items[i], float(final_scores[i]))
            for i in torch.argsort(final_scores, descending=True)
        ]
        
        # Cache result
        self.request_cache[cache_key] = ranked_items
        
        return ranked_items
    
    def _prepare_batch(
        self, 
        user_id: int, 
        candidate_items: List[int],
        context_features: Dict[str, float]
    ) -> Dict[str, torch.Tensor]:
        """Convert input to model batch format"""
        batch_size = len(candidate_items)
        
        return {
            'user_ids': torch.tensor([user_id] * batch_size),
            'item_ids': torch.tensor(candidate_items),
            'category_ids': torch.tensor([0] * batch_size),  # placeholder; look up real categories from the item catalog
            'numerical_features': torch.tensor([
                list(context_features.values()) for _ in range(batch_size)
            ], dtype=torch.float32)  # length must match the model's 10 numerical features
        }
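
The classes above can be exercised end to end with synthetic data. This is a smoke test, not a benchmark; the vocabularies are deliberately tiny so it runs on any machine, and the random tensors merely match the shapes train_step expects:

model = DeepCTRModel(user_vocab_size=1_000, item_vocab_size=500, category_vocab_size=50)
trainer = RecommenderTrainer(model)

batch_size = 256
batch = {
    'user_ids': torch.randint(0, 1_000, (batch_size,)),
    'item_ids': torch.randint(0, 500, (batch_size,)),
    'category_ids': torch.randint(0, 50, (batch_size,)),
    'numerical_features': torch.randn(batch_size, 10),
    'clicked': torch.randint(0, 2, (batch_size,)),
    'converted': torch.randint(0, 2, (batch_size,)),
    'watch_time': torch.rand(batch_size) * 100,  # e.g. seconds watched
}

print(trainer.train_step(batch))  # total_loss, per-task losses, current lr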

Real-time Feature Engineering

Streaming Feature Pipeline

import redis
import json
import time
import torch  # needed by OnlineLearningSystem below
from kafka import KafkaConsumer, KafkaProducer
from typing import Dict, List, Any
from collections import defaultdict

class RealTimeFeatureStore:
    """
    Real-time feature store for recommender systems
    Processes user interactions and maintains fresh features
    """
    
    def __init__(self, redis_host: str = 'localhost', redis_port: int = 6379):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
        
        # Kafka for streaming events
        self.consumer = KafkaConsumer(
            'user_events',
            'item_events', 
            bootstrap_servers=['localhost:9092'],
            value_deserializer=lambda x: json.loads(x.decode('utf-8'))
        )
        
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8')
        )
        
        # Feature aggregation windows
        self.time_windows = [300, 1800, 3600, 86400]  # 5min, 30min, 1hr, 1day
        
    def process_user_interaction(self, event: Dict[str, Any]):
        """Process real-time user interaction event"""
        user_id = event['user_id']
        item_id = event['item_id']
        action_type = event['action_type']  # click, view, like, share, purchase
        timestamp = event['timestamp']
        
        # Update immediate user features
        self._update_user_features(user_id, action_type, timestamp)
        
        # Update item popularity features  
        self._update_item_features(item_id, action_type, timestamp)
        
        # Update user-item interaction history
        self._update_interaction_history(user_id, item_id, action_type, timestamp)
        
        # Trigger model retraining if needed
        self._check_model_refresh_trigger(user_id)
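
    def _update_interaction_history(self, user_id: str, item_id: str,
                                    action_type: str, timestamp: float):
        # NOTE: assumed helper, sketched here so the class is runnable;
        # keeps a capped per-user event list in Redis
        key = f"user:{user_id}:history"
        self.redis_client.lpush(key, json.dumps(
            {'item_id': item_id, 'action': action_type, 'ts': timestamp}))
        self.redis_client.ltrim(key, 0, 999)  # keep the 1,000 most recent events

    def _check_model_refresh_trigger(self, user_id: str):
        # NOTE: assumed helper, sketched here so the class is runnable;
        # nudges downstream retraining once a user accumulates enough events
        if self.redis_client.llen(f"user:{user_id}:history") % 1000 == 0:
            self.producer.send('model_refresh', value={'user_id': user_id})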
    
    def _update_user_features(self, user_id: str, action_type: str, timestamp: float):
        """Update real-time user behavioral features"""
        
        for window in self.time_windows:
            window_key = f"user:{user_id}:features:{window}"
            
            # Load existing features into a defaultdict so unseen keys start
            # at 0.0 (a plain dict would raise KeyError on the '+=' below)
            raw = self.redis_client.hgetall(window_key)
            features = defaultdict(float, {k: float(v) for k, v in raw.items()})
            
            # Update activity counts
            features[f'{action_type}_count'] += 1
            features['total_actions'] += 1
            features['last_activity'] = timestamp
            
            # Calculate activity rate (actions per minute)
            if 'first_activity' not in features:
                features['first_activity'] = timestamp
            
            time_diff = timestamp - float(features['first_activity'])
            if time_diff > 0:
                features['activity_rate'] = features['total_actions'] / (time_diff / 60)
            
            # Update engagement score (weighted by action importance)
            action_weights = {
                'view': 1.0,
                'click': 2.0, 
                'like': 3.0,
                'share': 4.0,
                'purchase': 5.0
            }
            features['engagement_score'] += action_weights.get(action_type, 1.0)
            
            # Store updated features with expiration
            self.redis_client.hset(window_key, mapping={k: str(v) for k, v in features.items()})
            self.redis_client.expire(window_key, window * 2)  # Keep 2x window duration
    
    def _update_item_features(self, item_id: str, action_type: str, timestamp: float):
        """Update real-time item popularity features"""
        
        for window in self.time_windows:
            window_key = f"item:{item_id}:features:{window}"
            
            raw = self.redis_client.hgetall(window_key)
            # defaultdict again so first-time action counts start at 0.0
            features = defaultdict(float, {k: float(v) for k, v in raw.items()})
            
            # Update popularity metrics
            features[f'{action_type}_count'] += 1
            features['total_interactions'] += 1
            features['last_interaction'] = timestamp
            
            # Calculate velocity (interactions per hour)
            if 'first_interaction' not in features:
                features['first_interaction'] = timestamp
            
            time_diff = timestamp - float(features['first_interaction'])
            if time_diff > 0:
                features['velocity'] = features['total_interactions'] / (time_diff / 3600)
            
            # Trending score (recent interactions weighted higher)
            recent_weight = 1.0 / max(1, (time.time() - timestamp) / 3600)  # Decay over hours
            features['trending_score'] = features.get('trending_score', 0) + recent_weight
            
            self.redis_client.hset(window_key, mapping={k: str(v) for k, v in features.items()})
            self.redis_client.expire(window_key, window * 2)
    
    def get_user_features(self, user_id: str, window: int = 3600) -> Dict[str, float]:
        """Get real-time user features for model serving"""
        window_key = f"user:{user_id}:features:{window}"
        features = self.redis_client.hgetall(window_key)
        
        if not features:
            # Cold start features
            return {
                'total_actions': 0.0,
                'activity_rate': 0.0,
                'engagement_score': 0.0,
                'click_count': 0.0,
                'view_count': 0.0,
                'like_count': 0.0,
                'share_count': 0.0,
                'purchase_count': 0.0,
                'last_activity': 0.0
            }
        
        return {k: float(v) for k, v in features.items()}
    
    def get_item_features(self, item_id: str, window: int = 3600) -> Dict[str, float]:
        """Get real-time item features for model serving"""
        window_key = f"item:{item_id}:features:{window}"
        features = self.redis_client.hgetall(window_key)
        
        if not features:
            return {
                'total_interactions': 0.0,
                'velocity': 0.0,
                'trending_score': 0.0,
                'click_count': 0.0,
                'view_count': 0.0,
                'like_count': 0.0,
                'share_count': 0.0,
                'purchase_count': 0.0,
                'last_interaction': 0.0
            }
        
        return {k: float(v) for k, v in features.items()}
    
    def run_stream_processor(self):
        """Main streaming processor loop"""
        print("Starting real-time feature processor...")
        
        for message in self.consumer:
            try:
                event = message.value
                self.process_user_interaction(event)
                
                # Publish processed features for downstream consumers
                self.producer.send('processed_features', value=event)
                
            except Exception as e:
                print(f"Error processing event: {e}")
                continue

# Online learning for model updates
class OnlineLearningSystem:
    """Incremental learning system for recommendation models"""
    
    def __init__(self, base_model_path: str):
        # Scripted model: compute_loss is only available on the loaded module
        # if it was marked @torch.jit.export before scripting
        self.base_model = torch.jit.load(base_model_path)
        self.online_optimizer = torch.optim.SGD(
            self.base_model.parameters(), 
            lr=0.0001,
            momentum=0.9
        )
        
        # Buffer for online samples
        self.sample_buffer = []
        self.buffer_size = 1000
        
    def update_model_online(self, interaction_batch: List[Dict]):
        """Update model with recent interactions"""
        if len(interaction_batch) < 32:  # Wait for minimum batch
            return
        
        # Convert interactions to training batch
        batch = self._prepare_online_batch(interaction_batch)
        
        # Online gradient update
        self.base_model.train()
        predictions = self.base_model(batch)
        
        targets = {
            'ctr': batch['labels']['clicked'],
            'conversion': batch['labels']['converted'],
            'engagement': batch['labels']['watch_time']
        }
        
        loss, _ = self.base_model.compute_loss(predictions, targets)
        
        # Small learning rate for stability
        self.online_optimizer.zero_grad()
        loss.backward()
        
        # Gradient clipping more aggressive for online learning
        torch.nn.utils.clip_grad_norm_(self.base_model.parameters(), max_norm=0.1)
        
        self.online_optimizer.step()
        
        return {'online_loss': loss.item()}
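
    def _prepare_online_batch(self, interactions: List[Dict]) -> Dict:
        # NOTE: assumed helper, sketched so update_model_online is runnable;
        # field names mirror the event schema used elsewhere in this article
        return {
            'user_ids': torch.tensor([i['user_id'] for i in interactions]),
            'item_ids': torch.tensor([i['item_id'] for i in interactions]),
            'category_ids': torch.tensor([i.get('category_id', 0) for i in interactions]),
            'numerical_features': torch.zeros(len(interactions), 10),
            'labels': {
                'clicked': torch.tensor([float(i.get('clicked', 0.0)) for i in interactions]),
                'converted': torch.tensor([float(i.get('converted', 0.0)) for i in interactions]),
                'watch_time': torch.tensor([float(i.get('watch_time', 0.0)) for i in interactions]),
            },
        }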

Real-World Production Systems

Netflix

Video Recommendation Engine

  • Scale: 230M+ users, 15,000+ titles
  • Architecture: Two-stage (candidate + ranking)
  • Models: Deep neural networks with 1000+ features
  • Real-time: A/B testing on 80% of recommendations
  • Latency: <100ms for homepage personalization
  • Business Impact: 80% of content consumption from recommendations

Amazon

Product Recommendations

  • Scale: 300M+ users, 100M+ products
  • Architecture: Item-to-item collaborative filtering
  • Models: Matrix factorization + deep learning
  • Real-time: Purchase history updated within seconds
  • Latency: <50ms for product page recommendations
  • Business Impact: 35% of revenue from recommendations

Spotify

Music Discovery Platform

  • Scale: 400M+ users, 70M+ tracks
  • Architecture: Content + collaborative + NLP features
  • Models: Deep learning with audio signal processing
  • Real-time: Skip behavior updates recommendations instantly
  • Latency: <200ms for Discover Weekly generation
  • Business Impact: 40% of listening time from algorithmic playlists

TikTok

For You Page Algorithm

  • Scale: 1B+ users, millions of videos daily
  • Architecture: Multi-armed bandit + deep reinforcement learning
  • Models: Transformer-based with video understanding
  • Real-time: Immediate feedback incorporation
  • Latency: <50ms for feed refresh
  • Business Impact: 90%+ of user engagement from FYP

Production Best Practices

✅ Do

  • Use two-stage architecture - Retrieve top-K candidates efficiently, then rank with complex models
  • Implement online learning - Update models with fresh user interactions
  • A/B test everything - Measure business metrics, not just accuracy
  • Handle cold start gracefully - Popular items, content-based features, and onboarding flows
  • Optimize for diversity - Balance relevance with exploration and serendipity (see the MMR sketch below)
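
To make the diversity point concrete, here is a hedged sketch of maximal marginal relevance (MMR) re-ranking, one common way to implement the "diversity injection" step from the post-processing stage. The item vectors, the lambda trade-off, and the toy data are assumptions, not a prescribed implementation:

import numpy as np

def mmr_rerank(scores, item_vecs, k=10, lam=0.7):
    """Greedy MMR: trade relevance against similarity to already-selected items.

    scores: {item_id: relevance score from the ranking stage}
    item_vecs: {item_id: unit-normalized embedding (np.ndarray)}
    """
    selected, remaining = [], set(scores)
    while remaining and len(selected) < k:
        def mmr_score(i):
            max_sim = max((float(item_vecs[i] @ item_vecs[j]) for j in selected), default=0.0)
            return lam * scores[i] - (1.0 - lam) * max_sim
        best = max(remaining, key=mmr_score)
        selected.append(best)
        remaining.remove(best)
    return selected

# Toy usage: items 1 and 2 are near-duplicates, item 3 is distinct
vecs = {1: np.array([1.0, 0.0]), 2: np.array([0.99, 0.1]), 3: np.array([0.0, 1.0])}
for v in vecs.values():
    v /= np.linalg.norm(v)
print(mmr_rerank({1: 0.9, 2: 0.85, 3: 0.6}, vecs, k=2))  # [1, 3]: skips the near-duplicate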

❌ Don't

  • Optimize only for accuracy - Business metrics like engagement and revenue matter more
  • Ignore computational constraints - Real-time serving requires efficient models
  • Create filter bubbles - Pure exploitation leads to narrow recommendation sets
  • Neglect feedback loops - Recommendation systems create their own training data
  • Ignore freshness - Stale recommendations hurt user experience