Gmail Smart Compose Architecture
Designing a Gmail Smart Compose system: real-time inference, context processing, personalization, and production deployment at Google scale
Gmail Smart Compose System Overview
Gmail Smart Compose serves over 1.5 billion users with real-time email completion suggestions. The system must provide contextually relevant, personalized suggestions within 100ms while maintaining privacy and supporting multiple languages.
Scale Requirements
- 1.5B+ users globally
- 100+ languages supported
- <100ms latency target
- 99.9% uptime requirement
- Multi-region deployment
Technical Challenges
- Real-time inference at scale
- Context understanding across emails
- Personalization without compromising privacy
- Multi-modal input (text + metadata)
- Quality consistency across languages
Business Impact
- 20% faster email composition
- Improved UX and engagement
- Reduced typos and errors
- Accessibility benefits
- Competitive advantage in productivity
System Features Deep Dive
Real-time Inference Architecture
Achieving sub-100ms latency for 1.5B+ users requires sophisticated optimization across the entire stack, from model architecture to infrastructure deployment.
Latency Breakdown:
- Network latency: 10-20ms (regional deployment)
- Context processing: 5-10ms (optimized parsing)
- Model inference: 30-50ms (batched execution)
- Post-processing: 5-15ms (ranking, filtering)
- Total target: <100ms end-to-end (budget enforcement sketched below)
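A simple way to hold this budget is to carry an explicit deadline through each request so downstream stages can pick tighter timeouts. The helper below is a minimal sketch; the class name and stage labels are illustrative, not part of the production system.
import time

class LatencyBudget:
    """Tracks how much of the end-to-end latency budget a request has left."""
    def __init__(self, total_ms: float = 100.0):
        self.total_ms = total_ms
        self.start = time.monotonic()
        self.stage_elapsed_ms = {}

    def elapsed_ms(self) -> float:
        return (time.monotonic() - self.start) * 1000

    def remaining_ms(self) -> float:
        return self.total_ms - self.elapsed_ms()

    def record(self, stage: str) -> None:
        """Record cumulative elapsed time at the end of a pipeline stage."""
        self.stage_elapsed_ms[stage] = self.elapsed_ms()

# Usage: derive the inference timeout from whatever budget remains,
# keeping a buffer for post-processing.
budget = LatencyBudget(total_ms=100.0)
budget.record("context_processing")
inference_timeout_ms = max(budget.remaining_ms() - 20, 10)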
import asyncio
import logging
import time
from typing import List

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

logger = logging.getLogger(__name__)

class RealTimeInferenceEngine:
def __init__(self, config):
        # Tokenizer is used by warmup_model and decode_suggestions; assumed to ship with the same checkpoint.
        self.tokenizer = AutoTokenizer.from_pretrained("gmail-smart-compose-distilled-v3")
        self.model = self.load_optimized_model()
self.request_batcher = RequestBatcher(
max_batch_size=32,
max_latency_ms=50 # Strict latency requirement
)
self.context_cache = ContextCache(ttl_ms=5000)
async def generate_suggestion(self, context: EmailContext) -> List[Suggestion]:
"""Generate suggestion with strict latency requirements"""
start_time = time.time()
# Fast path: check cache first
cache_key = self.create_cache_key(context)
cached_result = await self.context_cache.get(cache_key)
if cached_result:
return cached_result
# Batch inference for efficiency
result = await self.request_batcher.process(
context,
timeout_ms=80 # Leave 20ms buffer for processing
)
# Cache result for similar contexts
await self.context_cache.set(cache_key, result)
total_latency = (time.time() - start_time) * 1000
if total_latency > 100:
logger.warning(f"Latency SLA violated: {total_latency:.1f}ms")
return result
def load_optimized_model(self):
"""Load model with aggressive optimizations"""
# Use distilled model for speed
model = AutoModelForCausalLM.from_pretrained(
"gmail-smart-compose-distilled-v3",
torch_dtype=torch.float16, # FP16 for speed
trust_remote_code=True
)
# Apply optimizations
model = torch.compile(model, mode="reduce-overhead")
model.eval()
# Warm up with typical inputs
self.warmup_model(model)
return model
def warmup_model(self, model):
"""Warm up model to avoid cold start latency"""
typical_contexts = [
"Thank you for",
"I hope this email finds you well",
"Please let me know if",
"Looking forward to",
"Best regards"
]
for context in typical_contexts:
dummy_input = self.tokenizer.encode(context, return_tensors='pt')
with torch.inference_mode():
_ = model.generate(dummy_input, max_new_tokens=5, do_sample=False)
class RequestBatcher:
"""Intelligent request batching with latency constraints"""
def __init__(self, max_batch_size: int = 32, max_latency_ms: int = 50):
self.max_batch_size = max_batch_size
self.max_latency_ms = max_latency_ms
self.pending_requests = []
self.processing = False
# Start batching loop
asyncio.create_task(self.batching_loop())
async def process(self, context: EmailContext, timeout_ms: int = 80) -> List[Suggestion]:
"""Add request to batch and wait for result"""
future = asyncio.Future()
request = BatchRequest(
context=context,
future=future,
timestamp=time.time()
)
self.pending_requests.append(request)
try:
return await asyncio.wait_for(future, timeout=timeout_ms/1000)
except asyncio.TimeoutError:
# Remove from pending requests
if request in self.pending_requests:
self.pending_requests.remove(request)
raise TimeoutError(f"Request timed out after {timeout_ms}ms")
async def batching_loop(self):
"""Main batching loop with strict latency requirements"""
while True:
if not self.pending_requests or self.processing:
await asyncio.sleep(0.001) # 1ms check interval
continue
# Check if we should process batch now
oldest_age_ms = (time.time() - self.pending_requests[0].timestamp) * 1000
should_process = (
len(self.pending_requests) >= self.max_batch_size or
oldest_age_ms >= self.max_latency_ms
)
            if should_process:
                await self.process_batch()
            else:
                # Yield to the event loop instead of busy-spinning until the batch is ready.
                await asyncio.sleep(0.001)
async def process_batch(self):
"""Process current batch with optimized inference"""
self.processing = True
batch = self.pending_requests[:self.max_batch_size]
self.pending_requests = self.pending_requests[self.max_batch_size:]
try:
# Prepare batch inputs
batch_contexts = [req.context for req in batch]
batch_inputs = self.prepare_batch_inputs(batch_contexts)
# Run inference
with torch.inference_mode():
batch_outputs = self.model.generate(
**batch_inputs,
max_new_tokens=10,
num_return_sequences=3,
do_sample=True,
temperature=0.7,
pad_token_id=self.model.config.eos_token_id
)
# Parse outputs and return results
for i, request in enumerate(batch):
output_tokens = batch_outputs[i*3:(i+1)*3] # 3 suggestions per request
suggestions = self.decode_suggestions(output_tokens, request.context)
request.future.set_result(suggestions)
except Exception as e:
# Return error to all requests in batch
for request in batch:
request.future.set_exception(e)
finally:
            self.processing = False

Production Architecture
1. Input Processing: context extraction, email parsing
2. Model Inference: real-time transformer inference
3. Personalization: user adaptation, style matching
4. Response Delivery: suggestion ranking, UI integration
class SmartComposeService:
def __init__(self, config):
# Core models
self.completion_model = self.load_completion_model()
self.context_encoder = ContextEncoder()
self.personalization_engine = PersonalizationEngine()
self.quality_filter = QualityFilter()
# Infrastructure
self.cache_manager = DistributedCache()
self.feature_store = FeatureStore()
self.metrics_collector = MetricsCollector()
# Performance optimization
self.batch_processor = BatchProcessor(max_latency_ms=50)
self.model_cache = ModelCache()
async def generate_suggestions(self,
email_context: EmailContext,
user_context: UserContext,
cursor_position: int) -> List[Suggestion]:
"""Generate real-time email completion suggestions"""
request_start = time.time()
try:
# Step 1: Context processing (5-10ms)
processed_context = await self.process_context(
email_context, user_context, cursor_position
)
# Step 2: Feature extraction (5-10ms)
features = await self.extract_features(processed_context)
# Step 3: Model inference (30-50ms)
raw_suggestions = await self.generate_completions(
processed_context, features
)
# Step 4: Personalization (5-10ms)
personalized_suggestions = await self.personalization_engine.adapt(
raw_suggestions, user_context
)
# Step 5: Quality filtering (5-10ms)
filtered_suggestions = await self.quality_filter.filter(
personalized_suggestions, processed_context
)
# Step 6: Ranking and selection (5ms)
final_suggestions = self.rank_suggestions(
filtered_suggestions, features
)
# Record metrics
total_latency = time.time() - request_start
await self.record_request_metrics(total_latency, len(final_suggestions))
return final_suggestions[:3] # Return top 3 suggestions
except Exception as e:
# Graceful degradation
await self.record_error(e)
return [] # Return empty suggestions on error
async def process_context(self,
email_context: EmailContext,
user_context: UserContext,
cursor_position: int) -> ProcessedContext:
"""Process email and user context for completion"""
# Extract email thread context
thread_context = await self.extract_thread_context(email_context)
# Parse current email state
current_email = self.parse_current_email(
email_context.current_draft,
cursor_position
)
# Extract relevant user patterns
user_patterns = await self.feature_store.get_user_features(
user_context.user_id,
features=['writing_style', 'common_phrases', 'language_preference']
)
return ProcessedContext(
thread_context=thread_context,
current_email=current_email,
user_patterns=user_patterns,
recipient_context=email_context.recipients,
timestamp=email_context.timestamp
)
async def generate_completions(self,
context: ProcessedContext,
features: Dict) -> List[RawSuggestion]:
"""Generate completion suggestions using transformer model"""
# Prepare model input
model_input = self.prepare_model_input(context, features)
# Check cache first
cache_key = self.create_cache_key(model_input)
cached_result = await self.cache_manager.get(cache_key)
if cached_result:
return cached_result
# Generate suggestions with batching for efficiency
suggestions = await self.batch_processor.process_request(
model_input,
generation_params={
'max_tokens': 20,
'temperature': 0.3,
'top_p': 0.9,
'num_return_sequences': 5
}
)
# Cache results
await self.cache_manager.set(
cache_key,
suggestions,
ttl_seconds=300 # 5 minute cache
)
return suggestions
def prepare_model_input(self,
context: ProcessedContext,
features: Dict) -> ModelInput:
"""Prepare input for completion model"""
# Create context template
context_template = self.build_context_template(context)
# Add special tokens
input_text = f"""
<thread_context>{context.thread_context}</thread_context>
<current_email>{context.current_email.text_before_cursor}</current_email>
<recipients>{', '.join(context.recipient_context)}</recipients>
<user_style>{context.user_patterns.get('writing_style', 'formal')}</user_style>
<continue>
"""
# Tokenize and prepare for model
tokens = self.completion_model.tokenizer.encode(
input_text,
max_length=512, # Keep context manageable
truncation=True,
return_tensors='pt'
)
return ModelInput(
input_ids=tokens,
attention_mask=torch.ones_like(tokens),
context_features=features
)
class BatchProcessor:
"""Efficient batch processing for real-time inference"""
def __init__(self, max_latency_ms: int = 50, max_batch_size: int = 32):
self.max_latency_ms = max_latency_ms
self.max_batch_size = max_batch_size
self.pending_requests = []
self.processing = False
# Start processing loop
asyncio.create_task(self.processing_loop())
async def process_request(self, model_input: ModelInput,
generation_params: Dict) -> List[RawSuggestion]:
"""Add request to batch processing queue"""
result_future = asyncio.Future()
self.pending_requests.append(BatchRequest(
model_input=model_input,
generation_params=generation_params,
result_future=result_future,
timestamp=time.time()
))
return await result_future
async def processing_loop(self):
"""Main batch processing loop"""
while True:
if not self.pending_requests:
await asyncio.sleep(0.001)
continue
# Check if we should process now
oldest_request_age = time.time() - self.pending_requests[0].timestamp
should_process = (
len(self.pending_requests) >= self.max_batch_size or
oldest_request_age * 1000 > self.max_latency_ms
)
            if should_process and not self.processing:
                await self.process_batch()
            else:
                # Yield to the event loop instead of busy-spinning until the batch is ready.
                await asyncio.sleep(0.001)
async def process_batch(self):
"""Process current batch of requests"""
if not self.pending_requests:
return
self.processing = True
try:
# Extract batch
batch = self.pending_requests[:self.max_batch_size]
self.pending_requests = self.pending_requests[self.max_batch_size:]
# Process batch
batch_results = await self.run_inference_batch(batch)
# Return results
for request, result in zip(batch, batch_results):
request.result_future.set_result(result)
except Exception as e:
# Return errors to all requests
for request in batch:
request.result_future.set_exception(e)
finally:
            self.processing = False

Performance Optimization Strategies
Latency Optimization
- Model Optimization: quantization, pruning, knowledge distillation (see the sketch after this list)
- Caching Strategy: multi-level caching across context, features, and completions
- Batch Processing: dynamic batching with latency constraints
- Edge Deployment: regional model deployment for reduced latency
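As a concrete example of the model-optimization bullet above, post-training dynamic quantization stores linear-layer weights as INT8 and dequantizes them on the fly, trading a little accuracy for memory and CPU latency. This is a generic PyTorch/Transformers sketch, not Gmail's actual optimization pipeline, and distilgpt2 is only a stand-in checkpoint.
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

# Stand-in checkpoint; substitute a distilled completion model.
model = AutoModelForCausalLM.from_pretrained("distilgpt2")
tokenizer = AutoTokenizer.from_pretrained("distilgpt2")

# Dynamic quantization: nn.Linear weights are stored as INT8 and
# dequantized at run time, which mainly helps CPU serving.
quantized = torch.quantization.quantize_dynamic(
    model, {torch.nn.Linear}, dtype=torch.qint8
)
quantized.eval()

inputs = tokenizer("Please let me know if", return_tensors="pt")
with torch.inference_mode():
    output = quantized.generate(**inputs, max_new_tokens=5, do_sample=False)
print(tokenizer.decode(output[0], skip_special_tokens=True))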
Quality Assurance
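The metrics system below relies on a RollingAverage helper that is not defined elsewhere in this section; a minimal sketch of an assumed implementation, averaging over the most recent observations, is:
from collections import deque

class RollingAverage:
    """Mean of the most recent `window` observations."""
    def __init__(self, window: int = 1000):
        self.values = deque(maxlen=window)

    def add(self, value: float) -> None:
        self.values.append(value)

    @property
    def value(self) -> float:
        return sum(self.values) / len(self.values) if self.values else 0.0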
class QualityMetricsSystem:
def __init__(self):
self.metrics = {
'acceptance_rate': RollingAverage(window=1000),
'suggestion_relevance': RollingAverage(window=1000),
'user_satisfaction': RollingAverage(window=100),
'completion_accuracy': RollingAverage(window=1000)
}
async def track_suggestion_usage(self,
suggestion_id: str,
user_action: str,
context: Dict):
"""Track how users interact with suggestions"""
if user_action == 'accepted':
self.metrics['acceptance_rate'].add(1.0)
# Track completion accuracy
await self.measure_completion_accuracy(
suggestion_id, context
)
elif user_action == 'rejected':
self.metrics['acceptance_rate'].add(0.0)
elif user_action == 'modified':
# Partial acceptance
self.metrics['acceptance_rate'].add(0.5)
# Analyze modification patterns
await self.analyze_modification_patterns(
suggestion_id, context
)
async def measure_completion_accuracy(self,
suggestion_id: str,
context: Dict):
"""Measure how accurately suggestions match user intent"""
# Get the actual completion user typed
actual_completion = context.get('actual_text', '')
predicted_completion = context.get('suggestion_text', '')
# Calculate various accuracy metrics
edit_distance = self.calculate_edit_distance(
predicted_completion, actual_completion
)
semantic_similarity = await self.calculate_semantic_similarity(
predicted_completion, actual_completion
)
# Record metrics
accuracy_score = 1.0 - (edit_distance / max(len(actual_completion), 1))
self.metrics['completion_accuracy'].add(accuracy_score)
# Store detailed metrics
await self.store_detailed_metrics({
'suggestion_id': suggestion_id,
'edit_distance': edit_distance,
'semantic_similarity': semantic_similarity,
'accuracy_score': accuracy_score,
'context_length': len(context.get('email_context', '')),
'suggestion_length': len(predicted_completion)
        })

Privacy & Security Architecture
Data Protection
- Email content never stored
- Ephemeral context processing
- Encrypted data transmission
- Regular security audits
On-Device Processing
- Lightweight models for mobile
- Federated learning updates (aggregation sketched after this list)
- Local personalization
- Offline capability
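The federated learning bullet above can be made concrete with the aggregation step of weighted federated averaging: clients compute parameter deltas locally, and only those deltas (never email text) are combined on the server. The function below is a generic sketch under that assumption, not the production protocol.
from typing import Dict, List, Tuple
import torch

def federated_average(
    client_updates: List[Tuple[int, Dict[str, torch.Tensor]]]
) -> Dict[str, torch.Tensor]:
    """Weighted average of per-client parameter deltas.

    Each entry is (num_examples, delta), where delta maps parameter names
    to tensors computed on-device; raw email content never leaves the device.
    """
    total_examples = sum(n for n, _ in client_updates)
    averaged: Dict[str, torch.Tensor] = {}
    for name in client_updates[0][1]:
        averaged[name] = sum(
            (n / total_examples) * delta[name] for n, delta in client_updates
        )
    return averaged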
Compliance
- GDPR compliance
- CCPA requirements
- Industry regulations
- User consent management (request-time gating sketched below)
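Consent management can be enforced at request time by gating personalization on the user's recorded preferences. The helper below is purely illustrative; the record fields and defaults are assumptions, not Gmail's actual consent model.
from dataclasses import dataclass

@dataclass
class ConsentRecord:
    smart_compose_enabled: bool = True
    personalization_enabled: bool = False  # personalization treated as opt-in

def allowed_features(consent: ConsentRecord) -> dict:
    """Decide which processing paths a request may use."""
    if not consent.smart_compose_enabled:
        return {"suggestions": False, "personalization": False}
    return {"suggestions": True, "personalization": consent.personalization_enabled}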