Real-Time Analytics Systems
Master streaming ML, low-latency inference, feature streaming, online learning, and production deployment
55 min read • Advanced
What are Real-Time Analytics Systems?
Real-time analytics systems process and analyze streaming data to provide immediate insights, predictions, and actions with sub-second to sub-minute latencies for business-critical applications.
- • Sub-100ms: ultra-low latency processing
- • Streaming ML: online learning and inference
- • Feature Streaming: real-time feature computation
🧮 Real-Time Analytics Calculator
Estimate throughput, latency, memory usage, and scaling requirements for real-time analytics systems. Example output for a 100,000 events/sec workload:
Performance Analysis
- • Events/sec: 100,000
- • Events/partition: 8,333
- • Total latency: 72 ms
- • Window memory: 5,859 MB
- • Total memory: 6,066 MB
- • Latency class: Sub-100ms
- • Recommended partitions: 13
- • Recommended instances: 4
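The sizing arithmetic behind these numbers is simple enough to sketch. The per-partition and per-instance throughputs and the per-event state size below are assumptions, chosen so the output matches the example above:

import math

def estimate_capacity(events_per_sec: int,
                      partition_capacity: int = 8_000,   # assumed events/sec per partition
                      instance_capacity: int = 30_000,   # assumed events/sec per instance
                      window_seconds: int = 60,          # retained window length
                      bytes_per_event: int = 1_024):     # assumed state kept per event
    """Back-of-the-envelope sizing for a streaming analytics pipeline."""
    window_mb = events_per_sec * window_seconds * bytes_per_event / 1024 ** 2
    return {
        'recommended_partitions': math.ceil(events_per_sec / partition_capacity),
        'recommended_instances': math.ceil(events_per_sec / instance_capacity),
        'window_memory_mb': round(window_mb),
    }

print(estimate_capacity(100_000))
# {'recommended_partitions': 13, 'recommended_instances': 4, 'window_memory_mb': 5859}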
Real-Time Analytics Architecture
Streaming Ingestion
- • Apache Kafka / Amazon Kinesis
- • Event schema validation
- • Partitioning strategies
- • Exactly-once semantics (see the producer sketch below)
- • Dead letter queues
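On the producer side, exactly-once semantics come down to idempotence plus transactions. A minimal sketch using confluent-kafka, since kafka-python (used in the pipeline below) does not expose transactional writes; the broker address, topic, and transactional id are illustrative:

from confluent_kafka import Producer

# Idempotent, transactional producer: the broker deduplicates retries, and
# read-committed consumers see the transaction's writes all-or-nothing.
producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'enable.idempotence': True,                 # no duplicates on retry
    'acks': 'all',                              # wait for the full in-sync replica set
    'transactional.id': 'feature-pipeline-1',   # illustrative; must be stable per producer
})

producer.init_transactions()
producer.begin_transaction()
producer.produce('user_events', key='user-123', value=b'{"event_type": "click"}')
producer.commit_transaction()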
Stream Processing
- • Apache Flink / Kafka Streams
- • Windowing operations
- • State management
- • Watermarks & late data (sketch below)
- • Checkpointing & recovery
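Watermarks and late-data handling are easiest to see in miniature. A toy event-time tumbling window in plain Python, assuming a fixed allowed lateness; engines like Flink implement the same idea over distributed, checkpointed state:

from collections import defaultdict

WINDOW = 60           # tumbling window size, seconds
ALLOWED_LATENESS = 5  # accept events up to 5s behind max event time (assumption)

windows = defaultdict(list)  # window start -> buffered events
watermark = 0.0              # "no events older than this are still expected"

def on_event(event_time: float, payload: dict):
    global watermark
    watermark = max(watermark, event_time - ALLOWED_LATENESS)
    start = int(event_time // WINDOW) * WINDOW
    if start + WINDOW <= watermark:
        # Window already fired: route to a side output rather than dropping silently.
        print(f"late event at {event_time}")
        return
    windows[start].append(payload)
    # Fire every window the watermark has fully passed.
    for s in [s for s in windows if s + WINDOW <= watermark]:
        print(f"window [{s}, {s + WINDOW}): {len(windows.pop(s))} events")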
Feature Streaming
- • Real-time feature computation
- • Feature stores (Feast, Tecton)
- • Feature freshness monitoring (sketch below)
- • Historical feature serving
- • Feature drift detection
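Feature freshness monitoring reduces to the gap between the current time and a feature's last update. A minimal check against the Redis key layout used by the feature engine later in this section; the 5-minute threshold is an illustrative SLO:

import json
import time
from redis import Redis

redis_client = Redis(decode_responses=True)

def feature_staleness(user_id: str, feature_name: str):
    """Seconds since the feature was last computed, or None if absent/expired."""
    raw = redis_client.get(f"feature:{user_id}:{feature_name}")
    if raw is None:
        return None
    return time.time() - json.loads(raw)['timestamp']

staleness = feature_staleness('user-123', 'click_rate_1m')
if staleness is None or staleness > 300:  # illustrative 5-minute freshness SLO
    print("feature stale or missing: alert, and fall back to a default value")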
Online Inference
- • Model serving platforms
- • A/B testing frameworks
- • Model versioning
- • Circuit breakers (sketch below)
- • Prediction caching
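A circuit breaker keeps a misbehaving model server from stalling the request path: after repeated failures it fails fast with a fallback (say, a cached prediction) instead of waiting out timeouts. A minimal sketch with illustrative thresholds:

import time

class CircuitBreaker:
    """Open after max_failures consecutive errors; probe again after reset_seconds."""

    def __init__(self, max_failures: int = 5, reset_seconds: float = 30.0):
        self.max_failures = max_failures
        self.reset_seconds = reset_seconds
        self.failures = 0
        self.opened_at = None

    def call(self, fn, *args, fallback=None):
        if self.opened_at is not None:
            if time.time() - self.opened_at < self.reset_seconds:
                return fallback              # open: fail fast
            self.opened_at = None            # half-open: allow one probe
        try:
            result = fn(*args)
            self.failures = 0                # success closes the circuit
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.time()
            return fallback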
Streaming Analytics Engine
Real-Time Feature Pipeline
streaming_features.py
from kafka import KafkaConsumer, KafkaProducer
from typing import Dict, List, Any, Optional
import json
import time
import logging
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
from redis import Redis
import asyncio
from collections import defaultdict, deque
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class StreamingEvent:
user_id: str
event_type: str
timestamp: float
properties: Dict[str, Any]
session_id: Optional[str] = None
@dataclass
class ComputedFeature:
feature_name: str
feature_value: float
user_id: str
timestamp: float
window_type: str
ttl_seconds: int = 3600
class RealTimeFeatureEngine:
"""Production-ready real-time feature computation engine"""
def __init__(self,
kafka_bootstrap_servers: str = "localhost:9092",
redis_host: str = "localhost",
redis_port: int = 6379):
# Kafka setup
self.consumer = KafkaConsumer(
'user_events',
bootstrap_servers=kafka_bootstrap_servers,
auto_offset_reset='latest',
enable_auto_commit=True,
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
self.producer = KafkaProducer(
bootstrap_servers=kafka_bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Redis for real-time state
self.redis_client = Redis(host=redis_host, port=redis_port, decode_responses=True)
# In-memory windows for fast computation
self.sliding_windows = defaultdict(lambda: defaultdict(deque))
self.window_configs = {
'1m': 60,
'5m': 300,
'15m': 900,
'1h': 3600
}
# Feature computation functions
self.feature_computers = {
'click_rate_1m': self._compute_click_rate,
'session_duration': self._compute_session_duration,
'event_velocity': self._compute_event_velocity,
'conversion_score': self._compute_conversion_score,
'engagement_trend': self._compute_engagement_trend,
'anomaly_score': self._compute_anomaly_score
}
# Performance monitoring
self.processed_events = 0
self.processing_times = deque(maxlen=1000)
self.last_stats_time = time.time()
async def process_stream(self):
"""Main streaming processing loop"""
logger.info("Starting real-time feature processing...")
try:
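# NOTE: kafka-python's consumer iteration is blocking; in a fully async
# service, run this loop in a thread executor or use aiokafka instead.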
for message in self.consumer:
start_time = time.time()
# Parse event
event_data = message.value
event = StreamingEvent(**event_data)
# Compute features
features = await self._compute_all_features(event)
# Store features
await self._store_features(features)
# Publish computed features
await self._publish_features(features)
# Update performance metrics
processing_time = time.time() - start_time
self.processing_times.append(processing_time)
self.processed_events += 1
# Log stats every 1000 events
if self.processed_events % 1000 == 0:
await self._log_performance_stats()
except Exception as e:
logger.error(f"Stream processing error: {e}")
raise
async def _compute_all_features(self, event: StreamingEvent) -> List[ComputedFeature]:
"""Compute all features for an event"""
features = []
# Update sliding windows first
await self._update_windows(event)
# Compute each feature
for feature_name, compute_func in self.feature_computers.items():
try:
feature_value = await compute_func(event)
if feature_value is not None:
feature = ComputedFeature(
feature_name=feature_name,
feature_value=feature_value,
user_id=event.user_id,
timestamp=event.timestamp,
window_type='streaming'
)
features.append(feature)
except Exception as e:
logger.error(f"Error computing {feature_name}: {e}")
return features
async def _update_windows(self, event: StreamingEvent):
"""Update sliding windows with new event"""
current_time = event.timestamp
user_id = event.user_id
# Add to all window types
for window_name, window_size in self.window_configs.items():
window = self.sliding_windows[user_id][window_name]
# Add new event
window.append((current_time, event))
# Remove old events outside window
cutoff_time = current_time - window_size
while window and window[0][0] < cutoff_time:
window.popleft()
async def _compute_click_rate(self, event: StreamingEvent) -> Optional[float]:
"""Compute click rate in last 1 minute"""
window = self.sliding_windows[event.user_id]['1m']
if not window:
return None
total_events = len(window)
click_events = sum(1 for _, e in window if e.event_type == 'click')
return click_events / max(total_events, 1)
async def _compute_session_duration(self, event: StreamingEvent) -> Optional[float]:
"""Compute current session duration"""
if not event.session_id:
return None
# Get session start from Redis
session_key = f"session:{event.session_id}"
session_start = self.redis_client.get(session_key)
if not session_start:
# First event in session
self.redis_client.setex(session_key, 3600, str(event.timestamp))
return 0.0
else:
return event.timestamp - float(session_start)
async def _compute_event_velocity(self, event: StreamingEvent) -> Optional[float]:
"""Compute events per minute velocity"""
window = self.sliding_windows[event.user_id]['1m']
if len(window) < 2:
return 0.0
# Events per minute
return len(window)
async def _compute_conversion_score(self, event: StreamingEvent) -> Optional[float]:
"""Compute conversion propensity score"""
window_5m = self.sliding_windows[event.user_id]['5m']
if not window_5m:
return 0.0
# Simple scoring based on event types and frequency
event_weights = {
'page_view': 0.1,
'click': 0.3,
'add_to_cart': 0.7,
'purchase': 1.0,
'search': 0.2
}
score = 0.0
for _, e in window_5m:
weight = event_weights.get(e.event_type, 0.0)
score += weight
# Normalize by a fixed cap of 10 weighted points, clamped to [0, 1]
return min(score / 10, 1.0)
async def _compute_engagement_trend(self, event: StreamingEvent) -> Optional[float]:
"""Compute engagement trend (increasing/decreasing)"""
window_15m = self.sliding_windows[event.user_id]['15m']
if len(window_15m) < 10:
return 0.0
# Split the window at its time midpoint and compare event rates.
# (Splitting the list by count would always yield near-equal halves.)
events = list(window_15m)
mid_time = event.timestamp - 450  # 7.5 minutes before the current event
first_half_count = sum(1 for ts, _ in events if ts <= mid_time)
second_half_count = len(events) - first_half_count
first_half_rate = first_half_count / 450  # events per second in first 7.5 min
second_half_rate = second_half_count / 450  # events per second in last 7.5 min
if first_half_rate == 0:
return 1.0 if second_half_rate > 0 else 0.0
trend = (second_half_rate - first_half_rate) / first_half_rate
return max(-1.0, min(1.0, trend)) # Clamp between -1 and 1
async def _compute_anomaly_score(self, event: StreamingEvent) -> Optional[float]:
"""Compute anomaly score based on user behavior"""
window_1h = self.sliding_windows[event.user_id]['1h']
if len(window_1h) < 5:
return 0.0
# Get historical average from Redis
avg_key = f"user_avg:{event.user_id}"
historical_avg = self.redis_client.get(avg_key)
current_rate = len(window_1h) / 3600 # events per second
if historical_avg:
hist_rate = float(historical_avg)
if hist_rate > 0:
deviation = abs(current_rate - hist_rate) / hist_rate
anomaly_score = min(deviation, 1.0)
else:
anomaly_score = 1.0 if current_rate > 0 else 0.0
else:
anomaly_score = 0.0
# Update historical average
new_avg = current_rate if not historical_avg else (float(historical_avg) * 0.9 + current_rate * 0.1)
self.redis_client.setex(avg_key, 86400, str(new_avg))
return anomaly_score
async def _store_features(self, features: List[ComputedFeature]):
"""Store computed features in Redis"""
pipe = self.redis_client.pipeline()
for feature in features:
key = f"feature:{feature.user_id}:{feature.feature_name}"
value = {
'value': feature.feature_value,
'timestamp': feature.timestamp,
'window_type': feature.window_type
}
# Store with TTL
pipe.setex(key, feature.ttl_seconds, json.dumps(value))
pipe.execute()
async def _publish_features(self, features: List[ComputedFeature]):
"""Publish computed features to Kafka"""
for feature in features:
message = {
'user_id': feature.user_id,
'feature_name': feature.feature_name,
'feature_value': feature.feature_value,
'timestamp': feature.timestamp,
'window_type': feature.window_type
}
self.producer.send('computed_features', message)
# Flush to guarantee delivery; in production prefer periodic flushes,
# since a synchronous flush per message caps throughput
self.producer.flush()
async def _log_performance_stats(self):
"""Log performance statistics"""
current_time = time.time()
elapsed = current_time - self.last_stats_time
if self.processing_times:
avg_processing_time = np.mean(list(self.processing_times))
p95_processing_time = np.percentile(list(self.processing_times), 95)
p99_processing_time = np.percentile(list(self.processing_times), 99)
else:
avg_processing_time = p95_processing_time = p99_processing_time = 0
events_per_second = 1000 / elapsed if elapsed > 0 else 0
logger.info(f"""
Performance Stats:
- Events processed: {self.processed_events}
- Events/sec: {events_per_second:.1f}
- Avg processing time: {avg_processing_time*1000:.2f}ms
- P95 processing time: {p95_processing_time*1000:.2f}ms
- P99 processing time: {p99_processing_time*1000:.2f}ms
- Active windows: {sum(len(windows) for windows in self.sliding_windows.values())}
""")
self.last_stats_time = current_time
def get_feature(self, user_id: str, feature_name: str) -> Optional[Dict[str, Any]]:
"""Get latest feature value for a user"""
key = f"feature:{user_id}:{feature_name}"
value = self.redis_client.get(key)
if value:
return json.loads(value)
return None
def get_all_features(self, user_id: str) -> Dict[str, Any]:
"""Get all features for a user"""
pattern = f"feature:{user_id}:*"
keys = list(self.redis_client.scan_iter(match=pattern))  # SCAN, not KEYS, to avoid blocking Redis
features = {}
for key in keys:
feature_name = key.split(':')[-1]
value = self.redis_client.get(key)
if value:
features[feature_name] = json.loads(value)
return features
# Usage example
async def run_feature_engine():
engine = RealTimeFeatureEngine()
try:
await engine.process_stream()
except KeyboardInterrupt:
logger.info("Shutting down feature engine...")
except Exception as e:
logger.error(f"Feature engine error: {e}")
raise
if __name__ == "__main__":
asyncio.run(run_feature_engine())

Online Learning System
Streaming Model Update Pipeline
online_learning.py
import numpy as np
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score
from typing import Dict, List, Any, Optional, Tuple
import joblib
import io
import base64
import json
import time
import logging
from dataclasses import dataclass
from threading import Lock
import asyncio
from collections import deque
import redis
from kafka import KafkaConsumer, KafkaProducer
logger = logging.getLogger(__name__)
@dataclass
class TrainingExample:
features: np.ndarray
label: float
timestamp: float
weight: float = 1.0
example_id: Optional[str] = None
@dataclass
class ModelMetrics:
accuracy: float
precision: float
recall: float
training_examples: int
last_updated: float
model_version: str
class OnlineLearningSystem:
"""Production online learning system with concept drift detection"""
def __init__(self,
model_name: str = "fraud_detector",
kafka_bootstrap_servers: str = "localhost:9092",
redis_host: str = "localhost",
learning_rate: float = 0.01,
batch_size: int = 100):
self.model_name = model_name
self.learning_rate = learning_rate
self.batch_size = batch_size
# Online learning models
self.model = SGDClassifier(
loss='log_loss', # Logistic regression
learning_rate='adaptive',
eta0=learning_rate,
random_state=42
)
self.scaler = StandardScaler()
self.model_lock = Lock()
self.is_initialized = False
# Kafka setup
self.consumer = KafkaConsumer(
'training_examples',
bootstrap_servers=kafka_bootstrap_servers,
auto_offset_reset='latest',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
self.producer = KafkaProducer(
bootstrap_servers=kafka_bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Redis for model storage and metrics
self.redis_client = redis.Redis(host=redis_host, decode_responses=True)
# Performance tracking
self.training_buffer = deque(maxlen=batch_size)
self.recent_performance = deque(maxlen=1000)
self.drift_detection_window = deque(maxlen=500)
self.model_version = 1
self.examples_processed = 0
# Concept drift detection
self.baseline_accuracy = None
self.drift_threshold = 0.05 # 5% accuracy drop triggers retraining
self.min_examples_for_drift = 100
async def start_online_learning(self):
"""Start the online learning process"""
logger.info(f"Starting online learning for {self.model_name}")
# Load existing model if available
await self._load_model()
try:
for message in self.consumer:
example_data = message.value
example = TrainingExample(
features=np.array(example_data['features']),
label=example_data['label'],
timestamp=example_data.get('timestamp', time.time()),
weight=example_data.get('weight', 1.0),
example_id=example_data.get('example_id')
)
# Add to training buffer
await self._add_training_example(example)
# Check if we should update the model
if len(self.training_buffer) >= self.batch_size:
await self._update_model()
# Check for concept drift
if len(self.drift_detection_window) >= self.min_examples_for_drift:
await self._check_concept_drift()
self.examples_processed += 1
except Exception as e:
logger.error(f"Online learning error: {e}")
raise
async def _add_training_example(self, example: TrainingExample):
"""Add training example to buffer"""
self.training_buffer.append(example)
# Also track for drift detection
if self.is_initialized:
prediction = await self._predict_single(example.features)
is_correct = (prediction > 0.5) == (example.label > 0.5)
self.drift_detection_window.append(is_correct)
async def _update_model(self):
"""Update model with batched examples"""
if not self.training_buffer:
return
# Prepare batch data
features_batch = np.array([ex.features for ex in self.training_buffer])
labels_batch = np.array([ex.label for ex in self.training_buffer])
weights_batch = np.array([ex.weight for ex in self.training_buffer])
with self.model_lock:
try:
if not self.is_initialized:
# First batch - fit scaler and initialize model
self.scaler.fit(features_batch)
features_scaled = self.scaler.transform(features_batch)
self.model.fit(features_scaled, labels_batch, sample_weight=weights_batch)
self.is_initialized = True
logger.info("Model initialized with first batch")
else:
# Subsequent batches - partial fit
features_scaled = self.scaler.transform(features_batch)
self.model.partial_fit(features_scaled, labels_batch, sample_weight=weights_batch)
# Update performance metrics
await self._update_performance_metrics(features_scaled, labels_batch)
# Save model periodically
if self.examples_processed % 1000 == 0:
await self._save_model()
# Clear buffer
self.training_buffer.clear()
logger.info(f"Model updated with batch of {len(features_batch)} examples")
except Exception as e:
logger.error(f"Model update error: {e}")
# Clear buffer on error to prevent infinite loop
self.training_buffer.clear()
async def _update_performance_metrics(self, features: np.ndarray, labels: np.ndarray):
"""Update and track model performance metrics"""
if not self.is_initialized:
return
# Make predictions
predictions = self.model.predict(features)
probabilities = self.model.predict_proba(features)[:, 1]
# Calculate metrics
accuracy = accuracy_score(labels, predictions)
precision = precision_score(labels, predictions, zero_division=0)
recall = recall_score(labels, predictions, zero_division=0)
# Store metrics
metrics = ModelMetrics(
accuracy=accuracy,
precision=precision,
recall=recall,
training_examples=self.examples_processed,
last_updated=time.time(),
model_version=str(self.model_version)
)
# Add to recent performance tracking
self.recent_performance.append(accuracy)
# Store in Redis
await self._store_metrics(metrics)
# Set baseline if not set
if self.baseline_accuracy is None:
self.baseline_accuracy = accuracy
logger.info(f"Baseline accuracy set to {accuracy:.3f}")
async def _check_concept_drift(self):
"""Check for concept drift and trigger retraining if needed"""
if not self.baseline_accuracy or len(self.drift_detection_window) < self.min_examples_for_drift:
return
# Calculate recent accuracy
recent_accuracy = sum(self.drift_detection_window) / len(self.drift_detection_window)
# Check for significant drop
accuracy_drop = self.baseline_accuracy - recent_accuracy
if accuracy_drop > self.drift_threshold:
logger.warning(f"Concept drift detected! Accuracy dropped by {accuracy_drop:.3f}")
await self._handle_concept_drift(recent_accuracy)
# Update baseline with weighted average
self.baseline_accuracy = 0.95 * self.baseline_accuracy + 0.05 * recent_accuracy
async def _handle_concept_drift(self, current_accuracy: float):
"""Handle detected concept drift"""
logger.info("Handling concept drift - creating new model version")
# Create new model version
self.model_version += 1
# Reset model with more aggressive learning
with self.model_lock:
self.model = SGDClassifier(
loss='log_loss',
learning_rate='adaptive',
eta0=self.learning_rate * 2, # Increase learning rate
random_state=42
)
self.scaler = StandardScaler()
self.is_initialized = False
# Reset drift detection
self.drift_detection_window.clear()
self.baseline_accuracy = None
# Notify about drift
await self._notify_concept_drift(current_accuracy)
async def _notify_concept_drift(self, accuracy: float):
"""Notify about concept drift event"""
notification = {
'event': 'concept_drift_detected',
'model_name': self.model_name,
'accuracy': accuracy,
'new_model_version': self.model_version,
'timestamp': time.time()
}
self.producer.send('model_events', notification)
self.producer.flush()
async def predict(self, features: np.ndarray) -> Tuple[float, float]:
"""Make prediction with confidence"""
if not self.is_initialized:
return 0.5, 0.0 # Default prediction
with self.model_lock:
features_scaled = self.scaler.transform(features.reshape(1, -1))
prediction = self.model.predict_proba(features_scaled)[0]
confidence = max(prediction) - min(prediction)
return prediction[1], confidence # Return probability of positive class
async def _predict_single(self, features: np.ndarray) -> float:
"""Single prediction for internal use"""
if not self.is_initialized:
return 0.5
with self.model_lock:
features_scaled = self.scaler.transform(features.reshape(1, -1))
return self.model.predict_proba(features_scaled)[0][1]
async def _save_model(self):
"""Save model to Redis"""
if not self.is_initialized:
return
try:
with self.model_lock:
# Serialize model and scaler to bytes, then base64-encode so they survive JSON
# (joblib.dump requires a filename or file object; None is not valid)
model_buf, scaler_buf = io.BytesIO(), io.BytesIO()
joblib.dump(self.model, model_buf)
joblib.dump(self.scaler, scaler_buf)
model_data = {
'model': base64.b64encode(model_buf.getvalue()).decode('ascii'),
'scaler': base64.b64encode(scaler_buf.getvalue()).decode('ascii'),
'model_version': self.model_version,
'examples_processed': self.examples_processed,
'last_saved': time.time()
}
# Store in Redis
key = f"model:{self.model_name}:latest"
self.redis_client.set(key, json.dumps(model_data, default=str))
logger.info(f"Model saved to Redis (version {self.model_version})")
except Exception as e:
logger.error(f"Model save error: {e}")
async def _load_model(self):
"""Load model from Redis"""
try:
key = f"model:{self.model_name}:latest"
model_data = self.redis_client.get(key)
if model_data:
data = json.loads(model_data)
with self.model_lock:
# Decode base64 back to bytes and load via a file object
self.model = joblib.load(io.BytesIO(base64.b64decode(data['model'])))
self.scaler = joblib.load(io.BytesIO(base64.b64decode(data['scaler'])))
self.model_version = data['model_version']
self.examples_processed = data['examples_processed']
self.is_initialized = True
logger.info(f"Model loaded from Redis (version {self.model_version})")
else:
logger.info("No existing model found, starting fresh")
except Exception as e:
logger.error(f"Model load error: {e}")
async def _store_metrics(self, metrics: ModelMetrics):
"""Store performance metrics"""
key = f"metrics:{self.model_name}:latest"
self.redis_client.set(key, json.dumps(metrics.__dict__))
# Also store time series
ts_key = f"metrics:{self.model_name}:timeseries"
self.redis_client.lpush(ts_key, json.dumps({
**metrics.__dict__,
'timestamp': time.time()
}))
# Keep only last 1000 entries
self.redis_client.ltrim(ts_key, 0, 999)
def get_metrics(self) -> Optional[ModelMetrics]:
"""Get current model metrics"""
key = f"metrics:{self.model_name}:latest"
data = self.redis_client.get(key)
if data:
metrics_dict = json.loads(data)
return ModelMetrics(**metrics_dict)
return None
def get_model_info(self) -> Dict[str, Any]:
"""Get model information and stats"""
metrics = self.get_metrics()
return {
'model_name': self.model_name,
'model_version': self.model_version,
'examples_processed': self.examples_processed,
'is_initialized': self.is_initialized,
'baseline_accuracy': self.baseline_accuracy,
'recent_performance_avg': np.mean(list(self.recent_performance)) if self.recent_performance else None,
'current_metrics': metrics.__dict__ if metrics else None,
'drift_detection_size': len(self.drift_detection_window)
}
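# Input format (illustrative): training examples arrive as JSON on the
# 'training_examples' topic, e.g.
#   producer.send('training_examples', {
#       'features': [0.12, 3.4, 0.0, 1.0],
#       'label': 1.0,
#       'weight': 1.0,
#       'example_id': 'txn-42'
#   })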
# Usage example
async def run_online_learning():
system = OnlineLearningSystem(model_name="fraud_detector")
try:
await system.start_online_learning()
except KeyboardInterrupt:
logger.info("Shutting down online learning system...")
except Exception as e:
logger.error(f"Online learning error: {e}")
if __name__ == "__main__":
asyncio.run(run_online_learning())

Production Analytics API
Real-Time Analytics API Service
realtime_analytics_api.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
from typing import Dict, List, Any, Optional
import asyncio
import json
import time
import logging
from dataclasses import asdict
import numpy as np
import redis
from kafka import KafkaProducer
import uvicorn
from contextlib import asynccontextmanager
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class FeatureRequest(BaseModel):
user_id: str
features: Optional[List[str]] = None # If None, return all features
class PredictionRequest(BaseModel):
user_id: str
model_name: str = "fraud_detector"
features: Optional[Dict[str, float]] = None # Override features
class AnalyticsEvent(BaseModel):
user_id: str
event_type: str
properties: Dict[str, Any]
session_id: Optional[str] = None
timestamp: Optional[float] = Field(default_factory=time.time)
class BatchPredictionRequest(BaseModel):
requests: List[PredictionRequest]
return_features: bool = False
class RealTimeAnalyticsService:
"""Production real-time analytics service"""
def __init__(self):
# Redis connection for features and models
self.redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
# Kafka producer for events
self.kafka_producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Cache for frequently accessed features (unbounded dict here; prefer an LRU/TTL cache in production)
self.feature_cache = {}
self.cache_ttl = 60 # seconds
# Performance monitoring
self.request_count = 0
self.total_response_time = 0.0
# Models registry
self.models = {}
async def get_features(self, user_id: str, feature_names: Optional[List[str]] = None) -> Dict[str, Any]:
"""Get real-time features for a user"""
try:
start_time = time.time()
# Check cache first
cache_key = f"features_cache:{user_id}"
cached_features = self.feature_cache.get(cache_key)
# Treat an expired cache entry as a miss, not a hit
cache_hit = bool(cached_features) and time.time() - cached_features['timestamp'] < self.cache_ttl
if cache_hit:
features = cached_features['features']
else:
# Get from Redis
if feature_names:
features = {}
for feature_name in feature_names:
key = f"feature:{user_id}:{feature_name}"
value = self.redis_client.get(key)
if value:
features[feature_name] = json.loads(value)
else:
# Get all features
pattern = f"feature:{user_id}:*"
keys = list(self.redis_client.scan_iter(match=pattern))  # SCAN, not KEYS, to avoid blocking Redis
features = {}
for key in keys:
feature_name = key.split(':')[-1]
value = self.redis_client.get(key)
if value:
features[feature_name] = json.loads(value)
# Cache the result
self.feature_cache[cache_key] = {
'features': features,
'timestamp': time.time()
}
processing_time = time.time() - start_time
self._update_metrics(processing_time)
return {
'user_id': user_id,
'features': features,
'retrieved_at': time.time(),
'processing_time_ms': processing_time * 1000,
'cache_hit': cache_hit
}
except Exception as e:
logger.error(f"Feature retrieval error for {user_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
async def make_prediction(self, request: PredictionRequest) -> Dict[str, Any]:
"""Make real-time prediction for a user"""
try:
start_time = time.time()
# Get features (either provided or from feature store)
if request.features:
features = request.features
else:
feature_result = await self.get_features(request.user_id)
raw_features = feature_result['features']
# Extract just the values
features = {name: data['value'] for name, data in raw_features.items() if 'value' in data}
if not features:
raise HTTPException(status_code=400, detail="No features available for prediction")
# Get model metrics to check if model is healthy
metrics_key = f"metrics:{request.model_name}:latest"
metrics_data = self.redis_client.get(metrics_key)
if not metrics_data:
raise HTTPException(status_code=404, detail=f"Model {request.model_name} not found")
metrics = json.loads(metrics_data)
# Simple prediction logic (in production, this would call the actual model)
# For demonstration, we'll use the features to compute a score
prediction_score = self._compute_prediction_score(features)
confidence = min(0.95, max(0.05, abs(prediction_score - 0.5) * 2))
processing_time = time.time() - start_time
self._update_metrics(processing_time)
result = {
'user_id': request.user_id,
'model_name': request.model_name,
'prediction': prediction_score,
'confidence': confidence,
'features_used': list(features.keys()),
'model_version': metrics['model_version'],
'predicted_at': time.time(),
'processing_time_ms': processing_time * 1000
}
# Log prediction for model monitoring
await self._log_prediction(result)
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Prediction error for {request.user_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
async def batch_predictions(self, request: BatchPredictionRequest) -> Dict[str, Any]:
"""Make batch predictions"""
try:
start_time = time.time()
# Process all predictions concurrently
tasks = [self.make_prediction(pred_req) for pred_req in request.requests]
predictions = await asyncio.gather(*tasks, return_exceptions=True)
# Separate successful predictions from errors
successful = []
errors = []
for i, pred in enumerate(predictions):
if isinstance(pred, Exception):
errors.append({
'request_index': i,
'user_id': request.requests[i].user_id,
'error': str(pred)
})
else:
successful.append(pred)
processing_time = time.time() - start_time
return {
'predictions': successful,
'errors': errors,
'total_requests': len(request.requests),
'successful_count': len(successful),
'error_count': len(errors),
'batch_processing_time_ms': processing_time * 1000
}
except Exception as e:
logger.error(f"Batch prediction error: {e}")
raise HTTPException(status_code=500, detail=str(e))
async def ingest_event(self, event: AnalyticsEvent, background_tasks: BackgroundTasks):
"""Ingest real-time analytics event"""
try:
# Add to Kafka for processing (.dict() is Pydantic v1; use .model_dump() on v2)
event_data = event.dict()
self.kafka_producer.send('user_events', event_data)
# Also trigger immediate feature computation for critical features
background_tasks.add_task(self._compute_immediate_features, event)
return {
'status': 'accepted',
'event_id': f"{event.user_id}_{event.timestamp}",
'timestamp': event.timestamp
}
except Exception as e:
logger.error(f"Event ingestion error: {e}")
raise HTTPException(status_code=500, detail=str(e))
async def _compute_immediate_features(self, event: AnalyticsEvent):
"""Compute immediate features for low-latency use cases"""
try:
# Simple immediate feature computation
user_id = event.user_id
# Update event count
count_key = f"feature:{user_id}:event_count_realtime"
current_raw = self.redis_client.get(count_key)
# The stored value is a JSON document (see feature_data below), so parse it before incrementing
new_count = (json.loads(current_raw)['value'] if current_raw else 0) + 1
feature_data = {
'value': new_count,
'timestamp': event.timestamp,
'window_type': 'immediate'
}
self.redis_client.setex(count_key, 3600, json.dumps(feature_data))
# Update last event type
event_type_key = f"feature:{user_id}:last_event_type"
event_type_data = {
'value': event.event_type,
'timestamp': event.timestamp,
'window_type': 'immediate'
}
self.redis_client.setex(event_type_key, 3600, json.dumps(event_type_data))
except Exception as e:
logger.error(f"Immediate feature computation error: {e}")
def _compute_prediction_score(self, features: Dict[str, Any]) -> float:
"""Simple prediction score computation"""
# In production, this would use the actual trained model
# For demo, we'll use a simple weighted combination
weights = {
'click_rate_1m': 0.3,
'session_duration': 0.2,
'event_velocity': 0.15,
'conversion_score': 0.25,
'engagement_trend': 0.1
}
score = 0.5 # baseline
total_weight = 0
for feature_name, weight in weights.items():
if feature_name in features:
value = features[feature_name]
if isinstance(value, (int, float)):
score += weight * min(1.0, max(0.0, value))
total_weight += weight
# Normalize
if total_weight > 0:
score = score / (0.5 + total_weight) # Adjust for baseline
return max(0.0, min(1.0, score))
async def _log_prediction(self, prediction_result: Dict[str, Any]):
"""Log prediction for monitoring and feedback"""
try:
log_entry = {
'type': 'prediction',
'timestamp': time.time(),
**prediction_result
}
self.kafka_producer.send('prediction_logs', log_entry)
self.kafka_producer.flush()
except Exception as e:
logger.error(f"Prediction logging error: {e}")
def _update_metrics(self, processing_time: float):
"""Update service performance metrics"""
self.request_count += 1
self.total_response_time += processing_time
def get_service_stats(self) -> Dict[str, Any]:
"""Get service performance statistics"""
avg_response_time = (
self.total_response_time / max(1, self.request_count)
)
return {
'total_requests': self.request_count,
'average_response_time_ms': avg_response_time * 1000,
'cache_size': len(self.feature_cache),
'models_loaded': len(self.models),
'uptime_seconds': time.time() - self.start_time if hasattr(self, 'start_time') else 0
}
# Initialize service
analytics_service = RealTimeAnalyticsService()
@asynccontextmanager
async def lifespan(app: FastAPI):
analytics_service.start_time = time.time()
logger.info("Real-Time Analytics Service started")
yield
logger.info("Real-Time Analytics Service shutting down")
analytics_service.kafka_producer.close()
# FastAPI app
app = FastAPI(
title="Real-Time Analytics Service",
description="Production real-time analytics and ML inference API",
version="1.0.0",
lifespan=lifespan
)
@app.post("/features", response_model=Dict[str, Any])
async def get_user_features(request: FeatureRequest):
"""Get real-time features for a user"""
return await analytics_service.get_features(request.user_id, request.features)
@app.post("/predict", response_model=Dict[str, Any])
async def make_prediction(request: PredictionRequest):
"""Make real-time prediction"""
return await analytics_service.make_prediction(request)
@app.post("/predict/batch", response_model=Dict[str, Any])
async def batch_predict(request: BatchPredictionRequest):
"""Make batch predictions"""
return await analytics_service.batch_predictions(request)
@app.post("/events", response_model=Dict[str, Any])
async def ingest_event(event: AnalyticsEvent, background_tasks: BackgroundTasks):
"""Ingest analytics event for real-time processing"""
return await analytics_service.ingest_event(event, background_tasks)
@app.get("/stats")
async def get_stats():
"""Get service performance statistics"""
return analytics_service.get_service_stats()
@app.get("/health")
async def health_check():
"""Health check endpoint"""
try:
# Test Redis connection
analytics_service.redis_client.ping()
return {
"status": "healthy",
"timestamp": time.time(),
"services": {
"redis": "connected",
"kafka": "connected"
}
}
except Exception as e:
raise HTTPException(status_code=503, detail=f"Service unhealthy: {e}")
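# Example client call (illustrative), e.g. with httpx:
#   import httpx
#   resp = httpx.post("http://localhost:8000/predict",
#                     json={"user_id": "user-123", "model_name": "fraud_detector"})
#   print(resp.json())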
if __name__ == "__main__":
uvicorn.run(
"realtime_analytics_api:app",
host="0.0.0.0",
port=8000,
workers=1
)

Real-World Examples
Netflix Real-Time
Real-time recommendation updates, A/B testing, and personalization with sub-100ms latency for 230M+ users.
- • 1.5M events/sec peak processing
- • Online learning for recommendations
- • Real-time feature engineering
Uber Real-Time Pricing
Dynamic pricing engine processing supply/demand signals with ML-powered surge pricing across global markets.
- • 100k pricing decisions/sec
- • Geographic feature streaming
- • Multi-model ensemble serving
PayPal Fraud Detection
Real-time fraud scoring system processing payment transactions with adaptive ML models and instant decisions.
- • Sub-10ms fraud scoring
- • Online model adaptation
- • 99.99% availability SLA
Real-Time Analytics Best Practices
✅ Do's
- •Design for horizontal scalability from day one
- •Implement comprehensive monitoring and alerting
- •Use feature stores for consistent feature serving
- •Implement concept drift detection and handling
- •Design for graceful degradation under load
- •Use exactly-once semantics for critical data
❌ Don'ts
- •Don't ignore backpressure and flow control (sketch after this list)
- •Don't use blocking operations in streaming code
- •Don't neglect late data and out-of-order handling
- •Don't skip model versioning and rollback plans
- •Don't underestimate memory management needs
- •Don't ignore security in streaming pipelines
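Backpressure, the first item in the Don'ts, deserves one concrete picture: a bounded hand-off between ingestion and processing turns overload into an explicit shedding decision instead of unbounded memory growth. A minimal single-process sketch, assuming drop-oldest is an acceptable policy:

import queue

events = queue.Queue(maxsize=10_000)  # bounded: producers feel backpressure

def ingest(event) -> bool:
    """Enqueue an event, shedding the oldest one under overload."""
    try:
        events.put_nowait(event)
        return True
    except queue.Full:
        try:
            events.get_nowait()    # drop oldest: explicit, observable load shedding
        except queue.Empty:
            pass
        events.put_nowait(event)
        return False               # caller can count shed events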