
AI Safety and Guardrails Systems

Master content filtering, bias detection, model alignment, safety monitoring, and responsible AI deployment

50 min read · Advanced

What are AI Safety and Guardrails Systems?

AI safety and guardrails systems are protective measures and monitoring frameworks designed to ensure AI models operate safely, ethically, and within acceptable boundaries in production environments.

Content Safety
Harmful content detection & filtering
Bias Detection
Algorithmic fairness monitoring
Model Alignment
Human values & intent alignment

🛡️ AI Safety Guardrails Calculator

Calculate latency, throughput, safety scores, and operational costs for AI safety systems.

Safety Analysis (example output)

Total Latency: 45 ms
Max Throughput: 22,222/sec
Actual Throughput: 10,000/sec
Safety Score: 100/100
Automated Rate: 8,000/sec
Human Review: 2,000/sec
Daily Cost: $3,024,000
Recommended GPUs: 4
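
These example figures follow from straightforward back-of-the-envelope arithmetic. The sketch below reproduces them under stated assumptions: roughly 1,000 concurrent safety workers, an 80% automation rate, a per-GPU capacity of about 2,500 requests/sec, and a human-review cost near $0.0175 per escalated item. None of these parameters come from the calculator itself; they are illustrative values chosen so the numbers line up.

safety_capacity_estimator.py (illustrative sketch)
import math

def safety_guardrail_estimates(
    latency_ms: float = 45.0,               # end-to-end latency of the safety pipeline
    demand_per_sec: float = 10_000,         # incoming requests per second
    concurrency: int = 1_000,               # assumed number of parallel safety workers
    automation_rate: float = 0.80,          # assumed share resolved without human review
    cost_per_human_review: float = 0.0175,  # assumed cost per escalated item (USD)
    per_gpu_capacity: float = 2_500,        # assumed sustainable requests/sec per GPU
) -> dict:
    """Rough capacity and cost estimates for a safety filtering tier.

    All defaults are illustrative assumptions chosen to roughly reproduce the
    example output above; substitute measured values in practice.
    """
    max_throughput = concurrency * (1000.0 / latency_ms)
    actual_throughput = min(demand_per_sec, max_throughput)
    automated = actual_throughput * automation_rate
    human_review = actual_throughput - automated
    daily_cost = human_review * 86_400 * cost_per_human_review
    gpus = math.ceil(actual_throughput / per_gpu_capacity)
    return {
        "max_throughput_per_sec": round(max_throughput),
        "actual_throughput_per_sec": round(actual_throughput),
        "automated_per_sec": round(automated),
        "human_review_per_sec": round(human_review),
        "daily_cost_usd": round(daily_cost, 2),
        "recommended_gpus": gpus,
    }

if __name__ == "__main__":
    print(safety_guardrail_estimates())
    # e.g. 22,222/sec max throughput, 8,000/sec automated, 2,000/sec to human
    # review, roughly $3,024,000/day, 4 GPUs

In practice the automation rate and the per-item review cost dominate the bill, so those are the first numbers to measure rather than assume.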

AI Safety Framework Architecture

Content Safety Layer

  • Toxicity & hate speech detection
  • NSFW content filtering
  • Violence & harm identification
  • Personal information sanitization
  • Real-time content moderation

Bias & Fairness Layer

  • Demographic bias detection (see the definitions below)
  • Equal opportunity monitoring
  • Calibration fairness checks
  • Protected attribute analysis
  • Intersectional bias evaluation
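
The first three checks above correspond to standard group-fairness definitions. Writing Ŷ for the model's decision, Y for the true label, and A for a protected attribute, the gaps monitored by the bias detector later in this article are:

\text{Demographic parity gap} = \max_{a,b} \left| P(\hat{Y}=1 \mid A=a) - P(\hat{Y}=1 \mid A=b) \right|

\text{Equal opportunity gap} = \max_{a,b} \left| P(\hat{Y}=1 \mid Y=1, A=a) - P(\hat{Y}=1 \mid Y=1, A=b) \right|

\text{Calibration gap} = \max_{a,b} \left| \mathrm{ECE}_a - \mathrm{ECE}_b \right|, \qquad \mathrm{ECE}_g = \sum_{i \in \text{bins}} \frac{n_i}{n_g} \left| \bar{p}_i - \bar{y}_i \right|

The bias detection system below flags a metric when its gap exceeds a configurable threshold (0.1 for demographic parity and equal opportunity, 0.05 for calibration by default).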

Model Alignment Layer

  • Human feedback integration
  • Value alignment verification
  • Intent preservation checks
  • Constitutional AI principles
  • Reward model validation (sketched below)
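
Of the alignment checks above, reward model validation is the easiest to make concrete: on a held-out set of human preference pairs, the reward model should score the human-preferred response above the rejected one. The sketch below is a minimal, self-contained version of that check; the PreferencePair structure and the 0.7 agreement threshold are illustrative assumptions rather than part of any particular alignment stack.

reward_model_validation.py (illustrative sketch)
from dataclasses import dataclass
from typing import Callable, List

@dataclass
class PreferencePair:
    prompt: str
    chosen: str     # response preferred by a human annotator
    rejected: str   # response the annotator rejected

def reward_model_agreement(
    score: Callable[[str, str], float],  # reward model: (prompt, response) -> scalar score
    pairs: List[PreferencePair],
    min_agreement: float = 0.7,          # assumed acceptance threshold
) -> dict:
    """Fraction of held-out preference pairs where the reward model ranks the
    human-preferred response higher. Low agreement suggests the reward model
    has drifted from human judgments and should not gate a deployment."""
    if not pairs:
        return {"agreement": 0.0, "passed": False, "n_pairs": 0}
    wins = sum(
        1 for p in pairs if score(p.prompt, p.chosen) > score(p.prompt, p.rejected)
    )
    agreement = wins / len(pairs)
    return {"agreement": agreement, "passed": agreement >= min_agreement, "n_pairs": len(pairs)}

if __name__ == "__main__":
    # Toy usage with a stand-in scorer that simply prefers longer answers
    toy_pairs = [
        PreferencePair("Explain DNS", "DNS maps human-readable names to IP addresses.", "idk"),
        PreferencePair("Summarize the report", "The report covers Q3 revenue and churn.", "stuff"),
    ]
    print(reward_model_agreement(lambda prompt, response: float(len(response)), toy_pairs))

A real reward model would be plugged in where the toy length-based scorer appears here; the validation logic itself does not change.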

Monitoring & Control Layer

  • Real-time safety metrics
  • Circuit breaker mechanisms (sketched below)
  • Gradual rollout controls
  • A/B safety testing
  • Emergency shutdown systems
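
Most of the monitoring layer is operational plumbing rather than ML. As a concrete example of the circuit-breaker item above, the sketch below trips open when the recent violation (or filter-error) rate crosses a threshold and routes traffic to a conservative fallback until a cool-down expires. The 5% trip rate, 1,000-request window, and 5-minute cool-down are illustrative assumptions.

safety_circuit_breaker.py (illustrative sketch)
import time
from collections import deque
from typing import Optional

class SafetyCircuitBreaker:
    """Trips when the recent rate of safety violations (or filter failures)
    exceeds a threshold, so traffic can be routed to a conservative fallback
    (block, queue for human review, or serve a canned response)."""

    def __init__(self,
                 max_violation_rate: float = 0.05,  # assumed 5% trip threshold
                 window_size: int = 1_000,          # look at the last N outcomes
                 cooldown_seconds: float = 300.0):  # stay open for 5 minutes
        self.max_violation_rate = max_violation_rate
        self.window = deque(maxlen=window_size)
        self.cooldown_seconds = cooldown_seconds
        self.opened_at: Optional[float] = None

    def record(self, violation: bool) -> None:
        """Record the outcome of one safety evaluation."""
        self.window.append(violation)
        if self.violation_rate() > self.max_violation_rate:
            self.opened_at = time.time()

    def violation_rate(self) -> float:
        return sum(self.window) / len(self.window) if self.window else 0.0

    def allow_traffic(self) -> bool:
        """False while the breaker is open; auto-resets after the cool-down."""
        if self.opened_at is None:
            return True
        if time.time() - self.opened_at >= self.cooldown_seconds:
            self.opened_at = None  # half-open: resume traffic and keep watching
            self.window.clear()
            return True
        return False

if __name__ == "__main__":
    breaker = SafetyCircuitBreaker(max_violation_rate=0.2, window_size=10)
    for outcome in [False, False, True, True, True]:
        breaker.record(outcome)
    print(breaker.allow_traffic())  # False: 3/5 recent violations tripped the breaker

In production the same pattern usually wraps the ML layers themselves, so a crashed or overloaded toxicity model fails closed rather than silently passing content through.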

Production Content Safety System

Multi-Layer Content Filter

content_safety_system.py
from typing import Dict, List, Any, Optional
import re
import asyncio
import json
import time
import logging
from dataclasses import dataclass
from enum import Enum
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import numpy as np
from sentence_transformers import SentenceTransformer
import hashlib

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SafetyViolationType(Enum):
    TOXICITY = "toxicity"
    HATE_SPEECH = "hate_speech"
    HARASSMENT = "harassment"
    VIOLENCE = "violence"
    SEXUAL_CONTENT = "sexual_content"
    SELF_HARM = "self_harm"
    SPAM = "spam"
    MISINFORMATION = "misinformation"
    PRIVACY_VIOLATION = "privacy_violation"

@dataclass
class SafetyResult:
    is_safe: bool
    confidence: float
    violation_types: List[SafetyViolationType]
    severity_score: float  # 0.0 (safe) to 1.0 (extremely unsafe)
    explanation: str
    processing_time_ms: float
    filter_version: str

@dataclass
class ContentItem:
    content: str
    content_type: str  # "text", "image_caption", "code", etc.
    metadata: Dict[str, Any]
    content_id: Optional[str] = None
    timestamp: Optional[float] = None
    
    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = time.time()
        if self.content_id is None:
            self.content_id = hashlib.md5(self.content.encode()).hexdigest()[:12]

class MultiLayerContentFilter:
    """Production-grade multi-layer content safety system"""
    
    def __init__(self, 
                 model_name: str = "martin-ha/toxic-comment-model",
                 embedding_model: str = "all-MiniLM-L6-v2",
                 enable_human_review: bool = True):
        
        # Load toxicity detection model
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.toxicity_model = AutoModelForSequenceClassification.from_pretrained(model_name)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.toxicity_model.to(self.device)
        
        # Load embedding model for semantic analysis
        self.embedding_model = SentenceTransformer(embedding_model)
        
        # Configuration
        self.enable_human_review = enable_human_review
        self.toxicity_threshold = 0.5
        self.high_severity_threshold = 0.8
        self.version = "v2.1.0"
        
        # Pre-compiled regex patterns for fast detection
        self._compile_patterns()
        
        # Unsafe keywords database (simplified for demo)
        self.unsafe_keywords = self._load_unsafe_keywords()
        
        # Embedding cache for repeated content
        self.embedding_cache = {}
        self.cache_hits = 0
        self.total_requests = 0
        
        # Performance monitoring
        self.processing_times = []
        
    def _compile_patterns(self):
        """Compile regex patterns for fast rule-based detection"""
        self.patterns = {
            SafetyViolationType.PRIVACY_VIOLATION: [
                re.compile(r'\b(?:\d{3}-\d{2}-\d{4}|\d{9})\b'),  # SSN
                re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'),  # Email
                re.compile(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b'),  # Credit card
                re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'),  # IP address
            ],
            SafetyViolationType.SPAM: [
                re.compile(r'\b(?:click here|act now|limited time|urgent|free money)\b', re.IGNORECASE),
                re.compile(r'\$\d+.*(?:per day|per hour|guaranteed)', re.IGNORECASE),
            ],
            SafetyViolationType.VIOLENCE: [
                re.compile(r'\b(?:kill|murder|assault|attack|harm|hurt)\b.*(?:person|people|someone)', re.IGNORECASE),
                re.compile(r'\b(?:bomb|explosive|weapon|gun|knife)\b', re.IGNORECASE),
            ]
        }
    
    def _load_unsafe_keywords(self) -> Dict[SafetyViolationType, List[str]]:
        """Load unsafe keywords database"""
        # In production, this would be loaded from a database or file
        return {
            SafetyViolationType.HATE_SPEECH: [
                "hate", "racist", "bigot", "extremist"
            ],
            SafetyViolationType.HARASSMENT: [
                "bully", "threaten", "intimidate", "stalk"
            ],
            SafetyViolationType.SEXUAL_CONTENT: [
                "explicit", "pornographic", "sexual"
            ]
        }
    
    async def evaluate_content(self, content: ContentItem) -> SafetyResult:
        """Main content evaluation pipeline"""
        start_time = time.time()
        self.total_requests += 1
        
        try:
            # Layer 1: Quick rule-based checks
            rule_result = await self._rule_based_check(content)
            if not rule_result.is_safe and rule_result.severity_score > self.high_severity_threshold:
                processing_time = (time.time() - start_time) * 1000
                self.processing_times.append(processing_time)
                return SafetyResult(
                    is_safe=False,
                    confidence=rule_result.confidence,
                    violation_types=rule_result.violation_types,
                    severity_score=rule_result.severity_score,
                    explanation=f"Rule-based filter: {rule_result.explanation}",
                    processing_time_ms=processing_time,
                    filter_version=self.version
                )
            
            # Layer 2: ML-based toxicity detection
            ml_result = await self._ml_toxicity_check(content)
            
            # Layer 3: Semantic similarity analysis
            semantic_result = await self._semantic_analysis(content)
            
            # Layer 4: Contextual analysis
            context_result = await self._contextual_analysis(content)
            
            # Combine results
            final_result = self._combine_results(
                rule_result, ml_result, semantic_result, context_result
            )
            
            processing_time = (time.time() - start_time) * 1000
            final_result.processing_time_ms = processing_time
            final_result.filter_version = self.version
            
            self.processing_times.append(processing_time)
            
            # Log for monitoring
            await self._log_evaluation(content, final_result)
            
            return final_result
            
        except Exception as e:
            logger.error(f"Content evaluation error: {e}")
            processing_time = (time.time() - start_time) * 1000
            return SafetyResult(
                is_safe=False,  # Fail-safe approach
                confidence=0.0,
                violation_types=[],
                severity_score=1.0,
                explanation=f"Evaluation error: {str(e)}",
                processing_time_ms=processing_time,
                filter_version=self.version
            )
    
    async def _rule_based_check(self, content: ContentItem) -> SafetyResult:
        """Fast rule-based content checking"""
        violations = []
        max_severity = 0.0
        explanations = []
        
        text = content.content.lower()
        
        # Check regex patterns
        for violation_type, patterns in self.patterns.items():
            for pattern in patterns:
                if pattern.search(content.content):
                    violations.append(violation_type)
                    severity = 0.9 if violation_type == SafetyViolationType.PRIVACY_VIOLATION else 0.7
                    max_severity = max(max_severity, severity)
                    explanations.append(f"Pattern match: {violation_type.value}")
        
        # Check keyword lists
        for violation_type, keywords in self.unsafe_keywords.items():
            for keyword in keywords:
                if keyword.lower() in text:
                    violations.append(violation_type)
                    max_severity = max(max_severity, 0.6)
                    explanations.append(f"Keyword match: {keyword}")
        
        return SafetyResult(
            is_safe=len(violations) == 0,
            confidence=0.85 if violations else 0.7,
            violation_types=violations,
            severity_score=max_severity,
            explanation="; ".join(explanations) or "No rule violations detected",
            processing_time_ms=0,  # Will be set later
            filter_version=self.version
        )
    
    async def _ml_toxicity_check(self, content: ContentItem) -> SafetyResult:
        """ML-based toxicity detection"""
        try:
            # Tokenize input
            inputs = self.tokenizer(
                content.content,
                return_tensors="pt",
                truncation=True,
                max_length=512,
                padding=True
            ).to(self.device)
            
            # Get model prediction
            with torch.no_grad():
                outputs = self.toxicity_model(**inputs)
                probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)
                toxicity_score = probabilities[0][1].item()  # Assuming label 1 is toxic
            
            violations = []
            if toxicity_score > self.toxicity_threshold:
                violations.append(SafetyViolationType.TOXICITY)
            
            return SafetyResult(
                is_safe=toxicity_score <= self.toxicity_threshold,
                confidence=max(toxicity_score, 1 - toxicity_score),
                violation_types=violations,
                severity_score=toxicity_score,
                explanation=f"ML toxicity score: {toxicity_score:.3f}",
                processing_time_ms=0,
                filter_version=self.version
            )
            
        except Exception as e:
            logger.error(f"ML toxicity check error: {e}")
            return SafetyResult(
                is_safe=False,
                confidence=0.0,
                violation_types=[],
                severity_score=1.0,
                explanation=f"ML check failed: {str(e)}",
                processing_time_ms=0,
                filter_version=self.version
            )
    
    async def _semantic_analysis(self, content: ContentItem) -> SafetyResult:
        """Semantic similarity analysis against known harmful content"""
        try:
            # Check cache first
            content_hash = hashlib.md5(content.content.encode()).hexdigest()
            if content_hash in self.embedding_cache:
                embedding = self.embedding_cache[content_hash]
                self.cache_hits += 1
            else:
                # Normalize so the dot products below are cosine similarities
                embedding = self.embedding_model.encode(
                    [content.content], normalize_embeddings=True
                )[0]
                self.embedding_cache[content_hash] = embedding
            
            # Compare against known harmful embeddings (simplified)
            # In production, this would use a vector database like Pinecone or Weaviate
            harmful_examples = [
                "I want to hurt people and cause violence",
                "This is hate speech targeting specific groups",
                "Explicit sexual content description"
            ]
            
            if len(harmful_examples) > 0:
                harmful_embeddings = self.embedding_model.encode(
                    harmful_examples, normalize_embeddings=True
                )
                similarities = np.dot(embedding, harmful_embeddings.T)
                max_similarity = np.max(similarities)
                
                violations = []
                if max_similarity > 0.7:  # High similarity threshold
                    violations.append(SafetyViolationType.TOXICITY)
                
                return SafetyResult(
                    is_safe=max_similarity <= 0.7,
                    confidence=0.8,
                    violation_types=violations,
                    severity_score=max_similarity,
                    explanation=f"Semantic similarity: {max_similarity:.3f}",
                    processing_time_ms=0,
                    filter_version=self.version
                )
            
            return SafetyResult(
                is_safe=True,
                confidence=0.6,
                violation_types=[],
                severity_score=0.0,
                explanation="No semantic violations detected",
                processing_time_ms=0,
                filter_version=self.version
            )
            
        except Exception as e:
            logger.error(f"Semantic analysis error: {e}")
            return SafetyResult(
                is_safe=True,  # Default to safe for semantic analysis failures
                confidence=0.3,
                violation_types=[],
                severity_score=0.0,
                explanation=f"Semantic analysis failed: {str(e)}",
                processing_time_ms=0,
                filter_version=self.version
            )
    
    async def _contextual_analysis(self, content: ContentItem) -> SafetyResult:
        """Contextual analysis considering metadata and content type"""
        try:
            violations = []
            explanations = []

            # Start from a small neutral base risk. The adjustments below are
            # multiplicative, so a base of 0.0 would make this layer a no-op;
            # the 0.3 baseline is an illustrative choice for this demo.
            severity_score = 0.3

            # Adjust risk based on content type
            if content.content_type == "code":
                # Code often contains keywords that look harmful but aren't
                severity_score *= 0.5
            elif content.content_type == "creative_writing":
                # Creative writing may discuss violence in acceptable contexts
                severity_score *= 0.7

            # Consider user context from metadata
            user_history = content.metadata.get("user_safety_score", 0.5)
            if user_history > 0.8:  # User has a good safety record
                severity_score *= 0.8
            elif user_history < 0.3:  # User has a poor safety record
                severity_score *= 1.3
                explanations.append("User has poor safety history")

            # Consider conversation context
            conversation_context = content.metadata.get("conversation_safety_context", "neutral")
            if conversation_context == "educational":
                severity_score *= 0.6
            elif conversation_context == "debate":
                severity_score *= 0.8
            
            return SafetyResult(
                is_safe=severity_score <= 0.7,
                confidence=0.7,
                violation_types=violations,
                severity_score=min(1.0, severity_score),
                explanation="; ".join(explanations) or "Contextual analysis complete",
                processing_time_ms=0,
                filter_version=self.version
            )
            
        except Exception as e:
            logger.error(f"Contextual analysis error: {e}")
            return SafetyResult(
                is_safe=True,
                confidence=0.5,
                violation_types=[],
                severity_score=0.0,
                explanation=f"Contextual analysis failed: {str(e)}",
                processing_time_ms=0,
                filter_version=self.version
            )
    
    def _combine_results(self, *results: SafetyResult) -> SafetyResult:
        """Combine multiple safety check results"""
        all_violations = []
        all_explanations = []
        max_severity = 0.0
        confidence_scores = []
        
        for result in results:
            all_violations.extend(result.violation_types)
            all_explanations.append(result.explanation)
            max_severity = max(max_severity, result.severity_score)
            confidence_scores.append(result.confidence)
        
        # Remove duplicates
        unique_violations = list(set(all_violations))
        
        # Calculate combined confidence (simple mean across layers)
        combined_confidence = np.mean(confidence_scores) if confidence_scores else 0.0
        
        # Determine if content is safe (conservative approach)
        is_safe = len(unique_violations) == 0 and max_severity <= 0.5
        
        return SafetyResult(
            is_safe=is_safe,
            confidence=combined_confidence,
            violation_types=unique_violations,
            severity_score=max_severity,
            explanation=" | ".join(all_explanations),
            processing_time_ms=0,  # Will be set by caller
            filter_version=self.version
        )
    
    async def _log_evaluation(self, content: ContentItem, result: SafetyResult):
        """Log evaluation for monitoring and improvement"""
        log_entry = {
            "timestamp": time.time(),
            "content_id": content.content_id,
            "content_type": content.content_type,
            "content_length": len(content.content),
            "is_safe": result.is_safe,
            "severity_score": result.severity_score,
            "confidence": result.confidence,
            "violation_types": [v.value for v in result.violation_types],
            "processing_time_ms": result.processing_time_ms,
            "filter_version": result.filter_version
        }
        
        # In production, this would go to a logging system like ELK stack
        logger.info(f"Safety evaluation: {json.dumps(log_entry)}")
    
    async def batch_evaluate(self, contents: List[ContentItem]) -> List[SafetyResult]:
        """Batch evaluation for efficiency"""
        tasks = [self.evaluate_content(content) for content in contents]
        return await asyncio.gather(*tasks, return_exceptions=False)
    
    def get_performance_stats(self) -> Dict[str, Any]:
        """Get performance statistics"""
        if not self.processing_times:
            return {"message": "No evaluations performed yet"}
        
        return {
            "total_evaluations": len(self.processing_times),
            "average_processing_time_ms": np.mean(self.processing_times),
            "p95_processing_time_ms": np.percentile(self.processing_times, 95),
            "p99_processing_time_ms": np.percentile(self.processing_times, 99),
            "cache_hit_rate": self.cache_hits / max(1, self.total_requests),
            "cache_size": len(self.embedding_cache)
        }
    
    def reset_cache(self):
        """Reset embedding cache"""
        self.embedding_cache.clear()
        self.cache_hits = 0
        self.total_requests = 0

# Usage example
async def run_safety_evaluation():
    filter_system = MultiLayerContentFilter()
    
    # Test content
    test_contents = [
        ContentItem("This is a normal, safe message", "text", {"user_safety_score": 0.8}),
        ContentItem("I hate all people from that group", "text", {"user_safety_score": 0.2}),
        ContentItem("My email is john@example.com and SSN is 123-45-6789", "text", {}),
        ContentItem("def process_user_data(data): return data.clean()", "code", {}),
    ]
    
    results = await filter_system.batch_evaluate(test_contents)
    
    for content, result in zip(test_contents, results):
        print(f"Content: {content.content[:50]}...")
        print(f"Safe: {result.is_safe}, Confidence: {result.confidence:.2f}")
        print(f"Violations: {[v.value for v in result.violation_types]}")
        print(f"Severity: {result.severity_score:.3f}")
        print("---")
    
    print(f"Performance stats: {filter_system.get_performance_stats()}")

if __name__ == "__main__":
    asyncio.run(run_safety_evaluation())

Algorithmic Bias Detection System

Production Bias Monitoring

bias_detection_system.py
import numpy as np
import pandas as pd
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import logging
from scipy import stats
import json
import time
from enum import Enum

logger = logging.getLogger(__name__)

class ProtectedAttribute(Enum):
    GENDER = "gender"
    RACE = "race"
    AGE = "age"
    RELIGION = "religion"
    SEXUAL_ORIENTATION = "sexual_orientation"
    DISABILITY = "disability"
    SOCIOECONOMIC_STATUS = "socioeconomic_status"

class FairnessMetric(Enum):
    DEMOGRAPHIC_PARITY = "demographic_parity"
    EQUAL_OPPORTUNITY = "equal_opportunity"
    EQUALIZED_ODDS = "equalized_odds"
    CALIBRATION = "calibration"
    INDIVIDUAL_FAIRNESS = "individual_fairness"

@dataclass
class BiasTestResult:
    metric: FairnessMetric
    protected_attribute: ProtectedAttribute
    groups: Dict[str, Any]
    bias_score: float  # 0.0 (fair) to 1.0 (highly biased)
    statistical_significance: bool
    p_value: float
    threshold_passed: bool
    explanation: str
    recommendations: List[str]

@dataclass
class PredictionRecord:
    prediction: float
    true_label: int
    user_id: str
    protected_attributes: Dict[ProtectedAttribute, str]
    features: Dict[str, Any]
    timestamp: float
    model_version: str

class AlgorithmicBiasDetector:
    """Production system for detecting algorithmic bias in ML models"""
    
    def __init__(self, 
                 fairness_thresholds: Optional[Dict[FairnessMetric, float]] = None,
                 min_samples_per_group: int = 100):
        
        # Default fairness thresholds (lower = more strict)
        self.fairness_thresholds = fairness_thresholds or {
            FairnessMetric.DEMOGRAPHIC_PARITY: 0.1,  # 10% difference max
            FairnessMetric.EQUAL_OPPORTUNITY: 0.1,
            FairnessMetric.EQUALIZED_ODDS: 0.1,
            FairnessMetric.CALIBRATION: 0.05,
            FairnessMetric.INDIVIDUAL_FAIRNESS: 0.15
        }
        
        self.min_samples_per_group = min_samples_per_group
        self.prediction_buffer = []
        self.bias_history = []
        
        # Statistical significance level
        self.alpha = 0.05
        
    def add_predictions(self, predictions: List[PredictionRecord]):
        """Add prediction records for bias analysis"""
        self.prediction_buffer.extend(predictions)
        
        # Automatically run bias analysis when buffer reaches threshold
        if len(self.prediction_buffer) >= 1000:
            self.analyze_bias()
    
    def analyze_bias(self, 
                    protected_attributes: Optional[List[ProtectedAttribute]] = None,
                    metrics: Optional[List[FairnessMetric]] = None) -> Dict[str, List[BiasTestResult]]:
        """Comprehensive bias analysis across multiple dimensions"""
        
        if not self.prediction_buffer:
            logger.warning("No predictions available for bias analysis")
            return {}
        
        # Convert to DataFrame for easier analysis
        df = self._predictions_to_dataframe()
        
        # Default to all available attributes and metrics
        if protected_attributes is None:
            protected_attributes = self._get_available_attributes(df)
        if metrics is None:
            metrics = list(self.fairness_thresholds.keys())
        
        results = {}
        
        for attr in protected_attributes:
            attr_results = []
            
            for metric in metrics:
                try:
                    result = self._compute_fairness_metric(df, attr, metric)
                    if result:
                        attr_results.append(result)
                        
                        # Log significant bias issues
                        if not result.threshold_passed and result.statistical_significance:
                            logger.warning(
                                f"Significant bias detected: {metric.value} for {attr.value} "
                                f"(bias_score: {result.bias_score:.3f}, p_value: {result.p_value:.6f})"
                            )
                            
                except Exception as e:
                    logger.error(f"Error computing {metric.value} for {attr.value}: {e}")
            
            if attr_results:
                results[attr.value] = attr_results
        
        # Store results for historical tracking
        self.bias_history.append({
            'timestamp': time.time(),
            'results': results,
            'sample_size': len(df)
        })
        
        # Clear buffer after analysis
        self.prediction_buffer.clear()
        
        return results
    
    def _predictions_to_dataframe(self) -> pd.DataFrame:
        """Convert prediction records to pandas DataFrame"""
        records = []
        
        for pred in self.prediction_buffer:
            record = {
                'prediction': pred.prediction,
                'true_label': pred.true_label,
                'user_id': pred.user_id,
                'timestamp': pred.timestamp,
                'model_version': pred.model_version
            }
            
            # Add protected attributes
            for attr, value in pred.protected_attributes.items():
                record[f'{attr.value}'] = value
            
            # Add features
            for feature, value in pred.features.items():
                record[f'feature_{feature}'] = value
            
            records.append(record)
        
        return pd.DataFrame(records)
    
    def _get_available_attributes(self, df: pd.DataFrame) -> List[ProtectedAttribute]:
        """Get available protected attributes from DataFrame"""
        available = []
        for attr in ProtectedAttribute:
            if attr.value in df.columns:
                available.append(attr)
        return available
    
    def _compute_fairness_metric(self, 
                                df: pd.DataFrame, 
                                protected_attr: ProtectedAttribute,
                                metric: FairnessMetric) -> Optional[BiasTestResult]:
        """Compute specific fairness metric"""
        
        attr_col = protected_attr.value
        if attr_col not in df.columns:
            return None
        
        # Get unique groups for this attribute
        groups = df[attr_col].unique()
        if len(groups) < 2:
            return None
        
        # Ensure minimum sample size per group
        group_counts = df[attr_col].value_counts()
        if group_counts.min() < self.min_samples_per_group:
            return BiasTestResult(
                metric=metric,
                protected_attribute=protected_attr,
                groups={},
                bias_score=0.0,
                statistical_significance=False,
                p_value=1.0,
                threshold_passed=False,
                explanation="Insufficient samples per group",
                recommendations=["Collect more data for underrepresented groups"]
            )
        
        # Compute metric based on type
        if metric == FairnessMetric.DEMOGRAPHIC_PARITY:
            return self._compute_demographic_parity(df, attr_col, groups)
        elif metric == FairnessMetric.EQUAL_OPPORTUNITY:
            return self._compute_equal_opportunity(df, attr_col, groups)
        elif metric == FairnessMetric.EQUALIZED_ODDS:
            return self._compute_equalized_odds(df, attr_col, groups)
        elif metric == FairnessMetric.CALIBRATION:
            return self._compute_calibration(df, attr_col, groups)
        elif metric == FairnessMetric.INDIVIDUAL_FAIRNESS:
            return self._compute_individual_fairness(df, attr_col, groups)
        
        return None
    
    def _compute_demographic_parity(self, 
                                   df: pd.DataFrame, 
                                   attr_col: str, 
                                   groups: np.ndarray) -> BiasTestResult:
        """Compute demographic parity (equal positive prediction rates)"""
        
        group_stats = {}
        positive_rates = []
        
        for group in groups:
            group_data = df[df[attr_col] == group]
            positive_rate = (group_data['prediction'] > 0.5).mean()
            group_stats[str(group)] = {
                'positive_rate': positive_rate,
                'count': len(group_data)
            }
            positive_rates.append(positive_rate)
        
        # Calculate bias score (max difference in positive rates)
        bias_score = max(positive_rates) - min(positive_rates)
        
        # Statistical test (Chi-square test)
        contingency_table = []
        for group in groups:
            group_data = df[df[attr_col] == group]
            positive = (group_data['prediction'] > 0.5).sum()
            negative = len(group_data) - positive
            contingency_table.append([positive, negative])
        
        chi2, p_value = stats.chi2_contingency(contingency_table)[:2]
        statistical_significance = p_value < self.alpha
        
        threshold_passed = bias_score <= self.fairness_thresholds[FairnessMetric.DEMOGRAPHIC_PARITY]
        
        recommendations = []
        if not threshold_passed:
            recommendations.extend([
                "Consider re-balancing training data",
                "Apply fairness constraints during training", 
                "Use post-processing calibration techniques"
            ])
        
        return BiasTestResult(
            metric=FairnessMetric.DEMOGRAPHIC_PARITY,
            protected_attribute=ProtectedAttribute(attr_col),
            groups=group_stats,
            bias_score=bias_score,
            statistical_significance=statistical_significance,
            p_value=p_value,
            threshold_passed=threshold_passed,
            explanation=f"Max difference in positive prediction rates: {bias_score:.3f}",
            recommendations=recommendations
        )
    
    def _compute_equal_opportunity(self, 
                                  df: pd.DataFrame, 
                                  attr_col: str, 
                                  groups: np.ndarray) -> BiasTestResult:
        """Compute equal opportunity (equal true positive rates)"""
        
        group_stats = {}
        tpr_rates = []
        
        # Only consider positive examples (true_label == 1)
        positive_df = df[df['true_label'] == 1]
        
        if len(positive_df) == 0:
            return BiasTestResult(
                metric=FairnessMetric.EQUAL_OPPORTUNITY,
                protected_attribute=ProtectedAttribute(attr_col),
                groups={},
                bias_score=0.0,
                statistical_significance=False,
                p_value=1.0,
                threshold_passed=False,
                explanation="No positive examples available",
                recommendations=["Need positive examples to compute equal opportunity"]
            )
        
        for group in groups:
            group_data = positive_df[positive_df[attr_col] == group]
            if len(group_data) > 0:
                tpr = (group_data['prediction'] > 0.5).mean()  # True Positive Rate
                group_stats[str(group)] = {
                    'tpr': tpr,
                    'count': len(group_data)
                }
                tpr_rates.append(tpr)
        
        if len(tpr_rates) < 2:
            return BiasTestResult(
                metric=FairnessMetric.EQUAL_OPPORTUNITY,
                protected_attribute=ProtectedAttribute(attr_col),
                groups=group_stats,
                bias_score=0.0,
                statistical_significance=False,
                p_value=1.0,
                threshold_passed=False,
                explanation="Insufficient groups with positive examples",
                recommendations=["Ensure all groups have positive examples"]
            )
        
        bias_score = max(tpr_rates) - min(tpr_rates)
        
        # Statistical test (comparing proportions)
        group1_data = positive_df[positive_df[attr_col] == groups[0]]
        group2_data = positive_df[positive_df[attr_col] == groups[1]]
        
        if len(group1_data) > 0 and len(group2_data) > 0:
            stat, p_value = stats.chi2_contingency([
                [(group1_data['prediction'] > 0.5).sum(), len(group1_data) - (group1_data['prediction'] > 0.5).sum()],
                [(group2_data['prediction'] > 0.5).sum(), len(group2_data) - (group2_data['prediction'] > 0.5).sum()]
            ])[:2]
        else:
            p_value = 1.0
        
        statistical_significance = p_value < self.alpha
        threshold_passed = bias_score <= self.fairness_thresholds[FairnessMetric.EQUAL_OPPORTUNITY]
        
        recommendations = []
        if not threshold_passed:
            recommendations.extend([
                "Apply equal opportunity constraint during training",
                "Use threshold optimization per group",
                "Consider adversarial debiasing techniques"
            ])
        
        return BiasTestResult(
            metric=FairnessMetric.EQUAL_OPPORTUNITY,
            protected_attribute=ProtectedAttribute(attr_col),
            groups=group_stats,
            bias_score=bias_score,
            statistical_significance=statistical_significance,
            p_value=p_value,
            threshold_passed=threshold_passed,
            explanation=f"Max difference in true positive rates: {bias_score:.3f}",
            recommendations=recommendations
        )
    
    def _compute_equalized_odds(self, 
                               df: pd.DataFrame, 
                               attr_col: str, 
                               groups: np.ndarray) -> BiasTestResult:
        """Compute equalized odds (equal TPR and FPR across groups)"""
        
        group_stats = {}
        tpr_diff_max = 0.0
        fpr_diff_max = 0.0
        
        for group in groups:
            group_data = df[df[attr_col] == group]
            
            # Calculate TPR and FPR
            tp = ((group_data['prediction'] > 0.5) & (group_data['true_label'] == 1)).sum()
            fp = ((group_data['prediction'] > 0.5) & (group_data['true_label'] == 0)).sum()
            tn = ((group_data['prediction'] <= 0.5) & (group_data['true_label'] == 0)).sum()
            fn = ((group_data['prediction'] <= 0.5) & (group_data['true_label'] == 1)).sum()
            
            tpr = tp / (tp + fn) if (tp + fn) > 0 else 0
            fpr = fp / (fp + tn) if (fp + tn) > 0 else 0
            
            group_stats[str(group)] = {
                'tpr': tpr,
                'fpr': fpr,
                'count': len(group_data),
                'confusion_matrix': {'tp': tp, 'fp': fp, 'tn': tn, 'fn': fn}
            }
        
        # Calculate max differences
        tpr_values = [g['tpr'] for g in group_stats.values()]
        fpr_values = [g['fpr'] for g in group_stats.values()]
        
        if len(tpr_values) >= 2:
            tpr_diff_max = max(tpr_values) - min(tpr_values)
            fpr_diff_max = max(fpr_values) - min(fpr_values)
        
        # Bias score is the maximum of TPR and FPR differences
        bias_score = max(tpr_diff_max, fpr_diff_max)
        
        # Statistical significance (simplified)
        statistical_significance = bias_score > 0.05  # Simplified test
        p_value = 0.01 if statistical_significance else 0.5
        
        threshold_passed = bias_score <= self.fairness_thresholds[FairnessMetric.EQUALIZED_ODDS]
        
        recommendations = []
        if not threshold_passed:
            recommendations.extend([
                "Apply equalized odds constraint during training",
                "Use different decision thresholds per group",
                "Consider fairness-aware ensemble methods"
            ])
        
        return BiasTestResult(
            metric=FairnessMetric.EQUALIZED_ODDS,
            protected_attribute=ProtectedAttribute(attr_col),
            groups=group_stats,
            bias_score=bias_score,
            statistical_significance=statistical_significance,
            p_value=p_value,
            threshold_passed=threshold_passed,
            explanation=f"Max TPR diff: {tpr_diff_max:.3f}, Max FPR diff: {fpr_diff_max:.3f}",
            recommendations=recommendations
        )
    
    def _compute_calibration(self, 
                           df: pd.DataFrame, 
                           attr_col: str, 
                           groups: np.ndarray) -> BiasTestResult:
        """Compute calibration fairness (predicted probabilities match actual outcomes)"""
        
        group_stats = {}
        calibration_errors = []
        
        for group in groups:
            group_data = df[df[attr_col] == group]
            
            # Bin predictions and calculate calibration error
            n_bins = 10
            bin_boundaries = np.linspace(0, 1, n_bins + 1)
            bin_lowers = bin_boundaries[:-1]
            bin_uppers = bin_boundaries[1:]
            
            calibration_error = 0.0
            total_samples = len(group_data)
            
            for bin_lower, bin_upper in zip(bin_lowers, bin_uppers):
                in_bin = (group_data['prediction'] > bin_lower) & (group_data['prediction'] <= bin_upper)
                prop_in_bin = in_bin.mean()
                
                if prop_in_bin > 0:
                    accuracy_in_bin = group_data[in_bin]['true_label'].mean()
                    avg_confidence_in_bin = group_data[in_bin]['prediction'].mean()
                    calibration_error += np.abs(avg_confidence_in_bin - accuracy_in_bin) * prop_in_bin
            
            group_stats[str(group)] = {
                'calibration_error': calibration_error,
                'count': total_samples
            }
            calibration_errors.append(calibration_error)
        
        # Bias score is the maximum difference in calibration errors
        bias_score = max(calibration_errors) - min(calibration_errors) if len(calibration_errors) >= 2 else 0
        
        statistical_significance = bias_score > 0.02  # Simplified test
        p_value = 0.01 if statistical_significance else 0.5
        
        threshold_passed = bias_score <= self.fairness_thresholds[FairnessMetric.CALIBRATION]
        
        recommendations = []
        if not threshold_passed:
            recommendations.extend([
                "Apply calibration methods (Platt scaling, isotonic regression)",
                "Use group-specific calibration",
                "Implement fairness-aware calibration techniques"
            ])
        
        return BiasTestResult(
            metric=FairnessMetric.CALIBRATION,
            protected_attribute=ProtectedAttribute(attr_col),
            groups=group_stats,
            bias_score=bias_score,
            statistical_significance=statistical_significance,
            p_value=p_value,
            threshold_passed=threshold_passed,
            explanation=f"Max difference in calibration errors: {bias_score:.3f}",
            recommendations=recommendations
        )
    
    def _compute_individual_fairness(self, 
                                   df: pd.DataFrame, 
                                   attr_col: str, 
                                   groups: np.ndarray) -> BiasTestResult:
        """Compute individual fairness (similar individuals get similar predictions)"""
        
        # Simplified individual fairness: variance in predictions within each group
        group_stats = {}
        prediction_variances = []
        
        for group in groups:
            group_data = df[df[attr_col] == group]
            pred_variance = group_data['prediction'].var()
            
            group_stats[str(group)] = {
                'prediction_variance': pred_variance,
                'count': len(group_data),
                'mean_prediction': group_data['prediction'].mean()
            }
            prediction_variances.append(pred_variance)
        
        # Bias score based on difference in variances (higher variance = less fair)
        bias_score = max(prediction_variances) - min(prediction_variances) if len(prediction_variances) >= 2 else 0
        
        statistical_significance = bias_score > 0.01  # Simplified test
        p_value = 0.01 if statistical_significance else 0.5
        
        threshold_passed = bias_score <= self.fairness_thresholds[FairnessMetric.INDIVIDUAL_FAIRNESS]
        
        recommendations = []
        if not threshold_passed:
            recommendations.extend([
                "Investigate feature preprocessing for consistency",
                "Apply individual fairness constraints",
                "Use fairness-aware representation learning"
            ])
        
        return BiasTestResult(
            metric=FairnessMetric.INDIVIDUAL_FAIRNESS,
            protected_attribute=ProtectedAttribute(attr_col),
            groups=group_stats,
            bias_score=bias_score,
            statistical_significance=statistical_significance,
            p_value=p_value,
            threshold_passed=threshold_passed,
            explanation=f"Difference in prediction variances: {bias_score:.4f}",
            recommendations=recommendations
        )
    
    def generate_bias_report(self, 
                           results: Dict[str, List[BiasTestResult]]) -> Dict[str, Any]:
        """Generate comprehensive bias analysis report"""
        
        total_tests = sum(len(attr_results) for attr_results in results.values())
        failed_tests = 0
        significant_issues = 0
        
        issue_summary = []
        
        for attr, attr_results in results.items():
            for result in attr_results:
                if not result.threshold_passed:
                    failed_tests += 1
                if result.statistical_significance and not result.threshold_passed:
                    significant_issues += 1
                    issue_summary.append({
                        'attribute': attr,
                        'metric': result.metric.value,
                        'bias_score': result.bias_score,
                        'p_value': result.p_value
                    })
        
        # Overall bias risk score
        risk_score = (significant_issues / max(1, total_tests)) * 100
        
        risk_level = "LOW"
        if risk_score > 30:
            risk_level = "HIGH"
        elif risk_score > 15:
            risk_level = "MEDIUM"
        
        return {
            'timestamp': time.time(),
            'summary': {
                'total_tests': total_tests,
                'tests_failed': failed_tests,
                'significant_issues': significant_issues,
                'risk_score': risk_score,
                'risk_level': risk_level
            },
            'detailed_results': results,
            'significant_issues': issue_summary,
            'sample_size': self.bias_history[-1]['sample_size'] if self.bias_history else 0,
            'recommendations': self._generate_overall_recommendations(results)
        }
    
    def _generate_overall_recommendations(self, 
                                        results: Dict[str, List[BiasTestResult]]) -> List[str]:
        """Generate overall recommendations based on bias analysis"""
        
        all_recommendations = []
        for attr_results in results.values():
            for result in attr_results:
                if not result.threshold_passed:
                    all_recommendations.extend(result.recommendations)
        
        # Deduplicate and prioritize
        unique_recommendations = list(set(all_recommendations))
        
        # Add general recommendations if issues found
        if all_recommendations:
            unique_recommendations.extend([
                "Implement continuous bias monitoring in production",
                "Regular model retraining with fairness constraints",
                "Establish bias review process with domain experts",
                "Consider hiring diverse team for bias evaluation"
            ])
        
        return unique_recommendations

# Usage example
def create_sample_predictions() -> List[PredictionRecord]:
    """Create sample prediction data for testing"""
    import random
    
    predictions = []
    for i in range(1000):
        # Simulate biased model that discriminates based on gender
        gender = random.choice(['male', 'female'])
        race = random.choice(['white', 'black', 'hispanic', 'asian'])
        
        # Introduce bias: higher predictions for males
        base_prediction = random.uniform(0, 1)
        if gender == 'male':
            prediction = min(1.0, base_prediction + 0.2)
        else:
            prediction = base_prediction
        
        true_label = random.randint(0, 1)
        
        record = PredictionRecord(
            prediction=prediction,
            true_label=true_label,
            user_id=f"user_{i}",
            protected_attributes={
                ProtectedAttribute.GENDER: gender,
                ProtectedAttribute.RACE: race,
                ProtectedAttribute.AGE: random.choice(['young', 'middle', 'old'])
            },
            features={'feature_1': random.uniform(0, 1), 'feature_2': random.uniform(0, 1)},
            timestamp=time.time(),
            model_version='v1.0'
        )
        predictions.append(record)
    
    return predictions

if __name__ == "__main__":
    # Test the bias detection system
    detector = AlgorithmicBiasDetector()
    
    # Put sample predictions directly into the buffer so the explicit
    # analyze_bias() call below sees them; add_predictions() would auto-run
    # the analysis and clear the buffer once it reaches 1,000 records.
    sample_predictions = create_sample_predictions()
    detector.prediction_buffer.extend(sample_predictions)
    
    # Analyze bias
    results = detector.analyze_bias()
    
    # Generate report
    report = detector.generate_bias_report(results)
    
    print("Bias Analysis Report:")
    print(json.dumps(report['summary'], indent=2))
    
    if report['significant_issues']:
        print("\nSignificant Issues Found:")
        for issue in report['significant_issues']:
            print(f"- {issue['attribute']}: {issue['metric']} (bias_score: {issue['bias_score']:.3f})")
    
    print(f"\nOverall Risk Level: {report['summary']['risk_level']}")

Real-World Examples

OpenAI Content Policy

Multi-layered content filtering for ChatGPT and the GPT API, with real-time harmful-content detection and user-safety protections.

  • Real-time toxicity detection
  • Constitutional AI alignment
  • Human feedback integration

Facebook AI Fairness

Comprehensive bias detection across content ranking, ad targeting, and moderation with continuous fairness monitoring.

  • Demographic parity enforcement
  • Intersectional bias analysis
  • Fairness-aware ML training

Google AI Principles

Responsible AI framework with safety by design, fairness testing, and AI ethics integration across all AI products.

  • Model cards for transparency
  • What-If Tool for bias testing
  • Responsible AI practices

AI Safety Best Practices

✅ Do's

  • Implement multiple layers of safety checks
  • Continuously monitor for bias and fairness issues
  • Include diverse perspectives in safety evaluation
  • Design fail-safe mechanisms and circuit breakers
  • Maintain transparency and explainability
  • Conduct regular safety audits and red-team exercises

❌ Don'ts

  • Don't rely on single-layer safety measures
  • Don't ignore low-frequency but high-impact risks
  • Don't optimize only for performance metrics
  • Don't deploy without comprehensive safety testing
  • Don't ignore user feedback on safety issues
  • Don't assume safety is a one-time consideration