Autonomous Systems ML

Building ML systems for autonomous vehicles, robotics, and intelligent agents

⚠️ Safety-Critical System Notice

Autonomous systems are safety-critical applications where ML failures can result in physical harm or loss of life. This content is for educational purposes only. Production autonomous systems require compliance with safety standards (ISO 26262, ISO 21448), extensive testing, validation, and certification by qualified engineers. Never deploy autonomous ML systems without proper safety analysis, redundancy, and human oversight.

Autonomous System Domains

Computer Vision for Autonomous Systems

Real-Time Computer Vision Pipeline

import cv2
import numpy as np
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from typing import Dict, List, Tuple, Optional
import threading
import queue
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor

@dataclass
class DetectionResult:
    """Detection result with safety metadata"""
    bbox: Tuple[int, int, int, int]  # x1, y1, x2, y2
    class_id: int
    confidence: float
    track_id: Optional[int] = None
    distance: Optional[float] = None
    velocity: Optional[Tuple[float, float]] = None
    safety_score: Optional[float] = None

class AutonomousVisionSystem:
    """
    Real-time computer vision system for autonomous vehicles
    
    Features:
    - Multi-camera sensor fusion
    - Real-time object detection and tracking
    - Depth estimation and 3D perception
    - Safety-critical decision making
    - Hardware acceleration (GPU/TPU)
    """
    
    def __init__(self, config: Dict):
        self.config = config
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        
        # Initialize perception models (the _load_* helpers are assumed to wrap pretrained networks; not shown here)
        self.object_detector = self._load_object_detection_model()
        self.depth_estimator = self._load_depth_estimation_model()
        self.tracker = self._load_tracking_model()
        self.segmentation_model = self._load_segmentation_model()
        
        # Camera calibration and transforms
        self.camera_matrix = self._load_camera_calibration()
        self.transforms = self._setup_transforms()
        
        # Real-time processing pipeline
        self.frame_queue = queue.Queue(maxsize=10)
        self.result_queue = queue.Queue(maxsize=10)
        self.processing_thread = None
        self.is_running = False
        
        # Safety monitoring
        self.safety_monitor = SafetyMonitor(config)
        
        # Performance metrics
        self.metrics = {
            'fps': 0,
            'latency_ms': 0,
            'detection_accuracy': 0,
            'safety_violations': 0
        }
        
    def start_real_time_processing(self, camera_sources: List[str]):
        """Start real-time vision processing pipeline"""
        
        self.is_running = True
        
        # Start camera capture threads
        self.camera_threads = []
        for i, source in enumerate(camera_sources):
            thread = threading.Thread(
                target=self._camera_capture_worker,
                args=(source, i),
                daemon=True
            )
            thread.start()
            self.camera_threads.append(thread)
        
        # Start processing thread
        self.processing_thread = threading.Thread(
            target=self._processing_worker,
            daemon=True
        )
        self.processing_thread.start()
        
        print(f"Started real-time vision processing with {len(camera_sources)} cameras")
    
    def _camera_capture_worker(self, source: str, camera_id: int):
        """Worker thread for camera frame capture"""
        
        cap = cv2.VideoCapture(source)
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)  # Minimize latency
        
        while self.is_running:
            ret, frame = cap.read()
            if not ret:
                # Avoid a busy loop when the camera temporarily fails to deliver a frame
                time.sleep(0.01)
                continue
            
            # Timestamp and camera metadata
            frame_data = {
                'frame': frame,
                'camera_id': camera_id,
                'timestamp': time.time(),
                'frame_id': self._generate_frame_id()
            }
            
            # Non-blocking queue insertion
            try:
                self.frame_queue.put_nowait(frame_data)
            except queue.Full:
                # Drop oldest frame to maintain real-time processing
                try:
                    self.frame_queue.get_nowait()
                    self.frame_queue.put_nowait(frame_data)
                except queue.Empty:
                    pass
        
        cap.release()
    
    def _processing_worker(self):
        """Main processing worker for vision pipeline"""
        
        with ThreadPoolExecutor(max_workers=4) as executor:
            while self.is_running:
                try:
                    # Get latest frame
                    frame_data = self.frame_queue.get(timeout=0.1)
                    
                    # Process frame through vision pipeline
                    start_time = time.time()
                    
                    # Parallel processing of different vision tasks
                    detection_future = executor.submit(
                        self._detect_objects, frame_data['frame']
                    )
                    depth_future = executor.submit(
                        self._estimate_depth, frame_data['frame']
                    )
                    segmentation_future = executor.submit(
                        self._segment_scene, frame_data['frame']
                    )
                    
                    # Collect results
                    detections = detection_future.result()
                    depth_map = depth_future.result()
                    segmentation = segmentation_future.result()
                    
                    # Object tracking
                    tracked_objects = self._update_tracking(detections, frame_data)
                    
                    # 3D world coordinate mapping
                    world_objects = self._map_to_world_coordinates(
                        tracked_objects, depth_map, frame_data['camera_id']
                    )
                    
                    # Safety assessment
                    safety_assessment = self.safety_monitor.assess_scene_safety(
                        world_objects, segmentation, frame_data
                    )
                    
                    # Compile results
                    processing_time = (time.time() - start_time) * 1000  # ms
                    
                    result = {
                        'frame_id': frame_data['frame_id'],
                        'timestamp': frame_data['timestamp'],
                        'camera_id': frame_data['camera_id'],
                        'detections': world_objects,
                        'segmentation': segmentation,
                        'depth_map': depth_map,
                        'safety_assessment': safety_assessment,
                        'processing_time_ms': processing_time,
                        'metrics': self._calculate_frame_metrics(world_objects, processing_time)
                    }
                    
                    # Send to result queue
                    try:
                        self.result_queue.put_nowait(result)
                    except queue.Full:
                        # Drop oldest result
                        try:
                            self.result_queue.get_nowait()
                            self.result_queue.put_nowait(result)
                        except queue.Empty:
                            pass
                    
                    # Update performance metrics
                    self._update_performance_metrics(processing_time)
                    
                except queue.Empty:
                    continue
                except Exception as e:
                    print(f"Processing error: {e}")
                    continue
    
    def _detect_objects(self, frame: np.ndarray) -> List[DetectionResult]:
        """Real-time object detection with confidence filtering"""
        
        # Preprocess frame
        input_tensor = self.transforms(frame).unsqueeze(0).to(self.device)
        
        # Run detection model
        with torch.no_grad():
            predictions = self.object_detector(input_tensor)
        
        # Post-process predictions
        detections = []
        for pred in predictions[0]:  # Assuming batch size 1
            
            # Filter by confidence threshold
            if pred['score'] > self.config['detection_threshold']:
                
                # Safety-critical object prioritization
                class_id = int(pred['class_id'])
                safety_priority = self._get_safety_priority(class_id)
                
                detection = DetectionResult(
                    bbox=tuple(int(v) for v in pred['bbox'].cpu().numpy()),  # pixel ints, per DetectionResult
                    class_id=class_id,
                    confidence=float(pred['score']),
                    safety_score=safety_priority
                )
                
                detections.append(detection)
        
        # Sort by safety priority and confidence
        detections.sort(key=lambda x: (x.safety_score, x.confidence), reverse=True)
        
        return detections
    
    def _estimate_depth(self, frame: np.ndarray) -> np.ndarray:
        """Monocular depth estimation for 3D perception"""
        
        # Preprocess for depth model
        depth_input = cv2.resize(frame, (384, 384))
        depth_tensor = torch.from_numpy(depth_input).float().permute(2, 0, 1).unsqueeze(0)
        depth_tensor = depth_tensor.to(self.device) / 255.0
        
        # Generate depth map
        with torch.no_grad():
            depth_prediction = self.depth_estimator(depth_tensor)
        
        # Post-process depth map
        depth_map = depth_prediction.squeeze().cpu().numpy()
        depth_map = cv2.resize(depth_map, (frame.shape[1], frame.shape[0]))
        
        # Convert to metric depth (meters)
        depth_map = self._convert_to_metric_depth(depth_map)
        
        return depth_map
    
    def _update_tracking(self, detections: List[DetectionResult], frame_data: Dict) -> List[DetectionResult]:
        """Multi-object tracking with Kalman filtering"""
        
        # Extract detection data for tracker
        detection_data = []
        for det in detections:
            detection_data.append([
                det.bbox[0], det.bbox[1], det.bbox[2], det.bbox[3],
                det.confidence, det.class_id
            ])
        
        # Update tracker
        tracked_objects = self.tracker.update(
            np.array(detection_data) if detection_data else np.empty((0, 6))
        )
        
        # Update detection objects with tracking information
        # NOTE: this assumes the tracker returns tracks in the same order as the input
        # detections; production trackers return explicit detection-to-track matches
        updated_detections = []
        for i, det in enumerate(detections):
            if i < len(tracked_objects):
                track = tracked_objects[i]
                det.track_id = int(track[4])  # Track ID
                
                # Estimate velocity from tracking history
                det.velocity = self._estimate_object_velocity(det.track_id)
                
            updated_detections.append(det)
        
        return updated_detections
    
    def _map_to_world_coordinates(self, detections: List[DetectionResult], 
                                depth_map: np.ndarray, camera_id: int) -> List[DetectionResult]:
        """Map 2D detections to 3D world coordinates"""
        
        world_objects = []
        
        for det in detections:
            # Get object center point (bbox is x1, y1, x2, y2)
            center_x = (det.bbox[0] + det.bbox[2]) // 2
            center_y = (det.bbox[1] + det.bbox[3]) // 2
            
            # Clamp to image bounds and read depth at the object center
            center_x = min(max(center_x, 0), depth_map.shape[1] - 1)
            center_y = min(max(center_y, 0), depth_map.shape[0] - 1)
            object_depth = depth_map[center_y, center_x]
            
            # Convert to 3D coordinates in the vehicle/world frame
            world_coords = self._pixel_to_world(
                center_x, center_y, object_depth, camera_id
            )
            
            # Update detection with range information (Euclidean distance to the object)
            det.distance = float(np.linalg.norm(world_coords))
            
            # Calculate relative position and velocity in world frame
            if det.velocity and object_depth > 0:
                # Convert pixel velocity to world velocity
                world_velocity = self._pixel_velocity_to_world(
                    det.velocity, object_depth, camera_id
                )
                det.velocity = world_velocity
            
            world_objects.append(det)
        
        return world_objects

class SafetyMonitor:
    """Safety monitoring system for autonomous vision"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.safety_rules = self._load_safety_rules()
        self.collision_predictor = CollisionPredictor(config)
        
    def assess_scene_safety(self, objects: List[DetectionResult], 
                          segmentation: np.ndarray, frame_data: Dict) -> Dict:
        """Comprehensive safety assessment of the current scene"""
        
        safety_assessment = {
            'overall_risk': 'LOW',
            'immediate_threats': [],
            'collision_predictions': [],
            'recommended_actions': [],
            'confidence': 0.95
        }
        
        # Detect immediate threats
        immediate_threats = self._detect_immediate_threats(objects)
        safety_assessment['immediate_threats'] = immediate_threats
        
        # Predict potential collisions
        collision_predictions = self.collision_predictor.predict_collisions(objects)
        safety_assessment['collision_predictions'] = collision_predictions
        
        # Assess drivable area
        drivable_area_safety = self._assess_drivable_area(segmentation)
        
        # Overall risk calculation
        overall_risk = self._calculate_overall_risk(
            immediate_threats, collision_predictions, drivable_area_safety
        )
        safety_assessment['overall_risk'] = overall_risk
        
        # Generate safety recommendations
        recommendations = self._generate_safety_recommendations(
            immediate_threats, collision_predictions, overall_risk
        )
        safety_assessment['recommended_actions'] = recommendations
        
        return safety_assessment
    
    def _detect_immediate_threats(self, objects: List[DetectionResult]) -> List[Dict]:
        """Detect objects that pose immediate safety threats"""
        
        threats = []
        
        for obj in objects:
            # Distance-based threat assessment
            if obj.distance and obj.distance < self.config['min_safe_distance']:
                
                # Velocity-based collision risk
                collision_time = self._calculate_time_to_collision(obj)
                
                if collision_time is not None and collision_time < self.config['min_reaction_time']:
                    threat = {
                        'object_id': obj.track_id,
                        'class_id': obj.class_id,
                        'distance': obj.distance,
                        'time_to_collision': collision_time,
                        'severity': self._assess_threat_severity(obj, collision_time)
                    }
                    threats.append(threat)
        
        return threats

class CollisionPredictor:
    """ML-based collision prediction system"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.prediction_model = self._load_collision_model()
        
    def predict_collisions(self, objects: List[DetectionResult]) -> List[Dict]:
        """Predict potential collisions using ML models"""
        
        predictions = []
        
        # Pairwise collision prediction
        for i, obj1 in enumerate(objects):
            for j, obj2 in enumerate(objects[i+1:], i+1):
                
                # Extract features for collision prediction
                features = self._extract_collision_features(obj1, obj2)
                
                # Predict collision probability
                collision_prob = self.prediction_model.predict_proba([features])[0][1]
                
                if collision_prob > self.config['collision_threshold']:
                    prediction = {
                        'object_1': obj1.track_id,
                        'object_2': obj2.track_id,
                        'probability': float(collision_prob),
                        'predicted_time': self._estimate_collision_time(obj1, obj2),
                        'predicted_location': self._estimate_collision_location(obj1, obj2)
                    }
                    predictions.append(prediction)
        
        return predictions
    
    def _extract_collision_features(self, obj1: DetectionResult, obj2: DetectionResult) -> List[float]:
        """Extract features for collision prediction model"""
        
        features = []
        
        # Distance features
        if obj1.distance and obj2.distance:
            features.extend([
                obj1.distance,
                obj2.distance,
                abs(obj1.distance - obj2.distance)
            ])
        else:
            features.extend([0, 0, 0])
        
        # Velocity features
        if obj1.velocity and obj2.velocity:
            # Relative velocity
            rel_vel_x = obj1.velocity[0] - obj2.velocity[0]
            rel_vel_y = obj1.velocity[1] - obj2.velocity[1]
            rel_speed = np.sqrt(rel_vel_x**2 + rel_vel_y**2)
            
            features.extend([
                np.sqrt(obj1.velocity[0]**2 + obj1.velocity[1]**2),  # obj1 speed
                np.sqrt(obj2.velocity[0]**2 + obj2.velocity[1]**2),  # obj2 speed
                rel_speed,  # relative speed
                rel_vel_x,  # relative velocity x
                rel_vel_y   # relative velocity y
            ])
        else:
            features.extend([0, 0, 0, 0, 0])
        
        # Object class features: one-hot encode the class of each object in the pair
        for obj in (obj1, obj2):
            class_features = [0] * 10  # Assuming 10 object classes
            if obj.class_id < 10:
                class_features[obj.class_id] = 1
            features.extend(class_features)
        
        # Confidence features
        features.extend([obj1.confidence, obj2.confidence])
        
        return features
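
Wiring these classes together takes little more than a configuration dictionary. The snippet below is a minimal usage sketch, not a production setup: the config keys are the ones referenced in the code above, the camera sources are placeholders, and it assumes the unlisted _load_* helpers (model loaders, calibration, safety rules) are implemented.

# Minimal usage sketch (assumes the unlisted _load_* helpers above are implemented)
config = {
    'detection_threshold': 0.5,   # minimum confidence for a detection to be kept
    'min_safe_distance': 10.0,    # meters; closer objects are treated as potential threats
    'min_reaction_time': 2.0,     # seconds; time-to-collision below this is critical
    'collision_threshold': 0.7    # minimum predicted collision probability to report
}

vision_system = AutonomousVisionSystem(config)
vision_system.start_real_time_processing(['/dev/video0', '/dev/video1'])  # placeholder sources

# Consume results from the pipeline (e.g., inside a planning/control loop)
while True:
    try:
        result = vision_system.result_queue.get(timeout=1.0)
    except queue.Empty:
        continue
    assessment = result['safety_assessment']
    if assessment['overall_risk'] != 'LOW':
        print(f"Risk {assessment['overall_risk']}: {assessment['recommended_actions']}")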

Autonomous Systems Safety Standards

ISO 26262

Functional safety standard for automotive systems

Level: ASIL D (highest)
Requirement: Hardware fault tolerance and software safety measures

ISO 21448 (SOTIF)

Safety of the intended functionality for automated driving

Level: Level 4 autonomy
Requirement: Validation of AI system behavior in edge cases

IEEE 2830

Technical standard for autonomous systems verification

Level: ML component validation
Requirement: Testing and validation of ML models in safety contexts

DO-178C/DO-331

Airborne software certification (aviation)

Level: DAL A (critical)
Requirement: Formal verification of flight-critical ML systems
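
All of these standards expect documented evidence that ML components behave acceptably in edge cases. As a hedged illustration of what such evidence can look like at the unit level, the pytest-style scenario test below checks that the SafetyMonitor defined earlier flags a close, fast-approaching object. The thresholds, the velocity sign convention, and the stub class (which fills in helpers whose implementations are not shown above) are assumptions for this example, not values or procedures taken from the standards themselves.

# Illustrative scenario test (pytest-style); thresholds and stubs are example assumptions,
# not requirements drawn from ISO 26262 / ISO 21448 / IEEE 2830 / DO-178C.
class _StubSafetyMonitor(SafetyMonitor):
    """Test double filling in helpers whose implementations are not shown above."""

    def __init__(self, config):
        # Skip SafetyMonitor.__init__ so no rule files or collision model are needed
        self.config = config
        self.safety_rules = {}
        self.collision_predictor = None  # not used by _detect_immediate_threats

    def _calculate_time_to_collision(self, obj):
        # Closing speed taken from the (assumed) world-frame velocity; positive = approaching
        closing_speed = -obj.velocity[0] if obj.velocity else 0.0
        return obj.distance / closing_speed if closing_speed > 0 else None

    def _assess_threat_severity(self, obj, time_to_collision):
        return 'HIGH' if time_to_collision < 1.0 else 'MEDIUM'


def test_safety_monitor_flags_close_fast_object():
    config = {'min_safe_distance': 10.0, 'min_reaction_time': 2.0}
    monitor = _StubSafetyMonitor(config)

    # Object 5 m ahead and closing at 10 m/s -> time to collision of 0.5 s
    approaching = DetectionResult(
        bbox=(100, 100, 200, 200), class_id=0, confidence=0.9,
        track_id=1, distance=5.0, velocity=(-10.0, 0.0)
    )

    threats = monitor._detect_immediate_threats([approaching])
    assert len(threats) == 1
    assert threats[0]['time_to_collision'] < config['min_reaction_time']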

📝 Autonomous Systems ML Mastery Check

What is sensor fusion in autonomous systems?