AI Safety and Guardrails Systems
Master content filtering, bias detection, model alignment, safety monitoring, and responsible AI deployment
50 min read • Advanced
What are AI Safety and Guardrails Systems?
AI safety and guardrails systems are protective measures and monitoring frameworks designed to ensure AI models operate safely, ethically, and within acceptable boundaries in production environments.
Content Safety
Harmful content detection & filtering
Bias Detection
Algorithmic fairness monitoring
Model Alignment
Human values & intent alignment
🛡️ AI Safety Guardrails Calculator
Calculate latency, throughput, safety scores, and operational costs for AI safety systems.
Safety Analysis
Total Latency: 45ms
Max Throughput: 22,222/sec
Actual Throughput: 10,000/sec
Safety Score: 100/100
Automated Rate: 8,000/sec
Human Review: 2,000/sec
Daily Cost: $3,024,000.00
Recommended GPUs: 4
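The figures in this widget follow from a handful of simple relationships: latency and concurrency bound throughput, the automated-vs-human split divides the traffic, and unit costs turn rates into daily spend. The sketch below is a back-of-the-envelope version under stated assumptions (a fixed pool of concurrent streams, a per-GPU throughput budget, illustrative unit costs); the defaults are chosen only so the arithmetic lands near the sample output above and are not the calculator's actual parameters.
safety_capacity_sketch.py
import math

def estimate_safety_capacity(latency_ms: float = 45.0,
                             concurrent_streams: int = 1_000,      # assumed concurrency
                             target_rps: float = 10_000.0,
                             human_review_rate: float = 0.2,       # assumed 20% escalation
                             gpu_rps_capacity: float = 2_500.0,    # assumed per-GPU budget
                             gpu_hourly_cost: float = 2.50,        # assumed unit costs
                             cost_per_human_review: float = 0.0175) -> dict:
    """Rough capacity/cost model for a safety-check pipeline (illustrative only)."""
    max_rps = concurrent_streams / (latency_ms / 1000.0)   # throughput ceiling
    actual_rps = min(target_rps, max_rps)
    automated_rps = actual_rps * (1.0 - human_review_rate)
    human_rps = actual_rps * human_review_rate
    gpus = math.ceil(actual_rps / gpu_rps_capacity)
    daily_cost = (gpus * gpu_hourly_cost * 24
                  + human_rps * 86_400 * cost_per_human_review)
    return {
        "max_throughput_rps": round(max_rps),
        "actual_throughput_rps": round(actual_rps),
        "automated_rps": round(automated_rps),
        "human_review_rps": round(human_rps),
        "recommended_gpus": gpus,
        "estimated_daily_cost_usd": round(daily_cost, 2),
    }

if __name__ == "__main__":
    print(estimate_safety_capacity())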
AI Safety Framework Architecture
Content Safety Layer
- • Toxicity & hate speech detection
- • NSFW content filtering
- • Violence & harm identification
- • Personal information sanitization (redaction sketch after this list)
- • Real-time content moderation
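The "personal information sanitization" item above can be approximated with the same pattern families the detection filter further down this page uses. This is a minimal redaction sketch, not the production system; the patterns are illustrative and far from exhaustive.
pii_redaction_sketch.py
import re

# Typed placeholders let downstream systems know what was removed.
PII_PATTERNS = {
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    "email": re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"),
    "credit_card": re.compile(r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b"),
    "ip_address": re.compile(r"\b\d{1,3}(?:\.\d{1,3}){3}\b"),
}

def sanitize(text: str) -> str:
    """Replace detected PII spans with typed placeholders before logging or storage."""
    for label, pattern in PII_PATTERNS.items():
        text = pattern.sub(f"[{label.upper()}_REDACTED]", text)
    return text

if __name__ == "__main__":
    print(sanitize("Contact john@example.com, SSN 123-45-6789"))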
Bias & Fairness Layer
- • Demographic bias detection (quick check sketched after this list)
- • Equal opportunity monitoring
- • Calibration fairness checks
- • Protected attribute analysis
- • Intersectional bias evaluation
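As a quick illustration of the first item (the bias detection system later on this page implements the production version), demographic parity can be measured as the largest gap in positive-prediction rates across groups. The 0.1 flagging threshold here is an assumption mirroring the default used further below.
demographic_parity_sketch.py
import numpy as np

def demographic_parity_gap(predictions: np.ndarray,
                           groups: np.ndarray,
                           decision_threshold: float = 0.5) -> float:
    """Largest difference in positive-prediction rate across groups (0.0 = parity)."""
    rates = [(predictions[groups == g] > decision_threshold).mean()
             for g in np.unique(groups)]
    return float(max(rates) - min(rates))

if __name__ == "__main__":
    # Toy data: the model scores group "a" systematically higher than group "b"
    preds = np.array([0.9, 0.8, 0.7, 0.2, 0.3, 0.1])
    grps = np.array(["a", "a", "a", "b", "b", "b"])
    gap = demographic_parity_gap(preds, grps)
    print(f"parity gap = {gap:.2f}, flagged = {gap > 0.1}")  # 0.1 is an assumed threshold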
Model Alignment Layer
- • Human feedback integration
- • Value alignment verification
- • Intent preservation checks
- • Constitutional AI principles (gating sketch after this list)
- • Reward model validation
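One way the "constitutional AI principles" and "reward model validation" items can be wired together at serving time is a simple release gate: a response goes out only if every principle clears a floor and the reward score agrees. The sketch below assumes per-principle scores already exist (from a judge or reward model); the principle names, floors, and score source are all assumptions, not a real API.
alignment_gate_sketch.py
from dataclasses import dataclass
from typing import Dict, List

# Assumed principles and per-principle score floors (illustrative values).
PRINCIPLE_FLOORS: Dict[str, float] = {
    "harmlessness": 0.80,
    "honesty": 0.70,
    "helpfulness": 0.50,
}

@dataclass
class AlignmentDecision:
    release: bool
    failed_principles: List[str]
    reward_score: float

def alignment_gate(principle_scores: Dict[str, float],
                   reward_score: float,
                   reward_floor: float = 0.6) -> AlignmentDecision:
    """Release a response only if every principle clears its floor and the reward score does too."""
    failed = [name for name, floor in PRINCIPLE_FLOORS.items()
              if principle_scores.get(name, 0.0) < floor]
    return AlignmentDecision(
        release=(not failed) and reward_score >= reward_floor,
        failed_principles=failed,
        reward_score=reward_score,
    )

if __name__ == "__main__":
    # Toy scores; in practice these would come from an evaluator or reward model
    print(alignment_gate({"harmlessness": 0.95, "honesty": 0.65, "helpfulness": 0.90},
                         reward_score=0.72))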
Monitoring & Control Layer
- • Real-time safety metrics
- • Circuit breaker mechanisms (sketched after this list)
- • Gradual rollout controls
- • A/B safety testing
- • Emergency shutdown systems
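The circuit-breaker idea from this layer reduces to a few lines: trip when the recent violation rate crosses a threshold, stay open for a cool-down period, then probe again. The window size, trip rate, and cool-down below are assumptions for illustration.
safety_circuit_breaker_sketch.py
import time
from collections import deque

class SafetyCircuitBreaker:
    """Trips when too many recent requests are flagged unsafe; illustrative only."""

    def __init__(self, window: int = 500, trip_rate: float = 0.05, cooldown_s: float = 60.0):
        self.window = deque(maxlen=window)   # rolling record of violation flags
        self.trip_rate = trip_rate           # assumed: open at >= 5% violations
        self.cooldown_s = cooldown_s
        self.opened_at = None

    def record(self, is_violation: bool) -> None:
        """Record the outcome of one safety evaluation."""
        self.window.append(is_violation)
        if len(self.window) == self.window.maxlen:
            rate = sum(self.window) / len(self.window)
            if rate >= self.trip_rate:
                self.opened_at = time.time()

    def allow_traffic(self) -> bool:
        """True while closed; False while open (route to fallback / human review)."""
        if self.opened_at is None:
            return True
        if time.time() - self.opened_at >= self.cooldown_s:
            self.opened_at = None            # half-open: let traffic probe again
            self.window.clear()
            return True
        return False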
Production Content Safety System
Multi-Layer Content Filter
content_safety_system.py
from typing import Dict, List, Any, Optional, Tuple
import re
import asyncio
import json
import time
import logging
from dataclasses import dataclass, asdict
from enum import Enum
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import numpy as np
from sentence_transformers import SentenceTransformer
import httpx
import hashlib
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SafetyViolationType(Enum):
TOXICITY = "toxicity"
HATE_SPEECH = "hate_speech"
HARASSMENT = "harassment"
VIOLENCE = "violence"
SEXUAL_CONTENT = "sexual_content"
SELF_HARM = "self_harm"
SPAM = "spam"
MISINFORMATION = "misinformation"
PRIVACY_VIOLATION = "privacy_violation"
@dataclass
class SafetyResult:
is_safe: bool
confidence: float
violation_types: List[SafetyViolationType]
severity_score: float # 0.0 (safe) to 1.0 (extremely unsafe)
explanation: str
processing_time_ms: float
filter_version: str
@dataclass
class ContentItem:
content: str
content_type: str # "text", "image_caption", "code", etc.
metadata: Dict[str, Any]
content_id: Optional[str] = None
timestamp: Optional[float] = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = time.time()
if self.content_id is None:
self.content_id = hashlib.md5(self.content.encode()).hexdigest()[:12]
class MultiLayerContentFilter:
"""Production-grade multi-layer content safety system"""
def __init__(self,
model_name: str = "martin-ha/toxic-comment-model",
embedding_model: str = "all-MiniLM-L6-v2",
enable_human_review: bool = True):
# Load toxicity detection model
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.toxicity_model = AutoModelForSequenceClassification.from_pretrained(model_name)
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.toxicity_model.to(self.device)
# Load embedding model for semantic analysis
self.embedding_model = SentenceTransformer(embedding_model)
# Configuration
self.enable_human_review = enable_human_review
self.toxicity_threshold = 0.5
self.high_severity_threshold = 0.8
self.version = "v2.1.0"
# Pre-compiled regex patterns for fast detection
self._compile_patterns()
# Unsafe keywords database (simplified for demo)
self.unsafe_keywords = self._load_unsafe_keywords()
# Embedding cache for repeated content
self.embedding_cache = {}
self.cache_hits = 0
self.total_requests = 0
# Performance monitoring
self.processing_times = []
def _compile_patterns(self):
"""Compile regex patterns for fast rule-based detection"""
self.patterns = {
SafetyViolationType.PRIVACY_VIOLATION: [
re.compile(r'\b(?:\d{3}-\d{2}-\d{4}|\d{9})\b'), # SSN
re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'), # Email
re.compile(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b'), # Credit card
re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'), # IP address
],
SafetyViolationType.SPAM: [
re.compile(r'\b(?:click here|act now|limited time|urgent|free money)\b', re.IGNORECASE),
re.compile(r'\$\d+.*(?:per day|per hour|guaranteed)', re.IGNORECASE),
],
SafetyViolationType.VIOLENCE: [
re.compile(r'\b(?:kill|murder|assault|attack|harm|hurt)\b.*(?:person|people|someone)', re.IGNORECASE),
re.compile(r'\b(?:bomb|explosive|weapon|gun|knife)\b', re.IGNORECASE),
]
}
def _load_unsafe_keywords(self) -> Dict[SafetyViolationType, List[str]]:
"""Load unsafe keywords database"""
# In production, this would be loaded from a database or file
return {
SafetyViolationType.HATE_SPEECH: [
"hate", "racist", "bigot", "extremist"
],
SafetyViolationType.HARASSMENT: [
"bully", "threaten", "intimidate", "stalk"
],
SafetyViolationType.SEXUAL_CONTENT: [
"explicit", "pornographic", "sexual"
]
}
async def evaluate_content(self, content: ContentItem) -> SafetyResult:
"""Main content evaluation pipeline"""
start_time = time.time()
self.total_requests += 1
try:
# Layer 1: Quick rule-based checks
rule_result = await self._rule_based_check(content)
if not rule_result.is_safe and rule_result.severity_score > self.high_severity_threshold:
processing_time = (time.time() - start_time) * 1000
self.processing_times.append(processing_time)
return SafetyResult(
is_safe=False,
confidence=rule_result.confidence,
violation_types=rule_result.violation_types,
severity_score=rule_result.severity_score,
explanation=f"Rule-based filter: {rule_result.explanation}",
processing_time_ms=processing_time,
filter_version=self.version
)
# Layer 2: ML-based toxicity detection
ml_result = await self._ml_toxicity_check(content)
# Layer 3: Semantic similarity analysis
semantic_result = await self._semantic_analysis(content)
# Layer 4: Contextual analysis
context_result = await self._contextual_analysis(content)
# Combine results
final_result = self._combine_results(
rule_result, ml_result, semantic_result, context_result
)
processing_time = (time.time() - start_time) * 1000
final_result.processing_time_ms = processing_time
final_result.filter_version = self.version
self.processing_times.append(processing_time)
# Log for monitoring
await self._log_evaluation(content, final_result)
return final_result
except Exception as e:
logger.error(f"Content evaluation error: {e}")
processing_time = (time.time() - start_time) * 1000
return SafetyResult(
is_safe=False, # Fail-safe approach
confidence=0.0,
violation_types=[],
severity_score=1.0,
explanation=f"Evaluation error: {str(e)}",
processing_time_ms=processing_time,
filter_version=self.version
)
async def _rule_based_check(self, content: ContentItem) -> SafetyResult:
"""Fast rule-based content checking"""
violations = []
max_severity = 0.0
explanations = []
text = content.content.lower()
# Check regex patterns
for violation_type, patterns in self.patterns.items():
for pattern in patterns:
if pattern.search(content.content):
violations.append(violation_type)
severity = 0.9 if violation_type == SafetyViolationType.PRIVACY_VIOLATION else 0.7
max_severity = max(max_severity, severity)
explanations.append(f"Pattern match: {violation_type.value}")
# Check keyword lists
for violation_type, keywords in self.unsafe_keywords.items():
for keyword in keywords:
if keyword.lower() in text:
violations.append(violation_type)
max_severity = max(max_severity, 0.6)
explanations.append(f"Keyword match: {keyword}")
return SafetyResult(
is_safe=len(violations) == 0,
confidence=0.85 if violations else 0.7,
violation_types=violations,
severity_score=max_severity,
explanation="; ".join(explanations) or "No rule violations detected",
processing_time_ms=0, # Will be set later
filter_version=self.version
)
async def _ml_toxicity_check(self, content: ContentItem) -> SafetyResult:
"""ML-based toxicity detection"""
try:
# Tokenize input
inputs = self.tokenizer(
content.content,
return_tensors="pt",
truncation=True,
max_length=512,
padding=True
).to(self.device)
# Get model prediction
with torch.no_grad():
outputs = self.toxicity_model(**inputs)
probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)
toxicity_score = probabilities[0][1].item() # Assuming label 1 is toxic
violations = []
if toxicity_score > self.toxicity_threshold:
violations.append(SafetyViolationType.TOXICITY)
return SafetyResult(
is_safe=toxicity_score <= self.toxicity_threshold,
confidence=max(toxicity_score, 1 - toxicity_score),
violation_types=violations,
severity_score=toxicity_score,
explanation=f"ML toxicity score: {toxicity_score:.3f}",
processing_time_ms=0,
filter_version=self.version
)
except Exception as e:
logger.error(f"ML toxicity check error: {e}")
return SafetyResult(
is_safe=False,
confidence=0.0,
violation_types=[],
severity_score=1.0,
explanation=f"ML check failed: {str(e)}",
processing_time_ms=0,
filter_version=self.version
)
async def _semantic_analysis(self, content: ContentItem) -> SafetyResult:
"""Semantic similarity analysis against known harmful content"""
try:
# Check cache first
content_hash = hashlib.md5(content.content.encode()).hexdigest()
if content_hash in self.embedding_cache:
embedding = self.embedding_cache[content_hash]
self.cache_hits += 1
else:
embedding = self.embedding_model.encode([content.content])[0]
self.embedding_cache[content_hash] = embedding
# Compare against known harmful embeddings (simplified)
# In production, this would use a vector database like Pinecone or Weaviate
harmful_examples = [
"I want to hurt people and cause violence",
"This is hate speech targeting specific groups",
"Explicit sexual content description"
]
if len(harmful_examples) > 0:
harmful_embeddings = self.embedding_model.encode(harmful_examples)
# Use cosine similarity so the 0.7 threshold is independent of embedding scale
emb_norm = embedding / (np.linalg.norm(embedding) + 1e-12)
harmful_norm = harmful_embeddings / (np.linalg.norm(harmful_embeddings, axis=1, keepdims=True) + 1e-12)
similarities = np.dot(emb_norm, harmful_norm.T)
max_similarity = float(np.max(similarities))
violations = []
if max_similarity > 0.7: # High similarity threshold
violations.append(SafetyViolationType.TOXICITY)
return SafetyResult(
is_safe=max_similarity <= 0.7,
confidence=0.8,
violation_types=violations,
severity_score=max_similarity,
explanation=f"Semantic similarity: {max_similarity:.3f}",
processing_time_ms=0,
filter_version=self.version
)
return SafetyResult(
is_safe=True,
confidence=0.6,
violation_types=[],
severity_score=0.0,
explanation="No semantic violations detected",
processing_time_ms=0,
filter_version=self.version
)
except Exception as e:
logger.error(f"Semantic analysis error: {e}")
return SafetyResult(
is_safe=True, # Default to safe for semantic analysis failures
confidence=0.3,
violation_types=[],
severity_score=0.0,
explanation=f"Semantic analysis failed: {str(e)}",
processing_time_ms=0,
filter_version=self.version
)
async def _contextual_analysis(self, content: ContentItem) -> SafetyResult:
"""Contextual analysis considering metadata and content type"""
try:
violations = []
# Baseline severity to scale; without a non-zero baseline the multiplicative
# adjustments below have no effect ("prior_severity" is an illustrative metadata key)
severity_score = float(content.metadata.get("prior_severity", 0.0))
explanations = []
# Adjust thresholds based on content type
if content.content_type == "code":
# Code might contain keywords that look harmful but aren't
severity_score *= 0.5
elif content.content_type == "creative_writing":
# Creative writing might discuss violence in acceptable contexts
severity_score *= 0.7
# Consider user context from metadata
user_history = content.metadata.get("user_safety_score", 0.5)
if user_history > 0.8: # User has good safety record
severity_score *= 0.8
elif user_history < 0.3: # User has poor safety record
severity_score *= 1.3
explanations.append("User has poor safety history")
# Consider conversation context
conversation_context = content.metadata.get("conversation_safety_context", "neutral")
if conversation_context == "educational":
severity_score *= 0.6
elif conversation_context == "debate":
severity_score *= 0.8
return SafetyResult(
is_safe=severity_score <= 0.7,
confidence=0.7,
violation_types=violations,
severity_score=min(1.0, severity_score),
explanation="; ".join(explanations) or "Contextual analysis complete",
processing_time_ms=0,
filter_version=self.version
)
except Exception as e:
logger.error(f"Contextual analysis error: {e}")
return SafetyResult(
is_safe=True,
confidence=0.5,
violation_types=[],
severity_score=0.0,
explanation=f"Contextual analysis failed: {str(e)}",
processing_time_ms=0,
filter_version=self.version
)
def _combine_results(self, *results: SafetyResult) -> SafetyResult:
"""Combine multiple safety check results"""
all_violations = []
all_explanations = []
max_severity = 0.0
confidence_scores = []
for result in results:
all_violations.extend(result.violation_types)
all_explanations.append(result.explanation)
max_severity = max(max_severity, result.severity_score)
confidence_scores.append(result.confidence)
# Remove duplicates
unique_violations = list(set(all_violations))
# Calculate combined confidence (weighted average)
combined_confidence = np.mean(confidence_scores) if confidence_scores else 0.0
# Determine if content is safe (conservative approach)
is_safe = len(unique_violations) == 0 and max_severity <= 0.5
return SafetyResult(
is_safe=is_safe,
confidence=combined_confidence,
violation_types=unique_violations,
severity_score=max_severity,
explanation=" | ".join(all_explanations),
processing_time_ms=0, # Will be set by caller
filter_version=self.version
)
async def _log_evaluation(self, content: ContentItem, result: SafetyResult):
"""Log evaluation for monitoring and improvement"""
log_entry = {
"timestamp": time.time(),
"content_id": content.content_id,
"content_type": content.content_type,
"content_length": len(content.content),
"is_safe": result.is_safe,
"severity_score": result.severity_score,
"confidence": result.confidence,
"violation_types": [v.value for v in result.violation_types],
"processing_time_ms": result.processing_time_ms,
"filter_version": result.filter_version
}
# In production, this would go to a logging system like ELK stack
logger.info(f"Safety evaluation: {json.dumps(log_entry)}")
async def batch_evaluate(self, contents: List[ContentItem]) -> List[SafetyResult]:
"""Batch evaluation for efficiency"""
tasks = [self.evaluate_content(content) for content in contents]
return await asyncio.gather(*tasks, return_exceptions=False)
def get_performance_stats(self) -> Dict[str, Any]:
"""Get performance statistics"""
if not self.processing_times:
return {"message": "No evaluations performed yet"}
return {
"total_evaluations": len(self.processing_times),
"average_processing_time_ms": np.mean(self.processing_times),
"p95_processing_time_ms": np.percentile(self.processing_times, 95),
"p99_processing_time_ms": np.percentile(self.processing_times, 99),
"cache_hit_rate": self.cache_hits / max(1, self.total_requests),
"cache_size": len(self.embedding_cache)
}
def reset_cache(self):
"""Reset embedding cache"""
self.embedding_cache.clear()
self.cache_hits = 0
self.total_requests = 0
# Usage example
async def run_safety_evaluation():
filter_system = MultiLayerContentFilter()
# Test content
test_contents = [
ContentItem("This is a normal, safe message", "text", {"user_safety_score": 0.8}),
ContentItem("I hate all people from that group", "text", {"user_safety_score": 0.2}),
ContentItem("My email is john@example.com and SSN is 123-45-6789", "text", {}),
ContentItem("def process_user_data(data): return data.clean()", "code", {}),
]
results = await filter_system.batch_evaluate(test_contents)
for content, result in zip(test_contents, results):
print(f"Content: {content.content[:50]}...")
print(f"Safe: {result.is_safe}, Confidence: {result.confidence:.2f}")
print(f"Violations: {[v.value for v in result.violation_types]}")
print(f"Severity: {result.severity_score:.3f}")
print("---")
print(f"Performance stats: {filter_system.get_performance_stats()}")
if __name__ == "__main__":
asyncio.run(run_safety_evaluation())
Algorithmic Bias Detection System
Production Bias Monitoring
bias_detection_system.py
import numpy as np
import pandas as pd
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
import logging
from sklearn.metrics import confusion_matrix, roc_auc_score
from scipy import stats
import json
import time
from enum import Enum
logger = logging.getLogger(__name__)
class ProtectedAttribute(Enum):
GENDER = "gender"
RACE = "race"
AGE = "age"
RELIGION = "religion"
SEXUAL_ORIENTATION = "sexual_orientation"
DISABILITY = "disability"
SOCIOECONOMIC_STATUS = "socioeconomic_status"
class FairnessMetric(Enum):
DEMOGRAPHIC_PARITY = "demographic_parity"
EQUAL_OPPORTUNITY = "equal_opportunity"
EQUALIZED_ODDS = "equalized_odds"
CALIBRATION = "calibration"
INDIVIDUAL_FAIRNESS = "individual_fairness"
@dataclass
class BiasTestResult:
metric: FairnessMetric
protected_attribute: ProtectedAttribute
groups: Dict[str, Any]
bias_score: float # 0.0 (fair) to 1.0 (highly biased)
statistical_significance: bool
p_value: float
threshold_passed: bool
explanation: str
recommendations: List[str]
@dataclass
class PredictionRecord:
prediction: float
true_label: int
user_id: str
protected_attributes: Dict[ProtectedAttribute, str]
features: Dict[str, Any]
timestamp: float
model_version: str
class AlgorithmicBiasDetector:
"""Production system for detecting algorithmic bias in ML models"""
def __init__(self,
fairness_thresholds: Optional[Dict[FairnessMetric, float]] = None,
min_samples_per_group: int = 100):
# Default fairness thresholds (lower = more strict)
self.fairness_thresholds = fairness_thresholds or {
FairnessMetric.DEMOGRAPHIC_PARITY: 0.1, # 10% difference max
FairnessMetric.EQUAL_OPPORTUNITY: 0.1,
FairnessMetric.EQUALIZED_ODDS: 0.1,
FairnessMetric.CALIBRATION: 0.05,
FairnessMetric.INDIVIDUAL_FAIRNESS: 0.15
}
self.min_samples_per_group = min_samples_per_group
self.prediction_buffer = []
self.bias_history = []
# Statistical significance level
self.alpha = 0.05
def add_predictions(self, predictions: List[PredictionRecord]):
"""Add prediction records for bias analysis"""
self.prediction_buffer.extend(predictions)
# Automatically run bias analysis when buffer reaches threshold
if len(self.prediction_buffer) >= 1000:
self.analyze_bias()
def analyze_bias(self,
protected_attributes: Optional[List[ProtectedAttribute]] = None,
metrics: Optional[List[FairnessMetric]] = None) -> Dict[str, List[BiasTestResult]]:
"""Comprehensive bias analysis across multiple dimensions"""
if not self.prediction_buffer:
logger.warning("No predictions available for bias analysis")
return {}
# Convert to DataFrame for easier analysis
df = self._predictions_to_dataframe()
# Default to all available attributes and metrics
if protected_attributes is None:
protected_attributes = self._get_available_attributes(df)
if metrics is None:
metrics = list(self.fairness_thresholds.keys())
results = {}
for attr in protected_attributes:
attr_results = []
for metric in metrics:
try:
result = self._compute_fairness_metric(df, attr, metric)
if result:
attr_results.append(result)
# Log significant bias issues
if not result.threshold_passed and result.statistical_significance:
logger.warning(
f"Significant bias detected: {metric.value} for {attr.value} "
f"(bias_score: {result.bias_score:.3f}, p_value: {result.p_value:.6f})"
)
except Exception as e:
logger.error(f"Error computing {metric.value} for {attr.value}: {e}")
if attr_results:
results[attr.value] = attr_results
# Store results for historical tracking
self.bias_history.append({
'timestamp': time.time(),
'results': results,
'sample_size': len(df)
})
# Clear buffer after analysis
self.prediction_buffer.clear()
return results
def _predictions_to_dataframe(self) -> pd.DataFrame:
"""Convert prediction records to pandas DataFrame"""
records = []
for pred in self.prediction_buffer:
record = {
'prediction': pred.prediction,
'true_label': pred.true_label,
'user_id': pred.user_id,
'timestamp': pred.timestamp,
'model_version': pred.model_version
}
# Add protected attributes
for attr, value in pred.protected_attributes.items():
record[f'{attr.value}'] = value
# Add features
for feature, value in pred.features.items():
record[f'feature_{feature}'] = value
records.append(record)
return pd.DataFrame(records)
def _get_available_attributes(self, df: pd.DataFrame) -> List[ProtectedAttribute]:
"""Get available protected attributes from DataFrame"""
available = []
for attr in ProtectedAttribute:
if attr.value in df.columns:
available.append(attr)
return available
def _compute_fairness_metric(self,
df: pd.DataFrame,
protected_attr: ProtectedAttribute,
metric: FairnessMetric) -> Optional[BiasTestResult]:
"""Compute specific fairness metric"""
attr_col = protected_attr.value
if attr_col not in df.columns:
return None
# Get unique groups for this attribute
groups = df[attr_col].unique()
if len(groups) < 2:
return None
# Ensure minimum sample size per group
group_counts = df[attr_col].value_counts()
if group_counts.min() < self.min_samples_per_group:
return BiasTestResult(
metric=metric,
protected_attribute=protected_attr,
groups={},
bias_score=0.0,
statistical_significance=False,
p_value=1.0,
threshold_passed=False,
explanation="Insufficient samples per group",
recommendations=["Collect more data for underrepresented groups"]
)
# Compute metric based on type
if metric == FairnessMetric.DEMOGRAPHIC_PARITY:
return self._compute_demographic_parity(df, attr_col, groups)
elif metric == FairnessMetric.EQUAL_OPPORTUNITY:
return self._compute_equal_opportunity(df, attr_col, groups)
elif metric == FairnessMetric.EQUALIZED_ODDS:
return self._compute_equalized_odds(df, attr_col, groups)
elif metric == FairnessMetric.CALIBRATION:
return self._compute_calibration(df, attr_col, groups)
elif metric == FairnessMetric.INDIVIDUAL_FAIRNESS:
return self._compute_individual_fairness(df, attr_col, groups)
return None
def _compute_demographic_parity(self,
df: pd.DataFrame,
attr_col: str,
groups: np.ndarray) -> BiasTestResult:
"""Compute demographic parity (equal positive prediction rates)"""
group_stats = {}
positive_rates = []
for group in groups:
group_data = df[df[attr_col] == group]
positive_rate = (group_data['prediction'] > 0.5).mean()
group_stats[str(group)] = {
'positive_rate': positive_rate,
'count': len(group_data)
}
positive_rates.append(positive_rate)
# Calculate bias score (max difference in positive rates)
bias_score = max(positive_rates) - min(positive_rates)
# Statistical test (Chi-square test)
contingency_table = []
for group in groups:
group_data = df[df[attr_col] == group]
positive = (group_data['prediction'] > 0.5).sum()
negative = len(group_data) - positive
contingency_table.append([positive, negative])
chi2, p_value = stats.chi2_contingency(contingency_table)[:2]
statistical_significance = p_value < self.alpha
threshold_passed = bias_score <= self.fairness_thresholds[FairnessMetric.DEMOGRAPHIC_PARITY]
recommendations = []
if not threshold_passed:
recommendations.extend([
"Consider re-balancing training data",
"Apply fairness constraints during training",
"Use post-processing calibration techniques"
])
return BiasTestResult(
metric=FairnessMetric.DEMOGRAPHIC_PARITY,
protected_attribute=ProtectedAttribute(attr_col),  # enum lookup is by value, e.g. ProtectedAttribute("gender")
groups=group_stats,
bias_score=bias_score,
statistical_significance=statistical_significance,
p_value=p_value,
threshold_passed=threshold_passed,
explanation=f"Max difference in positive prediction rates: {bias_score:.3f}",
recommendations=recommendations
)
def _compute_equal_opportunity(self,
df: pd.DataFrame,
attr_col: str,
groups: np.ndarray) -> BiasTestResult:
"""Compute equal opportunity (equal true positive rates)"""
group_stats = {}
tpr_rates = []
# Only consider positive examples (true_label == 1)
positive_df = df[df['true_label'] == 1]
if len(positive_df) == 0:
return BiasTestResult(
metric=FairnessMetric.EQUAL_OPPORTUNITY,
protected_attribute=ProtectedAttribute(attr_col),
groups={},
bias_score=0.0,
statistical_significance=False,
p_value=1.0,
threshold_passed=False,
explanation="No positive examples available",
recommendations=["Need positive examples to compute equal opportunity"]
)
for group in groups:
group_data = positive_df[positive_df[attr_col] == group]
if len(group_data) > 0:
tpr = (group_data['prediction'] > 0.5).mean() # True Positive Rate
group_stats[str(group)] = {
'tpr': tpr,
'count': len(group_data)
}
tpr_rates.append(tpr)
if len(tpr_rates) < 2:
return BiasTestResult(
metric=FairnessMetric.EQUAL_OPPORTUNITY,
protected_attribute=ProtectedAttribute(attr_col),
groups=group_stats,
bias_score=0.0,
statistical_significance=False,
p_value=1.0,
threshold_passed=False,
explanation="Insufficient groups with positive examples",
recommendations=["Ensure all groups have positive examples"]
)
bias_score = max(tpr_rates) - min(tpr_rates)
# Statistical test (comparing proportions)
group1_data = positive_df[positive_df[attr_col] == groups[0]]
group2_data = positive_df[positive_df[attr_col] == groups[1]]
if len(group1_data) > 0 and len(group2_data) > 0:
stat, p_value = stats.chi2_contingency([
[(group1_data['prediction'] > 0.5).sum(), len(group1_data) - (group1_data['prediction'] > 0.5).sum()],
[(group2_data['prediction'] > 0.5).sum(), len(group2_data) - (group2_data['prediction'] > 0.5).sum()]
])[:2]
else:
p_value = 1.0
statistical_significance = p_value < self.alpha
threshold_passed = bias_score <= self.fairness_thresholds[FairnessMetric.EQUAL_OPPORTUNITY]
recommendations = []
if not threshold_passed:
recommendations.extend([
"Apply equal opportunity constraint during training",
"Use threshold optimization per group",
"Consider adversarial debiasing techniques"
])
return BiasTestResult(
metric=FairnessMetric.EQUAL_OPPORTUNITY,
protected_attribute=ProtectedAttribute(attr_col),
groups=group_stats,
bias_score=bias_score,
statistical_significance=statistical_significance,
p_value=p_value,
threshold_passed=threshold_passed,
explanation=f"Max difference in true positive rates: {bias_score:.3f}",
recommendations=recommendations
)
def _compute_equalized_odds(self,
df: pd.DataFrame,
attr_col: str,
groups: np.ndarray) -> BiasTestResult:
"""Compute equalized odds (equal TPR and FPR across groups)"""
group_stats = {}
tpr_diff_max = 0.0
fpr_diff_max = 0.0
for group in groups:
group_data = df[df[attr_col] == group]
# Calculate TPR and FPR
tp = ((group_data['prediction'] > 0.5) & (group_data['true_label'] == 1)).sum()
fp = ((group_data['prediction'] > 0.5) & (group_data['true_label'] == 0)).sum()
tn = ((group_data['prediction'] <= 0.5) & (group_data['true_label'] == 0)).sum()
fn = ((group_data['prediction'] <= 0.5) & (group_data['true_label'] == 1)).sum()
tpr = tp / (tp + fn) if (tp + fn) > 0 else 0
fpr = fp / (fp + tn) if (fp + tn) > 0 else 0
group_stats[str(group)] = {
'tpr': tpr,
'fpr': fpr,
'count': len(group_data),
'confusion_matrix': {'tp': tp, 'fp': fp, 'tn': tn, 'fn': fn}
}
# Calculate max differences
tpr_values = [s['tpr'] for s in group_stats.values()]  # avoid shadowing scipy.stats
fpr_values = [s['fpr'] for s in group_stats.values()]
if len(tpr_values) >= 2:
tpr_diff_max = max(tpr_values) - min(tpr_values)
fpr_diff_max = max(fpr_values) - min(fpr_values)
# Bias score is the maximum of TPR and FPR differences
bias_score = max(tpr_diff_max, fpr_diff_max)
# Statistical significance (simplified)
statistical_significance = bias_score > 0.05 # Simplified test
p_value = 0.01 if statistical_significance else 0.5
threshold_passed = bias_score <= self.fairness_thresholds[FairnessMetric.EQUALIZED_ODDS]
recommendations = []
if not threshold_passed:
recommendations.extend([
"Apply equalized odds constraint during training",
"Use different decision thresholds per group",
"Consider fairness-aware ensemble methods"
])
return BiasTestResult(
metric=FairnessMetric.EQUALIZED_ODDS,
protected_attribute=ProtectedAttribute(attr_col),
groups=group_stats,
bias_score=bias_score,
statistical_significance=statistical_significance,
p_value=p_value,
threshold_passed=threshold_passed,
explanation=f"Max TPR diff: {tpr_diff_max:.3f}, Max FPR diff: {fpr_diff_max:.3f}",
recommendations=recommendations
)
def _compute_calibration(self,
df: pd.DataFrame,
attr_col: str,
groups: np.ndarray) -> BiasTestResult:
"""Compute calibration fairness (predicted probabilities match actual outcomes)"""
group_stats = {}
calibration_errors = []
for group in groups:
group_data = df[df[attr_col] == group]
# Bin predictions and calculate calibration error
n_bins = 10
bin_boundaries = np.linspace(0, 1, n_bins + 1)
bin_lowers = bin_boundaries[:-1]
bin_uppers = bin_boundaries[1:]
calibration_error = 0.0
total_samples = len(group_data)
for bin_lower, bin_upper in zip(bin_lowers, bin_uppers):
in_bin = (group_data['prediction'] > bin_lower) & (group_data['prediction'] <= bin_upper)
prop_in_bin = in_bin.mean()
if prop_in_bin > 0:
accuracy_in_bin = group_data[in_bin]['true_label'].mean()
avg_confidence_in_bin = group_data[in_bin]['prediction'].mean()
calibration_error += np.abs(avg_confidence_in_bin - accuracy_in_bin) * prop_in_bin
group_stats[str(group)] = {
'calibration_error': calibration_error,
'count': total_samples
}
calibration_errors.append(calibration_error)
# Bias score is the maximum difference in calibration errors
bias_score = max(calibration_errors) - min(calibration_errors) if len(calibration_errors) >= 2 else 0
statistical_significance = bias_score > 0.02 # Simplified test
p_value = 0.01 if statistical_significance else 0.5
threshold_passed = bias_score <= self.fairness_thresholds[FairnessMetric.CALIBRATION]
recommendations = []
if not threshold_passed:
recommendations.extend([
"Apply calibration methods (Platt scaling, isotonic regression)",
"Use group-specific calibration",
"Implement fairness-aware calibration techniques"
])
return BiasTestResult(
metric=FairnessMetric.CALIBRATION,
protected_attribute=ProtectedAttribute(attr_col),
groups=group_stats,
bias_score=bias_score,
statistical_significance=statistical_significance,
p_value=p_value,
threshold_passed=threshold_passed,
explanation=f"Max difference in calibration errors: {bias_score:.3f}",
recommendations=recommendations
)
def _compute_individual_fairness(self,
df: pd.DataFrame,
attr_col: str,
groups: np.ndarray) -> BiasTestResult:
"""Compute individual fairness (similar individuals get similar predictions)"""
# Simplified individual fairness: variance in predictions within each group
group_stats = {}
prediction_variances = []
for group in groups:
group_data = df[df[attr_col] == group]
pred_variance = group_data['prediction'].var()
group_stats[str(group)] = {
'prediction_variance': pred_variance,
'count': len(group_data),
'mean_prediction': group_data['prediction'].mean()
}
prediction_variances.append(pred_variance)
# Bias score based on difference in variances (higher variance = less fair)
bias_score = max(prediction_variances) - min(prediction_variances) if len(prediction_variances) >= 2 else 0
statistical_significance = bias_score > 0.01 # Simplified test
p_value = 0.01 if statistical_significance else 0.5
threshold_passed = bias_score <= self.fairness_thresholds[FairnessMetric.INDIVIDUAL_FAIRNESS]
recommendations = []
if not threshold_passed:
recommendations.extend([
"Investigate feature preprocessing for consistency",
"Apply individual fairness constraints",
"Use fairness-aware representation learning"
])
return BiasTestResult(
metric=FairnessMetric.INDIVIDUAL_FAIRNESS,
protected_attribute=ProtectedAttribute(attr_col),
groups=group_stats,
bias_score=bias_score,
statistical_significance=statistical_significance,
p_value=p_value,
threshold_passed=threshold_passed,
explanation=f"Difference in prediction variances: {bias_score:.4f}",
recommendations=recommendations
)
def generate_bias_report(self,
results: Dict[str, List[BiasTestResult]]) -> Dict[str, Any]:
"""Generate comprehensive bias analysis report"""
total_tests = sum(len(attr_results) for attr_results in results.values())
failed_tests = 0
significant_issues = 0
issue_summary = []
for attr, attr_results in results.items():
for result in attr_results:
if not result.threshold_passed:
failed_tests += 1
if result.statistical_significance and not result.threshold_passed:
significant_issues += 1
issue_summary.append({
'attribute': attr,
'metric': result.metric.value,
'bias_score': result.bias_score,
'p_value': result.p_value
})
# Overall bias risk score
risk_score = (significant_issues / max(1, total_tests)) * 100
risk_level = "LOW"
if risk_score > 30:
risk_level = "HIGH"
elif risk_score > 15:
risk_level = "MEDIUM"
return {
'timestamp': time.time(),
'summary': {
'total_tests': total_tests,
'tests_failed': failed_tests,
'significant_issues': significant_issues,
'risk_score': risk_score,
'risk_level': risk_level
},
'detailed_results': results,
'significant_issues': issue_summary,
'sample_size': self.bias_history[-1]['sample_size'] if self.bias_history else 0,  # buffer is cleared after analysis
'recommendations': self._generate_overall_recommendations(results)
}
def _generate_overall_recommendations(self,
results: Dict[str, List[BiasTestResult]]) -> List[str]:
"""Generate overall recommendations based on bias analysis"""
all_recommendations = []
for attr_results in results.values():
for result in attr_results:
if not result.threshold_passed:
all_recommendations.extend(result.recommendations)
# Deduplicate and prioritize
unique_recommendations = list(set(all_recommendations))
# Add general recommendations if issues found
if all_recommendations:
unique_recommendations.extend([
"Implement continuous bias monitoring in production",
"Regular model retraining with fairness constraints",
"Establish bias review process with domain experts",
"Consider hiring diverse team for bias evaluation"
])
return unique_recommendations
# Usage example
def create_sample_predictions() -> List[PredictionRecord]:
"""Create sample prediction data for testing"""
import random
predictions = []
for i in range(1000):
# Simulate biased model that discriminates based on gender
gender = random.choice(['male', 'female'])
race = random.choice(['white', 'black', 'hispanic', 'asian'])
# Introduce bias: higher predictions for males
base_prediction = random.uniform(0, 1)
if gender == 'male':
prediction = min(1.0, base_prediction + 0.2)
else:
prediction = base_prediction
true_label = random.randint(0, 1)
record = PredictionRecord(
prediction=prediction,
true_label=true_label,
user_id=f"user_{i}",
protected_attributes={
ProtectedAttribute.GENDER: gender,
ProtectedAttribute.RACE: race,
ProtectedAttribute.AGE: random.choice(['young', 'middle', 'old'])
},
features={'feature_1': random.uniform(0, 1), 'feature_2': random.uniform(0, 1)},
timestamp=time.time(),
model_version='v1.0'
)
predictions.append(record)
return predictions
if __name__ == "__main__":
# Test the bias detection system
detector = AlgorithmicBiasDetector()
# Add sample predictions
sample_predictions = create_sample_predictions()
detector.add_predictions(sample_predictions)
# Analyze bias
results = detector.analyze_bias()
# Generate report
report = detector.generate_bias_report(results)
print("Bias Analysis Report:")
print(json.dumps(report['summary'], indent=2))
if report['significant_issues']:
print("\nSignificant Issues Found:")
for issue in report['significant_issues']:
print(f"- {issue['attribute']}: {issue['metric']} (bias_score: {issue['bias_score']:.3f})")
print(f"\nOverall Risk Level: {report['summary']['risk_level']}")Real-World Examples
OpenAI Content Policy
Multi-layered content filtering system for ChatGPT and GPT API with real-time harmful content detection and user safety.
- • Real-time toxicity detection
- • Constitutional AI alignment
- • Human feedback integration
Facebook AI Fairness
Comprehensive bias detection across content ranking, ad targeting, and moderation with continuous fairness monitoring.
- • Demographic parity enforcement
- • Intersectional bias analysis
- • Fairness-aware ML training
Google AI Principles
Responsible AI framework with safety by design, fairness testing, and AI ethics integration across all AI products.
- • Model cards for transparency
- • What-If Tool for bias testing
- • Responsible AI practices
AI Safety Best Practices
✅ Do's
- •Implement multiple layers of safety checks
- •Continuously monitor for bias and fairness issues
- •Include diverse perspectives in safety evaluation
- •Design fail-safe mechanisms and circuit breakers
- •Maintain transparency and explainability
- •Conduct regular safety audits and red-team exercises
❌ Don'ts
- •Don't rely on single-layer safety measures
- •Don't ignore low-frequency but high-impact risks
- •Don't optimize only for performance metrics
- •Don't deploy without comprehensive safety testing
- •Don't ignore user feedback on safety issues
- •Don't assume safety is a one-time consideration