NLP Systems Architecture & Scaling
Build production NLP systems with transformer serving, distributed text processing, and real-time language model inference
55 min read • Advanced
Production NLP Systems at Scale
Modern NLP systems power critical applications from search engines to customer support chatbots, processing billions of text documents daily. Production systems must handle massive text corpora, serve transformer models efficiently, and maintain low latency for real-time applications.
Core NLP System Challenges
- Model Size vs Latency: Balancing large language models with response time requirements
- Text Processing Pipeline: Efficient tokenization, preprocessing, and feature extraction (see the batched-tokenization sketch after this list)
- Multi-language Support: Handling diverse languages and encoding schemes
- Contextual Understanding: Maintaining context across long documents and conversations
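Efficient tokenization, the preprocessing challenge above, usually starts with batched encoding: Hugging Face fast tokenizers process a whole list of texts in one call, which is far cheaper than looping per document. A minimal sketch; the model name and sample texts are illustrative:
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased", use_fast=True)

texts = [
    "Transformer serving needs efficient preprocessing.",
    "Batch tokenization amortizes per-call overhead across documents.",
]

# One call tokenizes the whole batch with padding and truncation applied,
# producing tensors ready for a single model forward pass.
encoded = tokenizer(
    texts,
    padding=True,
    truncation=True,
    max_length=512,
    return_tensors="pt",
)
print(encoded["input_ids"].shape)  # (batch_size, padded_sequence_length)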
NLP System Performance Calculator
Example configuration (interactive calculator inputs): 1,000 requests/s, 7B-parameter model, batch size 32, 512-token sequences.
System Requirements
- Tokens/Second: ~16,667
- GPU Memory/Model: 14.3 GB (FP16)
- Inference Latency: 50 ms
- Throughput: 1,200/min
- GPUs Needed: 1 x A100
Recommendation: Single GPU deployment feasible
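These figures follow from simple arithmetic: FP16 weights occupy roughly 2 bytes per parameter, so a 7B model needs about 14 GB before activations and KV cache are counted. The sketch below reproduces that sizing logic; the 80 GB A100 reference and the per-GPU token throughput are illustrative assumptions, not measured values.
import math

def estimate_serving_requirements(
    params_billions: float,
    requests_per_sec: float,
    tokens_per_request: int,
    gpu_memory_gb: float = 80.0,               # assumption: A100 80GB
    tokens_per_sec_per_gpu: float = 20_000.0   # assumption: rough decode throughput
) -> dict:
    """Back-of-the-envelope GPU sizing for transformer serving."""
    # FP16/BF16 weights take ~2 bytes per parameter
    model_memory_gb = params_billions * 1e9 * 2 / 1e9
    # Aggregate token throughput the deployment must sustain
    required_tokens_per_sec = requests_per_sec * tokens_per_request
    gpus_for_memory = math.ceil(model_memory_gb / (gpu_memory_gb * 0.9))
    gpus_for_throughput = math.ceil(required_tokens_per_sec / tokens_per_sec_per_gpu)
    return {
        "model_memory_gb": round(model_memory_gb, 1),
        "required_tokens_per_sec": required_tokens_per_sec,
        "gpus_needed": max(gpus_for_memory, gpus_for_throughput),
    }

# Example: a 7B model serving ~16,400 tokens/s fits on a single GPU for weights,
# but throughput ultimately determines how many replicas you need.
print(estimate_serving_requirements(7, requests_per_sec=32, tokens_per_request=512))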
NLP System Architecture
Multi-Stage Text Processing Pipeline
1. Ingestion
- Document parsing
- Format normalization
- Language detection
- Quality filtering (sketched below)
2. Preprocessing
- Tokenization
- Text cleaning
- Sentence splitting
- Encoding
3. Model Inference
- Transformer serving
- Batch processing
- Distributed inference
- Result aggregation
4. Post-processing
- Result ranking
- Output formatting
- Quality scoring
- Caching
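A minimal sketch of the ingestion stage covering normalization, quality filtering, and language detection. It assumes the langdetect package is available; the length and alphabetic-ratio thresholds are illustrative:
import unicodedata
from typing import Optional
from langdetect import detect  # assumption: langdetect is installed

def ingest_document(raw_text: str, min_chars: int = 50) -> Optional[dict]:
    """Normalize a raw document and drop low-quality input."""
    # Format normalization: unify unicode form and collapse whitespace
    text = unicodedata.normalize("NFKC", raw_text)
    text = " ".join(text.split())
    # Quality filtering: skip near-empty or mostly non-alphabetic documents
    if len(text) < min_chars:
        return None
    if sum(c.isalpha() for c in text) / len(text) < 0.5:
        return None
    # Language detection (returns an ISO code such as "en")
    try:
        language = detect(text)
    except Exception:
        language = "unknown"
    return {"content": text, "language": language}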
High-Performance Transformer Serving
Optimized Model Serving with vLLM
import torch
import asyncio
import numpy as np
from transformers import AutoTokenizer, AutoModel
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams
from vllm.utils import random_uuid
from typing import List, Dict, Optional, AsyncGenerator
import time
import hashlib
import logging
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
import uvloop
import aioredis
@dataclass
class TextBatch:
"""Batch of text for efficient processing"""
texts: List[str]
batch_id: str
created_at: float
max_tokens: int = 512
temperature: float = 0.7
class DistributedTransformerService:
"""
High-performance distributed transformer serving system
Supports multiple models, batching, and horizontal scaling
"""
def __init__(
self,
model_name: str = "meta-llama/Llama-2-7b-hf",
tensor_parallel_size: int = 1,
max_model_len: int = 4096,
gpu_memory_utilization: float = 0.9,
max_batch_size: int = 256,
max_wait_ms: int = 50
):
self.model_name = model_name
self.max_batch_size = max_batch_size
self.max_wait_ms = max_wait_ms
# Initialize vLLM async engine for efficient serving
engine_args = AsyncEngineArgs(
model=model_name,
tensor_parallel_size=tensor_parallel_size,
dtype="float16", # Use FP16 for memory efficiency
max_model_len=max_model_len,
gpu_memory_utilization=gpu_memory_utilization,
disable_log_stats=False,
            quantization="awq",  # AWQ quantization for faster inference (requires an AWQ-quantized checkpoint)
)
self.engine = AsyncLLMEngine.from_engine_args(engine_args)
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
# Request batching
self.pending_requests = []
self.request_futures = {}
# Redis for distributed caching
self.redis_client = None
# Thread pool for CPU preprocessing
self.executor = ThreadPoolExecutor(max_workers=8)
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
async def initialize(self):
"""Initialize async components"""
self.redis_client = await aioredis.from_url("redis://localhost:6379")
# Start background batch processor
asyncio.create_task(self._batch_processor())
def preprocess_text(self, text: str) -> str:
"""CPU-intensive text preprocessing"""
# Clean and normalize text
text = text.strip()
# Remove excessive whitespace
text = ' '.join(text.split())
# Truncate if too long (leave room for generation)
if len(text) > 3000: # Conservative estimate for token count
text = text[:3000] + "..."
return text
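    # Sketch (not in the original): token-accurate truncation using the loaded
    # tokenizer instead of the character heuristic above. The 3500-token budget
    # is an illustrative assumption that leaves headroom for generation.
    def truncate_to_tokens(self, text: str, max_input_tokens: int = 3500) -> str:
        """Truncate text to a token budget rather than a character count"""
        token_ids = self.tokenizer.encode(text, add_special_tokens=False)
        if len(token_ids) <= max_input_tokens:
            return text
        return self.tokenizer.decode(token_ids[:max_input_tokens])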
async def generate_text_batch(
self,
prompts: List[str],
max_tokens: int = 256,
temperature: float = 0.7,
top_p: float = 0.9,
use_cache: bool = True
) -> List[str]:
"""
Generate text for multiple prompts efficiently
"""
# Check cache first
if use_cache:
cached_results = await self._check_cache(prompts)
if cached_results:
return cached_results
# Preprocess all prompts in parallel
loop = asyncio.get_event_loop()
preprocessed = await asyncio.gather(*[
loop.run_in_executor(self.executor, self.preprocess_text, prompt)
for prompt in prompts
])
# Create sampling parameters
sampling_params = SamplingParams(
temperature=temperature,
top_p=top_p,
max_tokens=max_tokens,
skip_special_tokens=True,
)
        # Generate one request ID per prompt; vLLM tracks each request individually
        request_ids = [random_uuid() for _ in preprocessed]

        async def _generate_one(prompt: str, request_id: str) -> str:
            # engine.generate() streams RequestOutput objects; keep the final one
            final_output = None
            async for request_output in self.engine.generate(
                prompt, sampling_params, request_id
            ):
                final_output = request_output
            return final_output.outputs[0].text if final_output else ""

        # Submit all prompts to the vLLM engine concurrently and collect results
        results = list(await asyncio.gather(*[
            _generate_one(prompt, request_id)
            for prompt, request_id in zip(preprocessed, request_ids)
        ]))
# Cache results
if use_cache:
await self._cache_results(prompts, results)
return results
async def _check_cache(self, prompts: List[str]) -> Optional[List[str]]:
"""Check Redis cache for existing results"""
if not self.redis_client:
return None
        # Use a stable content hash; Python's built-in hash() is salted per process
        cache_keys = [
            f"nlp:prompt:{hashlib.sha256(prompt.encode()).hexdigest()}"
            for prompt in prompts
        ]
try:
cached_values = await self.redis_client.mget(cache_keys)
# Return cached results if all prompts are cached
if all(val is not None for val in cached_values):
return [val.decode('utf-8') for val in cached_values]
except Exception as e:
self.logger.warning(f"Cache check failed: {e}")
return None
async def _cache_results(self, prompts: List[str], results: List[str]):
"""Cache generation results"""
if not self.redis_client:
return
try:
            cache_data = {
                f"nlp:prompt:{hashlib.sha256(prompt.encode()).hexdigest()}": result
                for prompt, result in zip(prompts, results)
            }
await self.redis_client.mset(cache_data)
# Set expiration (1 hour)
for key in cache_data.keys():
await self.redis_client.expire(key, 3600)
except Exception as e:
self.logger.warning(f"Cache write failed: {e}")
async def _batch_processor(self):
"""Background task to process batched requests"""
while True:
if len(self.pending_requests) >= self.max_batch_size:
# Process full batch immediately
batch = self.pending_requests[:self.max_batch_size]
self.pending_requests = self.pending_requests[self.max_batch_size:]
await self._process_batch(batch)
elif len(self.pending_requests) > 0:
# Wait for more requests or timeout
await asyncio.sleep(self.max_wait_ms / 1000)
if self.pending_requests:
batch = self.pending_requests.copy()
self.pending_requests.clear()
await self._process_batch(batch)
else:
# No pending requests, wait
await asyncio.sleep(0.01)
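    # Sketch of the missing submission path (the name submit_texts is illustrative,
    # not from the original): callers enqueue a TextBatch and await a future that
    # _process_batch() resolves once the batch has been generated.
    async def submit_texts(
        self,
        texts: List[str],
        max_tokens: int = 256,
        temperature: float = 0.7
    ) -> List[str]:
        """Queue texts for batched generation and await the ordered results"""
        batch = TextBatch(
            texts=texts,
            batch_id=random_uuid(),
            created_at=time.time(),
            max_tokens=max_tokens,
            temperature=temperature,
        )
        future = asyncio.get_event_loop().create_future()
        self.request_futures[batch.batch_id] = future
        self.pending_requests.append(batch)
        return await future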
async def _process_batch(self, batch: List[TextBatch]):
"""Process a batch of text generation requests"""
try:
# Collect all prompts from batch
all_prompts = []
request_mapping = []
for text_batch in batch:
for i, text in enumerate(text_batch.texts):
all_prompts.append(text)
request_mapping.append((text_batch.batch_id, i))
# Generate for all prompts
results = await self.generate_text_batch(
all_prompts,
max_tokens=batch[0].max_tokens,
temperature=batch[0].temperature
)
# Group results back by original batches
batch_results = {}
for (batch_id, idx), result in zip(request_mapping, results):
if batch_id not in batch_results:
batch_results[batch_id] = {}
batch_results[batch_id][idx] = result
# Complete futures
for text_batch in batch:
if text_batch.batch_id in batch_results:
ordered_results = [
batch_results[text_batch.batch_id][i]
for i in range(len(text_batch.texts))
]
if text_batch.batch_id in self.request_futures:
future = self.request_futures[text_batch.batch_id]
if not future.done():
future.set_result(ordered_results)
del self.request_futures[text_batch.batch_id]
except Exception as e:
self.logger.error(f"Batch processing failed: {e}")
# Complete futures with error
for text_batch in batch:
if text_batch.batch_id in self.request_futures:
future = self.request_futures[text_batch.batch_id]
if not future.done():
future.set_exception(e)
del self.request_futures[text_batch.batch_id]
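# Usage sketch (illustrative, single node): initialize the service and run a
# small batch through generate_text_batch().
async def _demo_transformer_service():
    service = DistributedTransformerService()
    await service.initialize()
    outputs = await service.generate_text_batch(
        ["Summarize: vLLM continuously batches requests to raise GPU utilization."],
        max_tokens=64,
        temperature=0.3,
    )
    print(outputs[0])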
# Distributed text processing pipeline
class NLPProcessingPipeline:
"""
Distributed NLP processing pipeline for large-scale text analysis
"""
def __init__(self, num_workers: int = 4):
self.num_workers = num_workers
self.workers = []
# Task queues for different processing stages
self.preprocessing_queue = asyncio.Queue(maxsize=1000)
self.inference_queue = asyncio.Queue(maxsize=500)
self.postprocessing_queue = asyncio.Queue(maxsize=1000)
# Results storage
self.results = {}
# Transformer service
self.transformer_service = DistributedTransformerService()
async def initialize(self):
"""Initialize pipeline workers"""
await self.transformer_service.initialize()
# Start worker tasks
for i in range(self.num_workers):
worker_tasks = [
asyncio.create_task(self._preprocessing_worker(i)),
asyncio.create_task(self._inference_worker(i)),
asyncio.create_task(self._postprocessing_worker(i)),
]
self.workers.extend(worker_tasks)
async def process_documents(
self,
documents: List[Dict[str, str]],
task_type: str = "summarization"
) -> List[Dict[str, any]]:
"""
Process multiple documents through the NLP pipeline
Args:
documents: List of {"id": str, "content": str} documents
task_type: Type of NLP task (summarization, classification, etc.)
Returns:
List of processed results with metadata
"""
# Submit documents to preprocessing queue
for doc in documents:
await self.preprocessing_queue.put({
"id": doc["id"],
"content": doc["content"],
"task_type": task_type,
"timestamp": time.time()
})
# Wait for all documents to be processed
processed_results = []
timeout = 300 # 5 minute timeout
start_time = time.time()
while len(processed_results) < len(documents):
if time.time() - start_time > timeout:
break
# Check if results are available
for doc in documents:
if doc["id"] in self.results and doc["id"] not in [r["id"] for r in processed_results]:
processed_results.append(self.results[doc["id"]])
await asyncio.sleep(0.1)
return processed_results
async def _preprocessing_worker(self, worker_id: int):
"""Worker for text preprocessing stage"""
while True:
try:
task = await self.preprocessing_queue.get()
# Perform preprocessing
processed = await self._preprocess_document(task)
# Forward to inference queue
await self.inference_queue.put(processed)
self.preprocessing_queue.task_done()
except Exception as e:
logging.error(f"Preprocessing worker {worker_id} error: {e}")
async def _inference_worker(self, worker_id: int):
"""Worker for model inference stage"""
while True:
try:
task = await self.inference_queue.get()
# Perform inference
result = await self._run_inference(task)
# Forward to postprocessing
await self.postprocessing_queue.put(result)
self.inference_queue.task_done()
except Exception as e:
logging.error(f"Inference worker {worker_id} error: {e}")
async def _postprocessing_worker(self, worker_id: int):
"""Worker for result postprocessing stage"""
while True:
try:
task = await self.postprocessing_queue.get()
# Perform postprocessing
final_result = await self._postprocess_result(task)
# Store final result
self.results[task["id"]] = final_result
self.postprocessing_queue.task_done()
except Exception as e:
logging.error(f"Postprocessing worker {worker_id} error: {e}")
async def _preprocess_document(self, task: Dict) -> Dict:
"""Preprocess a single document"""
content = task["content"]
# Document-level preprocessing
# Split long documents into chunks
max_chunk_size = 3000 # Characters
if len(content) > max_chunk_size:
# Split into overlapping chunks
chunks = []
overlap = 200
for i in range(0, len(content), max_chunk_size - overlap):
chunk = content[i:i + max_chunk_size]
chunks.append(chunk)
task["chunks"] = chunks
task["is_chunked"] = True
else:
task["chunks"] = [content]
task["is_chunked"] = False
return task
async def _run_inference(self, task: Dict) -> Dict:
"""Run model inference on processed document"""
chunks = task["chunks"]
task_type = task["task_type"]
# Create prompts based on task type
if task_type == "summarization":
prompts = [
f"Summarize the following text in 2-3 sentences:\n\n{chunk}"
for chunk in chunks
]
elif task_type == "classification":
prompts = [
f"Classify the sentiment of this text as positive, negative, or neutral:\n\n{chunk}"
for chunk in chunks
]
else:
prompts = chunks # Generic text generation
# Run batch inference
results = await self.transformer_service.generate_text_batch(
prompts,
max_tokens=256,
temperature=0.3 # Lower temperature for more consistent results
)
task["inference_results"] = results
return task
async def _postprocess_result(self, task: Dict) -> Dict:
"""Postprocess inference results"""
results = task["inference_results"]
# Aggregate results if document was chunked
if task["is_chunked"] and len(results) > 1:
if task["task_type"] == "summarization":
# Combine summaries
combined_summary = " ".join(results)
# Generate final summary of summaries
final_results = await self.transformer_service.generate_text_batch(
[f"Create a concise summary from these key points:\n\n{combined_summary}"],
max_tokens=150
)
aggregated_result = final_results[0]
else:
# For classification, take majority vote or average
aggregated_result = results[0] # Simplified for example
else:
aggregated_result = results[0] if results else ""
return {
"id": task["id"],
"result": aggregated_result,
"task_type": task["task_type"],
"processing_time": time.time() - task["timestamp"],
"chunk_count": len(results) if task["is_chunked"] else 1,
"metadata": {
"model": self.transformer_service.model_name,
"is_chunked": task["is_chunked"]
}
        }

Real-time Text Analysis & Monitoring
Streaming Text Analytics Platform
import asyncio
import json
from kafka import KafkaConsumer, KafkaProducer
from textblob import TextBlob
import spacy
import torch
from transformers import pipeline
from typing import Dict, List, Any, Optional
import time
import logging
from dataclasses import dataclass, asdict
from elasticsearch import AsyncElasticsearch
import aioredis
from collections import deque, Counter
import hashlib
@dataclass
class TextAnalysisResult:
"""Results from text analysis"""
document_id: str
content: str
language: str
sentiment_score: float
sentiment_label: str
entities: List[Dict[str, Any]]
keywords: List[str]
text_length: int
processing_time_ms: float
timestamp: float
metadata: Dict[str, Any]
class RealTimeTextAnalyzer:
"""
Real-time text analysis system processing streaming text data
Performs sentiment analysis, entity extraction, and keyword detection
"""
def __init__(self):
# Load NLP models
self.nlp = spacy.load("en_core_web_sm")
# Sentiment analysis pipeline
self.sentiment_pipeline = pipeline(
"sentiment-analysis",
model="cardiffnlp/twitter-roberta-base-sentiment-latest",
device=0 if torch.cuda.is_available() else -1
)
# Language detection
self.lang_detection = pipeline("text-classification",
model="papluca/xlm-roberta-base-language-detection")
# Kafka setup
self.consumer = KafkaConsumer(
'text-stream',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='latest',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
self.producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
# Storage backends
self.es_client = None
self.redis_client = None
# Real-time metrics
self.processing_metrics = {
'documents_processed': 0,
'avg_processing_time': 0.0,
'error_count': 0,
'languages_detected': Counter(),
'sentiment_distribution': Counter()
}
# Sliding window for performance tracking
self.processing_times = deque(maxlen=1000)
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
async def initialize(self):
"""Initialize async components"""
self.es_client = AsyncElasticsearch([{'host': 'localhost', 'port': 9200}])
self.redis_client = await aioredis.from_url("redis://localhost:6379")
# Create Elasticsearch index for text analytics
await self._setup_elasticsearch_index()
async def _setup_elasticsearch_index(self):
"""Set up Elasticsearch index with appropriate mappings"""
index_mapping = {
"mappings": {
"properties": {
"document_id": {"type": "keyword"},
"content": {"type": "text", "analyzer": "standard"},
"language": {"type": "keyword"},
"sentiment_score": {"type": "float"},
"sentiment_label": {"type": "keyword"},
"entities": {
"type": "nested",
"properties": {
"text": {"type": "text"},
"label": {"type": "keyword"},
"start": {"type": "integer"},
"end": {"type": "integer"},
"confidence": {"type": "float"}
}
},
"keywords": {"type": "keyword"},
"text_length": {"type": "integer"},
"processing_time_ms": {"type": "float"},
"timestamp": {"type": "date"},
"metadata": {"type": "object"}
}
}
}
try:
await self.es_client.indices.create(
index="text-analytics",
body=index_mapping,
ignore=400 # Ignore if index already exists
)
except Exception as e:
self.logger.warning(f"Could not create ES index: {e}")
def analyze_text(self, text: str, document_id: str = None) -> TextAnalysisResult:
"""
Comprehensive text analysis
"""
start_time = time.time()
if not document_id:
document_id = hashlib.md5(text.encode()).hexdigest()[:12]
try:
# Language detection
lang_result = self.lang_detection(text[:512]) # Limit for performance
language = lang_result[0]['label'] if lang_result else 'unknown'
# Sentiment analysis
sentiment_result = self.sentiment_pipeline(text[:512])
sentiment_label = sentiment_result[0]['label']
sentiment_score = sentiment_result[0]['score']
# Convert to consistent sentiment labels
if sentiment_label in ['LABEL_0', 'NEGATIVE']:
sentiment_label = 'negative'
sentiment_score = -sentiment_score # Make negative
elif sentiment_label in ['LABEL_1', 'NEUTRAL']:
sentiment_label = 'neutral'
sentiment_score = 0.0
else: # LABEL_2, POSITIVE
sentiment_label = 'positive'
# Entity extraction with spaCy
doc = self.nlp(text)
entities = []
for ent in doc.ents:
entities.append({
'text': ent.text,
'label': ent.label_,
'start': ent.start_char,
'end': ent.end_char,
'confidence': 0.9 # spaCy doesn't provide confidence scores by default
})
# Keyword extraction (simple approach using noun phrases)
keywords = []
for chunk in doc.noun_chunks:
if len(chunk.text.split()) <= 3: # Keep short phrases
keywords.append(chunk.text.lower().strip())
# Remove duplicates and limit
keywords = list(set(keywords))[:10]
# Processing time
processing_time_ms = (time.time() - start_time) * 1000
# Update metrics
self._update_metrics(processing_time_ms, language, sentiment_label)
result = TextAnalysisResult(
document_id=document_id,
content=text,
language=language,
sentiment_score=sentiment_score,
sentiment_label=sentiment_label,
entities=entities,
keywords=keywords,
text_length=len(text),
processing_time_ms=processing_time_ms,
timestamp=time.time(),
metadata={
'model_versions': {
                        'sentiment': 'cardiffnlp/twitter-roberta-base-sentiment-latest',
'ner': 'en_core_web_sm',
'language': 'xlm-roberta-base-language-detection'
}
}
)
return result
except Exception as e:
self.logger.error(f"Text analysis failed for document {document_id}: {e}")
self.processing_metrics['error_count'] += 1
raise
def _update_metrics(self, processing_time_ms: float, language: str, sentiment: str):
"""Update real-time processing metrics"""
self.processing_metrics['documents_processed'] += 1
self.processing_times.append(processing_time_ms)
self.processing_metrics['languages_detected'][language] += 1
self.processing_metrics['sentiment_distribution'][sentiment] += 1
# Update average processing time
if self.processing_times:
self.processing_metrics['avg_processing_time'] = sum(self.processing_times) / len(self.processing_times)
async def process_stream(self):
"""Main streaming processor loop"""
self.logger.info("Starting real-time text analysis stream processor...")
        # NOTE: iterating kafka-python's KafkaConsumer blocks the event loop;
        # see process_stream_async() below for a fully asynchronous variant.
        for message in self.consumer:
try:
# Parse incoming message
data = message.value
text = data.get('content', '')
document_id = data.get('id')
if not text.strip():
continue
# Analyze text
result = self.analyze_text(text, document_id)
# Store results in parallel
await asyncio.gather(
self._store_in_elasticsearch(result),
self._store_in_redis(result),
self._publish_result(result)
)
# Log progress
if self.processing_metrics['documents_processed'] % 100 == 0:
self.logger.info(f"Processed {self.processing_metrics['documents_processed']} documents, "
f"avg time: {self.processing_metrics['avg_processing_time']:.2f}ms")
except Exception as e:
self.logger.error(f"Error processing message: {e}")
continue
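    # Alternative sketch (assumption: aiokafka installed): a fully asynchronous
    # consumer loop, since iterating kafka-python's KafkaConsumer blocks the
    # event loop. Topic and broker address mirror the constructor above.
    async def process_stream_async(self):
        """Non-blocking variant of process_stream() using aiokafka"""
        from aiokafka import AIOKafkaConsumer
        consumer = AIOKafkaConsumer(
            'text-stream',
            bootstrap_servers='localhost:9092',
            value_deserializer=lambda x: json.loads(x.decode('utf-8'))
        )
        await consumer.start()
        try:
            async for message in consumer:
                data = message.value
                text = data.get('content', '')
                if not text.strip():
                    continue
                result = self.analyze_text(text, data.get('id'))
                await asyncio.gather(
                    self._store_in_elasticsearch(result),
                    self._store_in_redis(result),
                    self._publish_result(result)
                )
        finally:
            await consumer.stop()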
async def _store_in_elasticsearch(self, result: TextAnalysisResult):
"""Store analysis result in Elasticsearch"""
try:
await self.es_client.index(
index="text-analytics",
id=result.document_id,
body=asdict(result)
)
except Exception as e:
self.logger.warning(f"Failed to store in Elasticsearch: {e}")
async def _store_in_redis(self, result: TextAnalysisResult):
"""Store analysis result in Redis for fast retrieval"""
try:
# Store individual result
await self.redis_client.setex(
f"analysis:{result.document_id}",
3600, # 1 hour expiration
json.dumps(asdict(result), default=str)
)
# Update real-time aggregations
await self._update_redis_aggregations(result)
except Exception as e:
self.logger.warning(f"Failed to store in Redis: {e}")
async def _update_redis_aggregations(self, result: TextAnalysisResult):
"""Update real-time aggregation counters in Redis"""
pipe = self.redis_client.pipeline()
# Sentiment counters
pipe.incr(f"sentiment:{result.sentiment_label}")
        # Language counters (plain counter plus a sorted set for top-N queries)
        pipe.incr(f"language:{result.language}")
        pipe.zincrby("languages", 1, result.language)
# Hourly processing volume
hour_key = f"volume:{int(result.timestamp // 3600)}"
pipe.incr(hour_key)
pipe.expire(hour_key, 86400) # Keep 24 hours
await pipe.execute()
async def _publish_result(self, result: TextAnalysisResult):
"""Publish analysis result to downstream consumers"""
try:
self.producer.send(
'text-analysis-results',
value=asdict(result)
)
except Exception as e:
self.logger.warning(f"Failed to publish result: {e}")
async def get_real_time_stats(self) -> Dict[str, Any]:
"""Get current processing statistics"""
try:
# Get aggregated stats from Redis
pipe = self.redis_client.pipeline()
# Sentiment distribution
pipe.get("sentiment:positive")
pipe.get("sentiment:negative")
pipe.get("sentiment:neutral")
# Language distribution (top 5)
pipe.zrevrange("languages", 0, 4, withscores=True)
# Recent processing volume
current_hour = int(time.time() // 3600)
pipe.get(f"volume:{current_hour}")
pipe.get(f"volume:{current_hour - 1}")
redis_results = await pipe.execute()
stats = {
'processing_metrics': self.processing_metrics.copy(),
'sentiment_distribution': {
'positive': int(redis_results[0] or 0),
'negative': int(redis_results[1] or 0),
'neutral': int(redis_results[2] or 0)
                },
                'top_languages': {
                    (lang.decode() if isinstance(lang, bytes) else lang): int(score)
                    for lang, score in (redis_results[3] or [])
                },
                'current_hour_volume': int(redis_results[4] or 0),
                'previous_hour_volume': int(redis_results[5] or 0),
'system_health': {
'status': 'healthy' if self.processing_metrics['error_count'] < 100 else 'degraded',
'error_rate': self.processing_metrics['error_count'] / max(1, self.processing_metrics['documents_processed'])
}
}
return stats
except Exception as e:
self.logger.error(f"Failed to get real-time stats: {e}")
return {'error': str(e)}
# Usage example
async def main():
analyzer = RealTimeTextAnalyzer()
await analyzer.initialize()
# Start processing stream
    await analyzer.process_stream()

if __name__ == "__main__":
    asyncio.run(main())

Real-World NLP Systems
Google
BERT-based Search
- Scale: 8.5B searches/day, 100+ languages
- Architecture: BERT embeddings + semantic matching
- Models: Distilled BERT for production inference
- Latency: <100ms for query understanding
- Infrastructure: TPUs for model serving at scale
- Innovation: Bidirectional encoding for context
OpenAI
ChatGPT Infrastructure
- Scale: 100M+ users, billions of tokens/day
- Architecture: Transformer + RLHF + distributed serving
- Models: GPT-3.5/4 with fine-tuning layers
- Latency: ~2-5s for conversational responses
- Infrastructure: Custom GPU clusters + Azure
- Innovation: Human feedback optimization
Twitter
Real-time Content Moderation
- Scale: 500M tweets/day analyzed in real-time
- Architecture: Multi-stage classification pipeline
- Models: BERT + CNN for text+image analysis
- Latency: <50ms for harmful content detection
- Infrastructure: Kafka streams + GPU inference
- Challenge: Balancing accuracy with free speech
Microsoft
Bing Chat Integration
- Scale: Millions of conversational queries daily
- Architecture: GPT-4 + web search + fact checking
- Models: Large language models + retrieval
- Latency: 3-8s for search-grounded responses
- Infrastructure: Azure OpenAI Service integration
- Innovation: Real-time web grounding
NLP Systems Best Practices
✅ Do
- Use model quantization: FP16/INT8 reduces memory 2-4x with minimal accuracy loss (see the sketch after this list)
- Implement request batching: Process multiple texts together for GPU efficiency
- Cache frequent queries: Redis/Memcached for repeated text analysis
- Preprocess in parallel: Use CPU workers for tokenization and cleaning
- Monitor model drift: Track prediction distributions over time
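A minimal sketch of the quantization recommendation, assuming the Hugging Face transformers stack; load_in_8bit additionally requires the bitsandbytes package, and the model name is illustrative:
import torch
from transformers import AutoModelForCausalLM

MODEL = "meta-llama/Llama-2-7b-hf"  # illustrative checkpoint

# FP16: roughly halves memory versus FP32 with negligible quality loss on most tasks
model_fp16 = AutoModelForCausalLM.from_pretrained(MODEL, torch_dtype=torch.float16)

# INT8 (assumption: bitsandbytes installed): ~4x smaller than FP32,
# usually a small accuracy drop in exchange for fitting on smaller GPUs
model_int8 = AutoModelForCausalLM.from_pretrained(
    MODEL, load_in_8bit=True, device_map="auto"
)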
❌ Don't
- Process texts individually: Single requests waste GPU compute capacity
- Ignore sequence length limits: Transformer attention memory grows quadratically (see the worked example after this list)
- Skip text cleaning: Noise significantly impacts model accuracy
- Use huge models unnecessarily: Smaller fine-tuned models often outperform
- Forget about bias: Language models inherit training data biases
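The quadratic-memory warning can be made concrete: naive self-attention materializes a score matrix of size (sequence length)² per head. A quick calculation under assumed dimensions (batch 32, 16 heads, FP16 values):
def attention_matrix_gb(seq_len: int, batch: int = 32, heads: int = 16,
                        bytes_per_val: int = 2) -> float:
    """Memory for the full attention score matrices, in GB."""
    return batch * heads * seq_len * seq_len * bytes_per_val / 1e9

print(attention_matrix_gb(512))   # ~0.27 GB
print(attention_matrix_gb(4096))  # ~17.2 GB: 64x larger for 8x the length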