Human-in-the-Loop ML

Building ML systems that seamlessly integrate human expertise with automated intelligence

👥

Human Expertise

Leveraging human judgment for complex, nuanced, and high-stakes decisions

🧠

AI Efficiency

Automated processing for routine cases while ensuring quality and scale

🔄

Continuous Learning

Model improvement through real-time human feedback and corrections

HITL Design Patterns

Key Benefits

Improved Accuracy

Human oversight for edge cases and complex decisions

Combining AI efficiency with human judgment achieves higher overall accuracy

Scalable Quality

Maintain quality while processing large volumes

AI handles routine cases, humans focus on challenging decisions

Continuous Improvement

Models learn from human feedback in real-time

System performance improves continuously through human corrections

Design Principles

Intelligent Routing
Route only uncertain or high-risk cases to humans
Minimize human workload while maximizing impact (see the routing sketch after this list)
Feedback Integration
Seamlessly incorporate human feedback into learning
Real-time model updates from human corrections
User Experience
Intuitive interfaces for efficient human review
Minimize cognitive load and decision fatigue
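
The intelligent-routing principle usually reduces to a small function sitting between the model and the review queue. Below is a minimal sketch; the thresholds are illustrative (the service implementation later on this page uses 0.95 for auto-approval and 0.5 for expert escalation).

def route_prediction(confidence: float, is_high_risk: bool,
                     auto_approve_threshold: float = 0.95,
                     escalation_threshold: float = 0.5) -> str:
    """Decide whether a prediction is auto-approved, human-reviewed, or escalated to an expert."""
    if confidence >= auto_approve_threshold and not is_high_risk:
        return 'auto_approve'
    if confidence >= escalation_threshold:
        return 'human_review'
    return 'expert_escalation'

# Example: a confident, low-risk case never reaches a human
assert route_prediction(0.97, is_high_risk=False) == 'auto_approve'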

Application Domains

Content Moderation

Combining AI efficiency with human judgment for content safety

Examples: Social media posts, User-generated content, Community guidelines
Benefits: 95% automation, Human oversight for edge cases, Reduced bias

Medical AI

AI-assisted diagnosis with mandatory expert validation

Examples: Radiology screening, Pathology analysis, Treatment recommendations
Benefits: Faster screening, Reduced errors, Expert validation

Autonomous Systems

Human oversight for critical autonomous decision-making

Examples: Self-driving cars, Trading systems, Industrial automation
Benefits: Safety assurance, Edge case handling, Regulatory compliance

Data Labeling

Efficient large-scale annotation with quality control

Examples: Computer vision, NLP datasets, Speech recognition
Benefits: Cost reduction, Quality improvement, Scalable annotation

System Architecture Overview

# HITL ML System Components

1. AI Processing Layer
   ├── Model Inference Engine
   ├── Confidence Estimation
   ├── Uncertainty Quantification
   └── Risk Assessment

2. Decision Routing Layer
   ├── Confidence Thresholding
   ├── Business Rule Engine
   ├── Queue Management
   └── Load Balancing

3. Human Interface Layer
   ├── Review Dashboard
   ├── Case Presentation
   ├── Decision Capture
   └── Feedback Collection

4. Learning Integration Layer
   ├── Feedback Processing
   ├── Active Learning
   ├── Model Updating
   └── Performance Tracking

5. Quality Assurance Layer
   ├── Inter-Annotator Agreement
   ├── Expert Validation
   ├── Audit Trail
   └── Compliance Monitoring

# Key Performance Targets:
- Automation Rate: 85-95%
- Human Review Latency: < 2 hours
- System Accuracy: > 95%
- Cost per Decision: < $0.50
- Inter-Annotator Agreement: > 85%
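
These targets can be monitored automatically. The sketch below encodes them as simple SLO checks; the metric key names are assumptions about how a metrics snapshot might be structured, while the thresholds come directly from the list above.

def check_hitl_slos(metrics: dict) -> dict:
    """Return a pass/fail flag per performance target for a metrics snapshot."""
    return {
        'automation_rate': 0.85 <= metrics.get('automation_rate', 0.0) <= 0.95,
        'human_review_latency': metrics.get('human_review_latency_hours', float('inf')) < 2,
        'system_accuracy': metrics.get('system_accuracy', 0.0) > 0.95,
        'cost_per_decision': metrics.get('cost_per_decision_usd', float('inf')) < 0.50,
        'inter_annotator_agreement': metrics.get('inter_annotator_agreement', 0.0) > 0.85,
    }

# Example snapshot that meets every target
print(check_hitl_slos({
    'automation_rate': 0.90, 'human_review_latency_hours': 0.75,
    'system_accuracy': 0.968, 'cost_per_decision_usd': 0.32,
    'inter_annotator_agreement': 0.92,
}))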

Active Learning Implementation

import numpy as np
from scipy.stats import entropy
from typing import List, Tuple, Dict, Any, Optional

class ActiveLearningSystem:
    def __init__(self, base_model, uncertainty_method='entropy'):
        self.base_model = base_model
        self.uncertainty_method = uncertainty_method
        self.labeled_data = []
        self.unlabeled_data = []
        self.uncertainty_threshold = 0.7
        
    def uncertainty_sampling(self, X_unlabeled: np.ndarray, n_samples: int = 100) -> List[int]:
        """Select most uncertain samples for human annotation"""
        
        # Get model predictions
        predictions = self.base_model.predict_proba(X_unlabeled)
        
        # Calculate uncertainty scores
        if self.uncertainty_method == 'entropy':
            uncertainties = self._entropy_uncertainty(predictions)
        elif self.uncertainty_method == 'least_confident':
            uncertainties = self._least_confident_uncertainty(predictions)
        elif self.uncertainty_method == 'margin':
            uncertainties = self._margin_uncertainty(predictions)
        else:
            raise ValueError(f"Unknown uncertainty method: {self.uncertainty_method}")
        
        # Select top uncertain samples
        uncertain_indices = np.argsort(uncertainties)[-n_samples:]
        
        return uncertain_indices.tolist()
    
    def _entropy_uncertainty(self, predictions: np.ndarray) -> np.ndarray:
        """Calculate entropy-based uncertainty"""
        # Add small epsilon to avoid log(0)
        epsilon = 1e-10
        predictions = np.clip(predictions, epsilon, 1 - epsilon)
        
        # Calculate entropy for each sample
        entropies = -np.sum(predictions * np.log(predictions), axis=1)
        
        return entropies
    
    def _least_confident_uncertainty(self, predictions: np.ndarray) -> np.ndarray:
        """Calculate least confident uncertainty"""
        # Uncertainty = 1 - max(prediction)
        max_probs = np.max(predictions, axis=1)
        uncertainties = 1 - max_probs
        
        return uncertainties
    
    def _margin_uncertainty(self, predictions: np.ndarray) -> np.ndarray:
        """Calculate margin-based uncertainty"""
        # Sort predictions in descending order
        sorted_preds = np.sort(predictions, axis=1)[:, ::-1]
        
        # Margin = difference between top two predictions
        margins = sorted_preds[:, 0] - sorted_preds[:, 1]
        uncertainties = 1 - margins
        
        return uncertainties
    
    def diversity_sampling(self, X_unlabeled: np.ndarray, n_samples: int = 100) -> List[int]:
        """Select diverse samples to ensure good coverage"""
        from sklearn.cluster import KMeans
        
        # Cluster unlabeled data
        n_clusters = min(n_samples, len(X_unlabeled))
        kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        cluster_labels = kmeans.fit_predict(X_unlabeled)
        
        # Select one sample from each cluster (closest to centroid)
        selected_indices = []
        for cluster_id in range(n_clusters):
            cluster_indices = np.where(cluster_labels == cluster_id)[0]
            if len(cluster_indices) > 0:
                # Find sample closest to cluster centroid
                cluster_center = kmeans.cluster_centers_[cluster_id]
                distances = np.linalg.norm(
                    X_unlabeled[cluster_indices] - cluster_center, axis=1
                )
                closest_idx = cluster_indices[np.argmin(distances)]
                selected_indices.append(closest_idx)
        
        return selected_indices
    
    def query_by_committee(self, X_unlabeled: np.ndarray, 
                          committee_models: List[Any], 
                          n_samples: int = 100) -> List[int]:
        """Query by committee - select samples with highest disagreement"""
        
        # Get predictions from all committee members
        committee_predictions = []
        for model in committee_models:
            preds = model.predict_proba(X_unlabeled)
            committee_predictions.append(preds)
        
        # Calculate disagreement (KL divergence)
        disagreements = []
        for i in range(len(X_unlabeled)):
            sample_preds = [pred[i] for pred in committee_predictions]
            avg_pred = np.mean(sample_preds, axis=0)
            
            # Calculate average KL divergence from committee average
            kl_divs = []
            for pred in sample_preds:
                kl_div = entropy(pred, avg_pred)
                kl_divs.append(kl_div)
            
            disagreements.append(np.mean(kl_divs))
        
        # Select samples with highest disagreement
        disagreement_indices = np.argsort(disagreements)[-n_samples:]
        
        return disagreement_indices.tolist()
    
    def adaptive_sampling(self, X_unlabeled: np.ndarray, 
                         n_samples: int = 100,
                         uncertainty_weight: float = 0.7,
                         diversity_weight: float = 0.3) -> List[int]:
        """Combine uncertainty and diversity sampling"""
        
        # Get uncertainty scores
        predictions = self.base_model.predict_proba(X_unlabeled)
        uncertainty_scores = self._entropy_uncertainty(predictions)
        
        # Normalize uncertainty scores (epsilon guards against a zero range)
        uncertainty_scores = (uncertainty_scores - uncertainty_scores.min()) / (
            uncertainty_scores.max() - uncertainty_scores.min() + 1e-8
        )
        
        # Get diversity scores (distance from labeled data)
        diversity_scores = self._calculate_diversity_scores(X_unlabeled)
        
        # Combine scores
        combined_scores = (
            uncertainty_weight * uncertainty_scores + 
            diversity_weight * diversity_scores
        )
        
        # Select top samples
        selected_indices = np.argsort(combined_scores)[-n_samples:]
        
        return selected_indices.tolist()
    
    def _calculate_diversity_scores(self, X_unlabeled: np.ndarray) -> np.ndarray:
        """Calculate diversity scores based on distance from labeled data"""
        if not self.labeled_data:
            # If no labeled data, return uniform scores
            return np.ones(len(X_unlabeled))
        
        # Calculate minimum distance to labeled data for each unlabeled sample
        labeled_features = np.array([sample['features'] for sample in self.labeled_data])
        
        diversity_scores = []
        for unlabeled_sample in X_unlabeled:
            # Calculate distances to all labeled samples
            distances = np.linalg.norm(labeled_features - unlabeled_sample, axis=1)
            min_distance = np.min(distances)
            diversity_scores.append(min_distance)
        
        # Normalize scores
        diversity_scores = np.array(diversity_scores)
        diversity_scores = (diversity_scores - diversity_scores.min()) / (
            diversity_scores.max() - diversity_scores.min() + 1e-8
        )
        
        return diversity_scores
    
    def update_model_with_feedback(self, 
                                 new_labels: List[Tuple[np.ndarray, int]],
                                 retrain: bool = True):
        """Update model with new human-labeled data"""
        
        # Add new labels to labeled dataset
        for features, label in new_labels:
            self.labeled_data.append({
                'features': features,
                'label': label,
                'timestamp': np.datetime64('now')
            })
        
        if retrain and len(self.labeled_data) > 10:
            # Retrain model with updated dataset
            X_train = np.array([sample['features'] for sample in self.labeled_data])
            y_train = np.array([sample['label'] for sample in self.labeled_data])
            
            self.base_model.fit(X_train, y_train)
            
            print(f"Model retrained with {len(self.labeled_data)} labeled samples")
    
    def get_annotation_suggestions(self, 
                                 X_unlabeled: np.ndarray,
                                 n_suggestions: int = 10,
                                 strategy: str = 'adaptive') -> Dict[str, Any]:
        """Get suggestions for human annotation"""
        
        if strategy == 'uncertainty':
            indices = self.uncertainty_sampling(X_unlabeled, n_suggestions)
        elif strategy == 'diversity':
            indices = self.diversity_sampling(X_unlabeled, n_suggestions)
        elif strategy == 'adaptive':
            indices = self.adaptive_sampling(X_unlabeled, n_suggestions)
        else:
            raise ValueError(f"Unknown strategy: {strategy}")
        
        # Get predictions and uncertainty scores for selected samples
        selected_samples = X_unlabeled[indices]
        predictions = self.base_model.predict_proba(selected_samples)
        uncertainty_scores = self._entropy_uncertainty(predictions)
        
        suggestions = []
        for i, idx in enumerate(indices):
            suggestions.append({
                'index': idx,
                'features': X_unlabeled[idx],
                'predicted_class': np.argmax(predictions[i]),
                'prediction_confidence': np.max(predictions[i]),
                'uncertainty_score': uncertainty_scores[i],
                'suggested_priority': i + 1  # samplers return indices in ascending score order, so later samples get higher priority
            })
        
        return {
            'suggestions': suggestions,
            'strategy_used': strategy,
            'total_unlabeled': len(X_unlabeled),
            'annotation_efficiency': len(indices) / len(X_unlabeled)
        }

class HumanFeedbackLoop:
    """System for collecting and integrating human feedback"""
    
    def __init__(self, model, feedback_buffer_size=1000):
        self.model = model
        self.feedback_buffer = []
        self.feedback_buffer_size = feedback_buffer_size
        self.feedback_weights = {'positive': 1.0, 'negative': 2.0, 'correction': 3.0}
        
    def collect_feedback(self, 
                        prediction_id: str,
                        true_label: int,
                        feedback_type: str,
                        confidence: float = 1.0):
        """Collect human feedback on model predictions"""
        
        feedback_entry = {
            'prediction_id': prediction_id,
            'true_label': true_label,
            'feedback_type': feedback_type,  # 'positive', 'negative', 'correction'
            'confidence': confidence,
            'timestamp': np.datetime64('now'),
            'weight': self.feedback_weights.get(feedback_type, 1.0)
        }
        
        self.feedback_buffer.append(feedback_entry)
        
        # Maintain buffer size
        if len(self.feedback_buffer) > self.feedback_buffer_size:
            self.feedback_buffer.pop(0)
        
        # Apply immediate updates for critical feedback
        if feedback_type == 'correction' and confidence > 0.8:
            self._apply_immediate_correction(feedback_entry)
    
    def _apply_immediate_correction(self, feedback: Dict[str, Any]):
        """Apply immediate model correction for high-confidence feedback"""
        # This would involve immediate model updates
        # Implementation depends on the specific model type
        pass
    
    def process_feedback_batch(self, batch_size: int = 100):
        """Process a batch of feedback for model improvement"""
        if len(self.feedback_buffer) < batch_size:
            return
        
        # Select recent feedback
        recent_feedback = self.feedback_buffer[-batch_size:]
        
        # Group feedback by type
        corrections = [f for f in recent_feedback if f['feedback_type'] == 'correction']
        negative_feedback = [f for f in recent_feedback if f['feedback_type'] == 'negative']
        
        # Process corrections (highest priority)
        if corrections:
            self._process_corrections(corrections)
        
        # Process negative feedback for model adjustment
        if negative_feedback:
            self._process_negative_feedback(negative_feedback)
    
    def _process_corrections(self, corrections: List[Dict[str, Any]]):
        """Process correction feedback to improve model"""
        # Extract features and corrected labels
        corrected_samples = []
        for correction in corrections:
            # Retrieve original prediction data
            prediction_data = self._get_prediction_data(correction['prediction_id'])
            if prediction_data:
                corrected_samples.append({
                    'features': prediction_data['features'],
                    'true_label': correction['true_label'],
                    'weight': correction['weight'] * correction['confidence']
                })
        
        if corrected_samples:
            # Update model with corrected samples
            self._update_model_with_corrections(corrected_samples)

    def _process_negative_feedback(self, negative_feedback: List[Dict[str, Any]]):
        """Process negative feedback (placeholder; e.g. flag samples for threshold tuning)"""
        pass

    def _get_prediction_data(self, prediction_id: str) -> Optional[Dict[str, Any]]:
        """Retrieve stored features and prediction for a prediction ID (placeholder)"""
        return None

    def _update_model_with_corrections(self, corrected_samples: List[Dict[str, Any]]):
        """Apply weighted corrections to the model (placeholder; depends on model type)"""
        pass

    def get_feedback_analytics(self) -> Dict[str, Any]:
        """Analyze feedback patterns and model performance"""
        if not self.feedback_buffer:
            return {}
        
        feedback_types = {}
        confidence_scores = []
        recent_feedback = self.feedback_buffer[-100:]  # Last 100 feedback items
        
        for feedback in recent_feedback:
            feedback_type = feedback['feedback_type']
            feedback_types[feedback_type] = feedback_types.get(feedback_type, 0) + 1
            confidence_scores.append(feedback['confidence'])
        
        return {
            'total_feedback': len(self.feedback_buffer),
            'feedback_distribution': feedback_types,
            'average_confidence': np.mean(confidence_scores),
            'correction_rate': feedback_types.get('correction', 0) / len(recent_feedback),
            'model_improvement_opportunity': feedback_types.get('negative', 0) / len(recent_feedback)
        }
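
A minimal usage sketch of the two classes above, assuming a scikit-learn classifier as the base model; the data is synthetic and the simulated labels stand in for real human annotation.

import numpy as np
from sklearn.ensemble import RandomForestClassifier

# Small labeled seed set plus a larger unlabeled pool
rng = np.random.default_rng(0)
X_seed, y_seed = rng.normal(size=(50, 8)), rng.integers(0, 2, size=50)
X_pool = rng.normal(size=(500, 8))

base_model = RandomForestClassifier(n_estimators=50, random_state=0).fit(X_seed, y_seed)
active_learner = ActiveLearningSystem(base_model, uncertainty_method='entropy')

# Ask for the 20 pool samples the model is least certain about
batch = active_learner.get_annotation_suggestions(X_pool, n_suggestions=20, strategy='uncertainty')
print(f"Selected {len(batch['suggestions'])} of {batch['total_unlabeled']} unlabeled samples")

# Simulate human annotation of the selected samples, then fold the labels back in
new_labels = [(s['features'], int(rng.integers(0, 2))) for s in batch['suggestions']]
active_learner.update_model_with_feedback(new_labels, retrain=True)

# Capture reviewer feedback on a served prediction and inspect the analytics
feedback_loop = HumanFeedbackLoop(base_model)
feedback_loop.collect_feedback('req-001', true_label=1, feedback_type='correction', confidence=0.9)
print(feedback_loop.get_feedback_analytics())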

HITL ML Service Implementation

import asyncio
import torch
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from enum import Enum
import redis
import json
import uuid
import logging

class DecisionStatus(Enum):
    PENDING_AI = "pending_ai"
    PENDING_HUMAN = "pending_human"
    PENDING_EXPERT = "pending_expert"
    APPROVED = "approved"
    REJECTED = "rejected"
    ESCALATED = "escalated"

class ConfidenceLevel(Enum):
    HIGH = "high"      # > 0.9
    MEDIUM = "medium"  # 0.7 - 0.9
    LOW = "low"        # < 0.7

class HITLMLService:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        
        # Initialize ML models
        self.ml_model = self._load_ml_model(config['model_path'])
        self.confidence_estimator = self._load_confidence_model()
        self.active_learner = ActiveLearningSystem(self.ml_model)
        
        # Human workflow components. DecisionRouter and ReviewInterface, like
        # several queue/storage/analytics helpers referenced later in this class,
        # are assumed to be implemented elsewhere in the codebase.
        self.decision_router = DecisionRouter(config['routing_rules'])
        self.review_interface = ReviewInterface(config['ui_config'])
        self.feedback_collector = HumanFeedbackLoop(self.ml_model)
        
        # Storage and queuing
        self.redis_client = redis.Redis(
            host=config.get('redis_host', 'localhost'),
            port=config.get('redis_port', 6379),
            decode_responses=True
        )
        
        # Performance tracking
        self.metrics = {
            'total_decisions': 0,
            'automated_decisions': 0,
            'human_reviews': 0,
            'expert_escalations': 0,
            'avg_processing_time': 0,
            'accuracy_score': 0
        }
        
    def _load_ml_model(self, model_path: str):
        """Load the main ML model"""
        # Implementation depends on model type
        model = torch.load(model_path, map_location=self.device)
        model.eval()
        return model
    
    def _load_confidence_model(self):
        """Load model for confidence estimation"""
        # This could be a separate model trained to estimate prediction confidence
        return None  # Placeholder
    
    async def process_request(self, 
                            request_data: Dict[str, Any],
                            request_id: str = None) -> Dict[str, Any]:
        """Main entry point for processing requests"""
        
        if request_id is None:
            request_id = str(uuid.uuid4())
        
        start_time = datetime.now()
        self.metrics['total_decisions'] += 1
        
        try:
            # Step 1: AI Processing
            ai_result = await self._process_with_ai(request_data, request_id)
            
            # Step 2: Decision Routing
            routing_decision = await self._route_decision(ai_result, request_data)
            
            # Step 3: Handle based on routing decision
            if routing_decision['action'] == 'auto_approve':
                final_result = await self._auto_approve(ai_result, request_id)
                self.metrics['automated_decisions'] += 1
                
            elif routing_decision['action'] == 'human_review':
                final_result = await self._queue_for_human_review(
                    ai_result, request_data, request_id, routing_decision
                )
                self.metrics['human_reviews'] += 1
                
            elif routing_decision['action'] == 'expert_escalation':
                final_result = await self._escalate_to_expert(
                    ai_result, request_data, request_id, routing_decision
                )
                self.metrics['expert_escalations'] += 1
                
            else:
                raise ValueError(f"Unknown routing action: {routing_decision['action']}")
            
            # Update metrics
            processing_time = (datetime.now() - start_time).total_seconds()
            self._update_performance_metrics(processing_time)
            
            return final_result
            
        except Exception as e:
            logging.error(f"Error processing request {request_id}: {e}")
            return {
                'request_id': request_id,
                'status': 'error',
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            }
    
    async def _process_with_ai(self, 
                             request_data: Dict[str, Any], 
                             request_id: str) -> Dict[str, Any]:
        """Process request with AI model"""
        
        # Extract features from request
        features = self._extract_features(request_data)
        
        # Get AI prediction
        with torch.no_grad():
            prediction = self.ml_model(features)
            prediction_probs = torch.softmax(prediction, dim=-1)
        
        # Calculate confidence and uncertainty
        confidence = torch.max(prediction_probs).item()
        uncertainty = self._calculate_uncertainty(prediction_probs)
        
        # Determine confidence level
        if confidence > 0.9:
            confidence_level = ConfidenceLevel.HIGH
        elif confidence > 0.7:
            confidence_level = ConfidenceLevel.MEDIUM
        else:
            confidence_level = ConfidenceLevel.LOW
        
        ai_result = {
            'request_id': request_id,
            'prediction': torch.argmax(prediction).item(),
            'prediction_probs': prediction_probs.tolist(),
            'confidence': confidence,
            'uncertainty': uncertainty,
            'confidence_level': confidence_level.value,
            'features': features.tolist(),
            'processing_time': datetime.now().isoformat()
        }
        
        # Store AI result for potential human review
        await self._store_ai_result(request_id, ai_result)
        
        return ai_result
    
    async def _route_decision(self, 
                            ai_result: Dict[str, Any], 
                            request_data: Dict[str, Any]) -> Dict[str, Any]:
        """Route decision based on confidence and business rules"""
        
        confidence = ai_result['confidence']
        uncertainty = ai_result['uncertainty']
        
        # Business rule checks
        is_high_risk = self._check_high_risk_conditions(request_data)
        requires_compliance = self._check_compliance_requirements(request_data)
        
        # Routing logic
        if confidence > 0.95 and not is_high_risk and not requires_compliance:
            action = 'auto_approve'
            reason = 'High confidence, low risk'
            
        elif confidence > 0.8 and not is_high_risk:
            action = 'human_review'
            reason = 'Medium confidence, requires human validation'
            queue_priority = 'normal'
            
        elif confidence > 0.5:
            action = 'human_review'
            reason = 'Low confidence, requires careful review'
            queue_priority = 'high'
            
        else:
            action = 'expert_escalation'
            reason = 'Very low confidence or high-risk case'
            queue_priority = 'urgent'
        
        routing_decision = {
            'action': action,
            'reason': reason,
            'confidence': confidence,
            'uncertainty': uncertainty,
            'is_high_risk': is_high_risk,
            'requires_compliance': requires_compliance
        }
        
        if action in ['human_review', 'expert_escalation']:
            routing_decision['queue_priority'] = queue_priority
        
        return routing_decision
    
    async def _queue_for_human_review(self, 
                                    ai_result: Dict[str, Any],
                                    request_data: Dict[str, Any],
                                    request_id: str,
                                    routing_decision: Dict[str, Any]) -> Dict[str, Any]:
        """Queue request for human review"""
        
        review_item = {
            'request_id': request_id,
            'ai_result': ai_result,
            'original_request': request_data,
            'routing_decision': routing_decision,
            'status': DecisionStatus.PENDING_HUMAN.value,
            'created_at': datetime.now().isoformat(),
            'priority': routing_decision.get('queue_priority', 'normal')
        }
        
        # Add to review queue
        queue_name = f"human_review:{routing_decision.get('queue_priority', 'normal')}"
        await asyncio.get_event_loop().run_in_executor(
            None, self.redis_client.lpush, queue_name, json.dumps(review_item)
        )
        
        # Set timeout for review
        timeout_hours = self.config.get('review_timeout_hours', 24)
        await asyncio.get_event_loop().run_in_executor(
            None, self.redis_client.expire, f"pending:{request_id}", timeout_hours * 3600
        )
        
        return {
            'request_id': request_id,
            'status': DecisionStatus.PENDING_HUMAN.value,
            'queue_position': await self._get_queue_position(queue_name),
            'estimated_review_time': self._estimate_review_time(routing_decision['queue_priority']),
            'ai_recommendation': {
                'prediction': ai_result['prediction'],
                'confidence': ai_result['confidence']
            }
        }
    
    async def submit_human_decision(self, 
                                  request_id: str,
                                  human_decision: Dict[str, Any],
                                  reviewer_id: str) -> Dict[str, Any]:
        """Process human review decision"""
        
        try:
            # Retrieve original AI result
            ai_result = await self._get_ai_result(request_id)
            if not ai_result:
                raise ValueError(f"No AI result found for request {request_id}")
            
            # Validate human decision
            decision = human_decision['decision']  # 'approve', 'reject', 'escalate'
            confidence = human_decision.get('confidence', 1.0)
            feedback = human_decision.get('feedback', '')
            
            # Process decision
            if decision == 'approve':
                final_status = DecisionStatus.APPROVED
            elif decision == 'reject':
                final_status = DecisionStatus.REJECTED
            elif decision == 'escalate':
                return await self._escalate_to_expert(
                    ai_result, {}, request_id, {'reason': 'Human escalation'}
                )
            else:
                raise ValueError(f"Invalid decision: {decision}")
            
            # Store decision
            decision_record = {
                'request_id': request_id,
                'final_decision': decision,
                'status': final_status.value,
                'ai_prediction': ai_result['prediction'],
                'ai_confidence': ai_result['confidence'],
                'human_decision': decision,
                'human_confidence': confidence,
                'reviewer_id': reviewer_id,
                'feedback': feedback,
                'completed_at': datetime.now().isoformat()
            }
            
            await self._store_decision_record(request_id, decision_record)
            
            # Collect feedback for model improvement
            feedback_type = 'positive' if ai_result['prediction'] == (1 if decision == 'approve' else 0) else 'correction'
            self.feedback_collector.collect_feedback(
                request_id,
                1 if decision == 'approve' else 0,
                feedback_type,
                confidence
            )
            
            # Remove from pending queue
            await self._remove_from_pending(request_id)
            
            return {
                'request_id': request_id,
                'status': final_status.value,
                'final_decision': decision,
                'agreement_with_ai': ai_result['prediction'] == (1 if decision == 'approve' else 0),
                'processing_complete': True
            }
            
        except Exception as e:
            logging.error(f"Error processing human decision for {request_id}: {e}")
            return {
                'request_id': request_id,
                'status': 'error',
                'error': str(e)
            }
    
    async def get_review_queue(self, 
                             reviewer_id: str,
                             queue_type: str = 'normal',
                             limit: int = 10) -> List[Dict[str, Any]]:
        """Get pending items for human review"""
        
        queue_name = f"human_review:{queue_type}"
        
        # Get items from queue
        queue_items = await asyncio.get_event_loop().run_in_executor(
            None, self.redis_client.lrange, queue_name, 0, limit - 1
        )
        
        review_items = []
        for item_json in queue_items:
            item = json.loads(item_json)
            
            # Enrich with additional context for reviewer
            enriched_item = {
                'request_id': item['request_id'],
                'ai_recommendation': {
                    'prediction': item['ai_result']['prediction'],
                    'confidence': item['ai_result']['confidence'],
                    'confidence_level': item['ai_result']['confidence_level']
                },
                'request_summary': self._create_request_summary(item['original_request']),
                'priority': item['priority'],
                'created_at': item['created_at'],
                'time_in_queue': self._calculate_time_in_queue(item['created_at']),
                'routing_reason': item['routing_decision']['reason']
            }
            
            review_items.append(enriched_item)
        
        return review_items
    
    def _extract_features(self, request_data: Dict[str, Any]) -> torch.Tensor:
        """Extract features from request data"""
        # Implementation depends on the specific use case
        # This is a placeholder
        features = torch.tensor([1.0, 2.0, 3.0], device=self.device)
        return features
    
    def _calculate_uncertainty(self, prediction_probs: torch.Tensor) -> float:
        """Calculate prediction uncertainty"""
        # Use entropy as uncertainty measure
        entropy = -torch.sum(prediction_probs * torch.log(prediction_probs + 1e-8))
        return entropy.item()
    
    def _check_high_risk_conditions(self, request_data: Dict[str, Any]) -> bool:
        """Check if request meets high-risk conditions"""
        # Business-specific high-risk checks
        return request_data.get('amount', 0) > 10000  # Example
    
    def _check_compliance_requirements(self, request_data: Dict[str, Any]) -> bool:
        """Check if request requires compliance review"""
        # Compliance-specific checks
        return request_data.get('requires_audit', False)  # Example
    
    async def _store_ai_result(self, request_id: str, ai_result: Dict[str, Any]):
        """Store AI result in Redis"""
        key = f"ai_result:{request_id}"
        await asyncio.get_event_loop().run_in_executor(
            None, self.redis_client.setex, key, 86400, json.dumps(ai_result)  # 24 hour expiry
        )
    
    async def _get_ai_result(self, request_id: str) -> Optional[Dict[str, Any]]:
        """Retrieve AI result from Redis"""
        key = f"ai_result:{request_id}"
        result_json = await asyncio.get_event_loop().run_in_executor(
            None, self.redis_client.get, key
        )
        return json.loads(result_json) if result_json else None
    
    async def get_system_metrics(self) -> Dict[str, Any]:
        """Get comprehensive system metrics"""
        
        # Get queue sizes
        queue_sizes = {}
        for priority in ['urgent', 'high', 'normal', 'low']:
            queue_name = f"human_review:{priority}"
            size = await asyncio.get_event_loop().run_in_executor(
                None, self.redis_client.llen, queue_name
            )
            queue_sizes[priority] = size
        
        # Calculate automation rate
        total_decisions = self.metrics['total_decisions']
        automation_rate = (
            self.metrics['automated_decisions'] / total_decisions 
            if total_decisions > 0 else 0
        )
        
        # Get feedback analytics
        feedback_analytics = self.feedback_collector.get_feedback_analytics()
        
        return {
            'processing_metrics': {
                'total_decisions': total_decisions,
                'automated_decisions': self.metrics['automated_decisions'],
                'human_reviews': self.metrics['human_reviews'],
                'expert_escalations': self.metrics['expert_escalations'],
                'automation_rate': automation_rate,
                'avg_processing_time_ms': self.metrics['avg_processing_time'] * 1000
            },
            'queue_status': {
                'total_pending': sum(queue_sizes.values()),
                'by_priority': queue_sizes
            },
            'model_performance': {
                'accuracy_score': self.metrics['accuracy_score'],
                'feedback_analytics': feedback_analytics
            },
            'system_health': {
                'active_reviewers': await self._count_active_reviewers(),
                'avg_review_time': await self._calculate_avg_review_time(),
                'sla_compliance': await self._calculate_sla_compliance()
            }
        }
    
    def _update_performance_metrics(self, processing_time: float):
        """Update system performance metrics"""
        # Update average processing time using exponential moving average
        alpha = 0.1
        self.metrics['avg_processing_time'] = (
            alpha * processing_time + 
            (1 - alpha) * self.metrics['avg_processing_time']
        )
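
The deployment manifests later in this section expose the service on port 8000 with /health and /ready probes. Below is a minimal sketch of an HTTP wrapper, assuming FastAPI is used; the non-probe endpoint paths are illustrative, and the service instance is assumed to be constructed at startup with a real config.

from typing import Any, Dict, List
from fastapi import FastAPI

app = FastAPI(title="HITL ML Service")
service = None  # assumed: HITLMLService(config) constructed at application startup

@app.post("/requests")
async def create_request(payload: Dict[str, Any]) -> Dict[str, Any]:
    # Runs the AI -> routing -> (optional) human-review pipeline
    return await service.process_request(payload)

@app.get("/reviews/{queue_type}")
async def review_queue(queue_type: str, reviewer_id: str = "") -> List[Dict[str, Any]]:
    # Pending items for a reviewer, enriched with the AI recommendation
    return await service.get_review_queue(reviewer_id, queue_type)

@app.post("/reviews/{request_id}/decision")
async def submit_decision(request_id: str, decision: Dict[str, Any], reviewer_id: str) -> Dict[str, Any]:
    # Records the human decision and feeds it back into the model
    return await service.submit_human_decision(request_id, decision, reviewer_id)

@app.get("/metrics")
async def metrics() -> Dict[str, Any]:
    return await service.get_system_metrics()

@app.get("/health")
async def health() -> Dict[str, str]:
    return {"status": "ok"}

@app.get("/ready")
async def ready() -> Dict[str, str]:
    return {"status": "ready"}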

HITL Workflow Stages

AI Prediction

Initial automated processing and confidence scoring

Actions:
Model inference
Confidence calculation
Uncertainty estimation
Outputs:
Predictions
Confidence scores
Uncertainty measures

Routing Decision

Determine if human review is needed

Actions:
Confidence thresholding
Risk assessment
Queue assignment
Outputs:
Auto-approve
Human review
Expert escalation

Human Review

Human evaluation and decision making

Actions:
Case review
Decision making
Feedback provision
Outputs:
Final decision
Feedback labels
Quality scores

Learning Loop

Incorporating human feedback into model improvement

Actions:
Feedback integration
Model retraining
Performance evaluation
Outputs:
Updated models
Performance metrics
Process improvements

Workflow Orchestration System

import asyncio
from enum import Enum
from typing import Dict, List, Any, Callable
from datetime import datetime, timedelta

class WorkflowState(Enum):
    STARTED = "started"
    AI_PROCESSING = "ai_processing"
    ROUTING_DECISION = "routing_decision"
    HUMAN_REVIEW = "human_review"
    EXPERT_REVIEW = "expert_review"
    COMPLETED = "completed"
    FAILED = "failed"

class HITLWorkflowOrchestrator:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.workflows = {}
        self.state_handlers = self._initialize_state_handlers()
        self.workflow_templates = self._load_workflow_templates()
        
    def _initialize_state_handlers(self) -> Dict[WorkflowState, Callable]:
        """Initialize handlers for each workflow state"""
        return {
            WorkflowState.STARTED: self._handle_workflow_start,
            WorkflowState.AI_PROCESSING: self._handle_ai_processing,
            WorkflowState.ROUTING_DECISION: self._handle_routing_decision,
            WorkflowState.HUMAN_REVIEW: self._handle_human_review,
            WorkflowState.EXPERT_REVIEW: self._handle_expert_review,
            WorkflowState.COMPLETED: self._handle_workflow_completion,
            WorkflowState.FAILED: self._handle_workflow_failure
        }
    
    async def start_workflow(self, 
                           workflow_type: str,
                           request_data: Dict[str, Any],
                           workflow_id: str = None) -> str:
        """Start a new HITL workflow"""
        
        if workflow_id is None:
            workflow_id = self._generate_workflow_id()
        
        # Get workflow template
        template = self.workflow_templates.get(workflow_type)
        if not template:
            raise ValueError(f"Unknown workflow type: {workflow_type}")
        
        # Initialize workflow state
        workflow = {
            'id': workflow_id,
            'type': workflow_type,
            'state': WorkflowState.STARTED,
            'request_data': request_data,
            'created_at': datetime.now(),
            'updated_at': datetime.now(),
            'template': template,
            'context': {},
            'results': {},
            'transitions': []
        }
        
        self.workflows[workflow_id] = workflow
        
        # Start workflow execution
        await self._execute_workflow_step(workflow_id)
        
        return workflow_id
    
    async def _execute_workflow_step(self, workflow_id: str):
        """Execute the next step in the workflow"""
        
        workflow = self.workflows.get(workflow_id)
        if not workflow:
            raise ValueError(f"Workflow {workflow_id} not found")
        
        current_state = workflow['state']
        
        # Get handler for current state
        handler = self.state_handlers.get(current_state)
        if not handler:
            raise ValueError(f"No handler for state {current_state}")
        
        try:
            # Execute state handler
            result = await handler(workflow)
            
            # Update workflow with results
            workflow['results'][current_state.value] = result
            workflow['updated_at'] = datetime.now()
            
            # Determine next state
            next_state = self._determine_next_state(workflow, result)
            
            if next_state and next_state != current_state:
                # Record state transition
                transition = {
                    'from_state': current_state.value,
                    'to_state': next_state.value,
                    'timestamp': datetime.now(),
                    'trigger': result.get('transition_trigger', 'automatic')
                }
                workflow['transitions'].append(transition)
                
                # Update workflow state
                workflow['state'] = next_state
                
                # Continue workflow if not terminal state
                if next_state not in [WorkflowState.COMPLETED, WorkflowState.FAILED]:
                    await self._execute_workflow_step(workflow_id)
            
        except Exception as e:
            # Handle workflow failure
            workflow['state'] = WorkflowState.FAILED
            workflow['error'] = str(e)
            await self._handle_workflow_failure(workflow)
    
    async def _handle_workflow_start(self, workflow: Dict[str, Any]) -> Dict[str, Any]:
        """Handle workflow start state"""
        return {
            'status': 'initialized',
            'transition_trigger': 'start_ai_processing'
        }
    
    async def _handle_ai_processing(self, workflow: Dict[str, Any]) -> Dict[str, Any]:
        """Handle AI processing state"""
        
        # Extract features and run AI model
        request_data = workflow['request_data']
        
        # Simulate AI processing (replace with actual ML inference)
        ai_prediction = await self._run_ai_inference(request_data)
        
        # Store AI results in workflow context
        workflow['context']['ai_result'] = ai_prediction
        
        return {
            'status': 'ai_completed',
            'prediction': ai_prediction['prediction'],
            'confidence': ai_prediction['confidence'],
            'transition_trigger': 'routing_required'
        }
    
    async def _handle_routing_decision(self, workflow: Dict[str, Any]) -> Dict[str, Any]:
        """Handle routing decision state"""
        
        ai_result = workflow['context']['ai_result']
        request_data = workflow['request_data']
        
        # Apply routing rules
        routing_decision = await self._make_routing_decision(ai_result, request_data)
        
        workflow['context']['routing_decision'] = routing_decision
        
        return {
            'status': 'routing_completed',
            'routing_action': routing_decision['action'],
            'transition_trigger': routing_decision['action']
        }
    
    async def _handle_human_review(self, workflow: Dict[str, Any]) -> Dict[str, Any]:
        """Handle human review state"""
        
        # Queue for human review
        review_queue_item = {
            'workflow_id': workflow['id'],
            'ai_result': workflow['context']['ai_result'],
            'request_data': workflow['request_data'],
            'priority': workflow['context']['routing_decision'].get('priority', 'normal')
        }
        
        await self._queue_for_human_review(review_queue_item)
        
        # Wait for human decision (this would be handled by a separate process)
        # For now, return pending status
        return {
            'status': 'queued_for_human_review',
            'queue_position': await self._get_queue_position(),
            'estimated_time': await self._estimate_review_time(),
            'transition_trigger': 'await_human_decision'
        }

    async def _handle_expert_review(self, workflow: Dict[str, Any]) -> Dict[str, Any]:
        """Handle expert review state (placeholder; queue the case to expert reviewers)"""
        return {'status': 'queued_for_expert_review', 'transition_trigger': 'await_expert_decision'}

    async def _handle_workflow_completion(self, workflow: Dict[str, Any]) -> Dict[str, Any]:
        """Handle workflow completion state"""
        return {'status': 'completed'}

    async def _handle_workflow_failure(self, workflow: Dict[str, Any]) -> Dict[str, Any]:
        """Handle workflow failure state (placeholder; alerting and cleanup)"""
        return {'status': 'failed', 'error': workflow.get('error')}

    async def process_human_decision(self, 
                                   workflow_id: str,
                                   human_decision: Dict[str, Any]) -> Dict[str, Any]:
        """Process human decision and continue workflow"""
        
        workflow = self.workflows.get(workflow_id)
        if not workflow:
            raise ValueError(f"Workflow {workflow_id} not found")
        
        # Store human decision in context
        workflow['context']['human_decision'] = human_decision
        
        # Continue workflow based on human decision
        if human_decision['decision'] == 'escalate':
            workflow['state'] = WorkflowState.EXPERT_REVIEW
        else:
            workflow['state'] = WorkflowState.COMPLETED
        
        await self._execute_workflow_step(workflow_id)
        
        return {
            'workflow_id': workflow_id,
            'status': workflow['state'].value,
            'decision_processed': True
        }
    
    def _determine_next_state(self, 
                            workflow: Dict[str, Any], 
                            current_result: Dict[str, Any]) -> WorkflowState:
        """Determine next workflow state based on current result"""
        
        current_state = workflow['state']
        trigger = current_result.get('transition_trigger')
        
        # State transition logic
        transitions = {
            WorkflowState.STARTED: {
                'start_ai_processing': WorkflowState.AI_PROCESSING
            },
            WorkflowState.AI_PROCESSING: {
                'routing_required': WorkflowState.ROUTING_DECISION
            },
            WorkflowState.ROUTING_DECISION: {
                'auto_approve': WorkflowState.COMPLETED,
                'human_review': WorkflowState.HUMAN_REVIEW,
                'expert_escalation': WorkflowState.EXPERT_REVIEW
            },
            WorkflowState.HUMAN_REVIEW: {
                'human_approved': WorkflowState.COMPLETED,
                'human_rejected': WorkflowState.COMPLETED,
                'escalate_to_expert': WorkflowState.EXPERT_REVIEW
            },
            WorkflowState.EXPERT_REVIEW: {
                'expert_decision': WorkflowState.COMPLETED
            }
        }
        
        return transitions.get(current_state, {}).get(trigger)
    
    async def get_workflow_status(self, workflow_id: str) -> Dict[str, Any]:
        """Get current workflow status"""
        
        workflow = self.workflows.get(workflow_id)
        if not workflow:
            return {'error': 'Workflow not found'}
        
        return {
            'workflow_id': workflow_id,
            'type': workflow['type'],
            'current_state': workflow['state'].value,
            'created_at': workflow['created_at'].isoformat(),
            'updated_at': workflow['updated_at'].isoformat(),
            'progress': self._calculate_workflow_progress(workflow),
            'estimated_completion': self._estimate_completion_time(workflow),
            'context': workflow['context'],
            'transitions': workflow['transitions']
        }
    
    def _calculate_workflow_progress(self, workflow: Dict[str, Any]) -> float:
        """Calculate workflow completion progress (0-1)"""
        
        total_states = len(WorkflowState) - 2  # Exclude COMPLETED and FAILED
        completed_transitions = len(workflow['transitions'])
        
        return min(completed_transitions / total_states, 1.0)

    def _estimate_completion_time(self, workflow: Dict[str, Any]):
        """Estimate completion time (placeholder; would use historical stage timings)"""
        return None
    
    async def get_workflow_analytics(self) -> Dict[str, Any]:
        """Get analytics across all workflows"""
        
        if not self.workflows:
            return {}
        
        # Aggregate statistics
        workflows_by_type = {}
        workflows_by_state = {}
        avg_processing_times = {}
        
        for workflow in self.workflows.values():
            # Count by type
            workflow_type = workflow['type']
            workflows_by_type[workflow_type] = workflows_by_type.get(workflow_type, 0) + 1
            
            # Count by state
            state = workflow['state'].value
            workflows_by_state[state] = workflows_by_state.get(state, 0) + 1
            
            # Calculate processing time for completed workflows
            if workflow['state'] == WorkflowState.COMPLETED:
                processing_time = (workflow['updated_at'] - workflow['created_at']).total_seconds()
                if workflow_type not in avg_processing_times:
                    avg_processing_times[workflow_type] = []
                avg_processing_times[workflow_type].append(processing_time)
        
        # Calculate averages
        for workflow_type in avg_processing_times:
            times = avg_processing_times[workflow_type]
            avg_processing_times[workflow_type] = sum(times) / len(times)
        
        return {
            'total_workflows': len(self.workflows),
            'workflows_by_type': workflows_by_type,
            'workflows_by_state': workflows_by_state,
            'avg_processing_times': avg_processing_times,
            'automation_rate': self._calculate_automation_rate(),
            'human_review_rate': self._calculate_human_review_rate()
        }
    
    def _calculate_automation_rate(self) -> float:
        """Calculate percentage of fully automated decisions"""
        completed_workflows = [w for w in self.workflows.values() 
                             if w['state'] == WorkflowState.COMPLETED]
        
        if not completed_workflows:
            return 0.0
        
        automated_count = 0
        for workflow in completed_workflows:
            # Check if workflow went through human review
            states_visited = [t['to_state'] for t in workflow['transitions']]
            if 'human_review' not in states_visited and 'expert_review' not in states_visited:
                automated_count += 1
        
        return automated_count / len(completed_workflows)

    def _calculate_human_review_rate(self) -> float:
        """Calculate percentage of completed workflows that required human or expert review"""
        completed_workflows = [w for w in self.workflows.values()
                               if w['state'] == WorkflowState.COMPLETED]
        if not completed_workflows:
            return 0.0
        reviewed_count = 0
        for workflow in completed_workflows:
            states_visited = [t['to_state'] for t in workflow['transitions']]
            if 'human_review' in states_visited or 'expert_review' in states_visited:
                reviewed_count += 1
        return reviewed_count / len(completed_workflows)
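
A minimal usage sketch of the automated path through the orchestrator. The hooks the class leaves undefined (_load_workflow_templates, _generate_workflow_id, _run_ai_inference, _make_routing_decision) are supplied by a small subclass here; the template contents, the fake inference result, and the routing threshold are illustrative assumptions.

import asyncio
import uuid

class DemoOrchestrator(HITLWorkflowOrchestrator):
    def _load_workflow_templates(self):
        # Stand-in template registry
        return {'content_moderation': {'steps': ['ai', 'route', 'review']}}

    def _generate_workflow_id(self):
        return str(uuid.uuid4())

    async def _run_ai_inference(self, request_data):
        # Stand-in for real model inference
        return {'prediction': 1, 'confidence': 0.97}

    async def _make_routing_decision(self, ai_result, request_data):
        # Route on a simple confidence threshold
        action = 'auto_approve' if ai_result['confidence'] > 0.95 else 'human_review'
        return {'action': action, 'priority': 'normal'}

async def main():
    orchestrator = DemoOrchestrator(config={})
    workflow_id = await orchestrator.start_workflow(
        'content_moderation', {'text': 'example user post'}
    )
    # High-confidence requests complete without ever entering a review queue
    print(orchestrator.workflows[workflow_id]['state'].value)  # expected: 'completed'

asyncio.run(main())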

Quality and Performance Metrics

Performance Metrics

Human-AI Agreement: Consensus between human and AI decisions (target: > 90%)
Review Efficiency: Cases processed per human-hour (target: > 100/hour)
False Positive Rate: Incorrect AI predictions requiring human fix (target: < 5%)
Learning Acceleration: Model improvement rate with human feedback (target: 3x faster)

Quality Metrics

Inter-Annotator Agreement: Consistency between human annotators (target: > 85%)
Expert Validation Rate: Expert approval of AI + human decisions (target: > 95%)
Time to Resolution: Average time from AI to human decision (target: < 2 hours)
Cost per Decision: Total cost including human oversight (target: < $0.50)
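
A minimal sketch of computing several of these metrics from stored decision records. It assumes records shaped like the decision_record dict built in HITLMLService.submit_human_decision, that automated decisions are stored with reviewer_id set to None, and illustrative per-decision costs.

from typing import Any, Dict, List

def compute_quality_metrics(records: List[Dict[str, Any]],
                            cost_per_automated: float = 0.05,
                            cost_per_human_review: float = 2.50) -> Dict[str, float]:
    """Derive agreement, automation, and cost metrics from decision records."""
    reviewed = [r for r in records if r.get('reviewer_id')]
    agreements = [
        r for r in reviewed
        if r['ai_prediction'] == (1 if r['final_decision'] == 'approve' else 0)
    ]
    automated = len(records) - len(reviewed)
    total_cost = automated * cost_per_automated + len(reviewed) * cost_per_human_review
    return {
        'human_ai_agreement': len(agreements) / len(reviewed) if reviewed else 0.0,
        'automation_rate': automated / len(records) if records else 0.0,
        'cost_per_decision': total_cost / len(records) if records else 0.0,
    }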

Production Deployment

# Human-in-the-Loop ML Service Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: hitl-ml-service
  labels:
    app: hitl-ml
spec:
  replicas: 3
  selector:
    matchLabels:
      app: hitl-ml
  template:
    metadata:
      labels:
        app: hitl-ml
    spec:
      containers:
      - name: hitl-service
        image: ml-platform/hitl-ml:v1.5.0
        ports:
        - containerPort: 8000
        resources:
          requests:
            memory: "4Gi"
            cpu: "2"
            nvidia.com/gpu: "1"
          limits:
            memory: "8Gi"
            cpu: "4"
            nvidia.com/gpu: "1"
        env:
        - name: ML_MODEL_PATH
          value: "/models/production_model.pt"
        - name: REDIS_HOST
          value: "redis-hitl-cluster"
        - name: REVIEW_TIMEOUT_HOURS
          value: "24"
        - name: AUTO_APPROVE_THRESHOLD
          value: "0.95"
        - name: EXPERT_ESCALATION_THRESHOLD
          value: "0.5"
        volumeMounts:
        - name: model-storage
          mountPath: /models
        - name: workflow-configs
          mountPath: /configs
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 60
          periodSeconds: 30
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 15
      volumes:
      - name: model-storage
        persistentVolumeClaim:
          claimName: ml-models-pvc
      - name: workflow-configs
        configMap:
          name: hitl-workflow-configs

---
# Human Review Interface
apiVersion: apps/v1
kind: Deployment
metadata:
  name: review-interface
spec:
  replicas: 2
  selector:
    matchLabels:
      app: review-interface
  template:
    metadata:
      labels:
        app: review-interface
    spec:
      containers:
      - name: review-ui
        image: ml-platform/review-interface:v1.2.0
        ports:
        - containerPort: 3000
        env:
        - name: API_ENDPOINT
          value: "http://hitl-ml-service:8000"
        - name: AUTH_PROVIDER
          value: "oauth2"
        resources:
          requests:
            memory: "512Mi"
            cpu: "0.5"
          limits:
            memory: "1Gi"
            cpu: "1"

---
# Redis Cluster for Queue Management
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: redis-hitl-cluster
spec:
  serviceName: "redis-hitl"
  replicas: 3
  selector:
    matchLabels:
      app: redis-hitl
  template:
    metadata:
      labels:
        app: redis-hitl
    spec:
      containers:
      - name: redis
        image: redis:7-alpine
        ports:
        - containerPort: 6379
        command:
        - redis-server
        - --cluster-enabled
        - "yes"
        - --cluster-config-file
        - nodes.conf
        - --cluster-node-timeout
        - "5000"
        resources:
          requests:
            memory: "1Gi"
            cpu: "0.5"
          limits:
            memory: "2Gi"
            cpu: "1"
        volumeMounts:
        - name: redis-storage
          mountPath: /data
  volumeClaimTemplates:
  - metadata:
      name: redis-storage
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 10Gi

---
# Workflow Orchestrator
apiVersion: apps/v1
kind: Deployment
metadata:
  name: workflow-orchestrator
spec:
  replicas: 2
  selector:
    matchLabels:
      app: workflow-orchestrator
  template:
    metadata:
      labels:
        app: workflow-orchestrator
    spec:
      containers:
      - name: orchestrator
        image: ml-platform/workflow-orchestrator:v1.0.0
        env:
        - name: REDIS_HOST
          value: "redis-hitl-cluster"
        - name: HITL_SERVICE_ENDPOINT
          value: "http://hitl-ml-service:8000"
        resources:
          requests:
            memory: "1Gi"
            cpu: "1"
          limits:
            memory: "2Gi"
            cpu: "2"

---
# Monitoring and Analytics
apiVersion: apps/v1
kind: Deployment
metadata:
  name: hitl-analytics
spec:
  replicas: 1
  selector:
    matchLabels:
      app: hitl-analytics
  template:
    metadata:
      labels:
        app: hitl-analytics
    spec:
      containers:
      - name: analytics
        image: ml-platform/hitl-analytics:v1.0.0
        ports:
        - containerPort: 8080
        env:
        - name: REDIS_HOST
          value: "redis-hitl-cluster"
        - name: PROMETHEUS_ENDPOINT
          value: "http://prometheus:9090"
        resources:
          requests:
            memory: "2Gi"
            cpu: "1"
          limits:
            memory: "4Gi"
            cpu: "2"
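
The environment variables defined in these manifests map directly onto the service configuration. Below is a minimal sketch of that mapping, assuming the config keys used by HITLMLService; routing_rules and ui_config are placeholders for whatever DecisionRouter and ReviewInterface expect, REDIS_PORT falls back to a default when unset, and the threshold keys are included because the manifest defines them even though the example service hard-codes those values.

import os

def load_config_from_env() -> dict:
    """Build the HITLMLService config dict from the container environment."""
    return {
        'model_path': os.environ.get('ML_MODEL_PATH', '/models/production_model.pt'),
        'redis_host': os.environ.get('REDIS_HOST', 'localhost'),
        'redis_port': int(os.environ.get('REDIS_PORT', 6379)),
        'review_timeout_hours': int(os.environ.get('REVIEW_TIMEOUT_HOURS', 24)),
        'auto_approve_threshold': float(os.environ.get('AUTO_APPROVE_THRESHOLD', 0.95)),
        'expert_escalation_threshold': float(os.environ.get('EXPERT_ESCALATION_THRESHOLD', 0.5)),
        'routing_rules': {},  # placeholder
        'ui_config': {},      # placeholder
    }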

Performance Benchmarks

Efficiency Metrics

Automation Rate: 87%
Human Review Efficiency: 125 cases/hour
Average Queue Time: 45 minutes
Cost per Decision: $0.32

Quality Metrics

System Accuracy: 96.8%
Human-AI Agreement: 92.3%
Expert Validation Rate: 97.1%
Learning Acceleration: 3.2x faster

📝 Test Your Understanding

What is the primary purpose of active learning in HITL ML systems?