
Advanced Search Systems Architecture

Build production search systems with vector search, semantic retrieval, ranking algorithms, and distributed infrastructure

60 min read · Advanced

Modern Search Systems Architecture

Advanced search systems combine traditional keyword-based search with modern semantic search capabilities, serving billions of queries daily with sub-second response times. Production systems integrate multiple retrieval methods, sophisticated ranking algorithms, and distributed infrastructure to deliver highly relevant results at scale.

Key Search System Challenges

  • Semantic Understanding: Moving beyond keyword matching to intent understanding
  • Scale & Performance: Searching billions of documents in milliseconds
  • Relevance Quality: Balancing precision, recall, and user satisfaction
  • Multi-modal Search: Combining text, images, and structured data

Search System Scale Calculator

Example configuration: 100M documents · 10,000 queries/s · 768-dimensional embeddings · 50 ms target latency

Infrastructure Requirements

  • Vector Index Size: 286.1 GB
  • Memory Required: 429.2 GB
  • QPS Capacity: 200/s
  • Vector DB Shards: 10
  • Elasticsearch Nodes: 10

Search Quality: Good relevance with optimization
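These figures can be reproduced with simple arithmetic. Below is a minimal sizing sketch, assuming float32 embeddings, roughly 1.5× RAM overhead for index structures and working set, and about 30 GB per vector shard; the overhead factor and shard size are illustrative assumptions, not outputs of the calculator.

def estimate_search_infrastructure(
    num_docs: int,
    dims: int,
    bytes_per_float: int = 4,          # float32 embeddings
    memory_overhead: float = 1.5,      # assumed overhead for index structures
    shard_size_gb: float = 30.0        # assumed maximum shard size
) -> dict:
    """Back-of-the-envelope sizing for a vector search deployment."""
    gib = 1024 ** 3
    index_size_gb = num_docs * dims * bytes_per_float / gib
    memory_gb = index_size_gb * memory_overhead
    shards = int(-(-index_size_gb // shard_size_gb))  # ceiling division
    return {
        'vector_index_gb': round(index_size_gb, 1),
        'memory_required_gb': round(memory_gb, 1),
        'vector_db_shards': shards
    }

print(estimate_search_infrastructure(num_docs=100_000_000, dims=768))
# {'vector_index_gb': 286.1, 'memory_required_gb': 429.2, 'vector_db_shards': 10}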

Hybrid Search Architecture

Multi-Modal Retrieval Pipeline

1. Query Processing

  • Query understanding
  • Intent classification
  • Query expansion
  • Spell correction (a minimal sketch of this stage follows the pipeline overview)

2. Retrieval

  • Keyword search (BM25)
  • Vector similarity
  • Faceted search
  • Geo-spatial search

3. Ranking

  • Learning to Rank
  • Feature engineering
  • Business rules
  • Personalization

4. Result Assembly

  • Result fusion
  • Diversification
  • Snippet generation
  • A/B testing
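The engine below only stubs out stage 1 (its _process_query method simply lowercases the text). As a minimal, illustrative sketch of query processing, the following combines naive spell correction with synonym-based expansion; the vocabulary and synonym table are hypothetical placeholders that would come from the index dictionary and a curated or learned expansion source in practice.

import difflib

VOCABULARY = {"search", "vector", "ranking", "elasticsearch", "embedding"}  # hypothetical
SYNONYMS = {"vector": ["embedding"], "ranking": ["scoring"]}                # hypothetical

def process_query(query: str) -> str:
    """Lowercase, spell-correct against the vocabulary, then expand with synonyms."""
    terms = query.strip().lower().split()
    corrected = []
    for term in terms:
        if term in VOCABULARY:
            corrected.append(term)
        else:
            # Fuzzy spell correction: pick the closest vocabulary term, if any
            matches = difflib.get_close_matches(term, VOCABULARY, n=1, cutoff=0.8)
            corrected.append(matches[0] if matches else term)
    # Append synonyms so keyword and vector retrieval see the expanded intent
    expanded = list(corrected)
    for term in corrected:
        expanded.extend(SYNONYMS.get(term, []))
    return " ".join(expanded)

print(process_query("vectr rankng"))  # "vector ranking embedding scoring"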

Production Hybrid Search Engine

Unified Search Service with Vector + Keyword

import asyncio
import numpy as np
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, asdict
import time
import json
import logging
from elasticsearch import AsyncElasticsearch
from sentence_transformers import SentenceTransformer
import faiss
import redis.asyncio as redis
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import torch

@dataclass
class SearchResult:
    """Individual search result with metadata"""
    doc_id: str
    title: str
    content: str
    url: str
    score: float
    vector_score: float
    keyword_score: float
    metadata: Dict[str, Any]
    snippet: str
    rank: int

@dataclass
class SearchResponse:
    """Complete search response"""
    query: str
    results: List[SearchResult]
    total_found: int
    search_time_ms: float
    facets: Dict[str, List[Dict[str, Any]]]
    suggestions: List[str]
    debug_info: Optional[Dict[str, Any]] = None

class HybridSearchEngine:
    """
    Production hybrid search engine combining:
    - Elasticsearch for keyword search and facets
    - Faiss for vector similarity search
    - Weighted score fusion (placeholder for a learned/neural reranker)
    """
    
    def __init__(
        self,
        es_hosts: List[str] = ['localhost:9200'],
        redis_url: str = 'redis://localhost:6379',
        embedding_model: str = 'all-MiniLM-L6-v2',
        vector_dimensions: int = 384,
        faiss_index_type: str = 'IVFFlat'
    ):
        # Elasticsearch for keyword search
        self.es_client = AsyncElasticsearch(es_hosts)
        
        # Redis for caching
        self.redis_client = None
        self.redis_url = redis_url
        
        # Sentence transformer for embeddings
        self.embedding_model = SentenceTransformer(embedding_model)
        self.vector_dimensions = vector_dimensions
        
        # Faiss index for vector similarity
        self.faiss_index = None
        self.doc_id_mapping = {}  # Maps faiss indices to document IDs
        self.faiss_index_type = faiss_index_type
        
        # Learning to rank model (placeholder - would use LightGBM/XGBoost)
        self.ranking_model = None
        
        # Query processing
        self.query_expander = None
        self.spell_checker = None
        
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
    
    async def initialize(self):
        """Initialize async components and load indices"""
        # redis.asyncio.from_url returns a client directly (no await needed)
        self.redis_client = redis.from_url(self.redis_url)
        
        # Setup Elasticsearch mappings
        await self._setup_elasticsearch_index()
        
        # Load or build Faiss index
        await self._load_vector_index()
        
        self.logger.info("Search engine initialized successfully")
    
    async def _setup_elasticsearch_index(self):
        """Create Elasticsearch index with optimized mapping"""
        mapping = {
            "mappings": {
                "properties": {
                    "title": {
                        "type": "text",
                        "analyzer": "english",
                        "fields": {
                            "keyword": {"type": "keyword"},
                            "suggest": {"type": "search_as_you_type"}
                        }
                    },
                    "content": {
                        "type": "text", 
                        "analyzer": "english",
                        "term_vector": "with_positions_offsets"
                    },
                    "url": {"type": "keyword"},
                    "category": {"type": "keyword"},
                    "tags": {"type": "keyword"},
                    "published_date": {"type": "date"},
                    "author": {"type": "keyword"},
                    "view_count": {"type": "integer"},
                    "embedding": {
                        "type": "dense_vector",
                        "dims": self.vector_dimensions,
                        "index": True,
                        "similarity": "cosine"
                    }
                }
            },
            "settings": {
                "number_of_shards": 3,
                "number_of_replicas": 1,
                "refresh_interval": "30s",
                "analysis": {
                    "analyzer": {
                        "english": {
                            "type": "standard",
                            "stopwords": "_english_"
                        }
                    }
                }
            }
        }
        
        try:
            await self.es_client.indices.create(
                index="search_documents",
                body=mapping,
                ignore=400
            )
        except Exception as e:
            self.logger.warning(f"ES index creation failed: {e}")
    
    async def _load_vector_index(self):
        """Load or create Faiss vector index"""
        try:
            # In production, load a persisted index (e.g. via faiss.read_index)
            quantizer = faiss.IndexFlatIP(self.vector_dimensions)
            self.faiss_index = faiss.IndexIVFFlat(
                quantizer,
                self.vector_dimensions,
                100  # Number of IVF clusters (nlist)
            )
            # Keep a reference to the quantizer so it is not garbage-collected
            self.quantizer = quantizer
            
            # Train index with sample data (in production, use real embeddings)
            sample_vectors = np.random.random((1000, self.vector_dimensions)).astype('float32')
            self.faiss_index.train(sample_vectors)
            
            self.logger.info(f"Faiss index loaded with {self.faiss_index.ntotal} vectors")
            
        except Exception as e:
            self.logger.error(f"Failed to load vector index: {e}")
            raise
    
    async def search(
        self,
        query: str,
        filters: Optional[Dict[str, Any]] = None,
        limit: int = 20,
        offset: int = 0,
        include_debug: bool = False,
        hybrid_weights: Tuple[float, float] = (0.7, 0.3)  # (keyword, vector)
    ) -> SearchResponse:
        """
        Perform hybrid search combining keyword and vector search
        """
        search_start = time.time()
        
        # Check cache first
        cache_key = self._get_cache_key(query, filters, limit, offset)
        cached_result = await self._get_cached_result(cache_key)
        if cached_result:
            return cached_result
        
        # Process query
        processed_query = await self._process_query(query)
        
        # Parallel search execution
        keyword_task = self._keyword_search(
            processed_query, filters, limit * 3, offset  # Get more candidates
        )
        vector_task = self._vector_search(
            processed_query, limit * 3
        )
        
        keyword_results, vector_results = await asyncio.gather(
            keyword_task, vector_task
        )
        
        # Hybrid ranking and fusion
        fused_results = await self._fuse_results(
            keyword_results, vector_results, 
            hybrid_weights, limit, offset
        )
        
        # Generate snippets and suggestions
        snippets_task = self._generate_snippets(fused_results, query)
        suggestions_task = self._get_query_suggestions(query)
        facets_task = self._get_facets(filters)
        
        snippets, suggestions, facets = await asyncio.gather(
            snippets_task, suggestions_task, facets_task
        )
        
        # Build response
        search_time_ms = (time.time() - search_start) * 1000
        
        response = SearchResponse(
            query=query,
            results=fused_results[:limit],
            total_found=len(fused_results),
            search_time_ms=search_time_ms,
            facets=facets,
            suggestions=suggestions,
            debug_info=self._build_debug_info(
                keyword_results, vector_results, hybrid_weights
            ) if include_debug else None
        )
        
        # Cache result
        await self._cache_result(cache_key, response)
        
        return response
    
    async def _process_query(self, query: str) -> str:
        """Process and enhance the input query"""
        # Basic query processing
        processed = query.strip().lower()
        
        # Query expansion (simplified - would use word2vec, WordNet, etc.)
        # expanded_terms = await self._expand_query(processed)
        
        # Spell correction (simplified - would use proper spell checker)
        # corrected = await self._correct_spelling(processed)
        
        return processed
    
    async def _keyword_search(
        self,
        query: str,
        filters: Optional[Dict[str, Any]],
        limit: int,
        offset: int
    ) -> List[Dict[str, Any]]:
        """Elasticsearch keyword search with BM25 scoring"""
        
        # Build query DSL
        es_query = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "multi_match": {
                                "query": query,
                                "fields": ["title^3", "content^1"],
                                "type": "best_fields",
                                "fuzziness": "AUTO"
                            }
                        }
                    ],
                    "filter": []
                }
            },
            "highlight": {
                "fields": {
                    "content": {"fragment_size": 150, "number_of_fragments": 2}
                }
            },
            "sort": ["_score"],
            "size": limit,
            "from": offset
        }
        
        # Add filters
        if filters:
            for field, value in filters.items():
                if isinstance(value, list):
                    es_query["query"]["bool"]["filter"].append({
                        "terms": {field: value}
                    })
                else:
                    es_query["query"]["bool"]["filter"].append({
                        "term": {field: value}
                    })
        
        try:
            response = await self.es_client.search(
                index="search_documents",
                body=es_query
            )
            
            results = []
            for hit in response['hits']['hits']:
                results.append({
                    'doc_id': hit['_id'],
                    'score': hit['_score'],
                    'source': hit['_source'],
                    'highlight': hit.get('highlight', {})
                })
            
            return results
            
        except Exception as e:
            self.logger.error(f"Keyword search failed: {e}")
            return []
    
    async def _vector_search(
        self,
        query: str,
        limit: int
    ) -> List[Dict[str, Any]]:
        """Vector similarity search using Faiss"""
        
        try:
            # Generate query embedding
            query_embedding = self.embedding_model.encode([query])
            query_vector = query_embedding.astype('float32')
            # Normalize so inner-product (IndexFlatIP) scores approximate cosine similarity
            faiss.normalize_L2(query_vector)
            
            # Search in Faiss index
            if self.faiss_index and self.faiss_index.ntotal > 0:
                # Set search parameters
                self.faiss_index.nprobe = 10  # Number of clusters to search
                
                scores, indices = self.faiss_index.search(query_vector, limit)
                
                results = []
                for score, idx in zip(scores[0], indices[0]):
                    if idx != -1 and idx in self.doc_id_mapping:  # Valid result
                        results.append({
                            'doc_id': self.doc_id_mapping[idx],
                            'vector_score': float(score),
                            'faiss_index': int(idx)
                        })
                
                return results
            else:
                self.logger.warning("Faiss index not available")
                return []
                
        except Exception as e:
            self.logger.error(f"Vector search failed: {e}")
            return []
    
    async def _fuse_results(
        self,
        keyword_results: List[Dict[str, Any]],
        vector_results: List[Dict[str, Any]],
        hybrid_weights: Tuple[float, float],
        limit: int,
        offset: int
    ) -> List[SearchResult]:
        """Fuse keyword and vector search results with hybrid ranking"""
        
        # Create document score mapping
        doc_scores = {}
        
        # Process keyword results
        keyword_weight, vector_weight = hybrid_weights
        max_keyword_score = max([r['score'] for r in keyword_results]) if keyword_results else 1.0
        
        for result in keyword_results:
            doc_id = result['doc_id']
            normalized_score = result['score'] / max_keyword_score
            doc_scores[doc_id] = {
                'keyword_score': normalized_score,
                'vector_score': 0.0,
                'source': result['source'],
                'highlight': result.get('highlight', {})
            }
        
        # Process vector results  
        max_vector_score = max([r['vector_score'] for r in vector_results]) if vector_results else 1.0
        
        for result in vector_results:
            doc_id = result['doc_id']
            normalized_score = result['vector_score'] / max_vector_score
            
            if doc_id in doc_scores:
                doc_scores[doc_id]['vector_score'] = normalized_score
            else:
                # Get document from Elasticsearch
                try:
                    doc = await self.es_client.get(
                        index="search_documents",
                        id=doc_id
                    )
                    doc_scores[doc_id] = {
                        'keyword_score': 0.0,
                        'vector_score': normalized_score,
                        'source': doc['_source'],
                        'highlight': {}
                    }
                except Exception:
                    continue
        
        # Calculate hybrid scores and create SearchResult objects
        search_results = []
        
        for doc_id, scores in doc_scores.items():
            hybrid_score = (
                keyword_weight * scores['keyword_score'] + 
                vector_weight * scores['vector_score']
            )
            
            source = scores['source']
            
            search_result = SearchResult(
                doc_id=doc_id,
                title=source.get('title', ''),
                content=source.get('content', ''),
                url=source.get('url', ''),
                score=hybrid_score,
                vector_score=scores['vector_score'],
                keyword_score=scores['keyword_score'],
                metadata={
                    'category': source.get('category'),
                    'published_date': source.get('published_date'),
                    'view_count': source.get('view_count', 0)
                },
                snippet='',  # Will be filled by _generate_snippets
                rank=0  # Will be set after sorting
            )
            
            search_results.append(search_result)
        
        # Sort by hybrid score
        search_results.sort(key=lambda x: x.score, reverse=True)
        
        # Set ranks
        for i, result in enumerate(search_results):
            result.rank = i + 1 + offset
        
        return search_results
    
    async def _generate_snippets(
        self,
        results: List[SearchResult],
        query: str
    ) -> List[SearchResult]:
        """Generate relevant snippets for search results"""
        
        # Simple snippet generation (in production, use more sophisticated methods)
        query_terms = query.lower().split()
        
        for result in results:
            content = result.content.lower()
            
            # Find best sentence containing query terms
            sentences = content.split('.')
            best_sentence = ""
            max_matches = 0
            
            for sentence in sentences:
                matches = sum(1 for term in query_terms if term in sentence)
                if matches > max_matches:
                    max_matches = matches
                    best_sentence = sentence.strip()
            
            # Create snippet (max 200 chars)
            if best_sentence:
                snippet = best_sentence[:200] + "..." if len(best_sentence) > 200 else best_sentence
                result.snippet = snippet
            else:
                result.snippet = result.content[:200] + "..." if len(result.content) > 200 else result.content
        
        return results
    
    async def _get_query_suggestions(self, query: str) -> List[str]:
        """Get query suggestions and corrections"""
        
        # Simple suggestions (in production, use completion suggester)
        try:
            suggest_query = {
                "suggest": {
                    "query_suggestion": {
                        "text": query,
                        "term": {
                            "field": "title"
                        }
                    }
                }
            }
            
            response = await self.es_client.search(
                index="search_documents",
                body=suggest_query
            )
            
            suggestions = []
            if 'suggest' in response:
                for suggestion in response['suggest']['query_suggestion']:
                    for option in suggestion['options']:
                        suggestions.append(option['text'])
            
            return suggestions[:5]
            
        except Exception as e:
            self.logger.error(f"Query suggestions failed: {e}")
            return []
    
    async def _get_facets(self, filters: Optional[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
        """Get search facets for filtering"""
        
        try:
            agg_query = {
                "size": 0,
                "aggs": {
                    "categories": {
                        "terms": {"field": "category", "size": 10}
                    },
                    "authors": {
                        "terms": {"field": "author", "size": 10}
                    },
                    "tags": {
                        "terms": {"field": "tags", "size": 15}
                    }
                }
            }
            
            response = await self.es_client.search(
                index="search_documents",
                body=agg_query
            )
            
            facets = {}
            for facet_name, aggregation in response['aggregations'].items():
                facets[facet_name] = [
                    {"value": bucket["key"], "count": bucket["doc_count"]}
                    for bucket in aggregation["buckets"]
                ]
            
            return facets
            
        except Exception as e:
            self.logger.error(f"Facets retrieval failed: {e}")
            return {}
    
    def _get_cache_key(
        self,
        query: str,
        filters: Optional[Dict[str, Any]],
        limit: int,
        offset: int
    ) -> str:
        """Generate cache key for search results"""
        import hashlib
        
        cache_data = {
            'query': query,
            'filters': filters or {},
            'limit': limit,
            'offset': offset
        }
        
        cache_string = json.dumps(cache_data, sort_keys=True)
        return f"search:{hashlib.md5(cache_string.encode()).hexdigest()}"
    
    async def _get_cached_result(self, cache_key: str) -> Optional[SearchResponse]:
        """Get cached search result"""
        try:
            if self.redis_client:
                cached_data = await self.redis_client.get(cache_key)
                if cached_data:
                    data = json.loads(cached_data)
                    # Convert back to SearchResponse (simplified)
                    return SearchResponse(**data)
        except Exception as e:
            self.logger.warning(f"Cache retrieval failed: {e}")
        
        return None
    
    async def _cache_result(self, cache_key: str, response: SearchResponse):
        """Cache search result"""
        try:
            if self.redis_client:
                # Convert to dict for JSON serialization
                data = asdict(response)
                await self.redis_client.setex(
                    cache_key,
                    300,  # 5 minute cache
                    json.dumps(data, default=str)
                )
        except Exception as e:
            self.logger.warning(f"Cache storage failed: {e}")
    
    def _build_debug_info(
        self,
        keyword_results: List[Dict[str, Any]],
        vector_results: List[Dict[str, Any]],
        hybrid_weights: Tuple[float, float]
    ) -> Dict[str, Any]:
        """Build debug information for search results"""
        return {
            'keyword_results_count': len(keyword_results),
            'vector_results_count': len(vector_results),
            'hybrid_weights': {
                'keyword': hybrid_weights[0],
                'vector': hybrid_weights[1]
            },
            'search_strategy': 'hybrid_bm25_vector'
        }
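A minimal usage sketch for the engine above. It assumes Elasticsearch and Redis are reachable at their default local addresses and that documents have already been indexed; the query, filter value, and weights are illustrative.

async def main():
    engine = HybridSearchEngine(
        es_hosts=['localhost:9200'],
        redis_url='redis://localhost:6379'
    )
    await engine.initialize()

    response = await engine.search(
        query="distributed vector search",
        filters={"category": "engineering"},   # hypothetical filter value
        limit=10,
        include_debug=True,
        hybrid_weights=(0.6, 0.4)              # lean slightly toward keyword relevance
    )

    print(f"{response.total_found} results in {response.search_time_ms:.1f} ms")
    for result in response.results:
        print(f"{result.rank:>2}. {result.title} (score={result.score:.3f})")

if __name__ == "__main__":
    asyncio.run(main())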

Search Analytics & Optimization

Search Quality Monitoring System

import asyncio
import json
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from collections import defaultdict, deque
import numpy as np
from kafka import KafkaConsumer, KafkaProducer
import redis.asyncio as redis
from elasticsearch import AsyncElasticsearch
import logging

@dataclass
class SearchMetric:
    """Search quality and performance metrics"""
    query: str
    results_returned: int
    search_time_ms: float
    click_through_rate: float
    zero_results_rate: float
    user_satisfaction: Optional[float]
    timestamp: float
    session_id: str
    user_id: Optional[str]

@dataclass
class QueryPerformance:
    """Aggregated query performance stats"""
    query: str
    frequency: int
    avg_latency: float
    avg_ctr: float
    zero_results_rate: float
    user_satisfaction: float
    improvement_suggestions: List[str]

class SearchAnalyticsSystem:
    """
    Real-time search analytics and optimization system
    Monitors search quality, performance, and user satisfaction
    """
    
    def __init__(
        self,
        kafka_servers: List[str] = ['localhost:9092'],
        redis_url: str = 'redis://localhost:6379',
        es_hosts: List[str] = ['localhost:9200']
    ):
        # Kafka for streaming analytics
        self.consumer = KafkaConsumer(
            'search_events',
            'click_events',
            'user_feedback',
            bootstrap_servers=kafka_servers,
            value_deserializer=lambda x: json.loads(x.decode('utf-8'))
        )
        
        self.producer = KafkaProducer(
            bootstrap_servers=kafka_servers,
            value_serializer=lambda x: json.dumps(x).encode('utf-8')
        )
        
        # Redis for real-time metrics
        self.redis_client = None
        self.redis_url = redis_url
        
        # Elasticsearch for analytics data
        self.es_client = AsyncElasticsearch(es_hosts)
        
        # Real-time metrics storage
        self.query_metrics = defaultdict(lambda: {
            'count': 0,
            'total_latency': 0,
            'clicks': 0,
            'zero_results': 0,
            'satisfaction_scores': []
        })
        
        # Sliding windows for performance tracking
        self.latency_window = deque(maxlen=1000)
        self.ctr_window = deque(maxlen=1000)
        
        # A/B test tracking
        self.ab_test_results = defaultdict(lambda: {
            'variant_a': {'queries': 0, 'satisfaction': []},
            'variant_b': {'queries': 0, 'satisfaction': []}
        })
        
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
    
    async def initialize(self):
        """Initialize analytics system"""
        # redis.asyncio.from_url returns a client directly (no await needed)
        self.redis_client = redis.from_url(self.redis_url)
        
        # Setup ES index for analytics
        await self._setup_analytics_index()
        
        # Start background processors
        asyncio.create_task(self._process_search_events())
        asyncio.create_task(self._generate_insights())
        
        self.logger.info("Search analytics system initialized")
    
    async def _setup_analytics_index(self):
        """Create Elasticsearch index for search analytics"""
        mapping = {
            "mappings": {
                "properties": {
                    "query": {"type": "keyword"},
                    "normalized_query": {"type": "keyword"},
                    "results_returned": {"type": "integer"},
                    "search_time_ms": {"type": "float"},
                    "user_clicked": {"type": "boolean"},
                    "clicked_position": {"type": "integer"},
                    "session_id": {"type": "keyword"},
                    "user_id": {"type": "keyword"},
                    "timestamp": {"type": "date"},
                    "search_type": {"type": "keyword"},
                    "ab_test_variant": {"type": "keyword"},
                    "user_satisfaction": {"type": "float"},
                    "query_category": {"type": "keyword"}
                }
            },
            "settings": {
                "number_of_shards": 2,
                "number_of_replicas": 1
            }
        }
        
        try:
            await self.es_client.indices.create(
                index="search_analytics",
                body=mapping,
                ignore=400
            )
        except Exception as e:
            self.logger.warning(f"Analytics index creation failed: {e}")
    
    async def _process_search_events(self):
        """Process streaming search events"""
        self.logger.info("Starting search events processor...")
        
        for message in self.consumer:
            try:
                event = message.value
                event_type = message.topic
                
                if event_type == 'search_events':
                    await self._process_search_event(event)
                elif event_type == 'click_events':
                    await self._process_click_event(event)
                elif event_type == 'user_feedback':
                    await self._process_feedback_event(event)
                    
            except Exception as e:
                self.logger.error(f"Event processing error: {e}")
                continue
    
    async def _process_search_event(self, event: Dict[str, Any]):
        """Process individual search event"""
        query = event['query'].lower().strip()
        results_count = event['results_returned']
        latency = event['search_time_ms']
        session_id = event['session_id']
        
        # Update real-time metrics
        self.query_metrics[query]['count'] += 1
        self.query_metrics[query]['total_latency'] += latency
        
        if results_count == 0:
            self.query_metrics[query]['zero_results'] += 1
        
        # Update sliding windows
        self.latency_window.append(latency)
        
        # Store in Redis for real-time dashboards
        await self._update_redis_metrics(query, event)
        
        # Store in Elasticsearch for historical analysis
        await self._store_search_analytics(event)
    
    async def _process_click_event(self, event: Dict[str, Any]):
        """Process click events to calculate CTR"""
        query = event['query'].lower().strip()
        clicked_position = event.get('clicked_position', -1)
        
        # Update click metrics
        self.query_metrics[query]['clicks'] += 1
        
        # Position-weighted click credit (reciprocal rank of the clicked position)
        ctr = 1.0 / (clicked_position + 1) if clicked_position >= 0 else 0
        self.ctr_window.append(ctr)
        
        # Update Redis
        await self._update_click_metrics(query, event)
    
    async def _process_feedback_event(self, event: Dict[str, Any]):
        """Process user satisfaction feedback"""
        query = event['query'].lower().strip()
        satisfaction = event.get('satisfaction_score', 0)  # 0-5 scale
        
        self.query_metrics[query]['satisfaction_scores'].append(satisfaction)
        
        # Update A/B test results if applicable
        ab_variant = event.get('ab_test_variant')
        if ab_variant:
            self.ab_test_results[query][f'variant_{ab_variant}']['satisfaction'].append(satisfaction)
    
    async def _update_redis_metrics(self, query: str, event: Dict[str, Any]):
        """Update real-time metrics in Redis"""
        try:
            pipe = self.redis_client.pipeline()
            
            # Query frequency
            pipe.incr(f"query_count:{query}")
            
            # Average latency (using exponential moving average)
            current_avg = await self.redis_client.get(f"query_latency:{query}")
            if current_avg:
                new_avg = 0.9 * float(current_avg) + 0.1 * event['search_time_ms']
            else:
                new_avg = event['search_time_ms']
            
            pipe.set(f"query_latency:{query}", new_avg)
            
            # Zero results tracking
            if event['results_returned'] == 0:
                pipe.incr(f"zero_results:{query}")
            
            # Hourly volume
            hour_key = f"hourly_searches:{int(time.time() // 3600)}"
            pipe.incr(hour_key)
            pipe.expire(hour_key, 86400)
            
            await pipe.execute()
            
        except Exception as e:
            self.logger.error(f"Redis metrics update failed: {e}")
    
    async def _update_click_metrics(self, query: str, event: Dict[str, Any]):
        """Update click-through rate metrics"""
        try:
            # Calculate and store CTR
            searches = self.query_metrics[query]['count']
            clicks = self.query_metrics[query]['clicks']
            ctr = clicks / searches if searches > 0 else 0
            
            await self.redis_client.set(f"query_ctr:{query}", ctr)
            
        except Exception as e:
            self.logger.error(f"CTR metrics update failed: {e}")
    
    async def _store_search_analytics(self, event: Dict[str, Any]):
        """Store search event in Elasticsearch for analysis"""
        try:
            await self.es_client.index(
                index="search_analytics",
                body={
                    **event,
                    'normalized_query': event['query'].lower().strip(),
                    # Epoch milliseconds so the "date"-typed timestamp field parses correctly
                    'timestamp': int(time.time() * 1000)
                }
            )
        except Exception as e:
            self.logger.error(f"ES analytics storage failed: {e}")
    
    async def _generate_insights(self):
        """Generate periodic search insights and recommendations"""
        while True:
            try:
                await asyncio.sleep(300)  # Every 5 minutes
                
                insights = await self._analyze_search_patterns()
                poor_performers = await self._identify_poor_performing_queries()
                recommendations = await self._generate_recommendations(poor_performers)
                
                # Publish insights
                await self._publish_insights({
                    'timestamp': time.time(),
                    'insights': insights,
                    'poor_performers': poor_performers,
                    'recommendations': recommendations
                })
                
                self.logger.info("Search insights generated and published")
                
            except Exception as e:
                self.logger.error(f"Insights generation failed: {e}")
    
    async def _analyze_search_patterns(self) -> Dict[str, Any]:
        """Analyze search patterns and trends"""
        
        # Current system performance
        avg_latency = np.mean(self.latency_window) if self.latency_window else 0
        avg_ctr = np.mean(self.ctr_window) if self.ctr_window else 0
        
        # Top queries by volume
        top_queries = sorted(
            [(q, metrics['count']) for q, metrics in self.query_metrics.items()],
            key=lambda x: x[1],
            reverse=True
        )[:10]
        
        # Zero results analysis
        zero_results_queries = [
            q for q, metrics in self.query_metrics.items()
            if metrics['zero_results'] / max(1, metrics['count']) > 0.5
        ]
        
        return {
            'avg_latency_ms': avg_latency,
            'avg_ctr': avg_ctr,
            'total_unique_queries': len(self.query_metrics),
            'top_queries': top_queries,
            'high_zero_results': zero_results_queries[:5]
        }
    
    async def _identify_poor_performing_queries(self) -> List[QueryPerformance]:
        """Identify queries with poor performance metrics"""
        poor_performers = []
        
        for query, metrics in self.query_metrics.items():
            if metrics['count'] < 5:  # Skip low-volume queries
                continue
            
            avg_latency = metrics['total_latency'] / metrics['count']
            ctr = metrics['clicks'] / metrics['count'] if metrics['count'] > 0 else 0
            zero_rate = metrics['zero_results'] / metrics['count']
            
            avg_satisfaction = np.mean(metrics['satisfaction_scores']) if metrics['satisfaction_scores'] else 2.5
            
            # Identify poor performance
            is_poor = (
                avg_latency > 200 or  # High latency
                ctr < 0.1 or         # Low CTR
                zero_rate > 0.3 or   # High zero results
                avg_satisfaction < 2.0  # Low satisfaction
            )
            
            if is_poor:
                suggestions = []
                if avg_latency > 200:
                    suggestions.append("Optimize query processing")
                if ctr < 0.1:
                    suggestions.append("Improve result relevance")
                if zero_rate > 0.3:
                    suggestions.append("Expand index coverage or add query suggestions")
                if avg_satisfaction < 2.0:
                    suggestions.append("Review ranking algorithm")
                
                poor_performers.append(QueryPerformance(
                    query=query,
                    frequency=metrics['count'],
                    avg_latency=avg_latency,
                    avg_ctr=ctr,
                    zero_results_rate=zero_rate,
                    user_satisfaction=avg_satisfaction,
                    improvement_suggestions=suggestions
                ))
        
        return sorted(poor_performers, key=lambda x: x.frequency, reverse=True)
    
    async def _generate_recommendations(
        self,
        poor_performers: List[QueryPerformance]
    ) -> List[Dict[str, Any]]:
        """Generate actionable recommendations for search improvement"""
        
        recommendations = []
        
        # High latency queries
        high_latency = [p for p in poor_performers if p.avg_latency > 200]
        if high_latency:
            recommendations.append({
                'type': 'performance',
                'priority': 'high',
                'title': 'Optimize High Latency Queries',
                'description': f'{len(high_latency)} queries have >200ms latency',
                'queries': [p.query for p in high_latency[:5]],
                'actions': [
                    'Review index sharding strategy',
                    'Optimize query structure',
                    'Consider result caching',
                    'Upgrade hardware resources'
                ]
            })
        
        # Zero results queries
        zero_results = [p for p in poor_performers if p.zero_results_rate > 0.5]
        if zero_results:
            recommendations.append({
                'type': 'coverage',
                'priority': 'high',
                'title': 'Address Zero Results Queries',
                'description': f'{len(zero_results)} queries return no results >50% of time',
                'queries': [p.query for p in zero_results[:5]],
                'actions': [
                    'Expand document index',
                    'Implement query relaxation',
                    'Add spell correction',
                    'Provide query suggestions'
                ]
            })
        
        # Low engagement queries
        low_ctr = [p for p in poor_performers if p.avg_ctr < 0.05]
        if low_ctr:
            recommendations.append({
                'type': 'relevance',
                'priority': 'medium',
                'title': 'Improve Result Relevance',
                'description': f'{len(low_ctr)} queries have very low click-through rates',
                'queries': [p.query for p in low_ctr[:5]],
                'actions': [
                    'Retrain ranking model',
                    'Adjust result weights',
                    'Improve snippet generation',
                    'A/B test different algorithms'
                ]
            })
        
        return recommendations
    
    async def _publish_insights(self, insights: Dict[str, Any]):
        """Publish insights to downstream systems"""
        try:
            self.producer.send('search_insights', value=insights)
            
            # Also store in Redis for dashboards
            await self.redis_client.setex(
                'latest_search_insights',
                3600,
                json.dumps(insights, default=str)
            )
            
        except Exception as e:
            self.logger.error(f"Insights publishing failed: {e}")
    
    async def get_real_time_dashboard_data(self) -> Dict[str, Any]:
        """Get current search system health for dashboards"""
        try:
            current_hour = int(time.time() // 3600)
            
            # Get metrics from Redis
            pipe = self.redis_client.pipeline()
            pipe.get(f"hourly_searches:{current_hour}")
            pipe.get(f"hourly_searches:{current_hour - 1}")
            pipe.get("latest_search_insights")
            
            results = await pipe.execute()
            
            dashboard_data = {
                'current_hour_searches': int(results[0] or 0),
                'previous_hour_searches': int(results[1] or 0),
                'system_health': 'healthy',  # Would calculate based on error rates
                'avg_latency': np.mean(self.latency_window) if self.latency_window else 0,
                'avg_ctr': np.mean(self.ctr_window) if self.ctr_window else 0,
                'active_queries': len(self.query_metrics),
                'insights': json.loads(results[2]) if results[2] else {}
            }
            
            return dashboard_data
            
        except Exception as e:
            self.logger.error(f"Dashboard data retrieval failed: {e}")
            return {'error': str(e)}
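A sketch of how a search service might feed this pipeline by publishing events to the Kafka topics consumed above; the event fields mirror what the handlers read, while the broker address and example values are assumptions.

import json
import uuid
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

session_id = str(uuid.uuid4())

# Consumed by _process_search_event
producer.send('search_events', value={
    'query': 'vector database sharding',
    'results_returned': 14,
    'search_time_ms': 42.7,
    'session_id': session_id,
    'user_id': 'user-123'
})

# Consumed by _process_click_event
producer.send('click_events', value={
    'query': 'vector database sharding',
    'clicked_position': 1,
    'session_id': session_id
})

producer.flush()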

Production Search Systems

Google

Web Search Infrastructure

  • Scale: 8.5B searches/day, 130 trillion web pages indexed
  • Architecture: Distributed crawling + inverted index + PageRank
  • Latency: <200ms for billions of pages search
  • Infrastructure: Thousands of data centers globally
  • Innovation: RankBrain ML for query understanding
  • Features: Knowledge graphs, featured snippets, autocomplete

Elasticsearch

Distributed Search Engine

  • Scale: Petabyte-scale deployments, millions of queries/sec
  • Architecture: Lucene-based with distributed sharding
  • Latency: Sub-second search across massive datasets
  • Use Cases: Log analytics, e-commerce, enterprise search
  • Features: Full-text search, aggregations, real-time analytics
  • Clients: Netflix, GitHub, Uber, Stack Overflow

Pinterest

Visual Search System

  • Scale: 450M+ users, billions of pins searched monthly
  • Architecture: CNN embeddings + approximate nearest neighbor
  • Latency: <100ms for visual similarity search
  • Innovation: Multi-modal search (image + text)
  • Infrastructure: Custom GPU clusters for embedding generation
  • Features: Visual search, shop the look, try-on AR

Shopify

E-commerce Search Platform

  • Scale: 2M+ merchants, billions of product searches
  • Architecture: Hybrid search with personalization
  • Latency: <50ms for product discovery
  • Features: Auto-complete, filters, recommendations
  • ML Models: Query understanding, ranking, personalization
  • Business Impact: 30% of sales from search traffic

Search System Best Practices

✅ Do

  • Implement hybrid search - Combine keyword and vector search for best relevance (a rank-fusion sketch follows this list)
  • Monitor search analytics - Track CTR, zero results, and user satisfaction
  • Cache aggressively - Popular queries and expensive computations
  • A/B test ranking algorithms - Measure business metrics, not just precision/recall
  • Provide query suggestions - Help users refine searches and reduce zero results
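
The engine above fuses results with weighted, normalized scores; reciprocal rank fusion (RRF) is a common, score-scale-free alternative worth A/B testing against it. A minimal sketch (k=60 is the conventional smoothing constant):

from typing import Dict, List

def reciprocal_rank_fusion(
    keyword_ranking: List[str],
    vector_ranking: List[str],
    k: int = 60
) -> List[str]:
    """Fuse two ranked lists of doc IDs: score(d) = sum of 1 / (k + rank) over lists."""
    scores: Dict[str, float] = {}
    for ranking in (keyword_ranking, vector_ranking):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

# Documents ranked highly in both lists rise to the top
print(reciprocal_rank_fusion(["d1", "d2", "d3"], ["d3", "d1", "d4"]))
# ['d1', 'd3', 'd2', 'd4']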

❌ Don't

  • Rely only on keyword search - Modern users expect semantic understanding
  • Ignore query performance - Slow searches hurt user experience significantly
  • Over-optimize for precision - Sometimes showing more results is better than perfect ones
  • Forget about zero results - High zero-result rates indicate index or algorithm problems
  • Skip spell correction - User typos are extremely common in search queries