
Adaptive Security Architecture

Design intelligent security systems that dynamically adjust protection based on behavioral analysis, threat intelligence, and contextual risk factors

45 min read · Advanced

What is Adaptive Security Architecture?

Adaptive security architecture creates intelligent, self-adjusting security systems that continuously evaluate risk and modify protection measures in real time. Unlike traditional static security models, these systems use behavioral analysis, machine learning, and contextual awareness to provide dynamic, risk-appropriate security responses.

Core Components:

  • Behavioral Analytics: ML-powered user and system behavior modeling
  • Contextual Risk Engine: Multi-factor risk assessment and scoring
  • Dynamic Policy Engine: Real-time security policy adjustment
  • Threat Intelligence: Live threat feeds and pattern recognition
  • Automated Response: Orchestrated security incident response
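
To make the flow concrete, here is a minimal, illustrative sketch of how these components fit together in an event-processing loop. The names, weights, and thresholds are hypothetical placeholders rather than a specific product API; a full engine appears in the Production Implementation section below.

# Illustrative composition of the core components (hypothetical names and thresholds)
from dataclasses import dataclass

@dataclass
class RiskAssessment:
    score: float   # 0.0 (benign) to 1.0 (critical)
    factors: dict

def assess(event: dict) -> RiskAssessment:
    """Combine behavioral, threat-intelligence, and contextual signals into one score."""
    behavioral = 0.2 if event.get("known_device") else 0.6              # behavioral analytics
    intel = 0.9 if event.get("source_ip") == "198.51.100.1" else 0.0    # threat intelligence feed
    context = 0.3 if event.get("hour", 12) < 6 else 0.1                 # contextual risk factor
    score = max(intel, 0.5 * behavioral + 0.3 * context + 0.2 * intel)
    return RiskAssessment(score, {"behavioral": behavioral, "intel": intel, "context": context})

def decide(assessment: RiskAssessment) -> str:
    """Dynamic policy engine: map the risk score to a proportionate response."""
    if assessment.score >= 0.8:
        return "block"
    if assessment.score >= 0.5:
        return "challenge"   # e.g. step-up MFA
    return "allow"           # the automated response layer would act on this decision

if __name__ == "__main__":
    event = {"known_device": False, "source_ip": "198.51.100.1", "hour": 3}
    # -> "block": the source IP matches a threat-intelligence indicator
    print(decide(assess(event)))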

Interactive Adaptive Security Calculator

[Interactive widget: adjust the adaptive security parameters to see the resulting security score, false positive rate, threat coverage, and overall assessment.]

Adaptive Security Architecture Layers

Intelligence Layer

Threat intelligence and behavioral analytics

  • Machine learning models
  • Threat intelligence feeds
  • Behavioral baselines
  • Anomaly detection engines

Risk Assessment Layer

Contextual risk evaluation and scoring

  • Multi-factor risk scoring
  • Contextual analysis
  • Trust scoring algorithms
  • Risk trend analysis

Policy Engine Layer

Dynamic policy generation and enforcement

  • Rule generation algorithms
  • Policy adaptation logic
  • Enforcement mechanisms
  • Compliance monitoring

Response Layer

Automated incident response and orchestration

  • Automated containment
  • Threat mitigation
  • Incident orchestration
  • Recovery procedures

Monitoring Layer

Continuous security posture assessment

  • Real-time monitoring
  • Security metrics tracking
  • Performance analytics
  • Feedback loops

Integration Layer

Security ecosystem integration and APIs

  • SIEM integration
  • Security tool orchestration
  • API connectivity
  • Data sharing protocols
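
As a concrete illustration of the integration layer, here is a hedged sketch of forwarding normalized security events to a SIEM's HTTP ingestion endpoint. The URL, token, and payload schema are assumptions; substitute the ingestion API of whichever SIEM you actually run.

# Hypothetical integration-layer sketch: push normalized events to a SIEM collector.
# The endpoint, token, and payload shape below are placeholders, not a real product API.
import json
import urllib.request
from datetime import datetime, timezone

SIEM_URL = "https://siem.example.internal/api/events"    # placeholder endpoint
API_TOKEN = "REPLACE_ME"                                 # placeholder credential

def forward_to_siem(event: dict) -> None:
    payload = {
        "source": "adaptive-security-engine",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "event": event,
    }
    request = urllib.request.Request(
        SIEM_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {API_TOKEN}",
        },
        method="POST",
    )
    # Production code would add retries, batching, and a dead-letter queue here.
    with urllib.request.urlopen(request, timeout=5) as response:
        response.read()

# Example call (requires a reachable collector):
# forward_to_siem({"event_id": "evt_123", "risk_score": 0.82, "action": "block"})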

Production Implementation

Adaptive Security Engine (Python)

# Adaptive Security Architecture Implementation
import asyncio
import numpy as np
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import json
import hashlib
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

class ThreatLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class SecurityAction(Enum):
    ALLOW = "allow"
    MONITOR = "monitor"
    CHALLENGE = "challenge"
    BLOCK = "block"
    ISOLATE = "isolate"

@dataclass
class UserContext:
    user_id: str
    location: str
    device_id: str
    network: str
    time_of_day: int
    access_patterns: List[str]
    recent_activities: List[Dict]
    trust_score: float = 0.5

@dataclass
class ThreatIndicator:
    indicator_type: str
    value: Any
    severity: ThreatLevel
    confidence: float
    source: str
    timestamp: datetime
    context: Dict = field(default_factory=dict)

@dataclass
class SecurityEvent:
    event_id: str
    event_type: str
    user_context: UserContext
    threat_indicators: List[ThreatIndicator]
    risk_score: float
    recommended_action: SecurityAction
    timestamp: datetime
    metadata: Dict = field(default_factory=dict)

class AdaptiveSecurityEngine:
    def __init__(self, config: Dict):
        self.config = config
        self.behavioral_models = {}
        self.threat_intelligence = ThreatIntelligenceEngine()
        self.risk_calculator = RiskAssessmentEngine()
        self.policy_engine = DynamicPolicyEngine()
        self.response_orchestrator = SecurityResponseOrchestrator()
        
        # ML models for behavioral analysis
        self.anomaly_detector = IsolationForest(contamination=0.1, random_state=42)
        self.scaler = StandardScaler()
        
        # Security metrics tracking
        self.security_metrics = SecurityMetricsTracker()
        
    async def process_security_event(self, event_data: Dict) -> SecurityEvent:
        """Process incoming security event with adaptive analysis"""
        
        # Extract user context
        user_context = await self.extract_user_context(event_data)
        
        # Perform behavioral analysis
        behavioral_score = await self.analyze_behavior(user_context, event_data)
        
        # Gather threat indicators
        threat_indicators = await self.gather_threat_indicators(event_data)
        
        # Calculate contextual risk score
        risk_score = await self.risk_calculator.calculate_risk(
            user_context, threat_indicators, behavioral_score
        )
        
        # Determine adaptive response
        recommended_action = await self.determine_adaptive_action(
            risk_score, user_context, threat_indicators
        )
        
        # Create security event
        security_event = SecurityEvent(
            event_id=self.generate_event_id(event_data),
            event_type=event_data.get('event_type', 'unknown'),
            user_context=user_context,
            threat_indicators=threat_indicators,
            risk_score=risk_score,
            recommended_action=recommended_action,
            timestamp=datetime.now(),
            metadata={
                'behavioral_score': behavioral_score,
                'processing_time_ms': 0,  # Will be updated
                'model_version': self.config.get('model_version', '1.0')
            }
        )
        
        # Execute adaptive response
        await self.response_orchestrator.execute_response(security_event)
        
        # Update behavioral models
        await self.update_behavioral_models(user_context, security_event)
        
        # Track metrics
        await self.security_metrics.track_event(security_event)
        
        return security_event
    
    async def extract_user_context(self, event_data: Dict) -> UserContext:
        """Extract comprehensive user context for risk assessment"""
        
        user_id = event_data.get('user_id', 'unknown')
        
        # Get user's historical data
        historical_data = await self.get_user_history(user_id)
        
        # Extract contextual information
        context = UserContext(
            user_id=user_id,
            location=event_data.get('source_ip', '0.0.0.0'),
            device_id=event_data.get('device_id', 'unknown'),
            network=event_data.get('network_info', 'unknown'),
            time_of_day=datetime.now().hour,
            access_patterns=historical_data.get('access_patterns', []),
            recent_activities=historical_data.get('recent_activities', []),
            trust_score=historical_data.get('trust_score', 0.5)
        )
        
        return context
    
    async def analyze_behavior(self, context: UserContext, event_data: Dict) -> float:
        """Perform behavioral analysis using ML models"""
        
        try:
            # Extract behavioral features
            features = self.extract_behavioral_features(context, event_data)
            
            # Get or create user behavioral model
            user_model = await self.get_user_behavioral_model(context.user_id)
            
            if user_model is None:
                # New user - establish baseline
                return await self.establish_behavioral_baseline(context, features)
            
            # Normalize features
            normalized_features = self.scaler.transform([features])
            
            # Calculate anomaly score
            anomaly_score = user_model.decision_function(normalized_features)[0]
            
            # Convert to behavioral score (0-1, where 1 is normal)
            behavioral_score = max(0, min(1, (anomaly_score + 0.5) / 1.0))
            
            return behavioral_score
            
        except Exception as e:
            print(f"Behavioral analysis error: {e}")
            return 0.5  # Default neutral score
    
    def extract_behavioral_features(self, context: UserContext, event_data: Dict) -> List[float]:
        """Extract numerical features for behavioral analysis"""
        
        features = []
        
        # Temporal features
        features.append(context.time_of_day / 24.0)  # Hour of day normalized
        features.append(datetime.now().weekday() / 6.0)  # Day of week normalized
        
        # Location features (simplified)
        location_hash = hash(context.location) % 1000 / 1000.0
        features.append(location_hash)
        
        # Device features
        device_hash = hash(context.device_id) % 1000 / 1000.0
        features.append(device_hash)
        
        # Network features
        network_hash = hash(context.network) % 1000 / 1000.0
        features.append(network_hash)
        
        # Activity features
        features.append(len(context.recent_activities) / 100.0)  # Activity volume
        features.append(context.trust_score)  # Existing trust score
        
        # Event-specific features
        features.append(len(event_data.get('resource_accessed', '')) / 100.0)
        features.append(event_data.get('data_size_kb', 0) / 10000.0)  # Normalized data size
        
        return features
    
    async def get_user_behavioral_model(self, user_id: str) -> Optional[IsolationForest]:
        """Get existing behavioral model for user"""
        return self.behavioral_models.get(user_id)
    
    async def establish_behavioral_baseline(self, context: UserContext, features: List[float]) -> float:
        """Establish behavioral baseline for new user"""
        
        # For new users, create a simple model based on similar users
        # In production, this would use clustering or transfer learning
        
        similar_users = await self.find_similar_users(context)
        
        if similar_users:
            # Use average behavioral score from similar users
            avg_score = sum(u['behavioral_score'] for u in similar_users) / len(similar_users)
            return max(0.3, min(0.7, avg_score))  # Conservative range for new users
        
        return 0.5  # Neutral score for completely new users
    
    async def find_similar_users(self, context: UserContext) -> List[Dict]:
        """Find users with similar behavioral patterns"""
        
        # Simplified similarity matching
        # In production, use more sophisticated clustering
        
        similar_users = []
        for user_id, model in self.behavioral_models.items():
            if user_id != context.user_id:
                # Simple similarity based on context overlap
                similarity_score = self.calculate_context_similarity(context, user_id)
                if similarity_score > 0.7:
                    similar_users.append({
                        'user_id': user_id,
                        'similarity': similarity_score,
                        'behavioral_score': 0.6  # Placeholder
                    })
        
        return sorted(similar_users, key=lambda x: x['similarity'], reverse=True)[:5]
    
    def calculate_context_similarity(self, context: UserContext, other_user_id: str) -> float:
        """Calculate similarity between user contexts"""
        
        # Simplified similarity calculation
        # In production, use more sophisticated methods
        
        similarity_factors = []
        
        # Time-based similarity (same general work hours)
        time_similarity = 1.0 - abs(context.time_of_day - 12) / 12.0
        similarity_factors.append(time_similarity * 0.3)
        
        # Trust score similarity
        other_trust = 0.5  # Would fetch from database
        trust_similarity = 1.0 - abs(context.trust_score - other_trust)
        similarity_factors.append(trust_similarity * 0.4)
        
        # Activity pattern similarity (simplified)
        activity_similarity = 0.7  # Placeholder
        similarity_factors.append(activity_similarity * 0.3)
        
        return sum(similarity_factors)
    
    async def gather_threat_indicators(self, event_data: Dict) -> List[ThreatIndicator]:
        """Gather threat indicators from multiple sources"""
        
        indicators = []
        
        # Check threat intelligence feeds
        ti_indicators = await self.threat_intelligence.check_indicators(event_data)
        indicators.extend(ti_indicators)
        
        # Check for suspicious patterns
        pattern_indicators = await self.detect_suspicious_patterns(event_data)
        indicators.extend(pattern_indicators)
        
        # Check for policy violations
        policy_indicators = await self.check_policy_violations(event_data)
        indicators.extend(policy_indicators)
        
        return indicators
    
    async def detect_suspicious_patterns(self, event_data: Dict) -> List[ThreatIndicator]:
        """Detect suspicious patterns in event data"""
        
        indicators = []
        
        # Check for unusual access times
        if datetime.now().hour < 6 or datetime.now().hour > 22:
            indicators.append(ThreatIndicator(
                indicator_type="unusual_time",
                value=datetime.now().hour,
                severity=ThreatLevel.LOW,
                confidence=0.3,
                source="pattern_detection",
                timestamp=datetime.now(),
                context={"description": "Access outside normal business hours"}
            ))
        
        # Check for unusual data volume
        data_size = event_data.get('data_size_kb', 0)
        if data_size > 10000:  # 10MB+
            indicators.append(ThreatIndicator(
                indicator_type="large_data_access",
                value=data_size,
                severity=ThreatLevel.MEDIUM,
                confidence=0.6,
                source="pattern_detection",
                timestamp=datetime.now(),
                context={"description": "Unusually large data access"}
            ))
        
        # Check for rapid successive access
        access_frequency = event_data.get('recent_access_count', 0)
        if access_frequency > 50:
            indicators.append(ThreatIndicator(
                indicator_type="high_frequency_access",
                value=access_frequency,
                severity=ThreatLevel.HIGH,
                confidence=0.8,
                source="pattern_detection",
                timestamp=datetime.now(),
                context={"description": "Unusually high access frequency"}
            ))
        
        return indicators
    
    async def check_policy_violations(self, event_data: Dict) -> List[ThreatIndicator]:
        """Check for security policy violations"""
        
        indicators = []
        
        # Check access permissions
        if not event_data.get('authorized_access', True):
            indicators.append(ThreatIndicator(
                indicator_type="unauthorized_access",
                value=event_data.get('resource_accessed', ''),
                severity=ThreatLevel.HIGH,
                confidence=0.9,
                source="policy_engine",
                timestamp=datetime.now(),
                context={"description": "Access to unauthorized resource"}
            ))
        
        # Check for privilege escalation attempts
        if event_data.get('privilege_escalation', False):
            indicators.append(ThreatIndicator(
                indicator_type="privilege_escalation",
                value=event_data.get('requested_privilege', ''),
                severity=ThreatLevel.CRITICAL,
                confidence=0.95,
                source="policy_engine",
                timestamp=datetime.now(),
                context={"description": "Attempted privilege escalation"}
            ))
        
        return indicators
    
    async def determine_adaptive_action(
        self,
        risk_score: float,
        context: UserContext,
        indicators: List[ThreatIndicator]
    ) -> SecurityAction:
        """Determine appropriate adaptive security action"""
        
        # Base action on risk score
        if risk_score >= 0.9:
            base_action = SecurityAction.ISOLATE
        elif risk_score >= 0.7:
            base_action = SecurityAction.BLOCK
        elif risk_score >= 0.5:
            base_action = SecurityAction.CHALLENGE
        elif risk_score >= 0.3:
            base_action = SecurityAction.MONITOR
        else:
            base_action = SecurityAction.ALLOW
        
        # Adjust based on user trust score
        if context.trust_score > 0.8 and risk_score < 0.8:
            # High trust users get more lenient treatment
            if base_action == SecurityAction.BLOCK:
                base_action = SecurityAction.CHALLENGE
            elif base_action == SecurityAction.CHALLENGE:
                base_action = SecurityAction.MONITOR
        
        # Adjust based on critical indicators
        critical_indicators = [i for i in indicators if i.severity == ThreatLevel.CRITICAL]
        if critical_indicators:
            base_action = SecurityAction.ISOLATE
        
        return base_action
    
    async def update_behavioral_models(self, context: UserContext, event: SecurityEvent):
        """Update behavioral models based on event outcome"""
        
        user_id = context.user_id
        
        # Extract features from the event
        features = self.extract_behavioral_features(context, event.metadata)
        
        # Update or create user model
        if user_id not in self.behavioral_models:
            # Create new model with initial data
            self.behavioral_models[user_id] = IsolationForest(contamination=0.1, random_state=42)
            
            # Need multiple samples to train, so use synthetic data initially
            initial_features = [features] * 10  # Replicate initial sample
            # Add some noise for variety
            for i in range(1, 10):
                noisy_features = [f + np.random.normal(0, 0.1) for f in features]
                initial_features[i] = noisy_features
            
            # Normalize and train (note: refitting the shared scaler here is a
            # simplification; production code would keep a per-user scaler)
            normalized_features = self.scaler.fit_transform(initial_features)
            self.behavioral_models[user_id].fit(normalized_features)
        else:
            # Update existing model (simplified online learning simulation)
            # In production, use proper online learning algorithms
            pass
    
    def generate_event_id(self, event_data: Dict) -> str:
        """Generate unique event ID"""
        timestamp = datetime.now().isoformat()
        data_hash = hashlib.md5(json.dumps(event_data, sort_keys=True).encode()).hexdigest()
        return f"evt_{timestamp}_{data_hash[:8]}"
    
    async def get_user_history(self, user_id: str) -> Dict:
        """Get user's historical behavioral data"""
        
        # Simulate database lookup
        # In production, fetch from user behavior database
        
        default_history = {
            'access_patterns': ['login', 'file_access', 'logout'],
            'recent_activities': [
                {'action': 'login', 'timestamp': datetime.now() - timedelta(hours=1)},
                {'action': 'file_access', 'timestamp': datetime.now() - timedelta(minutes=30)}
            ],
            'trust_score': 0.7,
            'behavioral_score': 0.6
        }
        
        return default_history

class RiskAssessmentEngine:
    """Calculate contextual risk scores"""
    
    def __init__(self):
        self.risk_weights = {
            'behavioral_score': 0.3,
            'threat_indicators': 0.4,
            'context_factors': 0.2,
            'trust_score': 0.1
        }
    
    async def calculate_risk(
        self,
        context: UserContext,
        indicators: List[ThreatIndicator],
        behavioral_score: float
    ) -> float:
        """Calculate overall risk score (0-1)"""
        
        # Behavioral risk (inverted - lower behavioral score = higher risk)
        behavioral_risk = 1.0 - behavioral_score
        
        # Threat indicator risk
        threat_risk = self.calculate_threat_indicator_risk(indicators)
        
        # Contextual risk factors
        context_risk = self.calculate_context_risk(context)
        
        # Trust score risk (inverted)
        trust_risk = 1.0 - context.trust_score
        
        # Weighted risk calculation
        overall_risk = (
            behavioral_risk * self.risk_weights['behavioral_score'] +
            threat_risk * self.risk_weights['threat_indicators'] +
            context_risk * self.risk_weights['context_factors'] +
            trust_risk * self.risk_weights['trust_score']
        )
        
        return max(0.0, min(1.0, overall_risk))
    
    def calculate_threat_indicator_risk(self, indicators: List[ThreatIndicator]) -> float:
        """Calculate risk from threat indicators"""
        
        if not indicators:
            return 0.0
        
        # Weight indicators by severity and confidence
        risk_scores = []
        
        severity_weights = {
            ThreatLevel.LOW: 0.2,
            ThreatLevel.MEDIUM: 0.5,
            ThreatLevel.HIGH: 0.8,
            ThreatLevel.CRITICAL: 1.0
        }
        
        for indicator in indicators:
            severity_weight = severity_weights.get(indicator.severity, 0.5)
            indicator_risk = severity_weight * indicator.confidence
            risk_scores.append(indicator_risk)
        
        # Use maximum risk score (worst case)
        return max(risk_scores) if risk_scores else 0.0
    
    def calculate_context_risk(self, context: UserContext) -> float:
        """Calculate risk from contextual factors"""
        
        risk_factors = []
        
        # Time-based risk
        hour = context.time_of_day
        if hour < 6 or hour > 22:
            risk_factors.append(0.3)  # Off-hours access
        else:
            risk_factors.append(0.1)
        
        # Location risk (simplified)
        if context.location.startswith('192.168'):
            risk_factors.append(0.1)  # Internal network
        else:
            risk_factors.append(0.4)  # External access
        
        # Device risk
        if context.device_id == 'unknown':
            risk_factors.append(0.5)  # Unknown device
        else:
            risk_factors.append(0.2)  # Known device
        
        # Network risk
        if 'vpn' in context.network.lower():
            risk_factors.append(0.2)  # VPN access
        elif 'public' in context.network.lower():
            risk_factors.append(0.6)  # Public network
        else:
            risk_factors.append(0.3)  # Unknown network
        
        # Average contextual risk
        return sum(risk_factors) / len(risk_factors) if risk_factors else 0.5

class SecurityResponseOrchestrator:
    """Orchestrate automated security responses"""
    
    def __init__(self):
        self.response_handlers = {
            SecurityAction.ALLOW: self.handle_allow,
            SecurityAction.MONITOR: self.handle_monitor,
            SecurityAction.CHALLENGE: self.handle_challenge,
            SecurityAction.BLOCK: self.handle_block,
            SecurityAction.ISOLATE: self.handle_isolate
        }
    
    async def execute_response(self, event: SecurityEvent):
        """Execute appropriate security response"""
        
        handler = self.response_handlers.get(event.recommended_action)
        if handler:
            await handler(event)
        else:
            print(f"No handler for action: {event.recommended_action}")
    
    async def handle_allow(self, event: SecurityEvent):
        """Handle allow action - normal processing"""
        print(f"ALLOW: Event {event.event_id} - Normal access granted")
        await self.log_security_event(event, "allowed")
    
    async def handle_monitor(self, event: SecurityEvent):
        """Handle monitor action - allow with enhanced logging"""
        print(f"MONITOR: Event {event.event_id} - Enhanced monitoring enabled")
        await self.log_security_event(event, "monitored")
        await self.enable_enhanced_monitoring(event.user_context.user_id)
    
    async def handle_challenge(self, event: SecurityEvent):
        """Handle challenge action - require additional authentication"""
        print(f"CHALLENGE: Event {event.event_id} - Additional authentication required")
        await self.log_security_event(event, "challenged")
        await self.initiate_step_up_auth(event.user_context.user_id)
    
    async def handle_block(self, event: SecurityEvent):
        """Handle block action - deny access"""
        print(f"BLOCK: Event {event.event_id} - Access denied")
        await self.log_security_event(event, "blocked")
        await self.send_security_alert(event, "high")
    
    async def handle_isolate(self, event: SecurityEvent):
        """Handle isolate action - quarantine user/session"""
        print(f"ISOLATE: Event {event.event_id} - User/session isolated")
        await self.log_security_event(event, "isolated")
        await self.isolate_user_session(event.user_context.user_id)
        await self.send_security_alert(event, "critical")
    
    async def log_security_event(self, event: SecurityEvent, action_taken: str):
        """Log security event for audit and analysis"""
        log_entry = {
            'event_id': event.event_id,
            'user_id': event.user_context.user_id,
            'risk_score': event.risk_score,
            'action_taken': action_taken,
            'timestamp': event.timestamp.isoformat(),
            'threat_indicators': len(event.threat_indicators)
        }
        print(f"SECURITY_LOG: {json.dumps(log_entry, indent=2)}")
    
    async def enable_enhanced_monitoring(self, user_id: str):
        """Enable enhanced monitoring for user"""
        print(f"Enhanced monitoring enabled for user: {user_id}")
        # Implementation would integrate with monitoring systems
    
    async def initiate_step_up_auth(self, user_id: str):
        """Initiate step-up authentication"""
        print(f"Step-up authentication initiated for user: {user_id}")
        # Implementation would trigger MFA or additional auth factors
    
    async def isolate_user_session(self, user_id: str):
        """Isolate user session"""
        print(f"User session isolated: {user_id}")
        # Implementation would disable user access and quarantine session
    
    async def send_security_alert(self, event: SecurityEvent, severity: str):
        """Send security alert to SOC team"""
        alert = {
            'severity': severity,
            'event_id': event.event_id,
            'user_id': event.user_context.user_id,
            'risk_score': event.risk_score,
            'recommended_action': event.recommended_action.value,
            'timestamp': event.timestamp.isoformat()
        }
        print(f"SECURITY_ALERT ({severity.upper()}): {json.dumps(alert, indent=2)}")

class ThreatIntelligenceEngine:
    """Integrate threat intelligence feeds and analysis"""
    
    def __init__(self):
        self.ioc_database = {
            # Indicators of Compromise
            'malicious_ips': ['192.0.2.1', '198.51.100.1'],
            'suspicious_domains': ['malicious.example.com'],
            'known_malware_hashes': ['d41d8cd98f00b204e9800998ecf8427e']
        }
    
    async def check_indicators(self, event_data: Dict) -> List[ThreatIndicator]:
        """Check event data against threat intelligence"""
        
        indicators = []
        
        # Check IP addresses
        source_ip = event_data.get('source_ip', '')
        if source_ip in self.ioc_database['malicious_ips']:
            indicators.append(ThreatIndicator(
                indicator_type="malicious_ip",
                value=source_ip,
                severity=ThreatLevel.HIGH,
                confidence=0.9,
                source="threat_intelligence",
                timestamp=datetime.now(),
                context={"description": "Known malicious IP address"}
            ))
        
        # Check domains
        domain = event_data.get('domain', '')
        if domain in self.ioc_database['suspicious_domains']:
            indicators.append(ThreatIndicator(
                indicator_type="suspicious_domain",
                value=domain,
                severity=ThreatLevel.MEDIUM,
                confidence=0.8,
                source="threat_intelligence",
                timestamp=datetime.now(),
                context={"description": "Known suspicious domain"}
            ))
        
        return indicators

class SecurityMetricsTracker:
    """Track and analyze security metrics"""
    
    def __init__(self):
        self.metrics = {
            'events_processed': 0,
            'threats_detected': 0,
            'false_positives': 0,
            'response_times': [],
            'action_counts': {}
        }
    
    async def track_event(self, event: SecurityEvent):
        """Track security event metrics"""
        
        self.metrics['events_processed'] += 1
        
        if event.threat_indicators:
            self.metrics['threats_detected'] += 1
        
        # Track action counts
        action = event.recommended_action.value
        self.metrics['action_counts'][action] = self.metrics['action_counts'].get(action, 0) + 1
        
        # Response time tracking would be implemented here
        
    async def get_security_dashboard(self) -> Dict:
        """Get security dashboard metrics"""
        
        return {
            'total_events': self.metrics['events_processed'],
            'threats_detected': self.metrics['threats_detected'],
            'detection_rate': self.metrics['threats_detected'] / max(self.metrics['events_processed'], 1),
            'action_distribution': self.metrics['action_counts'],
            'average_response_time': sum(self.metrics['response_times']) / max(len(self.metrics['response_times']), 1)
        }

class DynamicPolicyEngine:
    """Generate and adapt security policies"""
    
    def __init__(self):
        self.base_policies = {
            'default_trust_score': 0.5,
            'max_failed_attempts': 3,
            'session_timeout_hours': 8,
            'high_risk_threshold': 0.7
        }
    
    async def generate_adaptive_policy(self, context: UserContext, risk_score: float) -> Dict:
        """Generate adaptive security policy based on context and risk"""
        
        policy = self.base_policies.copy()
        
        # Adjust based on risk score
        if risk_score > 0.8:
            policy['session_timeout_hours'] = 2  # Shorter session for high risk
            policy['max_failed_attempts'] = 1    # Stricter failure tolerance
        elif risk_score < 0.3:
            policy['session_timeout_hours'] = 12  # Longer session for low risk
            policy['max_failed_attempts'] = 5     # More lenient for trusted users
        
        # Adjust based on user trust score
        if context.trust_score > 0.8:
            policy['session_timeout_hours'] *= 1.5  # Extend for trusted users
        
        return policy
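
A minimal way to exercise the engine above, assuming the classes are defined in the same module; the event fields mirror the keys that process_security_event() reads, and the values are illustrative.

# Example invocation of AdaptiveSecurityEngine (illustrative event values)
import asyncio

async def main():
    engine = AdaptiveSecurityEngine(config={"model_version": "1.0"})
    event = {
        "event_type": "file_access",
        "user_id": "alice",
        "source_ip": "203.0.113.50",
        "device_id": "laptop-7431",
        "network_info": "public-wifi",
        "data_size_kb": 15000,        # large enough to trigger the large-data-access indicator
        "recent_access_count": 12,
        "authorized_access": True,
    }
    result = await engine.process_security_event(event)
    print(result.risk_score, result.recommended_action)

if __name__ == "__main__":
    asyncio.run(main())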

Behavioral Analytics Engine (TypeScript)

// Behavioral Analytics for Adaptive Security
interface BehavioralProfile {
  userId: string;
  baselineEstablished: boolean;
  accessPatterns: AccessPattern[];
  riskFactors: RiskFactor[];
  trustScore: number;
  lastUpdated: Date;
}

interface AccessPattern {
  pattern: string;
  frequency: number;
  timeWindows: TimeWindow[];
  locations: string[];
  devices: string[];
  confidence: number;
}

interface RiskFactor {
  factor: string;
  severity: 'low' | 'medium' | 'high' | 'critical';
  confidence: number;
  firstObserved: Date;
  lastObserved: Date;
  occurrences: number;
}

interface SecurityAlert {
  id: string;
  userId: string;
  alertType: string;
  severity: 'info' | 'warning' | 'error' | 'critical';
  description: string;
  riskScore: number;
  recommendation: string;
  timestamp: Date;
  context: any;
}

class BehavioralAnalyticsEngine {
  private userProfiles: Map<string, BehavioralProfile> = new Map();
  private mlModels: Map<string, any> = new Map();
  private alertThresholds: AlertThresholds;
  private contextAnalyzer: ContextAnalyzer;

  constructor(config: AnalyticsConfig) {
    this.alertThresholds = config.alertThresholds;
    this.contextAnalyzer = new ContextAnalyzer();
  }

  async analyzeUserBehavior(
    userId: string,
    currentActivity: UserActivity
  ): Promise<BehavioralAnalysis> {
    
    // Get or create user profile
    let profile = this.userProfiles.get(userId);
    if (!profile) {
      profile = await this.createUserProfile(userId);
      this.userProfiles.set(userId, profile);
    }

    // Extract behavioral features from current activity
    const features = this.extractBehavioralFeatures(currentActivity);
    
    // Compare against established baseline
    const anomalyScore = await this.calculateAnomalyScore(profile, features);
    
    // Analyze contextual factors
    const contextAnalysis = await this.contextAnalyzer.analyze(currentActivity);
    
    // Calculate overall risk score
    const riskScore = this.calculateRiskScore(anomalyScore, contextAnalysis, profile);
    
    // Generate recommendations
    const recommendations = this.generateRecommendations(riskScore, anomalyScore, contextAnalysis);
    
    // Update user profile
    await this.updateUserProfile(userId, currentActivity, riskScore);
    
    return {
      userId,
      riskScore,
      anomalyScore,
      contextualRisk: contextAnalysis.riskLevel,
      recommendations,
      behavioralInsights: {
        deviationFromNormal: anomalyScore > 0.7,
        suspiciousPatterns: this.identifySuspiciousPatterns(features, profile),
        trustScoreChange: this.calculateTrustScoreChange(profile, riskScore)
      }
    };
  }

  private async createUserProfile(userId: string): Promise<BehavioralProfile> {
    // Initialize behavioral profile for new user
    return {
      userId,
      baselineEstablished: false,
      accessPatterns: [],
      riskFactors: [],
      trustScore: 0.5, // Neutral trust score for new users
      lastUpdated: new Date()
    };
  }

  private extractBehavioralFeatures(activity: UserActivity): BehavioralFeatures {
    return {
      // Temporal features
      accessTime: {
        hour: new Date(activity.timestamp).getHours(),
        dayOfWeek: new Date(activity.timestamp).getDay(),
        isBusinessHours: this.isBusinessHours(activity.timestamp)
      },
      
      // Location features
      location: {
        ipAddress: activity.sourceIP,
        geolocation: activity.geolocation,
        networkType: this.classifyNetworkType(activity.sourceIP),
        isKnownLocation: this.isKnownLocation(activity.userId, activity.geolocation)
      },
      
      // Device features
      device: {
        deviceId: activity.deviceId,
        userAgent: activity.userAgent,
        operatingSystem: this.extractOS(activity.userAgent),
        browser: this.extractBrowser(activity.userAgent),
        isKnownDevice: this.isKnownDevice(activity.userId, activity.deviceId)
      },
      
      // Activity features
      activity: {
        actionType: activity.action,
        resourceAccessed: activity.resource,
        dataVolume: activity.dataSize || 0,
        duration: activity.duration || 0,
        sequencePattern: this.analyzeSequencePattern(activity)
      },
      
      // Authentication features
      authentication: {
        authMethod: activity.authMethod,
        failedAttempts: activity.failedAttempts || 0,
        mfaUsed: activity.mfaUsed || false,
        passwordAge: activity.passwordAge || 0
      }
    };
  }

  private async calculateAnomalyScore(
    profile: BehavioralProfile,
    features: BehavioralFeatures
  ): Promise<number> {
    
    if (!profile.baselineEstablished) {
      // Not enough data for anomaly detection
      return 0.3; // Low but non-zero anomaly score
    }

    let anomalyScore = 0;
    let weightSum = 0;

    // Time-based anomaly detection
    const timeAnomaly = this.calculateTimeAnomaly(profile, features.accessTime);
    anomalyScore += timeAnomaly * 0.2;
    weightSum += 0.2;

    // Location-based anomaly detection
    const locationAnomaly = this.calculateLocationAnomaly(profile, features.location);
    anomalyScore += locationAnomaly * 0.25;
    weightSum += 0.25;

    // Device-based anomaly detection
    const deviceAnomaly = this.calculateDeviceAnomaly(profile, features.device);
    anomalyScore += deviceAnomaly * 0.2;
    weightSum += 0.2;

    // Activity pattern anomaly detection
    const activityAnomaly = this.calculateActivityAnomaly(profile, features.activity);
    anomalyScore += activityAnomaly * 0.25;
    weightSum += 0.25;

    // Authentication anomaly detection
    const authAnomaly = this.calculateAuthAnomaly(profile, features.authentication);
    anomalyScore += authAnomaly * 0.1;
    weightSum += 0.1;

    return anomalyScore / weightSum;
  }

  private calculateTimeAnomaly(
    profile: BehavioralProfile,
    timeFeatures: any
  ): number {
    // Find typical access hours for user
    const typicalHours = this.extractTypicalAccessHours(profile);
    
    if (typicalHours.length === 0) {
      return 0.2; // Slight anomaly if no pattern established
    }

    const currentHour = timeFeatures.hour;
    const isTypicalTime = typicalHours.some(hour => 
      Math.abs(hour - currentHour) <= 1 // Within 1 hour of typical
    );

    if (!isTypicalTime) {
      // Check if it's drastically different (e.g., middle of night vs day user)
      const hourDiff = Math.min(
        ...typicalHours.map(hour => Math.abs(hour - currentHour))
      );
      
      return Math.min(1.0, hourDiff / 12); // Max anomaly if 12+ hours different
    }

    return 0.1; // Minimal anomaly for typical time
  }

  private calculateLocationAnomaly(
    profile: BehavioralProfile,
    locationFeatures: any
  ): number {
    if (!locationFeatures.isKnownLocation) {
      // Unknown location is suspicious
      return 0.7;
    }

    // Check network type consistency
    const typicalNetworkTypes = this.extractTypicalNetworkTypes(profile);
    if (typicalNetworkTypes.length > 0 && 
        !typicalNetworkTypes.includes(locationFeatures.networkType)) {
      return 0.4; // Moderate anomaly for different network type
    }

    return 0.1; // Low anomaly for known location
  }

  private calculateRiskScore(
    anomalyScore: number,
    contextAnalysis: ContextAnalysis,
    profile: BehavioralProfile
  ): number {
    
    // Base risk from anomaly score
    let riskScore = anomalyScore * 0.4;

    // Add contextual risk
    const contextRiskMap = {
      'low': 0.1,
      'medium': 0.3,
      'high': 0.6,
      'critical': 0.9
    };
    riskScore += contextRiskMap[contextAnalysis.riskLevel] * 0.3;

    // Factor in user trust score (inverse relationship)
    const trustFactor = (1 - profile.trustScore) * 0.2;
    riskScore += trustFactor;

    // Add risk from active risk factors
    const activeRiskFactors = profile.riskFactors.filter(rf => 
      this.isRecentRiskFactor(rf)
    );
    
    const riskFactorScore = activeRiskFactors.reduce((sum, rf) => {
      const severityWeight = { low: 0.1, medium: 0.2, high: 0.4, critical: 0.8 };
      return sum + (severityWeight[rf.severity] || 0.2) * rf.confidence;
    }, 0);

    riskScore += Math.min(0.1, riskFactorScore); // Cap risk factor contribution

    return Math.min(1.0, Math.max(0.0, riskScore));
  }

  private generateRecommendations(
    riskScore: number,
    anomalyScore: number,
    contextAnalysis: ContextAnalysis
  ): SecurityRecommendation[] {
    
    const recommendations: SecurityRecommendation[] = [];

    if (riskScore > 0.8) {
      recommendations.push({
        action: 'BLOCK_ACCESS',
        priority: 'critical',
        description: 'High risk score detected - block access and investigate',
        rationale: `Risk score ${riskScore.toFixed(2)} exceeds critical threshold`
      });
    } else if (riskScore > 0.6) {
      recommendations.push({
        action: 'REQUIRE_ADDITIONAL_AUTH',
        priority: 'high',
        description: 'Require step-up authentication (MFA)',
        rationale: 'Elevated risk requires additional verification'
      });
    } else if (riskScore > 0.4) {
      recommendations.push({
        action: 'ENHANCED_MONITORING',
        priority: 'medium',
        description: 'Enable enhanced session monitoring',
        rationale: 'Moderate risk detected - increase monitoring'
      });
    }

    if (anomalyScore > 0.7) {
      recommendations.push({
        action: 'BEHAVIORAL_ALERT',
        priority: 'high',
        description: 'Unusual behavioral pattern detected',
        rationale: `Anomaly score ${anomalyScore.toFixed(2)} indicates significant deviation`
      });
    }

    if (contextAnalysis.riskLevel === 'high' || contextAnalysis.riskLevel === 'critical') {
      recommendations.push({
        action: 'CONTEXT_RESTRICTION',
        priority: 'high',
        description: 'Apply contextual access restrictions',
        rationale: `High contextual risk: ${contextAnalysis.primaryRiskFactors.join(', ')}`
      });
    }

    return recommendations;
  }

  private async updateUserProfile(
    userId: string,
    activity: UserActivity,
    riskScore: number
  ): Promise<void> {
    
    const profile = this.userProfiles.get(userId)!;
    
    // Update access patterns
    await this.updateAccessPatterns(profile, activity);
    
    // Update trust score based on risk score
    profile.trustScore = this.updateTrustScore(profile.trustScore, riskScore);
    
    // Add/update risk factors if significant risk detected
    if (riskScore > 0.5) {
      await this.updateRiskFactors(profile, activity, riskScore);
    }
    
    // Check if baseline can be established (need minimum activities)
    if (!profile.baselineEstablished && profile.accessPatterns.length >= 10) {
      profile.baselineEstablished = true;
      await this.trainUserModel(userId, profile);
    }
    
    profile.lastUpdated = new Date();
  }

  private updateTrustScore(currentTrust: number, riskScore: number): number {
    // Adjust trust score based on current activity risk
    const riskImpact = riskScore > 0.5 ? (riskScore - 0.5) * 0.2 : 0;
    const trustGain = riskScore < 0.3 ? (0.3 - riskScore) * 0.1 : 0;
    
    let newTrust = currentTrust - riskImpact + trustGain;
    
    // Apply decay factor (trust degrades slowly over time)
    newTrust *= 0.99;
    
    return Math.min(1.0, Math.max(0.1, newTrust));
  }

  private identifySuspiciousPatterns(
    features: BehavioralFeatures,
    profile: BehavioralProfile
  ): string[] {
    
    const suspiciousPatterns: string[] = [];
    
    // Check for unusual access times
    if (!features.accessTime.isBusinessHours && 
        this.isTypicalBusinessHoursUser(profile)) {
      suspiciousPatterns.push('off_hours_access');
    }
    
    // Check for impossible travel
    if (this.detectImpossibleTravel(features, profile)) {
      suspiciousPatterns.push('impossible_travel');
    }
    
    // Check for unusual data access volume
    if (features.activity.dataVolume > this.getTypicalDataVolume(profile) * 5) {
      suspiciousPatterns.push('excessive_data_access');
    }
    
    // Check for rapid successive logins
    if (this.detectRapidSuccessiveLogins(features, profile)) {
      suspiciousPatterns.push('rapid_successive_logins');
    }
    
    return suspiciousPatterns;
  }

  async generateSecurityAlert(
    analysis: BehavioralAnalysis,
    activity: UserActivity
  ): Promise<SecurityAlert | null> {
    
    if (analysis.riskScore < this.alertThresholds.minimumRiskScore) {
      return null; // No alert needed
    }

    const severity = this.determineSeverity(analysis.riskScore);
    
    return {
      id: this.generateAlertId(),
      userId: analysis.userId,
      alertType: 'behavioral_anomaly',
      severity,
      description: this.generateAlertDescription(analysis),
      riskScore: analysis.riskScore,
      recommendation: this.getTopRecommendation(analysis.recommendations),
      timestamp: new Date(),
      context: {
        activity,
        behavioralInsights: analysis.behavioralInsights,
        anomalyScore: analysis.anomalyScore
      }
    };
  }

  private generateAlertDescription(analysis: BehavioralAnalysis): string {
    const insights = analysis.behavioralInsights;
    let description = `User ${analysis.userId} showing `;
    
    if (insights.deviationFromNormal) {
      description += 'significant behavioral deviation';
    } else {
      description += 'elevated risk indicators';
    }
    
    if (insights.suspiciousPatterns.length > 0) {
      description += ` with patterns: ${insights.suspiciousPatterns.join(', ')}`;
    }
    
    return description;
  }

  private determineSeverity(riskScore: number): 'info' | 'warning' | 'error' | 'critical' {
    if (riskScore >= 0.8) return 'critical';
    if (riskScore >= 0.6) return 'error';
    if (riskScore >= 0.4) return 'warning';
    return 'info';
  }
}

// Supporting Classes
class ContextAnalyzer {
  async analyze(activity: UserActivity): Promise<ContextAnalysis> {
    const riskFactors: string[] = [];
    let riskLevel: 'low' | 'medium' | 'high' | 'critical' = 'low';

    // Analyze time context
    if (this.isOffHours(activity.timestamp)) {
      riskFactors.push('off_hours_access');
      riskLevel = 'medium';
    }

    // Analyze location context
    if (this.isSuspiciousLocation(activity.sourceIP)) {
      riskFactors.push('suspicious_location');
      riskLevel = 'high';
    }

    // Analyze network context
    if (this.isPublicNetwork(activity.sourceIP)) {
      riskFactors.push('public_network');
      if (riskLevel === 'low') riskLevel = 'medium';
    }

    return {
      riskLevel,
      primaryRiskFactors: riskFactors,
      contextScore: this.calculateContextScore(riskFactors)
    };
  }

  private isOffHours(timestamp: Date): boolean {
    const hour = timestamp.getHours();
    const day = timestamp.getDay();
    return hour < 8 || hour > 18 || day === 0 || day === 6;
  }

  private isSuspiciousLocation(ip: string): boolean {
    // Check against threat intelligence feeds
    // This is simplified - in production, integrate with IP reputation services
    const suspiciousNetworks = ['192.0.2.', '198.51.100.'];
    return suspiciousNetworks.some(network => ip.startsWith(network));
  }

  private calculateContextScore(riskFactors: string[]): number {
    const riskWeights: Record<string, number> = {
      'off_hours_access': 0.3,
      'suspicious_location': 0.6,
      'public_network': 0.2,
      'unknown_device': 0.4
    };

    return riskFactors.reduce((score, factor) => 
      score + (riskWeights[factor] || 0.1), 0
    );
  }
}

Real-World Examples

Microsoft Azure AD Risk-Based Access

  • Behavioral Analytics: ML-powered user risk assessment
  • Contextual Policies: Location, device, and app-based policies
  • Real-time Response: Automatic MFA requirements and session blocking
  • Scale: Protects millions of users with adaptive authentication

Okta Adaptive Multi-Factor Authentication

  • Risk Engine: Behavioral and contextual risk scoring
  • Dynamic Policies: Adaptive authentication based on risk
  • User Experience: Seamless access for trusted activities
  • Integration: Works with 7,000+ applications

Google BeyondCorp Enterprise

  • Zero Trust: Continuous risk assessment for every request
  • Context Aware: Device state, location, and user behavior
  • ML-Driven: Advanced threat detection with machine learning
  • Granular Control: Application and resource-level policies

Darktrace Antigena

  • Self-Learning AI: Establishes normal behavior baselines
  • Autonomous Response: Real-time threat neutralization
  • Adaptive Defense: Evolves with changing threat landscape
  • Enterprise Scale: Protects entire network ecosystems

Adaptive Security Best Practices

✅ Do

  • Establish behavioral baselines with sufficient data before making security decisions
  • Implement risk-based authentication that adjusts security measures based on calculated risk
  • Use multiple contextual factors including time, location, device, and network for risk assessment
  • Provide automated response capabilities with human oversight for complex scenarios
  • Continuously update ML models with new threat intelligence and behavioral patterns

❌ Don't

  • Rely solely on static rules - adaptive security requires dynamic policy adjustment
  • Ignore false positive rates - high false positives reduce user trust and system effectiveness
  • Make decisions on single indicators - use multiple factors for robust risk assessment
  • Implement fully automated responses without human oversight capabilities
  • Neglect user experience - security measures should be transparent for legitimate users