
NLP Systems Architecture & Scaling

Build production NLP systems with transformer serving, distributed text processing, and real-time language model inference

55 min read | Advanced

Production NLP Systems at Scale

Modern NLP systems power critical applications from search engines to customer support chatbots, processing billions of text documents daily. Production systems must handle massive text corpora, serve transformer models efficiently, and maintain low latency for real-time applications.

Core NLP System Challenges

  • Model Size vs Latency: Balancing large language models with response time requirements
  • Text Processing Pipeline: Efficient tokenization, preprocessing, and feature extraction
  • Multi-language Support: Handling diverse languages and encoding schemes
  • Contextual Understanding: Maintaining context across long documents and conversations
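For the multi-language point, here is a minimal sketch of input normalization using only Python's standard library. The helper name `normalize_text` is illustrative, and NFC is one reasonable normalization form rather than something this page prescribes.

```python
import unicodedata


def normalize_text(raw: bytes, encoding: str = "utf-8") -> str:
    """Decode bytes and normalize to NFC so equivalent strings compare equal."""
    # errors="replace" swaps undecodable bytes for U+FFFD instead of raising
    text = raw.decode(encoding, errors="replace")
    # NFC composes base characters with combining marks (e + U+0301 -> é)
    text = unicodedata.normalize("NFC", text)
    # Collapse runs of whitespace, including non-breaking spaces
    return " ".join(text.split())


composed = normalize_text("caf\u00e9".encode("utf-8"))
decomposed = normalize_text("cafe\u0301".encode("utf-8"))
print(composed == decomposed)
```

Normalizing before tokenization and cache lookups keeps visually identical inputs from being treated as different strings.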

NLP System Performance Calculator

Calculator inputs (as displayed): 1,000/s; 7B; 32; 512

System Requirements

  • Tokens/Second: 16,666.667
  • GPU Memory/Model: 14.3 GB
  • Inference Latency: 50 ms
  • Throughput: 1,200/min
  • GPUs Needed: 1 x A100

Recommendation: Single GPU deployment feasible
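The calculator's formulas are not shown, so the following is only a back-of-the-envelope sketch of the kind of arithmetic involved. It assumes the 7B, 32, and 512 inputs mean parameter count, batch size, and sequence length, and it uses the Llama-2-7B shape (32 layers, hidden size 4096) for the KV cache. The function names are hypothetical, and the result will not match the widget's 14.3 GB exactly because runtime overhead is ignored.

```python
def weight_memory_gb(num_params: float, bytes_per_param: int = 2) -> float:
    """Memory for model weights in GB (1 GB = 1e9 bytes)."""
    return num_params * bytes_per_param / 1e9


def kv_cache_bytes_per_token(num_layers: int, hidden_size: int,
                             bytes_per_value: int = 2) -> int:
    """KV-cache bytes per token for standard multi-head attention.

    Each layer stores one key and one value vector of hidden_size per token.
    """
    return 2 * num_layers * hidden_size * bytes_per_value


# 7B parameters in FP16 (2 bytes each)
weights_gb = weight_memory_gb(7e9)

# Assumed model shape: 32 layers, hidden size 4096
per_token = kv_cache_bytes_per_token(num_layers=32, hidden_size=4096)

# KV cache for an assumed batch of 32 sequences of 512 tokens
kv_gb = per_token * 32 * 512 / 1e9

print(f"weights: {weights_gb:.1f} GB, KV cache: {kv_gb:.1f} GB")
```

Under these assumptions the weights and KV cache together stay well below the memory of a single large accelerator, which is consistent with the single-GPU recommendation.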

NLP System Architecture

Multi-Stage Text Processing Pipeline

1. Ingestion

  • Document parsing
  • Format normalization
  • Language detection
  • Quality filtering

2. Preprocessing

  • Tokenization
  • Text cleaning
  • Sentence splitting
  • Encoding

3. Model Inference

  • Transformer serving
  • Batch processing
  • Distributed inference
  • Result aggregation

4. Post-processing

  • Result ranking
  • Output formatting
  • Quality scoring
  • Caching
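The four stages can be sketched as plain functions chained in order. This is a toy illustration, not a production design: every function name is hypothetical, and the `infer` stage scores sentences by word count as a stand-in for a real model call.

```python
from typing import Callable, Dict, List

Document = Dict[str, object]
Stage = Callable[[Document], Document]


def ingest(doc: Document) -> Document:
    # Format normalization: make sure content is a stripped string
    return {**doc, "content": str(doc.get("content", "")).strip()}


def preprocess(doc: Document) -> Document:
    # Naive sentence splitting on periods; a real system would use a tokenizer
    sentences = [s.strip() for s in str(doc["content"]).split(".") if s.strip()]
    return {**doc, "sentences": sentences}


def infer(doc: Document) -> Document:
    # Stand-in for model inference: score each sentence by its word count
    scores = [len(s.split()) for s in doc["sentences"]]
    return {**doc, "scores": scores}


def postprocess(doc: Document) -> Document:
    # Result ranking: order sentences by descending score
    ranked = sorted(zip(doc["sentences"], doc["scores"]), key=lambda p: -p[1])
    return {"id": doc["id"], "ranked": [s for s, _ in ranked]}


def run_pipeline(doc: Document, stages: List[Stage]) -> Document:
    # Each stage takes the previous stage's output as its input
    for stage in stages:
        doc = stage(doc)
    return doc


result = run_pipeline(
    {"id": "doc-1", "content": " Short one. A much longer second sentence here. "},
    [ingest, preprocess, infer, postprocess],
)
print(result)
```

Keeping each stage a function from document to document makes it straightforward to later place a queue between stages and scale them independently.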

High-Performance Transformer Serving

Optimized Model Serving with vLLM

import asyncio
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Dict, List, Optional

import redis.asyncio as aioredis  # aioredis was merged into redis-py (>= 4.2)
from transformers import AutoTokenizer
from vllm import AsyncEngineArgs, AsyncLLMEngine, SamplingParams
from vllm.utils import random_uuid

@dataclass
class TextBatch:
    """Batch of text for efficient processing"""
    texts: List[str]
    batch_id: str
    created_at: float
    max_tokens: int = 512
    temperature: float = 0.7

class DistributedTransformerService:
    """
    High-performance distributed transformer serving system
    Supports multiple models, batching, and horizontal scaling
    """
    
    def __init__(
        self,
        model_name: str = "meta-llama/Llama-2-7b-hf",
        tensor_parallel_size: int = 1,
        max_model_len: int = 4096,
        gpu_memory_utilization: float = 0.9,
        max_batch_size: int = 256,
        max_wait_ms: int = 50
    ):
        self.model_name = model_name
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        
        # Initialize vLLM async engine for efficient serving
        engine_args = AsyncEngineArgs(
            model=model_name,
            tensor_parallel_size=tensor_parallel_size,
            dtype="float16",  # Use FP16 for memory efficiency
            max_model_len=max_model_len,
            gpu_memory_utilization=gpu_memory_utilization,
            disable_log_stats=False,
            # quantization="awq" requires an AWQ-quantized checkpoint; the default
            # FP16 checkpoint above is not one, so it is left unquantized here
        )
        
        self.engine = AsyncLLMEngine.from_engine_args(engine_args)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        
        # Request batching
        self.pending_requests = []
        self.request_futures = {}
        
        # Redis for distributed caching
        self.redis_client = None
        
        # Thread pool for CPU preprocessing
        self.executor = ThreadPoolExecutor(max_workers=8)
        
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
    
    async def initialize(self):
        """Initialize async components"""
        self.redis_client = await aioredis.from_url("redis://localhost:6379")
        
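        # Start background batch processor; keep a reference so the task
        # is not garbage-collected while it is still running
        self._batch_task = asyncio.create_task(self._batch_processor())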
    
    def preprocess_text(self, text: str) -> str:
        """CPU-intensive text preprocessing"""
        # Clean and normalize text
        text = text.strip()
        
        # Remove excessive whitespace
        text = ' '.join(text.split())
        
        # Truncate if too long (leave room for generation)
        if len(text) > 3000:  # Conservative estimate for token count
            text = text[:3000] + "..."
        
        return text
    
    async def generate_text_batch(
        self,
        prompts: List[str],
        max_tokens: int = 256,
        temperature: float = 0.7,
        top_p: float = 0.9,
        use_cache: bool = True
    ) -> List[str]:
        """
        Generate text for multiple prompts efficiently
        """
        # Check cache first
        if use_cache:
            cached_results = await self._check_cache(prompts)
            if cached_results:
                return cached_results
        
        # Preprocess all prompts in parallel
        loop = asyncio.get_running_loop()
        preprocessed = await asyncio.gather(*[
            loop.run_in_executor(self.executor, self.preprocess_text, prompt)
            for prompt in prompts
        ])
        
        # Create sampling parameters
        sampling_params = SamplingParams(
            temperature=temperature,
            top_p=top_p,
            max_tokens=max_tokens,
            skip_special_tokens=True,
        )
        
        # The async engine takes one prompt per request, each with its own
        # request ID; vLLM batches concurrent requests internally
        async def _generate_one(prompt: str) -> str:
            final_output = None
            async for request_output in self.engine.generate(
                prompt, sampling_params, random_uuid()
            ):
                final_output = request_output  # Last item is the finished output
            return final_output.outputs[0].text if final_output else ""
        
        # gather() preserves input order, so results line up with prompts
        results = list(await asyncio.gather(
            *(_generate_one(prompt) for prompt in preprocessed)
        ))
        
        # Cache results
        if use_cache:
            await self._cache_results(prompts, results)
        
        return results
    
    @staticmethod
    def _cache_key(prompt: str) -> str:
        """Build a cache key that is stable across processes"""
        # Built-in hash() is salted per process for strings, so its values
        # cannot be shared between workers through Redis
        import hashlib
        return "nlp:prompt:" + hashlib.sha256(prompt.encode("utf-8")).hexdigest()
    
    async def _check_cache(self, prompts: List[str]) -> Optional[List[str]]:
        """Check Redis cache for existing results"""
        if not self.redis_client:
            return None
        
        cache_keys = [self._cache_key(prompt) for prompt in prompts]
        
        try:
            cached_values = await self.redis_client.mget(cache_keys)
            
            # Return cached results if all prompts are cached
            if all(val is not None for val in cached_values):
                return [val.decode('utf-8') for val in cached_values]
        except Exception as e:
            self.logger.warning(f"Cache check failed: {e}")
        
        return None
    
    async def _cache_results(self, prompts: List[str], results: List[str]):
        """Cache generation results"""
        if not self.redis_client:
            return
        
        try:
            # Write every entry with a 1 hour expiration in a single round trip
            pipe = self.redis_client.pipeline()
            for prompt, result in zip(prompts, results):
                pipe.set(self._cache_key(prompt), result, ex=3600)
            await pipe.execute()
                
        except Exception as e:
            self.logger.warning(f"Cache write failed: {e}")
    
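    async def submit(self, batch: TextBatch) -> List[str]:
        """Queue a TextBatch for the background batcher and await its results"""
        future = asyncio.get_running_loop().create_future()
        self.request_futures[batch.batch_id] = future
        self.pending_requests.append(batch)
        return await future
    
    async def _batch_processor(self):
        """Background task to process batched requests"""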
        while True:
            if len(self.pending_requests) >= self.max_batch_size:
                # Process full batch immediately
                batch = self.pending_requests[:self.max_batch_size]
                self.pending_requests = self.pending_requests[self.max_batch_size:]
                
                await self._process_batch(batch)
                
            elif len(self.pending_requests) > 0:
                # Wait for more requests or timeout
                await asyncio.sleep(self.max_wait_ms / 1000)
                
                if self.pending_requests:
                    batch = self.pending_requests.copy()
                    self.pending_requests.clear()
                    await self._process_batch(batch)
            else:
                # No pending requests, wait
                await asyncio.sleep(0.01)
    
    async def _process_batch(self, batch: List[TextBatch]):
        """Process a batch of text generation requests"""
        try:
            # Collect all prompts from batch
            all_prompts = []
            request_mapping = []
            
            for text_batch in batch:
                for i, text in enumerate(text_batch.texts):
                    all_prompts.append(text)
                    request_mapping.append((text_batch.batch_id, i))
            
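            # Generate for all prompts. Sampling settings come from the first
            # request, so only co-batch requests that share the same settings.
            results = await self.generate_text_batch(
                all_prompts,
                max_tokens=batch[0].max_tokens,
                temperature=batch[0].temperature
            )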
            
            # Group results back by original batches
            batch_results = {}
            for (batch_id, idx), result in zip(request_mapping, results):
                if batch_id not in batch_results:
                    batch_results[batch_id] = {}
                batch_results[batch_id][idx] = result
            
            # Complete futures
            for text_batch in batch:
                if text_batch.batch_id in batch_results:
                    ordered_results = [
                        batch_results[text_batch.batch_id][i]
                        for i in range(len(text_batch.texts))
                    ]
                    
                    if text_batch.batch_id in self.request_futures:
                        future = self.request_futures[text_batch.batch_id]
                        if not future.done():
                            future.set_result(ordered_results)
                        del self.request_futures[text_batch.batch_id]
        
        except Exception as e:
            self.logger.error(f"Batch processing failed: {e}")
            
            # Complete futures with error
            for text_batch in batch:
                if text_batch.batch_id in self.request_futures:
                    future = self.request_futures[text_batch.batch_id]
                    if not future.done():
                        future.set_exception(e)
                    del self.request_futures[text_batch.batch_id]

# Distributed text processing pipeline
class NLPProcessingPipeline:
    """
    Distributed NLP processing pipeline for large-scale text analysis
    """
    
    def __init__(self, num_workers: int = 4):
        self.num_workers = num_workers
        self.workers = []
        
        # Task queues for different processing stages
        self.preprocessing_queue = asyncio.Queue(maxsize=1000)
        self.inference_queue = asyncio.Queue(maxsize=500)
        self.postprocessing_queue = asyncio.Queue(maxsize=1000)
        
        # Results storage
        self.results = {}
        
        # Transformer service
        self.transformer_service = DistributedTransformerService()
    
    async def initialize(self):
        """Initialize pipeline workers"""
        await self.transformer_service.initialize()
        
        # Start worker tasks
        for i in range(self.num_workers):
            worker_tasks = [
                asyncio.create_task(self._preprocessing_worker(i)),
                asyncio.create_task(self._inference_worker(i)),
                asyncio.create_task(self._postprocessing_worker(i)),
            ]
            self.workers.extend(worker_tasks)
    
    async def process_documents(
        self,
        documents: List[Dict[str, str]],
        task_type: str = "summarization"
    ) -> List[Dict[str, object]]:
        """
        Process multiple documents through the NLP pipeline
        
        Args:
            documents: List of {"id": str, "content": str} documents
            task_type: Type of NLP task (summarization, classification, etc.)
            
        Returns:
            List of processed results with metadata
        """
        # Submit documents to preprocessing queue
        for doc in documents:
            await self.preprocessing_queue.put({
                "id": doc["id"],
                "content": doc["content"],
                "task_type": task_type,
                "timestamp": time.time()
            })
        
        # Poll until every document is done or the timeout expires;
        # on timeout, whatever has finished so far is returned
        pending_ids = [doc["id"] for doc in documents]
        processed_results = []
        deadline = time.monotonic() + 300  # 5 minute timeout
        
        while pending_ids and time.monotonic() < deadline:
            still_pending = []
            for doc_id in pending_ids:
                if doc_id in self.results:
                    # pop() so finished results do not accumulate in memory
                    processed_results.append(self.results.pop(doc_id))
                else:
                    still_pending.append(doc_id)
            pending_ids = still_pending
            
            if pending_ids:
                await asyncio.sleep(0.1)
        
        return processed_results
    
    async def _preprocessing_worker(self, worker_id: int):
        """Worker for text preprocessing stage"""
        while True:
            task = await self.preprocessing_queue.get()
            try:
                # Perform preprocessing, then forward to the inference queue
                processed = await self._preprocess_document(task)
                await self.inference_queue.put(processed)
            except Exception as e:
                logging.error(f"Preprocessing worker {worker_id} error: {e}")
            finally:
                # Always mark the item done, even on failure
                self.preprocessing_queue.task_done()
    
    async def _inference_worker(self, worker_id: int):
        """Worker for model inference stage"""
        while True:
            task = await self.inference_queue.get()
            try:
                # Perform inference, then forward to postprocessing
                result = await self._run_inference(task)
                await self.postprocessing_queue.put(result)
            except Exception as e:
                logging.error(f"Inference worker {worker_id} error: {e}")
            finally:
                self.inference_queue.task_done()
    
    async def _postprocessing_worker(self, worker_id: int):
        """Worker for result postprocessing stage"""
        while True:
            task = await self.postprocessing_queue.get()
            try:
                # Perform postprocessing and store the final result
                final_result = await self._postprocess_result(task)
                self.results[task["id"]] = final_result
            except Exception as e:
                logging.error(f"Postprocessing worker {worker_id} error: {e}")
            finally:
                self.postprocessing_queue.task_done()
    
    async def _preprocess_document(self, task: Dict) -> Dict:
        """Preprocess a single document"""
        content = task["content"]
        
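        # Document-level preprocessing
        # Split long documents into chunks. Chunks stay below the 3000-character
        # prompt limit in preprocess_text, leaving room for the task instruction.
        max_chunk_size = 2800  # Characters
        
        if len(content) > max_chunk_size:
            # Split into overlapping chunks
            chunks = []
            overlap = 200
            step = max_chunk_size - overlap
            
            for i in range(0, len(content), step):
                chunks.append(content[i:i + max_chunk_size])
                if i + max_chunk_size >= len(content):
                    break  # This chunk reached the end; skip a redundant tail chunk
            
            task["chunks"] = chunks
            task["is_chunked"] = True
        else:
            task["chunks"] = [content]
            task["is_chunked"] = False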
        
        return task
    
    async def _run_inference(self, task: Dict) -> Dict:
        """Run model inference on processed document"""
        chunks = task["chunks"]
        task_type = task["task_type"]
        
        # Create prompts based on task type
        if task_type == "summarization":
            prompts = [
                f"Summarize the following text in 2-3 sentences:\n\n{chunk}"
                for chunk in chunks
            ]
        elif task_type == "classification":
            prompts = [
                f"Classify the sentiment of this text as positive, negative, or neutral:\n\n{chunk}"
                for chunk in chunks
            ]
        else:
            prompts = chunks  # Generic text generation
        
        # Run batch inference
        results = await self.transformer_service.generate_text_batch(
            prompts,
            max_tokens=256,
            temperature=0.3  # Lower temperature for more consistent results
        )
        
        task["inference_results"] = results
        return task
    
    async def _postprocess_result(self, task: Dict) -> Dict:
        """Postprocess inference results"""
        results = task["inference_results"]
        
        # Aggregate results if document was chunked
        if task["is_chunked"] and len(results) > 1:
            if task["task_type"] == "summarization":
                # Combine summaries
                combined_summary = " ".join(results)
                # Generate final summary of summaries
                final_results = await self.transformer_service.generate_text_batch(
                    [f"Create a concise summary from these key points:\n\n{combined_summary}"],
                    max_tokens=150
                )
                aggregated_result = final_results[0]
            else:
                # For classification, take majority vote or average
                aggregated_result = results[0]  # Simplified for example
        else:
            aggregated_result = results[0] if results else ""
        
        return {
            "id": task["id"],
            "result": aggregated_result,
            "task_type": task["task_type"],
            "processing_time": time.time() - task["timestamp"],
            "chunk_count": len(results) if task["is_chunked"] else 1,
            "metadata": {
                "model": self.transformer_service.model_name,
                "is_chunked": task["is_chunked"]
            }
        }
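The classification branch above keeps only the first chunk's answer. A minimal sketch of the majority vote that its comment mentions could look like the following. The helper names are hypothetical, and the label extraction is deliberately naive: it matches the first known label word in the output, so a phrase like "not positive" would be misread.

```python
from collections import Counter
from typing import List

LABELS = ("positive", "negative", "neutral")


def extract_label(generation: str) -> str:
    """Return the first known label mentioned in the model output, else 'unknown'."""
    lowered = generation.lower()
    positions = {label: lowered.find(label) for label in LABELS}
    found = {label: pos for label, pos in positions.items() if pos >= 0}
    return min(found, key=found.get) if found else "unknown"


def majority_vote(chunk_outputs: List[str]) -> str:
    """Aggregate per-chunk labels; ties go to the label seen first."""
    labels = [extract_label(out) for out in chunk_outputs]
    # Counter.most_common orders equal counts by first occurrence
    return Counter(labels).most_common(1)[0][0] if labels else "unknown"


outputs = ["Positive.", "The sentiment is negative", "positive overall", "neutral"]
winner = majority_vote(outputs)
print(winner)
```

Weighting each vote by chunk length would be a natural refinement when chunks differ a lot in size.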

Real-time Text Analysis & Monitoring

Streaming Text Analytics Platform

import asyncio
import hashlib
import json
import logging
import time
from collections import Counter, deque
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Optional

import redis.asyncio as aioredis  # aioredis was merged into redis-py (>= 4.2)
import spacy
import torch
from elasticsearch import AsyncElasticsearch
from kafka import KafkaConsumer, KafkaProducer
from transformers import pipeline

@dataclass
class TextAnalysisResult:
    """Results from text analysis"""
    document_id: str
    content: str
    language: str
    sentiment_score: float
    sentiment_label: str
    entities: List[Dict[str, Any]]
    keywords: List[str]
    text_length: int
    processing_time_ms: float
    timestamp: float
    metadata: Dict[str, Any]

class RealTimeTextAnalyzer:
    """
    Real-time text analysis system processing streaming text data
    Performs sentiment analysis, entity extraction, and keyword detection
    """
    
    def __init__(self):
        # Load NLP models
        self.nlp = spacy.load("en_core_web_sm")
        
        # Sentiment analysis pipeline
        self.sentiment_pipeline = pipeline(
            "sentiment-analysis",
            model="cardiffnlp/twitter-roberta-base-sentiment-latest",
            device=0 if torch.cuda.is_available() else -1
        )
        
        # Language detection
        self.lang_detection = pipeline("text-classification", 
                                     model="papluca/xlm-roberta-base-language-detection")
        
        # Kafka setup
        self.consumer = KafkaConsumer(
            'text-stream',
            bootstrap_servers=['localhost:9092'],
            auto_offset_reset='latest',
            value_deserializer=lambda x: json.loads(x.decode('utf-8'))
        )
        
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8')
        )
        
        # Storage backends
        self.es_client = None
        self.redis_client = None
        
        # Real-time metrics
        self.processing_metrics = {
            'documents_processed': 0,
            'avg_processing_time': 0.0,
            'error_count': 0,
            'languages_detected': Counter(),
            'sentiment_distribution': Counter()
        }
        
        # Sliding window for performance tracking
        self.processing_times = deque(maxlen=1000)
        
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
    
    async def initialize(self):
        """Initialize async components"""
        self.es_client = AsyncElasticsearch("http://localhost:9200")
        self.redis_client = await aioredis.from_url("redis://localhost:6379")
        
        # Create Elasticsearch index for text analytics
        await self._setup_elasticsearch_index()
    
    async def _setup_elasticsearch_index(self):
        """Set up Elasticsearch index with appropriate mappings"""
        index_mapping = {
            "mappings": {
                "properties": {
                    "document_id": {"type": "keyword"},
                    "content": {"type": "text", "analyzer": "standard"},
                    "language": {"type": "keyword"},
                    "sentiment_score": {"type": "float"},
                    "sentiment_label": {"type": "keyword"},
                    "entities": {
                        "type": "nested",
                        "properties": {
                            "text": {"type": "text"},
                            "label": {"type": "keyword"},
                            "start": {"type": "integer"},
                            "end": {"type": "integer"},
                            "confidence": {"type": "float"}
                        }
                    },
                    "keywords": {"type": "keyword"},
                    "text_length": {"type": "integer"},
                    "processing_time_ms": {"type": "float"},
                    "timestamp": {"type": "date"},
                    "metadata": {"type": "object"}
                }
            }
        }
        
        try:
            # Create the index only if it does not exist yet
            if not await self.es_client.indices.exists(index="text-analytics"):
                await self.es_client.indices.create(
                    index="text-analytics",
                    body=index_mapping
                )
        except Exception as e:
            self.logger.warning(f"Could not create ES index: {e}")
    
    def analyze_text(self, text: str, document_id: Optional[str] = None) -> TextAnalysisResult:
        """
        Comprehensive text analysis
        """
        start_time = time.time()
        
        if not document_id:
            document_id = hashlib.md5(text.encode()).hexdigest()[:12]
        
        try:
            # Language detection
            lang_result = self.lang_detection(text[:512])  # Limit for performance
            language = lang_result[0]['label'] if lang_result else 'unknown'
            
            # Sentiment analysis
            sentiment_result = self.sentiment_pipeline(text[:512])
            raw_label = sentiment_result[0]['label'].lower()
            sentiment_score = sentiment_result[0]['score']
            
            # Convert to consistent sentiment labels. Depending on the checkpoint,
            # labels arrive as negative/neutral/positive or LABEL_0..LABEL_2.
            if raw_label in ('label_0', 'negative'):
                sentiment_label = 'negative'
                sentiment_score = -sentiment_score  # Make negative
            elif raw_label in ('label_1', 'neutral'):
                sentiment_label = 'neutral'
                sentiment_score = 0.0
            else:  # label_2, positive
                sentiment_label = 'positive'
            
            # Entity extraction with spaCy
            doc = self.nlp(text)
            entities = []
            
            for ent in doc.ents:
                entities.append({
                    'text': ent.text,
                    'label': ent.label_,
                    'start': ent.start_char,
                    'end': ent.end_char,
                    'confidence': None  # spaCy's default NER exposes no per-entity confidence score
                })
            
            # Keyword extraction (simple approach using noun phrases)
            keywords = []
            for chunk in doc.noun_chunks:
                if len(chunk.text.split()) <= 3:  # Keep short phrases
                    keywords.append(chunk.text.lower().strip())
            
            # Remove duplicates (keeping first-seen order) and limit
            keywords = list(dict.fromkeys(keywords))[:10]
            
            # Processing time
            processing_time_ms = (time.time() - start_time) * 1000
            
            # Update metrics
            self._update_metrics(processing_time_ms, language, sentiment_label)
            
            result = TextAnalysisResult(
                document_id=document_id,
                content=text,
                language=language,
                sentiment_score=sentiment_score,
                sentiment_label=sentiment_label,
                entities=entities,
                keywords=keywords,
                text_length=len(text),
                processing_time_ms=processing_time_ms,
                timestamp=time.time(),
                metadata={
                    'model_versions': {
                        'sentiment': 'twitter-roberta-base-sentiment-latest',
                        'ner': 'en_core_web_sm',
                        'language': 'xlm-roberta-base-language-detection'
                    }
                }
            )
            
            return result
            
        except Exception as e:
            self.logger.error(f"Text analysis failed for document {document_id}: {e}")
            self.processing_metrics['error_count'] += 1
            raise
    
    def _update_metrics(self, processing_time_ms: float, language: str, sentiment: str):
        """Update real-time processing metrics"""
        self.processing_metrics['documents_processed'] += 1
        self.processing_times.append(processing_time_ms)
        self.processing_metrics['languages_detected'][language] += 1
        self.processing_metrics['sentiment_distribution'][sentiment] += 1
        
        # Update average processing time
        if self.processing_times:
            self.processing_metrics['avg_processing_time'] = sum(self.processing_times) / len(self.processing_times)
    
    async def process_stream(self):
        """Main streaming processor loop"""
        self.logger.info("Starting real-time text analysis stream processor...")
        loop = asyncio.get_running_loop()
        
        while True:
            # kafka-python is synchronous, so poll in a worker thread to keep
            # the event loop responsive
            records = await loop.run_in_executor(
                None, lambda: self.consumer.poll(timeout_ms=1000)
            )
            
            for messages in records.values():
                for message in messages:
                    try:
                        # Parse incoming message
                        data = message.value
                        text = data.get('content', '')
                        document_id = data.get('id')
                        
                        if not text.strip():
                            continue
                        
                        # Analyze text off the event loop (model inference blocks)
                        result = await loop.run_in_executor(
                            None, self.analyze_text, text, document_id
                        )
                        
                        # Store results in parallel
                        await asyncio.gather(
                            self._store_in_elasticsearch(result),
                            self._store_in_redis(result),
                            self._publish_result(result)
                        )
                        
                        # Log progress
                        if self.processing_metrics['documents_processed'] % 100 == 0:
                            self.logger.info(f"Processed {self.processing_metrics['documents_processed']} documents, "
                                           f"avg time: {self.processing_metrics['avg_processing_time']:.2f}ms")
                        
                    except Exception as e:
                        self.logger.error(f"Error processing message: {e}")
                        continue
    
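    async def _store_in_elasticsearch(self, result: TextAnalysisResult):
        """Store analysis result in Elasticsearch"""
        try:
            doc = asdict(result)
            # The 'date' mapping reads bare numbers as epoch milliseconds by
            # default, while result.timestamp is in seconds
            doc["timestamp"] = int(result.timestamp * 1000)
            await self.es_client.index(
                index="text-analytics",
                id=result.document_id,
                body=doc
            )
        except Exception as e:
            self.logger.warning(f"Failed to store in Elasticsearch: {e}")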
    
    async def _store_in_redis(self, result: TextAnalysisResult):
        """Store analysis result in Redis for fast retrieval"""
        try:
            # Store individual result
            await self.redis_client.setex(
                f"analysis:{result.document_id}",
                3600,  # 1 hour expiration
                json.dumps(asdict(result), default=str)
            )
            
            # Update real-time aggregations
            await self._update_redis_aggregations(result)
            
        except Exception as e:
            self.logger.warning(f"Failed to store in Redis: {e}")
    
    async def _update_redis_aggregations(self, result: TextAnalysisResult):
        """Update real-time aggregation counters in Redis"""
        pipe = self.redis_client.pipeline()
        
        # Sentiment counters
        pipe.incr(f"sentiment:{result.sentiment_label}")
        
        # Language counters (sorted set, so top languages can be read with ZREVRANGE)
        pipe.zincrby("languages", 1, result.language)
        
        # Hourly processing volume
        hour_key = f"volume:{int(result.timestamp // 3600)}"
        pipe.incr(hour_key)
        pipe.expire(hour_key, 86400)  # Keep 24 hours
        
        await pipe.execute()
    
    async def _publish_result(self, result: TextAnalysisResult):
        """Publish analysis result to downstream consumers"""
        try:
            self.producer.send(
                'text-analysis-results',
                value=asdict(result)
            )
        except Exception as e:
            self.logger.warning(f"Failed to publish result: {e}")
    
    async def get_real_time_stats(self) -> Dict[str, Any]:
        """Get current processing statistics"""
        try:
            # Get aggregated stats from Redis
            pipe = self.redis_client.pipeline()
            
            # Sentiment distribution
            pipe.get("sentiment:positive")
            pipe.get("sentiment:negative") 
            pipe.get("sentiment:neutral")
            
            # Language distribution (top 5)
            pipe.zrevrange("languages", 0, 4, withscores=True)
            
            # Recent processing volume
            current_hour = int(time.time() // 3600)
            pipe.get(f"volume:{current_hour}")
            pipe.get(f"volume:{current_hour - 1}")
            
            redis_results = await pipe.execute()
            
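            # Decode the (member, score) pairs returned by ZREVRANGE
            top_languages = {
                (name.decode('utf-8') if isinstance(name, bytes) else name): int(score)
                for name, score in redis_results[3]
            }
            
            stats = {
                'processing_metrics': self.processing_metrics.copy(),
                'sentiment_distribution': {
                    'positive': int(redis_results[0] or 0),
                    'negative': int(redis_results[1] or 0),
                    'neutral': int(redis_results[2] or 0)
                },
                'top_languages': top_languages,
                'current_hour_volume': int(redis_results[4] or 0),
                'previous_hour_volume': int(redis_results[5] or 0),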
                'system_health': {
                    'status': 'healthy' if self.processing_metrics['error_count'] < 100 else 'degraded',
                    'error_rate': self.processing_metrics['error_count'] / max(1, self.processing_metrics['documents_processed'])
                }
            }
            
            return stats
            
        except Exception as e:
            self.logger.error(f"Failed to get real-time stats: {e}")
            return {'error': str(e)}

# Usage example
async def main():
    analyzer = RealTimeTextAnalyzer()
    await analyzer.initialize()
    
    # Start processing stream
    await analyzer.process_stream()


if __name__ == "__main__":
    asyncio.run(main())
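To reason about the aggregation scheme without a running Redis server, the counters can be mimicked in memory. This is an illustrative stand-in only: the class `InMemoryAggregator` and its method names are hypothetical and are not part of the analyzer above.

```python
from collections import Counter
from typing import Dict, List, Tuple


class InMemoryAggregator:
    """Counter-based stand-in for the Redis aggregations (illustration only)."""

    def __init__(self) -> None:
        self.sentiment: Counter = Counter()
        self.languages: Counter = Counter()
        self.hourly_volume: Counter = Counter()

    def record(self, sentiment_label: str, language: str, timestamp: float) -> None:
        self.sentiment[sentiment_label] += 1
        self.languages[language] += 1
        # Same bucketing as the hour key above: whole hours since the epoch
        self.hourly_volume[int(timestamp // 3600)] += 1

    def top_languages(self, n: int = 5) -> List[Tuple[str, int]]:
        return self.languages.most_common(n)

    def volume(self, timestamp: float) -> Dict[str, int]:
        hour = int(timestamp // 3600)
        return {
            "current_hour": self.hourly_volume[hour],
            "previous_hour": self.hourly_volume[hour - 1],
        }


agg = InMemoryAggregator()
agg.record("positive", "en", timestamp=7200.0)  # hour bucket 2
agg.record("negative", "en", timestamp=7300.0)  # hour bucket 2
agg.record("positive", "de", timestamp=3700.0)  # hour bucket 1
print(agg.top_languages(), agg.volume(7500.0))
```

A stand-in like this is mainly useful in unit tests, where the bucketing and ranking logic can be checked before wiring in the real store.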

Real-World NLP Systems

Google

BERT-based Search

  • Scale: 8.5B searches/day, 100+ languages
  • Architecture: BERT embeddings + semantic matching
  • Models: Distilled BERT for production inference
  • Latency: <100ms for query understanding
  • Infrastructure: TPUs for model serving at scale
  • Innovation: Bidirectional encoding for context

OpenAI

ChatGPT Infrastructure

  • Scale: 100M+ users, billions of tokens/day
  • Architecture: Transformer + RLHF + distributed serving
  • Models: GPT-3.5/4 with fine-tuning layers
  • Latency: ~2-5s for conversational responses
  • Infrastructure: Custom GPU clusters + Azure
  • Innovation: Human feedback optimization

Twitter

Real-time Content Moderation

  • Scale: 500M tweets/day analyzed in real-time
  • Architecture: Multi-stage classification pipeline
  • Models: BERT + CNN for text+image analysis
  • Latency: <50ms for harmful content detection
  • Infrastructure: Kafka streams + GPU inference
  • Challenge: Balancing accuracy with free speech

Microsoft

Bing Chat Integration

  • Scale: Millions of conversational queries daily
  • Architecture: GPT-4 + web search + fact checking
  • Models: Large language models + retrieval
  • Latency: 3-8s for search-grounded responses
  • Infrastructure: Azure OpenAI Service integration
  • Innovation: Real-time web grounding

NLP Systems Best Practices

✅ Do

  • Use reduced precision and quantization - FP16/INT8 cuts weight memory by 2-4x relative to FP32, usually with minimal accuracy loss
  • Implement request batching - Process multiple texts together for GPU efficiency
  • Cache frequent queries - Redis/Memcached for repeated text analysis
  • Preprocess in parallel - Use CPU workers for tokenization and cleaning
  • Monitor model drift - Track prediction distributions over time
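For the drift-monitoring item, one simple way to track prediction distributions is to compare the label mix of a recent window against a baseline. The sketch below uses total variation distance. The function names and the 0.2 alert threshold are hypothetical and would need tuning per application.

```python
from collections import Counter
from typing import Dict, Iterable


def label_distribution(labels: Iterable[str]) -> Dict[str, float]:
    """Turn a stream of predicted labels into relative frequencies."""
    counts = Counter(labels)
    total = sum(counts.values())
    if total == 0:
        return {}
    return {label: n / total for label, n in counts.items()}


def total_variation_distance(p: Dict[str, float], q: Dict[str, float]) -> float:
    """0.0 means identical distributions, 1.0 means disjoint ones."""
    keys = set(p) | set(q)
    return 0.5 * sum(abs(p.get(k, 0.0) - q.get(k, 0.0)) for k in keys)


baseline = label_distribution(["positive"] * 50 + ["neutral"] * 30 + ["negative"] * 20)
current = label_distribution(["positive"] * 20 + ["neutral"] * 30 + ["negative"] * 50)

drift = total_variation_distance(baseline, current)
DRIFT_THRESHOLD = 0.2  # hypothetical alert threshold
drift_detected = drift > DRIFT_THRESHOLD
print(f"drift={drift:.2f}, alert={drift_detected}")
```

A shift in predicted labels can come from changing inputs as well as from a degrading model, so an alert here is a prompt to investigate rather than a verdict.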

❌ Don't

  • Process texts individually - Single requests waste GPU compute capacity
  • Ignore sequence length limits - Self-attention cost grows quadratically with sequence length
  • Skip text cleaning - Noise significantly impacts model accuracy
  • Use huge models unnecessarily - Smaller fine-tuned models often match or outperform larger general-purpose models on narrow tasks
  • Forget about bias - Language models inherit training data biases
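The sequence-length warning can be made concrete with a little arithmetic. The sketch below estimates the memory needed to materialize one layer's attention score matrices in FP16, assuming 32 attention heads. The function name is hypothetical. Memory-efficient kernels such as FlashAttention avoid storing these matrices in full, but the compute still scales with the square of the sequence length.

```python
def attention_scores_bytes(seq_len: int, num_heads: int,
                           batch_size: int = 1, bytes_per_value: int = 2) -> int:
    """Bytes needed to materialize one layer's attention score matrices.

    Each head holds a seq_len x seq_len matrix of scores.
    """
    return batch_size * num_heads * seq_len * seq_len * bytes_per_value


for seq_len in (512, 2048, 8192):
    mib = attention_scores_bytes(seq_len, num_heads=32) / 2**20
    print(f"seq_len={seq_len:>5}: {mib:,.0f} MiB per layer")
```

Each 4x increase in sequence length multiplies the footprint by 16, which is why truncation and chunking matter for long documents.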