Adaptive Security Architecture
Design intelligent security systems that dynamically adjust protection based on behavioral analysis, threat intelligence, and contextual risk factors
45 min read•Advanced
What is Adaptive Security Architecture?
An adaptive security architecture continuously evaluates risk and adjusts protection measures in real time. Unlike static security models, which apply the same fixed rules to every request, it combines behavioral analysis, machine learning, and contextual awareness to choose a response proportionate to the risk of each event.
Core Components:
- Behavioral Analytics: ML-powered user and system behavior modeling
- Contextual Risk Engine: Multi-factor risk assessment and scoring
- Dynamic Policy Engine: Real-time security policy adjustment
- Threat Intelligence: Live threat feeds and pattern recognition
- Automated Response: Orchestrated security incident response
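As a rough illustration of the behavioral analytics component, the sketch below scores how far an observed transfer size deviates from a user's own history. It uses a plain z-score as a stand-in for the ML models described above; the function name `anomaly_score`, the neutral fallback of 0.5, and the three-sigma saturation point are all assumptions made for this example.

```python
from statistics import mean, stdev
from typing import List


def anomaly_score(history_kb: List[float], observed_kb: float) -> float:
    """Return a 0-1 anomaly score based on the z-score of the observed size."""
    if len(history_kb) < 2:
        return 0.5  # Assumption: too little history, so stay neutral
    mu = mean(history_kb)
    sigma = stdev(history_kb)
    if sigma == 0:
        # All past values identical: any deviation at all is anomalous
        return 0.0 if observed_kb == mu else 1.0
    z = abs(observed_kb - mu) / sigma
    # Assumption: three standard deviations or more saturates at 1.0
    return min(1.0, z / 3.0)


history = [120.0, 150.0, 130.0, 140.0, 160.0]  # hypothetical past transfers (KB)
print(anomaly_score(history, 145.0))   # close to the baseline, low score
print(anomaly_score(history, 5000.0))  # far outside the baseline, saturates
```

A per-user baseline like this is the simplest form of behavioral modeling; a production system would likely track many features at once, which is where models such as isolation forests come in.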
Adaptive Security Architecture
Intelligence Layer
Threat intelligence and behavioral analytics
- Machine learning models
- Threat intelligence feeds
- Behavioral baselines
- Anomaly detection engines
Risk Assessment Layer
Contextual risk evaluation and scoring
- Multi-factor risk scoring
- Contextual analysis
- Trust scoring algorithms
- Risk trend analysis
Policy Engine Layer
Dynamic policy generation and enforcement
- Rule generation algorithms
- Policy adaptation logic
- Enforcement mechanisms
- Compliance monitoring
Response Layer
Automated incident response and orchestration
- Automated containment
- Threat mitigation
- Incident orchestration
- Recovery procedures
Monitoring Layer
Continuous security posture assessment
- Real-time monitoring
- Security metrics tracking
- Performance analytics
- Feedback loops
Integration Layer
Security ecosystem integration and APIs
- SIEM integration
- Security tool orchestration
- API connectivity
- Data sharing protocols
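One way to picture how the layers fit together is as a chain of stages that each enrich a shared state object. The sketch below covers four of the six layers (intelligence, risk assessment, policy, response) and stands in for monitoring with a simple log. All names, signal weights, and thresholds here are hypothetical and chosen only to make the flow visible.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class PipelineState:
    event: Dict
    signals: List[str] = field(default_factory=list)
    risk: float = 0.0
    action: str = "allow"
    log: List[str] = field(default_factory=list)


def intelligence_layer(state: PipelineState) -> PipelineState:
    # Hypothetical blocklist; a real system would query threat feeds
    if state.event.get("source_ip") in {"192.0.2.1"}:
        state.signals.append("malicious_ip")
    if state.event.get("new_device", False):
        state.signals.append("new_device")
    return state


def risk_layer(state: PipelineState) -> PipelineState:
    weights = {"malicious_ip": 0.7, "new_device": 0.3}  # illustrative weights
    state.risk = min(1.0, sum((weights.get(s, 0.1) for s in state.signals), 0.0))
    return state


def policy_layer(state: PipelineState) -> PipelineState:
    if state.risk >= 0.7:
        state.action = "block"
    elif state.risk >= 0.3:
        state.action = "challenge"
    else:
        state.action = "allow"
    return state


def response_layer(state: PipelineState) -> PipelineState:
    # Stand-in for enforcement plus the monitoring layer's audit trail
    state.log.append(f"{state.action}: risk={state.risk:.2f}")
    return state


LAYERS: List[Callable[[PipelineState], PipelineState]] = [
    intelligence_layer, risk_layer, policy_layer, response_layer,
]


def run_pipeline(event: Dict) -> PipelineState:
    state = PipelineState(event=event)
    for layer in LAYERS:
        state = layer(state)
    return state


print(run_pipeline({"source_ip": "192.0.2.1"}).log)
print(run_pipeline({"new_device": True}).log)
```

Keeping each layer as an independent function makes it straightforward to swap in a different risk model or add a stage without touching the others.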
Production Implementation
Adaptive Security Engine (Python)
# Adaptive Security Architecture Implementation
import asyncio
import numpy as np
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import json
import hashlib
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler


class ThreatLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


class SecurityAction(Enum):
    ALLOW = "allow"
    MONITOR = "monitor"
    CHALLENGE = "challenge"
    BLOCK = "block"
    ISOLATE = "isolate"


@dataclass
class UserContext:
    user_id: str
    location: str
    device_id: str
    network: str
    time_of_day: int
    access_patterns: List[str]
    recent_activities: List[Dict]
    trust_score: float = 0.5


@dataclass
class ThreatIndicator:
    indicator_type: str
    value: Any
    severity: ThreatLevel
    confidence: float
    source: str
    timestamp: datetime
    context: Dict = field(default_factory=dict)


@dataclass
class SecurityEvent:
    event_id: str
    event_type: str
    user_context: UserContext
    threat_indicators: List[ThreatIndicator]
    risk_score: float
    recommended_action: SecurityAction
    timestamp: datetime
    metadata: Dict = field(default_factory=dict)


class AdaptiveSecurityEngine:
    def __init__(self, config: Dict):
        self.config = config
        self.behavioral_models = {}
        self.threat_intelligence = ThreatIntelligenceEngine()
        self.risk_calculator = RiskAssessmentEngine()
        self.policy_engine = DynamicPolicyEngine()
        self.response_orchestrator = SecurityResponseOrchestrator()
        # ML models for behavioral analysis
        self.anomaly_detector = IsolationForest(contamination=0.1, random_state=42)
        self.scaler = StandardScaler()
        # Security metrics tracking
        self.security_metrics = SecurityMetricsTracker()

    async def process_security_event(self, event_data: Dict) -> SecurityEvent:
        """Process incoming security event with adaptive analysis"""
        started_at = datetime.now()
        # Extract user context
        user_context = await self.extract_user_context(event_data)
        # Perform behavioral analysis
        behavioral_score = await self.analyze_behavior(user_context, event_data)
        # Gather threat indicators
        threat_indicators = await self.gather_threat_indicators(event_data)
        # Calculate contextual risk score
        risk_score = await self.risk_calculator.calculate_risk(
            user_context, threat_indicators, behavioral_score
        )
        # Determine adaptive response
        recommended_action = await self.determine_adaptive_action(
            risk_score, user_context, threat_indicators
        )
        # Create security event
        security_event = SecurityEvent(
            event_id=self.generate_event_id(event_data),
            event_type=event_data.get('event_type', 'unknown'),
            user_context=user_context,
            threat_indicators=threat_indicators,
            risk_score=risk_score,
            recommended_action=recommended_action,
            timestamp=datetime.now(),
            metadata={
                'behavioral_score': behavioral_score,
                'processing_time_ms': 0,  # Filled in once the response has executed
                'model_version': self.config.get('model_version', '1.0')
            }
        )
        # Execute adaptive response
        await self.response_orchestrator.execute_response(security_event)
        # Record how long analysis and response took
        elapsed = datetime.now() - started_at
        security_event.metadata['processing_time_ms'] = elapsed.total_seconds() * 1000
        # Update behavioral models
        await self.update_behavioral_models(user_context, security_event)
        # Track metrics
        await self.security_metrics.track_event(security_event)
        return security_event

    async def extract_user_context(self, event_data: Dict) -> UserContext:
        """Extract comprehensive user context for risk assessment"""
        user_id = event_data.get('user_id', 'unknown')
        # Get user's historical data
        historical_data = await self.get_user_history(user_id)
        # Extract contextual information
        context = UserContext(
            user_id=user_id,
            location=event_data.get('source_ip', '0.0.0.0'),
            device_id=event_data.get('device_id', 'unknown'),
            network=event_data.get('network_info', 'unknown'),
            time_of_day=datetime.now().hour,
            access_patterns=historical_data.get('access_patterns', []),
            recent_activities=historical_data.get('recent_activities', []),
            trust_score=historical_data.get('trust_score', 0.5)
        )
        return context

    async def analyze_behavior(self, context: UserContext, event_data: Dict) -> float:
        """Perform behavioral analysis using ML models"""
        try:
            # Extract behavioral features
            features = self.extract_behavioral_features(context, event_data)
            # Get or create user behavioral model
            user_model = await self.get_user_behavioral_model(context.user_id)
            if user_model is None:
                # New user - establish baseline
                return await self.establish_behavioral_baseline(context, features)
            # Normalize features
            normalized_features = self.scaler.transform([features])
            # Calculate anomaly score (negative values indicate outliers)
            anomaly_score = user_model.decision_function(normalized_features)[0]
            # Convert to behavioral score (0-1, where 1 is normal)
            behavioral_score = max(0, min(1, anomaly_score + 0.5))
            return behavioral_score
        except Exception as e:
            print(f"Behavioral analysis error: {e}")
            return 0.5  # Default neutral score

    def extract_behavioral_features(self, context: UserContext, event_data: Dict) -> List[float]:
        """Extract numerical features for behavioral analysis"""
        features = []
        # Temporal features
        features.append(context.time_of_day / 24.0)  # Hour of day normalized
        features.append(datetime.now().weekday() / 6.0)  # Day of week normalized
        # Location, device, and network features (simplified).
        # The built-in hash() is salted per process for strings, so it would
        # produce different feature values after every restart. A hashlib
        # digest gives the same bucket for the same input every time.
        for value in (context.location, context.device_id, context.network):
            digest = hashlib.sha256(value.encode('utf-8')).hexdigest()
            features.append(int(digest, 16) % 1000 / 1000.0)
        # Activity features
        features.append(len(context.recent_activities) / 100.0)  # Activity volume
        features.append(context.trust_score)  # Existing trust score
        # Event-specific features
        features.append(len(event_data.get('resource_accessed', '')) / 100.0)
        features.append(event_data.get('data_size_kb', 0) / 10000.0)  # Normalized data size
        return features

    async def get_user_behavioral_model(self, user_id: str) -> Optional[IsolationForest]:
        """Get existing behavioral model for user"""
        return self.behavioral_models.get(user_id)

    async def establish_behavioral_baseline(self, context: UserContext, features: List[float]) -> float:
        """Establish behavioral baseline for new user"""
        # For new users, create a simple model based on similar users
        # In production, this would use clustering or transfer learning
        similar_users = await self.find_similar_users(context)
        if similar_users:
            # Use average behavioral score from similar users
            avg_score = sum(u['behavioral_score'] for u in similar_users) / len(similar_users)
            return max(0.3, min(0.7, avg_score))  # Conservative range for new users
        return 0.5  # Neutral score for completely new users

    async def find_similar_users(self, context: UserContext) -> List[Dict]:
        """Find users with similar behavioral patterns"""
        # Simplified similarity matching
        # In production, use more sophisticated clustering
        similar_users = []
        for user_id, model in self.behavioral_models.items():
            if user_id != context.user_id:
                # Simple similarity based on context overlap
                similarity_score = self.calculate_context_similarity(context, user_id)
                if similarity_score > 0.7:
                    similar_users.append({
                        'user_id': user_id,
                        'similarity': similarity_score,
                        'behavioral_score': 0.6  # Placeholder
                    })
        return sorted(similar_users, key=lambda x: x['similarity'], reverse=True)[:5]

    def calculate_context_similarity(self, context: UserContext, other_user_id: str) -> float:
        """Calculate similarity between user contexts"""
        # Simplified similarity calculation
        # In production, use more sophisticated methods
        similarity_factors = []
        # Time-based similarity (same general work hours)
        time_similarity = 1.0 - abs(context.time_of_day - 12) / 12.0
        similarity_factors.append(time_similarity * 0.3)
        # Trust score similarity
        other_trust = 0.5  # Would fetch from database
        trust_similarity = 1.0 - abs(context.trust_score - other_trust)
        similarity_factors.append(trust_similarity * 0.4)
        # Activity pattern similarity (simplified)
        activity_similarity = 0.7  # Placeholder
        similarity_factors.append(activity_similarity * 0.3)
        return sum(similarity_factors)

    async def gather_threat_indicators(self, event_data: Dict) -> List[ThreatIndicator]:
        """Gather threat indicators from multiple sources"""
        indicators = []
        # Check threat intelligence feeds
        ti_indicators = await self.threat_intelligence.check_indicators(event_data)
        indicators.extend(ti_indicators)
        # Check for suspicious patterns
        pattern_indicators = await self.detect_suspicious_patterns(event_data)
        indicators.extend(pattern_indicators)
        # Check for policy violations
        policy_indicators = await self.check_policy_violations(event_data)
        indicators.extend(policy_indicators)
        return indicators

    async def detect_suspicious_patterns(self, event_data: Dict) -> List[ThreatIndicator]:
        """Detect suspicious patterns in event data"""
        indicators = []
        # Read the clock once so every indicator from this call shares a timestamp
        now = datetime.now()
        # Check for unusual access times
        if now.hour < 6 or now.hour > 22:
            indicators.append(ThreatIndicator(
                indicator_type="unusual_time",
                value=now.hour,
                severity=ThreatLevel.LOW,
                confidence=0.3,
                source="pattern_detection",
                timestamp=now,
                context={"description": "Access outside normal business hours"}
            ))
        # Check for unusual data volume
        data_size = event_data.get('data_size_kb', 0)
        if data_size > 10000:  # 10MB+
            indicators.append(ThreatIndicator(
                indicator_type="large_data_access",
                value=data_size,
                severity=ThreatLevel.MEDIUM,
                confidence=0.6,
                source="pattern_detection",
                timestamp=now,
                context={"description": "Unusually large data access"}
            ))
        # Check for rapid successive access
        access_frequency = event_data.get('recent_access_count', 0)
        if access_frequency > 50:
            indicators.append(ThreatIndicator(
                indicator_type="high_frequency_access",
                value=access_frequency,
                severity=ThreatLevel.HIGH,
                confidence=0.8,
                source="pattern_detection",
                timestamp=now,
                context={"description": "Unusually high access frequency"}
            ))
        return indicators

    async def check_policy_violations(self, event_data: Dict) -> List[ThreatIndicator]:
        """Check for security policy violations"""
        indicators = []
        # Check access permissions
        if not event_data.get('authorized_access', True):
            indicators.append(ThreatIndicator(
                indicator_type="unauthorized_access",
                value=event_data.get('resource_accessed', ''),
                severity=ThreatLevel.HIGH,
                confidence=0.9,
                source="policy_engine",
                timestamp=datetime.now(),
                context={"description": "Access to unauthorized resource"}
            ))
        # Check for privilege escalation attempts
        if event_data.get('privilege_escalation', False):
            indicators.append(ThreatIndicator(
                indicator_type="privilege_escalation",
                value=event_data.get('requested_privilege', ''),
                severity=ThreatLevel.CRITICAL,
                confidence=0.95,
                source="policy_engine",
                timestamp=datetime.now(),
                context={"description": "Attempted privilege escalation"}
            ))
        return indicators

    async def determine_adaptive_action(
        self,
        risk_score: float,
        context: UserContext,
        indicators: List[ThreatIndicator]
    ) -> SecurityAction:
        """Determine appropriate adaptive security action"""
        # Base action on risk score
        if risk_score >= 0.9:
            base_action = SecurityAction.ISOLATE
        elif risk_score >= 0.7:
            base_action = SecurityAction.BLOCK
        elif risk_score >= 0.5:
            base_action = SecurityAction.CHALLENGE
        elif risk_score >= 0.3:
            base_action = SecurityAction.MONITOR
        else:
            base_action = SecurityAction.ALLOW
        # Adjust based on user trust score
        if context.trust_score > 0.8 and risk_score < 0.8:
            # High trust users get more lenient treatment
            if base_action == SecurityAction.BLOCK:
                base_action = SecurityAction.CHALLENGE
            elif base_action == SecurityAction.CHALLENGE:
                base_action = SecurityAction.MONITOR
        # Adjust based on critical indicators
        critical_indicators = [i for i in indicators if i.severity == ThreatLevel.CRITICAL]
        if critical_indicators:
            base_action = SecurityAction.ISOLATE
        return base_action

    async def update_behavioral_models(self, context: UserContext, event: SecurityEvent):
        """Update behavioral models based on event outcome"""
        user_id = context.user_id
        # Extract features from the event.
        # NOTE: event.metadata does not carry 'resource_accessed' or 'data_size_kb',
        # so the two event-specific features default to 0 during training.
        features = self.extract_behavioral_features(context, event.metadata)
        # Update or create user model
        if user_id not in self.behavioral_models:
            # Create new model with initial data
            self.behavioral_models[user_id] = IsolationForest(contamination=0.1, random_state=42)
            # Need multiple samples to train, so use synthetic data initially
            initial_features = [features] * 10  # Replicate initial sample
            # Add some noise for variety
            for i in range(1, 10):
                noisy_features = [f + np.random.normal(0, 0.1) for f in features]
                initial_features[i] = noisy_features
            # Normalize and train.
            # NOTE: self.scaler is shared by all users and is refit here, so it
            # always reflects the most recently created model. In production,
            # keep one scaler per user model.
            normalized_features = self.scaler.fit_transform(initial_features)
            self.behavioral_models[user_id].fit(normalized_features)
        else:
            # Update existing model (simplified online learning simulation)
            # In production, use proper online learning algorithms
            pass
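
    def generate_event_id(self, event_data: Dict) -> str:
        """Generate unique event ID"""
        timestamp = datetime.now().isoformat()
        # MD5 serves only as a short fingerprint here, not as a security control.
        # default=str keeps json.dumps from failing on datetime values.
        serialized = json.dumps(event_data, sort_keys=True, default=str)
        data_hash = hashlib.md5(serialized.encode()).hexdigest()
        return f"evt_{timestamp}_{data_hash[:8]}"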

    async def get_user_history(self, user_id: str) -> Dict:
        """Get user's historical behavioral data"""
        # Simulate database lookup
        # In production, fetch from user behavior database
        default_history = {
            'access_patterns': ['login', 'file_access', 'logout'],
            'recent_activities': [
                {'action': 'login', 'timestamp': datetime.now() - timedelta(hours=1)},
                {'action': 'file_access', 'timestamp': datetime.now() - timedelta(minutes=30)}
            ],
            'trust_score': 0.7,
            'behavioral_score': 0.6
        }
        return default_history


class RiskAssessmentEngine:
    """Calculate contextual risk scores"""

    def __init__(self):
        self.risk_weights = {
            'behavioral_score': 0.3,
            'threat_indicators': 0.4,
            'context_factors': 0.2,
            'trust_score': 0.1
        }

    async def calculate_risk(
        self,
        context: UserContext,
        indicators: List[ThreatIndicator],
        behavioral_score: float
    ) -> float:
        """Calculate overall risk score (0-1)"""
        # Behavioral risk (inverted - lower behavioral score = higher risk)
        behavioral_risk = 1.0 - behavioral_score
        # Threat indicator risk
        threat_risk = self.calculate_threat_indicator_risk(indicators)
        # Contextual risk factors
        context_risk = self.calculate_context_risk(context)
        # Trust score risk (inverted)
        trust_risk = 1.0 - context.trust_score
        # Weighted risk calculation
        overall_risk = (
            behavioral_risk * self.risk_weights['behavioral_score'] +
            threat_risk * self.risk_weights['threat_indicators'] +
            context_risk * self.risk_weights['context_factors'] +
            trust_risk * self.risk_weights['trust_score']
        )
        return max(0.0, min(1.0, overall_risk))

    def calculate_threat_indicator_risk(self, indicators: List[ThreatIndicator]) -> float:
        """Calculate risk from threat indicators"""
        if not indicators:
            return 0.0
        # Weight indicators by severity and confidence
        risk_scores = []
        severity_weights = {
            ThreatLevel.LOW: 0.2,
            ThreatLevel.MEDIUM: 0.5,
            ThreatLevel.HIGH: 0.8,
            ThreatLevel.CRITICAL: 1.0
        }
        for indicator in indicators:
            severity_weight = severity_weights.get(indicator.severity, 0.5)
            indicator_risk = severity_weight * indicator.confidence
            risk_scores.append(indicator_risk)
        # Use maximum risk score (worst case)
        return max(risk_scores) if risk_scores else 0.0

    def calculate_context_risk(self, context: UserContext) -> float:
        """Calculate risk from contextual factors"""
        risk_factors = []
        # Time-based risk
        hour = context.time_of_day
        if hour < 6 or hour > 22:
            risk_factors.append(0.3)  # Off-hours access
        else:
            risk_factors.append(0.1)
        # Location risk (simplified)
        if context.location.startswith('192.168'):
            risk_factors.append(0.1)  # Internal network
        else:
            risk_factors.append(0.4)  # External access
        # Device risk
        if context.device_id == 'unknown':
            risk_factors.append(0.5)  # Unknown device
        else:
            risk_factors.append(0.2)  # Known device
        # Network risk
        if 'vpn' in context.network.lower():
            risk_factors.append(0.2)  # VPN access
        elif 'public' in context.network.lower():
            risk_factors.append(0.6)  # Public network
        else:
            risk_factors.append(0.3)  # Unknown network
        # Average contextual risk
        return sum(risk_factors) / len(risk_factors) if risk_factors else 0.5


class SecurityResponseOrchestrator:
    """Orchestrate automated security responses"""

    def __init__(self):
        self.response_handlers = {
            SecurityAction.ALLOW: self.handle_allow,
            SecurityAction.MONITOR: self.handle_monitor,
            SecurityAction.CHALLENGE: self.handle_challenge,
            SecurityAction.BLOCK: self.handle_block,
            SecurityAction.ISOLATE: self.handle_isolate
        }

    async def execute_response(self, event: SecurityEvent):
        """Execute appropriate security response"""
        handler = self.response_handlers.get(event.recommended_action)
        if handler:
            await handler(event)
        else:
            print(f"No handler for action: {event.recommended_action}")

    async def handle_allow(self, event: SecurityEvent):
        """Handle allow action - normal processing"""
        print(f"ALLOW: Event {event.event_id} - Normal access granted")
        await self.log_security_event(event, "allowed")

    async def handle_monitor(self, event: SecurityEvent):
        """Handle monitor action - allow with enhanced logging"""
        print(f"MONITOR: Event {event.event_id} - Enhanced monitoring enabled")
        await self.log_security_event(event, "monitored")
        await self.enable_enhanced_monitoring(event.user_context.user_id)

    async def handle_challenge(self, event: SecurityEvent):
        """Handle challenge action - require additional authentication"""
        print(f"CHALLENGE: Event {event.event_id} - Additional authentication required")
        await self.log_security_event(event, "challenged")
        await self.initiate_step_up_auth(event.user_context.user_id)

    async def handle_block(self, event: SecurityEvent):
        """Handle block action - deny access"""
        print(f"BLOCK: Event {event.event_id} - Access denied")
        await self.log_security_event(event, "blocked")
        await self.send_security_alert(event, "high")

    async def handle_isolate(self, event: SecurityEvent):
        """Handle isolate action - quarantine user/session"""
        print(f"ISOLATE: Event {event.event_id} - User/session isolated")
        await self.log_security_event(event, "isolated")
        await self.isolate_user_session(event.user_context.user_id)
        await self.send_security_alert(event, "critical")

    async def log_security_event(self, event: SecurityEvent, action_taken: str):
        """Log security event for audit and analysis"""
        log_entry = {
            'event_id': event.event_id,
            'user_id': event.user_context.user_id,
            'risk_score': event.risk_score,
            'action_taken': action_taken,
            'timestamp': event.timestamp.isoformat(),
            'threat_indicators': len(event.threat_indicators)
        }
        print(f"SECURITY_LOG: {json.dumps(log_entry, indent=2)}")

    async def enable_enhanced_monitoring(self, user_id: str):
        """Enable enhanced monitoring for user"""
        print(f"Enhanced monitoring enabled for user: {user_id}")
        # Implementation would integrate with monitoring systems

    async def initiate_step_up_auth(self, user_id: str):
        """Initiate step-up authentication"""
        print(f"Step-up authentication initiated for user: {user_id}")
        # Implementation would trigger MFA or additional auth factors

    async def isolate_user_session(self, user_id: str):
        """Isolate user session"""
        print(f"User session isolated: {user_id}")
        # Implementation would disable user access and quarantine session

    async def send_security_alert(self, event: SecurityEvent, severity: str):
        """Send security alert to SOC team"""
        alert = {
            'severity': severity,
            'event_id': event.event_id,
            'user_id': event.user_context.user_id,
            'risk_score': event.risk_score,
            'recommended_action': event.recommended_action.value,
            'timestamp': event.timestamp.isoformat()
        }
        print(f"SECURITY_ALERT ({severity.upper()}): {json.dumps(alert, indent=2)}")


class ThreatIntelligenceEngine:
    """Integrate threat intelligence feeds and analysis"""

    def __init__(self):
        self.ioc_database = {
            # Indicators of Compromise (placeholder values only: the IPs come from
            # reserved documentation ranges and the hash is the MD5 of empty input)
            'malicious_ips': ['192.0.2.1', '198.51.100.1'],
            'suspicious_domains': ['malicious.example.com'],
            'known_malware_hashes': ['d41d8cd98f00b204e9800998ecf8427e']
        }

    async def check_indicators(self, event_data: Dict) -> List[ThreatIndicator]:
        """Check event data against threat intelligence"""
        indicators = []
        # Check IP addresses
        source_ip = event_data.get('source_ip', '')
        if source_ip in self.ioc_database['malicious_ips']:
            indicators.append(ThreatIndicator(
                indicator_type="malicious_ip",
                value=source_ip,
                severity=ThreatLevel.HIGH,
                confidence=0.9,
                source="threat_intelligence",
                timestamp=datetime.now(),
                context={"description": "Known malicious IP address"}
            ))
        # Check domains
        domain = event_data.get('domain', '')
        if domain in self.ioc_database['suspicious_domains']:
            indicators.append(ThreatIndicator(
                indicator_type="suspicious_domain",
                value=domain,
                severity=ThreatLevel.MEDIUM,
                confidence=0.8,
                source="threat_intelligence",
                timestamp=datetime.now(),
                context={"description": "Known suspicious domain"}
            ))
        return indicators


class SecurityMetricsTracker:
    """Track and analyze security metrics"""

    def __init__(self):
        self.metrics = {
            'events_processed': 0,
            'threats_detected': 0,
            'false_positives': 0,
            'response_times': [],
            'action_counts': {}
        }

    async def track_event(self, event: SecurityEvent):
        """Track security event metrics"""
        self.metrics['events_processed'] += 1
        if event.threat_indicators:
            self.metrics['threats_detected'] += 1
        # Track action counts
        action = event.recommended_action.value
        self.metrics['action_counts'][action] = self.metrics['action_counts'].get(action, 0) + 1
        # Response time tracking would be implemented here

    async def get_security_dashboard(self) -> Dict:
        """Get security dashboard metrics"""
        return {
            'total_events': self.metrics['events_processed'],
            'threats_detected': self.metrics['threats_detected'],
            'detection_rate': self.metrics['threats_detected'] / max(self.metrics['events_processed'], 1),
            'action_distribution': self.metrics['action_counts'],
            'average_response_time': sum(self.metrics['response_times']) / max(len(self.metrics['response_times']), 1)
        }


class DynamicPolicyEngine:
    """Generate and adapt security policies"""

    def __init__(self):
        self.base_policies = {
            'default_trust_score': 0.5,
            'max_failed_attempts': 3,
            'session_timeout_hours': 8,
            'high_risk_threshold': 0.7
        }

    async def generate_adaptive_policy(self, context: UserContext, risk_score: float) -> Dict:
        """Generate adaptive security policy based on context and risk"""
        policy = self.base_policies.copy()
        # Adjust based on risk score
        if risk_score > 0.8:
            policy['session_timeout_hours'] = 2  # Shorter session for high risk
            policy['max_failed_attempts'] = 1  # Stricter failure tolerance
        elif risk_score < 0.3:
            policy['session_timeout_hours'] = 12  # Longer session for low risk
            policy['max_failed_attempts'] = 5  # More lenient for trusted users
        # Adjust based on user trust score. High-risk events are excluded so the
        # shortened session above is not lengthened again for trusted users.
        if context.trust_score > 0.8 and risk_score <= 0.8:
            policy['session_timeout_hours'] *= 1.5  # Extend for trusted users (value becomes a float)
        return policy
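To make the weighted scoring concrete, here is a standalone worked example that reuses the weights and thresholds from the listing above. The helper names `overall_risk` and `action_for` and the sample inputs are made up for illustration, and the trust-based leniency and critical-indicator override from `determine_adaptive_action` are deliberately left out.

```python
from typing import List, Tuple

RISK_WEIGHTS = {"behavioral": 0.3, "threat": 0.4, "context": 0.2, "trust": 0.1}
SEVERITY_WEIGHTS = {"low": 0.2, "medium": 0.5, "high": 0.8, "critical": 1.0}


def overall_risk(
    behavioral_score: float,
    indicators: List[Tuple[str, float]],  # (severity, confidence) pairs
    context_factors: List[float],
    trust_score: float,
) -> float:
    behavioral_risk = 1.0 - behavioral_score
    # Worst-case indicator: severity weight times confidence
    threat_risk = max((SEVERITY_WEIGHTS[s] * c for s, c in indicators), default=0.0)
    context_risk = sum(context_factors) / len(context_factors) if context_factors else 0.5
    trust_risk = 1.0 - trust_score
    total = (
        behavioral_risk * RISK_WEIGHTS["behavioral"]
        + threat_risk * RISK_WEIGHTS["threat"]
        + context_risk * RISK_WEIGHTS["context"]
        + trust_risk * RISK_WEIGHTS["trust"]
    )
    return max(0.0, min(1.0, total))


def action_for(risk: float) -> str:
    # Base thresholds only; no trust adjustment or critical override
    if risk >= 0.9:
        return "isolate"
    if risk >= 0.7:
        return "block"
    if risk >= 0.5:
        return "challenge"
    if risk >= 0.3:
        return "monitor"
    return "allow"


# Daytime access, external IP, known device, VPN: context factors 0.1, 0.4, 0.2, 0.2
context = [0.1, 0.4, 0.2, 0.2]

# Large download (medium severity, confidence 0.6) by a user with trust 0.7:
# 0.5*0.3 + 0.30*0.4 + 0.225*0.2 + 0.3*0.1, roughly 0.345
large_download = overall_risk(0.5, [("medium", 0.6)], context, 0.7)
print(round(large_download, 3), action_for(large_download))

# Same context with a privilege escalation indicator (critical, confidence 0.95):
# 0.5*0.3 + 0.95*0.4 + 0.225*0.2 + 0.3*0.1, roughly 0.605
escalation = overall_risk(0.5, [("critical", 0.95)], context, 0.7)
print(round(escalation, 3), action_for(escalation))
```

The second case shows why the engine overrides the score when a critical indicator is present: with threat indicators weighted at 0.4, even a near-certain privilege escalation only reaches the challenge band on score alone.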
Behavioral Analytics Engine (TypeScript)
// Behavioral Analytics for Adaptive Security
interface BehavioralProfile {
  userId: string;
  baselineEstablished: boolean;
  accessPatterns: AccessPattern[];
  riskFactors: RiskFactor[];
  trustScore: number;
  lastUpdated: Date;
}

interface AccessPattern {
  pattern: string;
  frequency: number;
  timeWindows: TimeWindow[];
  locations: string[];
  devices: string[];
  confidence: number;
}

interface RiskFactor {
  factor: string;
  severity: 'low' | 'medium' | 'high' | 'critical';
  confidence: number;
  firstObserved: Date;
  lastObserved: Date;
  occurrences: number;
}

interface SecurityAlert {
  id: string;
  userId: string;
  alertType: string;
  severity: 'info' | 'warning' | 'error' | 'critical';
  description: string;
  riskScore: number;
  recommendation: string;
  timestamp: Date;
  context: any;
}

class BehavioralAnalyticsEngine {
  private userProfiles: Map<string, BehavioralProfile> = new Map();
  private mlModels: Map<string, any> = new Map();
  private alertThresholds: AlertThresholds;
  private contextAnalyzer: ContextAnalyzer;

  constructor(config: AnalyticsConfig) {
    this.alertThresholds = config.alertThresholds;
    this.contextAnalyzer = new ContextAnalyzer();
  }

  async analyzeUserBehavior(
    userId: string,
    currentActivity: UserActivity
  ): Promise<BehavioralAnalysis> {
    // Get or create user profile
    let profile = this.userProfiles.get(userId);
    if (!profile) {
      profile = await this.createUserProfile(userId);
      this.userProfiles.set(userId, profile);
    }
    // Extract behavioral features from current activity
    const features = this.extractBehavioralFeatures(currentActivity);
    // Compare against established baseline
    const anomalyScore = await this.calculateAnomalyScore(profile, features);
    // Analyze contextual factors
    const contextAnalysis = await this.contextAnalyzer.analyze(currentActivity);
    // Calculate overall risk score
    const riskScore = this.calculateRiskScore(anomalyScore, contextAnalysis, profile);
    // Generate recommendations
    const recommendations = this.generateRecommendations(riskScore, anomalyScore, contextAnalysis);
    // Update user profile
    await this.updateUserProfile(userId, currentActivity, riskScore);
    return {
      userId,
      riskScore,
      anomalyScore,
      contextualRisk: contextAnalysis.riskLevel,
      recommendations,
      behavioralInsights: {
        deviationFromNormal: anomalyScore > 0.7,
        suspiciousPatterns: this.identifySuspiciousPatterns(features, profile),
        trustScoreChange: this.calculateTrustScoreChange(profile, riskScore)
      }
    };
  }

  private async createUserProfile(userId: string): Promise<BehavioralProfile> {
    // Initialize behavioral profile for new user
    return {
      userId,
      baselineEstablished: false,
      accessPatterns: [],
      riskFactors: [],
      trustScore: 0.5, // Neutral trust score for new users
      lastUpdated: new Date()
    };
  }

  private extractBehavioralFeatures(activity: UserActivity): BehavioralFeatures {
    return {
      // Temporal features
      accessTime: {
        hour: new Date(activity.timestamp).getHours(),
        dayOfWeek: new Date(activity.timestamp).getDay(),
        isBusinessHours: this.isBusinessHours(activity.timestamp)
      },
      // Location features
      location: {
        ipAddress: activity.sourceIP,
        geolocation: activity.geolocation,
        networkType: this.classifyNetworkType(activity.sourceIP),
        isKnownLocation: this.isKnownLocation(activity.userId, activity.geolocation)
      },
      // Device features
      device: {
        deviceId: activity.deviceId,
        userAgent: activity.userAgent,
        operatingSystem: this.extractOS(activity.userAgent),
        browser: this.extractBrowser(activity.userAgent),
        isKnownDevice: this.isKnownDevice(activity.userId, activity.deviceId)
      },
      // Activity features
      activity: {
        actionType: activity.action,
        resourceAccessed: activity.resource,
        dataVolume: activity.dataSize || 0,
        duration: activity.duration || 0,
        sequencePattern: this.analyzeSequencePattern(activity)
      },
      // Authentication features
      authentication: {
        authMethod: activity.authMethod,
        failedAttempts: activity.failedAttempts || 0,
        mfaUsed: activity.mfaUsed || false,
        passwordAge: activity.passwordAge || 0
      }
    };
  }

  private async calculateAnomalyScore(
    profile: BehavioralProfile,
    features: BehavioralFeatures
  ): Promise<number> {
    if (!profile.baselineEstablished) {
      // Not enough data for anomaly detection
      return 0.3; // Low but non-zero anomaly score
    }
    let anomalyScore = 0;
    let weightSum = 0;
    // Time-based anomaly detection
    const timeAnomaly = this.calculateTimeAnomaly(profile, features.accessTime);
    anomalyScore += timeAnomaly * 0.2;
    weightSum += 0.2;
    // Location-based anomaly detection
    const locationAnomaly = this.calculateLocationAnomaly(profile, features.location);
    anomalyScore += locationAnomaly * 0.25;
    weightSum += 0.25;
    // Device-based anomaly detection
    const deviceAnomaly = this.calculateDeviceAnomaly(profile, features.device);
    anomalyScore += deviceAnomaly * 0.2;
    weightSum += 0.2;
    // Activity pattern anomaly detection
    const activityAnomaly = this.calculateActivityAnomaly(profile, features.activity);
    anomalyScore += activityAnomaly * 0.25;
    weightSum += 0.25;
    // Authentication anomaly detection
    const authAnomaly = this.calculateAuthAnomaly(profile, features.authentication);
    anomalyScore += authAnomaly * 0.1;
    weightSum += 0.1;
    return anomalyScore / weightSum;
  }

  private calculateTimeAnomaly(
    profile: BehavioralProfile,
    timeFeatures: any
  ): number {
    // Find typical access hours for user
    const typicalHours = this.extractTypicalAccessHours(profile);
    if (typicalHours.length === 0) {
      return 0.2; // Slight anomaly if no pattern established
    }
    const currentHour = timeFeatures.hour;
    // Hours wrap at midnight, so measure distance on a 24-hour circle
    // (23:00 and 01:00 are 2 hours apart, not 22)
    const hourDistance = (a: number, b: number): number => {
      const diff = Math.abs(a - b);
      return Math.min(diff, 24 - diff);
    };
    // Distance to the nearest typical hour
    const hourDiff = Math.min(
      ...typicalHours.map(hour => hourDistance(hour, currentHour))
    );
    if (hourDiff > 1) {
      // More than 1 hour from any typical time: scale with the distance.
      // The circular distance peaks at 12 hours, which maps to 1.0.
      return Math.min(1.0, hourDiff / 12);
    }
    return 0.1; // Minimal anomaly for typical time
  }

  private calculateLocationAnomaly(
    profile: BehavioralProfile,
    locationFeatures: any
  ): number {
    if (!locationFeatures.isKnownLocation) {
      // Unknown location is suspicious
      return 0.7;
    }
    // Check network type consistency
    const typicalNetworkTypes = this.extractTypicalNetworkTypes(profile);
    if (typicalNetworkTypes.length > 0 &&
        !typicalNetworkTypes.includes(locationFeatures.networkType)) {
      return 0.4; // Moderate anomaly for different network type
    }
    return 0.1; // Low anomaly for known location
  }

  private calculateRiskScore(
    anomalyScore: number,
    contextAnalysis: ContextAnalysis,
    profile: BehavioralProfile
  ): number {
    // Base risk from anomaly score
    let riskScore = anomalyScore * 0.4;
    // Add contextual risk (typed so string keys can index the map)
    const contextRiskMap: Record<string, number> = {
      'low': 0.1,
      'medium': 0.3,
      'high': 0.6,
      'critical': 0.9
    };
    riskScore += contextRiskMap[contextAnalysis.riskLevel] * 0.3;
    // Factor in user trust score (inverse relationship)
    const trustFactor = (1 - profile.trustScore) * 0.2;
    riskScore += trustFactor;
    // Add risk from active risk factors
    const activeRiskFactors = profile.riskFactors.filter(rf =>
      this.isRecentRiskFactor(rf)
    );
    const riskFactorScore = activeRiskFactors.reduce((sum, rf) => {
      const severityWeight: Record<string, number> = { low: 0.1, medium: 0.2, high: 0.4, critical: 0.8 };
      return sum + (severityWeight[rf.severity] || 0.2) * rf.confidence;
    }, 0);
    riskScore += Math.min(0.1, riskFactorScore); // Cap risk factor contribution
    return Math.min(1.0, Math.max(0.0, riskScore));
  }
private generateRecommendations(
riskScore: number,
anomalyScore: number,
contextAnalysis: ContextAnalysis
): SecurityRecommendation[] {
const recommendations: SecurityRecommendation[] = [];
if (riskScore > 0.8) {
recommendations.push({
action: 'BLOCK_ACCESS',
priority: 'critical',
description: 'High risk score detected - block access and investigate',
rationale: `Risk score ${riskScore.toFixed(2)} exceeds critical threshold`
});
} else if (riskScore > 0.6) {
recommendations.push({
action: 'REQUIRE_ADDITIONAL_AUTH',
priority: 'high',
description: 'Require step-up authentication (MFA)',
rationale: 'Elevated risk requires additional verification'
});
} else if (riskScore > 0.4) {
recommendations.push({
action: 'ENHANCED_MONITORING',
priority: 'medium',
description: 'Enable enhanced session monitoring',
rationale: 'Moderate risk detected - increase monitoring'
});
}
if (anomalyScore > 0.7) {
recommendations.push({
action: 'BEHAVIORAL_ALERT',
priority: 'high',
description: 'Unusual behavioral pattern detected',
rationale: `Anomaly score ${anomalyScore.toFixed(2)} indicates significant deviation`
});
}
if (contextAnalysis.riskLevel === 'high' || contextAnalysis.riskLevel === 'critical') {
recommendations.push({
action: 'CONTEXT_RESTRICTION',
priority: 'high',
description: 'Apply contextual access restrictions',
rationale: `High contextual risk: ${contextAnalysis.primaryRiskFactors.join(', ')}`
});
}
return recommendations;
}
private async updateUserProfile(
userId: string,
activity: UserActivity,
riskScore: number
): Promise<void> {
const profile = this.userProfiles.get(userId)!;
// Update access patterns
await this.updateAccessPatterns(profile, activity);
// Update trust score based on risk score
profile.trustScore = this.updateTrustScore(profile.trustScore, riskScore);
// Add/update risk factors if significant risk detected
if (riskScore > 0.5) {
await this.updateRiskFactors(profile, activity, riskScore);
}
// Check if baseline can be established (need minimum activities)
if (!profile.baselineEstablished && profile.accessPatterns.length >= 10) {
profile.baselineEstablished = true;
await this.trainUserModel(userId, profile);
}
profile.lastUpdated = new Date();
}
private updateTrustScore(currentTrust: number, riskScore: number): number {
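// Lower trust for risky activity (risk > 0.5); raise it slightly for low-risk activity (risk < 0.3)
const riskImpact = riskScore > 0.5 ? (riskScore - 0.5) * 0.2 : 0;
const trustGain = riskScore < 0.3 ? (0.3 - riskScore) * 0.1 : 0;
let newTrust = currentTrust - riskImpact + trustGain;
// Apply a 1% decay on every update, so trust erodes unless low-risk activity replenishes it.
// Note: the decay is per evaluated activity, not per unit of elapsed time.
newTrust *= 0.99;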
return Math.min(1.0, Math.max(0.1, newTrust));
}
private identifySuspiciousPatterns(
features: BehavioralFeatures,
profile: BehavioralProfile
): string[] {
const suspiciousPatterns: string[] = [];
// Check for unusual access times
if (!features.accessTime.isBusinessHours &&
this.isTypicalBusinessHoursUser(profile)) {
suspiciousPatterns.push('off_hours_access');
}
// Check for impossible travel
if (this.detectImpossibleTravel(features, profile)) {
suspiciousPatterns.push('impossible_travel');
}
// Check for unusual data access volume
if (features.activity.dataVolume > this.getTypicalDataVolume(profile) * 5) {
suspiciousPatterns.push('excessive_data_access');
}
// Check for rapid successive logins
if (this.detectRapidSuccessiveLogins(features, profile)) {
suspiciousPatterns.push('rapid_successive_logins');
}
return suspiciousPatterns;
}
async generateSecurityAlert(
analysis: BehavioralAnalysis,
activity: UserActivity
): Promise<SecurityAlert | null> {
if (analysis.riskScore < this.alertThresholds.minimumRiskScore) {
return null; // No alert needed
}
const severity = this.determineSeverity(analysis.riskScore);
return {
id: this.generateAlertId(),
userId: analysis.userId,
alertType: 'behavioral_anomaly',
severity,
description: this.generateAlertDescription(analysis),
riskScore: analysis.riskScore,
recommendation: this.getTopRecommendation(analysis.recommendations),
timestamp: new Date(),
context: {
activity,
behavioralInsights: analysis.behavioralInsights,
anomalyScore: analysis.anomalyScore
}
};
}
private generateAlertDescription(analysis: BehavioralAnalysis): string {
const insights = analysis.behavioralInsights;
let description = `User ${analysis.userId} showing `;
if (insights.deviationFromNormal) {
description += 'significant behavioral deviation';
} else {
description += 'elevated risk indicators';
}
if (insights.suspiciousPatterns.length > 0) {
description += ` with patterns: ${insights.suspiciousPatterns.join(', ')}`;
}
return description;
}
private determineSeverity(riskScore: number): 'info' | 'warning' | 'error' | 'critical' {
if (riskScore >= 0.8) return 'critical';
if (riskScore >= 0.6) return 'error';
if (riskScore >= 0.4) return 'warning';
return 'info';
}
}
// Supporting Classes
class ContextAnalyzer {
async analyze(activity: UserActivity): Promise<ContextAnalysis> {
const riskFactors: string[] = [];
let riskLevel: 'low' | 'medium' | 'high' | 'critical' = 'low';
// Analyze time context
if (this.isOffHours(activity.timestamp)) {
riskFactors.push('off_hours_access');
riskLevel = 'medium';
}
// Analyze location context
if (this.isSuspiciousLocation(activity.sourceIP)) {
riskFactors.push('suspicious_location');
riskLevel = 'high';
}
// Analyze network context
if (this.isPublicNetwork(activity.sourceIP)) {
riskFactors.push('public_network');
if (riskLevel === 'low') riskLevel = 'medium';
}
return {
riskLevel,
primaryRiskFactors: riskFactors,
contextScore: this.calculateContextScore(riskFactors)
};
}
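private isOffHours(timestamp: Date): boolean {
// Uses the server's local time zone; convert to the user's time zone first if they differ
const hour = timestamp.getHours();
const day = timestamp.getDay(); // 0 = Sunday, 6 = Saturday
// Business hours are treated as 08:00-18:59 on weekdays
return hour < 8 || hour > 18 || day === 0 || day === 6;
}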
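private isSuspiciousLocation(ip: string): boolean {
// Check against threat intelligence feeds.
// Simplified: these prefixes are RFC 5737 documentation ranges used as placeholders.
// In production, integrate with an IP reputation service.
const suspiciousNetworks = ['192.0.2.', '198.51.100.'];
return suspiciousNetworks.some(network => ip.startsWith(network));
}
private isPublicNetwork(ip: string): boolean {
// Placeholder (assumption): required by analyze(); populate with prefixes of known public networks.
const publicNetworkPrefixes: string[] = [];
return publicNetworkPrefixes.some(prefix => ip.startsWith(prefix));
}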
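private calculateContextScore(riskFactors: string[]): number {
const riskWeights: Record<string, number> = {
'off_hours_access': 0.3,
'suspicious_location': 0.6,
'public_network': 0.2,
'unknown_device': 0.4
};
// Sum the weights (0.1 for unrecognized factors) and cap at 1.0 so the score stays in [0, 1]
const total = riskFactors.reduce((score, factor) =>
score + (riskWeights[factor] || 0.1), 0
);
return Math.min(1.0, total);
}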
}
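The weighting in calculateRiskScore and the thresholds in generateRecommendations can be traced by hand with a small standalone sketch. This is an illustration under stated assumptions, not the class's actual code path: `combineRisk`, `primaryAction`, and the `ALLOW` fallback are hypothetical names introduced here, and the risk-factor sum is passed in already computed.

```typescript
// Standalone sketch of the 0.4 / 0.3 / 0.2 / 0.1 weighting described in the listing.
type RiskLevel = 'low' | 'medium' | 'high' | 'critical';

interface RiskInputs {
  anomalyScore: number;    // 0..1, from the behavioral model
  contextLevel: RiskLevel; // from the context analyzer
  trustScore: number;      // 0..1, higher means more trusted
  riskFactorScore: number; // summed severity weight * confidence, uncapped
}

const CONTEXT_RISK: Record<RiskLevel, number> = {
  low: 0.1,
  medium: 0.3,
  high: 0.6,
  critical: 0.9,
};

function combineRisk(inputs: RiskInputs): number {
  const anomalyPart = inputs.anomalyScore * 0.4;
  const contextPart = CONTEXT_RISK[inputs.contextLevel] * 0.3;
  const trustPart = (1 - inputs.trustScore) * 0.2;         // low trust adds risk
  const factorPart = Math.min(0.1, inputs.riskFactorScore); // capped contribution
  const total = anomalyPart + contextPart + trustPart + factorPart;
  return Math.min(1.0, Math.max(0.0, total));
}

// 'ALLOW' is a hypothetical label for the case where the listing adds no recommendation.
type Action = 'BLOCK_ACCESS' | 'REQUIRE_ADDITIONAL_AUTH' | 'ENHANCED_MONITORING' | 'ALLOW';

function primaryAction(riskScore: number): Action {
  if (riskScore > 0.8) return 'BLOCK_ACCESS';
  if (riskScore > 0.6) return 'REQUIRE_ADDITIONAL_AUTH';
  if (riskScore > 0.4) return 'ENHANCED_MONITORING';
  return 'ALLOW';
}

// Worked example: 0.28 (anomaly) + 0.18 (context) + 0.10 (trust) + 0.10 (factors) = 0.66
const example = combineRisk({
  anomalyScore: 0.7,
  contextLevel: 'high',
  trustScore: 0.5,
  riskFactorScore: 0.32,
});
console.log(example.toFixed(2), primaryAction(example)); // 0.66 REQUIRE_ADDITIONAL_AUTH
```

With these weights the combined score tops out at 0.97 (anomaly 1.0, critical context, trust 0, capped risk factors), so the final clamp to 1.0 acts only as a safeguard.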
Real-World Examples
Microsoft Entra ID (formerly Azure AD) Risk-Based Access
- • Behavioral Analytics: ML-powered user risk assessment
- • Contextual Policies: Location, device, and app-based policies
- • Real-time Response: Automatic MFA requirements and session blocking
- • Scale: Protects millions of users with adaptive authentication
Okta Adaptive Multi-Factor Authentication
- • Risk Engine: Behavioral and contextual risk scoring
- • Dynamic Policies: Adaptive authentication based on risk
- • User Experience: Seamless access for trusted activities
- • Integration: Works with 7,000+ applications
Google BeyondCorp Enterprise
- • Zero Trust: Continuous risk assessment for every request
- • Context Aware: Device state, location, and user behavior
- • ML-Driven: Advanced threat detection with machine learning
- • Granular Control: Application and resource-level policies
Darktrace Antigena
- • Self-Learning AI: Establishes normal behavior baselines
- • Autonomous Response: Real-time threat neutralization
- • Adaptive Defense: Evolves with changing threat landscape
- • Enterprise Scale: Protects entire network ecosystems
Adaptive Security Best Practices
✅ Do
- • Establish behavioral baselines with sufficient data before making security decisions
- • Implement risk-based authentication that adjusts security measures based on calculated risk
- • Use multiple contextual factors including time, location, device, and network for risk assessment
- • Provide automated response capabilities with human oversight for complex scenarios
- • Continuously update ML models with new threat intelligence and behavioral patterns
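The first practice, holding back judgment until a baseline exists, might be sketched as follows for access times. This is a minimal illustration, assuming the same 10-activity minimum as the baseline check in the listing; `hourDistance`, `typicalHours`, `timeAnomaly`, and the 10% share cutoff are hypothetical choices made here, and hours are compared on a 24-hour clock so that night-shift users are handled correctly.

```typescript
const MIN_BASELINE_SAMPLES = 10; // assumption: mirrors the 10-activity baseline check

// Circular distance between two hours of the day (23 and 0 are 1 apart).
function hourDistance(a: number, b: number): number {
  const diff = Math.abs(a - b) % 24;
  return Math.min(diff, 24 - diff);
}

// Hours that account for at least `minShare` of all observed accesses.
function typicalHours(accessHours: number[], minShare = 0.1): number[] {
  const counts = new Map<number, number>();
  for (const h of accessHours) {
    counts.set(h, (counts.get(h) ?? 0) + 1);
  }
  return Array.from(counts.entries())
    .filter(([, n]) => n / accessHours.length >= minShare)
    .map(([h]) => h)
    .sort((x, y) => x - y);
}

// Returns a time anomaly score in [0, 1].
function timeAnomaly(accessHours: number[], currentHour: number): number {
  // Without a baseline, return a slight, non-decisive score instead of guessing.
  if (accessHours.length < MIN_BASELINE_SAMPLES) return 0.2;
  const typical = typicalHours(accessHours);
  if (typical.length === 0) return 0.2;
  const nearest = Math.min(...typical.map(h => hourDistance(h, currentHour)));
  return nearest <= 1 ? 0.1 : Math.min(1.0, nearest / 12);
}

const dayUser = [9, 9, 9, 9, 10, 10, 10, 10, 11, 11, 11, 11];
const nightUser = [23, 23, 23, 23, 23, 0, 0, 0, 0, 0];

console.log(timeAnomaly(dayUser, 10));  // 0.1: inside the usual window
console.log(timeAnomaly(dayUser, 3));   // 0.5: six hours from the nearest typical hour
console.log(timeAnomaly(nightUser, 0)); // 0.1: midnight is typical for this user
console.log(timeAnomaly([9, 10], 3));   // 0.2: too little history to judge
```

Returning a fixed low score before the baseline exists keeps a new account from being blocked on thin evidence while still contributing a small amount to the overall risk score.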
❌ Don't
- • Rely solely on static rules - adaptive security requires dynamic policy adjustment
- • Ignore false positive rates - frequent false positives erode user trust and system effectiveness
- • Make decisions on single indicators - combine multiple factors for robust risk assessment
- • Implement fully automated responses without a human override
- • Neglect user experience - security measures should be unobtrusive for legitimate users
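One way to keep false positive rates visible is to replay analyst-reviewed events against candidate alert thresholds. The sketch below is illustrative only: `ReviewedEvent`, `evaluateThreshold`, and the sample data are hypothetical, and it assumes each event has a risk score plus a confirmed or dismissed label from a human review.

```typescript
interface ReviewedEvent {
  riskScore: number;        // score the system assigned at the time
  confirmedThreat: boolean; // outcome of the human review
}

interface ThresholdStats {
  threshold: number;
  alerts: number;            // events that would have raised an alert
  precision: number;         // share of alerts that were real threats
  falsePositiveRate: number; // share of benign events that raised an alert
  missedThreats: number;     // real threats below the threshold
}

function evaluateThreshold(events: ReviewedEvent[], threshold: number): ThresholdStats {
  let tp = 0, fp = 0, tn = 0, fn = 0;
  for (const e of events) {
    const alerted = e.riskScore >= threshold;
    if (alerted && e.confirmedThreat) tp++;
    else if (alerted) fp++;
    else if (e.confirmedThreat) fn++;
    else tn++;
  }
  return {
    threshold,
    alerts: tp + fp,
    precision: tp + fp === 0 ? 0 : tp / (tp + fp),
    falsePositiveRate: fp + tn === 0 ? 0 : fp / (fp + tn),
    missedThreats: fn,
  };
}

// Made-up sample data for illustration.
const reviewed: ReviewedEvent[] = [
  { riskScore: 0.9, confirmedThreat: true },
  { riskScore: 0.85, confirmedThreat: true },
  { riskScore: 0.7, confirmedThreat: false },
  { riskScore: 0.65, confirmedThreat: true },
  { riskScore: 0.5, confirmedThreat: false },
  { riskScore: 0.45, confirmedThreat: false },
  { riskScore: 0.3, confirmedThreat: false },
  { riskScore: 0.2, confirmedThreat: false },
];

for (const threshold of [0.4, 0.6, 0.8]) {
  console.log(evaluateThreshold(reviewed, threshold));
}
```

On this sample, lowering the threshold from 0.6 to 0.4 triples the false positives without catching any additional threat, while raising it to 0.8 removes false positives but misses one confirmed threat. Tracking both numbers side by side makes that trade-off explicit before a threshold is changed.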