Vector Database Implementation
Implement production vector databases: Pinecone integration, similarity search optimization, and performance tuning for RAG systems
75 min read • Intermediate
Vector Database Landscape
Vector databases are specialized for storing and retrieving high-dimensional embeddings with similarity search. Choose the right solution based on your scale, performance, and operational requirements.
Key Capabilities
- Similarity Search: Cosine, Euclidean, and dot-product metrics (see the sketch after this list)
- Approximate KNN: HNSW, IVF, and LSH algorithms
- Metadata Filtering: Hybrid search capabilities
- Horizontal Scaling: Distributed vector indices
- Real-time Updates: Dynamic index management
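The three metrics in the list above differ only in how they score a pair of vectors. A minimal NumPy sketch, independent of any particular database, makes the distinction concrete:

import numpy as np

def dot_product(a: np.ndarray, b: np.ndarray) -> float:
    # Raw inner product; sensitive to vector magnitude.
    return float(np.dot(a, b))

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    # Dot product normalized by magnitudes; ranges from -1 to 1.
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

def euclidean_distance(a: np.ndarray, b: np.ndarray) -> float:
    # Straight-line distance; smaller means more similar.
    return float(np.linalg.norm(a - b))

query = np.array([0.1, 0.3, 0.5])
doc = np.array([0.2, 0.1, 0.4])
print(dot_product(query, doc), cosine_similarity(query, doc), euclidean_distance(query, doc))

If your embeddings are L2-normalized, cosine similarity and dot product rank results identically, which is why many indexes default to one or the other.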
Selection Criteria
- Scale: Millions vs. billions of vectors
- Latency: Real-time vs. batch processing
- Operations: Managed vs. self-hosted
- Features: Filtering, multimodal support, APIs
- Cost: Pricing model and budget constraints
Implementation Deep Dive
Pinecone Implementation
Pinecone is a fully managed vector database that provides serverless scaling and high performance with minimal operational overhead.
Advantages
- Fully managed service
- Serverless scaling
- High-performance HNSW
- Real-time updates
- Excellent Python SDK
Considerations
- Usage-based pricing
- Vendor lock-in
- Limited customization
- API rate limits
- Cold-start latency
import pinecone
from typing import List, Dict, Optional
import asyncio
import logging
import time
class PineconeVectorStore:
def __init__(self,
api_key: str,
environment: str,
index_name: str,
dimension: int = 1536):
# Initialize Pinecone
pinecone.init(api_key=api_key, environment=environment)
self.index_name = index_name
self.dimension = dimension
# Create or connect to index
self.index = self._initialize_index()
# Performance settings
self.batch_size = 100
self.max_concurrent_requests = 10
def _initialize_index(self):
"""Initialize Pinecone index with optimal settings"""
# Check if index exists
if self.index_name not in pinecone.list_indexes():
# Create index with optimal configuration
pinecone.create_index(
name=self.index_name,
dimension=self.dimension,
metric="cosine", # or "euclidean", "dotproduct"
# Production optimizations
pods=1, # Start with 1 pod, scale as needed
replicas=1, # Add replicas for high availability
pod_type="p1.x1", # Choose appropriate pod type
# Metadata configuration
metadata_config={
"indexed": ["category", "timestamp", "source"]
}
)
# Wait for index to be ready
while not pinecone.describe_index(self.index_name).status['ready']:
time.sleep(1)
return pinecone.Index(self.index_name)
async def upsert_vectors(self,
vectors: List[Dict],
namespace: str = "") -> Dict:
"""Efficiently upsert vectors with batch processing"""
total_vectors = len(vectors)
batches = [
vectors[i:i + self.batch_size]
for i in range(0, total_vectors, self.batch_size)
]
# Process batches concurrently
semaphore = asyncio.Semaphore(self.max_concurrent_requests)
async def upsert_batch(batch):
async with semaphore:
try:
# Convert to Pinecone format
pinecone_vectors = []
for vector_data in batch:
pinecone_vectors.append((
vector_data['id'],
vector_data['vector'],
vector_data.get('metadata', {})
))
# Upsert batch
response = await asyncio.get_event_loop().run_in_executor(
None,
lambda: self.index.upsert(
vectors=pinecone_vectors,
namespace=namespace
)
)
return {'success': len(batch), 'errors': 0}
except Exception as e:
logging.error(f"Batch upsert failed: {e}")
return {'success': 0, 'errors': len(batch)}
# Execute all batches
results = await asyncio.gather(*[upsert_batch(batch) for batch in batches])
# Aggregate results
total_success = sum(r['success'] for r in results)
total_errors = sum(r['errors'] for r in results)
return {
'upserted_count': total_success,
'error_count': total_errors,
'total_batches': len(batches)
}
async def similarity_search(self,
query_vector: List[float],
top_k: int = 10,
metadata_filter: Optional[Dict] = None,
namespace: str = "",
include_metadata: bool = True) -> List[Dict]:
"""Perform similarity search with optional filtering"""
try:
# Prepare query
query_params = {
'vector': query_vector,
'top_k': top_k,
'namespace': namespace,
'include_metadata': include_metadata,
'include_values': False # Don't return vectors to save bandwidth
}
# Add metadata filter if provided
if metadata_filter:
query_params['filter'] = metadata_filter
# Execute query
response = await asyncio.get_event_loop().run_in_executor(
None,
lambda: self.index.query(**query_params)
)
# Process results
results = []
for match in response.matches:
result = {
'id': match.id,
'score': match.score,
}
if include_metadata and hasattr(match, 'metadata'):
result['metadata'] = match.metadata
results.append(result)
return results
except Exception as e:
logging.error(f"Similarity search failed: {e}")
raise
async def delete_vectors(self,
ids: List[str] = None,
metadata_filter: Dict = None,
namespace: str = "",
delete_all: bool = False) -> Dict:
"""Delete vectors by IDs or filter"""
try:
delete_params = {'namespace': namespace}
if delete_all:
delete_params['delete_all'] = True
elif ids:
# Delete by IDs in batches
batches = [ids[i:i + 1000] for i in range(0, len(ids), 1000)]
deleted_count = 0
for batch in batches:
response = await asyncio.get_event_loop().run_in_executor(
None,
lambda: self.index.delete(ids=batch, namespace=namespace)
)
deleted_count += len(batch)
return {'deleted_count': deleted_count}
elif metadata_filter:
delete_params['filter'] = metadata_filter
else:
raise ValueError("Must specify ids, filter, or delete_all=True")
# Execute delete
response = await asyncio.get_event_loop().run_in_executor(
None,
lambda: self.index.delete(**delete_params)
)
return {'status': 'success'}
except Exception as e:
logging.error(f"Delete operation failed: {e}")
raise
async def get_index_stats(self, namespace: str = "") -> Dict:
"""Get index statistics"""
try:
stats = await asyncio.get_event_loop().run_in_executor(
None,
lambda: self.index.describe_index_stats()
)
return {
'total_vector_count': stats.total_vector_count,
'dimension': stats.dimension,
'index_fullness': stats.index_fullness,
'namespace_stats': dict(stats.namespaces) if stats.namespaces else {}
}
except Exception as e:
logging.error(f"Failed to get index stats: {e}")
raise
# Usage example
async def main():
# Initialize vector store
vector_store = PineconeVectorStore(
api_key="your-pinecone-api-key",
environment="us-west1-gcp",
index_name="rag-documents",
dimension=1536
)
# Example: Upsert document vectors
document_vectors = [
{
'id': 'doc-1',
'vector': [0.1] * 1536, # Your actual embedding
'metadata': {
'title': 'Document Title',
'category': 'technical',
'timestamp': 1640995200
}
}
]
result = await vector_store.upsert_vectors(document_vectors)
print(f"Upserted {result['upserted_count']} vectors")
# Example: Search for similar documents
query_vector = [0.15] * 1536 # Your query embedding
search_results = await vector_store.similarity_search(
query_vector=query_vector,
top_k=5,
metadata_filter={'category': 'technical'}
)
for result in search_results:
print(f"ID: {result['id']}, Score: {result['score']:.4f}")
# Run the example
# asyncio.run(main())
Performance Optimization Strategies
Index Optimization
- HNSW Parameters: M and ef_construction tuning (see the FAISS sketch after this list)
- Quantization: PQ and SQ for memory efficiency
- Sharding: Distribute large indices across nodes
- Replication: High-availability setup
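If you self-host, libraries such as FAISS expose these knobs directly. The sketch below is a rough example assuming the faiss-cpu package is installed; the parameter values (M=32, efConstruction=200, efSearch=64, and the IVF-PQ sizes) are illustrative starting points rather than recommendations:

import numpy as np
import faiss  # assumes faiss-cpu (or faiss-gpu) is installed

dim = 1536
vectors = np.random.rand(10_000, dim).astype("float32")

# HNSW graph index: M controls connectivity, efConstruction the build-time effort.
hnsw = faiss.IndexHNSWFlat(dim, 32)   # M = 32
hnsw.hnsw.efConstruction = 200        # higher = better recall, slower build
hnsw.add(vectors)
hnsw.hnsw.efSearch = 64               # higher = better recall, slower queries

# IVF + product quantization: trades a little recall for much lower memory use.
quantizer = faiss.IndexFlatL2(dim)
ivfpq = faiss.IndexIVFPQ(quantizer, dim, 256, 64, 8)  # 256 lists, 64 sub-quantizers, 8 bits each
ivfpq.train(vectors)
ivfpq.add(vectors)
ivfpq.nprobe = 16                     # number of inverted lists scanned per query

query = np.random.rand(1, dim).astype("float32")
distances, ids = hnsw.search(query, 10)

Managed services such as Pinecone expose fewer of these parameters and tune HNSW internally, but the same recall-versus-latency trade-off applies.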
Query Optimization
- Batch Queries: Process multiple query vectors per request
- Result Caching: Cache frequent queries (a minimal cache sketch follows this list)
- Prefiltering: Apply metadata filters before the vector search
- Async Processing: Non-blocking operations
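Result caching in particular is cheap to add in front of any vector store. Below is a minimal sketch of a TTL cache keyed on a hash of the rounded query vector and filter; the class and method names are illustrative, not part of any SDK:

import hashlib
import json
import time
from typing import Any, Dict, List, Optional

class SimpleQueryCache:
    """In-memory TTL cache for similarity-search results."""

    def __init__(self, ttl_seconds: float = 300.0, max_entries: int = 10_000):
        self.ttl = ttl_seconds
        self.max_entries = max_entries
        self._store: Dict[str, tuple] = {}  # key -> (expires_at, results)

    @staticmethod
    def make_key(vector: List[float], top_k: int, metadata_filter: Optional[Dict]) -> str:
        # Round components so near-identical float vectors map to the same key.
        payload = json.dumps(
            {"v": [round(x, 6) for x in vector], "k": top_k, "f": metadata_filter},
            sort_keys=True,
        )
        return hashlib.sha256(payload.encode()).hexdigest()

    def get(self, key: str) -> Optional[Any]:
        entry = self._store.get(key)
        if entry and entry[0] > time.time():
            return entry[1]
        self._store.pop(key, None)  # drop expired or missing entries
        return None

    def set(self, key: str, results: Any) -> None:
        if len(self._store) >= self.max_entries:
            # Evict the entry closest to expiry; simple, not strictly LRU.
            oldest = min(self._store, key=lambda k: self._store[k][0])
            self._store.pop(oldest)
        self._store[key] = (time.time() + self.ttl, results)

The QueryCache used by the ProductionVectorDB class later in this section could be implemented along these lines.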
System Architecture
- Connection Pooling: Efficient client management
- Circuit Breakers: Fault tolerance
- Load Balancing: Distribute query load
- Monitoring: Performance metrics
Production Vector Database Client
class ProductionVectorDB:
def __init__(self, config):
self.config = config
self.client = self.initialize_client()
self.connection_pool = self.setup_connection_pool()
self.metrics = MetricsCollector()
self.circuit_breaker = CircuitBreaker()
# Performance optimizations
self.query_cache = QueryCache(max_size=10000, ttl=300)
self.batch_processor = BatchProcessor(
max_batch_size=config.max_batch_size,
max_wait_time_ms=config.max_wait_time_ms
)
async def similarity_search(self,
query_vector: List[float],
top_k: int = 10,
metadata_filter: Dict = None,
use_cache: bool = True) -> List[SearchResult]:
"""Optimized similarity search with caching and batching"""
# Create cache key
cache_key = self.create_cache_key(query_vector, top_k, metadata_filter)
# Check cache first
if use_cache:
cached_result = await self.query_cache.get(cache_key)
if cached_result:
await self.metrics.record_cache_hit()
return cached_result
# Use circuit breaker for fault tolerance
if not self.circuit_breaker.can_execute():
raise VectorDBUnavailableError("Circuit breaker is open")
try:
# Execute search with timeout
start_time = time.time()
results = await asyncio.wait_for(
self._execute_similarity_search(
query_vector, top_k, metadata_filter
),
timeout=self.config.query_timeout_seconds
)
# Record metrics
query_time = time.time() - start_time
await self.metrics.record_query(
query_time=query_time,
result_count=len(results),
top_k=top_k
)
# Cache successful result
if use_cache and results:
await self.query_cache.set(cache_key, results)
# Update circuit breaker
self.circuit_breaker.record_success()
return results
except Exception as e:
# Record failure
self.circuit_breaker.record_failure()
await self.metrics.record_error(str(e))
raise
async def _execute_similarity_search(self,
query_vector: List[float],
top_k: int,
metadata_filter: Dict) -> List[SearchResult]:
"""Execute the actual similarity search"""
# Use batch processing if beneficial
if self.should_batch_query(query_vector):
return await self.batch_processor.add_query(
query_vector, top_k, metadata_filter
)
# Direct query for single requests
async with self.connection_pool.get_connection() as conn:
raw_results = await conn.query(
vector=query_vector,
top_k=top_k,
filter=metadata_filter,
include_metadata=True,
include_values=False # Don't return vectors to save bandwidth
)
# Process and rank results
return self.process_search_results(raw_results)
async def upsert_vectors(self,
vectors: List[VectorRecord],
batch_size: int = 100,
parallel_batches: int = 4) -> UpsertResult:
"""Efficient batch upsert with parallelization"""
total_vectors = len(vectors)
batches = [
vectors[i:i + batch_size]
for i in range(0, total_vectors, batch_size)
]
# Process batches in parallel with controlled concurrency
semaphore = asyncio.Semaphore(parallel_batches)
async def process_batch(batch):
async with semaphore:
return await self._upsert_batch(batch)
# Execute all batches
start_time = time.time()
batch_results = await asyncio.gather(
*[process_batch(batch) for batch in batches],
return_exceptions=True
)
# Aggregate results
successful_upserts = 0
failed_upserts = 0
errors = []
for batch, result in zip(batches, batch_results):
if isinstance(result, Exception):
# Count the actual batch length; the final batch may be smaller than batch_size
failed_upserts += len(batch)
errors.append(str(result))
else:
successful_upserts += result.upserted_count
if result.errors:
failed_upserts += len(result.errors)
errors.extend(result.errors)
total_time = time.time() - start_time
# Record metrics
await self.metrics.record_upsert(
total_vectors=total_vectors,
successful=successful_upserts,
failed=failed_upserts,
duration=total_time,
throughput=total_vectors / total_time
)
return UpsertResult(
total_vectors=total_vectors,
successful_upserts=successful_upserts,
failed_upserts=failed_upserts,
errors=errors,
duration=total_time
)
async def _upsert_batch(self, batch: List[VectorRecord]) -> BatchUpsertResult:
"""Upsert a single batch of vectors"""
try:
async with self.connection_pool.get_connection() as conn:
# Prepare batch data
ids = [record.id for record in batch]
vectors = [record.vector for record in batch]
metadata = [record.metadata for record in batch]
# Execute upsert
result = await conn.upsert(
ids=ids,
vectors=vectors,
metadata=metadata
)
return BatchUpsertResult(
upserted_count=len(batch),
errors=[]
)
except Exception as e:
return BatchUpsertResult(
upserted_count=0,
errors=[str(e)]
)
class ConnectionPool:
"""Connection pool for vector database clients"""
def __init__(self, config):
self.config = config
self.pool = asyncio.Queue(maxsize=config.max_connections)
self.active_connections = 0
# Pre-warm the pool in the background (requires a running event loop)
asyncio.create_task(self.initialize_pool())
async def initialize_pool(self):
"""Initialize connection pool with minimum connections"""
for _ in range(self.config.min_connections):
connection = await self.create_connection()
await self.pool.put(connection)
self.active_connections += 1
async def get_connection(self):
"""Get connection from pool or create new one"""
try:
# Try to get existing connection
connection = await asyncio.wait_for(
self.pool.get(),
timeout=self.config.connection_timeout
)
# Validate connection is still healthy
if await self.validate_connection(connection):
return ConnectionContext(connection, self.pool)
else:
# Connection is stale, create new one
await self.close_connection(connection)
self.active_connections -= 1
except asyncio.TimeoutError:
# No available connections, create new if under limit
pass
# Create new connection if under limit
if self.active_connections < self.config.max_connections:
connection = await self.create_connection()
self.active_connections += 1
return ConnectionContext(connection, self.pool)
# Pool exhausted, wait for available connection
connection = await self.pool.get()
return ConnectionContext(connection, self.pool)
class CircuitBreaker:
"""Circuit breaker pattern for vector database resilience"""
def __init__(self,
failure_threshold: int = 5,
timeout_seconds: int = 60,
expected_exception_types: tuple = (Exception,)):
self.failure_threshold = failure_threshold
self.timeout_seconds = timeout_seconds
self.expected_exception_types = expected_exception_types
self.failure_count = 0
self.last_failure_time = None
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
def can_execute(self) -> bool:
"""Check if execution is allowed"""
if self.state == 'CLOSED':
return True
elif self.state == 'OPEN':
# Check if timeout has passed
if (time.time() - self.last_failure_time) > self.timeout_seconds:
self.state = 'HALF_OPEN'
return True
return False
elif self.state == 'HALF_OPEN':
return True
return False
def record_success(self):
"""Record successful execution"""
if self.state == 'HALF_OPEN':
self.state = 'CLOSED'
self.failure_count = 0
self.last_failure_time = None
def record_failure(self):
"""Record failed execution"""
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = 'OPEN'
Monitoring & Observability
Key Metrics
- Query Performance: P50/P95/P99 query latency and throughput (a percentile-tracking sketch follows this list)
- Index Health: Index size, memory usage, refresh rate
- Recall Quality: Search accuracy, relevance scores
- Resource Utilization: CPU, memory, disk I/O patterns
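Percentile latencies are easy to track in-process before you wire up a full metrics stack. A minimal sketch, assuming a rolling window of recent query timings (the window size and class name are arbitrary choices):

import statistics
from collections import deque

class LatencyTracker:
    """Rolling window of query latencies with percentile summaries."""

    def __init__(self, window_size: int = 1000):
        self.samples = deque(maxlen=window_size)

    def record(self, latency_seconds: float) -> None:
        self.samples.append(latency_seconds)

    def percentiles(self) -> dict:
        if len(self.samples) < 2:
            return {}
        # quantiles(n=100) returns the 99 cut points between percentiles.
        cuts = statistics.quantiles(self.samples, n=100)
        return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}

tracker = LatencyTracker()
for latency in (0.012, 0.020, 0.018, 0.350, 0.025):
    tracker.record(latency)
print(tracker.percentiles())

In production you would export these figures to your monitoring system rather than printing them.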
Health Checks
Vector DB Health Monitor
class VectorDBHealthMonitor:
def __init__(self, vector_db: ProductionVectorDB):
self.vector_db = vector_db
self.metrics = {}
self.health_status = 'unknown'
async def run_comprehensive_health_check(self) -> HealthCheckResult:
"""Run comprehensive health check"""
checks = {
'connectivity': self.check_connectivity,
'query_performance': self.check_query_performance,
'index_health': self.check_index_health,
'resource_usage': self.check_resource_usage
}
results = {}
overall_health = True
for check_name, check_func in checks.items():
try:
result = await check_func()
results[check_name] = result
if not result.healthy:
overall_health = False
except Exception as e:
results[check_name] = HealthCheck(
healthy=False,
message=f"Check failed: {str(e)}"
)
overall_health = False
return HealthCheckResult(
healthy=overall_health,
checks=results,
timestamp=time.time()
)
async def check_query_performance(self) -> HealthCheck:
"""Check query performance metrics"""
# Test query with known vector
test_vector = [0.1] * 1536 # Standard embedding size
start_time = time.time()
try:
results = await self.vector_db.similarity_search(
query_vector=test_vector,
top_k=10,
use_cache=False
)
query_time = time.time() - start_time
# Check if performance is within acceptable bounds
if query_time > 1.0: # 1 second threshold
return HealthCheck(
healthy=False,
message=f"Query too slow: {query_time:.2f}s",
metrics={'query_time': query_time}
)
return HealthCheck(
healthy=True,
message=f"Query performance good: {query_time:.2f}s",
metrics={'query_time': query_time, 'result_count': len(results)}
)
except Exception as e:
return HealthCheck(
healthy=False,
message=f"Query failed: {str(e)}"
)