
Real-Time Analytics Systems

Master streaming ML, low-latency inference, feature streaming, online learning, and production deployment

55 min read · Advanced

What are Real-Time Analytics Systems?

Real-time analytics systems process and analyze streaming data to provide immediate insights, predictions, and actions with sub-second to sub-minute latencies for business-critical applications.

Sub-100ms: Ultra-low latency processing
Streaming ML: Online learning and inference
Feature Streaming: Real-time feature computation

🧮 Real-Time Analytics Calculator

Calculate throughput, latency, memory usage, and scaling requirements for real-time analytics systems.

Performance Analysis

Events/sec: 100,000
Events/partition: 8,333
Total Latency: 72 ms
Window Memory: 5,859 MB
Total Memory: 6,066 MB
Latency Class: Sub-100ms
Recommended Partitions: 13
Recommended Instances: 4
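
The calculator does not show its formulas, so the following is only one plausible reading of two of its outputs. The inputs of 12 partitions, a 60-second window, and 1 KiB per event are assumptions chosen because they reproduce the displayed figures; they are not values stated on the page, and the function names are hypothetical.

```python
def events_per_partition(events_per_sec: int, partitions: int) -> int:
    """Average load each partition must absorb, rounded down."""
    return events_per_sec // partitions


def window_memory_mb(events_per_sec: int, window_seconds: int, bytes_per_event: int) -> float:
    """Memory needed to hold every event of one full window, in MiB."""
    return events_per_sec * window_seconds * bytes_per_event / (1024 * 1024)


# Assumed inputs: 12 partitions, 60 s window, 1024 bytes per event
print(events_per_partition(100_000, 12))
print(window_memory_mb(100_000, 60, 1024))
```

Window state grows linearly with both throughput and window length, which is why long windows are usually kept as running aggregates rather than raw events.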

Real-Time Analytics Architecture

Streaming Ingestion

  • Apache Kafka / Amazon Kinesis
  • Event schema validation
  • Partitioning strategies
  • Exactly-once semantics
  • Dead letter queues
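
As a rough illustration of how schema validation and a dead letter queue fit together, the sketch below checks each event against a small required-field table and sets aside anything that fails. It uses plain Python lists in place of real topics; `REQUIRED_FIELDS`, `validate_event`, and `route` are hypothetical names, and the field set is an assumption modeled on the event shape used later in this lesson.

```python
from typing import Any, Dict, List, Tuple

# Assumed schema: field name -> accepted type(s)
REQUIRED_FIELDS = {
    "user_id": str,
    "event_type": str,
    "timestamp": (int, float),
}


def validate_event(event: Dict[str, Any]) -> List[str]:
    """Return a list of schema problems; an empty list means the event is valid."""
    errors = []
    for name, expected in REQUIRED_FIELDS.items():
        if name not in event:
            errors.append(f"missing field: {name}")
        elif not isinstance(event[name], expected):
            errors.append(f"wrong type for {name}")
    return errors


def route(events: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Split events into (valid, dead_letters); dead letters keep their error list."""
    valid, dead_letters = [], []
    for event in events:
        errors = validate_event(event)
        if errors:
            dead_letters.append({"event": event, "errors": errors})
        else:
            valid.append(event)
    return valid, dead_letters
```

Keeping the error list next to the rejected event makes it possible to replay dead letters once the producer or the schema is fixed.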

Stream Processing

  • Apache Flink / Kafka Streams
  • Windowing operations
  • State management
  • Watermarks & late data
  • Checkpointing & recovery
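
To make windowing, watermarks, and late data more concrete, here is a minimal pure-Python sketch of a tumbling-window counter. It is not how Flink or Kafka Streams implement these ideas; the class name `TumblingWindowCounter` and the rule "watermark = largest event time seen minus allowed lateness" are simplifying assumptions for illustration.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class TumblingWindowCounter:
    """Counts events per fixed-size window and closes windows as the watermark advances."""

    def __init__(self, window_size: float, allowed_lateness: float):
        self.window_size = window_size
        self.allowed_lateness = allowed_lateness
        self.max_event_time = float("-inf")
        self.open_windows: Dict[float, int] = defaultdict(int)
        self.late_events = 0

    @property
    def watermark(self) -> float:
        # Assumption: events older than this are considered too late
        return self.max_event_time - self.allowed_lateness

    def add(self, event_time: float) -> List[Tuple[float, int]]:
        """Add one event; return (window_start, count) for every window this event closes."""
        window_start = (event_time // self.window_size) * self.window_size
        if window_start + self.window_size <= self.watermark:
            # The window was already emitted, so the event can only be counted as late
            self.late_events += 1
            return []

        self.open_windows[window_start] += 1
        self.max_event_time = max(self.max_event_time, event_time)

        closed = sorted(
            (start, count)
            for start, count in self.open_windows.items()
            if start + self.window_size <= self.watermark
        )
        for start, _ in closed:
            del self.open_windows[start]
        return closed
```

With a 10-second window and 5 seconds of allowed lateness, an event at time 8 that arrives after an event at time 12 is still counted, while an event at time 4 that arrives after time 16 is dropped as late.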

Feature Streaming

  • Real-time feature computation
  • Feature stores (Feast, Tecton)
  • Feature freshness monitoring
  • Historical feature serving
  • Feature drift detection
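
Feature freshness monitoring can be as simple as comparing each feature's last update time with a maximum allowed age. The helper below is a hypothetical sketch that works on a plain dictionary of timestamps rather than on Feast or Tecton, whose own monitoring APIs are not shown here.

```python
import time
from typing import Dict, List, Optional


def stale_features(last_updated: Dict[str, float],
                   max_age_seconds: float,
                   now: Optional[float] = None) -> List[str]:
    """Return the names of features whose last update is older than max_age_seconds."""
    now = time.time() if now is None else now
    return sorted(
        name for name, updated_at in last_updated.items()
        if now - updated_at > max_age_seconds
    )
```

Passing `now` explicitly keeps the check deterministic in tests; in a running service it would be left at its default and the result exported as an alerting metric.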

Online Inference

  • Model serving platforms
  • A/B testing frameworks
  • Model versioning
  • Circuit breakers
  • Prediction caching
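
A circuit breaker protects the serving path by returning a fallback prediction while the model backend is failing. The sketch below is a minimal single-threaded version under assumed defaults; `CircuitBreaker`, its parameters, and the injectable `clock` are hypothetical and not taken from any serving platform.

```python
import time
from typing import Any, Callable, Optional


class CircuitBreaker:
    """Opens after repeated failures and allows a trial call once the timeout has passed."""

    def __init__(self,
                 failure_threshold: int = 3,
                 reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    def call(self, func: Callable[[], Any], fallback: Any) -> Any:
        was_open = self.opened_at is not None
        if was_open and self.clock() - self.opened_at < self.reset_timeout:
            # Open: skip the call entirely and answer with the fallback
            return fallback

        # Closed, or half-open after the timeout: let the call through
        try:
            result = func()
        except Exception:
            self.failures += 1
            if was_open or self.failures >= self.failure_threshold:
                # Trip (or re-trip) the breaker and restart the timeout
                self.opened_at = self.clock()
            return fallback

        # Success closes the breaker and clears the failure count
        self.failures = 0
        self.opened_at = None
        return result
```

A typical use would wrap the model call, for example `breaker.call(lambda: model_predict(x), fallback=0.5)`, where `model_predict` stands in for whatever inference function the service exposes.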

Streaming Analytics Engine

Real-Time Feature Pipeline

streaming_features.py
from kafka import KafkaConsumer, KafkaProducer
from typing import Dict, List, Any, Optional
import json
import time
import logging
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
from redis import Redis
import asyncio
from collections import defaultdict, deque

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class StreamingEvent:
    user_id: str
    event_type: str
    timestamp: float
    properties: Dict[str, Any]
    session_id: Optional[str] = None
    
@dataclass 
class ComputedFeature:
    feature_name: str
    feature_value: float
    user_id: str
    timestamp: float
    window_type: str
    ttl_seconds: int = 3600

class RealTimeFeatureEngine:
    """Production-ready real-time feature computation engine"""
    
    def __init__(self, 
                 kafka_bootstrap_servers: str = "localhost:9092",
                 redis_host: str = "localhost",
                 redis_port: int = 6379):
        
        # Kafka setup
        self.consumer = KafkaConsumer(
            'user_events',
            bootstrap_servers=kafka_bootstrap_servers,
            auto_offset_reset='latest',
            enable_auto_commit=True,
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        
        self.producer = KafkaProducer(
            bootstrap_servers=kafka_bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        
        # Redis for real-time state
        self.redis_client = Redis(host=redis_host, port=redis_port, decode_responses=True)
        
        # In-memory windows for fast computation
        self.sliding_windows = defaultdict(lambda: defaultdict(deque))
        self.window_configs = {
            '1m': 60,
            '5m': 300, 
            '15m': 900,
            '1h': 3600
        }
        
        # Feature computation functions
        self.feature_computers = {
            'click_rate_1m': self._compute_click_rate,
            'session_duration': self._compute_session_duration,
            'event_velocity': self._compute_event_velocity,
            'conversion_score': self._compute_conversion_score,
            'engagement_trend': self._compute_engagement_trend,
            'anomaly_score': self._compute_anomaly_score
        }
        
        # Performance monitoring
        self.processed_events = 0
        self.processing_times = deque(maxlen=1000)
        self.last_stats_time = time.time()
    
    async def process_stream(self):
        """Main streaming processing loop"""
        logger.info("Starting real-time feature processing...")
        
        try:
            for message in self.consumer:
                start_time = time.time()
                
                # Parse event
                event_data = message.value
                event = StreamingEvent(**event_data)
                
                # Compute features
                features = await self._compute_all_features(event)
                
                # Store features
                await self._store_features(features)
                
                # Publish computed features
                await self._publish_features(features)
                
                # Update performance metrics
                processing_time = time.time() - start_time
                self.processing_times.append(processing_time)
                self.processed_events += 1
                
                # Log stats every 1000 events
                if self.processed_events % 1000 == 0:
                    await self._log_performance_stats()
                    
        except Exception as e:
            logger.error(f"Stream processing error: {e}")
            raise
    
    async def _compute_all_features(self, event: StreamingEvent) -> List[ComputedFeature]:
        """Compute all features for an event"""
        features = []
        
        # Update sliding windows first
        await self._update_windows(event)
        
        # Compute each feature
        for feature_name, compute_func in self.feature_computers.items():
            try:
                feature_value = await compute_func(event)
                if feature_value is not None:
                    feature = ComputedFeature(
                        feature_name=feature_name,
                        feature_value=feature_value,
                        user_id=event.user_id,
                        timestamp=event.timestamp,
                        window_type='streaming'
                    )
                    features.append(feature)
            except Exception as e:
                logger.error(f"Error computing {feature_name}: {e}")
        
        return features
    
    async def _update_windows(self, event: StreamingEvent):
        """Update sliding windows with new event"""
        current_time = event.timestamp
        user_id = event.user_id
        
        # Add to all window types
        for window_name, window_size in self.window_configs.items():
            window = self.sliding_windows[user_id][window_name]
            
            # Add new event
            window.append((current_time, event))
            
            # Remove old events outside window
            cutoff_time = current_time - window_size
            while window and window[0][0] < cutoff_time:
                window.popleft()
    
    async def _compute_click_rate(self, event: StreamingEvent) -> Optional[float]:
        """Compute click rate in last 1 minute"""
        window = self.sliding_windows[event.user_id]['1m']
        if not window:
            return None
            
        total_events = len(window)
        click_events = sum(1 for _, e in window if e.event_type == 'click')
        
        return click_events / max(total_events, 1)
    
    async def _compute_session_duration(self, event: StreamingEvent) -> Optional[float]:
        """Compute current session duration"""
        if not event.session_id:
            return None
            
        # Get session start from Redis
        session_key = f"session:{event.session_id}"
        session_start = self.redis_client.get(session_key)
        
        if not session_start:
            # First event in session
            self.redis_client.setex(session_key, 3600, str(event.timestamp))
            return 0.0
        else:
            return event.timestamp - float(session_start)
    
    async def _compute_event_velocity(self, event: StreamingEvent) -> Optional[float]:
        """Compute events per minute velocity"""
        window = self.sliding_windows[event.user_id]['1m']
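        if len(window) < 2:
            return 0.0
            
        # The 1-minute window holds exactly the last 60 seconds of events,
        # so its length is already an events-per-minute count
        return float(len(window))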
    
    async def _compute_conversion_score(self, event: StreamingEvent) -> Optional[float]:
        """Compute conversion propensity score"""
        window_5m = self.sliding_windows[event.user_id]['5m']
        if not window_5m:
            return 0.0
        
        # Simple scoring based on event types and frequency
        event_weights = {
            'page_view': 0.1,
            'click': 0.3,
            'add_to_cart': 0.7,
            'purchase': 1.0,
            'search': 0.2
        }
        
        score = 0.0
        for _, e in window_5m:
            weight = event_weights.get(e.event_type, 0.0)
            score += weight
        
        # Scale by a fixed constant (10) and cap the score at 1.0
        return min(score / 10, 1.0)
    
    async def _compute_engagement_trend(self, event: StreamingEvent) -> Optional[float]:
        """Compute engagement trend (increasing/decreasing)"""
        window_15m = self.sliding_windows[event.user_id]['15m']
        if len(window_15m) < 10:
            return 0.0
        
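        # Split the 15-minute window at its time midpoint and compare event rates.
        # Splitting by event count instead would always give two equal halves.
        half_window = self.window_configs['15m'] / 2  # 450 seconds
        mid_time = event.timestamp - half_window
        first_half_count = sum(1 for ts, _ in window_15m if ts < mid_time)
        
        first_half_rate = first_half_count / half_window  # events per second in first 7.5 min
        second_half_rate = (len(window_15m) - first_half_count) / half_window  # events per second in last 7.5 min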
        
        if first_half_rate == 0:
            return 1.0 if second_half_rate > 0 else 0.0
            
        trend = (second_half_rate - first_half_rate) / first_half_rate
        return max(-1.0, min(1.0, trend))  # Clamp between -1 and 1
    
    async def _compute_anomaly_score(self, event: StreamingEvent) -> Optional[float]:
        """Compute anomaly score based on user behavior"""
        window_1h = self.sliding_windows[event.user_id]['1h']
        if len(window_1h) < 5:
            return 0.0
        
        # Get historical average from Redis
        avg_key = f"user_avg:{event.user_id}"
        historical_avg = self.redis_client.get(avg_key)
        
        current_rate = len(window_1h) / 3600  # events per second
        
        if historical_avg:
            hist_rate = float(historical_avg)
            if hist_rate > 0:
                deviation = abs(current_rate - hist_rate) / hist_rate
                anomaly_score = min(deviation, 1.0)
            else:
                anomaly_score = 1.0 if current_rate > 0 else 0.0
        else:
            anomaly_score = 0.0
        
        # Update historical average
        new_avg = current_rate if not historical_avg else (float(historical_avg) * 0.9 + current_rate * 0.1)
        self.redis_client.setex(avg_key, 86400, str(new_avg))
        
        return anomaly_score
    
    async def _store_features(self, features: List[ComputedFeature]):
        """Store computed features in Redis"""
        pipe = self.redis_client.pipeline()
        
        for feature in features:
            key = f"feature:{feature.user_id}:{feature.feature_name}"
            value = {
                'value': feature.feature_value,
                'timestamp': feature.timestamp,
                'window_type': feature.window_type
            }
            
            # Store with TTL
            pipe.setex(key, feature.ttl_seconds, json.dumps(value))
        
        pipe.execute()
    
    async def _publish_features(self, features: List[ComputedFeature]):
        """Publish computed features to Kafka"""
        for feature in features:
            message = {
                'user_id': feature.user_id,
                'feature_name': feature.feature_name,
                'feature_value': feature.feature_value,
                'timestamp': feature.timestamp,
                'window_type': feature.window_type
            }
            
            self.producer.send('computed_features', message)
        
        # Ensure delivery
        self.producer.flush()
    
    async def _log_performance_stats(self):
        """Log performance statistics"""
        current_time = time.time()
        elapsed = current_time - self.last_stats_time
        
        if self.processing_times:
            avg_processing_time = np.mean(list(self.processing_times))
            p95_processing_time = np.percentile(list(self.processing_times), 95)
            p99_processing_time = np.percentile(list(self.processing_times), 99)
        else:
            avg_processing_time = p95_processing_time = p99_processing_time = 0
        
        events_per_second = 1000 / elapsed if elapsed > 0 else 0
        
        logger.info(f"""
        Performance Stats:
        - Events processed: {self.processed_events}
        - Events/sec: {events_per_second:.1f}
        - Avg processing time: {avg_processing_time*1000:.2f}ms
        - P95 processing time: {p95_processing_time*1000:.2f}ms  
        - P99 processing time: {p99_processing_time*1000:.2f}ms
        - Active windows: {sum(len(windows) for windows in self.sliding_windows.values())}
        """)
        
        self.last_stats_time = current_time
    
    def get_feature(self, user_id: str, feature_name: str) -> Optional[Dict[str, Any]]:
        """Get latest feature value for a user"""
        key = f"feature:{user_id}:{feature_name}"
        value = self.redis_client.get(key)
        
        if value:
            return json.loads(value)
        return None
    
    def get_all_features(self, user_id: str) -> Dict[str, Any]:
        """Get all features for a user"""
        pattern = f"feature:{user_id}:*"
        # SCAN iterates incrementally; KEYS blocks Redis while it walks the whole keyspace
        keys = self.redis_client.scan_iter(match=pattern)
        
        features = {}
        for key in keys:
            feature_name = key.split(':')[-1]
            value = self.redis_client.get(key)
            if value:
                features[feature_name] = json.loads(value)
        
        return features

# Usage example
async def run_feature_engine():
    engine = RealTimeFeatureEngine()
    
    try:
        await engine.process_stream()
    except KeyboardInterrupt:
        logger.info("Shutting down feature engine...")
    except Exception as e:
        logger.error(f"Feature engine error: {e}")
        raise

if __name__ == "__main__":
    asyncio.run(run_feature_engine())
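
The sliding-window logic in `_update_windows` and `_compute_click_rate` can be tried out without Kafka or Redis. The standalone sketch below mirrors that eviction rule on a single deque; `add_to_window` and `click_rate` are hypothetical helper names, and like the engine above it assumes events arrive in timestamp order.

```python
from collections import deque
from typing import Deque, Tuple


def add_to_window(window: Deque[Tuple[float, str]],
                  timestamp: float,
                  event_type: str,
                  window_seconds: float) -> None:
    """Append the event, then evict everything older than the window length."""
    window.append((timestamp, event_type))
    cutoff = timestamp - window_seconds
    while window and window[0][0] < cutoff:
        window.popleft()


def click_rate(window: Deque[Tuple[float, str]]) -> float:
    """Share of events in the window that are clicks."""
    if not window:
        return 0.0
    clicks = sum(1 for _, event_type in window if event_type == "click")
    return clicks / len(window)


window: Deque[Tuple[float, str]] = deque()
for ts, kind in [(0.0, "page_view"), (10.0, "click"), (50.0, "click"), (65.0, "page_view")]:
    add_to_window(window, ts, kind, window_seconds=60)

# The event at t=0 has been evicted; two of the three remaining events are clicks
print(len(window), click_rate(window))
```

Because eviction only inspects the left end of the deque, each event is appended once and removed once, so the cost per event stays constant on average.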

Online Learning System

Streaming Model Update Pipeline

online_learning.py
import numpy as np
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score
from typing import Dict, List, Any, Optional, Tuple
import joblib
import json
import time
import logging
from dataclasses import dataclass
from threading import Lock
import asyncio
from collections import deque
import redis
from kafka import KafkaConsumer, KafkaProducer

logger = logging.getLogger(__name__)

@dataclass
class TrainingExample:
    features: np.ndarray
    label: float
    timestamp: float
    weight: float = 1.0
    example_id: Optional[str] = None

@dataclass
class ModelMetrics:
    accuracy: float
    precision: float
    recall: float
    training_examples: int
    last_updated: float
    model_version: str

class OnlineLearningSystem:
    """Production online learning system with concept drift detection"""
    
    def __init__(self, 
                 model_name: str = "fraud_detector",
                 kafka_bootstrap_servers: str = "localhost:9092",
                 redis_host: str = "localhost",
                 learning_rate: float = 0.01,
                 batch_size: int = 100):
        
        self.model_name = model_name
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        
        # Online learning models
        self.model = SGDClassifier(
            loss='log_loss',  # Logistic regression
            learning_rate='adaptive',
            eta0=learning_rate,
            random_state=42
        )
        
        self.scaler = StandardScaler()
        self.model_lock = Lock()
        self.is_initialized = False
        
        # Kafka setup
        self.consumer = KafkaConsumer(
            'training_examples',
            bootstrap_servers=kafka_bootstrap_servers,
            auto_offset_reset='latest',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        
        self.producer = KafkaProducer(
            bootstrap_servers=kafka_bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        
        # Redis for model storage and metrics
        self.redis_client = redis.Redis(host=redis_host, decode_responses=True)
        
        # Performance tracking
        self.training_buffer = deque(maxlen=batch_size)
        self.recent_performance = deque(maxlen=1000)
        self.drift_detection_window = deque(maxlen=500)
        self.model_version = 1
        self.examples_processed = 0
        
        # Concept drift detection
        self.baseline_accuracy = None
        self.drift_threshold = 0.05  # 5% accuracy drop triggers retraining
        self.min_examples_for_drift = 100
    
    async def start_online_learning(self):
        """Start the online learning process"""
        logger.info(f"Starting online learning for {self.model_name}")
        
        # Load existing model if available
        await self._load_model()
        
        try:
            for message in self.consumer:
                example_data = message.value
                example = TrainingExample(
                    features=np.array(example_data['features']),
                    label=example_data['label'],
                    timestamp=example_data.get('timestamp', time.time()),
                    weight=example_data.get('weight', 1.0),
                    example_id=example_data.get('example_id')
                )
                
                # Add to training buffer
                await self._add_training_example(example)
                
                # Check if we should update the model
                if len(self.training_buffer) >= self.batch_size:
                    await self._update_model()
                
                # Check for concept drift
                if len(self.drift_detection_window) >= self.min_examples_for_drift:
                    await self._check_concept_drift()
                
                self.examples_processed += 1
                
        except Exception as e:
            logger.error(f"Online learning error: {e}")
            raise
    
    async def _add_training_example(self, example: TrainingExample):
        """Add training example to buffer"""
        self.training_buffer.append(example)
        
        # Also track for drift detection
        if self.is_initialized:
            prediction = await self._predict_single(example.features)
            is_correct = (prediction > 0.5) == (example.label > 0.5)
            self.drift_detection_window.append(is_correct)
    
    async def _update_model(self):
        """Update model with batched examples"""
        if not self.training_buffer:
            return
        
        # Prepare batch data
        features_batch = np.array([ex.features for ex in self.training_buffer])
        labels_batch = np.array([ex.label for ex in self.training_buffer])
        weights_batch = np.array([ex.weight for ex in self.training_buffer])
        
        should_save = False
        with self.model_lock:
            try:
                if not self.is_initialized:
                    # First batch - fit scaler and initialize model.
                    # partial_fit with explicit classes (binary 0/1 labels assumed)
                    # also works when the first batch contains only one class.
                    self.scaler.fit(features_batch)
                    features_scaled = self.scaler.transform(features_batch)
                    self.model.partial_fit(
                        features_scaled, labels_batch,
                        classes=np.array([0.0, 1.0]),
                        sample_weight=weights_batch
                    )
                    self.is_initialized = True
                    logger.info("Model initialized with first batch")
                else:
                    # Subsequent batches - partial fit
                    features_scaled = self.scaler.transform(features_batch)
                    self.model.partial_fit(features_scaled, labels_batch, sample_weight=weights_batch)
                
                # Update performance metrics
                await self._update_performance_metrics(features_scaled, labels_batch)
                
                # Save model periodically. The caller increments examples_processed
                # only after this method returns, hence the +1.
                should_save = (self.examples_processed + 1) % 1000 == 0
                
                # Clear buffer
                self.training_buffer.clear()
                
                logger.info(f"Model updated with batch of {len(features_batch)} examples")
                
            except Exception as e:
                logger.error(f"Model update error: {e}")
                # Clear buffer on error to prevent infinite loop
                self.training_buffer.clear()
        
        # Save outside the lock: _save_model acquires model_lock itself,
        # and threading.Lock is not re-entrant
        if should_save:
            await self._save_model()
    
    async def _update_performance_metrics(self, features: np.ndarray, labels: np.ndarray):
        """Update and track model performance metrics"""
        if not self.is_initialized:
            return
        
        # Make predictions
        predictions = self.model.predict(features)
        probabilities = self.model.predict_proba(features)[:, 1]
        
        # Calculate metrics
        accuracy = accuracy_score(labels, predictions)
        precision = precision_score(labels, predictions, zero_division=0)
        recall = recall_score(labels, predictions, zero_division=0)
        
        # Store metrics
        metrics = ModelMetrics(
            accuracy=accuracy,
            precision=precision,
            recall=recall,
            training_examples=self.examples_processed,
            last_updated=time.time(),
            model_version=str(self.model_version)
        )
        
        # Add to recent performance tracking
        self.recent_performance.append(accuracy)
        
        # Store in Redis
        await self._store_metrics(metrics)
        
        # Set baseline if not set
        if self.baseline_accuracy is None:
            self.baseline_accuracy = accuracy
            logger.info(f"Baseline accuracy set to {accuracy:.3f}")
    
    async def _check_concept_drift(self):
        """Check for concept drift and trigger retraining if needed"""
        if self.baseline_accuracy is None or len(self.drift_detection_window) < self.min_examples_for_drift:
            return
        
        # Calculate recent accuracy
        recent_accuracy = sum(self.drift_detection_window) / len(self.drift_detection_window)
        
        # Check for significant drop
        accuracy_drop = self.baseline_accuracy - recent_accuracy
        
        if accuracy_drop > self.drift_threshold:
            logger.warning(f"Concept drift detected! Accuracy dropped by {accuracy_drop:.3f}")
            await self._handle_concept_drift(recent_accuracy)
            # The handler resets baseline_accuracy to None, so skip the update below
            return
        
        # Update baseline with weighted average
        self.baseline_accuracy = 0.95 * self.baseline_accuracy + 0.05 * recent_accuracy
    
    async def _handle_concept_drift(self, current_accuracy: float):
        """Handle detected concept drift"""
        logger.info("Handling concept drift - creating new model version")
        
        # Create new model version
        self.model_version += 1
        
        # Reset model with more aggressive learning
        with self.model_lock:
            self.model = SGDClassifier(
                loss='log_loss',
                learning_rate='adaptive',
                eta0=self.learning_rate * 2,  # Increase learning rate
                random_state=42
            )
            self.scaler = StandardScaler()
            self.is_initialized = False
        
        # Reset drift detection
        self.drift_detection_window.clear()
        self.baseline_accuracy = None
        
        # Notify about drift
        await self._notify_concept_drift(current_accuracy)
    
    async def _notify_concept_drift(self, accuracy: float):
        """Notify about concept drift event"""
        notification = {
            'event': 'concept_drift_detected',
            'model_name': self.model_name,
            'accuracy': accuracy,
            'new_model_version': self.model_version,
            'timestamp': time.time()
        }
        
        self.producer.send('model_events', notification)
        self.producer.flush()
    
    async def predict(self, features: np.ndarray) -> Tuple[float, float]:
        """Make prediction with confidence"""
        if not self.is_initialized:
            return 0.5, 0.0  # Default prediction
        
        with self.model_lock:
            features_scaled = self.scaler.transform(features.reshape(1, -1))
            prediction = self.model.predict_proba(features_scaled)[0]
            confidence = max(prediction) - min(prediction)
            return prediction[1], confidence  # Return probability of positive class
    
    async def _predict_single(self, features: np.ndarray) -> float:
        """Single prediction for internal use"""
        if not self.is_initialized:
            return 0.5
        
        with self.model_lock:
            features_scaled = self.scaler.transform(features.reshape(1, -1))
            return self.model.predict_proba(features_scaled)[0][1]
    
    async def _save_model(self):
        """Save model to Redis"""
        import base64
        import io
        
        if not self.is_initialized:
            return
        
        def encode(obj) -> str:
            # joblib.dump needs a file target, so write to an in-memory buffer
            # and base64-encode the bytes to make them JSON-safe
            buffer = io.BytesIO()
            joblib.dump(obj, buffer)
            return base64.b64encode(buffer.getvalue()).decode('ascii')
        
        try:
            with self.model_lock:
                # Serialize model and scaler
                model_data = {
                    'model': encode(self.model),
                    'scaler': encode(self.scaler),
                    'model_version': self.model_version,
                    'examples_processed': self.examples_processed,
                    'last_saved': time.time()
                }
            
            # Store in Redis
            key = f"model:{self.model_name}:latest"
            self.redis_client.set(key, json.dumps(model_data))
            
            logger.info(f"Model saved to Redis (version {self.model_version})")
            
        except Exception as e:
            logger.error(f"Model save error: {e}")
    
    async def _load_model(self):
        """Load model from Redis"""
        import base64
        import io
        
        def decode(payload: str):
            # joblib uses pickle underneath: only load payloads this service wrote itself
            return joblib.load(io.BytesIO(base64.b64decode(payload)))
        
        try:
            key = f"model:{self.model_name}:latest"
            model_data = self.redis_client.get(key)
            
            if model_data:
                data = json.loads(model_data)
                
                with self.model_lock:
                    self.model = decode(data['model'])
                    self.scaler = decode(data['scaler'])
                    self.model_version = data['model_version']
                    self.examples_processed = data['examples_processed']
                    self.is_initialized = True
                
                logger.info(f"Model loaded from Redis (version {self.model_version})")
            else:
                logger.info("No existing model found, starting fresh")
                
        except Exception as e:
            logger.error(f"Model load error: {e}")
    
    async def _store_metrics(self, metrics: ModelMetrics):
        """Store performance metrics"""
        key = f"metrics:{self.model_name}:latest"
        self.redis_client.set(key, json.dumps(metrics.__dict__))
        
        # Also store time series
        ts_key = f"metrics:{self.model_name}:timeseries"
        self.redis_client.lpush(ts_key, json.dumps({
            **metrics.__dict__,
            'timestamp': time.time()
        }))
        
        # Keep only last 1000 entries
        self.redis_client.ltrim(ts_key, 0, 999)
    
    def get_metrics(self) -> Optional[ModelMetrics]:
        """Get current model metrics"""
        key = f"metrics:{self.model_name}:latest"
        data = self.redis_client.get(key)
        
        if data:
            metrics_dict = json.loads(data)
            return ModelMetrics(**metrics_dict)
        return None
    
    def get_model_info(self) -> Dict[str, Any]:
        """Get model information and stats"""
        metrics = self.get_metrics()
        
        return {
            'model_name': self.model_name,
            'model_version': self.model_version,
            'examples_processed': self.examples_processed,
            'is_initialized': self.is_initialized,
            'baseline_accuracy': self.baseline_accuracy,
            'recent_performance_avg': np.mean(list(self.recent_performance)) if self.recent_performance else None,
            'current_metrics': metrics.__dict__ if metrics else None,
            'drift_detection_size': len(self.drift_detection_window)
        }

# Usage example
async def run_online_learning():
    system = OnlineLearningSystem(model_name="fraud_detector")
    
    try:
        await system.start_online_learning()
    except KeyboardInterrupt:
        logger.info("Shutting down online learning system...")
    except Exception as e:
        logger.error(f"Online learning error: {e}")

if __name__ == "__main__":
    asyncio.run(run_online_learning())
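
The drift check used by `_check_concept_drift` compares a rolling accuracy with a baseline. The standalone sketch below isolates that comparison so it can be tested without Kafka, Redis, or scikit-learn. `AccuracyDriftMonitor` is a hypothetical class, and unlike the system above it keeps the baseline fixed instead of updating it with a weighted average.

```python
from collections import deque
from typing import Deque


class AccuracyDriftMonitor:
    """Flags drift when rolling accuracy falls more than `threshold` below the baseline."""

    def __init__(self, baseline: float, threshold: float = 0.05,
                 window: int = 500, min_examples: int = 100):
        self.baseline = baseline
        self.threshold = threshold
        self.min_examples = min_examples
        self.outcomes: Deque[bool] = deque(maxlen=window)

    def record(self, correct: bool) -> bool:
        """Record one prediction outcome; return True when drift is flagged."""
        self.outcomes.append(correct)
        if len(self.outcomes) < self.min_examples:
            # Too few examples for a meaningful accuracy estimate
            return False
        recent_accuracy = sum(self.outcomes) / len(self.outcomes)
        return self.baseline - recent_accuracy > self.threshold
```

Each outcome is recorded before the model trains on that example, as `_add_training_example` does, so the rolling accuracy reflects how the model performs on data it has not yet seen.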

Production Analytics API

Real-Time Analytics API Service

realtime_analytics_api.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
from typing import Dict, List, Any, Optional
import asyncio
import json
import time
import logging
from dataclasses import asdict
import numpy as np
import redis
from kafka import KafkaProducer
import uvicorn
from contextlib import asynccontextmanager

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class FeatureRequest(BaseModel):
    user_id: str
    features: Optional[List[str]] = None  # If None, return all features

class PredictionRequest(BaseModel):
    user_id: str
    model_name: str = "fraud_detector"
    features: Optional[Dict[str, float]] = None  # Override features

class AnalyticsEvent(BaseModel):
    user_id: str
    event_type: str
    properties: Dict[str, Any]
    session_id: Optional[str] = None
    timestamp: Optional[float] = Field(default_factory=time.time)

class BatchPredictionRequest(BaseModel):
    requests: List[PredictionRequest]
    return_features: bool = False

class RealTimeAnalyticsService:
    """Production real-time analytics service"""
    
    def __init__(self):
        # Redis connection for features and models
        self.redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
        
        # Kafka producer for events
        self.kafka_producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        
        # Cache for frequently accessed features
        self.feature_cache = {}
        self.cache_ttl = 60  # seconds
        
        # Performance monitoring
        self.request_count = 0
        self.total_response_time = 0.0
        
        # Models registry
        self.models = {}
        
    async def get_features(self, user_id: str, feature_names: Optional[List[str]] = None) -> Dict[str, Any]:
        """Get real-time features for a user"""
        try:
            start_time = time.time()
            
            # Check cache first. The key includes the requested feature names so a
            # partial result is never served for a different request.
            requested = ','.join(sorted(feature_names)) if feature_names else '*'
            cache_key = f"features_cache:{user_id}:{requested}"
            cached_features = self.feature_cache.get(cache_key)
            cache_hit = (
                cached_features is not None
                and time.time() - cached_features['timestamp'] < self.cache_ttl
            )
            
            if cache_hit:
                features = cached_features['features']
            else:
                # Get from Redis
                if feature_names:
                    features = {}
                    for feature_name in feature_names:
                        key = f"feature:{user_id}:{feature_name}"
                        value = self.redis_client.get(key)
                        if value:
                            features[feature_name] = json.loads(value)
                else:
                    # Get all features; SCAN avoids blocking Redis the way KEYS does
                    pattern = f"feature:{user_id}:*"
                    features = {}
                    
                    for key in self.redis_client.scan_iter(match=pattern):
                        feature_name = key.split(':')[-1]
                        value = self.redis_client.get(key)
                        if value:
                            features[feature_name] = json.loads(value)
                
                # Cache the result
                self.feature_cache[cache_key] = {
                    'features': features,
                    'timestamp': time.time()
                }
            
            processing_time = time.time() - start_time
            self._update_metrics(processing_time)
            
            return {
                'user_id': user_id,
                'features': features,
                'retrieved_at': time.time(),
                'processing_time_ms': processing_time * 1000,
                'cache_hit': cache_hit
            }
            
        except Exception as e:
            logger.error(f"Feature retrieval error for {user_id}: {e}")
            raise HTTPException(status_code=500, detail=str(e))
    
    async def make_prediction(self, request: PredictionRequest) -> Dict[str, Any]:
        """Make real-time prediction for a user"""
        try:
            start_time = time.time()
            
            # Get features (either provided or from feature store)
            if request.features:
                features = request.features
            else:
                feature_result = await self.get_features(request.user_id)
                raw_features = feature_result['features']
                # Extract just the values, skipping entries that are not
                # {'value': ...} documents
                features = {
                    name: data['value']
                    for name, data in raw_features.items()
                    if isinstance(data, dict) and 'value' in data
                }
            
            if not features:
                raise HTTPException(status_code=400, detail="No features available for prediction")
            
            # Get model metrics to check if model is healthy
            metrics_key = f"metrics:{request.model_name}:latest"
            metrics_data = self.redis_client.get(metrics_key)
            
            if not metrics_data:
                raise HTTPException(status_code=404, detail=f"Model {request.model_name} not found")
            
            metrics = json.loads(metrics_data)
            
            # Simple prediction logic (in production, this would call the actual model)
            # For demonstration, we'll use the features to compute a score
            prediction_score = self._compute_prediction_score(features)
            confidence = min(0.95, max(0.05, abs(prediction_score - 0.5) * 2))
            
            processing_time = time.time() - start_time
            self._update_metrics(processing_time)
            
            result = {
                'user_id': request.user_id,
                'model_name': request.model_name,
                'prediction': prediction_score,
                'confidence': confidence,
                'features_used': list(features.keys()),
                'model_version': metrics.get('model_version', 'unknown'),
                'predicted_at': time.time(),
                'processing_time_ms': processing_time * 1000
            }
            
            # Log prediction for model monitoring
            await self._log_prediction(result)
            
            return result
            
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Prediction error for {request.user_id}: {e}")
            raise HTTPException(status_code=500, detail=str(e))
    
    async def batch_predictions(self, request: BatchPredictionRequest) -> Dict[str, Any]:
        """Make batch predictions"""
        try:
            start_time = time.time()
            
            # Process all predictions concurrently
            tasks = [self.make_prediction(pred_req) for pred_req in request.requests]
            predictions = await asyncio.gather(*tasks, return_exceptions=True)
            
            # Separate successful predictions from errors
            successful = []
            errors = []
            
            for i, pred in enumerate(predictions):
                if isinstance(pred, Exception):
                    errors.append({
                        'request_index': i,
                        'user_id': request.requests[i].user_id,
                        # HTTPException carries its message in .detail
                        'error': getattr(pred, 'detail', None) or str(pred)
                    })
                else:
                    successful.append(pred)
            
            processing_time = time.time() - start_time
            
            return {
                'predictions': successful,
                'errors': errors,
                'total_requests': len(request.requests),
                'successful_count': len(successful),
                'error_count': len(errors),
                'batch_processing_time_ms': processing_time * 1000
            }
            
        except Exception as e:
            logger.error(f"Batch prediction error: {e}")
            raise HTTPException(status_code=500, detail=str(e))
    
    async def ingest_event(self, event: AnalyticsEvent, background_tasks: BackgroundTasks):
        """Ingest real-time analytics event"""
        try:
            # Add to Kafka for processing
            event_data = event.dict()
            self.kafka_producer.send('user_events', event_data)
            
            # Also trigger immediate feature computation for critical features
            background_tasks.add_task(self._compute_immediate_features, event)
            
            return {
                'status': 'accepted',
                'event_id': f"{event.user_id}_{event.timestamp}",
                'timestamp': event.timestamp
            }
            
        except Exception as e:
            logger.error(f"Event ingestion error: {e}")
            raise HTTPException(status_code=500, detail=str(e))
    
    async def _compute_immediate_features(self, event: AnalyticsEvent):
        """Compute immediate features for low-latency use cases"""
        try:
            # Simple immediate feature computation
            user_id = event.user_id
            
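            # Update event count. The stored value is a JSON document, so parse it
            # before reading the previous count. This read-modify-write is not
            # atomic: concurrent events for one user can lose an increment.
            count_key = f"feature:{user_id}:event_count_realtime"
            current_raw = self.redis_client.get(count_key)
            previous_count = json.loads(current_raw)['value'] if current_raw else 0
            new_count = int(previous_count) + 1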
            
            feature_data = {
                'value': new_count,
                'timestamp': event.timestamp,
                'window_type': 'immediate'
            }
            
            self.redis_client.setex(count_key, 3600, json.dumps(feature_data))
            
            # Update last event type
            event_type_key = f"feature:{user_id}:last_event_type"
            event_type_data = {
                'value': event.event_type,
                'timestamp': event.timestamp,
                'window_type': 'immediate'
            }
            
            self.redis_client.setex(event_type_key, 3600, json.dumps(event_type_data))
            
        except Exception as e:
            logger.error(f"Immediate feature computation error: {e}")
    
    def _compute_prediction_score(self, features: Dict[str, Any]) -> float:
        """Simple prediction score computation"""
        # In production, this would use the actual trained model
        # For demo, we'll use a simple weighted combination
        
        weights = {
            'click_rate_1m': 0.3,
            'session_duration': 0.2,
            'event_velocity': 0.15,
            'conversion_score': 0.25,
            'engagement_trend': 0.1
        }
        
        score = 0.5  # baseline
        total_weight = 0
        
        for feature_name, weight in weights.items():
            if feature_name in features:
                value = features[feature_name]
                if isinstance(value, (int, float)):
                    score += weight * min(1.0, max(0.0, value))
                    total_weight += weight
        
        # Normalize
        if total_weight > 0:
            score = score / (0.5 + total_weight)  # Adjust for baseline
        
        return max(0.0, min(1.0, score))
    
    async def _log_prediction(self, prediction_result: Dict[str, Any]):
        """Log prediction for monitoring and feedback"""
        try:
            log_entry = {
                'type': 'prediction',
                'timestamp': time.time(),
                **prediction_result
            }
            
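            # send() is asynchronous and batches records. Calling flush() here would
            # block until delivery and add broker latency to every prediction; the
            # lifespan handler closes the producer on shutdown instead.
            self.kafka_producer.send('prediction_logs', log_entry)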
            
        except Exception as e:
            logger.error(f"Prediction logging error: {e}")
    
    def _update_metrics(self, processing_time: float):
        """Update service performance metrics"""
        self.request_count += 1
        self.total_response_time += processing_time
    
    def get_service_stats(self) -> Dict[str, Any]:
        """Get service performance statistics"""
        avg_response_time = (
            self.total_response_time / max(1, self.request_count)
        )
        
        return {
            'total_requests': self.request_count,
            'average_response_time_ms': avg_response_time * 1000,
            'cache_size': len(self.feature_cache),
            'models_loaded': len(self.models),
            'uptime_seconds': time.time() - self.start_time if hasattr(self, 'start_time') else 0
        }

# Initialize service
analytics_service = RealTimeAnalyticsService()

@asynccontextmanager
async def lifespan(app: FastAPI):
    analytics_service.start_time = time.time()
    logger.info("Real-Time Analytics Service started")
    yield
    logger.info("Real-Time Analytics Service shutting down")
    analytics_service.kafka_producer.close()

# FastAPI app
app = FastAPI(
    title="Real-Time Analytics Service",
    description="Production real-time analytics and ML inference API",
    version="1.0.0",
    lifespan=lifespan
)

@app.post("/features", response_model=Dict[str, Any])
async def get_user_features(request: FeatureRequest):
    """Get real-time features for a user"""
    return await analytics_service.get_features(request.user_id, request.features)

@app.post("/predict", response_model=Dict[str, Any])
async def make_prediction(request: PredictionRequest):
    """Make real-time prediction"""
    return await analytics_service.make_prediction(request)

@app.post("/predict/batch", response_model=Dict[str, Any])
async def batch_predict(request: BatchPredictionRequest):
    """Make batch predictions"""
    return await analytics_service.batch_predictions(request)

@app.post("/events", response_model=Dict[str, Any])
async def ingest_event(event: AnalyticsEvent, background_tasks: BackgroundTasks):
    """Ingest analytics event for real-time processing"""
    return await analytics_service.ingest_event(event, background_tasks)

@app.get("/stats")
async def get_stats():
    """Get service performance statistics"""
    return analytics_service.get_service_stats()

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    try:
        # Test Redis connection
        analytics_service.redis_client.ping()
        return {
            "status": "healthy",
            "timestamp": time.time(),
            "services": {
                "redis": "connected",
                # Only Redis is probed above; Kafka connectivity is not verified here
                "kafka": "not_checked"
            }
        }
    except Exception as e:
        raise HTTPException(status_code=503, detail=f"Service unhealthy: {e}")

if __name__ == "__main__":
    uvicorn.run(
        "realtime_analytics_api:app",
        host="0.0.0.0",
        port=8000,
        workers=1
    )
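
To see how the demo scoring rule in `_compute_prediction_score` behaves, the following standalone sketch copies its weights and normalization into a plain function. The names `WEIGHTS`, `compute_prediction_score`, and `confidence_from_score` are illustrative and not part of the service, and the rule itself is the listing's placeholder heuristic, not a trained model.

```python
from typing import Any, Dict

# Same demo weights as the service listing (placeholder values, not learned)
WEIGHTS = {
    "click_rate_1m": 0.3,
    "session_duration": 0.2,
    "event_velocity": 0.15,
    "conversion_score": 0.25,
    "engagement_trend": 0.1,
}


def compute_prediction_score(features: Dict[str, Any]) -> float:
    """Baseline 0.5 plus weighted, clamped feature values, then normalized."""
    score = 0.5
    total_weight = 0.0
    for name, weight in WEIGHTS.items():
        value = features.get(name)
        if isinstance(value, (int, float)):
            # Clamp each feature to [0, 1] before weighting
            score += weight * min(1.0, max(0.0, value))
            total_weight += weight
    if total_weight > 0:
        score = score / (0.5 + total_weight)
    return max(0.0, min(1.0, score))


def confidence_from_score(score: float) -> float:
    """Distance from 0.5, scaled to [0, 1] and clipped to [0.05, 0.95]."""
    return min(0.95, max(0.05, abs(score - 0.5) * 2))


if __name__ == "__main__":
    sample = {"click_rate_1m": 0.8, "conversion_score": 0.6}
    s = compute_prediction_score(sample)
    # (0.5 + 0.3*0.8 + 0.25*0.6) / (0.5 + 0.55) = 0.89 / 1.05
    print(round(s, 4), round(confidence_from_score(s), 4))
```

One property worth noticing: with no recognized features the score stays at the 0.5 baseline, while a single recognized feature at 0.0 yields 0.5 / 0.8 = 0.625, so the result depends on which features are present as well as on their values.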

Real-World Examples

Netflix Real-Time

Real-time recommendation updates, A/B testing, and personalization with sub-100ms latency for 230M+ users.

  • 1.5M events/sec peak processing
  • Online learning for recommendations
  • Real-time feature engineering

Uber Real-Time Pricing

Dynamic pricing engine processing supply/demand signals with ML-powered surge pricing across global markets.

  • 100k pricing decisions/sec
  • Geographic feature streaming
  • Multi-model ensemble serving

PayPal Fraud Detection

Real-time fraud scoring system processing payment transactions with adaptive ML models and instant decisions.

  • Sub-10ms fraud scoring
  • Online model adaptation
  • 99.99% availability SLA

Real-Time Analytics Best Practices

✅ Do's

  • Design for horizontal scalability from day one
  • Implement comprehensive monitoring and alerting
  • Use feature stores for consistent feature serving
  • Implement concept drift detection and handling
  • Design for graceful degradation under load
  • Use exactly-once semantics for critical data

❌ Don'ts

  • Don't ignore backpressure and flow control
  • Don't use blocking operations in streaming code
  • Don't neglect late data and out-of-order handling
  • Don't skip model versioning and rollback plans
  • Don't underestimate memory management needs
  • Don't ignore security in streaming pipelines
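
The backpressure point can be illustrated with a bounded `asyncio.Queue`: when the queue is full, `put()` suspends the producer until the consumer catches up, so memory stays bounded instead of growing with the backlog. This is a minimal in-process sketch; the function names and the `None` sentinel are assumptions for the example, and a real pipeline would rely on the flow control of its broker or stream processor.

```python
import asyncio
from typing import List, Tuple


async def producer(queue: asyncio.Queue, n_events: int) -> None:
    for i in range(n_events):
        # put() waits while the queue is full, slowing the producer
        # to the consumer's pace instead of buffering without bound
        await queue.put(i)
    await queue.put(None)  # sentinel: no more events


async def consumer(queue: asyncio.Queue, processed: List[int], max_depth: List[int]) -> None:
    while True:
        # Track the deepest backlog observed, for inspection afterwards
        max_depth[0] = max(max_depth[0], queue.qsize())
        event = await queue.get()
        if event is None:
            break
        await asyncio.sleep(0)  # stand-in for real processing work
        processed.append(event)


async def run_pipeline(n_events: int = 100, max_queue: int = 10) -> Tuple[List[int], int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue)
    processed: List[int] = []
    max_depth = [0]
    await asyncio.gather(
        producer(queue, n_events),
        consumer(queue, processed, max_depth),
    )
    return processed, max_depth[0]


if __name__ == "__main__":
    events, depth = asyncio.run(run_pipeline(n_events=50, max_queue=5))
    print(len(events), depth)
```

The backlog never exceeds `max_queue`, whatever the gap between producer and consumer speed; an unbounded queue would accept every event immediately and let memory absorb the difference.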