Autonomous Systems ML
Building ML systems for autonomous vehicles, robotics, and intelligent agents
⚠️ Safety-Critical System Notice
Autonomous systems are safety-critical applications where ML failures can result in physical harm or loss of life. This content is for educational purposes only. Production autonomous systems require compliance with safety standards (ISO 26262, ISO 21448), extensive testing, validation, and certification by qualified engineers. Never deploy autonomous ML systems without proper safety analysis, redundancy, and human oversight.
Autonomous System Domains
Computer Vision for Autonomous Systems
Real-Time Computer Vision Pipeline
import cv2
import numpy as np
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from typing import Dict, List, Tuple, Optional
import threading
import queue
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
@dataclass
class DetectionResult:
"""Detection result with safety metadata"""
bbox: Tuple[int, int, int, int] # x1, y1, x2, y2
class_id: int
confidence: float
track_id: Optional[int] = None
distance: Optional[float] = None
velocity: Optional[Tuple[float, float]] = None
safety_score: Optional[float] = None
class AutonomousVisionSystem:
"""
Real-time computer vision system for autonomous vehicles
Features:
- Multi-camera sensor fusion
- Real-time object detection and tracking
- Depth estimation and 3D perception
- Safety-critical decision making
- Hardware acceleration (GPU/TPU)
"""
def __init__(self, config: Dict):
self.config = config
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Initialize models
self.object_detector = self._load_object_detection_model()
self.depth_estimator = self._load_depth_estimation_model()
self.tracker = self._load_tracking_model()
self.segmentation_model = self._load_segmentation_model()
# Camera calibration and transforms
self.camera_matrix = self._load_camera_calibration()
self.transforms = self._setup_transforms()
# Real-time processing pipeline
self.frame_queue = queue.Queue(maxsize=10)
self.result_queue = queue.Queue(maxsize=10)
self.processing_thread = None
self.is_running = False
# Safety monitoring
self.safety_monitor = SafetyMonitor(config)
# Performance metrics
self.metrics = {
'fps': 0,
'latency_ms': 0,
'detection_accuracy': 0,
'safety_violations': 0
}
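    # `_setup_transforms` is referenced in __init__ but not defined in this
    # excerpt. A minimal sketch using torchvision; the 640x640 input size and
    # ImageNet normalization constants are assumptions, not values required by
    # this pipeline, and a real pipeline should convert BGR OpenCV frames to
    # RGB before this step.
    def _setup_transforms(self):
        """Build the preprocessing pipeline for the detection model"""
        return transforms.Compose([
            transforms.ToPILImage(),        # accepts HxWxC uint8 arrays
            transforms.Resize((640, 640)),  # assumed detector input size
            transforms.ToTensor(),          # scales to [0, 1], CxHxW
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]),
        ])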
def start_real_time_processing(self, camera_sources: List[str]):
"""Start real-time vision processing pipeline"""
self.is_running = True
# Start camera capture threads
self.camera_threads = []
for i, source in enumerate(camera_sources):
thread = threading.Thread(
target=self._camera_capture_worker,
args=(source, i),
daemon=True
)
thread.start()
self.camera_threads.append(thread)
# Start processing thread
self.processing_thread = threading.Thread(
target=self._processing_worker,
daemon=True
)
self.processing_thread.start()
print(f"Started real-time vision processing with {len(camera_sources)} cameras")
def _camera_capture_worker(self, source: str, camera_id: int):
"""Worker thread for camera frame capture"""
cap = cv2.VideoCapture(source)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # Minimize latency
while self.is_running:
ret, frame = cap.read()
if not ret:
continue
# Timestamp and camera metadata
frame_data = {
'frame': frame,
'camera_id': camera_id,
'timestamp': time.time(),
'frame_id': self._generate_frame_id()
}
# Non-blocking queue insertion
try:
self.frame_queue.put_nowait(frame_data)
except queue.Full:
# Drop oldest frame to maintain real-time processing
try:
self.frame_queue.get_nowait()
self.frame_queue.put_nowait(frame_data)
except queue.Empty:
pass
cap.release()
def _processing_worker(self):
"""Main processing worker for vision pipeline"""
with ThreadPoolExecutor(max_workers=4) as executor:
while self.is_running:
try:
# Get latest frame
frame_data = self.frame_queue.get(timeout=0.1)
# Process frame through vision pipeline
start_time = time.time()
# Parallel processing of different vision tasks
detection_future = executor.submit(
self._detect_objects, frame_data['frame']
)
depth_future = executor.submit(
self._estimate_depth, frame_data['frame']
)
segmentation_future = executor.submit(
self._segment_scene, frame_data['frame']
)
# Collect results
detections = detection_future.result()
depth_map = depth_future.result()
segmentation = segmentation_future.result()
# Object tracking
tracked_objects = self._update_tracking(detections, frame_data)
# 3D world coordinate mapping
world_objects = self._map_to_world_coordinates(
tracked_objects, depth_map, frame_data['camera_id']
)
# Safety assessment
safety_assessment = self.safety_monitor.assess_scene_safety(
world_objects, segmentation, frame_data
)
# Compile results
processing_time = (time.time() - start_time) * 1000 # ms
result = {
'frame_id': frame_data['frame_id'],
'timestamp': frame_data['timestamp'],
'camera_id': frame_data['camera_id'],
'detections': world_objects,
'segmentation': segmentation,
'depth_map': depth_map,
'safety_assessment': safety_assessment,
'processing_time_ms': processing_time,
'metrics': self._calculate_frame_metrics(world_objects, processing_time)
}
# Send to result queue
try:
self.result_queue.put_nowait(result)
except queue.Full:
# Drop oldest result
try:
self.result_queue.get_nowait()
self.result_queue.put_nowait(result)
except queue.Empty:
pass
# Update performance metrics
self._update_performance_metrics(processing_time)
except queue.Empty:
continue
except Exception as e:
print(f"Processing error: {e}")
continue
def _detect_objects(self, frame: np.ndarray) -> List[DetectionResult]:
"""Real-time object detection with confidence filtering"""
        # Preprocess frame (assumes self.transforms converts the BGR OpenCV
        # frame to the normalized tensor format the detector expects)
        input_tensor = self.transforms(frame).unsqueeze(0).to(self.device)
# Run detection model
with torch.no_grad():
predictions = self.object_detector(input_tensor)
# Post-process predictions
detections = []
        # Assumes batch size 1 and that the detector returns, per image, an
        # iterable of dicts with 'bbox', 'class_id', and 'score' entries
        for pred in predictions[0]:
# Filter by confidence threshold
if pred['score'] > self.config['detection_threshold']:
# Safety-critical object prioritization
class_id = int(pred['class_id'])
safety_priority = self._get_safety_priority(class_id)
detection = DetectionResult(
bbox=tuple(pred['bbox'].cpu().numpy()),
class_id=class_id,
confidence=float(pred['score']),
safety_score=safety_priority
)
detections.append(detection)
# Sort by safety priority and confidence
detections.sort(key=lambda x: (x.safety_score, x.confidence), reverse=True)
return detections
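    # `_get_safety_priority` is referenced above but not defined in this
    # excerpt. A minimal sketch with a hypothetical class-id mapping
    # (0 = pedestrian, 1 = cyclist, 2 = vehicle); higher scores mark more
    # safety-critical classes.
    def _get_safety_priority(self, class_id: int) -> float:
        """Return a priority weight for safety-critical object classes"""
        priority_map = {0: 1.0, 1: 0.9, 2: 0.7}  # hypothetical mapping
        return priority_map.get(class_id, 0.5)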
def _estimate_depth(self, frame: np.ndarray) -> np.ndarray:
"""Monocular depth estimation for 3D perception"""
# Preprocess for depth model
depth_input = cv2.resize(frame, (384, 384))
depth_tensor = torch.from_numpy(depth_input).float().permute(2, 0, 1).unsqueeze(0)
depth_tensor = depth_tensor.to(self.device) / 255.0
# Generate depth map
with torch.no_grad():
depth_prediction = self.depth_estimator(depth_tensor)
# Post-process depth map
depth_map = depth_prediction.squeeze().cpu().numpy()
depth_map = cv2.resize(depth_map, (frame.shape[1], frame.shape[0]))
# Convert to metric depth (meters)
depth_map = self._convert_to_metric_depth(depth_map)
return depth_map
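    # `_convert_to_metric_depth` is referenced above but not defined in this
    # excerpt. A minimal sketch, assuming the model outputs relative inverse
    # depth and that hypothetical `depth_scale` / `depth_shift` calibration
    # constants are supplied in the config.
    def _convert_to_metric_depth(self, relative_depth: np.ndarray) -> np.ndarray:
        """Convert relative model output to approximate metric depth (meters)"""
        scale = self.config.get('depth_scale', 1.0)
        shift = self.config.get('depth_shift', 0.0)
        # Avoid division by zero for inverse-depth outputs
        inverse_depth = np.clip(relative_depth * scale + shift, 1e-6, None)
        return 1.0 / inverse_depth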
def _update_tracking(self, detections: List[DetectionResult], frame_data: Dict) -> List[DetectionResult]:
"""Multi-object tracking with Kalman filtering"""
# Extract detection data for tracker
detection_data = []
for det in detections:
detection_data.append([
det.bbox[0], det.bbox[1], det.bbox[2], det.bbox[3],
det.confidence, det.class_id
])
# Update tracker
tracked_objects = self.tracker.update(
np.array(detection_data) if detection_data else np.empty((0, 6))
)
# Update detection objects with tracking information
updated_detections = []
        # Simplifying assumption: the tracker returns one row per input
        # detection, in the same order, with the track id in column 4
        for i, det in enumerate(detections):
            if i < len(tracked_objects):
                track = tracked_objects[i]
                det.track_id = int(track[4])
# Estimate velocity from tracking history
det.velocity = self._estimate_object_velocity(det.track_id)
updated_detections.append(det)
return updated_detections
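    # `_estimate_object_velocity` is referenced above but not defined in this
    # excerpt. A minimal sketch, assuming a hypothetical `self.track_history`
    # dict that maps track ids to lists of (timestamp, x, y) center points.
    def _estimate_object_velocity(self, track_id: int) -> Optional[Tuple[float, float]]:
        """Estimate pixel-space velocity from the last two track observations"""
        history = getattr(self, 'track_history', {}).get(track_id, [])
        if len(history) < 2:
            return None
        (t0, x0, y0), (t1, x1, y1) = history[-2], history[-1]
        dt = t1 - t0
        if dt <= 0:
            return None
        return ((x1 - x0) / dt, (y1 - y0) / dt)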
def _map_to_world_coordinates(self, detections: List[DetectionResult],
depth_map: np.ndarray, camera_id: int) -> List[DetectionResult]:
"""Map 2D detections to 3D world coordinates"""
world_objects = []
for det in detections:
            # Get object center point (bbox values may be floats from the
            # detector, so cast and clamp before indexing the depth map)
            center_x = int((det.bbox[0] + det.bbox[2]) / 2)
            center_y = int((det.bbox[1] + det.bbox[3]) / 2)
            center_x = min(max(center_x, 0), depth_map.shape[1] - 1)
            center_y = min(max(center_y, 0), depth_map.shape[0] - 1)
            # Get depth at object center
            object_depth = depth_map[center_y, center_x]
# Convert to 3D world coordinates
world_coords = self._pixel_to_world(
center_x, center_y, object_depth, camera_id
)
# Update detection with 3D information
det.distance = float(object_depth)
# Calculate relative position and velocity in world frame
if det.velocity and object_depth > 0:
# Convert pixel velocity to world velocity
world_velocity = self._pixel_velocity_to_world(
det.velocity, object_depth, camera_id
)
det.velocity = world_velocity
world_objects.append(det)
return world_objects
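    # `_pixel_to_world` is referenced above but not defined in this excerpt.
    # A minimal sketch using the standard pinhole camera model; it assumes
    # `self.camera_matrix` is indexable per camera and holds 3x3 intrinsic
    # matrices, and it ignores lens distortion and camera-to-vehicle extrinsics.
    def _pixel_to_world(self, u: int, v: int, depth: float,
                        camera_id: int) -> Tuple[float, float, float]:
        """Back-project a pixel with known depth into camera-frame coordinates"""
        K = self.camera_matrix[camera_id]
        fx, fy = K[0, 0], K[1, 1]
        cx, cy = K[0, 2], K[1, 2]
        x = (u - cx) * depth / fx
        y = (v - cy) * depth / fy
        return (float(x), float(y), float(depth))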
class SafetyMonitor:
"""Safety monitoring system for autonomous vision"""
def __init__(self, config: Dict):
self.config = config
self.safety_rules = self._load_safety_rules()
self.collision_predictor = CollisionPredictor(config)
def assess_scene_safety(self, objects: List[DetectionResult],
segmentation: np.ndarray, frame_data: Dict) -> Dict:
"""Comprehensive safety assessment of the current scene"""
safety_assessment = {
'overall_risk': 'LOW',
'immediate_threats': [],
'collision_predictions': [],
'recommended_actions': [],
            'confidence': 0.95  # placeholder; not derived from model outputs
}
# Detect immediate threats
immediate_threats = self._detect_immediate_threats(objects)
safety_assessment['immediate_threats'] = immediate_threats
# Predict potential collisions
collision_predictions = self.collision_predictor.predict_collisions(objects)
safety_assessment['collision_predictions'] = collision_predictions
# Assess drivable area
drivable_area_safety = self._assess_drivable_area(segmentation)
# Overall risk calculation
overall_risk = self._calculate_overall_risk(
immediate_threats, collision_predictions, drivable_area_safety
)
safety_assessment['overall_risk'] = overall_risk
# Generate safety recommendations
recommendations = self._generate_safety_recommendations(
immediate_threats, collision_predictions, overall_risk
)
safety_assessment['recommended_actions'] = recommendations
return safety_assessment
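    # `_calculate_overall_risk` is referenced above but not defined in this
    # excerpt. A minimal rule-based sketch; the severity labels are
    # hypothetical, and a production system would derive such rules from a
    # documented hazard analysis rather than hard-coded thresholds.
    def _calculate_overall_risk(self, immediate_threats: List[Dict],
                                collision_predictions: List[Dict],
                                drivable_area_safety) -> str:
        """Map threats and collision predictions onto a coarse risk level"""
        if any(t.get('severity') == 'CRITICAL' for t in immediate_threats):
            return 'CRITICAL'
        if immediate_threats:
            return 'HIGH'
        if collision_predictions:
            return 'MEDIUM'
        return 'LOW'  # drivable_area_safety could further modulate this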
def _detect_immediate_threats(self, objects: List[DetectionResult]) -> List[Dict]:
"""Detect objects that pose immediate safety threats"""
threats = []
for obj in objects:
# Distance-based threat assessment
            if obj.distance is not None and obj.distance < self.config['min_safe_distance']:
# Velocity-based collision risk
collision_time = self._calculate_time_to_collision(obj)
if collision_time is not None and collision_time < self.config['min_reaction_time']:
threat = {
'object_id': obj.track_id,
'class_id': obj.class_id,
'distance': obj.distance,
'time_to_collision': collision_time,
'severity': self._assess_threat_severity(obj, collision_time)
}
threats.append(threat)
return threats
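    # `_calculate_time_to_collision` is referenced above but not defined in
    # this excerpt. A minimal sketch under a hypothetical convention: the
    # first velocity component is the range rate toward the ego vehicle
    # (negative when the object is approaching).
    def _calculate_time_to_collision(self, obj: DetectionResult) -> Optional[float]:
        """Estimate time-to-collision as current distance over closing speed"""
        if obj.distance is None or not obj.velocity:
            return None
        closing_speed = -obj.velocity[0]
        if closing_speed <= 0:
            return None  # object is not closing in
        return obj.distance / closing_speed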
class CollisionPredictor:
"""ML-based collision prediction system"""
def __init__(self, config: Dict):
self.config = config
self.prediction_model = self._load_collision_model()
def predict_collisions(self, objects: List[DetectionResult]) -> List[Dict]:
"""Predict potential collisions using ML models"""
predictions = []
# Pairwise collision prediction
for i, obj1 in enumerate(objects):
for j, obj2 in enumerate(objects[i+1:], i+1):
# Extract features for collision prediction
features = self._extract_collision_features(obj1, obj2)
# Predict collision probability
collision_prob = self.prediction_model.predict_proba([features])[0][1]
if collision_prob > self.config['collision_threshold']:
prediction = {
'object_1': obj1.track_id,
'object_2': obj2.track_id,
'probability': float(collision_prob),
'predicted_time': self._estimate_collision_time(obj1, obj2),
'predicted_location': self._estimate_collision_location(obj1, obj2)
}
predictions.append(prediction)
return predictions
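    # `_estimate_collision_time` (and `_estimate_collision_location`) are
    # referenced above but not defined in this excerpt. A crude sketch of the
    # time estimate, treating the two range values as a 1-D gap closed at the
    # relative speed; a real system would integrate predicted trajectories.
    def _estimate_collision_time(self, obj1: DetectionResult,
                                 obj2: DetectionResult) -> Optional[float]:
        """Rough estimate of seconds until two tracked objects meet"""
        if obj1.distance is None or obj2.distance is None:
            return None
        if not obj1.velocity or not obj2.velocity:
            return None
        gap = abs(obj1.distance - obj2.distance)
        rel_speed = np.hypot(obj1.velocity[0] - obj2.velocity[0],
                             obj1.velocity[1] - obj2.velocity[1])
        if rel_speed <= 1e-6:
            return None
        return gap / rel_speed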
def _extract_collision_features(self, obj1: DetectionResult, obj2: DetectionResult) -> List[float]:
"""Extract features for collision prediction model"""
features = []
# Distance features
        if obj1.distance is not None and obj2.distance is not None:
features.extend([
obj1.distance,
obj2.distance,
abs(obj1.distance - obj2.distance)
])
else:
features.extend([0, 0, 0])
# Velocity features
if obj1.velocity and obj2.velocity:
# Relative velocity
rel_vel_x = obj1.velocity[0] - obj2.velocity[0]
rel_vel_y = obj1.velocity[1] - obj2.velocity[1]
rel_speed = np.sqrt(rel_vel_x**2 + rel_vel_y**2)
features.extend([
np.sqrt(obj1.velocity[0]**2 + obj1.velocity[1]**2), # obj1 speed
np.sqrt(obj2.velocity[0]**2 + obj2.velocity[1]**2), # obj2 speed
rel_speed, # relative speed
rel_vel_x, # relative velocity x
rel_vel_y # relative velocity y
])
else:
features.extend([0, 0, 0, 0, 0])
        # Object class features (one-hot encoded for both objects)
        for obj in (obj1, obj2):
            class_features = [0.0] * 10  # assuming 10 object classes
            if 0 <= obj.class_id < 10:
                class_features[obj.class_id] = 1.0
            features.extend(class_features)
# Confidence features
features.extend([obj1.confidence, obj2.confidence])
return features
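The classes above are a skeleton: the model loaders and several geometry helpers are left undefined, and the configuration keys they read (detection_threshold, min_safe_distance, min_reaction_time, collision_threshold) must be supplied by the caller. The following is a minimal, hypothetical usage sketch under those assumptions; it is illustrative only and will not run until the loader methods are implemented.
if __name__ == "__main__":
    # Hypothetical configuration; every threshold must be tuned and validated
    # against the actual models and vehicle dynamics before any real use.
    config = {
        'detection_threshold': 0.5,   # minimum detection confidence
        'min_safe_distance': 10.0,    # meters
        'min_reaction_time': 2.0,     # seconds
        'collision_threshold': 0.7,   # collision probability cutoff
    }

    vision_system = AutonomousVisionSystem(config)
    vision_system.start_real_time_processing(['/dev/video0', '/dev/video1'])

    # Consume results, e.g. to forward them to a planning stack
    while True:
        result = vision_system.result_queue.get()
        assessment = result['safety_assessment']
        if assessment['overall_risk'] != 'LOW':
            print(f"Risk {assessment['overall_risk']}: "
                  f"{assessment['recommended_actions']}")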
Autonomous Systems Safety Standards
ISO 26262: Functional safety standard for automotive systems
ISO 21448 (SOTIF): Safety of the intended functionality for automated driving
IEEE 2830: Technical standard for autonomous systems verification
DO-178C/DO-331: Airborne software certification (aviation)
Related Technologies
ROS 2: Robot Operating System for autonomous systems
OpenCV: Computer vision library for image processing
PyTorch: Deep learning framework for vision models
CARLA: Autonomous vehicle simulation platform
Apache Kafka: Real-time sensor data streaming
NVIDIA CUDA: GPU acceleration for real-time processing
Docker: Containerization for autonomous system deployment
Kubernetes: Orchestration for distributed autonomous systems