Advanced ML Data Preparation
A comprehensive guide to data preprocessing, feature engineering, and pipeline design for production machine learning systems.
45 min read • Advanced
Data Pipeline Planning Calculator
Estimate processing time, costs, and quality improvements for your data preparation pipeline.
Inputs: dataset size (10K–100M records) and target model accuracy (70–98%).
Example estimate: 10.0 h of processing time, roughly $500 in cost, a +14% accuracy gain to a final accuracy of 98%, a data quality score of 75/100, and 8 pipeline steps.
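The calculator's arithmetic is not spelled out on this page, but the shape of the estimate is simple. Below is a minimal sketch under assumed coefficients: records_per_hour, cost_per_hour, and baseline_accuracy are illustrative parameters, not the calculator's actual values.

def estimate_pipeline(n_records: int, target_accuracy: float,
                      records_per_hour: float = 10_000_000,
                      cost_per_hour: float = 50.0,
                      baseline_accuracy: float = 0.84) -> dict:
    """Back-of-the-envelope pipeline estimate (illustrative coefficients only)."""
    processing_hours = n_records / records_per_hour
    return {
        'processing_hours': round(processing_hours, 1),
        'estimated_cost_usd': round(processing_hours * cost_per_hour),
        'accuracy_gain': round(target_accuracy - baseline_accuracy, 2),
        'final_accuracy': target_accuracy,
    }

# Example: 100M records at a 98% accuracy target -> ~10 h, ~$500, +14% gain
print(estimate_pipeline(100_000_000, 0.98))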
Data Quality Assessment Framework
Data Profiling Pipeline
import pandas as pd
import numpy as np
from scipy import stats
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Tuple
class DataProfiler:
"""Comprehensive data profiling for ML pipelines"""
def __init__(self, df: pd.DataFrame):
self.df = df
self.profile_results = {}
def generate_profile(self) -> Dict:
"""Generate comprehensive data profile"""
profile = {
'basic_info': self._basic_info(),
'missing_analysis': self._missing_analysis(),
'distribution_analysis': self._distribution_analysis(),
'correlation_analysis': self._correlation_analysis(),
'outlier_analysis': self._outlier_analysis(),
'data_quality_score': self._calculate_quality_score()
}
self.profile_results = profile
return profile
def _basic_info(self) -> Dict:
"""Basic dataset information"""
return {
'shape': self.df.shape,
'memory_usage_mb': self.df.memory_usage(deep=True).sum() / 1024**2,
'dtypes': self.df.dtypes.value_counts().to_dict(),
'duplicate_rows': self.df.duplicated().sum(),
'unique_row_percentage': (self.df.drop_duplicates().shape[0] / self.df.shape[0]) * 100
}
def _missing_analysis(self) -> Dict:
"""Analyze missing values patterns"""
missing_counts = self.df.isnull().sum()
missing_percentages = (missing_counts / len(self.df)) * 100
return {
'total_missing_cells': missing_counts.sum(),
'missing_percentage_overall': (missing_counts.sum() / self.df.size) * 100,
'columns_with_missing': missing_counts[missing_counts > 0].to_dict(),
'missing_percentages_by_column': missing_percentages[missing_percentages > 0].to_dict(),
'complete_rows': len(self.df) - self.df.isnull().any(axis=1).sum()
}
def _distribution_analysis(self) -> Dict:
"""Analyze distributions of numerical columns"""
numeric_cols = self.df.select_dtypes(include=[np.number]).columns
distributions = {}
for col in numeric_cols:
data = self.df[col].dropna()
if len(data) > 0:
distributions[col] = {
'mean': data.mean(),
'median': data.median(),
'std': data.std(),
'skewness': stats.skew(data),
'kurtosis': stats.kurtosis(data),
'min': data.min(),
'max': data.max(),
'q25': data.quantile(0.25),
'q75': data.quantile(0.75),
'normality_test_p_value': stats.normaltest(data)[1] if len(data) >= 8 else None
}
return distributions
def _correlation_analysis(self) -> Dict:
"""Analyze correlations between numerical features"""
numeric_df = self.df.select_dtypes(include=[np.number])
if numeric_df.shape[1] < 2:
return {'message': 'Insufficient numerical columns for correlation analysis'}
corr_matrix = numeric_df.corr()
# Find highly correlated pairs
high_corr_pairs = []
for i in range(len(corr_matrix.columns)):
for j in range(i+1, len(corr_matrix.columns)):
corr_val = corr_matrix.iloc[i, j]
if abs(corr_val) > 0.8: # High correlation threshold
high_corr_pairs.append({
'feature1': corr_matrix.columns[i],
'feature2': corr_matrix.columns[j],
'correlation': corr_val
})
return {
'correlation_matrix': corr_matrix.to_dict(),
'high_correlation_pairs': high_corr_pairs,
'max_correlation': corr_matrix.abs().where(~np.eye(len(corr_matrix), dtype=bool)).max().max()  # Mask the diagonal to exclude the 1.0 self-correlations
}
def _outlier_analysis(self) -> Dict:
"""Detect outliers using IQR and z-score methods"""
numeric_cols = self.df.select_dtypes(include=[np.number]).columns
outlier_analysis = {}
for col in numeric_cols:
data = self.df[col].dropna()
if len(data) > 0:
# IQR method
Q1 = data.quantile(0.25)
Q3 = data.quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
iqr_outliers = ((data < lower_bound) | (data > upper_bound)).sum()
# Z-score method (if data is roughly normal)
z_scores = np.abs(stats.zscore(data))
zscore_outliers = (z_scores > 3).sum()
outlier_analysis[col] = {
'iqr_outliers': int(iqr_outliers),
'iqr_outlier_percentage': (iqr_outliers / len(data)) * 100,
'zscore_outliers': int(zscore_outliers),
'zscore_outlier_percentage': (zscore_outliers / len(data)) * 100,
'outlier_bounds': {'lower': lower_bound, 'upper': upper_bound}
}
return outlier_analysis
def _calculate_quality_score(self) -> float:
"""Calculate overall data quality score (0-100)"""
score = 100.0
# Penalize missing data
missing_penalty = (self.df.isnull().sum().sum() / self.df.size) * 30
score -= missing_penalty
# Penalize duplicates
duplicate_penalty = (self.df.duplicated().sum() / len(self.df)) * 20
score -= duplicate_penalty
# Penalize high correlation (multicollinearity)
numeric_df = self.df.select_dtypes(include=[np.number])
if numeric_df.shape[1] > 1:
corr_matrix = numeric_df.corr().abs()
high_corr_count = ((corr_matrix > 0.9) & (corr_matrix < 1.0)).sum().sum()
correlation_penalty = (high_corr_count / (numeric_df.shape[1] ** 2)) * 15
score -= correlation_penalty
return max(0, score)
# Usage example
def analyze_dataset(df: pd.DataFrame):
"""Complete data quality analysis"""
profiler = DataProfiler(df)
profile = profiler.generate_profile()
print(f"Dataset Quality Score: {profile['data_quality_score']:.1f}/100")
print(f"Missing data: {profile['missing_analysis']['missing_percentage_overall']:.2f}%")
print(f"Duplicate rows: {profile['basic_info']['duplicate_rows']}")
return profile
Advanced Feature Engineering
import pandas as pd
import numpy as np
from typing import List
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif
from category_encoders import TargetEncoder, CatBoostEncoder
import warnings
warnings.filterwarnings('ignore')
class AdvancedFeatureEngineer:
"""Advanced feature engineering pipeline"""
def __init__(self):
self.encoders = {}
self.scalers = {}
self.feature_selectors = {}
self.feature_history = []
def create_temporal_features(self, df: pd.DataFrame, date_col: str) -> pd.DataFrame:
"""Extract comprehensive temporal features"""
df = df.copy()
df[date_col] = pd.to_datetime(df[date_col])
# Basic temporal features
df[f'{date_col}_year'] = df[date_col].dt.year
df[f'{date_col}_month'] = df[date_col].dt.month
df[f'{date_col}_day'] = df[date_col].dt.day
df[f'{date_col}_dayofweek'] = df[date_col].dt.dayofweek
df[f'{date_col}_dayofyear'] = df[date_col].dt.dayofyear
df[f'{date_col}_week'] = df[date_col].dt.isocalendar().week
df[f'{date_col}_quarter'] = df[date_col].dt.quarter
# Advanced temporal features
df[f'{date_col}_is_weekend'] = (df[date_col].dt.dayofweek >= 5).astype(int)
df[f'{date_col}_is_month_end'] = df[date_col].dt.is_month_end.astype(int)
df[f'{date_col}_is_month_start'] = df[date_col].dt.is_month_start.astype(int)
df[f'{date_col}_days_since_epoch'] = (df[date_col] - pd.Timestamp('1970-01-01')).dt.days
# Cyclical encoding for temporal features
for period, max_val in [('month', 12), ('day', 31), ('dayofweek', 7), ('hour', 24)]:
if f'{date_col}_{period}' in df.columns or period == 'hour':
if period == 'hour' and date_col in df.columns:
values = df[date_col].dt.hour
col_name = f'{date_col}_hour'
else:
values = df[f'{date_col}_{period}']
col_name = f'{date_col}_{period}'
df[f'{col_name}_sin'] = np.sin(2 * np.pi * values / max_val)
df[f'{col_name}_cos'] = np.cos(2 * np.pi * values / max_val)
self.feature_history.append(f"Created temporal features from {date_col}")
return df
def create_interaction_features(self, df: pd.DataFrame, numeric_cols: List[str], max_interactions: int = 20) -> pd.DataFrame:
"""Create interaction features between numerical columns"""
df = df.copy()
interactions_created = 0
for i, col1 in enumerate(numeric_cols):
if interactions_created >= max_interactions:
break
for col2 in numeric_cols[i+1:]:
if interactions_created >= max_interactions:
break
# Multiplicative interaction
df[f'{col1}_x_{col2}'] = df[col1] * df[col2]
# Ratio interaction (avoid division by zero)
with np.errstate(divide='ignore', invalid='ignore'):
df[f'{col1}_div_{col2}'] = np.where(df[col2] != 0, df[col1] / df[col2], 0)
interactions_created += 2
self.feature_history.append(f"Created {interactions_created} interaction features")
return df
def encode_categorical_features(self, df: pd.DataFrame, cat_cols: List[str], target_col: str = None, method: str = 'auto') -> pd.DataFrame:
"""Advanced categorical encoding strategies"""
df = df.copy()
for col in cat_cols:
unique_count = df[col].nunique()
# Automatic method selection
if method == 'auto':
if unique_count <= 10:
# Low cardinality: One-hot encoding
dummies = pd.get_dummies(df[col], prefix=col, dummy_na=True)
df = pd.concat([df.drop(columns=[col]), dummies], axis=1)
self.feature_history.append(f"One-hot encoded {col} ({unique_count} categories)")
elif unique_count <= 50 and target_col is not None:
# Medium cardinality: Target encoding with regularization
encoder = TargetEncoder(smoothing=1.0, min_samples_leaf=1)
df[f'{col}_target_encoded'] = encoder.fit_transform(df[col], df[target_col])
self.encoders[col] = encoder
self.feature_history.append(f"Target encoded {col} ({unique_count} categories)")
else:
# High cardinality: Frequency encoding + Hash encoding
freq_map = df[col].value_counts(normalize=True).to_dict()
df[f'{col}_frequency'] = df[col].map(freq_map)
# Simple hash encoding for high cardinality
df[f'{col}_hash'] = df[col].astype(str).apply(lambda x: hash(x) % 1000)  # Note: Python's hash() is salted per process; use a stable hash (e.g. hashlib) if features must be reproducible across runs
self.feature_history.append(f"Frequency + Hash encoded {col} ({unique_count} categories)")
return df
def create_aggregated_features(self, df: pd.DataFrame, group_cols: List[str], agg_cols: List[str]) -> pd.DataFrame:
"""Create aggregated features based on groupings"""
df = df.copy()
for group_col in group_cols:
for agg_col in agg_cols:
if group_col != agg_col and df[agg_col].dtype in ['int64', 'float64']:
# Create various aggregations
agg_features = df.groupby(group_col)[agg_col].agg([
'mean', 'median', 'std', 'min', 'max', 'count'
]).add_prefix(f'{group_col}_{agg_col}_')
df = df.merge(agg_features, left_on=group_col, right_index=True, how='left')
self.feature_history.append(f"Created aggregated features for {len(group_cols)} grouping columns")
return df
def select_features(self, X: pd.DataFrame, y: pd.Series, method: str = 'mutual_info', k: int = 50) -> pd.DataFrame:
"""Intelligent feature selection"""
if method == 'mutual_info':
selector = SelectKBest(score_func=mutual_info_classif, k=min(k, X.shape[1]))
else:
selector = SelectKBest(score_func=f_classif, k=min(k, X.shape[1]))
self.original_features = X.columns  # Keep the pre-selection feature names for the importance report
X_selected = selector.fit_transform(X, y)
selected_features = X.columns[selector.get_support()]
self.feature_selectors['primary'] = selector
self.feature_history.append(f"Selected {len(selected_features)} features using {method}")
return pd.DataFrame(X_selected, columns=selected_features, index=X.index)
def get_feature_importance_report(self, selector_name: str = 'primary') -> pd.DataFrame:
"""Get feature importance scores"""
if selector_name not in self.feature_selectors:
return pd.DataFrame()
selector = self.feature_selectors[selector_name]
scores = selector.scores_
feature_names = self.original_features
importance_df = pd.DataFrame({
'feature': feature_names,
'importance_score': scores
}).sort_values('importance_score', ascending=False)
return importance_df
# Example usage pipeline
def comprehensive_feature_engineering(df: pd.DataFrame, target_col: str, date_col: str = None) -> pd.DataFrame:
"""Complete feature engineering pipeline"""
engineer = AdvancedFeatureEngineer()
# 1. Handle temporal features
if date_col and date_col in df.columns:
df = engineer.create_temporal_features(df, date_col)
# 2. Identify column types
numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
if target_col in numeric_cols:
numeric_cols.remove(target_col)
categorical_cols = df.select_dtypes(include=['object', 'category']).columns.tolist()
# 3. Create interaction features (limited to prevent explosion)
if len(numeric_cols) > 1:
df = engineer.create_interaction_features(df, numeric_cols[:10], max_interactions=15)
# 4. Create aggregated features while the original categorical grouping columns still exist
if categorical_cols and numeric_cols:
df = engineer.create_aggregated_features(df, categorical_cols[:3], numeric_cols[:5])
# 5. Encode categorical features (one-hot encoding drops the source column, so run it after aggregation)
if categorical_cols:
df = engineer.encode_categorical_features(df, categorical_cols, target_col)
print("Feature Engineering Summary:")
for step in engineer.feature_history:
print(f"- {step}")
return df
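As a quick sanity check, the pipeline above can be exercised on a small synthetic frame. The column names below (signup_date, segment, churned, and so on) are purely illustrative:

if __name__ == '__main__':
    demo = pd.DataFrame({
        'signup_date': pd.date_range('2024-01-01', periods=6, freq='D'),
        'age': [25, 32, 47, 51, 29, 38],
        'income': [40000, 52000, 61000, 58000, 45000, 49000],
        'segment': ['A', 'B', 'A', 'C', 'B', 'A'],
        'churned': [0, 1, 0, 1, 0, 0],
    })
    enriched = comprehensive_feature_engineering(demo, target_col='churned', date_col='signup_date')
    print(f"Columns before: {demo.shape[1]}, after: {enriched.shape[1]}")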
Production Data Pipeline Architecture
Scalable Data Processing Pipeline
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText, WriteToText
import pandas as pd
import json
from typing import Dict, Any, Iterator
import logging
class DataValidation(beam.DoFn):
"""Data validation and quality checks"""
def __init__(self, validation_rules: Dict[str, Any]):
self.validation_rules = validation_rules
self.error_counter = beam.metrics.Metrics.counter('data_validation', 'errors')
self.valid_counter = beam.metrics.Metrics.counter('data_validation', 'valid_records')
def process(self, element: Dict) -> Iterator[Dict]:
"""Validate individual record"""
errors = []
# Check required fields
for field in self.validation_rules.get('required_fields', []):
if field not in element or element[field] is None or element[field] == '':
errors.append(f"Missing required field: {field}")
# Check data types
for field, expected_type in self.validation_rules.get('field_types', {}).items():
if field in element:
try:
if expected_type == 'numeric':
float(element[field])
elif expected_type == 'date':
pd.to_datetime(element[field])
except (ValueError, TypeError):
errors.append(f"Invalid {expected_type} for field {field}: {element[field]}")
# Check value ranges
for field, range_def in self.validation_rules.get('value_ranges', {}).items():
if field in element:
try:
value = float(element[field])
if 'min' in range_def and value < range_def['min']:
errors.append(f"Value {value} below minimum {range_def['min']} for field {field}")
if 'max' in range_def and value > range_def['max']:
errors.append(f"Value {value} above maximum {range_def['max']} for field {field}")
except (ValueError, TypeError):
pass # Already caught in type validation
if errors:
self.error_counter.inc()
element['validation_errors'] = errors
element['is_valid'] = False
else:
self.valid_counter.inc()
element['is_valid'] = True
yield element
class FeatureEngineering(beam.DoFn):
"""Feature engineering transformations"""
def process(self, element: Dict) -> Iterator[Dict]:
"""Apply feature engineering to valid records"""
if not element.get('is_valid', False):
yield element
return
# Create derived features
try:
# Temporal features
if 'timestamp' in element:
timestamp = pd.to_datetime(element['timestamp'])
element['hour'] = timestamp.hour
element['day_of_week'] = timestamp.dayofweek
element['is_weekend'] = 1 if timestamp.dayofweek >= 5 else 0
element['month'] = timestamp.month
# Interaction features
if 'feature1' in element and 'feature2' in element:
try:
element['feature1_x_feature2'] = float(element['feature1']) * float(element['feature2'])
except (ValueError, TypeError):
pass
# Categorical encoding (simplified)
if 'category' in element:
# This would typically use pre-fitted encoders
category_mapping = {'A': 1, 'B': 2, 'C': 3} # Example mapping
element['category_encoded'] = category_mapping.get(element['category'], 0)
except Exception as e:
logging.error(f"Feature engineering error: {e}")
element['feature_engineering_error'] = str(e)
yield element
class DataNormalization(beam.DoFn):
"""Data normalization and scaling"""
def __init__(self, scaling_params: Dict[str, Dict]):
self.scaling_params = scaling_params
def process(self, element: Dict) -> Iterator[Dict]:
"""Normalize numeric features"""
if not element.get('is_valid', False):
yield element
return
for field, params in self.scaling_params.items():
if field in element:
try:
value = float(element[field])
if params['method'] == 'standard':
# Standard scaling: (x - mean) / std
normalized_value = (value - params['mean']) / params['std']
elif params['method'] == 'minmax':
# Min-max scaling: (x - min) / (max - min)
normalized_value = (value - params['min']) / (params['max'] - params['min'])
else:
normalized_value = value
element[f'{field}_normalized'] = normalized_value
except (ValueError, TypeError, ZeroDivisionError):
logging.warning(f"Could not normalize field {field} with value {element[field]}")
yield element
def run_data_pipeline(input_path: str, output_path: str, validation_rules: Dict, scaling_params: Dict):
"""Run the complete data processing pipeline"""
pipeline_options = PipelineOptions([
'--runner=DataflowRunner', # Use DirectRunner for local testing
'--project=your-gcp-project',
'--region=us-central1',
'--temp_location=gs://your-bucket/temp',
'--staging_location=gs://your-bucket/staging'
])
with beam.Pipeline(options=pipeline_options) as pipeline:
# Read raw data
raw_data = (
pipeline
| 'Read Input' >> ReadFromText(input_path)
| 'Parse JSON' >> beam.Map(lambda x: json.loads(x))
)
# Data validation
validated_data = (
raw_data
| 'Validate Data' >> beam.ParDo(DataValidation(validation_rules))
)
# Split valid and invalid records
valid_data = (
validated_data
| 'Filter Valid' >> beam.Filter(lambda x: x.get('is_valid', False))
)
invalid_data = (
validated_data
| 'Filter Invalid' >> beam.Filter(lambda x: not x.get('is_valid', False))
)
# Process valid data
processed_data = (
valid_data
| 'Feature Engineering' >> beam.ParDo(FeatureEngineering())
| 'Data Normalization' >> beam.ParDo(DataNormalization(scaling_params))
)
# Write outputs
(
processed_data
| 'Convert to JSON' >> beam.Map(json.dumps)
| 'Write Processed' >> WriteToText(f'{output_path}/processed')
)
(
invalid_data
| 'Convert Errors to JSON' >> beam.Map(json.dumps)
| 'Write Errors' >> WriteToText(f'{output_path}/errors')
)
# Example configuration
validation_config = {
'required_fields': ['user_id', 'timestamp', 'feature1'],
'field_types': {
'feature1': 'numeric',
'feature2': 'numeric',
'timestamp': 'date'
},
'value_ranges': {
'feature1': {'min': 0, 'max': 1000},
'feature2': {'min': -10, 'max': 10}
}
}
scaling_config = {
'feature1': {'method': 'standard', 'mean': 50.0, 'std': 15.0},
'feature2': {'method': 'minmax', 'min': -10.0, 'max': 10.0}
}
# Run the pipeline
if __name__ == '__main__':
run_data_pipeline(
input_path='gs://your-bucket/input/*.json',
output_path='gs://your-bucket/output',
validation_rules=validation_config,
scaling_params=scaling_config
)
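In practice the hard-coded means, standard deviations, and min/max values in scaling_config are computed on the training set rather than typed in by hand. A minimal sketch of such a helper follows; build_scaling_params, its signature, and train_df are illustrative, not part of the pipeline above.

def build_scaling_params(train_df, standard_cols, minmax_cols):
    """Derive the scaling_params dict expected by DataNormalization from training data."""
    params = {}
    for col in standard_cols:
        params[col] = {'method': 'standard',
                       'mean': float(train_df[col].mean()),
                       'std': float(train_df[col].std())}
    for col in minmax_cols:
        params[col] = {'method': 'minmax',
                       'min': float(train_df[col].min()),
                       'max': float(train_df[col].max())}
    return params

# e.g. scaling_config = build_scaling_params(train_df, standard_cols=['feature1'], minmax_cols=['feature2'])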
Feature Store Architecture
Feature Store Implementation
from datetime import datetime, timedelta
import pandas as pd
import redis
from typing import List, Dict, Any, Optional
import pickle
import logging
class FeatureStore:
"""Production-ready feature store implementation"""
def __init__(self, redis_host: str = 'localhost', redis_port: int = 6379):
self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=False)
self.feature_metadata = {}
def register_feature(self, feature_name: str, feature_type: str, description: str,
default_ttl: int = 86400, feature_group: str = None):
"""Register a new feature with metadata"""
metadata = {
'name': feature_name,
'type': feature_type,
'description': description,
'default_ttl': default_ttl,
'feature_group': feature_group,
'created_at': datetime.now().isoformat(),
'last_updated': datetime.now().isoformat()
}
self.feature_metadata[feature_name] = metadata
# Store metadata in Redis
self.redis_client.hset('feature_metadata', feature_name, pickle.dumps(metadata))
def store_feature(self, entity_id: str, feature_name: str, value: Any, ttl: Optional[int] = None):
"""Store a feature value for an entity"""
if feature_name not in self.feature_metadata:
logging.warning(f"Feature {feature_name} not registered. Consider registering first.")
key = f"feature:{feature_name}:{entity_id}"
# Serialize complex objects
if isinstance(value, (dict, list, bool)):  # redis-py cannot store bools or containers directly, so pickle them
value = pickle.dumps(value)
ttl = ttl or self.feature_metadata.get(feature_name, {}).get('default_ttl', 86400)
self.redis_client.setex(key, ttl, value)
# Update metadata
if feature_name in self.feature_metadata:
self.feature_metadata[feature_name]['last_updated'] = datetime.now().isoformat()
self.redis_client.hset('feature_metadata', feature_name,
pickle.dumps(self.feature_metadata[feature_name]))
def get_feature(self, entity_id: str, feature_name: str) -> Optional[Any]:
"""Retrieve a feature value for an entity"""
key = f"feature:{feature_name}:{entity_id}"
value = self.redis_client.get(key)
if value is None:
return None
# Try to deserialize if it's a pickled object
try:
return pickle.loads(value)
except (pickle.UnpicklingError, TypeError):
return value.decode() if isinstance(value, bytes) else value
def get_features_batch(self, entity_ids: List[str], feature_names: List[str]) -> pd.DataFrame:
"""Retrieve multiple features for multiple entities efficiently"""
results = []
# Use pipeline for efficient batch operations
pipeline = self.redis_client.pipeline()
# Queue all operations
keys = []
for entity_id in entity_ids:
for feature_name in feature_names:
key = f"feature:{feature_name}:{entity_id}"
keys.append((entity_id, feature_name, key))
pipeline.get(key)
# Execute all operations
values = pipeline.execute()
# Process results
for i, (entity_id, feature_name, key) in enumerate(keys):
value = values[i]
if value is not None:
try:
value = pickle.loads(value)
except (pickle.UnpicklingError, TypeError):
value = value.decode() if isinstance(value, bytes) else value
results.append({
'entity_id': entity_id,
'feature_name': feature_name,
'value': value
})
# Convert to DataFrame
df = pd.DataFrame(results)
if len(df) > 0:
df_pivot = df.pivot(index='entity_id', columns='feature_name', values='value')
df_pivot.index.name = 'entity_id'
return df_pivot.reset_index()
else:
return pd.DataFrame()
def compute_and_store_feature_group(self, feature_group_name: str, entities: List[str],
compute_function, **kwargs):
"""Compute and store a group of related features"""
logging.info(f"Computing feature group: {feature_group_name} for {len(entities)} entities")
for entity_id in entities:
try:
# Compute features for this entity
features = compute_function(entity_id, **kwargs)
# Store each feature
for feature_name, feature_value in features.items():
full_feature_name = f"{feature_group_name}_{feature_name}"
self.store_feature(entity_id, full_feature_name, feature_value)
except Exception as e:
logging.error(f"Error computing features for entity {entity_id}: {e}")
def get_feature_freshness(self, feature_name: str, entity_ids: List[str]) -> Dict[str, float]:
"""Check how fresh features are (time since last update)"""
pipeline = self.redis_client.pipeline()
for entity_id in entity_ids:
key = f"feature:{feature_name}:{entity_id}"
pipeline.ttl(key)
ttls = pipeline.execute()
freshness = {}
for i, entity_id in enumerate(entity_ids):
ttl = ttls[i]
if ttl > 0:
# Calculate how long ago it was stored (assuming default TTL)
default_ttl = self.feature_metadata.get(feature_name, {}).get('default_ttl', 86400)
age_seconds = default_ttl - ttl
freshness[entity_id] = age_seconds / 3600 # Convert to hours
else:
freshness[entity_id] = None # Feature not found or expired
return freshness
def cleanup_expired_features(self):
"""Clean up expired features (Redis handles this automatically, but useful for monitoring)"""
cursor = 0
expired_count = 0
while True:
cursor, keys = self.redis_client.scan(cursor=cursor, match="feature:*", count=1000)
for key in keys:
ttl = self.redis_client.ttl(key)
if ttl == -2: # Key doesn't exist
expired_count += 1
if cursor == 0:
break
logging.info(f"Found {expired_count} expired feature keys")
return expired_count
# Example usage
def user_engagement_features(user_id: str, lookback_days: int = 30) -> Dict[str, Any]:
"""Example feature computation function"""
# This would typically query your data warehouse/database
# For demo purposes, we'll simulate some features
features = {
'total_sessions': 15,
'avg_session_duration': 12.5,
'pages_per_session': 3.2,
'bounce_rate': 0.25,
'last_activity_days_ago': 2,
'preferred_category': 'electronics',
'is_premium': False
}
return features
# Usage example
feature_store = FeatureStore()
# Register features
feature_store.register_feature('user_total_sessions', 'int', 'Total user sessions in lookback period')
feature_store.register_feature('user_avg_session_duration', 'float', 'Average session duration')
feature_store.register_feature('user_preferred_category', 'str', 'Most visited product category')
# Compute and store features for multiple users
user_ids = ['user_123', 'user_456', 'user_789']
feature_store.compute_and_store_feature_group(
'user_engagement',
user_ids,
user_engagement_features,
lookback_days=30
)
# Retrieve features for model inference
features_df = feature_store.get_features_batch(
entity_ids=['user_123', 'user_456'],
feature_names=['user_engagement_total_sessions', 'user_engagement_avg_session_duration']
)
print("Retrieved features:")
print(features_df)
Data Quality Monitoring
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
import logging
import json
class DataQualityMonitor:
"""Monitor data quality metrics in production"""
def __init__(self, alert_thresholds: Dict[str, float]):
self.alert_thresholds = alert_thresholds
self.baseline_stats = {}
self.alerts = []
def establish_baseline(self, df: pd.DataFrame, feature_cols: List[str]):
"""Establish baseline statistics for features"""
for col in feature_cols:
if col in df.columns:
if df[col].dtype in ['int64', 'float64']:
self.baseline_stats[col] = {
'mean': df[col].mean(),
'std': df[col].std(),
'median': df[col].median(),
'q1': df[col].quantile(0.25),
'q3': df[col].quantile(0.75),
'min': df[col].min(),
'max': df[col].max(),
'missing_rate': df[col].isnull().mean()
}
else:
# Categorical features
value_counts = df[col].value_counts(normalize=True)
self.baseline_stats[col] = {
'top_categories': value_counts.head(10).to_dict(),
'cardinality': df[col].nunique(),
'missing_rate': df[col].isnull().mean()
}
logging.info(f"Established baseline for {len(self.baseline_stats)} features")
def monitor_batch(self, df: pd.DataFrame, batch_id: Optional[str] = None) -> Dict[str, Any]:
"""Monitor a new batch of data against baseline"""
batch_id = batch_id or datetime.now().strftime("%Y%m%d_%H%M%S")
alerts = []
quality_scores = {}
for col, baseline in self.baseline_stats.items():
if col not in df.columns:
alerts.append({
'type': 'missing_feature',
'feature': col,
'severity': 'high',
'message': f"Feature {col} missing from batch"
})
continue
current_stats = self._calculate_current_stats(df[col])
quality_score, col_alerts = self._compare_with_baseline(col, current_stats, baseline)
quality_scores[col] = quality_score
alerts.extend(col_alerts)
# Overall quality assessment
overall_quality = np.mean(list(quality_scores.values())) if quality_scores else 0
report = {
'batch_id': batch_id,
'timestamp': datetime.now().isoformat(),
'overall_quality_score': overall_quality,
'feature_quality_scores': quality_scores,
'alerts': alerts,
'batch_size': len(df),
'features_monitored': len(quality_scores)
}
# Log high-severity alerts
high_severity_alerts = [a for a in alerts if a.get('severity') == 'high']
if high_severity_alerts:
logging.warning(f"High severity data quality alerts for batch {batch_id}: {len(high_severity_alerts)} issues")
return report
def _calculate_current_stats(self, series: pd.Series) -> Dict:
"""Calculate current statistics for a feature"""
if series.dtype in ['int64', 'float64']:
return {
'mean': series.mean(),
'std': series.std(),
'median': series.median(),
'q1': series.quantile(0.25),
'q3': series.quantile(0.75),
'min': series.min(),
'max': series.max(),
'missing_rate': series.isnull().mean()
}
else:
value_counts = series.value_counts(normalize=True)
return {
'top_categories': value_counts.head(10).to_dict(),
'cardinality': series.nunique(),
'missing_rate': series.isnull().mean()
}
def _compare_with_baseline(self, feature: str, current: Dict, baseline: Dict) -> Tuple[float, List[Dict]]:
"""Compare current stats with baseline and generate alerts"""
alerts = []
quality_score = 100.0
if 'mean' in baseline: # Numeric feature
# Check for distribution drift
mean_drift = abs((current['mean'] - baseline['mean']) / baseline['mean']) if baseline['mean'] != 0 else 0
if mean_drift > self.alert_thresholds.get('mean_drift', 0.2):
alerts.append({
'type': 'distribution_drift',
'feature': feature,
'severity': 'medium',
'message': f"Mean drift of {mean_drift:.2%} detected",
'current_value': current['mean'],
'baseline_value': baseline['mean']
})
quality_score -= 20
# Check for outliers (values outside baseline range)
if current['min'] < baseline['min'] - 3 * baseline['std']:
alerts.append({
'type': 'outlier_detected',
'feature': feature,
'severity': 'low',
'message': f"Values below expected range detected: {current['min']}"
})
quality_score -= 10
if current['max'] > baseline['max'] + 3 * baseline['std']:
alerts.append({
'type': 'outlier_detected',
'feature': feature,
'severity': 'low',
'message': f"Values above expected range detected: {current['max']}"
})
quality_score -= 10
else: # Categorical feature
# Check for new categories
current_cats = set(current['top_categories'].keys())
baseline_cats = set(baseline['top_categories'].keys())
new_categories = current_cats - baseline_cats
if new_categories:
alerts.append({
'type': 'new_categories',
'feature': feature,
'severity': 'medium',
'message': f"New categories detected: {list(new_categories)[:5]}"
})
quality_score -= 15
# Check for significant changes in category distribution
for cat, baseline_freq in baseline['top_categories'].items():
current_freq = current['top_categories'].get(cat, 0)
freq_change = abs(current_freq - baseline_freq)
if freq_change > self.alert_thresholds.get('category_drift', 0.1):
alerts.append({
'type': 'category_distribution_drift',
'feature': feature,
'severity': 'medium',
'message': f"Category '{cat}' frequency changed by {freq_change:.2%}"
})
quality_score -= 10
# Check missing value rate
missing_rate_change = current['missing_rate'] - baseline['missing_rate']
if missing_rate_change > self.alert_thresholds.get('missing_rate_increase', 0.05):
alerts.append({
'type': 'missing_rate_increase',
'feature': feature,
'severity': 'high',
'message': f"Missing rate increased by {missing_rate_change:.2%}"
})
quality_score -= 30
return max(0, quality_score), alerts
# Usage example
monitor = DataQualityMonitor(alert_thresholds={
'mean_drift': 0.15,
'category_drift': 0.08,
'missing_rate_increase': 0.03
})
# Establish baseline from training data
training_data = pd.DataFrame({
'feature1': np.random.normal(50, 10, 1000),
'feature2': np.random.choice(['A', 'B', 'C'], 1000),
'feature3': np.random.exponential(2, 1000)
})
monitor.establish_baseline(training_data, ['feature1', 'feature2', 'feature3'])
# Monitor new batch
new_batch = pd.DataFrame({
'feature1': np.random.normal(55, 12, 500), # Slight drift
'feature2': np.random.choice(['A', 'B', 'C', 'D'], 500), # New category
'feature3': np.random.exponential(2, 500)
})
quality_report = monitor.monitor_batch(new_batch)
print("Data Quality Report:")
print(json.dumps(quality_report, indent=2, default=str))
Quiz ID "data-preparation" not found