Computer Vision Systems Design
Design production-scale computer vision systems with real-time processing and GPU optimization
55 min read•Advanced
📹 Computer Vision Performance Calculator
30 FPS
100 streams
3 models
Performance Analysis
Pipeline: Real-time Processing
Expected Latency: < 100ms
Pixels/Second: 6221M pixels/sec
Memory Required: 21357.42GB / 24GB
GPU Utilization: 95%
Effective Streams: 100 / 100
Bottleneck: Memory
Pipeline: Immediate processing with minimal buffering
⚠️ System overloaded. Reduce streams, resolution, or upgrade hardware.
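The pixel throughput shown by the calculator can be reproduced with simple arithmetic. The sketch below assumes 1080p (1920x1080) input, which the calculator does not state but which matches the displayed 6221M pixels/sec at 30 FPS and 100 streams. The function names and the 3 bytes per pixel (uint8 RGB) figure are illustrative assumptions, not part of the calculator.

```python
def pixels_per_second(width: int, height: int, fps: int, streams: int) -> int:
    """Total pixels that must be processed per second across all streams."""
    return width * height * fps * streams


def raw_bytes_per_second(pixel_rate: int, bytes_per_pixel: int = 3) -> int:
    """Raw, uncompressed data rate, assuming uint8 RGB frames (3 bytes per pixel)."""
    return pixel_rate * bytes_per_pixel


# Assumption: 1080p streams; fps and stream count are the calculator inputs above.
rate = pixels_per_second(1920, 1080, fps=30, streams=100)
print(f"{rate / 1e6:.0f}M pixels/sec")                              # 6221M pixels/sec
print(f"{raw_bytes_per_second(rate) / 1e9:.2f} GB/s of raw RGB")    # 18.66 GB/s of raw RGB
```

Even before any model runs, the raw frame data alone approaches the capacity of a single 24GB GPU every second, which is why the calculator suggests reducing streams or resolution.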
🏗️ Computer Vision System Architecture
Data Ingestion Layer
Video Sources
- RTMP/WebRTC streams
- IP cameras (RTSP)
- File uploads (batch)
- Live broadcast feeds
Preprocessing
- Frame extraction
- Resolution normalization
- Format conversion
- Quality filtering
Load Balancing
- GPU queue management
- Stream prioritization
- Adaptive throttling
- Failover handling
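One way to combine stream prioritization with throttling is a bounded priority queue in front of the GPU workers. The following is a minimal sketch, not a prescribed design: `PriorityFrameQueue`, `QueuedFrame`, and the numeric priority scheme are hypothetical names introduced for illustration. When the queue is full, a new frame is admitted only if it outranks the least important queued frame, which is then evicted.

```python
import heapq
import itertools
from dataclasses import dataclass, field
from typing import Any, List, Optional, Tuple


@dataclass(order=True)
class QueuedFrame:
    sort_key: Tuple[int, int]              # (-priority, arrival order)
    frame: Any = field(compare=False)
    stream_id: str = field(compare=False)


class PriorityFrameQueue:
    """Bounded queue: higher priority is served first, oldest first within a priority."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self._heap: List[QueuedFrame] = []
        self._counter = itertools.count()
        self.dropped = 0

    def push(self, frame: Any, stream_id: str, priority: int) -> bool:
        item = QueuedFrame((-priority, next(self._counter)), frame, stream_id)
        if len(self._heap) < self.capacity:
            heapq.heappush(self._heap, item)
            return True
        # Queue full: exactly one frame is dropped, either the newcomer or
        # the least important queued frame (lowest priority, newest arrival).
        self.dropped += 1
        worst = max(self._heap)
        if item < worst:
            self._heap.remove(worst)
            heapq.heapify(self._heap)
            heapq.heappush(self._heap, item)
            return True
        return False

    def pop(self) -> Optional[QueuedFrame]:
        return heapq.heappop(self._heap) if self._heap else None


q = PriorityFrameQueue(capacity=2)
q.push("frame-a", "lobby_cam", priority=1)
q.push("frame-b", "parking_cam", priority=1)
q.push("frame-c", "entrance_cam", priority=5)   # evicts frame-b
print(q.pop().frame, q.pop().frame, q.dropped)  # frame-c frame-a 1
```

Eviction scans the heap linearly, which is acceptable for small queues; a production system would likely use per-priority deques instead.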
Processing Pipeline
Object Detection
- YOLO/R-CNN models
- Multi-scale detection
- Class-specific filtering
- Confidence thresholding
Tracking & Analysis
- Multi-object tracking
- Behavior analysis
- Anomaly detection
- Activity recognition
Post-Processing
- Non-max suppression
- Result aggregation
- Temporal smoothing
- Output formatting
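To make the non-max suppression step concrete, here is a small dependency-free sketch of greedy, class-agnostic NMS. It illustrates the idea only; production pipelines normally use a GPU implementation such as the one in torchvision. Boxes are assumed to be in corner format (x1, y1, x2, y2), and the function names are illustrative.

```python
from typing import List, Sequence

Box = Sequence[float]  # (x1, y1, x2, y2)


def iou(a: Box, b: Box) -> float:
    """Intersection over union of two corner-format boxes."""
    ix1, iy1 = max(a[0], b[0]), max(a[1], b[1])
    ix2, iy2 = min(a[2], b[2]), min(a[3], b[3])
    inter = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1)
    area_a = (a[2] - a[0]) * (a[3] - a[1])
    area_b = (b[2] - b[0]) * (b[3] - b[1])
    union = area_a + area_b - inter
    return inter / union if union > 0 else 0.0


def nms(boxes: Sequence[Box], scores: Sequence[float], iou_threshold: float = 0.45) -> List[int]:
    """Return indices of kept boxes, highest score first."""
    order = sorted(range(len(boxes)), key=lambda i: scores[i], reverse=True)
    keep: List[int] = []
    for i in order:
        # Keep a box only if it does not overlap too much with any box already kept
        if all(iou(boxes[i], boxes[j]) <= iou_threshold for j in keep):
            keep.append(i)
    return keep


boxes = [(0, 0, 10, 10), (1, 1, 11, 11), (50, 50, 60, 60)]
scores = [0.9, 0.8, 0.7]
print(nms(boxes, scores))  # [0, 2]
```

The second box overlaps the first with an IoU of about 0.68, above the 0.45 threshold, so it is suppressed.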
Storage & Serving Layer
Result Storage
- Time-series database
- Object metadata store
- Event logs
- Analytics warehouse
API Layer
- Real-time webhooks
- Query endpoints
- Streaming results
- Dashboard APIs
Monitoring
- GPU utilization
- Processing latency
- Accuracy metrics
- Error rates
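For the processing latency metric, averages tend to hide tail behavior, so percentiles over a rolling window are a common choice. The sketch below is one possible approach using the nearest-rank method; `LatencyMonitor` and its window size are hypothetical and not tied to any particular monitoring product.

```python
import math
from collections import deque


class LatencyMonitor:
    """Rolling-window latency tracker with nearest-rank percentiles."""

    def __init__(self, window: int = 1000):
        self.samples = deque(maxlen=window)  # oldest samples fall off automatically

    def record(self, latency_ms: float) -> None:
        self.samples.append(latency_ms)

    def percentile(self, p: int) -> float:
        """Return the p-th percentile (p in 1..100) of the current window."""
        if not self.samples:
            return 0.0
        ordered = sorted(self.samples)
        rank = max(1, math.ceil(p * len(ordered) / 100))
        return ordered[rank - 1]


monitor = LatencyMonitor(window=100)
for latency in range(1, 101):       # simulated latencies: 1 ms .. 100 ms
    monitor.record(float(latency))
print(monitor.percentile(50), monitor.percentile(95))  # 50.0 95.0
```

Sorting on every query is fine for dashboards polled every few seconds; higher query rates would call for a streaming quantile sketch.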
💻 Production CV System Implementation
1. Real-time Video Processing Pipeline
import asyncio
import logging
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional

import cv2
import numpy as np
import torch
import torchvision


class VideoProcessingPipeline:
    def __init__(self, model_path, gpu_id=0, max_batch_size=8,
                 class_names: Optional[List[str]] = None):
        # Initialize GPU device (falls back to CPU when CUDA is unavailable)
        self.device = torch.device(f'cuda:{gpu_id}' if torch.cuda.is_available() else 'cpu')
        # Load a TorchScript model; TensorRT or ONNX exports need their own loaders
        self.model = torch.jit.load(model_path, map_location=self.device)
        self.model.eval()
        # Processing configuration
        self.max_batch_size = max_batch_size
        self.input_size = (640, 640)  # Model input size
        self.confidence_threshold = 0.5
        self.nms_threshold = 0.45
        # Optional label list; class IDs are reported as "class_<id>" without it
        self.class_names = class_names
        # Frame buffer for batching
        self.frame_buffer = deque()
        self.metadata_buffer = deque()
        # Performance tracking (per-batch time and frame count, last 100 batches)
        self.processing_times = deque(maxlen=100)
        self.batch_sizes = deque(maxlen=100)
        self.throughput_counter = 0
        # Thread pool for blocking I/O operations
        self.executor = ThreadPoolExecutor(max_workers=4)

    async def process_video_stream(self, stream_url: str, output_callback):
        """Process video stream with real-time object detection"""
        cap = cv2.VideoCapture(stream_url)
        if not cap.isOpened():
            raise ValueError(f"Could not open video stream: {stream_url}")
        loop = asyncio.get_running_loop()
        frame_id = 0
        try:
            while True:
                # cap.read() blocks, so run it in the thread pool to keep the event loop free
                ret, frame = await loop.run_in_executor(self.executor, cap.read)
                if not ret:
                    logging.warning(f"Failed to read frame from {stream_url}")
                    break
                # Add frame to processing queue
                await self.add_frame(frame, frame_id, stream_url)
                frame_id += 1
                # Process batch if ready and send results to callback
                if len(self.frame_buffer) >= self.max_batch_size:
                    for result in await self.process_batch():
                        await output_callback(result)
            # Flush frames still buffered when the stream ends
            while self.frame_buffer:
                for result in await self.process_batch():
                    await output_callback(result)
        finally:
            cap.release()

    async def add_frame(self, frame: np.ndarray, frame_id: int, stream_id: str):
        """Add frame to processing buffer with metadata"""
        # Preprocess frame
        processed_frame = self.preprocess_frame(frame)
        # Store frame and metadata
        self.frame_buffer.append(processed_frame)
        self.metadata_buffer.append({
            'frame_id': frame_id,
            'stream_id': stream_id,
            'timestamp': time.time(),
            'original_shape': frame.shape
        })

    def preprocess_frame(self, frame: np.ndarray) -> torch.Tensor:
        """Preprocess frame for model input"""
        # Resize to model input size
        resized = cv2.resize(frame, self.input_size)
        # Convert BGR to RGB
        rgb_frame = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
        # Normalize and convert to tensor
        tensor = torch.from_numpy(rgb_frame).permute(2, 0, 1).float() / 255.0
        return tensor.unsqueeze(0)  # Add batch dimension

    async def process_batch(self) -> List[Dict]:
        """Process batch of frames with GPU acceleration"""
        if not self.frame_buffer:
            return []
        start_time = time.time()
        # Take up to max_batch_size frames (and their metadata) from the buffers
        batch_size = min(self.max_batch_size, len(self.frame_buffer))
        batch_frames = [self.frame_buffer.popleft() for _ in range(batch_size)]
        batch_metadata = [self.metadata_buffer.popleft() for _ in range(batch_size)]
        # Concatenate the [1, C, H, W] frames into a single [B, C, H, W] batch
        batch_tensor = torch.cat(batch_frames, dim=0).to(self.device)
        # Run inference
        with torch.no_grad():
            predictions = self.model(batch_tensor)
        # Post-process results, one prediction tensor per frame
        results = []
        for pred, metadata in zip(predictions, batch_metadata):
            detections = self.post_process_detections(pred, metadata)
            results.append({
                'detections': detections,
                'metadata': metadata,
                'processing_time': time.time() - start_time
            })
        # Update performance metrics
        self.processing_times.append(time.time() - start_time)
        self.batch_sizes.append(batch_size)
        self.throughput_counter += batch_size
        return results

    def post_process_detections(self, predictions: torch.Tensor, metadata: Dict) -> List[Dict]:
        """Apply NMS and filter detections"""
        # Parse model output (assuming YOLO format)
        boxes, scores, class_ids = self.parse_yolo_output(predictions)
        # Apply confidence filtering
        valid_indices = scores > self.confidence_threshold
        boxes = boxes[valid_indices]
        scores = scores[valid_indices]
        class_ids = class_ids[valid_indices]
        # Apply Non-Maximum Suppression
        keep_indices = self.non_max_suppression(boxes, scores, self.nms_threshold)
        # Format final detections (boxes are in model-input pixel coordinates)
        detections = []
        for idx in keep_indices:
            detections.append({
                'bbox': boxes[idx].tolist(),
                'confidence': float(scores[idx]),
                'class_id': int(class_ids[idx]),
                'class_name': self.get_class_name(int(class_ids[idx])),
                'timestamp': metadata['timestamp']
            })
        return detections

    def parse_yolo_output(self, predictions: torch.Tensor):
        """Parse YOLO model output for a single frame"""
        # Assuming predictions shape: [num_detections, 85]
        # 85 = 4 (bbox) + 1 (objectness) + 80 (COCO classes)
        boxes = self.xywh_to_xyxy(predictions[..., :4])  # center x, y, w, h -> corners
        objectness = predictions[..., 4:5]
        class_probs = predictions[..., 5:]
        # Final score per class = objectness * class probability
        scores = objectness * class_probs
        max_scores, class_ids = torch.max(scores, dim=-1)
        return boxes, max_scores, class_ids

    @staticmethod
    def xywh_to_xyxy(boxes: torch.Tensor) -> torch.Tensor:
        """Convert (center x, center y, width, height) boxes to (x1, y1, x2, y2)"""
        cx, cy, w, h = boxes.unbind(-1)
        return torch.stack([cx - w / 2, cy - h / 2, cx + w / 2, cy + h / 2], dim=-1)

    def get_class_name(self, class_id: int) -> str:
        """Map a class ID to a label, falling back to a generic name"""
        if self.class_names and 0 <= class_id < len(self.class_names):
            return self.class_names[class_id]
        return f"class_{class_id}"

    def non_max_suppression(self, boxes: torch.Tensor, scores: torch.Tensor, iou_threshold: float):
        """Apply class-agnostic Non-Maximum Suppression"""
        return torchvision.ops.nms(boxes, scores, iou_threshold)

    def get_performance_stats(self) -> Dict:
        """Get current performance statistics"""
        if not self.processing_times:
            return {}
        total_time = sum(self.processing_times)
        # FPS counts frames, not batches, over the recent window
        fps = sum(self.batch_sizes) / total_time if total_time > 0 else 0
        return {
            'avg_processing_time_ms': float(np.mean(self.processing_times)) * 1000,
            'fps': fps,
            'throughput': self.throughput_counter,
            'gpu_utilization': self.get_gpu_utilization()
        }

    def get_gpu_utilization(self) -> float:
        """Get current GPU utilization as a fraction in [0, 1]"""
        if self.device.type == 'cuda':
            # torch.cuda.utilization returns a percentage and requires the pynvml package
            return torch.cuda.utilization(self.device) / 100.0
        return 0.0


# Usage example
async def main():
    # Initialize pipeline
    pipeline = VideoProcessingPipeline('models/yolo_optimized.pt', gpu_id=0)

    # Define result callback
    async def handle_detections(result):
        detections = result['detections']
        metadata = result['metadata']
        print(f"Frame {metadata['frame_id']}: {len(detections)} objects detected")
        for detection in detections:
            print(f"  - {detection['class_name']}: {detection['confidence']:.2f}")

    # Process video stream
    await pipeline.process_video_stream('rtmp://stream-url', handle_detections)
    # Print performance stats (empty if no batch was processed)
    stats = pipeline.get_performance_stats()
    if stats:
        print(f"Performance: {stats['fps']:.1f} FPS, {stats['avg_processing_time_ms']:.1f}ms avg")


if __name__ == "__main__":
    asyncio.run(main())

2. Multi-GPU Video Analytics System
import json
import logging
import queue
import threading
import time
from typing import Dict, List

import numpy as np
import redis
import torch
import torch.multiprocessing as mp


class MultiGPUVideoAnalytics:
    def __init__(self, model_paths: List[str], gpu_ids: List[int]):
        self.model_paths = model_paths
        self.gpu_ids = gpu_ids
        self.num_gpus = len(gpu_ids)
        # Redis for result aggregation
        self.redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
        # Inter-process communication
        self.task_queue = mp.Queue(maxsize=1000)
        self.result_queue = mp.Queue()
        # Worker processes and the aggregator thread
        self.processes = []
        self.aggregator_thread = None
        self.stop_event = threading.Event()
        # Performance monitoring
        self.stats = {
            'frames_processed': 0,
            'total_processing_time': 0.0,
            'gpu_utilization': [0.0] * self.num_gpus
        }

    def start_workers(self):
        """Start GPU worker processes"""
        for i, (model_path, gpu_id) in enumerate(zip(self.model_paths, self.gpu_ids)):
            worker_process = mp.Process(
                target=self.gpu_worker,
                args=(model_path, gpu_id, i, self.task_queue, self.result_queue)
            )
            worker_process.start()
            self.processes.append(worker_process)
        # Start result aggregator
        self.aggregator_thread = threading.Thread(target=self.result_aggregator)
        self.aggregator_thread.start()
        logging.info(f"Started {self.num_gpus} GPU workers")

    @staticmethod
    def gpu_worker(model_path: str, gpu_id: int, worker_id: int,
                   task_queue: mp.Queue, result_queue: mp.Queue):
        """Worker process for GPU processing"""
        # Set GPU device
        device = torch.device(f'cuda:{gpu_id}')
        torch.cuda.set_device(gpu_id)
        # Load model on specific GPU
        model = torch.jit.load(model_path, map_location=device)
        model.eval()
        logging.info(f"Worker {worker_id} initialized on GPU {gpu_id}")
        while True:
            try:
                # Get task from queue (blocks for up to 1 second)
                task = task_queue.get(timeout=1.0)
                if task is None:  # Shutdown signal
                    break
                start_time = time.time()
                # Process frames
                frames = task['frames']
                metadata = task['metadata']
                # Create batch tensor: HWC uint8 frames -> normalized BCHW float tensor
                batch_tensor = torch.stack([
                    torch.from_numpy(frame).permute(2, 0, 1).float() / 255.0
                    for frame in frames
                ]).to(device)
                # Run inference
                with torch.no_grad():
                    predictions = model(batch_tensor)
                # Prepare results
                processing_time = time.time() - start_time
                result = {
                    'worker_id': worker_id,
                    'gpu_id': gpu_id,
                    'predictions': predictions.cpu(),
                    'metadata': metadata,
                    'processing_time': processing_time,
                    'timestamp': time.time()
                }
                # Send result back
                result_queue.put(result)
            except queue.Empty:
                continue
            except Exception as e:
                logging.error(f"Worker {worker_id} error: {e}")
                continue

    def result_aggregator(self):
        """Aggregate results from all GPU workers until shutdown is requested"""
        while not self.stop_event.is_set():
            try:
                result = self.result_queue.get(timeout=1.0)
                # Update statistics
                self.stats['frames_processed'] += len(result['metadata'])
                self.stats['total_processing_time'] += result['processing_time']
                # Process and store results
                self.process_results(result)
            except queue.Empty:
                continue
            except Exception as e:
                logging.error(f"Result aggregator error: {e}")

    def parse_detections(self, pred: torch.Tensor, meta: Dict,
                         confidence_threshold: float = 0.5) -> List[Dict]:
        """Minimal parser: assumes YOLO-style rows [x, y, w, h, objectness, class scores...]; NMS omitted"""
        scores, class_ids = torch.max(pred[:, 4:5] * pred[:, 5:], dim=-1)
        keep = scores > confidence_threshold
        return [
            {'bbox': box.tolist(), 'confidence': float(score), 'class_id': int(class_id)}
            for box, score, class_id in zip(pred[keep, :4], scores[keep], class_ids[keep])
        ]

    def process_results(self, result: Dict):
        """Process and store detection results"""
        predictions = result['predictions']
        metadata = result['metadata']
        for pred, meta in zip(predictions, metadata):
            # Parse detections
            detections = self.parse_detections(pred, meta)
            # Store in Redis with TTL
            key = f"detections:{meta['stream_id']}:{meta['frame_id']}"
            value = {
                'detections': detections,
                'metadata': meta,
                'processing_stats': {
                    'worker_id': result['worker_id'],
                    'gpu_id': result['gpu_id'],
                    'processing_time': result['processing_time'],
                    'timestamp': result['timestamp']
                }
            }
            # Store with 1 hour TTL
            self.redis_client.setex(key, 3600, json.dumps(value))
            # Publish real-time event
            self.redis_client.publish('detections', json.dumps({
                'stream_id': meta['stream_id'],
                'frame_id': meta['frame_id'],
                'detection_count': len(detections),
                'timestamp': time.time()
            }))

    def submit_frames(self, frames: List[np.ndarray], metadata: List[Dict]):
        """Submit frames for processing"""
        task = {
            'frames': frames,
            'metadata': metadata,
            'submit_time': time.time()
        }
        try:
            self.task_queue.put(task, timeout=0.1)  # Waits at most 100 ms for a free slot
            return True
        except queue.Full:
            logging.warning("Task queue full, dropping frames")
            return False

    def get_performance_stats(self) -> Dict:
        """Get aggregated performance statistics"""
        total_frames = self.stats['frames_processed']
        total_time = self.stats['total_processing_time']
        if total_frames == 0:
            return {'average_fps': 0, 'avg_processing_time_ms': 0, 'gpu_utilization': []}
        # total_time sums per-batch GPU time across workers, so this is per-worker
        # throughput rather than wall-clock FPS of the whole system
        avg_fps = total_frames / total_time if total_time > 0 else 0
        avg_processing_time = (total_time / total_frames) * 1000  # ms
        # Query utilization per device (requires the pynvml package)
        gpu_utilization = [torch.cuda.utilization(gpu_id) for gpu_id in self.gpu_ids]
        return {
            'total_frames_processed': total_frames,
            'average_fps': avg_fps,
            'avg_processing_time_ms': avg_processing_time,
            'gpu_utilization': gpu_utilization,
            'active_workers': len(self.processes)
        }

    def shutdown(self):
        """Gracefully shutdown all workers"""
        # Send shutdown signal to all workers
        for _ in self.processes:
            self.task_queue.put(None)
        # Wait for processes to finish
        for process in self.processes:
            process.join(timeout=5.0)
            if process.is_alive():
                process.terminate()
        # Stop the aggregator thread so the program can exit
        self.stop_event.set()
        if self.aggregator_thread is not None:
            self.aggregator_thread.join(timeout=5.0)
        logging.info("All workers shut down")


# Usage example
def main():
    # Initialize multi-GPU system
    model_paths = ['model1.pt', 'model2.pt', 'model3.pt', 'model4.pt']
    gpu_ids = [0, 1, 2, 3]  # Use 4 GPUs
    analytics_system = MultiGPUVideoAnalytics(model_paths, gpu_ids)
    analytics_system.start_workers()
    try:
        # Simulate frame processing
        for batch_id in range(100):
            # Create dummy frames (in practice, these come from video streams)
            frames = [np.random.randint(0, 255, (640, 640, 3), dtype=np.uint8) for _ in range(8)]
            metadata = [
                {
                    'stream_id': f'stream_{i % 4}',
                    'frame_id': batch_id * 8 + i,
                    'timestamp': time.time()
                }
                for i in range(8)
            ]
            # Submit for processing
            success = analytics_system.submit_frames(frames, metadata)
            if not success:
                print("Queue full, skipping batch")
            # Print stats every 10 batches
            if batch_id % 10 == 0:
                stats = analytics_system.get_performance_stats()
                print(f"Batch {batch_id}: {stats['average_fps']:.1f} FPS, "
                      f"GPU util: {[f'{u:.1f}%' for u in stats['gpu_utilization']]}")
            time.sleep(0.1)  # Simulate real-time processing
    finally:
        analytics_system.shutdown()


if __name__ == "__main__":
    # CUDA requires the 'spawn' start method for worker processes
    mp.set_start_method('spawn')
    main()

🏭 Production Computer Vision Systems
Tesla Autopilot Vision
Multi-camera perception system
- Cameras: 8 cameras, 360° view
- Processing: 30 FPS real-time
- Tasks: Object detection, depth estimation
- Hardware: FSD Chip (144 TOPS)
Amazon Go Stores
Cashier-less shopping
- Coverage: 100+ cameras per store
- Tracking: Multi-person, multi-object
- Accuracy: > 99% transaction accuracy
- Latency: < 1 second checkout
YouTube Content Moderation
Video content analysis
- Scale: 500+ hours uploaded/minute
- Processing: Real-time + batch analysis
- Detection: Violence, adult content, spam
- Accuracy: 95%+ automated decisions
Facebook Photo Tagging
Face recognition & tagging
- Photos: 350M+ photos/day
- Faces: Billions of face embeddings
- Accuracy: 97.35% face verification
- Speed: < 1 second per photo
✅ Computer Vision Systems Best Practices
✅ Do's
- Optimize for your hardware - Use TensorRT, ONNX Runtime, or model-specific optimizations for your GPU architecture
- Implement adaptive batching - Balance latency vs throughput based on current load
- Use multi-scale detection - Process multiple resolutions for better small object detection
- Monitor GPU memory carefully - Track memory usage to prevent OOM crashes
- Implement graceful degradation - Reduce quality/accuracy under high load rather than dropping frames
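The adaptive batching item can be sketched as a small policy function that picks a batch size from the current queue depth and a latency budget. This is a simplified illustration under a loud assumption: batch latency is modeled as growing linearly with batch size, which real GPUs only approximate. The function name and parameters are hypothetical.

```python
def choose_batch_size(queue_depth: int,
                      est_ms_per_frame: float,
                      latency_budget_ms: float,
                      max_batch_size: int = 8) -> int:
    """Pick the largest batch that fits the latency budget and the frames available.

    Assumes batch latency is roughly est_ms_per_frame * batch_size.
    """
    if queue_depth <= 0:
        return 0  # nothing to process
    if est_ms_per_frame > 0:
        affordable = int(latency_budget_ms // est_ms_per_frame)
    else:
        affordable = max_batch_size
    # Always make progress with at least one frame, even when over budget
    return max(1, min(queue_depth, affordable, max_batch_size))


print(choose_batch_size(queue_depth=20, est_ms_per_frame=10.0, latency_budget_ms=50.0))   # 5
print(choose_batch_size(queue_depth=3, est_ms_per_frame=10.0, latency_budget_ms=100.0))   # 3
print(choose_batch_size(queue_depth=20, est_ms_per_frame=200.0, latency_budget_ms=50.0))  # 1
```

Under light load the queue depth limits the batch and latency stays low; under heavy load the budget and the hardware maximum cap it, trading latency for throughput.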
❌ Don'ts
- Don't insist on processing every frame - Under high load, skip frames in a controlled way and rely on temporal consistency between neighboring frames
- Avoid unnecessary CPU-GPU memory copies - Keep data on the GPU as much as possible
- Don't ignore model warm-up - The first inference is typically much slower; warm up models on startup
- Avoid blocking I/O operations - Use async processing for network and disk operations
- Don't neglect post-processing - NMS and filtering can be bottlenecks at scale
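Controlled frame skipping can be as simple as processing every Nth frame, with N derived from how many frames per second the pipeline can sustain. The sketch below shows that calculation; `frame_stride` and `select_frames` are illustrative helpers, and the sustainable rate is assumed to come from your own measurements.

```python
import math
from typing import Iterable, List


def frame_stride(incoming_fps: float, sustainable_fps: float) -> int:
    """Smallest N such that processing every Nth frame stays within capacity."""
    if sustainable_fps <= 0:
        raise ValueError("sustainable_fps must be positive")
    return max(1, math.ceil(incoming_fps / sustainable_fps))


def select_frames(frame_ids: Iterable[int], stride: int) -> List[int]:
    """Keep every stride-th frame ID, starting from frame 0."""
    return [frame_id for frame_id in frame_ids if frame_id % stride == 0]


stride = frame_stride(incoming_fps=30, sustainable_fps=12)
print(stride)                            # 3
print(select_frames(range(10), stride))  # [0, 3, 6, 9]
```

With a stride of 3, a 30 FPS stream is processed at 10 FPS, which fits the assumed 12 FPS capacity; a tracker can carry detections across the skipped frames.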