Advanced Recommender Systems Architecture
Design production-scale recommender systems with deep learning, real-time personalization, and distributed ranking algorithms
50 min read•Advanced
Modern Recommender Systems Architecture
Production-scale recommender systems combine collaborative filtering, deep learning, real-time feature engineering, and distributed serving infrastructure. Services such as Netflix, Amazon, and Spotify serve billions of recommendations per day under latency budgets measured in milliseconds.
Key Challenges
- Cold Start Problem: Recommending to new users with no interaction history
- Scalability: Handling millions of users and items with real-time updates
- Diversity vs Accuracy: Balancing recommendation relevance with exploration
- Real-time Learning: Incorporating user feedback immediately
Recommender Systems Performance Calculator
Example inputs: 10M users, 1M items, embedding dimension 128, 10,000 requests/s
Performance Metrics
- Memory Requirements: 5.2 GB
- Inference Latency: 0.5 ms
- GPU Utilization: 100.0%
- Recommendations/sec: 10,000
Optimization: Memory usage within single GPU limits
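The 5.2 GB memory figure is consistent with float32 embedding tables for 10M users and 1M items at dimension 128. That reading of the calculator values is an assumption; the helper name below is hypothetical. A quick back-of-the-envelope check:

```python
def embedding_table_bytes(num_rows: int, dim: int, bytes_per_value: int = 4) -> int:
    """Size of a dense embedding table; 4 bytes per value assumes float32."""
    return num_rows * dim * bytes_per_value


users, items, dim = 10_000_000, 1_000_000, 128  # assumed calculator inputs
total = embedding_table_bytes(users, dim) + embedding_table_bytes(items, dim)
gib = total / 2**30
print(f"{total:,} bytes = {gib:.1f} GiB")
```

This counts only the user and item tables. MLP weights add comparatively little, but training with an optimizer such as AdamW keeps extra per-parameter state, so training memory is roughly a multiple of this number.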
Production Architecture
Two-Stage Retrieval + Ranking
1. Candidate Retrieval
- ANN search (Faiss/ScaNN)
- Collaborative filtering
- Content-based filtering
- Popular/trending items
2. Ranking
- Deep neural networks
- Multi-task learning
- Real-time features
- A/B test variants
3. Post-processing
- Diversity injection
- Business rules
- Freshness boosting
- Deduplication
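The three stages above can be sketched end to end in a few lines. This is a toy illustration, not a production design: exact dot-product search stands in for an ANN index such as Faiss or ScaNN, a hand-weighted linear score stands in for the deep ranking model, and a per-category cap stands in for diversity rules. All function names, weights, and data here are hypothetical.

```python
import random
from typing import Dict, List, Tuple

Vector = List[float]


def dot(a: Vector, b: Vector) -> float:
    return sum(x * y for x, y in zip(a, b))


def retrieve(user_vec: Vector, item_vecs: Dict[int, Vector], k: int) -> List[int]:
    """Stage 1: exact top-k by dot product (stand-in for ANN search)."""
    by_score = sorted(item_vecs, key=lambda i: dot(user_vec, item_vecs[i]), reverse=True)
    return by_score[:k]


def rank(user_vec: Vector, candidates: List[int], item_vecs: Dict[int, Vector],
         popularity: Dict[int, float]) -> List[Tuple[int, float]]:
    """Stage 2: toy linear scorer (stand-in for a deep ranking model)."""
    scored = [(i, 0.9 * dot(user_vec, item_vecs[i]) + 0.1 * popularity[i])
              for i in candidates]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)


def post_process(ranked: List[Tuple[int, float]], category: Dict[int, int],
                 n: int, max_per_category: int) -> List[int]:
    """Stage 3: deduplicate and cap the number of items per category."""
    seen, per_category, result = set(), {}, []
    for item_id, _score in ranked:
        cat = category[item_id]
        if item_id in seen or per_category.get(cat, 0) >= max_per_category:
            continue
        seen.add(item_id)
        per_category[cat] = per_category.get(cat, 0) + 1
        result.append(item_id)
        if len(result) == n:
            break
    return result


# Synthetic catalog: 200 items, 8-dimensional embeddings, 5 categories
rng = random.Random(0)
dim, num_items = 8, 200
item_vecs = {i: [rng.gauss(0, 1) for _ in range(dim)] for i in range(num_items)}
popularity = {i: rng.random() for i in range(num_items)}
category = {i: i % 5 for i in range(num_items)}
user_vec = [rng.gauss(0, 1) for _ in range(dim)]

candidates = retrieve(user_vec, item_vecs, k=50)
ranked = rank(user_vec, candidates, item_vecs, popularity)
final = post_process(ranked, category, n=10, max_per_category=3)
print(final)
```

The point of the split is cost: the cheap first stage touches every item, while the expensive scorer only sees the 50 retrieved candidates.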
Deep Learning Ranking Model
Multi-Task Deep CTR Model
import torch
import torch.nn as nn
from typing import Dict, List, Optional, Sequence, Tuple
import numpy as np


class DeepCTRModel(nn.Module):
    """
    Multi-task deep learning model for recommendation ranking.
    Predicts CTR, conversion rate, and user engagement simultaneously.
    """

    def __init__(
        self,
        user_vocab_size: int = 10_000_000,
        item_vocab_size: int = 1_000_000,
        category_vocab_size: int = 10000,
        embedding_dim: int = 128,
        hidden_dims: Sequence[int] = (512, 256, 128),  # tuple avoids a shared mutable default
        dropout_rate: float = 0.1
    ):
        super().__init__()
        # Embedding layers
        self.user_embedding = nn.Embedding(user_vocab_size, embedding_dim)
        self.item_embedding = nn.Embedding(item_vocab_size, embedding_dim)
        self.category_embedding = nn.Embedding(category_vocab_size, embedding_dim)
        # Feature interaction layers
        self.feature_dim = embedding_dim * 3 + 10  # 3 embeddings + 10 numerical features
        # Shared bottom layers
        shared_layers = []
        input_dim = self.feature_dim
        for hidden_dim in hidden_dims:
            shared_layers.extend([
                nn.Linear(input_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(dropout_rate),
                nn.BatchNorm1d(hidden_dim)
            ])
            input_dim = hidden_dim
        self.shared_layers = nn.Sequential(*shared_layers)
        # Task-specific heads
        self.ctr_head = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid()
        )
        self.conversion_head = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid()
        )
        self.engagement_head = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.ReLU()  # Predict watch time (non-negative values)
        )
        self._init_weights()

    def _init_weights(self):
        """Initialize embeddings with Xavier uniform"""
        for embedding in [self.user_embedding, self.item_embedding, self.category_embedding]:
            nn.init.xavier_uniform_(embedding.weight)

    def forward(self, batch: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
        """
        Forward pass for multi-task prediction
        Args:
            batch: Dictionary containing:
                - user_ids: [batch_size]
                - item_ids: [batch_size]
                - category_ids: [batch_size]
                - numerical_features: [batch_size, 10]
        Returns:
            Dictionary with ctr, conversion, engagement predictions
        """
        # Get embeddings
        user_emb = self.user_embedding(batch['user_ids'])  # [batch_size, emb_dim]
        item_emb = self.item_embedding(batch['item_ids'])  # [batch_size, emb_dim]
        category_emb = self.category_embedding(batch['category_ids'])  # [batch_size, emb_dim]
        # Concatenate all features
        features = torch.cat([
            user_emb,
            item_emb,
            category_emb,
            batch['numerical_features']
        ], dim=1)  # [batch_size, feature_dim]
        # Shared representation
        shared_rep = self.shared_layers(features)  # [batch_size, hidden_dims[-1]]
        # Task-specific predictions
        predictions = {
            'ctr': self.ctr_head(shared_rep).squeeze(-1),
            'conversion': self.conversion_head(shared_rep).squeeze(-1),
            'engagement': self.engagement_head(shared_rep).squeeze(-1)
        }
        return predictions

    def compute_loss(
        self,
        predictions: Dict[str, torch.Tensor],
        targets: Dict[str, torch.Tensor],
        task_weights: Optional[Dict[str, float]] = None
    ) -> Tuple[torch.Tensor, Dict[str, torch.Tensor]]:
        """Multi-task loss computation; returns (total_loss, per-task losses)"""
        if task_weights is None:
            task_weights = {'ctr': 1.0, 'conversion': 1.0, 'engagement': 0.5}
        losses = {}
        # Binary classification losses
        bce_loss = nn.BCELoss()
        losses['ctr'] = bce_loss(predictions['ctr'], targets['ctr'].float())
        losses['conversion'] = bce_loss(predictions['conversion'], targets['conversion'].float())
        # Regression loss for engagement
        mse_loss = nn.MSELoss()
        losses['engagement'] = mse_loss(predictions['engagement'], targets['engagement'].float())
        # Weighted total loss
        total_loss = sum(task_weights[task] * loss for task, loss in losses.items())
        return total_loss, losses

# Training loop with real-time feature updates
class RecommenderTrainer:
    def __init__(self, model: DeepCTRModel, lr: float = 0.001):
        self.model = model
        self.optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=0.01)
        self.scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
            self.optimizer, T_max=1000, eta_min=1e-6
        )

    def train_step(self, batch: Dict[str, torch.Tensor]) -> Dict[str, float]:
        """Single training step with multi-task learning"""
        self.model.train()
        # Forward pass
        predictions = self.model(batch)
        # Compute loss
        targets = {
            'ctr': batch['clicked'],
            'conversion': batch['converted'],
            'engagement': batch['watch_time']
        }
        total_loss, task_losses = self.model.compute_loss(predictions, targets)
        # Backward pass
        self.optimizer.zero_grad()
        total_loss.backward()
        # Gradient clipping for stability
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
        self.optimizer.step()
        self.scheduler.step()
        # Return metrics
        metrics = {
            'total_loss': total_loss.item(),
            'ctr_loss': task_losses['ctr'].item(),
            'conversion_loss': task_losses['conversion'].item(),
            'engagement_loss': task_losses['engagement'].item(),
            'lr': self.scheduler.get_last_lr()[0]
        }
        return metrics

# Distributed serving with batching
class RecommenderServing:
    """Production serving system with batching and caching"""

    def __init__(self, model_path: str, batch_size: int = 32, max_wait_ms: int = 10):
        self.model = torch.jit.load(model_path)
        self.model.eval()
        self.batch_size = batch_size
        self.max_wait_ms = max_wait_ms
        # Request batching
        self.pending_requests = []
        self.request_cache = {}

    async def get_recommendations(
        self,
        user_id: int,
        candidate_items: List[int],
        context_features: Dict[str, float]
    ) -> List[Tuple[int, float]]:
        """
        Get ranked recommendations for a user
        Returns list of (item_id, score) tuples sorted by score descending
        """
        # Check cache first. The key includes the context features so that a
        # change in context does not return a stale ranking.
        cache_key = (
            user_id,
            tuple(candidate_items),
            tuple(sorted(context_features.items())),
        )
        if cache_key in self.request_cache:
            return self.request_cache[cache_key]
        # Prepare batch input
        batch = self._prepare_batch(user_id, candidate_items, context_features)
        # Model inference
        with torch.no_grad():
            predictions = self.model(batch)
        # Compute final scores (weighted combination)
        final_scores = (
            0.7 * predictions['ctr'] +
            0.2 * predictions['conversion'] +
            0.1 * (predictions['engagement'] / 100)  # Normalize engagement
        )
        # Rank items by score (convert indices to plain ints before indexing the list)
        order = torch.argsort(final_scores, descending=True).tolist()
        ranked_items = [(candidate_items[i], float(final_scores[i])) for i in order]
        # Cache result
        self.request_cache[cache_key] = ranked_items
        return ranked_items

    def _prepare_batch(
        self,
        user_id: int,
        candidate_items: List[int],
        context_features: Dict[str, float]
    ) -> Dict[str, torch.Tensor]:
        """Convert input to model batch format"""
        batch_size = len(candidate_items)
        # The model expects exactly 10 numerical features per row
        numerical = [float(v) for v in context_features.values()]
        return {
            'user_ids': torch.tensor([user_id] * batch_size, dtype=torch.long),
            'item_ids': torch.tensor(candidate_items, dtype=torch.long),
            # Placeholder: real category IDs should be looked up from the item catalog
            'category_ids': torch.zeros(batch_size, dtype=torch.long),
            'numerical_features': torch.tensor([numerical] * batch_size, dtype=torch.float32)
        }
Real-time Feature Engineering
Streaming Feature Pipeline
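One caveat before the listing: the pipeline below keeps one Redis hash per window length and relies on key expiry, so its counters keep accumulating while a user or item stays active rather than covering exactly the last N seconds. If strict windowed counts matter, one option is a bucketed sliding-window counter. The class below is a minimal in-memory sketch of that idea; the class name and bucket size are hypothetical, and it assumes timestamps arrive in non-decreasing order.

```python
from collections import deque
from typing import Deque, List


class SlidingWindowCounter:
    """Approximate count of events in the last `window_seconds`, using time buckets."""

    def __init__(self, window_seconds: int, bucket_seconds: int = 60):
        self.window = window_seconds
        self.bucket = bucket_seconds
        self.buckets: Deque[List[float]] = deque()  # each entry: [bucket_start, count]

    def _evict(self, now: float) -> None:
        # Drop buckets whose whole time span is older than the window
        cutoff = now - self.window
        while self.buckets and self.buckets[0][0] + self.bucket <= cutoff:
            self.buckets.popleft()

    def add(self, timestamp: float, n: int = 1) -> None:
        start = (timestamp // self.bucket) * self.bucket
        if self.buckets and self.buckets[-1][0] == start:
            self.buckets[-1][1] += n
        else:
            self.buckets.append([start, n])
        self._evict(timestamp)

    def count(self, now: float) -> int:
        self._evict(now)
        return int(sum(c for _, c in self.buckets))


clicks = SlidingWindowCounter(window_seconds=300, bucket_seconds=60)
for t in (0, 10, 100):
    clicks.add(t)
print(clicks.count(200), clicks.count(361), clicks.count(420))
```

Counts are exact only at bucket granularity: a bucket is kept until its entire span has left the window, so smaller buckets trade memory for precision.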
import redis
import json
from kafka import KafkaConsumer, KafkaProducer
from typing import Dict, List, Any
import numpy as np
import torch  # used by OnlineLearningSystem below
from collections import defaultdict
import time


class RealTimeFeatureStore:
    """
    Real-time feature store for recommender systems
    Processes user interactions and maintains fresh features
    """

    def __init__(self, redis_host: str = 'localhost', redis_port: int = 6379):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
        # Kafka for streaming events
        self.consumer = KafkaConsumer(
            'user_events',
            'item_events',
            bootstrap_servers=['localhost:9092'],
            value_deserializer=lambda x: json.loads(x.decode('utf-8'))
        )
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8')
        )
        # Feature aggregation windows
        self.time_windows = [300, 1800, 3600, 86400]  # 5min, 30min, 1hr, 1day

    def process_user_interaction(self, event: Dict[str, Any]):
        """Process real-time user interaction event"""
        user_id = event['user_id']
        item_id = event['item_id']
        action_type = event['action_type']  # click, view, like, share, purchase
        timestamp = event['timestamp']
        # Update immediate user features
        self._update_user_features(user_id, action_type, timestamp)
        # Update item popularity features
        self._update_item_features(item_id, action_type, timestamp)
        # Update user-item interaction history
        self._update_interaction_history(user_id, item_id, action_type, timestamp)
        # Trigger model retraining if needed
        self._check_model_refresh_trigger(user_id)

    def _update_interaction_history(self, user_id: str, item_id: str, action_type: str, timestamp: float):
        """Record the interaction in the user's history (stub; implementation not shown)."""

    def _check_model_refresh_trigger(self, user_id: str):
        """Decide whether a model refresh is due (stub; implementation not shown)."""

    def _update_user_features(self, user_id: str, action_type: str, timestamp: float):
        """Update real-time user behavioral features"""
        for window in self.time_windows:
            window_key = f"user:{user_id}:features:{window}"
            # Load stored values into a defaultdict so that counters which have
            # not been written yet start at 0.0 instead of raising KeyError
            stored = self.redis_client.hgetall(window_key)
            features = defaultdict(float, {k: float(v) for k, v in stored.items()})
            # Update activity counts
            features[f'{action_type}_count'] += 1
            features['total_actions'] += 1
            features['last_activity'] = timestamp
            # Calculate activity rate (actions per minute)
            if 'first_activity' not in features:
                features['first_activity'] = timestamp
            time_diff = timestamp - features['first_activity']
            if time_diff > 0:
                features['activity_rate'] = features['total_actions'] / (time_diff / 60)
            # Update engagement score (weighted by action importance)
            action_weights = {
                'view': 1.0,
                'click': 2.0,
                'like': 3.0,
                'share': 4.0,
                'purchase': 5.0
            }
            features['engagement_score'] += action_weights.get(action_type, 1.0)
            # Store updated features with expiration
            self.redis_client.hset(window_key, mapping={k: str(v) for k, v in features.items()})
            self.redis_client.expire(window_key, window * 2)  # Keep 2x window duration

    def _update_item_features(self, item_id: str, action_type: str, timestamp: float):
        """Update real-time item popularity features"""
        for window in self.time_windows:
            window_key = f"item:{item_id}:features:{window}"
            # Same defaultdict pattern as for user features
            stored = self.redis_client.hgetall(window_key)
            features = defaultdict(float, {k: float(v) for k, v in stored.items()})
            # Update popularity metrics
            features[f'{action_type}_count'] += 1
            features['total_interactions'] += 1
            features['last_interaction'] = timestamp
            # Calculate velocity (interactions per hour)
            if 'first_interaction' not in features:
                features['first_interaction'] = timestamp
            time_diff = timestamp - features['first_interaction']
            if time_diff > 0:
                features['velocity'] = features['total_interactions'] / (time_diff / 3600)
            # Trending score (recent interactions weighted higher)
            recent_weight = 1.0 / max(1, (time.time() - timestamp) / 3600)  # Decay over hours
            features['trending_score'] += recent_weight
            self.redis_client.hset(window_key, mapping={k: str(v) for k, v in features.items()})
            self.redis_client.expire(window_key, window * 2)

    def get_user_features(self, user_id: str, window: int = 3600) -> Dict[str, float]:
        """Get real-time user features for model serving"""
        window_key = f"user:{user_id}:features:{window}"
        features = self.redis_client.hgetall(window_key)
        if not features:
            # Cold start features
            return {
                'total_actions': 0.0,
                'activity_rate': 0.0,
                'engagement_score': 0.0,
                'click_count': 0.0,
                'view_count': 0.0,
                'like_count': 0.0,
                'share_count': 0.0,
                'purchase_count': 0.0,
                'last_activity': 0.0
            }
        return {k: float(v) for k, v in features.items()}

    def get_item_features(self, item_id: str, window: int = 3600) -> Dict[str, float]:
        """Get real-time item features for model serving"""
        window_key = f"item:{item_id}:features:{window}"
        features = self.redis_client.hgetall(window_key)
        if not features:
            return {
                'total_interactions': 0.0,
                'velocity': 0.0,
                'trending_score': 0.0,
                'click_count': 0.0,
                'view_count': 0.0,
                'like_count': 0.0,
                'share_count': 0.0,
                'purchase_count': 0.0,
                'last_interaction': 0.0
            }
        return {k: float(v) for k, v in features.items()}

    def run_stream_processor(self):
        """Main streaming processor loop"""
        print("Starting real-time feature processor...")
        for message in self.consumer:
            try:
                event = message.value
                self.process_user_interaction(event)
                # Publish processed features for downstream consumers
                self.producer.send('processed_features', value=event)
            except Exception as e:
                print(f"Error processing event: {e}")
                continue

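# Online learning for model updates
class OnlineLearningSystem:
    """Incremental learning system for recommendation models"""

    def __init__(self, base_model_path: str):
        self.base_model = torch.jit.load(base_model_path)
        self.online_optimizer = torch.optim.SGD(
            self.base_model.parameters(),
            lr=0.0001,
            momentum=0.9
        )
        # Buffer for online samples
        self.sample_buffer = []
        self.buffer_size = 1000

    def _prepare_online_batch(self, interaction_batch: List[Dict]) -> Dict[str, Any]:
        """Build model inputs plus a 'labels' dict from raw interactions (implementation not shown)."""
        raise NotImplementedError

    def _compute_loss(self, predictions: Dict[str, torch.Tensor], targets: Dict[str, torch.Tensor]) -> torch.Tensor:
        """
        Weighted multi-task loss with the default weights of DeepCTRModel.compute_loss.
        Computed here because a module loaded with torch.jit.load only exposes
        forward() and methods exported with @torch.jit.export.
        """
        bce = torch.nn.functional.binary_cross_entropy
        mse = torch.nn.functional.mse_loss
        return (
            1.0 * bce(predictions['ctr'], targets['ctr'].float())
            + 1.0 * bce(predictions['conversion'], targets['conversion'].float())
            + 0.5 * mse(predictions['engagement'], targets['engagement'].float())
        )

    def update_model_online(self, interaction_batch: List[Dict]):
        """Update model with recent interactions"""
        if len(interaction_batch) < 32:  # Wait for minimum batch
            return None
        # Convert interactions to training batch
        batch = self._prepare_online_batch(interaction_batch)
        # forward() expects Dict[str, Tensor], so split the labels off first
        labels = batch.pop('labels')
        # Online gradient update
        self.base_model.train()
        predictions = self.base_model(batch)
        targets = {
            'ctr': labels['clicked'],
            'conversion': labels['converted'],
            'engagement': labels['watch_time']
        }
        loss = self._compute_loss(predictions, targets)
        # Small learning rate for stability
        self.online_optimizer.zero_grad()
        loss.backward()
        # Gradient clipping more aggressive for online learning
        torch.nn.utils.clip_grad_norm_(self.base_model.parameters(), max_norm=0.1)
        self.online_optimizer.step()
        return {'online_loss': loss.item()}
Real-World Production Systems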
Netflix
Video Recommendation Engine
- Scale: 230M+ users, 15,000+ titles
- Architecture: Two-stage (candidate + ranking)
- Models: Deep neural networks with 1000+ features
- Real-time: A/B testing on 80% of recommendations
- Latency: <100ms for homepage personalization
- Business Impact: 80% of content consumption from recommendations
Amazon
Product Recommendations
- Scale: 300M+ users, 100M+ products
- Architecture: Item-to-item collaborative filtering
- Models: Matrix factorization + deep learning
- Real-time: Purchase history updated within seconds
- Latency: <50ms for product page recommendations
- Business Impact: 35% of revenue from recommendations
Spotify
Music Discovery Platform
- Scale: 400M+ users, 70M+ tracks
- Architecture: Content + collaborative + NLP features
- Models: Deep learning with audio signal processing
- Real-time: Skip behavior updates recommendations instantly
- Latency: <200ms for Discover Weekly generation
- Business Impact: 40% of listening time from algorithmic playlists
TikTok
For You Page Algorithm
- Scale: 1B+ users, millions of videos daily
- Architecture: Multi-armed bandit + deep reinforcement learning
- Models: Transformer-based with video understanding
- Real-time: Immediate feedback incorporation
- Latency: <50ms for feed refresh
- Business Impact: 90%+ of user engagement from FYP
Production Best Practices
✅ Do
- Use a two-stage architecture: retrieve top-K candidates efficiently, then rank with complex models
- Implement online learning: update models with fresh user interactions
- A/B test everything: measure business metrics, not just accuracy
- Handle cold start gracefully: popular items, content-based features, and onboarding flows
- Optimize for diversity: balance relevance with exploration and serendipity
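As an illustration of the cold-start advice, here is a minimal sketch of a fallback for users with little or no history: serve popular items, and move items from categories the user picked during onboarding to the front. The function name, arguments, and sample data are hypothetical; a real system would typically blend this with content-based scores.

```python
from typing import Dict, List, Optional, Set


def cold_start_recommend(
    seen: Set[int],
    popular_items: List[int],           # assumed to be sorted by popularity, best first
    item_category: Dict[int, str],
    preferred_categories: Optional[Set[str]] = None,  # e.g. from an onboarding flow
    n: int = 5,
) -> List[int]:
    """Popularity fallback, with onboarding-selected categories moved to the front."""
    unseen = [i for i in popular_items if i not in seen]
    if preferred_categories:
        # list.sort is stable, so popularity order is kept within each group;
        # the key is False (sorts first) for items in a preferred category
        unseen.sort(key=lambda i: item_category.get(i) not in preferred_categories)
    return unseen[:n]


popular = [10, 11, 12, 13, 14, 15]
categories = {10: 'news', 11: 'sports', 12: 'music', 13: 'sports', 14: 'news', 15: 'music'}

print(cold_start_recommend(set(), popular, categories, n=3))
print(cold_start_recommend(set(), popular, categories, {'music'}, n=3))
```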
❌ Don't
- Optimize only for accuracy: business metrics like engagement and revenue matter more
- Ignore computational constraints: real-time serving requires efficient models
- Create filter bubbles: pure exploitation leads to narrow recommendation sets
- Neglect feedback loops: recommendation systems create their own training data
- Ignore freshness: stale recommendations hurt user experience
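One simple way to avoid pure exploitation is epsilon-greedy slot filling: each slot takes the next best-ranked item, except that with probability epsilon it takes a random item from an exploration pool. The sketch below is illustrative only; the function name and parameters are hypothetical, and production systems often use bandit or uncertainty-based methods instead.

```python
import random
from typing import List, Optional


def mix_exploration(
    ranked: List[int],
    explore_pool: List[int],
    epsilon: float,
    n: int,
    rng: Optional[random.Random] = None,
) -> List[int]:
    """Fill n slots; each slot explores with probability epsilon, else exploits."""
    rng = rng or random.Random()
    chosen: List[int] = []
    used = set()
    ranked_iter = iter(ranked)
    for _ in range(n):
        pick = None
        if explore_pool and rng.random() < epsilon:
            unused_pool = [i for i in explore_pool if i not in used]
            if unused_pool:
                pick = rng.choice(unused_pool)
        if pick is None:
            # Exploit: next best-ranked item that has not been shown yet
            pick = next((i for i in ranked_iter if i not in used), None)
        if pick is None:
            break  # nothing left to recommend
        chosen.append(pick)
        used.add(pick)
    return chosen


print(mix_exploration([1, 2, 3, 4], [7, 8, 9], epsilon=0.0, n=3))
print(mix_exploration([1, 2, 3, 4], [7, 8, 9], epsilon=0.2, n=3, rng=random.Random(0)))
```

Logging which slots were exploratory makes it possible to train on that traffic with less feedback-loop bias, since those impressions were not chosen by the model.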