Human-in-the-Loop ML
Building ML systems that seamlessly integrate human expertise with automated intelligence
Human Expertise
Leveraging human judgment for complex, nuanced, and high-stakes decisions
AI Efficiency
Automated processing of routine cases while maintaining quality at scale
Continuous Learning
Model improvement through real-time human feedback and corrections
HITL Design Patterns
Key Benefits
Improved Accuracy
Human oversight for edge cases and complex decisions
Combining AI efficiency with human judgment achieves higher overall accuracy
Scalable Quality
Maintain quality while processing large volumes
AI handles routine cases while humans focus on challenging decisions
Continuous Improvement
Models learn from human feedback in real time
System performance improves continuously through human corrections
Application Domains
Content Moderation
Combining AI efficiency with human judgment for content safety
Medical AI
AI-assisted diagnosis with mandatory expert validation
Autonomous Systems
Human oversight for critical autonomous decision-making
Data Labeling
Efficient large-scale annotation with quality control
System Architecture Overview
# HITL ML System Components
1. AI Processing Layer
├── Model Inference Engine
├── Confidence Estimation
├── Uncertainty Quantification
└── Risk Assessment
2. Decision Routing Layer
├── Confidence Thresholding
├── Business Rule Engine
├── Queue Management
└── Load Balancing
3. Human Interface Layer
├── Review Dashboard
├── Case Presentation
├── Decision Capture
└── Feedback Collection
4. Learning Integration Layer
├── Feedback Processing
├── Active Learning
├── Model Updating
└── Performance Tracking
5. Quality Assurance Layer
├── Inter-Annotator Agreement
├── Expert Validation
├── Audit Trail
└── Compliance Monitoring
# Key Performance Targets:
- Automation Rate: 85-95%
- Human Review Latency: < 2 hours
- System Accuracy: > 95%
- Cost per Decision: < $0.50
- Inter-Annotator Agreement: > 85%
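The Decision Routing Layer is the easiest component to see in code. Below is a minimal sketch of confidence thresholding, assuming hypothetical names (RoutingConfig, route_decision) and example threshold values consistent with the targets above; a real deployment would tune these per domain and layer in additional business rules.

from dataclasses import dataclass

@dataclass
class RoutingConfig:
    """Illustrative thresholds for the Decision Routing Layer (values are assumptions)."""
    auto_approve_threshold: float = 0.95  # above this, accept the AI decision automatically
    human_review_threshold: float = 0.50  # above this, queue for human review; below, escalate

def route_decision(confidence: float, is_high_risk: bool, cfg: RoutingConfig) -> str:
    """Map a prediction confidence and a risk flag to a routing action."""
    if confidence >= cfg.auto_approve_threshold and not is_high_risk:
        return "auto_approve"
    if confidence >= cfg.human_review_threshold:
        return "human_review"
    return "expert_escalation"

# Example: a 0.92-confidence prediction on a high-risk case still goes to a human reviewer.
print(route_decision(0.92, is_high_risk=True, cfg=RoutingConfig()))  # -> "human_review"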
Active Learning Implementation
import numpy as np
import torch
import torch.nn as nn
from sklearn.ensemble import RandomForestClassifier
from scipy.stats import entropy
from typing import List, Tuple, Dict, Any
class ActiveLearningSystem:
def __init__(self, base_model, uncertainty_method='entropy'):
self.base_model = base_model
self.uncertainty_method = uncertainty_method
self.labeled_data = []
self.unlabeled_data = []
self.uncertainty_threshold = 0.7
def uncertainty_sampling(self, X_unlabeled: np.ndarray, n_samples: int = 100) -> List[int]:
"""Select most uncertain samples for human annotation"""
# Get model predictions
predictions = self.base_model.predict_proba(X_unlabeled)
# Calculate uncertainty scores
if self.uncertainty_method == 'entropy':
uncertainties = self._entropy_uncertainty(predictions)
elif self.uncertainty_method == 'least_confident':
uncertainties = self._least_confident_uncertainty(predictions)
elif self.uncertainty_method == 'margin':
uncertainties = self._margin_uncertainty(predictions)
else:
raise ValueError(f"Unknown uncertainty method: {self.uncertainty_method}")
# Select top uncertain samples
uncertain_indices = np.argsort(uncertainties)[-n_samples:]
return uncertain_indices.tolist()
def _entropy_uncertainty(self, predictions: np.ndarray) -> np.ndarray:
"""Calculate entropy-based uncertainty"""
# Add small epsilon to avoid log(0)
epsilon = 1e-10
predictions = np.clip(predictions, epsilon, 1 - epsilon)
# Calculate entropy for each sample
entropies = -np.sum(predictions * np.log(predictions), axis=1)
return entropies
def _least_confident_uncertainty(self, predictions: np.ndarray) -> np.ndarray:
"""Calculate least confident uncertainty"""
# Uncertainty = 1 - max(prediction)
max_probs = np.max(predictions, axis=1)
uncertainties = 1 - max_probs
return uncertainties
def _margin_uncertainty(self, predictions: np.ndarray) -> np.ndarray:
"""Calculate margin-based uncertainty"""
# Sort predictions in descending order
sorted_preds = np.sort(predictions, axis=1)[:, ::-1]
# Margin = difference between top two predictions
margins = sorted_preds[:, 0] - sorted_preds[:, 1]
uncertainties = 1 - margins
return uncertainties
def diversity_sampling(self, X_unlabeled: np.ndarray, n_samples: int = 100) -> List[int]:
"""Select diverse samples to ensure good coverage"""
from sklearn.cluster import KMeans
# Cluster unlabeled data
n_clusters = min(n_samples, len(X_unlabeled))
kmeans = KMeans(n_clusters=n_clusters, random_state=42)
cluster_labels = kmeans.fit_predict(X_unlabeled)
# Select one sample from each cluster (closest to centroid)
selected_indices = []
for cluster_id in range(n_clusters):
cluster_indices = np.where(cluster_labels == cluster_id)[0]
if len(cluster_indices) > 0:
# Find sample closest to cluster centroid
cluster_center = kmeans.cluster_centers_[cluster_id]
distances = np.linalg.norm(
X_unlabeled[cluster_indices] - cluster_center, axis=1
)
closest_idx = cluster_indices[np.argmin(distances)]
selected_indices.append(closest_idx)
return selected_indices
def query_by_committee(self, X_unlabeled: np.ndarray,
committee_models: List[Any],
n_samples: int = 100) -> List[int]:
"""Query by committee - select samples with highest disagreement"""
# Get predictions from all committee members
committee_predictions = []
for model in committee_models:
preds = model.predict_proba(X_unlabeled)
committee_predictions.append(preds)
# Calculate disagreement (KL divergence)
disagreements = []
for i in range(len(X_unlabeled)):
sample_preds = [pred[i] for pred in committee_predictions]
avg_pred = np.mean(sample_preds, axis=0)
# Calculate average KL divergence from committee average
kl_divs = []
for pred in sample_preds:
kl_div = entropy(pred, avg_pred)
kl_divs.append(kl_div)
disagreements.append(np.mean(kl_divs))
# Select samples with highest disagreement
disagreement_indices = np.argsort(disagreements)[-n_samples:]
return disagreement_indices.tolist()
def adaptive_sampling(self, X_unlabeled: np.ndarray,
n_samples: int = 100,
uncertainty_weight: float = 0.7,
diversity_weight: float = 0.3) -> List[int]:
"""Combine uncertainty and diversity sampling"""
# Get uncertainty scores
predictions = self.base_model.predict_proba(X_unlabeled)
uncertainty_scores = self._entropy_uncertainty(predictions)
# Normalize uncertainty scores
uncertainty_scores = (uncertainty_scores - uncertainty_scores.min()) / (
uncertainty_scores.max() - uncertainty_scores.min() + 1e-8  # epsilon avoids division by zero when all scores are equal
)
# Get diversity scores (distance from labeled data)
diversity_scores = self._calculate_diversity_scores(X_unlabeled)
# Combine scores
combined_scores = (
uncertainty_weight * uncertainty_scores +
diversity_weight * diversity_scores
)
# Select top samples
selected_indices = np.argsort(combined_scores)[-n_samples:]
return selected_indices.tolist()
def _calculate_diversity_scores(self, X_unlabeled: np.ndarray) -> np.ndarray:
"""Calculate diversity scores based on distance from labeled data"""
if not self.labeled_data:
# If no labeled data, return uniform scores
return np.ones(len(X_unlabeled))
# Calculate minimum distance to labeled data for each unlabeled sample
labeled_features = np.array([sample['features'] for sample in self.labeled_data])
diversity_scores = []
for unlabeled_sample in X_unlabeled:
# Calculate distances to all labeled samples
distances = np.linalg.norm(labeled_features - unlabeled_sample, axis=1)
min_distance = np.min(distances)
diversity_scores.append(min_distance)
# Normalize scores
diversity_scores = np.array(diversity_scores)
diversity_scores = (diversity_scores - diversity_scores.min()) / (
diversity_scores.max() - diversity_scores.min() + 1e-8
)
return diversity_scores
def update_model_with_feedback(self,
new_labels: List[Tuple[np.ndarray, int]],
retrain: bool = True):
"""Update model with new human-labeled data"""
# Add new labels to labeled dataset
for features, label in new_labels:
self.labeled_data.append({
'features': features,
'label': label,
'timestamp': np.datetime64('now')
})
if retrain and len(self.labeled_data) > 10:
# Retrain model with updated dataset
X_train = np.array([sample['features'] for sample in self.labeled_data])
y_train = np.array([sample['label'] for sample in self.labeled_data])
self.base_model.fit(X_train, y_train)
print(f"Model retrained with {len(self.labeled_data)} labeled samples")
def get_annotation_suggestions(self,
X_unlabeled: np.ndarray,
n_suggestions: int = 10,
strategy: str = 'adaptive') -> Dict[str, Any]:
"""Get suggestions for human annotation"""
if strategy == 'uncertainty':
indices = self.uncertainty_sampling(X_unlabeled, n_suggestions)
elif strategy == 'diversity':
indices = self.diversity_sampling(X_unlabeled, n_suggestions)
elif strategy == 'adaptive':
indices = self.adaptive_sampling(X_unlabeled, n_suggestions)
else:
raise ValueError(f"Unknown strategy: {strategy}")
# Get predictions and uncertainty scores for selected samples
selected_samples = X_unlabeled[indices]
predictions = self.base_model.predict_proba(selected_samples)
uncertainty_scores = self._entropy_uncertainty(predictions)
suggestions = []
for i, idx in enumerate(indices):
suggestions.append({
'index': idx,
'features': X_unlabeled[idx],
'predicted_class': np.argmax(predictions[i]),
'prediction_confidence': np.max(predictions[i]),
'uncertainty_score': uncertainty_scores[i],
'suggested_priority': i + 1  # uncertainty/adaptive strategies select in ascending score order, so later items rank higher
})
return {
'suggestions': suggestions,
'strategy_used': strategy,
'total_unlabeled': len(X_unlabeled),
'annotation_efficiency': len(indices) / len(X_unlabeled)
}
class HumanFeedbackLoop:
"""System for collecting and integrating human feedback"""
def __init__(self, model, feedback_buffer_size=1000):
self.model = model
self.feedback_buffer = []
self.feedback_buffer_size = feedback_buffer_size
self.feedback_weights = {'positive': 1.0, 'negative': 2.0, 'correction': 3.0}
def collect_feedback(self,
prediction_id: str,
true_label: int,
feedback_type: str,
confidence: float = 1.0):
"""Collect human feedback on model predictions"""
feedback_entry = {
'prediction_id': prediction_id,
'true_label': true_label,
'feedback_type': feedback_type, # 'positive', 'negative', 'correction'
'confidence': confidence,
'timestamp': np.datetime64('now'),
'weight': self.feedback_weights.get(feedback_type, 1.0)
}
self.feedback_buffer.append(feedback_entry)
# Maintain buffer size
if len(self.feedback_buffer) > self.feedback_buffer_size:
self.feedback_buffer.pop(0)
# Apply immediate updates for critical feedback
if feedback_type == 'correction' and confidence > 0.8:
self._apply_immediate_correction(feedback_entry)
def _apply_immediate_correction(self, feedback: Dict[str, Any]):
"""Apply immediate model correction for high-confidence feedback"""
# This would involve immediate model updates
# Implementation depends on the specific model type
pass
def process_feedback_batch(self, batch_size: int = 100):
"""Process a batch of feedback for model improvement"""
if len(self.feedback_buffer) < batch_size:
return
# Select recent feedback
recent_feedback = self.feedback_buffer[-batch_size:]
# Group feedback by type
corrections = [f for f in recent_feedback if f['feedback_type'] == 'correction']
negative_feedback = [f for f in recent_feedback if f['feedback_type'] == 'negative']
# Process corrections (highest priority)
if corrections:
self._process_corrections(corrections)
# Process negative feedback for model adjustment
if negative_feedback:
self._process_negative_feedback(negative_feedback)
def _process_corrections(self, corrections: List[Dict[str, Any]]):
"""Process correction feedback to improve model"""
# Extract features and corrected labels
corrected_samples = []
for correction in corrections:
# Retrieve original prediction data
prediction_data = self._get_prediction_data(correction['prediction_id'])
if prediction_data:
corrected_samples.append({
'features': prediction_data['features'],
'true_label': correction['true_label'],
'weight': correction['weight'] * correction['confidence']
})
if corrected_samples:
# Update model with corrected samples
self._update_model_with_corrections(corrected_samples)
def get_feedback_analytics(self) -> Dict[str, Any]:
"""Analyze feedback patterns and model performance"""
if not self.feedback_buffer:
return {}
feedback_types = {}
confidence_scores = []
recent_feedback = self.feedback_buffer[-100:] # Last 100 feedback items
for feedback in recent_feedback:
feedback_type = feedback['feedback_type']
feedback_types[feedback_type] = feedback_types.get(feedback_type, 0) + 1
confidence_scores.append(feedback['confidence'])
return {
'total_feedback': len(self.feedback_buffer),
'feedback_distribution': feedback_types,
'average_confidence': np.mean(confidence_scores),
'correction_rate': feedback_types.get('correction', 0) / len(recent_feedback),
'model_improvement_opportunity': feedback_types.get('negative', 0) / len(recent_feedback)
}
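A short usage sketch of ActiveLearningSystem follows, using a scikit-learn RandomForestClassifier and synthetic data; the seed set, unlabeled pool, and simulated annotations are all assumptions for illustration. Note that HumanFeedbackLoop's batch-processing helpers (for example _get_prediction_data and _update_model_with_corrections) are left as placeholders above and would need concrete implementations before use.

import numpy as np
from sklearn.ensemble import RandomForestClassifier

rng = np.random.default_rng(0)
X_seed = rng.normal(size=(50, 8))            # small labeled seed set (synthetic)
y_seed = rng.integers(0, 2, size=50)
X_pool = rng.normal(size=(500, 8))           # unlabeled pool awaiting annotation

base_model = RandomForestClassifier(n_estimators=50, random_state=0).fit(X_seed, y_seed)
learner = ActiveLearningSystem(base_model, uncertainty_method='entropy')
learner.labeled_data = [{'features': x, 'label': int(y)} for x, y in zip(X_seed, y_seed)]

# Ask for the ten most informative samples under the combined uncertainty/diversity strategy.
batch = learner.get_annotation_suggestions(X_pool, n_suggestions=10, strategy='adaptive')

# In production these samples would go to human annotators; here we simulate their labels.
new_labels = [(X_pool[s['index']], int(rng.integers(0, 2))) for s in batch['suggestions']]
learner.update_model_with_feedback(new_labels, retrain=True)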
HITL ML Service Implementation
import asyncio
import torch
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from enum import Enum
import redis
import json
import uuid
import logging
class DecisionStatus(Enum):
PENDING_AI = "pending_ai"
PENDING_HUMAN = "pending_human"
PENDING_EXPERT = "pending_expert"
APPROVED = "approved"
REJECTED = "rejected"
ESCALATED = "escalated"
class ConfidenceLevel(Enum):
HIGH = "high" # > 0.9
MEDIUM = "medium" # 0.7 - 0.9
LOW = "low" # < 0.7
class HITLMLService:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Initialize ML models
self.ml_model = self._load_ml_model(config['model_path'])
self.confidence_estimator = self._load_confidence_model()
self.active_learner = ActiveLearningSystem(self.ml_model)
# Human workflow components
self.decision_router = DecisionRouter(config['routing_rules'])
self.review_interface = ReviewInterface(config['ui_config'])
self.feedback_collector = HumanFeedbackLoop(self.ml_model)
# Storage and queuing
self.redis_client = redis.Redis(
host=config.get('redis_host', 'localhost'),
port=config.get('redis_port', 6379),
decode_responses=True
)
# Performance tracking
self.metrics = {
'total_decisions': 0,
'automated_decisions': 0,
'human_reviews': 0,
'expert_escalations': 0,
'avg_processing_time': 0,
'accuracy_score': 0
}
def _load_ml_model(self, model_path: str):
"""Load the main ML model"""
# Implementation depends on model type
model = torch.load(model_path, map_location=self.device)
model.eval()
return model
def _load_confidence_model(self):
"""Load model for confidence estimation"""
# This could be a separate model trained to estimate prediction confidence
return None # Placeholder
async def process_request(self,
request_data: Dict[str, Any],
request_id: str = None) -> Dict[str, Any]:
"""Main entry point for processing requests"""
if request_id is None:
request_id = str(uuid.uuid4())
start_time = datetime.now()
self.metrics['total_decisions'] += 1
try:
# Step 1: AI Processing
ai_result = await self._process_with_ai(request_data, request_id)
# Step 2: Decision Routing
routing_decision = await self._route_decision(ai_result, request_data)
# Step 3: Handle based on routing decision
if routing_decision['action'] == 'auto_approve':
final_result = await self._auto_approve(ai_result, request_id)
self.metrics['automated_decisions'] += 1
elif routing_decision['action'] == 'human_review':
final_result = await self._queue_for_human_review(
ai_result, request_data, request_id, routing_decision
)
self.metrics['human_reviews'] += 1
elif routing_decision['action'] == 'expert_escalation':
final_result = await self._escalate_to_expert(
ai_result, request_data, request_id, routing_decision
)
self.metrics['expert_escalations'] += 1
else:
raise ValueError(f"Unknown routing action: {routing_decision['action']}")
# Update metrics
processing_time = (datetime.now() - start_time).total_seconds()
self._update_performance_metrics(processing_time)
return final_result
except Exception as e:
logging.error(f"Error processing request {request_id}: {e}")
return {
'request_id': request_id,
'status': 'error',
'error': str(e),
'timestamp': datetime.now().isoformat()
}
async def _process_with_ai(self,
request_data: Dict[str, Any],
request_id: str) -> Dict[str, Any]:
"""Process request with AI model"""
# Extract features from request
features = self._extract_features(request_data)
# Get AI prediction
with torch.no_grad():
prediction = self.ml_model(features)
prediction_probs = torch.softmax(prediction, dim=-1)
# Calculate confidence and uncertainty
confidence = torch.max(prediction_probs).item()
uncertainty = self._calculate_uncertainty(prediction_probs)
# Determine confidence level
if confidence > 0.9:
confidence_level = ConfidenceLevel.HIGH
elif confidence > 0.7:
confidence_level = ConfidenceLevel.MEDIUM
else:
confidence_level = ConfidenceLevel.LOW
ai_result = {
'request_id': request_id,
'prediction': torch.argmax(prediction).item(),
'prediction_probs': prediction_probs.tolist(),
'confidence': confidence,
'uncertainty': uncertainty,
'confidence_level': confidence_level.value,
'features': features.tolist(),
'processing_time': datetime.now().isoformat()
}
# Store AI result for potential human review
await self._store_ai_result(request_id, ai_result)
return ai_result
async def _route_decision(self,
ai_result: Dict[str, Any],
request_data: Dict[str, Any]) -> Dict[str, Any]:
"""Route decision based on confidence and business rules"""
confidence = ai_result['confidence']
uncertainty = ai_result['uncertainty']
# Business rule checks
is_high_risk = self._check_high_risk_conditions(request_data)
requires_compliance = self._check_compliance_requirements(request_data)
# Routing logic
if confidence > 0.95 and not is_high_risk and not requires_compliance:
action = 'auto_approve'
reason = 'High confidence, low risk'
elif confidence > 0.8 and not is_high_risk:
action = 'human_review'
reason = 'Medium confidence, requires human validation'
queue_priority = 'normal'
elif confidence > 0.5:
action = 'human_review'
reason = 'Low confidence, requires careful review'
queue_priority = 'high'
else:
action = 'expert_escalation'
reason = 'Very low confidence or high-risk case'
queue_priority = 'urgent'
routing_decision = {
'action': action,
'reason': reason,
'confidence': confidence,
'uncertainty': uncertainty,
'is_high_risk': is_high_risk,
'requires_compliance': requires_compliance
}
if action in ['human_review', 'expert_escalation']:
routing_decision['queue_priority'] = queue_priority
return routing_decision
async def _queue_for_human_review(self,
ai_result: Dict[str, Any],
request_data: Dict[str, Any],
request_id: str,
routing_decision: Dict[str, Any]) -> Dict[str, Any]:
"""Queue request for human review"""
review_item = {
'request_id': request_id,
'ai_result': ai_result,
'original_request': request_data,
'routing_decision': routing_decision,
'status': DecisionStatus.PENDING_HUMAN.value,
'created_at': datetime.now().isoformat(),
'priority': routing_decision.get('queue_priority', 'normal')
}
# Add to review queue
queue_name = f"human_review:{routing_decision.get('queue_priority', 'normal')}"
await asyncio.get_event_loop().run_in_executor(
None, self.redis_client.lpush, queue_name, json.dumps(review_item)
)
# Set timeout for review
timeout_hours = self.config.get('review_timeout_hours', 24)
await asyncio.get_event_loop().run_in_executor(
None, self.redis_client.expire, f"pending:{request_id}", timeout_hours * 3600
)
return {
'request_id': request_id,
'status': DecisionStatus.PENDING_HUMAN.value,
'queue_position': await self._get_queue_position(queue_name),
'estimated_review_time': self._estimate_review_time(routing_decision['queue_priority']),
'ai_recommendation': {
'prediction': ai_result['prediction'],
'confidence': ai_result['confidence']
}
}
async def submit_human_decision(self,
request_id: str,
human_decision: Dict[str, Any],
reviewer_id: str) -> Dict[str, Any]:
"""Process human review decision"""
try:
# Retrieve original AI result
ai_result = await self._get_ai_result(request_id)
if not ai_result:
raise ValueError(f"No AI result found for request {request_id}")
# Validate human decision
decision = human_decision['decision'] # 'approve', 'reject', 'escalate'
confidence = human_decision.get('confidence', 1.0)
feedback = human_decision.get('feedback', '')
# Process decision
if decision == 'approve':
final_status = DecisionStatus.APPROVED
elif decision == 'reject':
final_status = DecisionStatus.REJECTED
elif decision == 'escalate':
return await self._escalate_to_expert(
ai_result, {}, request_id, {'reason': 'Human escalation'}
)
else:
raise ValueError(f"Invalid decision: {decision}")
# Store decision
decision_record = {
'request_id': request_id,
'final_decision': decision,
'status': final_status.value,
'ai_prediction': ai_result['prediction'],
'ai_confidence': ai_result['confidence'],
'human_decision': decision,
'human_confidence': confidence,
'reviewer_id': reviewer_id,
'feedback': feedback,
'completed_at': datetime.now().isoformat()
}
await self._store_decision_record(request_id, decision_record)
# Collect feedback for model improvement
feedback_type = 'positive' if ai_result['prediction'] == (1 if decision == 'approve' else 0) else 'correction'
self.feedback_collector.collect_feedback(
request_id,
1 if decision == 'approve' else 0,
feedback_type,
confidence
)
# Remove from pending queue
await self._remove_from_pending(request_id)
return {
'request_id': request_id,
'status': final_status.value,
'final_decision': decision,
'agreement_with_ai': ai_result['prediction'] == (1 if decision == 'approve' else 0),
'processing_complete': True
}
except Exception as e:
logging.error(f"Error processing human decision for {request_id}: {e}")
return {
'request_id': request_id,
'status': 'error',
'error': str(e)
}
async def get_review_queue(self,
reviewer_id: str,
queue_type: str = 'normal',
limit: int = 10) -> List[Dict[str, Any]]:
"""Get pending items for human review"""
queue_name = f"human_review:{queue_type}"
# Get items from queue
queue_items = await asyncio.get_event_loop().run_in_executor(
None, self.redis_client.lrange, queue_name, 0, limit - 1
)
review_items = []
for item_json in queue_items:
item = json.loads(item_json)
# Enrich with additional context for reviewer
enriched_item = {
'request_id': item['request_id'],
'ai_recommendation': {
'prediction': item['ai_result']['prediction'],
'confidence': item['ai_result']['confidence'],
'confidence_level': item['ai_result']['confidence_level']
},
'request_summary': self._create_request_summary(item['original_request']),
'priority': item['priority'],
'created_at': item['created_at'],
'time_in_queue': self._calculate_time_in_queue(item['created_at']),
'routing_reason': item['routing_decision']['reason']
}
review_items.append(enriched_item)
return review_items
def _extract_features(self, request_data: Dict[str, Any]) -> torch.Tensor:
"""Extract features from request data"""
# Implementation depends on the specific use case
# This is a placeholder
features = torch.tensor([1.0, 2.0, 3.0], device=self.device)
return features
def _calculate_uncertainty(self, prediction_probs: torch.Tensor) -> float:
"""Calculate prediction uncertainty"""
# Use entropy as uncertainty measure
entropy = -torch.sum(prediction_probs * torch.log(prediction_probs + 1e-8))
return entropy.item()
def _check_high_risk_conditions(self, request_data: Dict[str, Any]) -> bool:
"""Check if request meets high-risk conditions"""
# Business-specific high-risk checks
return request_data.get('amount', 0) > 10000 # Example
def _check_compliance_requirements(self, request_data: Dict[str, Any]) -> bool:
"""Check if request requires compliance review"""
# Compliance-specific checks
return request_data.get('requires_audit', False) # Example
async def _store_ai_result(self, request_id: str, ai_result: Dict[str, Any]):
"""Store AI result in Redis"""
key = f"ai_result:{request_id}"
await asyncio.get_event_loop().run_in_executor(
None, self.redis_client.setex, key, 86400, json.dumps(ai_result) # 24 hour expiry
)
async def _get_ai_result(self, request_id: str) -> Optional[Dict[str, Any]]:
"""Retrieve AI result from Redis"""
key = f"ai_result:{request_id}"
result_json = await asyncio.get_event_loop().run_in_executor(
None, self.redis_client.get, key
)
return json.loads(result_json) if result_json else None
async def get_system_metrics(self) -> Dict[str, Any]:
"""Get comprehensive system metrics"""
# Get queue sizes
queue_sizes = {}
for priority in ['urgent', 'high', 'normal', 'low']:
queue_name = f"human_review:{priority}"
size = await asyncio.get_event_loop().run_in_executor(
None, self.redis_client.llen, queue_name
)
queue_sizes[priority] = size
# Calculate automation rate
total_decisions = self.metrics['total_decisions']
automation_rate = (
self.metrics['automated_decisions'] / total_decisions
if total_decisions > 0 else 0
)
# Get feedback analytics
feedback_analytics = self.feedback_collector.get_feedback_analytics()
return {
'processing_metrics': {
'total_decisions': total_decisions,
'automated_decisions': self.metrics['automated_decisions'],
'human_reviews': self.metrics['human_reviews'],
'expert_escalations': self.metrics['expert_escalations'],
'automation_rate': automation_rate,
'avg_processing_time_ms': self.metrics['avg_processing_time'] * 1000
},
'queue_status': {
'total_pending': sum(queue_sizes.values()),
'by_priority': queue_sizes
},
'model_performance': {
'accuracy_score': self.metrics['accuracy_score'],
'feedback_analytics': feedback_analytics
},
'system_health': {
'active_reviewers': await self._count_active_reviewers(),
'avg_review_time': await self._calculate_avg_review_time(),
'sla_compliance': await self._calculate_sla_compliance()
}
}
def _update_performance_metrics(self, processing_time: float):
"""Update system performance metrics"""
# Update average processing time using exponential moving average
alpha = 0.1
self.metrics['avg_processing_time'] = (
alpha * processing_time +
(1 - alpha) * self.metrics['avg_processing_time']
)
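The service wires together two components (DecisionRouter, ReviewInterface) and several helper methods (_auto_approve, _escalate_to_expert, _store_decision_record, queue and SLA utilities) that are referenced above but not shown. The sketch below provides stand-in stubs and an example configuration dict so the constructor and routing logic can be exercised; the key names mirror the lookups in the code, while the values are assumptions.

from typing import Any, Dict

# Illustrative stand-ins for components referenced above but not defined (stubs only).
class DecisionRouter:
    def __init__(self, routing_rules: Dict[str, Any]):
        self.routing_rules = routing_rules

class ReviewInterface:
    def __init__(self, ui_config: Dict[str, Any]):
        self.ui_config = ui_config

# Example configuration; key names follow the config lookups in HITLMLService.
hitl_config = {
    'model_path': '/models/production_model.pt',   # path is an assumption
    'routing_rules': {'auto_approve_threshold': 0.95, 'expert_escalation_threshold': 0.5},
    'ui_config': {'theme': 'default'},
    'redis_host': 'localhost',
    'redis_port': 6379,
    'review_timeout_hours': 24,
}

# service = HITLMLService(hitl_config)
# result = asyncio.run(service.process_request({'amount': 2500, 'requires_audit': False}))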
HITL Workflow Stages
AI Prediction
Initial automated processing and confidence scoring
Routing Decision
Determining whether human review is needed
Human Review
Human evaluation and decision-making
Learning Loop
Incorporating human feedback into model improvement
Workflow Orchestration System
import asyncio
from enum import Enum
from typing import Dict, List, Any, Callable
from datetime import datetime, timedelta
class WorkflowState(Enum):
STARTED = "started"
AI_PROCESSING = "ai_processing"
ROUTING_DECISION = "routing_decision"
HUMAN_REVIEW = "human_review"
EXPERT_REVIEW = "expert_review"
COMPLETED = "completed"
FAILED = "failed"
class HITLWorkflowOrchestrator:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.workflows = {}
self.state_handlers = self._initialize_state_handlers()
self.workflow_templates = self._load_workflow_templates()
def _initialize_state_handlers(self) -> Dict[WorkflowState, Callable]:
"""Initialize handlers for each workflow state"""
return {
WorkflowState.STARTED: self._handle_workflow_start,
WorkflowState.AI_PROCESSING: self._handle_ai_processing,
WorkflowState.ROUTING_DECISION: self._handle_routing_decision,
WorkflowState.HUMAN_REVIEW: self._handle_human_review,
WorkflowState.EXPERT_REVIEW: self._handle_expert_review,
WorkflowState.COMPLETED: self._handle_workflow_completion,
WorkflowState.FAILED: self._handle_workflow_failure
}
async def start_workflow(self,
workflow_type: str,
request_data: Dict[str, Any],
workflow_id: str = None) -> str:
"""Start a new HITL workflow"""
if workflow_id is None:
workflow_id = self._generate_workflow_id()
# Get workflow template
template = self.workflow_templates.get(workflow_type)
if not template:
raise ValueError(f"Unknown workflow type: {workflow_type}")
# Initialize workflow state
workflow = {
'id': workflow_id,
'type': workflow_type,
'state': WorkflowState.STARTED,
'request_data': request_data,
'created_at': datetime.now(),
'updated_at': datetime.now(),
'template': template,
'context': {},
'results': {},
'transitions': []
}
self.workflows[workflow_id] = workflow
# Start workflow execution
await self._execute_workflow_step(workflow_id)
return workflow_id
async def _execute_workflow_step(self, workflow_id: str):
"""Execute the next step in the workflow"""
workflow = self.workflows.get(workflow_id)
if not workflow:
raise ValueError(f"Workflow {workflow_id} not found")
current_state = workflow['state']
# Get handler for current state
handler = self.state_handlers.get(current_state)
if not handler:
raise ValueError(f"No handler for state {current_state}")
try:
# Execute state handler
result = await handler(workflow)
# Update workflow with results
workflow['results'][current_state.value] = result
workflow['updated_at'] = datetime.now()
# Determine next state
next_state = self._determine_next_state(workflow, result)
if next_state and next_state != current_state:
# Record state transition
transition = {
'from_state': current_state.value,
'to_state': next_state.value,
'timestamp': datetime.now(),
'trigger': result.get('transition_trigger', 'automatic')
}
workflow['transitions'].append(transition)
# Update workflow state
workflow['state'] = next_state
# Continue workflow if not terminal state
if next_state not in [WorkflowState.COMPLETED, WorkflowState.FAILED]:
await self._execute_workflow_step(workflow_id)
except Exception as e:
# Handle workflow failure
workflow['state'] = WorkflowState.FAILED
workflow['error'] = str(e)
await self._handle_workflow_failure(workflow)
async def _handle_workflow_start(self, workflow: Dict[str, Any]) -> Dict[str, Any]:
"""Handle workflow start state"""
return {
'status': 'initialized',
'transition_trigger': 'start_ai_processing'
}
async def _handle_ai_processing(self, workflow: Dict[str, Any]) -> Dict[str, Any]:
"""Handle AI processing state"""
# Extract features and run AI model
request_data = workflow['request_data']
# Simulate AI processing (replace with actual ML inference)
ai_prediction = await self._run_ai_inference(request_data)
# Store AI results in workflow context
workflow['context']['ai_result'] = ai_prediction
return {
'status': 'ai_completed',
'prediction': ai_prediction['prediction'],
'confidence': ai_prediction['confidence'],
'transition_trigger': 'routing_required'
}
async def _handle_routing_decision(self, workflow: Dict[str, Any]) -> Dict[str, Any]:
"""Handle routing decision state"""
ai_result = workflow['context']['ai_result']
request_data = workflow['request_data']
# Apply routing rules
routing_decision = await self._make_routing_decision(ai_result, request_data)
workflow['context']['routing_decision'] = routing_decision
return {
'status': 'routing_completed',
'routing_action': routing_decision['action'],
'transition_trigger': routing_decision['action']
}
async def _handle_human_review(self, workflow: Dict[str, Any]) -> Dict[str, Any]:
"""Handle human review state"""
# Queue for human review
review_queue_item = {
'workflow_id': workflow['id'],
'ai_result': workflow['context']['ai_result'],
'request_data': workflow['request_data'],
'priority': workflow['context']['routing_decision'].get('priority', 'normal')
}
await self._queue_for_human_review(review_queue_item)
# Wait for human decision (this would be handled by a separate process)
# For now, return pending status
return {
'status': 'queued_for_human_review',
'queue_position': await self._get_queue_position(),
'estimated_time': await self._estimate_review_time(),
'transition_trigger': 'await_human_decision'
}
async def process_human_decision(self,
workflow_id: str,
human_decision: Dict[str, Any]) -> Dict[str, Any]:
"""Process human decision and continue workflow"""
workflow = self.workflows.get(workflow_id)
if not workflow:
raise ValueError(f"Workflow {workflow_id} not found")
# Store human decision in context
workflow['context']['human_decision'] = human_decision
# Continue workflow based on human decision
if human_decision['decision'] == 'escalate':
workflow['state'] = WorkflowState.EXPERT_REVIEW
else:
workflow['state'] = WorkflowState.COMPLETED
await self._execute_workflow_step(workflow_id)
return {
'workflow_id': workflow_id,
'status': workflow['state'].value,
'decision_processed': True
}
def _determine_next_state(self,
workflow: Dict[str, Any],
current_result: Dict[str, Any]) -> WorkflowState:
"""Determine next workflow state based on current result"""
current_state = workflow['state']
trigger = current_result.get('transition_trigger')
# State transition logic
transitions = {
WorkflowState.STARTED: {
'start_ai_processing': WorkflowState.AI_PROCESSING
},
WorkflowState.AI_PROCESSING: {
'routing_required': WorkflowState.ROUTING_DECISION
},
WorkflowState.ROUTING_DECISION: {
'auto_approve': WorkflowState.COMPLETED,
'human_review': WorkflowState.HUMAN_REVIEW,
'expert_escalation': WorkflowState.EXPERT_REVIEW
},
WorkflowState.HUMAN_REVIEW: {
'human_approved': WorkflowState.COMPLETED,
'human_rejected': WorkflowState.COMPLETED,
'escalate_to_expert': WorkflowState.EXPERT_REVIEW
},
WorkflowState.EXPERT_REVIEW: {
'expert_decision': WorkflowState.COMPLETED
}
}
return transitions.get(current_state, {}).get(trigger)
async def get_workflow_status(self, workflow_id: str) -> Dict[str, Any]:
"""Get current workflow status"""
workflow = self.workflows.get(workflow_id)
if not workflow:
return {'error': 'Workflow not found'}
return {
'workflow_id': workflow_id,
'type': workflow['type'],
'current_state': workflow['state'].value,
'created_at': workflow['created_at'].isoformat(),
'updated_at': workflow['updated_at'].isoformat(),
'progress': self._calculate_workflow_progress(workflow),
'estimated_completion': self._estimate_completion_time(workflow),
'context': workflow['context'],
'transitions': workflow['transitions']
}
def _calculate_workflow_progress(self, workflow: Dict[str, Any]) -> float:
"""Calculate workflow completion progress (0-1)"""
total_states = len(WorkflowState) - 2 # Exclude COMPLETED and FAILED
completed_transitions = len(workflow['transitions'])
return min(completed_transitions / total_states, 1.0)
async def get_workflow_analytics(self) -> Dict[str, Any]:
"""Get analytics across all workflows"""
if not self.workflows:
return {}
# Aggregate statistics
workflows_by_type = {}
workflows_by_state = {}
avg_processing_times = {}
for workflow in self.workflows.values():
# Count by type
workflow_type = workflow['type']
workflows_by_type[workflow_type] = workflows_by_type.get(workflow_type, 0) + 1
# Count by state
state = workflow['state'].value
workflows_by_state[state] = workflows_by_state.get(state, 0) + 1
# Calculate processing time for completed workflows
if workflow['state'] == WorkflowState.COMPLETED:
processing_time = (workflow['updated_at'] - workflow['created_at']).total_seconds()
if workflow_type not in avg_processing_times:
avg_processing_times[workflow_type] = []
avg_processing_times[workflow_type].append(processing_time)
# Calculate averages
for workflow_type in avg_processing_times:
times = avg_processing_times[workflow_type]
avg_processing_times[workflow_type] = sum(times) / len(times)
return {
'total_workflows': len(self.workflows),
'workflows_by_type': workflows_by_type,
'workflows_by_state': workflows_by_state,
'avg_processing_times': avg_processing_times,
'automation_rate': self._calculate_automation_rate(),
'human_review_rate': self._calculate_human_review_rate()
}
def _calculate_automation_rate(self) -> float:
"""Calculate percentage of fully automated decisions"""
completed_workflows = [w for w in self.workflows.values()
if w['state'] == WorkflowState.COMPLETED]
if not completed_workflows:
return 0.0
automated_count = 0
for workflow in completed_workflows:
# Check if workflow went through human review
states_visited = [t['to_state'] for t in workflow['transitions']]
if 'human_review' not in states_visited and 'expert_review' not in states_visited:
automated_count += 1
return automated_count / len(completed_workflows)
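The orchestrator delegates to helpers (template loading, AI inference, routing, queue interaction, terminal-state handling) that are referenced but not implemented above. The minimal sketch below stubs them under clearly hypothetical assumptions, just to exercise the state machine end to end; it is not a production implementation.

import asyncio
import uuid
from typing import Any, Dict

class DemoOrchestrator(HITLWorkflowOrchestrator):
    """Subclass with stubbed helpers so the state machine can run (illustrative only)."""
    def _generate_workflow_id(self) -> str:
        return str(uuid.uuid4())
    def _load_workflow_templates(self) -> Dict[str, Any]:
        return {'content_moderation': {'steps': ['ai', 'route', 'review']}}
    async def _run_ai_inference(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        return {'prediction': 1, 'confidence': 0.82}            # assumed model output
    async def _make_routing_decision(self, ai_result, request_data) -> Dict[str, Any]:
        action = 'auto_approve' if ai_result['confidence'] >= 0.95 else 'human_review'
        return {'action': action, 'priority': 'normal'}
    async def _queue_for_human_review(self, item):              # would push to Redis
        pass
    async def _get_queue_position(self): return 1
    async def _estimate_review_time(self): return '2h'
    async def _handle_expert_review(self, workflow): return {'status': 'expert_pending'}
    async def _handle_workflow_completion(self, workflow): return {'status': 'done'}
    async def _handle_workflow_failure(self, workflow): return {'status': 'failed'}
    def _estimate_completion_time(self, workflow): return None

async def demo():
    orchestrator = DemoOrchestrator(config={})
    wf_id = await orchestrator.start_workflow('content_moderation', {'text': 'example post'})
    # Simulate a reviewer approving the AI recommendation.
    await orchestrator.process_human_decision(wf_id, {'decision': 'approve'})
    print(await orchestrator.get_workflow_status(wf_id))

# asyncio.run(demo())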
Production Deployment
# Human-in-the-Loop ML Service Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: hitl-ml-service
labels:
app: hitl-ml
spec:
replicas: 3
selector:
matchLabels:
app: hitl-ml
template:
metadata:
labels:
app: hitl-ml
spec:
containers:
- name: hitl-service
image: ml-platform/hitl-ml:v1.5.0
ports:
- containerPort: 8000
resources:
requests:
memory: "4Gi"
cpu: "2"
nvidia.com/gpu: "1"
limits:
memory: "8Gi"
cpu: "4"
nvidia.com/gpu: "1"
env:
- name: ML_MODEL_PATH
value: "/models/production_model.pt"
- name: REDIS_HOST
value: "redis-cluster"
- name: REVIEW_TIMEOUT_HOURS
value: "24"
- name: AUTO_APPROVE_THRESHOLD
value: "0.95"
- name: EXPERT_ESCALATION_THRESHOLD
value: "0.5"
volumeMounts:
- name: model-storage
mountPath: /models
- name: workflow-configs
mountPath: /configs
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 60
periodSeconds: 30
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 30
periodSeconds: 15
volumes:
- name: model-storage
persistentVolumeClaim:
claimName: ml-models-pvc
- name: workflow-configs
configMap:
name: hitl-workflow-configs
---
# Human Review Interface
apiVersion: apps/v1
kind: Deployment
metadata:
name: review-interface
spec:
replicas: 2
selector:
matchLabels:
app: review-interface
template:
metadata:
labels:
app: review-interface
spec:
containers:
- name: review-ui
image: ml-platform/review-interface:v1.2.0
ports:
- containerPort: 3000
env:
- name: API_ENDPOINT
value: "http://hitl-ml-service:8000"
- name: AUTH_PROVIDER
value: "oauth2"
resources:
requests:
memory: "512Mi"
cpu: "0.5"
limits:
memory: "1Gi"
cpu: "1"
---
# Redis Cluster for Queue Management
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: redis-hitl-cluster
spec:
serviceName: "redis-hitl"
replicas: 3
selector:
matchLabels:
app: redis-hitl
template:
metadata:
labels:
app: redis-hitl
spec:
containers:
- name: redis
image: redis:7-alpine
ports:
- containerPort: 6379
command:
- redis-server
- --cluster-enabled
- "yes"
- --cluster-config-file
- nodes.conf
- --cluster-node-timeout
- "5000"
resources:
requests:
memory: "1Gi"
cpu: "0.5"
limits:
memory: "2Gi"
cpu: "1"
volumeMounts:
- name: redis-storage
mountPath: /data
volumeClaimTemplates:
- metadata:
name: redis-storage
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 10Gi
---
# Workflow Orchestrator
apiVersion: apps/v1
kind: Deployment
metadata:
name: workflow-orchestrator
spec:
replicas: 2
selector:
matchLabels:
app: workflow-orchestrator
template:
metadata:
labels:
app: workflow-orchestrator
spec:
containers:
- name: orchestrator
image: ml-platform/workflow-orchestrator:v1.0.0
env:
- name: REDIS_HOST
value: "redis-hitl-cluster"
- name: HITL_SERVICE_ENDPOINT
value: "http://hitl-ml-service:8000"
resources:
requests:
memory: "1Gi"
cpu: "1"
limits:
memory: "2Gi"
cpu: "2"
---
# Monitoring and Analytics
apiVersion: apps/v1
kind: Deployment
metadata:
name: hitl-analytics
spec:
replicas: 1
selector:
matchLabels:
app: hitl-analytics
template:
metadata:
labels:
app: hitl-analytics
spec:
containers:
- name: analytics
image: ml-platform/hitl-analytics:v1.0.0
ports:
- containerPort: 8080
env:
- name: REDIS_HOST
value: "redis-hitl-cluster"
- name: PROMETHEUS_ENDPOINT
value: "http://prometheus:9090"
resources:
requests:
memory: "2Gi"
cpu: "1"
limits:
memory: "4Gi"
cpu: "2"
Related Technologies
Label Studio
Data labeling and annotation tool
Snorkel
Weak supervision framework
Prodigy
Active learning annotation tool
Weights & Biases
ML experiment tracking platform
MLflow
ML lifecycle management platform
Kubeflow
ML workflows on Kubernetes
Apache Airflow
Workflow orchestration platform
Redis
In-memory data structure store