Advanced Search Systems Architecture
Build production search systems with vector search, semantic retrieval, ranking algorithms, and distributed infrastructure
60 min read • Advanced
Modern Search Systems Architecture
Advanced search systems combine traditional keyword-based search with modern semantic search capabilities, serving billions of queries daily with sub-second response times. Production systems integrate multiple retrieval methods, sophisticated ranking algorithms, and distributed infrastructure to deliver highly relevant results at scale.
Key Search System Challenges
- Semantic Understanding: Moving beyond keyword matching to intent understanding (see the embedding sketch after this list)
- Scale & Performance: Searching billions of documents in milliseconds
- Relevance Quality: Balancing precision, recall, and user satisfaction
- Multi-modal Search: Combining text, images, and structured data
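The semantic-understanding challenge is easy to illustrate with the same sentence-transformers model the engine below uses ('all-MiniLM-L6-v2'). This is a minimal sketch; the query and documents are purely illustrative, and exact similarity scores vary by model version.

from sentence_transformers import SentenceTransformer, util

# Illustrative example: the first document shares no content words with the query,
# yet embedding similarity can still surface it as relevant.
model = SentenceTransformer("all-MiniLM-L6-v2")
query = "cheap laptop for students"
documents = [
    "affordable notebook computers for college",  # paraphrase, no shared content words
    "laptop carrying case with shoulder strap",   # shares "laptop", but different intent
]
scores = util.cos_sim(model.encode(query), model.encode(documents))
print(scores)  # a keyword-only match (after stopword removal) finds just the second document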
Search System Scale Calculator
Example sizing: 100M documents, 10,000 queries/s, 768-dimensional embeddings, 50ms latency target.
Infrastructure Requirements
- Vector index size: 286.1 GB
- Memory required: 429.2 GB
- QPS capacity: 200/s
- Vector DB shards: 10
- Elasticsearch nodes: 10
- Search quality: Good relevance with optimization
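A back-of-the-envelope sketch that reproduces the index-size and memory estimates above, assuming 4-byte float32 vector components and a 1.5x memory overhead for index structures and headroom (the overhead factor is an assumption; shard, node, and QPS figures depend on hardware and are not modeled here).

# Rough vector-index sizing. Assumptions: float32 (4-byte) components and a
# 1.5x RAM overhead for ANN index structures and operational headroom.
NUM_DOCUMENTS = 100_000_000
DIMENSIONS = 768
BYTES_PER_COMPONENT = 4

index_size_gb = NUM_DOCUMENTS * DIMENSIONS * BYTES_PER_COMPONENT / 1024**3
memory_required_gb = 1.5 * index_size_gb

print(f"Vector index size: {index_size_gb:.1f} GB")      # ~286.1 GB
print(f"Memory required:   {memory_required_gb:.1f} GB") # ~429.2 GB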
Hybrid Search Architecture
Multi-Modal Retrieval Pipeline
1. Query Processing
- Query understanding
- Intent classification
- Query expansion
- Spell correction
2. Retrieval
- Keyword search (BM25)
- Vector similarity
- Faceted search
- Geo-spatial search
3. Ranking
- Learning to Rank
- Feature engineering
- Business rules
- Personalization
4. Result Assembly
- Result fusion (see the fusion sketch after this pipeline)
- Diversification
- Snippet generation
- A/B testing
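The engine below fuses results with a weighted sum of normalized keyword and vector scores. Reciprocal rank fusion (RRF) is a common alternative that avoids score normalization entirely; here is a minimal sketch, assuming plain ranked lists of document IDs as input (the k=60 constant is the conventional RRF default, not a value mandated by this architecture).

from collections import defaultdict
from typing import Dict, List

def reciprocal_rank_fusion(result_lists: List[List[str]], k: int = 60) -> Dict[str, float]:
    """Fuse several ranked lists of doc IDs; a higher fused score means a better result."""
    fused = defaultdict(float)
    for ranking in result_lists:
        for rank, doc_id in enumerate(ranking, start=1):
            fused[doc_id] += 1.0 / (k + rank)
    return dict(sorted(fused.items(), key=lambda item: item[1], reverse=True))

# Example: "b" ranks highly in both the keyword and the vector list, so it comes out on top.
keyword_ranked = ["a", "b", "c"]
vector_ranked = ["b", "d", "a"]
print(reciprocal_rank_fusion([keyword_ranked, vector_ranked]))

Because RRF uses only ranks, it is robust to the very different score scales that BM25 and cosine similarity produce.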
Production Hybrid Search Engine
Unified Search Service with Vector + Keyword
import asyncio
import numpy as np
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, asdict
import time
import json
import logging
from elasticsearch import AsyncElasticsearch
from sentence_transformers import SentenceTransformer
import faiss
import redis.asyncio as redis
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import torch
@dataclass
class SearchResult:
"""Individual search result with metadata"""
doc_id: str
title: str
content: str
url: str
score: float
vector_score: float
keyword_score: float
metadata: Dict[str, Any]
snippet: str
rank: int
@dataclass
class SearchResponse:
"""Complete search response"""
query: str
results: List[SearchResult]
total_found: int
search_time_ms: float
facets: Dict[str, List[Dict[str, Any]]]
suggestions: List[str]
debug_info: Optional[Dict[str, Any]] = None
class HybridSearchEngine:
"""
Production hybrid search engine combining:
- Elasticsearch for keyword search and facets
- Faiss for vector similarity search
- Neural reranking for result fusion
"""
def __init__(
self,
es_hosts: List[str] = ['localhost:9200'],
redis_url: str = 'redis://localhost:6379',
embedding_model: str = 'all-MiniLM-L6-v2',
vector_dimensions: int = 384,
faiss_index_type: str = 'IVFFlat'
):
# Elasticsearch for keyword search
self.es_client = AsyncElasticsearch(es_hosts)
# Redis for caching
self.redis_client = None
self.redis_url = redis_url
# Sentence transformer for embeddings
self.embedding_model = SentenceTransformer(embedding_model)
self.vector_dimensions = vector_dimensions
# Faiss index for vector similarity
self.faiss_index = None
self.doc_id_mapping = {} # Maps faiss indices to document IDs
self.faiss_index_type = faiss_index_type
# Learning to rank model (placeholder - would use LightGBM/XGBoost)
self.ranking_model = None
# Query processing
self.query_expander = None
self.spell_checker = None
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
async def initialize(self):
"""Initialize async components and load indices"""
self.redis_client = await redis.from_url(self.redis_url)
# Setup Elasticsearch mappings
await self._setup_elasticsearch_index()
# Load or build Faiss index
await self._load_vector_index()
self.logger.info("Search engine initialized successfully")
async def _setup_elasticsearch_index(self):
"""Create Elasticsearch index with optimized mapping"""
mapping = {
"mappings": {
"properties": {
"title": {
"type": "text",
"analyzer": "english",
"fields": {
"keyword": {"type": "keyword"},
"suggest": {"type": "search_as_you_type"}
}
},
"content": {
"type": "text",
"analyzer": "english",
"term_vector": "with_positions_offsets"
},
"url": {"type": "keyword"},
"category": {"type": "keyword"},
"tags": {"type": "keyword"},
"published_date": {"type": "date"},
"author": {"type": "keyword"},
"view_count": {"type": "integer"},
"embedding": {
"type": "dense_vector",
"dims": self.vector_dimensions,
"index": True,
"similarity": "cosine"
}
}
},
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"refresh_interval": "30s",
"analysis": {
"analyzer": {
"english": {
"type": "standard",
"stopwords": "_english_"
}
}
}
}
}
try:
await self.es_client.indices.create(
index="search_documents",
body=mapping,
ignore=400
)
except Exception as e:
self.logger.warning(f"ES index creation failed: {e}")
async def _load_vector_index(self):
"""Load or create Faiss vector index"""
try:
# In production, load from persistent storage
self.faiss_index = faiss.IndexIVFFlat(
faiss.IndexFlatIP(self.vector_dimensions),
self.vector_dimensions,
100,  # Number of clusters
faiss.METRIC_INNER_PRODUCT  # match the inner-product quantizer so higher scores mean more similar
)
# Train index with sample data (in production, use real embeddings)
sample_vectors = np.random.random((1000, self.vector_dimensions)).astype('float32')
self.faiss_index.train(sample_vectors)
self.logger.info(f"Faiss index loaded with {self.faiss_index.ntotal} vectors")
except Exception as e:
self.logger.error(f"Failed to load vector index: {e}")
raise
async def search(
self,
query: str,
filters: Optional[Dict[str, Any]] = None,
limit: int = 20,
offset: int = 0,
include_debug: bool = False,
hybrid_weights: Tuple[float, float] = (0.7, 0.3) # (keyword, vector)
) -> SearchResponse:
"""
Perform hybrid search combining keyword and vector search
"""
search_start = time.time()
# Check cache first
cache_key = self._get_cache_key(query, filters, limit, offset)
cached_result = await self._get_cached_result(cache_key)
if cached_result:
return cached_result
# Process query
processed_query = await self._process_query(query)
# Parallel search execution
keyword_task = self._keyword_search(
processed_query, filters, limit * 3, offset # Get more candidates
)
vector_task = self._vector_search(
processed_query, limit * 3
)
keyword_results, vector_results = await asyncio.gather(
keyword_task, vector_task
)
# Hybrid ranking and fusion
fused_results = await self._fuse_results(
keyword_results, vector_results,
hybrid_weights, limit, offset
)
# Generate snippets and suggestions
snippets_task = self._generate_snippets(fused_results, query)
suggestions_task = self._get_query_suggestions(query)
facets_task = self._get_facets(filters)
snippets, suggestions, facets = await asyncio.gather(
snippets_task, suggestions_task, facets_task
)
# Build response
search_time_ms = (time.time() - search_start) * 1000
response = SearchResponse(
query=query,
results=fused_results[:limit],
total_found=len(fused_results),
search_time_ms=search_time_ms,
facets=facets,
suggestions=suggestions,
debug_info=self._build_debug_info(
keyword_results, vector_results, hybrid_weights
) if include_debug else None
)
# Cache result
await self._cache_result(cache_key, response)
return response
async def _process_query(self, query: str) -> str:
"""Process and enhance the input query"""
# Basic query processing
processed = query.strip().lower()
# Query expansion (simplified - would use word2vec, WordNet, etc.)
# expanded_terms = await self._expand_query(processed)
# Spell correction (simplified - would use proper spell checker)
# corrected = await self._correct_spelling(processed)
return processed
async def _keyword_search(
self,
query: str,
filters: Optional[Dict[str, Any]],
limit: int,
offset: int
) -> List[Dict[str, Any]]:
"""Elasticsearch keyword search with BM25 scoring"""
# Build query DSL
es_query = {
"query": {
"bool": {
"must": [
{
"multi_match": {
"query": query,
"fields": ["title^3", "content^1"],
"type": "best_fields",
"fuzziness": "AUTO"
}
}
],
"filter": []
}
},
"highlight": {
"fields": {
"content": {"fragment_size": 150, "number_of_fragments": 2}
}
},
"sort": ["_score"],
"size": limit,
"from": offset
}
# Add filters
if filters:
for field, value in filters.items():
if isinstance(value, list):
es_query["query"]["bool"]["filter"].append({
"terms": {field: value}
})
else:
es_query["query"]["bool"]["filter"].append({
"term": {field: value}
})
try:
response = await self.es_client.search(
index="search_documents",
body=es_query
)
results = []
for hit in response['hits']['hits']:
results.append({
'doc_id': hit['_id'],
'score': hit['_score'],
'source': hit['_source'],
'highlight': hit.get('highlight', {})
})
return results
except Exception as e:
self.logger.error(f"Keyword search failed: {e}")
return []
async def _vector_search(
self,
query: str,
limit: int
) -> List[Dict[str, Any]]:
"""Vector similarity search using Faiss"""
try:
# Generate query embedding
query_embedding = self.embedding_model.encode([query])
query_vector = query_embedding.astype('float32')
# Search in Faiss index
if self.faiss_index and self.faiss_index.ntotal > 0:
# Set search parameters
self.faiss_index.nprobe = 10 # Number of clusters to search
scores, indices = self.faiss_index.search(query_vector, limit)
results = []
for score, idx in zip(scores[0], indices[0]):
if idx != -1 and idx in self.doc_id_mapping: # Valid result
results.append({
'doc_id': self.doc_id_mapping[idx],
'vector_score': float(score),
'faiss_index': int(idx)
})
return results
else:
self.logger.warning("Faiss index not available")
return []
except Exception as e:
self.logger.error(f"Vector search failed: {e}")
return []
async def _fuse_results(
self,
keyword_results: List[Dict[str, Any]],
vector_results: List[Dict[str, Any]],
hybrid_weights: Tuple[float, float],
limit: int,
offset: int
) -> List[SearchResult]:
"""Fuse keyword and vector search results with hybrid ranking"""
# Create document score mapping
doc_scores = {}
# Process keyword results
keyword_weight, vector_weight = hybrid_weights
max_keyword_score = max([r['score'] for r in keyword_results]) if keyword_results else 1.0
for result in keyword_results:
doc_id = result['doc_id']
normalized_score = result['score'] / max_keyword_score
doc_scores[doc_id] = {
'keyword_score': normalized_score,
'vector_score': 0.0,
'source': result['source'],
'highlight': result.get('highlight', {})
}
# Process vector results
max_vector_score = max([r['vector_score'] for r in vector_results]) if vector_results else 1.0
for result in vector_results:
doc_id = result['doc_id']
normalized_score = result['vector_score'] / max_vector_score
if doc_id in doc_scores:
doc_scores[doc_id]['vector_score'] = normalized_score
else:
# Get document from Elasticsearch
try:
doc = await self.es_client.get(
index="search_documents",
id=doc_id
)
doc_scores[doc_id] = {
'keyword_score': 0.0,
'vector_score': normalized_score,
'source': doc['_source'],
'highlight': {}
}
except Exception:
continue
# Calculate hybrid scores and create SearchResult objects
search_results = []
for doc_id, scores in doc_scores.items():
hybrid_score = (
keyword_weight * scores['keyword_score'] +
vector_weight * scores['vector_score']
)
source = scores['source']
search_result = SearchResult(
doc_id=doc_id,
title=source.get('title', ''),
content=source.get('content', ''),
url=source.get('url', ''),
score=hybrid_score,
vector_score=scores['vector_score'],
keyword_score=scores['keyword_score'],
metadata={
'category': source.get('category'),
'published_date': source.get('published_date'),
'view_count': source.get('view_count', 0)
},
snippet='', # Will be filled by _generate_snippets
rank=0 # Will be set after sorting
)
search_results.append(search_result)
# Sort by hybrid score
search_results.sort(key=lambda x: x.score, reverse=True)
# Set ranks
for i, result in enumerate(search_results):
result.rank = i + 1 + offset
return search_results
async def _generate_snippets(
self,
results: List[SearchResult],
query: str
) -> List[SearchResult]:
"""Generate relevant snippets for search results"""
# Simple snippet generation (in production, use more sophisticated methods)
query_terms = query.lower().split()
for result in results:
content = result.content.lower()
# Find best sentence containing query terms
sentences = content.split('.')
best_sentence = ""
max_matches = 0
for sentence in sentences:
matches = sum(1 for term in query_terms if term in sentence)
if matches > max_matches:
max_matches = matches
best_sentence = sentence.strip()
# Create snippet (max 200 chars)
if best_sentence:
snippet = best_sentence[:200] + "..." if len(best_sentence) > 200 else best_sentence
result.snippet = snippet
else:
result.snippet = result.content[:200] + "..." if len(result.content) > 200 else result.content
return results
async def _get_query_suggestions(self, query: str) -> List[str]:
"""Get query suggestions and corrections"""
# Simple suggestions (in production, use completion suggester)
try:
suggest_query = {
"suggest": {
"query_suggestion": {
"text": query,
"term": {
"field": "title"
}
}
}
}
response = await self.es_client.search(
index="search_documents",
body=suggest_query
)
suggestions = []
if 'suggest' in response:
for suggestion in response['suggest']['query_suggestion']:
for option in suggestion['options']:
suggestions.append(option['text'])
return suggestions[:5]
except Exception as e:
self.logger.error(f"Query suggestions failed: {e}")
return []
async def _get_facets(self, filters: Optional[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
"""Get search facets for filtering"""
try:
agg_query = {
"size": 0,
"aggs": {
"categories": {
"terms": {"field": "category", "size": 10}
},
"authors": {
"terms": {"field": "author", "size": 10}
},
"tags": {
"terms": {"field": "tags", "size": 15}
}
}
}
response = await self.es_client.search(
index="search_documents",
body=agg_query
)
facets = {}
for facet_name, aggregation in response['aggregations'].items():
facets[facet_name] = [
{"value": bucket["key"], "count": bucket["doc_count"]}
for bucket in aggregation["buckets"]
]
return facets
except Exception as e:
self.logger.error(f"Facets retrieval failed: {e}")
return {}
def _get_cache_key(
self,
query: str,
filters: Optional[Dict[str, Any]],
limit: int,
offset: int
) -> str:
"""Generate cache key for search results"""
import hashlib
cache_data = {
'query': query,
'filters': filters or {},
'limit': limit,
'offset': offset
}
cache_string = json.dumps(cache_data, sort_keys=True)
return f"search:{hashlib.md5(cache_string.encode()).hexdigest()}"
async def _get_cached_result(self, cache_key: str) -> Optional[SearchResponse]:
"""Get cached search result"""
try:
if self.redis_client:
cached_data = await self.redis_client.get(cache_key)
if cached_data:
data = json.loads(cached_data)
# Convert back to SearchResponse (simplified)
return SearchResponse(**data)
except Exception as e:
self.logger.warning(f"Cache retrieval failed: {e}")
return None
async def _cache_result(self, cache_key: str, response: SearchResponse):
"""Cache search result"""
try:
if self.redis_client:
# Convert to dict for JSON serialization
data = asdict(response)
await self.redis_client.setex(
cache_key,
300, # 5 minute cache
json.dumps(data, default=str)
)
except Exception as e:
self.logger.warning(f"Cache storage failed: {e}")
def _build_debug_info(
self,
keyword_results: List[Dict[str, Any]],
vector_results: List[Dict[str, Any]],
hybrid_weights: Tuple[float, float]
) -> Dict[str, Any]:
"""Build debug information for search results"""
return {
'keyword_results_count': len(keyword_results),
'vector_results_count': len(vector_results),
'hybrid_weights': {
'keyword': hybrid_weights[0],
'vector': hybrid_weights[1]
},
'search_strategy': 'hybrid_bm25_vector'
}

Search Analytics & Optimization
Search Quality Monitoring System
import asyncio
import json
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from collections import defaultdict, deque
import numpy as np
from kafka import KafkaConsumer, KafkaProducer
import redis.asyncio as redis
from elasticsearch import AsyncElasticsearch
import logging
@dataclass
class SearchMetric:
"""Search quality and performance metrics"""
query: str
results_returned: int
search_time_ms: float
click_through_rate: float
zero_results_rate: float
user_satisfaction: Optional[float]
timestamp: float
session_id: str
user_id: Optional[str]
@dataclass
class QueryPerformance:
"""Aggregated query performance stats"""
query: str
frequency: int
avg_latency: float
avg_ctr: float
zero_results_rate: float
user_satisfaction: float
improvement_suggestions: List[str]
class SearchAnalyticsSystem:
"""
Real-time search analytics and optimization system
Monitors search quality, performance, and user satisfaction
"""
def __init__(
self,
kafka_servers: List[str] = ['localhost:9092'],
redis_url: str = 'redis://localhost:6379',
es_hosts: List[str] = ['localhost:9200']
):
# Kafka for streaming analytics
self.consumer = KafkaConsumer(
'search_events',
'click_events',
'user_feedback',
bootstrap_servers=kafka_servers,
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
self.producer = KafkaProducer(
bootstrap_servers=kafka_servers,
value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
# Redis for real-time metrics
self.redis_client = None
self.redis_url = redis_url
# Elasticsearch for analytics data
self.es_client = AsyncElasticsearch(es_hosts)
# Real-time metrics storage
self.query_metrics = defaultdict(lambda: {
'count': 0,
'total_latency': 0,
'clicks': 0,
'zero_results': 0,
'satisfaction_scores': []
})
# Sliding windows for performance tracking
self.latency_window = deque(maxlen=1000)
self.ctr_window = deque(maxlen=1000)
# A/B test tracking
self.ab_test_results = defaultdict(lambda: {
'variant_a': {'queries': 0, 'satisfaction': []},
'variant_b': {'queries': 0, 'satisfaction': []}
})
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
async def initialize(self):
"""Initialize analytics system"""
self.redis_client = await redis.from_url(self.redis_url)
# Setup ES index for analytics
await self._setup_analytics_index()
# Start background processors
asyncio.create_task(self._process_search_events())
asyncio.create_task(self._generate_insights())
self.logger.info("Search analytics system initialized")
async def _setup_analytics_index(self):
"""Create Elasticsearch index for search analytics"""
mapping = {
"mappings": {
"properties": {
"query": {"type": "keyword"},
"normalized_query": {"type": "keyword"},
"results_returned": {"type": "integer"},
"search_time_ms": {"type": "float"},
"user_clicked": {"type": "boolean"},
"clicked_position": {"type": "integer"},
"session_id": {"type": "keyword"},
"user_id": {"type": "keyword"},
"timestamp": {"type": "date"},
"search_type": {"type": "keyword"},
"ab_test_variant": {"type": "keyword"},
"user_satisfaction": {"type": "float"},
"query_category": {"type": "keyword"}
}
},
"settings": {
"number_of_shards": 2,
"number_of_replicas": 1
}
}
try:
await self.es_client.indices.create(
index="search_analytics",
body=mapping,
ignore=400
)
except Exception as e:
self.logger.warning(f"Analytics index creation failed: {e}")
async def _process_search_events(self):
"""Process streaming search events"""
self.logger.info("Starting search events processor...")
for message in self.consumer:
try:
event = message.value
event_type = message.topic
if event_type == 'search_events':
await self._process_search_event(event)
elif event_type == 'click_events':
await self._process_click_event(event)
elif event_type == 'user_feedback':
await self._process_feedback_event(event)
except Exception as e:
self.logger.error(f"Event processing error: {e}")
continue
async def _process_search_event(self, event: Dict[str, Any]):
"""Process individual search event"""
query = event['query'].lower().strip()
results_count = event['results_returned']
latency = event['search_time_ms']
session_id = event['session_id']
# Update real-time metrics
self.query_metrics[query]['count'] += 1
self.query_metrics[query]['total_latency'] += latency
if results_count == 0:
self.query_metrics[query]['zero_results'] += 1
# Update sliding windows
self.latency_window.append(latency)
# Store in Redis for real-time dashboards
await self._update_redis_metrics(query, event)
# Store in Elasticsearch for historical analysis
await self._store_search_analytics(event)
async def _process_click_event(self, event: Dict[str, Any]):
"""Process click events to calculate CTR"""
query = event['query'].lower().strip()
clicked_position = event.get('clicked_position', -1)
# Update click metrics
self.query_metrics[query]['clicks'] += 1
# Calculate position-based CTR
ctr = 1.0 / (clicked_position + 1) if clicked_position >= 0 else 0
self.ctr_window.append(ctr)
# Update Redis
await self._update_click_metrics(query, event)
async def _process_feedback_event(self, event: Dict[str, Any]):
"""Process user satisfaction feedback"""
query = event['query'].lower().strip()
satisfaction = event.get('satisfaction_score', 0) # 0-5 scale
self.query_metrics[query]['satisfaction_scores'].append(satisfaction)
# Update A/B test results if applicable
ab_variant = event.get('ab_test_variant')
if ab_variant:
self.ab_test_results[query][f'variant_{ab_variant}']['satisfaction'].append(satisfaction)
async def _update_redis_metrics(self, query: str, event: Dict[str, Any]):
"""Update real-time metrics in Redis"""
try:
pipe = self.redis_client.pipeline()
# Query frequency
pipe.incr(f"query_count:{query}")
# Average latency (using exponential moving average)
current_avg = await self.redis_client.get(f"query_latency:{query}")
if current_avg:
new_avg = 0.9 * float(current_avg) + 0.1 * event['search_time_ms']
else:
new_avg = event['search_time_ms']
pipe.set(f"query_latency:{query}", new_avg)
# Zero results tracking
if event['results_returned'] == 0:
pipe.incr(f"zero_results:{query}")
# Hourly volume
hour_key = f"hourly_searches:{int(time.time() // 3600)}"
pipe.incr(hour_key)
pipe.expire(hour_key, 86400)
await pipe.execute()
except Exception as e:
self.logger.error(f"Redis metrics update failed: {e}")
async def _update_click_metrics(self, query: str, event: Dict[str, Any]):
"""Update click-through rate metrics"""
try:
# Calculate and store CTR
searches = self.query_metrics[query]['count']
clicks = self.query_metrics[query]['clicks']
ctr = clicks / searches if searches > 0 else 0
await self.redis_client.set(f"query_ctr:{query}", ctr)
except Exception as e:
self.logger.error(f"CTR metrics update failed: {e}")
async def _store_search_analytics(self, event: Dict[str, Any]):
"""Store search event in Elasticsearch for analysis"""
try:
await self.es_client.index(
index="search_analytics",
body={
**event,
'normalized_query': event['query'].lower().strip(),
'timestamp': time.time()
}
)
except Exception as e:
self.logger.error(f"ES analytics storage failed: {e}")
async def _generate_insights(self):
"""Generate periodic search insights and recommendations"""
while True:
try:
await asyncio.sleep(300) # Every 5 minutes
insights = await self._analyze_search_patterns()
poor_performers = await self._identify_poor_performing_queries()
recommendations = await self._generate_recommendations(poor_performers)
# Publish insights
await self._publish_insights({
'timestamp': time.time(),
'insights': insights,
'poor_performers': poor_performers,
'recommendations': recommendations
})
self.logger.info("Search insights generated and published")
except Exception as e:
self.logger.error(f"Insights generation failed: {e}")
async def _analyze_search_patterns(self) -> Dict[str, Any]:
"""Analyze search patterns and trends"""
# Current system performance
avg_latency = np.mean(self.latency_window) if self.latency_window else 0
avg_ctr = np.mean(self.ctr_window) if self.ctr_window else 0
# Top queries by volume
top_queries = sorted(
[(q, metrics['count']) for q, metrics in self.query_metrics.items()],
key=lambda x: x[1],
reverse=True
)[:10]
# Zero results analysis
zero_results_queries = [
q for q, metrics in self.query_metrics.items()
if metrics['zero_results'] / max(1, metrics['count']) > 0.5
]
return {
'avg_latency_ms': avg_latency,
'avg_ctr': avg_ctr,
'total_unique_queries': len(self.query_metrics),
'top_queries': top_queries,
'high_zero_results': zero_results_queries[:5]
}
async def _identify_poor_performing_queries(self) -> List[QueryPerformance]:
"""Identify queries with poor performance metrics"""
poor_performers = []
for query, metrics in self.query_metrics.items():
if metrics['count'] < 5: # Skip low-volume queries
continue
avg_latency = metrics['total_latency'] / metrics['count']
ctr = metrics['clicks'] / metrics['count'] if metrics['count'] > 0 else 0
zero_rate = metrics['zero_results'] / metrics['count']
avg_satisfaction = np.mean(metrics['satisfaction_scores']) if metrics['satisfaction_scores'] else 2.5
# Identify poor performance
is_poor = (
avg_latency > 200 or # High latency
ctr < 0.1 or # Low CTR
zero_rate > 0.3 or # High zero results
avg_satisfaction < 2.0 # Low satisfaction
)
if is_poor:
suggestions = []
if avg_latency > 200:
suggestions.append("Optimize query processing")
if ctr < 0.1:
suggestions.append("Improve result relevance")
if zero_rate > 0.3:
suggestions.append("Expand index coverage or add query suggestions")
if avg_satisfaction < 2.0:
suggestions.append("Review ranking algorithm")
poor_performers.append(QueryPerformance(
query=query,
frequency=metrics['count'],
avg_latency=avg_latency,
avg_ctr=ctr,
zero_results_rate=zero_rate,
user_satisfaction=avg_satisfaction,
improvement_suggestions=suggestions
))
return sorted(poor_performers, key=lambda x: x.frequency, reverse=True)
async def _generate_recommendations(
self,
poor_performers: List[QueryPerformance]
) -> List[Dict[str, Any]]:
"""Generate actionable recommendations for search improvement"""
recommendations = []
# High latency queries
high_latency = [p for p in poor_performers if p.avg_latency > 200]
if high_latency:
recommendations.append({
'type': 'performance',
'priority': 'high',
'title': 'Optimize High Latency Queries',
'description': f'{len(high_latency)} queries have >200ms latency',
'queries': [p.query for p in high_latency[:5]],
'actions': [
'Review index sharding strategy',
'Optimize query structure',
'Consider result caching',
'Upgrade hardware resources'
]
})
# Zero results queries
zero_results = [p for p in poor_performers if p.zero_results_rate > 0.5]
if zero_results:
recommendations.append({
'type': 'coverage',
'priority': 'high',
'title': 'Address Zero Results Queries',
'description': f'{len(zero_results)} queries return no results >50% of time',
'queries': [p.query for p in zero_results[:5]],
'actions': [
'Expand document index',
'Implement query relaxation',
'Add spell correction',
'Provide query suggestions'
]
})
# Low engagement queries
low_ctr = [p for p in poor_performers if p.avg_ctr < 0.05]
if low_ctr:
recommendations.append({
'type': 'relevance',
'priority': 'medium',
'title': 'Improve Result Relevance',
'description': f'{len(low_ctr)} queries have very low click-through rates',
'queries': [p.query for p in low_ctr[:5]],
'actions': [
'Retrain ranking model',
'Adjust result weights',
'Improve snippet generation',
'A/B test different algorithms'
]
})
return recommendations
async def _publish_insights(self, insights: Dict[str, Any]):
"""Publish insights to downstream systems"""
try:
self.producer.send('search_insights', value=insights)
# Also store in Redis for dashboards
await self.redis_client.setex(
'latest_search_insights',
3600,
json.dumps(insights, default=str)
)
except Exception as e:
self.logger.error(f"Insights publishing failed: {e}")
async def get_real_time_dashboard_data(self) -> Dict[str, Any]:
"""Get current search system health for dashboards"""
try:
current_hour = int(time.time() // 3600)
# Get metrics from Redis
pipe = self.redis_client.pipeline()
pipe.get(f"hourly_searches:{current_hour}")
pipe.get(f"hourly_searches:{current_hour - 1}")
pipe.get("latest_search_insights")
results = await pipe.execute()
dashboard_data = {
'current_hour_searches': int(results[0] or 0),
'previous_hour_searches': int(results[1] or 0),
'system_health': 'healthy', # Would calculate based on error rates
'avg_latency': np.mean(self.latency_window) if self.latency_window else 0,
'avg_ctr': np.mean(self.ctr_window) if self.ctr_window else 0,
'active_queries': len(self.query_metrics),
'insights': json.loads(results[2]) if results[2] else {}
}
return dashboard_data
except Exception as e:
self.logger.error(f"Dashboard data retrieval failed: {e}")
return {'error': str(e)}

Production Search Systems
Google
Web Search Infrastructure
- Scale: 8.5B searches/day, 130 trillion web pages indexed
- Architecture: Distributed crawling + inverted index + PageRank
- Latency: <200ms for searches across billions of pages
- Infrastructure: Thousands of data centers globally
- Innovation: RankBrain ML for query understanding
- Features: Knowledge graphs, featured snippets, autocomplete
Elasticsearch
Distributed Search Engine
- Scale: Petabyte-scale deployments, millions of queries/sec
- Architecture: Lucene-based with distributed sharding
- Latency: Sub-second search across massive datasets
- Use Cases: Log analytics, e-commerce, enterprise search
- Features: Full-text search, aggregations, real-time analytics
- Clients: Netflix, GitHub, Uber, Stack Overflow
Pinterest
Visual Search System
- Scale: 450M+ users, billions of pins searched monthly
- Architecture: CNN embeddings + approximate nearest neighbor
- Latency: <100ms for visual similarity search
- Innovation: Multi-modal search (image + text)
- Infrastructure: Custom GPU clusters for embedding generation
- Features: Visual search, shop the look, try-on AR
Shopify
E-commerce Search Platform
- Scale: 2M+ merchants, billions of product searches
- Architecture: Hybrid search with personalization
- Latency: <50ms for product discovery
- Features: Auto-complete, filters, recommendations
- ML Models: Query understanding, ranking, personalization
- Business Impact: 30% of sales from search traffic
Search System Best Practices
✅ Do
- Implement hybrid search: combine keyword and vector search for the best relevance
- Monitor search analytics: track CTR, zero-results rate, and user satisfaction
- Cache aggressively: cache popular queries and expensive computations
- A/B test ranking algorithms: measure business metrics, not just precision/recall (see the z-test sketch after this list)
- Provide query suggestions: help users refine searches and reduce zero results
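For the A/B-testing point above, a minimal sketch of a two-proportion z-test on click-through rate, assuming independent samples; the counts are illustrative.

import math

def ctr_z_test(clicks_a: int, views_a: int, clicks_b: int, views_b: int) -> float:
    """Two-proportion z-test on CTR; |z| > 1.96 roughly corresponds to p < 0.05."""
    p_a, p_b = clicks_a / views_a, clicks_b / views_b
    p_pool = (clicks_a + clicks_b) / (views_a + views_b)
    std_err = math.sqrt(p_pool * (1 - p_pool) * (1 / views_a + 1 / views_b))
    return (p_b - p_a) / std_err

# Illustrative counts: a 12% vs 15% CTR over 1,000 impressions each sits right
# at the edge of the conventional 5% significance threshold.
print(ctr_z_test(clicks_a=120, views_a=1000, clicks_b=150, views_b=1000))  # ~1.96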
❌ Don't
- Rely only on keyword search: modern users expect semantic understanding
- Ignore query performance: slow searches significantly hurt user experience
- Over-optimize for precision: sometimes showing more results beats showing only perfect ones
- Forget about zero results: a high zero-result rate indicates index or algorithm problems
- Skip spell correction: user typos are extremely common in search queries (a minimal correction sketch follows this list)
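The engine above leaves spell_checker as a placeholder. Below is a minimal dictionary-based sketch using only the Python standard library; the vocabulary and similarity cutoff are illustrative, and a production system would typically derive candidates from index term statistics or a dedicated spell-correction service.

import difflib
from typing import List

def correct_query(query: str, vocabulary: List[str]) -> str:
    """Replace each term with its closest vocabulary match, if one is close enough."""
    corrected = []
    for term in query.lower().split():
        matches = difflib.get_close_matches(term, vocabulary, n=1, cutoff=0.8)
        corrected.append(matches[0] if matches else term)
    return " ".join(corrected)

# Illustrative vocabulary; in practice this would come from the search index itself.
vocabulary = ["search", "systems", "architecture", "vector", "ranking", "hybrid"]
print(correct_query("serach systmes", vocabulary))  # -> "search systems"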