🚀 LLM Serving at Scale
Serving LLMs in production requires careful consideration of architecture patterns, performance optimization, cost management, and reliability. This lesson covers enterprise-grade serving strategies from centralized APIs to distributed edge deployment.
🏗️ Architecture Patterns
- API Gateway for centralized management
- Microservices for specialized functions
- Edge deployment for low latency
- Serverless for variable workloads (see the Lambda sketch after this list)
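To make the serverless option concrete, here is a minimal sketch of an AWS Lambda handler (proxy integration) that forwards completion requests to an LLM gateway like the one built later in this lesson. GATEWAY_URL and GATEWAY_TOKEN are hypothetical environment variables, not part of any provider SDK.

# Serverless proxy sketch: AWS Lambda handler forwarding to an LLM gateway
# Assumes GATEWAY_URL and GATEWAY_TOKEN are configured as environment variables.
import json
import os
import urllib.request

def handler(event, context):
    """Lambda proxy integration: forward the request body to the gateway."""
    body = json.loads(event.get("body") or "{}")
    req = urllib.request.Request(
        os.environ["GATEWAY_URL"] + "/v1/completions",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {os.environ['GATEWAY_TOKEN']}",
        },
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=60) as resp:
        payload = resp.read().decode("utf-8")
    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": payload,
    }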
⚡ Key Considerations
- Latency and throughput requirements
- Cost optimization strategies
- Reliability and fault tolerance
- Security and compliance needs
Production Reality: LLM serving costs can range from $0.002 to $0.12 per 1K tokens. Choosing the right architecture and provider can reduce costs by 90% while improving performance.
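To make that pricing range concrete, here is a rough back-of-the-envelope estimate. The traffic volume and per-1K-token prices are illustrative assumptions drawn from the range quoted above, not published rates.

# Back-of-the-envelope serving cost estimate (illustrative numbers only)
def monthly_cost(requests_per_day: int, tokens_per_request: int, price_per_1k_tokens: float) -> float:
    """Estimate monthly spend from traffic volume and per-1K-token pricing."""
    tokens_per_month = requests_per_day * tokens_per_request * 30
    return tokens_per_month / 1000 * price_per_1k_tokens

# 100K requests/day at ~1K tokens each:
premium = monthly_cost(100_000, 1_000, 0.06)    # premium hosted model
budget = monthly_cost(100_000, 1_000, 0.002)    # low-cost hosted/open model
print(f"premium: ${premium:,.0f}/mo, budget: ${budget:,.0f}/mo")  # ~$180,000 vs ~$6,000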
🏗️ Serving Architectures
API Gateway Pattern
Centralized API management with routing, authentication, and monitoring
✅ Benefits
- Centralized authentication and authorization
- Request routing and load balancing
- Rate limiting and throttling
- Monitoring and analytics
- API versioning support
⚠️ Drawbacks
- Single point of failure risk
- Additional latency overhead
- Complex configuration management
- Potential bottleneck at scale
Best for: Multi-model serving, enterprise API management, cross-cutting concerns
Production Implementation
# API Gateway with FastAPI (typically fronted by nginx or another reverse proxy in production)
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import httpx
import asyncio
from typing import Dict, List, Optional
import time
import json
from datetime import datetime
app = FastAPI(title="LLM API Gateway", version="1.0.0")
# Add CORS middleware (restrict allow_origins to trusted domains in production)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
security = HTTPBearer()
class LLMGateway:
def __init__(self):
# Model endpoint configurations
self.model_configs = {
"gpt-4": {
"provider": "openai",
"endpoint": "https://api.openai.com/v1/chat/completions",
"cost_per_token": 0.00003,
"rate_limit": 10000, # requests per hour
"timeout": 60
},
"claude-3": {
"provider": "anthropic",
"endpoint": "https://api.anthropic.com/v1/messages",
"cost_per_token": 0.000015,
"rate_limit": 5000,
"timeout": 30
},
"llama-2": {
"provider": "replicate",
"endpoint": "https://api.replicate.com/v1/predictions",
"cost_per_token": 0.000001,
"rate_limit": 1000,
"timeout": 120
}
}
# Rate limiting storage
self.rate_limits = {}
# Load balancing state
self.load_balancer = LoadBalancer()
# Monitoring
self.metrics = MetricsCollector()
async def route_request(self, model: str, request_data: Dict) -> Dict:
"""Route request to appropriate model endpoint"""
# Validate model
if model not in self.model_configs:
raise HTTPException(status_code=400, detail=f"Model {model} not supported")
config = self.model_configs[model]
# Apply rate limiting
if not self.check_rate_limit(model):
raise HTTPException(status_code=429, detail="Rate limit exceeded")
# Get optimal endpoint (for load balancing)
endpoint = await self.load_balancer.get_endpoint(config["provider"])
# Transform request for provider
provider_request = self.transform_request(config["provider"], request_data)
# Make request with retry logic
response = await self.make_request_with_retry(
endpoint, provider_request, config["timeout"]
)
# Transform response to standard format
standardized_response = self.transform_response(config["provider"], response)
        # Record metrics (prompt/response character counts are a rough proxy for tokens)
        self.metrics.record_request(model, len(request_data.get("prompt", "")),
                                    len(standardized_response.get("content", "")))
return standardized_response
def check_rate_limit(self, model: str) -> bool:
"""Check if request is within rate limits"""
current_time = time.time()
hour_start = current_time - (current_time % 3600)
if model not in self.rate_limits:
self.rate_limits[model] = {}
if hour_start not in self.rate_limits[model]:
self.rate_limits[model][hour_start] = 0
config = self.model_configs[model]
if self.rate_limits[model][hour_start] >= config["rate_limit"]:
return False
self.rate_limits[model][hour_start] += 1
return True
def transform_request(self, provider: str, request_data: Dict) -> Dict:
"""Transform request to provider-specific format"""
if provider == "openai":
return {
"model": request_data.get("model", "gpt-4"),
"messages": [{"role": "user", "content": request_data["prompt"]}],
"max_tokens": request_data.get("max_tokens", 1000),
"temperature": request_data.get("temperature", 0.7)
}
elif provider == "anthropic":
return {
"model": "claude-3-sonnet-20240229",
"max_tokens": request_data.get("max_tokens", 1000),
"messages": [{"role": "user", "content": request_data["prompt"]}]
}
elif provider == "replicate":
return {
"version": "replicate-model-version",
"input": {
"prompt": request_data["prompt"],
"max_length": request_data.get("max_tokens", 1000),
"temperature": request_data.get("temperature", 0.7)
}
}
        return request_data
    def transform_response(self, provider: str, response: Dict) -> Dict:
        """Normalize provider responses to a standard {content, provider} shape"""
        if provider == "openai":
            content = response["choices"][0]["message"]["content"]
        elif provider == "anthropic":
            content = response["content"][0]["text"]
        elif provider == "replicate":
            # Replicate predictions are asynchronous; a production path would poll
            # the prediction until it completes before reading "output"
            output = response.get("output") or ""
            content = "".join(output) if isinstance(output, list) else str(output)
        else:
            content = str(response)
        return {"content": content, "provider": provider}
    async def make_request_with_retry(self, endpoint: str, data: Dict,
                                      timeout: int, max_retries: int = 3) -> Dict:
        """Make HTTP request with retry and exponential backoff"""
        # Note: provider-specific auth headers (API keys) are omitted here for brevity
        async with httpx.AsyncClient(timeout=timeout) as client:
            for attempt in range(max_retries):
                try:
                    response = await client.post(endpoint, json=data)
                    response.raise_for_status()
                    return response.json()
                except httpx.TimeoutException:
                    if attempt == max_retries - 1:
                        raise HTTPException(status_code=504, detail="Request timeout")
                    await asyncio.sleep(2 ** attempt)
                except httpx.HTTPStatusError as e:
                    if e.response.status_code == 429 and attempt < max_retries - 1:
                        await asyncio.sleep(2 ** attempt)  # back off and retry on rate limits
                    else:
                        raise HTTPException(status_code=e.response.status_code,
                                            detail=f"Provider error: {e.response.text}")
class LoadBalancer:
def __init__(self):
        self.endpoints = {
            "openai": ["https://api.openai.com/v1/chat/completions"],
            "anthropic": ["https://api.anthropic.com/v1/messages"],
            "replicate": ["https://api.replicate.com/v1/predictions"]
        }
self.current_index = {}
async def get_endpoint(self, provider: str) -> str:
"""Get next endpoint using round-robin"""
if provider not in self.current_index:
self.current_index[provider] = 0
endpoints = self.endpoints[provider]
endpoint = endpoints[self.current_index[provider]]
self.current_index[provider] = (self.current_index[provider] + 1) % len(endpoints)
return endpoint
class MetricsCollector:
def __init__(self):
self.metrics = {
"requests_total": 0,
"tokens_processed": 0,
"errors_total": 0,
"latency_sum": 0,
"model_usage": {}
}
def record_request(self, model: str, input_tokens: int, output_tokens: int):
"""Record request metrics"""
self.metrics["requests_total"] += 1
self.metrics["tokens_processed"] += input_tokens + output_tokens
if model not in self.metrics["model_usage"]:
self.metrics["model_usage"][model] = {"requests": 0, "tokens": 0}
self.metrics["model_usage"][model]["requests"] += 1
self.metrics["model_usage"][model]["tokens"] += input_tokens + output_tokens
# Initialize gateway
gateway = LLMGateway()
@app.post("/v1/completions")
async def create_completion(
request: Dict,
credentials: HTTPAuthorizationCredentials = Depends(security)
):
"""Standard completion endpoint"""
# Authenticate request
if not await authenticate_token(credentials.credentials):
raise HTTPException(status_code=401, detail="Invalid authentication")
# Extract model from request
model = request.get("model", "gpt-4")
# Route to appropriate provider
response = await gateway.route_request(model, request)
return response
@app.get("/v1/models")
async def list_models():
"""List available models"""
return {
"data": [
{"id": model, "object": "model", "owned_by": config["provider"]}
for model, config in gateway.model_configs.items()
]
}
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}
@app.get("/metrics")
async def get_metrics():
"""Prometheus-style metrics endpoint"""
return gateway.metrics.metrics
async def authenticate_token(token: str) -> bool:
"""Authenticate API token"""
# Implement your authentication logic
return token.startswith("sk-")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
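A quick way to exercise the gateway once it is running; this sketch assumes it is listening locally on port 8000, and the sk-test token is only accepted because of the placeholder authenticate_token stub above.

# Example client call against the gateway (localhost:8000 assumed)
import httpx

resp = httpx.post(
    "http://localhost:8000/v1/completions",
    headers={"Authorization": "Bearer sk-test"},  # passes the stub authenticate_token
    json={"model": "claude-3",
          "prompt": "Summarize the benefits of an API gateway.",
          "max_tokens": 200},
    timeout=60,
)
resp.raise_for_status()
print(resp.json()["content"])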
🔍 Provider Comparison
OpenAI API
Key Features
- Function calling
- JSON mode
- Streaming (see the sketch after this comparison)
- Fine-tuning
✅ Pros
- High quality
- Reliable API
- Rich ecosystem
⚠️ Cons
- Expensive
- Rate limits
- No on-premise
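The streaming feature listed above looks roughly like this with the OpenAI Python SDK. This is a sketch assuming the v1.x SDK; check the current SDK documentation before relying on exact parameter names.

# Streaming a chat completion with the OpenAI Python SDK (v1.x assumed)
from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Explain API gateways in one paragraph."}],
    stream=True,
    # JSON mode: pass response_format={"type": "json_object"} on supported models
)
for chunk in stream:
    delta = chunk.choices[0].delta.content
    if delta:
        print(delta, end="", flush=True)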
✅ Production Deployment Checklist
Infrastructure
- ✓ Load balancing and auto-scaling configured
- ✓ Health checks and monitoring in place
- ✓ Rate limiting and DDoS protection
- ✓ SSL/TLS certificates and security
- ✓ Backup and disaster recovery plan
- ✓ CDN for global distribution
Operations
- ✓ Comprehensive logging and metrics (see the Prometheus sketch after this checklist)
- ✓ Alerting for critical issues
- ✓ Cost monitoring and optimization
- ✓ Performance benchmarking
- ✓ Incident response procedures
- ✓ API versioning and backwards compatibility
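The /metrics endpoint in the gateway above returns a raw dict. One common way to expose true Prometheus-format metrics is the prometheus_client package, sketched here under the assumption that you add it as a dependency; in the gateway you would reuse the existing app rather than creating a new one.

# Exposing gateway metrics in Prometheus text format (prometheus_client assumed)
from fastapi import FastAPI, Response
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST

app = FastAPI()
REQUESTS = Counter("llm_requests_total", "LLM requests", ["model"])
LATENCY = Histogram("llm_request_latency_seconds", "Request latency", ["model"])

@app.get("/metrics")
def metrics():
    # Prometheus scrapes this endpoint on its own schedule
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

# In the request path, instrument each call, e.g.:
# REQUESTS.labels(model=model).inc()
# with LATENCY.labels(model=model).time(): ...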
🎯 Key Takeaways
Architecture Choice Matters: API Gateway for simplicity, microservices for scale, edge for latency, serverless for cost
Provider Selection: Balance cost, performance, and reliability based on your specific requirements
Performance Optimization: Implement caching, batching, and connection pooling for production efficiency (a minimal caching sketch follows these takeaways)
Cost Management: Monitor usage patterns and optimize for your specific workload characteristics
Reliability First: Implement comprehensive monitoring, alerting, and incident response procedures
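As one concrete example of the caching point above, a minimal in-memory response cache keyed on the model and request parameters might look like this. The TTL and eviction policy are arbitrary illustrative choices, and caching is generally only appropriate for deterministic (temperature 0) or repeated requests.

# Minimal TTL response cache for repeated LLM requests (illustrative sketch)
import hashlib
import json
import time
from typing import Dict, Optional

class ResponseCache:
    def __init__(self, ttl_seconds: int = 300, max_entries: int = 10_000):
        self.ttl = ttl_seconds
        self.max_entries = max_entries
        self._store: Dict[str, tuple] = {}  # key -> (timestamp, response)

    def _key(self, model: str, request: Dict) -> str:
        payload = json.dumps({"model": model, "request": request}, sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()

    def get(self, model: str, request: Dict) -> Optional[Dict]:
        key = self._key(model, request)
        entry = self._store.get(key)
        if entry and time.time() - entry[0] < self.ttl:
            return entry[1]
        self._store.pop(key, None)  # expired or missing
        return None

    def put(self, model: str, request: Dict, response: Dict) -> None:
        if len(self._store) >= self.max_entries:
            self._store.pop(next(iter(self._store)))  # evict oldest-inserted entry
        self._store[self._key(model, request)] = (time.time(), response)

In a gateway like the one above, route_request would check the cache before calling the provider and populate it on success.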