
Computer Vision Systems Design

Design production-scale computer vision systems with real-time processing and GPU optimization

55 min read · Advanced

📹 Computer Vision Performance Calculator

Example configuration: 100 streams at 30 FPS with 3 models per stream.

Performance Analysis

  • Pipeline: Real-time processing (immediate processing with minimal buffering)
  • Expected latency: < 100 ms
  • Pixels/second: 6,221M pixels/sec
  • Memory required: 21,357.42 GB (24 GB available)
  • GPU utilization: 95%
  • Effective streams: 100 / 100
  • Bottleneck: Memory

⚠️ System overloaded: reduce the number of streams, lower the resolution, or upgrade hardware.
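To sanity-check a configuration like the one above before committing to hardware, a rough back-of-envelope estimate is usually enough. The sketch below is a minimal, hypothetical model: the 1080p input resolution, FP16 activation size, and per-model memory figure are assumptions chosen for illustration. The pixels-per-second figure reproduces the 6,221M shown above; the memory model is deliberately simplistic.

# Rough capacity estimate for a multi-stream CV deployment.
# All constants here are illustrative assumptions, not measured values.

def estimate_load(streams=100, fps=30, models=3,
                  width=1920, height=1080,      # assumed 1080p input
                  bytes_per_pixel=2,            # assumed FP16 activations
                  per_model_mem_gb=1.5):        # assumed weights + workspace per model
    pixels_per_sec = streams * fps * width * height
    # Naive memory model: one decoded RGB frame in flight per stream per model,
    # plus resident model weights/workspace per model.
    frame_mem_gb = streams * models * width * height * 3 * bytes_per_pixel / 1e9
    model_mem_gb = models * per_model_mem_gb
    return {
        "pixels_per_sec_millions": pixels_per_sec / 1e6,
        "estimated_memory_gb": frame_mem_gb + model_mem_gb,
    }

if __name__ == "__main__":
    est = estimate_load()
    print(f"{est['pixels_per_sec_millions']:.0f}M pixels/sec, "
          f"~{est['estimated_memory_gb']:.1f} GB working memory")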

🏗️ Computer Vision System Architecture

Data Ingestion Layer

Video Sources

  • RTMP/WebRTC streams
  • IP cameras (RTSP)
  • File uploads (batch)
  • Live broadcast feeds

Preprocessing

  • Frame extraction
  • Resolution normalization
  • Format conversion
  • Quality filtering

Load Balancing

  • GPU queue management
  • Stream prioritization (a minimal sketch follows this list)
  • Adaptive throttling
  • Failover handling
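The load-balancing items above come down to deciding which frame the GPU sees next and what to drop when ingest outpaces inference. Below is a minimal sketch of a prioritized frame queue with adaptive throttling; the class name, the priority convention (lower value = higher priority), and the queue depth are illustrative assumptions, not part of any specific framework.

# Minimal sketch of GPU queue management with stream prioritization and
# adaptive throttling. Names and thresholds are illustrative.
import heapq

class PrioritizedFrameQueue:
    def __init__(self, max_depth=256):
        self.max_depth = max_depth
        self._heap = []      # entries: (priority, sequence, frame, metadata)
        self._counter = 0    # tie-breaker so frames are never compared directly

    def put(self, frame, metadata, priority=10):
        # Adaptive throttling: when the queue is saturated, evict the
        # lowest-priority entry instead of blocking the ingest path.
        if len(self._heap) >= self.max_depth:
            worst = max(self._heap)
            if priority < worst[0]:
                self._heap.remove(worst)
                heapq.heapify(self._heap)
            else:
                return False  # incoming frame is the least important: drop it
        self._counter += 1
        heapq.heappush(self._heap, (priority, self._counter, frame, metadata))
        return True

    def get(self):
        # Highest-priority (lowest value) frame goes to the GPU first.
        if not self._heap:
            return None
        _, _, frame, metadata = heapq.heappop(self._heap)
        return frame, metadata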

Processing Pipeline

Object Detection

  • YOLO/R-CNN models
  • Multi-scale detection
  • Class-specific filtering
  • Confidence thresholding

Tracking & Analysis

  • Multi-object tracking
  • Behavior analysis
  • Anomaly detection
  • Activity recognition

Post-Processing

  • Non-max suppression
  • Result aggregation
  • Temporal smoothing (see the sketch below)
  • Output formatting
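Temporal smoothing can be as simple as an exponential moving average over each tracked object's bounding box. The sketch below assumes track IDs come from an upstream tracker (e.g. a SORT-style tracker); the alpha value is an illustrative default, not a tuned setting.

# Minimal sketch of temporal smoothing for per-track bounding boxes.
class TemporalSmoother:
    def __init__(self, alpha=0.6):
        self.alpha = alpha   # weight of the newest observation
        self.state = {}      # track_id -> smoothed [x1, y1, x2, y2]

    def update(self, track_id, bbox):
        prev = self.state.get(track_id)
        if prev is None:
            smoothed = list(bbox)
        else:
            # Blend the new box with the previous smoothed box.
            smoothed = [self.alpha * b + (1 - self.alpha) * p
                        for b, p in zip(bbox, prev)]
        self.state[track_id] = smoothed
        return smoothed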

Storage & Serving Layer

Result Storage

  • Time-series database
  • Object metadata store
  • Event logs
  • Analytics warehouse

API Layer

  • Real-time webhooks
  • Query endpoints
  • Streaming results
  • Dashboard APIs

Monitoring

  • GPU utilization (see the monitoring sketch below)
  • Processing latency
  • Accuracy metrics
  • Error rates
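A common way to expose these signals is a scrape-based metrics endpoint. The sketch below assumes the prometheus_client package and reuses the torch.cuda.utilization() call that appears in the pipelines later in this article; the metric names and port are illustrative.

# Minimal monitoring sketch: GPU utilization gauge + per-batch latency histogram.
import time
import torch
from prometheus_client import Gauge, Histogram, start_http_server

GPU_UTIL = Gauge("cv_gpu_utilization_percent", "GPU utilization", ["gpu"])
LATENCY = Histogram("cv_processing_latency_seconds", "Per-batch processing latency")

def record_batch(processing_time_s: float):
    LATENCY.observe(processing_time_s)

def poll_gpu_metrics(interval_s: float = 5.0):
    while True:
        for gpu_id in range(torch.cuda.device_count()):
            GPU_UTIL.labels(gpu=str(gpu_id)).set(torch.cuda.utilization(gpu_id))
        time.sleep(interval_s)

if __name__ == "__main__":
    start_http_server(9100)   # metrics exposed at :9100/metrics
    poll_gpu_metrics()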

💻 Production CV System Implementation

1. Real-time Video Processing Pipeline

import cv2
import numpy as np
import torch
import torchvision  # provides the NMS op used below
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
from collections import deque
from typing import Dict, List
import logging

class VideoProcessingPipeline:
    def __init__(self, model_path, gpu_id=0, max_batch_size=8):
        # Initialize GPU device
        self.device = torch.device(f'cuda:{gpu_id}' if torch.cuda.is_available() else 'cpu')
        
        # Load optimized model (TensorRT, ONNX, etc.)
        self.model = torch.jit.load(model_path, map_location=self.device)
        self.model.eval()
        
        # Processing configuration
        self.max_batch_size = max_batch_size
        self.input_size = (640, 640)  # Model input size
        self.confidence_threshold = 0.5
        self.nms_threshold = 0.45
        
        # Frame buffer for batching
        self.frame_buffer = deque()
        self.metadata_buffer = deque()
        
        # Performance tracking
        self.processing_times = deque(maxlen=100)
        self.throughput_counter = 0
        
        # Thread pool for I/O operations
        self.executor = ThreadPoolExecutor(max_workers=4)
        
    async def process_video_stream(self, stream_url: str, output_callback):
        """Process video stream with real-time object detection"""
        cap = cv2.VideoCapture(stream_url)
        
        if not cap.isOpened():
            raise ValueError(f"Could not open video stream: {stream_url}")
            
        frame_id = 0
        
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    logging.warning(f"Failed to read frame from {stream_url}")
                    break
                
                # Add frame to processing queue
                await self.add_frame(frame, frame_id, stream_url)
                frame_id += 1
                
                # Process batch if ready
                if len(self.frame_buffer) >= self.max_batch_size:
                    results = await self.process_batch()
                    
                    # Send results to callback
                    for result in results:
                        await output_callback(result)
                
        finally:
            cap.release()
    
    async def add_frame(self, frame: np.ndarray, frame_id: int, stream_id: str):
        """Add frame to processing buffer with metadata"""
        # Preprocess frame
        processed_frame = self.preprocess_frame(frame)
        
        # Store frame and metadata
        self.frame_buffer.append(processed_frame)
        self.metadata_buffer.append({
            'frame_id': frame_id,
            'stream_id': stream_id,
            'timestamp': time.time(),
            'original_shape': frame.shape
        })
    
    def preprocess_frame(self, frame: np.ndarray) -> torch.Tensor:
        """Preprocess frame for model input"""
        # Resize to model input size
        resized = cv2.resize(frame, self.input_size)
        
        # Convert BGR to RGB
        rgb_frame = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
        
        # Normalize and convert to tensor
        tensor = torch.from_numpy(rgb_frame).permute(2, 0, 1).float() / 255.0
        
        return tensor.unsqueeze(0)  # Add batch dimension
    
    async def process_batch(self) -> List[Dict]:
        """Process batch of frames with GPU acceleration"""
        if not self.frame_buffer:
            return []
        
        start_time = time.time()
        
        # Create batch tensor
        batch_frames = []
        batch_metadata = []
        
        # Process up to max_batch_size frames
        for _ in range(min(self.max_batch_size, len(self.frame_buffer))):
            if self.frame_buffer:
                batch_frames.append(self.frame_buffer.popleft())
                batch_metadata.append(self.metadata_buffer.popleft())
        
        if not batch_frames:
            return []
        
        # Stack frames into batch
        batch_tensor = torch.cat(batch_frames, dim=0).to(self.device)
        
        # Run inference
        with torch.no_grad():
            predictions = self.model(batch_tensor)
        
        # Post-process results
        results = []
        for i, (pred, metadata) in enumerate(zip(predictions, batch_metadata)):
            detections = self.post_process_detections(pred, metadata)
            results.append({
                'detections': detections,
                'metadata': metadata,
                'processing_time': time.time() - start_time
            })
        
        # Update performance metrics
        processing_time = time.time() - start_time
        self.processing_times.append(processing_time)
        self.throughput_counter += len(batch_frames)
        
        return results
    
    def post_process_detections(self, predictions: torch.Tensor, metadata: Dict) -> List[Dict]:
        """Apply NMS and filter detections"""
        # Parse model output (assuming YOLO format)
        boxes, scores, class_ids = self.parse_yolo_output(predictions)
        
        # Apply confidence filtering
        valid_indices = scores > self.confidence_threshold
        boxes = boxes[valid_indices]
        scores = scores[valid_indices]
        class_ids = class_ids[valid_indices]
        
        # Apply Non-Maximum Suppression
        keep_indices = self.non_max_suppression(boxes, scores, self.nms_threshold)
        
        # Format final detections
        detections = []
        for idx in keep_indices:
            detection = {
                'bbox': boxes[idx].tolist(),
                'confidence': float(scores[idx]),
                'class_id': int(class_ids[idx]),
                'class_name': self.get_class_name(int(class_ids[idx])),
                'timestamp': metadata['timestamp']
            }
            detections.append(detection)
        
        return detections
    
    def parse_yolo_output(self, predictions: torch.Tensor):
        """Parse YOLO model output"""
        # Assuming predictions shape: [batch_size, num_detections, 85]
        # 85 = 4 (bbox) + 1 (objectness) + 80 (COCO classes)
        
        boxes = predictions[..., :4]  # x, y, w, h
        objectness = predictions[..., 4:5]
        class_probs = predictions[..., 5:]
        
        # Convert to corner format and scale to input size
        boxes = self.xywh_to_xyxy(boxes)
        
        # Compute final scores
        scores = objectness * class_probs
        class_ids = torch.argmax(scores, dim=-1)
        max_scores = torch.max(scores, dim=-1)[0]
        
        return boxes.squeeze(0), max_scores.squeeze(0), class_ids.squeeze(0)
    
    def non_max_suppression(self, boxes: torch.Tensor, scores: torch.Tensor, iou_threshold: float):
        """Apply Non-Maximum Suppression"""
        return torchvision.ops.nms(boxes, scores, iou_threshold)
    
    def xywh_to_xyxy(self, boxes: torch.Tensor) -> torch.Tensor:
        """Convert center-format boxes (x, y, w, h) to corner format (x1, y1, x2, y2)"""
        xyxy = boxes.clone()
        xyxy[..., 0] = boxes[..., 0] - boxes[..., 2] / 2
        xyxy[..., 1] = boxes[..., 1] - boxes[..., 3] / 2
        xyxy[..., 2] = boxes[..., 0] + boxes[..., 2] / 2
        xyxy[..., 3] = boxes[..., 1] + boxes[..., 3] / 2
        return xyxy
    
    def get_class_name(self, class_id: int) -> str:
        """Map a class index to a label; replace with your model's label map (e.g. COCO names)"""
        names = getattr(self, 'class_names', None)
        return names[class_id] if names else f"class_{class_id}"
    
    def get_performance_stats(self) -> Dict:
        """Get current performance statistics"""
        if not self.processing_times:
            return {}
        
        avg_processing_time = np.mean(self.processing_times)
        total_time = sum(self.processing_times)
        # Each recorded time covers a batch of up to max_batch_size frames,
        # so approximate frames-per-second rather than batches-per-second.
        fps = (len(self.processing_times) * self.max_batch_size) / total_time if total_time > 0 else 0
        
        return {
            'avg_processing_time_ms': avg_processing_time * 1000,
            'fps': fps,
            'throughput': self.throughput_counter,
            'gpu_utilization': self.get_gpu_utilization()
        }
    
    def get_gpu_utilization(self) -> float:
        """Get current GPU utilization"""
        if torch.cuda.is_available():
            return torch.cuda.utilization() / 100.0
        return 0.0

# Usage example
async def main():
    # Initialize pipeline
    pipeline = VideoProcessingPipeline('models/yolo_optimized.pt', gpu_id=0)
    
    # Define result callback
    async def handle_detections(result):
        detections = result['detections']
        metadata = result['metadata']
        
        print(f"Frame {metadata['frame_id']}: {len(detections)} objects detected")
        for detection in detections:
            print(f"  - {detection['class_name']}: {detection['confidence']:.2f}")
    
    # Process video stream
    await pipeline.process_video_stream('rtmp://stream-url', handle_detections)
    
    # Print performance stats
    stats = pipeline.get_performance_stats()
    print(f"Performance: {stats['fps']:.1f} FPS, {stats['avg_processing_time_ms']:.1f}ms avg")

if __name__ == "__main__":
    asyncio.run(main())

2. Multi-GPU Video Analytics System

import torch
import torch.multiprocessing as mp
import numpy as np
import redis
import json
import queue
import threading
import time
from typing import List, Dict, Optional
import logging

class MultiGPUVideoAnalytics:
    def __init__(self, model_paths: List[str], gpu_ids: List[int]):
        self.model_paths = model_paths
        self.gpu_ids = gpu_ids
        self.num_gpus = len(gpu_ids)
        
        # Redis for result aggregation
        self.redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
        
        # Inter-process communication
        self.task_queue = mp.Queue(maxsize=1000)
        self.result_queue = mp.Queue()
        
        # Process pool
        self.processes = []
        
        # Performance monitoring
        self.stats = {
            'frames_processed': 0,
            'total_processing_time': 0.0,
            'gpu_utilization': [0.0] * self.num_gpus
        }
    
    def start_workers(self):
        """Start GPU worker processes"""
        for i, (model_path, gpu_id) in enumerate(zip(self.model_paths, self.gpu_ids)):
            worker_process = mp.Process(
                target=self.gpu_worker,
                args=(model_path, gpu_id, i, self.task_queue, self.result_queue)
            )
            worker_process.start()
            self.processes.append(worker_process)
            
        # Start result aggregator
        aggregator_thread = threading.Thread(target=self.result_aggregator)
        aggregator_thread.start()
        
        logging.info(f"Started {self.num_gpus} GPU workers")
    
    @staticmethod
    def gpu_worker(model_path: str, gpu_id: int, worker_id: int, 
                   task_queue: mp.Queue, result_queue: mp.Queue):
        """Worker process for GPU processing"""
        # Set GPU device
        device = torch.device(f'cuda:{gpu_id}')
        torch.cuda.set_device(gpu_id)
        
        # Load model on specific GPU
        model = torch.jit.load(model_path, map_location=device)
        model.eval()
        
        logging.info(f"Worker {worker_id} initialized on GPU {gpu_id}")
        
        while True:
            try:
                # Get task from queue (blocking)
                task = task_queue.get(timeout=1.0)
                
                if task is None:  # Shutdown signal
                    break
                
                start_time = time.time()
                
                # Process frames
                frames = task['frames']
                metadata = task['metadata']
                
                # Create batch tensor
                batch_tensor = torch.stack([
                    torch.from_numpy(frame).permute(2, 0, 1).float() / 255.0
                    for frame in frames
                ]).to(device)
                
                # Run inference
                with torch.no_grad():
                    predictions = model(batch_tensor)
                
                # Prepare results
                processing_time = time.time() - start_time
                
                result = {
                    'worker_id': worker_id,
                    'gpu_id': gpu_id,
                    'predictions': predictions.cpu(),
                    'metadata': metadata,
                    'processing_time': processing_time,
                    'timestamp': time.time()
                }
                
                # Send result back
                result_queue.put(result)
                
            except queue.Empty:
                continue
            except Exception as e:
                logging.error(f"Worker {worker_id} error: {e}")
                continue
    
    def result_aggregator(self):
        """Aggregate results from all GPU workers"""
        while True:
            try:
                result = self.result_queue.get(timeout=1.0)
                
                # Update statistics
                self.stats['frames_processed'] += len(result['metadata'])
                self.stats['total_processing_time'] += result['processing_time']
                
                # Process and store results
                self.process_results(result)
                
            except queue.Empty:
                continue
            except Exception as e:
                logging.error(f"Result aggregator error: {e}")
    
    def process_results(self, result: Dict):
        """Process and store detection results"""
        predictions = result['predictions']
        metadata = result['metadata']
        
        for i, (pred, meta) in enumerate(zip(predictions, metadata)):
            # Parse detections
            detections = self.parse_detections(pred, meta)
            
            # Store in Redis with TTL
            key = f"detections:{meta['stream_id']}:{meta['frame_id']}"
            value = {
                'detections': detections,
                'metadata': meta,
                'processing_stats': {
                    'worker_id': result['worker_id'],
                    'gpu_id': result['gpu_id'],
                    'processing_time': result['processing_time'],
                    'timestamp': result['timestamp']
                }
            }
            
            # Store with 1 hour TTL
            self.redis_client.setex(key, 3600, json.dumps(value))
            
            # Publish real-time event
            self.redis_client.publish('detections', json.dumps({
                'stream_id': meta['stream_id'],
                'frame_id': meta['frame_id'],
                'detection_count': len(detections),
                'timestamp': time.time()
            }))
    
    def parse_detections(self, predictions: torch.Tensor, metadata: Dict) -> List[Dict]:
        """Convert raw model output into JSON-serializable detections.
        Placeholder: plug in model-specific decoding here (e.g. the YOLO
        parsing + NMS logic from the single-GPU pipeline above)."""
        detections = []
        # ... decode boxes/scores/classes from `predictions` for this frame ...
        return detections
    
    def submit_frames(self, frames: List[np.ndarray], metadata: List[Dict]):
        """Submit frames for processing"""
        task = {
            'frames': frames,
            'metadata': metadata,
            'submit_time': time.time()
        }
        
        try:
            self.task_queue.put(task, timeout=0.1)  # Non-blocking
            return True
        except queue.Full:
            logging.warning("Task queue full, dropping frames")
            return False
    
    def get_performance_stats(self) -> Dict:
        """Get aggregated performance statistics"""
        total_frames = self.stats['frames_processed']
        total_time = self.stats['total_processing_time']
        
        if total_frames == 0:
            return {'fps': 0, 'avg_processing_time_ms': 0}
        
        avg_fps = total_frames / total_time if total_time > 0 else 0
        avg_processing_time = (total_time / total_frames) * 1000  # ms
        
        # Get GPU utilization
        gpu_utilization = []
        for gpu_id in self.gpu_ids:
            torch.cuda.set_device(gpu_id)
            utilization = torch.cuda.utilization()
            gpu_utilization.append(utilization)
        
        return {
            'total_frames_processed': total_frames,
            'average_fps': avg_fps,
            'avg_processing_time_ms': avg_processing_time,
            'gpu_utilization': gpu_utilization,
            'active_workers': len(self.processes)
        }
    
    def shutdown(self):
        """Gracefully shutdown all workers"""
        # Send shutdown signal to all workers
        for _ in self.processes:
            self.task_queue.put(None)
        
        # Wait for processes to finish
        for process in self.processes:
            process.join(timeout=5.0)
            if process.is_alive():
                process.terminate()
        
        logging.info("All workers shut down")

# Usage example
def main():
    # Initialize multi-GPU system
    model_paths = ['model1.pt', 'model2.pt', 'model3.pt', 'model4.pt']
    gpu_ids = [0, 1, 2, 3]  # Use 4 GPUs
    
    analytics_system = MultiGPUVideoAnalytics(model_paths, gpu_ids)
    analytics_system.start_workers()
    
    try:
        # Simulate frame processing
        for batch_id in range(100):
            # Create dummy frames (in practice, these come from video streams)
            frames = [np.random.randint(0, 255, (640, 640, 3), dtype=np.uint8) for _ in range(8)]
            metadata = [
                {
                    'stream_id': f'stream_{i%4}',
                    'frame_id': batch_id * 8 + i,
                    'timestamp': time.time()
                }
                for i in range(8)
            ]
            
            # Submit for processing
            success = analytics_system.submit_frames(frames, metadata)
            if not success:
                print("Queue full, skipping batch")
            
            # Print stats every 10 batches
            if batch_id % 10 == 0:
                stats = analytics_system.get_performance_stats()
                print(f"Batch {batch_id}: {stats['average_fps']:.1f} FPS, "
                      f"GPU util: {[f'{u:.1f}%' for u in stats['gpu_utilization']]}")
            
            time.sleep(0.1)  # Simulate real-time processing
            
    finally:
        analytics_system.shutdown()

if __name__ == "__main__":
    # Enable multiprocessing
    mp.set_start_method('spawn')
    main()

🏭 Production Computer Vision Systems


Tesla Autopilot Vision

Multi-camera perception system

  • Cameras: 8 cameras, 360° view
  • Processing: 30 FPS real-time
  • Tasks: Object detection, depth estimation
  • Hardware: FSD Chip (144 TOPS)

Amazon Go Stores

Cashier-less shopping

  • Coverage: 100+ cameras per store
  • Tracking: Multi-person, multi-object
  • Accuracy: > 99% transaction accuracy
  • Latency: < 1 second checkout

YouTube Content Moderation

Video content analysis

  • Scale: 500+ hours uploaded per minute
  • Processing: Real-time + batch analysis
  • Detection: Violence, adult content, spam
  • Accuracy: 95%+ automated decisions

Facebook Photo Tagging

Face recognition & tagging

  • Photos: 350M+ photos/day
  • Faces: Billions of face embeddings
  • Accuracy: 97.35% face verification
  • Speed: < 1 second per photo

✅ Computer Vision Systems Best Practices

✅ Do's

  • Optimize for your hardware - Use TensorRT, ONNX, or model-specific optimizations for your GPU architecture
  • Implement adaptive batching - Balance latency vs. throughput based on current load (a minimal sketch follows this list)
  • Use multi-scale detection - Process multiple resolutions for better small object detection
  • Monitor GPU memory carefully - Track memory usage to prevent OOM crashes
  • Implement graceful degradation - Reduce quality/accuracy under high load rather than dropping frames
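The adaptive-batching item above can be implemented with a simple queue-depth heuristic. The sketch below is a minimal illustration; the thresholds and limits are assumed values, not tuned numbers. In the VideoProcessingPipeline earlier, something like this could replace the fixed max_batch_size check before process_batch() runs.

# Adaptive batching sketch: grow the batch when the queue is deep (favor
# throughput), shrink it when the queue is shallow (favor latency).
def choose_batch_size(queue_depth: int, min_batch: int = 1, max_batch: int = 16) -> int:
    if queue_depth >= 4 * max_batch:
        return max_batch          # heavily loaded: maximize throughput
    if queue_depth <= min_batch:
        return min_batch          # lightly loaded: minimize latency
    # Scale linearly between the two extremes.
    scale = queue_depth / (4 * max_batch)
    return max(min_batch, min(max_batch, int(round(scale * max_batch))))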

❌ Don'ts

  • Don't process every frame - Skip frames during high load or exploit temporal consistency (see the frame-skipping sketch after this list)
  • Avoid CPU-GPU memory copying - Keep data on GPU as much as possible
  • Don't ignore model warm-up - First inference is always slower; warm up models on startup
  • Avoid blocking I/O operations - Use async processing for network and disk operations
  • Don't neglect post-processing - NMS and filtering can be bottlenecks at scale
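Load-based frame skipping, referenced in the first don't above, can be as simple as a stride that widens with queue depth. The mapping and thresholds below are hypothetical, meant only to show the shape of the logic.

# Frame-skipping sketch: process every frame when the system keeps up,
# and widen the stride as backlog grows.
def frame_stride(queue_depth: int, max_depth: int = 256) -> int:
    backlog = queue_depth / max_depth
    if backlog < 0.25:
        return 1      # keep every frame
    if backlog < 0.5:
        return 2      # keep every 2nd frame
    if backlog < 0.75:
        return 4      # keep every 4th frame
    return 8          # severe backlog: keep every 8th frame

def should_process(frame_id: int, queue_depth: int) -> bool:
    return frame_id % frame_stride(queue_depth) == 0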