Skip to main content | Skip to user menu | Skip to navigation

Advanced ML Data Preparation

Comprehensive guide to data preprocessing, feature engineering, and pipeline design for production machine learning systems.

45 min read · Advanced
Not Started
Loading...

Data Pipeline Planning Calculator

Estimate processing time, costs, and quality improvements for your data preparation pipeline.

10K – 100M
70% – 98%

Pipeline Estimates

Processing Time: 10.0h
Estimated Cost: $500
Accuracy Gain: +14%
Final Accuracy: 98%
Quality Score: 75/100
Pipeline Steps: 8

Data Quality Assessment Framework

Data Profiling Pipeline

import pandas as pd
import numpy as np
from scipy import stats
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Tuple

class DataProfiler:
    """Comprehensive data profiling for ML pipelines.

    Wraps a DataFrame and computes several report sections (basic info,
    missing values, distributions, correlations, outliers) plus a 0-100
    quality score. The wrapped frame is only read, never modified.
    """

    def __init__(self, df: pd.DataFrame):
        self.df = df
        # Holds the report produced by the most recent generate_profile() call.
        self.profile_results = {}

    def generate_profile(self) -> Dict:
        """Run every analysis and return the combined report.

        Returns:
            Dict with the keys 'basic_info', 'missing_analysis',
            'distribution_analysis', 'correlation_analysis',
            'outlier_analysis' and 'data_quality_score'. The same dict is
            also stored on ``self.profile_results``.
        """
        profile = {
            'basic_info': self._basic_info(),
            'missing_analysis': self._missing_analysis(),
            'distribution_analysis': self._distribution_analysis(),
            'correlation_analysis': self._correlation_analysis(),
            'outlier_analysis': self._outlier_analysis(),
            'data_quality_score': self._calculate_quality_score()
        }
        self.profile_results = profile
        return profile

    def _basic_info(self) -> Dict:
        """Return shape, memory footprint, dtype counts and duplicate statistics.

        For a frame with no rows 'unique_row_percentage' is reported as 0.0
        instead of raising ZeroDivisionError.
        """
        n_rows = self.df.shape[0]
        duplicate_rows = self.df.duplicated().sum()
        # Rows left after de-duplication == total rows minus flagged duplicates,
        # so one duplicated() pass serves both statistics.
        unique_pct = ((n_rows - duplicate_rows) / n_rows) * 100 if n_rows else 0.0

        return {
            'shape': self.df.shape,
            'memory_usage_mb': self.df.memory_usage(deep=True).sum() / 1024**2,
            'dtypes': self.df.dtypes.value_counts().to_dict(),
            'duplicate_rows': duplicate_rows,
            'unique_row_percentage': float(unique_pct)
        }

    def _missing_analysis(self) -> Dict:
        """Summarise missing values per column and overall.

        Percentages are reported as 0 when the frame has no rows/cells so the
        report never contains NaN produced by a division by zero.
        """
        null_mask = self.df.isnull()
        missing_counts = null_mask.sum()
        total_missing = missing_counts.sum()
        n_rows = len(self.df)
        total_cells = self.df.size

        if n_rows:
            missing_percentages = (missing_counts / n_rows) * 100
        else:
            missing_percentages = missing_counts.astype(float) * 0.0

        overall_pct = (total_missing / total_cells) * 100 if total_cells else 0.0

        return {
            'total_missing_cells': total_missing,
            'missing_percentage_overall': overall_pct,
            'columns_with_missing': missing_counts[missing_counts > 0].to_dict(),
            'missing_percentages_by_column': missing_percentages[missing_percentages > 0].to_dict(),
            'complete_rows': n_rows - null_mask.any(axis=1).sum()
        }

    def _distribution_analysis(self) -> Dict:
        """Describe the distribution of every numeric column.

        Columns that are entirely missing are omitted. The normality test
        p-value is only computed with at least 8 non-null values and is
        ``None`` otherwise.
        """
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        distributions = {}

        for col in numeric_cols:
            data = self.df[col].dropna()
            if data.empty:
                continue

            distributions[col] = {
                'mean': data.mean(),
                'median': data.median(),
                'std': data.std(),
                'skewness': stats.skew(data),
                'kurtosis': stats.kurtosis(data),
                'min': data.min(),
                'max': data.max(),
                'q25': data.quantile(0.25),
                'q75': data.quantile(0.75),
                'normality_test_p_value': stats.normaltest(data)[1] if len(data) >= 8 else None
            }

        return distributions

    def _correlation_analysis(self) -> Dict:
        """Analyse pairwise correlations between numeric columns.

        Returns:
            ``{'message': ...}`` when fewer than two numeric columns exist;
            otherwise a dict with the full correlation matrix, the pairs whose
            absolute correlation exceeds 0.8, and 'max_correlation' — the
            largest absolute correlation between two *different* columns
            (NaN if no pair has a defined correlation).
        """
        numeric_df = self.df.select_dtypes(include=[np.number])

        if numeric_df.shape[1] < 2:
            return {'message': 'Insufficient numerical columns for correlation analysis'}

        corr_matrix = numeric_df.corr()
        columns = list(corr_matrix.columns)

        # Find highly correlated pairs (upper triangle only, so each pair once)
        high_corr_pairs = []
        for i, first in enumerate(columns):
            for j in range(i + 1, len(columns)):
                corr_val = corr_matrix.iloc[i, j]
                if abs(corr_val) > 0.8:  # High correlation threshold
                    high_corr_pairs.append({
                        'feature1': first,
                        'feature2': columns[j],
                        'correlation': corr_val
                    })

        # Only strictly-upper-triangle entries are considered: the diagonal is
        # 1.0 for every column, so picking "the second largest value" of the
        # whole matrix would just return another self-correlation.
        abs_corr = corr_matrix.abs().to_numpy()
        off_diagonal = abs_corr[np.triu_indices_from(abs_corr, k=1)]
        defined = off_diagonal[~np.isnan(off_diagonal)]
        max_correlation = float(defined.max()) if defined.size else float('nan')

        return {
            'correlation_matrix': corr_matrix.to_dict(),
            'high_correlation_pairs': high_corr_pairs,
            'max_correlation': max_correlation
        }

    def _outlier_analysis(self) -> Dict:
        """Detect outliers in numeric columns using IQR and z-score methods.

        IQR outliers lie outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR]; z-score
        outliers have |z| > 3. Columns with no non-null values are omitted.
        """
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        outlier_analysis = {}

        for col in numeric_cols:
            data = self.df[col].dropna()
            if data.empty:
                continue

            # IQR method
            q1 = data.quantile(0.25)
            q3 = data.quantile(0.75)
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            iqr_outliers = ((data < lower_bound) | (data > upper_bound)).sum()

            # Z-score method (meaningful if data is roughly normal). For a
            # constant column the z-scores are NaN and compare False, giving 0.
            z_scores = np.abs(stats.zscore(data))
            zscore_outliers = (z_scores > 3).sum()

            outlier_analysis[col] = {
                'iqr_outliers': int(iqr_outliers),
                'iqr_outlier_percentage': (iqr_outliers / len(data)) * 100,
                'zscore_outliers': int(zscore_outliers),
                'zscore_outlier_percentage': (zscore_outliers / len(data)) * 100,
                'outlier_bounds': {'lower': lower_bound, 'upper': upper_bound}
            }

        return outlier_analysis

    def _calculate_quality_score(self) -> float:
        """Calculate overall data quality score (0-100).

        Starts at 100 and subtracts up to 30 points for the share of missing
        cells, up to 20 for the share of duplicate rows and up to 15 for the
        share of correlation-matrix entries with 0.9 < |r| < 1.0. Penalties
        whose denominator would be zero (empty frame) are skipped.
        """
        score = 100.0

        # Penalize missing data
        total_cells = self.df.size
        if total_cells:
            score -= (self.df.isnull().sum().sum() / total_cells) * 30

        # Penalize duplicates
        n_rows = len(self.df)
        if n_rows:
            score -= (self.df.duplicated().sum() / n_rows) * 20

        # Penalize high correlation (multicollinearity)
        numeric_df = self.df.select_dtypes(include=[np.number])
        if numeric_df.shape[1] > 1:
            corr_matrix = numeric_df.corr().abs()
            # The matrix is symmetric, so every qualifying pair is counted twice.
            high_corr_count = ((corr_matrix > 0.9) & (corr_matrix < 1.0)).sum().sum()
            score -= (high_corr_count / (numeric_df.shape[1] ** 2)) * 15

        return float(max(0.0, score))

# Usage example
def analyze_dataset(df: pd.DataFrame):
    """Profile *df*, print a three-line quality summary and return the profile.

    The summary shows the overall quality score, the overall percentage of
    missing cells and the number of duplicate rows.
    """
    profile = DataProfiler(df).generate_profile()

    print(f"Dataset Quality Score: {profile['data_quality_score']:.1f}/100")
    print(f"Missing data: {profile['missing_analysis']['missing_percentage_overall']:.2f}%")
    print(f"Duplicate rows: {profile['basic_info']['duplicate_rows']}")

    return profile

Advanced Feature Engineering

import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif
from category_encoders import TargetEncoder, CatBoostEncoder
import warnings
warnings.filterwarnings('ignore')

class AdvancedFeatureEngineer:
    """Advanced feature engineering pipeline.

    Every transformation works on a copy of the input frame and appends a
    human-readable entry to ``feature_history``. Fitted encoders and
    selectors are kept on the instance for later inspection.
    """

    def __init__(self):
        self.encoders = {}
        self.scalers = {}
        self.feature_selectors = {}
        self.feature_history = []
        # Column names passed to the last select_features() call; used by
        # get_feature_importance_report() to label the selector's scores.
        self.original_features = None

    def create_temporal_features(self, df: pd.DataFrame, date_col: str) -> pd.DataFrame:
        """Extract calendar, flag and cyclical features from *date_col*.

        Args:
            df: Input frame; not modified.
            date_col: Column convertible with ``pd.to_datetime``.

        Returns:
            Copy of *df* with *date_col* converted to datetime and new columns
            prefixed ``{date_col}_``. The hour is only added in its sin/cos
            form; there is no plain ``{date_col}_hour`` column.
        """
        df = df.copy()
        df[date_col] = pd.to_datetime(df[date_col])
        dates = df[date_col].dt

        # Basic temporal features
        df[f'{date_col}_year'] = dates.year
        df[f'{date_col}_month'] = dates.month
        df[f'{date_col}_day'] = dates.day
        df[f'{date_col}_dayofweek'] = dates.dayofweek
        df[f'{date_col}_dayofyear'] = dates.dayofyear
        df[f'{date_col}_week'] = dates.isocalendar().week
        df[f'{date_col}_quarter'] = dates.quarter

        # Advanced temporal features
        df[f'{date_col}_is_weekend'] = (dates.dayofweek >= 5).astype(int)
        df[f'{date_col}_is_month_end'] = dates.is_month_end.astype(int)
        df[f'{date_col}_is_month_start'] = dates.is_month_start.astype(int)
        df[f'{date_col}_days_since_epoch'] = (df[date_col] - pd.Timestamp('1970-01-01')).dt.days

        # Cyclical encoding: map each periodic value onto the unit circle so
        # that e.g. December and January end up close together.
        cyclical_sources = [
            (f'{date_col}_month', df[f'{date_col}_month'], 12),
            (f'{date_col}_day', df[f'{date_col}_day'], 31),
            (f'{date_col}_dayofweek', df[f'{date_col}_dayofweek'], 7),
            (f'{date_col}_hour', dates.hour, 24),
        ]
        for col_name, values, period in cyclical_sources:
            df[f'{col_name}_sin'] = np.sin(2 * np.pi * values / period)
            df[f'{col_name}_cos'] = np.cos(2 * np.pi * values / period)

        self.feature_history.append(f"Created temporal features from {date_col}")
        return df

    def create_interaction_features(self, df: pd.DataFrame, numeric_cols: List[str], max_interactions: int = 20) -> pd.DataFrame:
        """Create product and ratio features for pairs of numeric columns.

        Args:
            df: Input frame; not modified.
            numeric_cols: Columns to combine pairwise, in order.
            max_interactions: Upper bound on the number of new columns. Each
                pair contributes two columns, so at most
                ``max_interactions // 2`` pairs are used and the bound is
                never exceeded.

        Returns:
            Copy of *df* with ``{a}_x_{b}`` and ``{a}_div_{b}`` columns. The
            ratio is 0 wherever the denominator is 0.
        """
        df = df.copy()

        pairs = [
            (col1, col2)
            for i, col1 in enumerate(numeric_cols)
            for col2 in numeric_cols[i + 1:]
        ]
        pairs = pairs[:max(max_interactions, 0) // 2]

        for col1, col2 in pairs:
            # Multiplicative interaction
            df[f'{col1}_x_{col2}'] = df[col1] * df[col2]

            # Ratio interaction (avoid division by zero)
            with np.errstate(divide='ignore', invalid='ignore'):
                df[f'{col1}_div_{col2}'] = np.where(df[col2] != 0, df[col1] / df[col2], 0)

        interactions_created = 2 * len(pairs)
        self.feature_history.append(f"Created {interactions_created} interaction features")
        return df

    def encode_categorical_features(self, df: pd.DataFrame, cat_cols: List[str], target_col: str = None, method: str = 'auto') -> pd.DataFrame:
        """Encode categorical columns, choosing a strategy by cardinality.

        With ``method='auto'``:
          * <= 10 unique values: one-hot encode and drop the original column.
          * <= 50 unique values and *target_col* given: target encoding.
          * otherwise: relative-frequency column plus a hash bucket in
            ``[0, 1000)``.

        Any other *method* value leaves the columns untouched.

        Args:
            df: Input frame; not modified.
            cat_cols: Columns to encode.
            target_col: Target column used for target encoding, if any.
            method: Encoding strategy; only 'auto' is implemented.

        Returns:
            Copy of *df* with the encoded columns added.
        """
        # Local import: the hash bucket must be identical across processes,
        # which builtin hash() on str is not (it is salted per interpreter run).
        import zlib

        df = df.copy()

        for col in cat_cols:
            if method != 'auto':
                continue

            unique_count = df[col].nunique()

            if unique_count <= 10:
                # Low cardinality: One-hot encoding
                dummies = pd.get_dummies(df[col], prefix=col, dummy_na=True)
                df = pd.concat([df.drop(columns=[col]), dummies], axis=1)
                self.feature_history.append(f"One-hot encoded {col} ({unique_count} categories)")

            elif unique_count <= 50 and target_col is not None:
                # Medium cardinality: Target encoding with regularization.
                # NOTE(review): the encoder is fitted on the same rows it
                # transforms, which leaks the target into the feature unless
                # the caller fits on a training fold only.
                encoder = TargetEncoder(smoothing=1.0, min_samples_leaf=1)
                df[f'{col}_target_encoded'] = encoder.fit_transform(df[col], df[target_col])
                self.encoders[col] = encoder
                self.feature_history.append(f"Target encoded {col} ({unique_count} categories)")

            else:
                # High cardinality: Frequency encoding + Hash encoding
                freq_map = df[col].value_counts(normalize=True).to_dict()
                df[f'{col}_frequency'] = df[col].map(freq_map)

                # Stable hash bucket (CRC32 of the UTF-8 text, modulo 1000)
                df[f'{col}_hash'] = df[col].astype(str).apply(
                    lambda text: zlib.crc32(text.encode('utf-8')) % 1000
                )
                self.feature_history.append(f"Frequency + Hash encoded {col} ({unique_count} categories)")

        return df

    def create_aggregated_features(self, df: pd.DataFrame, group_cols: List[str], agg_cols: List[str]) -> pd.DataFrame:
        """Attach per-group statistics of numeric columns to every row.

        For each (group column, numeric column) pair the mean, median, std,
        min, max and count per group are merged back onto the rows as
        ``{group}_{col}_{stat}``.

        Args:
            df: Input frame; not modified.
            group_cols: Columns to group by.
            agg_cols: Columns to aggregate. Non-numeric and boolean columns
                are skipped; any numeric width (int32, float32, ...) is
                accepted.

        Returns:
            Copy of *df* with the aggregate columns added.
        """
        df = df.copy()

        for group_col in group_cols:
            for agg_col in agg_cols:
                if group_col == agg_col:
                    continue
                series = df[agg_col]
                if not pd.api.types.is_numeric_dtype(series) or pd.api.types.is_bool_dtype(series):
                    continue

                # Create various aggregations
                agg_features = df.groupby(group_col)[agg_col].agg([
                    'mean', 'median', 'std', 'min', 'max', 'count'
                ]).add_prefix(f'{group_col}_{agg_col}_')

                df = df.merge(agg_features, left_on=group_col, right_index=True, how='left')

        self.feature_history.append(f"Created aggregated features for {len(group_cols)} grouping columns")
        return df

    def select_features(self, X: pd.DataFrame, y: pd.Series, method: str = 'mutual_info', k: int = 50) -> pd.DataFrame:
        """Keep the *k* best features according to a univariate score.

        Args:
            X: Feature matrix.
            y: Classification target.
            method: 'mutual_info' uses ``mutual_info_classif``; anything else
                uses ``f_classif``.
            k: Number of features to keep, capped at the number of columns.

        Returns:
            Frame with the selected columns and the index of *X*. The fitted
            selector is stored under ``feature_selectors['primary']``.
        """
        score_func = mutual_info_classif if method == 'mutual_info' else f_classif
        selector = SelectKBest(score_func=score_func, k=min(k, X.shape[1]))

        X_selected = selector.fit_transform(X, y)
        selected_features = X.columns[selector.get_support()]

        self.feature_selectors['primary'] = selector
        # Remember the input columns so the selector's scores can be labelled.
        self.original_features = list(X.columns)
        self.feature_history.append(f"Selected {len(selected_features)} features using {method}")

        return pd.DataFrame(X_selected, columns=selected_features, index=X.index)

    def get_feature_importance_report(self, selector_name: str = 'primary') -> pd.DataFrame:
        """Return the scores of a fitted selector, highest first.

        Args:
            selector_name: Key into ``feature_selectors``.

        Returns:
            Frame with 'feature' and 'importance_score' columns, or an empty
            frame when no selector is stored under *selector_name*. If the
            recorded column names are missing or do not match the number of
            scores, positional indices are used as feature labels.
        """
        if selector_name not in self.feature_selectors:
            return pd.DataFrame()

        selector = self.feature_selectors[selector_name]
        scores = selector.scores_

        feature_names = getattr(self, 'original_features', None)
        if feature_names is None or len(feature_names) != len(scores):
            feature_names = list(range(len(scores)))

        importance_df = pd.DataFrame({
            'feature': feature_names,
            'importance_score': scores
        }).sort_values('importance_score', ascending=False)

        return importance_df

# Example usage pipeline
def comprehensive_feature_engineering(df: pd.DataFrame, target_col: str, date_col: str = None) -> pd.DataFrame:
    """Run the complete feature engineering pipeline on *df*.

    Steps: temporal features (if *date_col* is present), numeric interaction
    features, per-group aggregates, then categorical encoding. A summary of
    the applied steps is printed.

    Args:
        df: Input frame.
        target_col: Name of the target column. It is never used as a feature
            source and is never encoded away, regardless of its dtype.
        date_col: Optional date column to expand into temporal features.

    Returns:
        The engineered frame.
    """
    engineer = AdvancedFeatureEngineer()

    # 1. Handle temporal features
    if date_col and date_col in df.columns:
        df = engineer.create_temporal_features(df, date_col)

    # 2. Identify column types. The target is excluded from both lists: an
    #    object/category target would otherwise be one-hot encoded and dropped.
    numeric_cols = [
        col for col in df.select_dtypes(include=[np.number]).columns
        if col != target_col
    ]
    categorical_cols = [
        col for col in df.select_dtypes(include=['object', 'category']).columns
        if col != target_col
    ]

    # 3. Create interaction features (limited to prevent explosion)
    if len(numeric_cols) > 1:
        df = engineer.create_interaction_features(df, numeric_cols[:10], max_interactions=15)

    # 4. Create aggregated features while the raw grouping columns still
    #    exist: one-hot encoding removes low-cardinality columns, so grouping
    #    by them afterwards would raise KeyError.
    if categorical_cols and numeric_cols:
        df = engineer.create_aggregated_features(df, categorical_cols[:3], numeric_cols[:5])

    # 5. Encode categorical features
    if categorical_cols:
        df = engineer.encode_categorical_features(df, categorical_cols, target_col)

    print("Feature Engineering Summary:")
    for step in engineer.feature_history:
        print(f"- {step}")

    return df

Production Data Pipeline Architecture

Scalable Data Processing Pipeline

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText, WriteToText
import pandas as pd
from typing import Dict, Any, Iterator
import logging

class DataValidation(beam.DoFn):
    """Data validation and quality checks.

    ``validation_rules`` may contain:
      * 'required_fields': list of keys that must be present and non-empty.
      * 'field_types': mapping of key -> 'numeric' or 'date'.
      * 'value_ranges': mapping of key -> dict with optional 'min' / 'max'.

    Every record is emitted with an 'is_valid' flag; invalid records also
    carry a 'validation_errors' list.
    """

    def __init__(self, validation_rules: Dict[str, Any]):
        self.validation_rules = validation_rules
        self.error_counter = beam.metrics.Metrics.counter('data_validation', 'errors')
        self.valid_counter = beam.metrics.Metrics.counter('data_validation', 'valid_records')

    def process(self, element: Dict) -> Iterator[Dict]:
        """Validate one record and yield an annotated copy of it.

        The input element is not modified: Beam requires DoFns to treat their
        inputs as immutable, so the flags are written to a shallow copy.
        """
        errors = []

        # Check required fields
        for field in self.validation_rules.get('required_fields', []):
            if field not in element or element[field] is None or element[field] == '':
                errors.append(f"Missing required field: {field}")

        # Check data types
        for field, expected_type in self.validation_rules.get('field_types', {}).items():
            if field not in element:
                continue
            try:
                if expected_type == 'numeric':
                    float(element[field])
                elif expected_type == 'date':
                    pd.to_datetime(element[field])
            except (ValueError, TypeError):
                errors.append(f"Invalid {expected_type} for field {field}: {element[field]}")

        # Check value ranges
        for field, range_def in self.validation_rules.get('value_ranges', {}).items():
            if field not in element:
                continue
            try:
                value = float(element[field])
            except (ValueError, TypeError):
                # Non-numeric values are skipped here; the type check reports
                # them when the field is also listed in 'field_types'.
                continue
            if 'min' in range_def and value < range_def['min']:
                errors.append(f"Value {value} below minimum {range_def['min']} for field {field}")
            if 'max' in range_def and value > range_def['max']:
                errors.append(f"Value {value} above maximum {range_def['max']} for field {field}")

        record = dict(element)
        if errors:
            self.error_counter.inc()
            record['validation_errors'] = errors
            record['is_valid'] = False
        else:
            self.valid_counter.inc()
            record['is_valid'] = True

        yield record

class FeatureEngineering(beam.DoFn):
    """Feature engineering transformations"""

    def process(self, element: Dict) -> Iterator[Dict]:
        """Add derived features to a valid record and yield the result.

        Records whose 'is_valid' flag is missing or false are passed through
        unchanged. Valid records are shallow-copied before new keys are
        added, because Beam requires DoFn inputs to be left unmodified.
        If any step fails, the error is logged and stored under
        'feature_engineering_error'; features computed before the failure
        are kept.
        """
        if not element.get('is_valid', False):
            yield element
            return

        record = dict(element)

        # Create derived features
        try:
            # Temporal features
            if 'timestamp' in record:
                timestamp = pd.to_datetime(record['timestamp'])
                record['hour'] = timestamp.hour
                record['day_of_week'] = timestamp.dayofweek
                record['is_weekend'] = 1 if timestamp.dayofweek >= 5 else 0
                record['month'] = timestamp.month

            # Interaction features
            if 'feature1' in record and 'feature2' in record:
                try:
                    record['feature1_x_feature2'] = float(record['feature1']) * float(record['feature2'])
                except (ValueError, TypeError):
                    # Non-numeric inputs: the interaction is simply omitted.
                    pass

            # Categorical encoding (simplified)
            if 'category' in record:
                # This would typically use pre-fitted encoders
                category_mapping = {'A': 1, 'B': 2, 'C': 3}  # Example mapping
                # Unknown categories map to 0.
                record['category_encoded'] = category_mapping.get(record['category'], 0)

        except Exception as e:
            # Best-effort step: keep the record flowing and attach the error.
            logging.error(f"Feature engineering error: {e}")
            record['feature_engineering_error'] = str(e)

        yield record

class DataNormalization(beam.DoFn):
    """Data normalization and scaling.

    ``scaling_params`` maps a field name to a dict with a 'method' key:
      * 'standard' additionally needs 'mean' and 'std'.
      * 'minmax' additionally needs 'min' and 'max'.
      * any other method copies the value through unchanged.
    """

    def __init__(self, scaling_params: Dict[str, Dict]):
        self.scaling_params = scaling_params

    def process(self, element: Dict) -> Iterator[Dict]:
        """Add ``{field}_normalized`` values to a valid record and yield it.

        Records whose 'is_valid' flag is missing or false are passed through
        unchanged. Valid records are shallow-copied first, because Beam
        requires DoFn inputs to be left unmodified. A field that cannot be
        normalized (non-numeric value, zero divisor, missing scaling
        parameter) is logged and skipped instead of failing the bundle.
        """
        if not element.get('is_valid', False):
            yield element
            return

        record = dict(element)

        for field, params in self.scaling_params.items():
            if field not in record:
                continue
            try:
                value = float(record[field])
                method = params['method']
                if method == 'standard':
                    # Standard scaling: (x - mean) / std
                    normalized_value = (value - params['mean']) / params['std']
                elif method == 'minmax':
                    # Min-max scaling: (x - min) / (max - min)
                    normalized_value = (value - params['min']) / (params['max'] - params['min'])
                else:
                    normalized_value = value

                record[f'{field}_normalized'] = normalized_value
            except (ValueError, TypeError, ZeroDivisionError, KeyError):
                logging.warning(f"Could not normalize field {field} with value {record[field]}")

        yield record

def run_data_pipeline(input_path: str, output_path: str, validation_rules: Dict, scaling_params: Dict,
                      pipeline_args=None):
    """Run the complete data processing pipeline.

    Reads newline-delimited JSON, validates every record, writes invalid
    records to ``{output_path}/errors`` and writes feature-engineered,
    normalized valid records to ``{output_path}/processed``.

    Args:
        input_path: File pattern of the JSON-lines input.
        output_path: Output prefix for the 'processed' and 'errors' files.
        validation_rules: Rules passed to ``DataValidation``.
        scaling_params: Parameters passed to ``DataNormalization``.
        pipeline_args: Optional list of Beam pipeline flags. When omitted,
            the Dataflow placeholder flags below are used; pass e.g.
            ``['--runner=DirectRunner']`` for local testing.
    """
    # Imported here so the function does not depend on a module-level
    # ``import json`` that this snippet's import block does not provide.
    import json

    if pipeline_args is None:
        pipeline_args = [
            '--runner=DataflowRunner',  # Use DirectRunner for local testing
            '--project=your-gcp-project',
            '--region=us-central1',
            '--temp_location=gs://your-bucket/temp',
            '--staging_location=gs://your-bucket/staging'
        ]

    pipeline_options = PipelineOptions(list(pipeline_args))

    with beam.Pipeline(options=pipeline_options) as pipeline:

        # Read raw data
        raw_data = (
            pipeline
            | 'Read Input' >> ReadFromText(input_path)
            | 'Parse JSON' >> beam.Map(json.loads)
        )

        # Data validation
        validated_data = (
            raw_data
            | 'Validate Data' >> beam.ParDo(DataValidation(validation_rules))
        )

        # Split valid and invalid records
        valid_data = (
            validated_data
            | 'Filter Valid' >> beam.Filter(lambda x: x.get('is_valid', False))
        )

        invalid_data = (
            validated_data
            | 'Filter Invalid' >> beam.Filter(lambda x: not x.get('is_valid', False))
        )

        # Process valid data
        processed_data = (
            valid_data
            | 'Feature Engineering' >> beam.ParDo(FeatureEngineering())
            | 'Data Normalization' >> beam.ParDo(DataNormalization(scaling_params))
        )

        # Write outputs
        (
            processed_data
            | 'Convert to JSON' >> beam.Map(json.dumps)
            | 'Write Processed' >> WriteToText(f'{output_path}/processed')
        )

        (
            invalid_data
            | 'Convert Errors to JSON' >> beam.Map(json.dumps)
            | 'Write Errors' >> WriteToText(f'{output_path}/errors')
        )

# Example configuration: rules consumed by the validation step
validation_config = dict(
    required_fields=['user_id', 'timestamp', 'feature1'],
    field_types=dict(
        feature1='numeric',
        feature2='numeric',
        timestamp='date',
    ),
    value_ranges=dict(
        feature1=dict(min=0, max=1000),
        feature2=dict(min=-10, max=10),
    ),
)

# Per-field scaling parameters consumed by the normalization step
scaling_config = dict(
    feature1=dict(method='standard', mean=50.0, std=15.0),
    feature2=dict(method='minmax', min=-10.0, max=10.0),
)

# Run the pipeline only when executed as a script
if __name__ == '__main__':
    run_data_pipeline(
        'gs://your-bucket/input/*.json',
        'gs://your-bucket/output',
        validation_config,
        scaling_config,
    )

Feature Store Architecture

Feature Store Implementation

from datetime import datetime, timedelta
import pandas as pd
import redis
from typing import List, Dict, Any, Optional
import pickle
import logging

class FeatureStore:
    """Redis-backed feature store.

    Feature values live under the key ``feature:{feature_name}:{entity_id}``
    with a TTL. Feature metadata is kept in memory and mirrored to the Redis
    hash ``feature_metadata``.

    SECURITY NOTE: values are serialized with ``pickle``. Unpickling executes
    code embedded in the payload, so the Redis instance must only be writable
    by trusted producers.
    """

    def __init__(self, redis_host: str = 'localhost', redis_port: int = 6379):
        # decode_responses=False: payloads are binary pickles, not text.
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=False)
        self.feature_metadata = {}

    @staticmethod
    def _serialize(value: Any) -> bytes:
        """Pickle *value* so its Python type survives the round trip.

        Writing raw scalars would turn ints/floats into strings on read and
        fails outright for ``bool`` and ``None``, which the Redis client
        refuses to encode.
        """
        return pickle.dumps(value)

    @staticmethod
    def _deserialize(raw: Any) -> Any:
        """Turn a raw Redis reply back into a Python value.

        Pickled payloads (recognised by the protocol >= 2 header byte) are
        unpickled. Anything else is treated as a legacy raw value and decoded
        as text; bytes that are not valid text are returned unchanged.
        """
        if raw is None or not isinstance(raw, bytes):
            return raw

        if raw.startswith(b'\x80'):
            try:
                return pickle.loads(raw)
            except Exception:
                # Unpickling non-pickle bytes can raise many different
                # exception types; fall back to the raw-value path.
                pass

        try:
            return raw.decode()
        except UnicodeDecodeError:
            return raw

    def register_feature(self, feature_name: str, feature_type: str, description: str,
                        default_ttl: int = 86400, feature_group: str = None):
        """Register a new feature with metadata.

        Args:
            feature_name: Unique feature name.
            feature_type: Free-form type label (e.g. 'int', 'float').
            description: Human-readable description.
            default_ttl: TTL in seconds used when store_feature gets no ttl.
            feature_group: Optional group label.
        """
        now = datetime.now().isoformat()
        metadata = {
            'name': feature_name,
            'type': feature_type,
            'description': description,
            'default_ttl': default_ttl,
            'feature_group': feature_group,
            'created_at': now,
            'last_updated': now
        }

        self.feature_metadata[feature_name] = metadata
        # Store metadata in Redis
        self.redis_client.hset('feature_metadata', feature_name, pickle.dumps(metadata))

    def store_feature(self, entity_id: str, feature_name: str, value: Any, ttl: Optional[int] = None):
        """Store a feature value for an entity.

        Args:
            entity_id: Entity the value belongs to.
            feature_name: Feature name; a warning is logged if unregistered.
            value: Any picklable value.
            ttl: Expiry in seconds. Falls back to the feature's registered
                'default_ttl', or 86400 for unregistered features.
        """
        if feature_name not in self.feature_metadata:
            logging.warning(f"Feature {feature_name} not registered. Consider registering first.")

        key = f"feature:{feature_name}:{entity_id}"
        ttl = ttl or self.feature_metadata.get(feature_name, {}).get('default_ttl', 86400)

        self.redis_client.setex(key, ttl, self._serialize(value))

        # Update metadata
        if feature_name in self.feature_metadata:
            self.feature_metadata[feature_name]['last_updated'] = datetime.now().isoformat()
            self.redis_client.hset('feature_metadata', feature_name,
                                 pickle.dumps(self.feature_metadata[feature_name]))

    def get_feature(self, entity_id: str, feature_name: str) -> Optional[Any]:
        """Retrieve a feature value for an entity, or None if absent/expired."""
        key = f"feature:{feature_name}:{entity_id}"
        return self._deserialize(self.redis_client.get(key))

    def get_features_batch(self, entity_ids: List[str], feature_names: List[str]) -> pd.DataFrame:
        """Retrieve multiple features for multiple entities in one round trip.

        Args:
            entity_ids: Entities to look up; duplicates are ignored.
            feature_names: Features to look up; duplicates are ignored.

        Returns:
            Frame with an 'entity_id' column and one column per feature
            (missing values are None/NaN), or an empty frame when either
            list is empty.
        """
        # De-duplicate while keeping order; repeated ids would make the
        # pivot below fail on duplicate index/column combinations.
        entity_ids = list(dict.fromkeys(entity_ids))
        feature_names = list(dict.fromkeys(feature_names))

        lookups = [
            (entity_id, feature_name)
            for entity_id in entity_ids
            for feature_name in feature_names
        ]
        if not lookups:
            return pd.DataFrame()

        # Use pipeline for efficient batch operations
        pipeline = self.redis_client.pipeline()
        for entity_id, feature_name in lookups:
            pipeline.get(f"feature:{feature_name}:{entity_id}")
        raw_values = pipeline.execute()

        results = [
            {
                'entity_id': entity_id,
                'feature_name': feature_name,
                'value': self._deserialize(raw)
            }
            for (entity_id, feature_name), raw in zip(lookups, raw_values)
        ]

        df_pivot = pd.DataFrame(results).pivot(index='entity_id', columns='feature_name', values='value')
        df_pivot.index.name = 'entity_id'
        return df_pivot.reset_index()

    def compute_and_store_feature_group(self, feature_group_name: str, entities: List[str],
                                      compute_function, **kwargs):
        """Compute and store a group of related features.

        Args:
            feature_group_name: Prefix; features are stored as
                ``{feature_group_name}_{feature_name}``.
            entities: Entity ids to process.
            compute_function: Called as ``compute_function(entity_id,
                **kwargs)``; must return a dict of feature name -> value.
            **kwargs: Forwarded to *compute_function*.

        A failure for one entity is logged and does not stop the others.
        """
        logging.info(f"Computing feature group: {feature_group_name} for {len(entities)} entities")

        for entity_id in entities:
            try:
                # Compute features for this entity
                features = compute_function(entity_id, **kwargs)

                # Store each feature
                for feature_name, feature_value in features.items():
                    full_feature_name = f"{feature_group_name}_{feature_name}"
                    self.store_feature(entity_id, full_feature_name, feature_value)

            except Exception as e:
                # Best-effort batch: report and continue with the next entity.
                logging.error(f"Error computing features for entity {entity_id}: {e}")

    def get_feature_freshness(self, feature_name: str, entity_ids: List[str]) -> Dict[str, Optional[float]]:
        """Estimate the age in hours of a feature for each entity.

        The age is derived as ``default_ttl - remaining_ttl``, so it is only
        accurate for values stored with the feature's default TTL.

        Returns:
            Mapping entity_id -> age in hours, or None when the key is
            missing, expired or has no expiry.
        """
        pipeline = self.redis_client.pipeline()
        for entity_id in entity_ids:
            pipeline.ttl(f"feature:{feature_name}:{entity_id}")
        ttls = pipeline.execute()

        default_ttl = self.feature_metadata.get(feature_name, {}).get('default_ttl', 86400)

        freshness = {}
        for entity_id, ttl in zip(entity_ids, ttls):
            if ttl > 0:
                freshness[entity_id] = (default_ttl - ttl) / 3600  # Convert to hours
            else:
                freshness[entity_id] = None  # Feature not found or expired
        return freshness

    def cleanup_expired_features(self):
        """Count feature keys that expired while being scanned.

        Redis removes expired keys on its own; this only reports keys that
        SCAN returned but that no longer exist when their TTL is queried
        (TTL reply -2). TTL lookups are pipelined per scan batch.

        Returns:
            Number of such keys.
        """
        cursor = 0
        expired_count = 0

        while True:
            cursor, keys = self.redis_client.scan(cursor=cursor, match="feature:*", count=1000)

            if keys:
                pipeline = self.redis_client.pipeline()
                for key in keys:
                    pipeline.ttl(key)
                # -2 means the key doesn't exist
                expired_count += sum(1 for ttl in pipeline.execute() if ttl == -2)

            if cursor == 0:
                break

        logging.info(f"Found {expired_count} expired feature keys")
        return expired_count

# Example usage
def user_engagement_features(user_id: str, lookback_days: int = 30) -> Dict[str, Any]:
    """Return a fixed set of demo engagement features.

    A real implementation would query a data warehouse or database; here the
    values are simulated and both arguments are accepted but ignored.
    """
    return dict(
        total_sessions=15,
        avg_session_duration=12.5,
        pages_per_session=3.2,
        bounce_rate=0.25,
        last_activity_days_ago=2,
        preferred_category='electronics',
        is_premium=False,
    )

# Usage example
# NOTE: these statements run at import time and talk to a Redis server on
# localhost:6379 (the FeatureStore defaults).
feature_store = FeatureStore()

# Register features
# NOTE(review): these names use the 'user_' prefix, while the group computed
# below stores its values under 'user_engagement_<name>', so store_feature
# will log a "not registered" warning for them — confirm the intended names.
feature_store.register_feature('user_total_sessions', 'int', 'Total user sessions in lookback period')
feature_store.register_feature('user_avg_session_duration', 'float', 'Average session duration')
feature_store.register_feature('user_preferred_category', 'str', 'Most visited product category')

# Compute and store features for multiple users
# (lookback_days is forwarded to user_engagement_features as a keyword argument)
user_ids = ['user_123', 'user_456', 'user_789']
feature_store.compute_and_store_feature_group(
    'user_engagement', 
    user_ids, 
    user_engagement_features,
    lookback_days=30
)

# Retrieve features for model inference
# (one row per entity, one column per requested feature)
features_df = feature_store.get_features_batch(
    entity_ids=['user_123', 'user_456'],
    feature_names=['user_engagement_total_sessions', 'user_engagement_avg_session_duration']
)

print("Retrieved features:")
print(features_df)

Data Quality Monitoring

import json
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

class DataQualityMonitor:
    """Monitor data quality metrics in production.

    A baseline of per-feature summary statistics is captured with
    ``establish_baseline``; later batches are scored against that baseline
    with ``monitor_batch``.

    Recognised ``alert_thresholds`` keys (default used when a key is absent):
        mean_drift (0.2): relative change of a numeric feature's mean.
        category_drift (0.1): absolute change of a category's frequency.
        missing_rate_increase (0.05): absolute increase of the missing rate.
    """

    # Score penalty applied when a feature switches between numeric and
    # categorical relative to its baseline.
    _TYPE_MISMATCH_PENALTY = 50

    def __init__(self, alert_thresholds: Dict[str, float]):
        """Store the thresholds and start with an empty baseline."""
        self.alert_thresholds = alert_thresholds
        self.baseline_stats = {}
        # Not populated by the methods of this class: alerts are returned in
        # the report produced by monitor_batch rather than stored here.
        self.alerts = []

    @staticmethod
    def _is_numeric(series: pd.Series) -> bool:
        """Return True when the series should be profiled as numeric.

        Any numeric dtype qualifies (not only int64/float64, so int32,
        float32 and nullable numeric columns are covered). Booleans are
        excluded because quantile statistics are not meaningful for them.
        """
        dtype = series.dtype
        return pd.api.types.is_numeric_dtype(dtype) and not pd.api.types.is_bool_dtype(dtype)

    def establish_baseline(self, df: pd.DataFrame, feature_cols: List[str]):
        """Establish baseline statistics for features.

        Args:
            df: Reference data (typically the training set).
            feature_cols: Columns to profile. Columns absent from ``df`` are
                skipped with a warning and will not be monitored.
        """
        for col in feature_cols:
            if col not in df.columns:
                logging.warning(f"Feature {col} not found in baseline data; it will not be monitored")
                continue
            # Same profiling routine as for incoming batches, so baseline and
            # batch statistics always share the same keys.
            self.baseline_stats[col] = self._calculate_current_stats(df[col])

        logging.info(f"Established baseline for {len(self.baseline_stats)} features")

    def monitor_batch(self, df: pd.DataFrame, batch_id: Optional[str] = None) -> Dict[str, Any]:
        """Monitor a new batch of data against baseline.

        Args:
            df: The batch to check.
            batch_id: Identifier for the batch; defaults to a timestamp of
                the form ``YYYYmmdd_HHMMSS``.

        Returns:
            A report dict with keys ``batch_id``, ``timestamp``,
            ``overall_quality_score`` (mean of the per-feature scores, 0 when
            no feature could be scored), ``feature_quality_scores``,
            ``alerts``, ``batch_size`` and ``features_monitored``.
        """
        batch_id = batch_id or datetime.now().strftime("%Y%m%d_%H%M%S")
        alerts = []
        quality_scores = {}

        for col, baseline in self.baseline_stats.items():
            if col not in df.columns:
                # A baseline feature that is absent gets an alert but no score.
                alerts.append({
                    'type': 'missing_feature',
                    'feature': col,
                    'severity': 'high',
                    'message': f"Feature {col} missing from batch"
                })
                continue

            current_stats = self._calculate_current_stats(df[col])
            quality_score, col_alerts = self._compare_with_baseline(col, current_stats, baseline)

            quality_scores[col] = quality_score
            alerts.extend(col_alerts)

        # Overall quality assessment
        overall_quality = float(np.mean(list(quality_scores.values()))) if quality_scores else 0.0

        report = {
            'batch_id': batch_id,
            'timestamp': datetime.now().isoformat(),
            'overall_quality_score': overall_quality,
            'feature_quality_scores': quality_scores,
            'alerts': alerts,
            'batch_size': len(df),
            'features_monitored': len(quality_scores)
        }

        # Log high-severity alerts
        high_severity_alerts = [a for a in alerts if a.get('severity') == 'high']
        if high_severity_alerts:
            logging.warning(f"High severity data quality alerts for batch {batch_id}: {len(high_severity_alerts)} issues")

        return report

    def _calculate_current_stats(self, series: pd.Series) -> Dict:
        """Calculate current statistics for a feature.

        Returns:
            For numeric series: mean, std, median, q1, q3, min, max and
            missing_rate. For all other series: the ten most frequent
            categories with their relative frequencies, the cardinality and
            missing_rate.
        """
        missing_rate = series.isnull().mean()

        if self._is_numeric(series):
            return {
                'mean': series.mean(),
                'std': series.std(),
                'median': series.median(),
                'q1': series.quantile(0.25),
                'q3': series.quantile(0.75),
                'min': series.min(),
                'max': series.max(),
                'missing_rate': missing_rate
            }

        value_counts = series.value_counts(normalize=True)
        return {
            'top_categories': value_counts.head(10).to_dict(),
            'cardinality': series.nunique(),
            'missing_rate': missing_rate
        }

    def _compare_with_baseline(self, feature: str, current: Dict, baseline: Dict) -> Tuple[float, List[Dict]]:
        """Compare current stats with baseline and generate alerts.

        Args:
            feature: Feature name, copied into every alert.
            current: Statistics of the batch column.
            baseline: Statistics of the same column in the baseline data.

        Returns:
            ``(quality_score, alerts)``. The score starts at 100, loses a
            fixed penalty per alert and is clipped at 0.
        """
        alerts = []
        quality_score = 100.0

        # The presence of 'mean' is what marks a stats dict as numeric.
        baseline_is_numeric = 'mean' in baseline
        current_is_numeric = 'mean' in current

        if baseline_is_numeric != current_is_numeric:
            # The column changed kind (e.g. numbers arriving as strings).
            # The kind-specific checks below would fail on missing keys, so
            # report the mismatch instead of comparing incompatible stats.
            baseline_kind = 'numeric' if baseline_is_numeric else 'categorical'
            current_kind = 'numeric' if current_is_numeric else 'categorical'
            alerts.append({
                'type': 'type_mismatch',
                'feature': feature,
                'severity': 'high',
                'message': f"Feature type changed: baseline is {baseline_kind}, batch is {current_kind}"
            })
            quality_score -= self._TYPE_MISMATCH_PENALTY

        elif baseline_is_numeric:
            # Check for distribution drift.
            # NOTE: drift is relative to the baseline mean, so a baseline
            # mean of exactly 0 disables this check (drift is reported as 0).
            mean_drift = abs((current['mean'] - baseline['mean']) / baseline['mean']) if baseline['mean'] != 0 else 0
            if mean_drift > self.alert_thresholds.get('mean_drift', 0.2):
                alerts.append({
                    'type': 'distribution_drift',
                    'feature': feature,
                    'severity': 'medium',
                    'message': f"Mean drift of {mean_drift:.2%} detected",
                    'current_value': current['mean'],
                    'baseline_value': baseline['mean']
                })
                quality_score -= 20

            # Check for outliers: values more than three baseline standard
            # deviations beyond the baseline min/max.
            if current['min'] < baseline['min'] - 3 * baseline['std']:
                alerts.append({
                    'type': 'outlier_detected',
                    'feature': feature,
                    'severity': 'low',
                    'message': f"Values below expected range detected: {current['min']}"
                })
                quality_score -= 10

            if current['max'] > baseline['max'] + 3 * baseline['std']:
                alerts.append({
                    'type': 'outlier_detected',
                    'feature': feature,
                    'severity': 'low',
                    'message': f"Values above expected range detected: {current['max']}"
                })
                quality_score -= 10

        else:  # Categorical feature
            # Check for new categories. Only the top categories of each side
            # are stored, so "new" means "not among the baseline's top ten".
            current_cats = set(current['top_categories'].keys())
            baseline_cats = set(baseline['top_categories'].keys())
            new_categories = current_cats - baseline_cats

            if new_categories:
                alerts.append({
                    'type': 'new_categories',
                    'feature': feature,
                    'severity': 'medium',
                    'message': f"New categories detected: {list(new_categories)[:5]}"
                })
                quality_score -= 15

            # Check for significant changes in category distribution; a
            # category absent from the batch's top list counts as frequency 0.
            for cat, baseline_freq in baseline['top_categories'].items():
                current_freq = current['top_categories'].get(cat, 0)
                freq_change = abs(current_freq - baseline_freq)
                if freq_change > self.alert_thresholds.get('category_drift', 0.1):
                    alerts.append({
                        'type': 'category_distribution_drift',
                        'feature': feature,
                        'severity': 'medium',
                        'message': f"Category '{cat}' frequency changed by {freq_change:.2%}"
                    })
                    quality_score -= 10

        # Check missing value rate (only increases are penalised).
        missing_rate_change = current['missing_rate'] - baseline['missing_rate']
        if missing_rate_change > self.alert_thresholds.get('missing_rate_increase', 0.05):
            alerts.append({
                'type': 'missing_rate_increase',
                'feature': feature,
                'severity': 'high',
                'message': f"Missing rate increased by {missing_rate_change:.2%}"
            })
            quality_score -= 30

        return max(0, quality_score), alerts

# Usage example: score a simulated batch against a baseline built from training data.
alert_thresholds = {
    'mean_drift': 0.15,
    'category_drift': 0.08,
    'missing_rate_increase': 0.03,
}
monitor = DataQualityMonitor(alert_thresholds=alert_thresholds)

# Establish baseline from (synthetic) training data: 1000 rows, three features.
training_data = pd.DataFrame({
    'feature1': np.random.normal(50, 10, 1000),
    'feature2': np.random.choice(['A', 'B', 'C'], 1000),
    'feature3': np.random.exponential(2, 1000),
})
monitor.establish_baseline(training_data, list(training_data.columns))

# Monitor a new 500-row batch that deliberately differs from the baseline.
new_batch = pd.DataFrame({
    'feature1': np.random.normal(55, 12, 500),  # Slight drift
    'feature2': np.random.choice(['A', 'B', 'C', 'D'], 500),  # New category
    'feature3': np.random.exponential(2, 500),
})
quality_report = monitor.monitor_batch(new_batch)

# default=str stringifies any value json cannot serialise natively.
report_json = json.dumps(quality_report, indent=2, default=str)
print("Data Quality Report:", report_json, sep="\n")
No quiz questions available
Quiz ID "data-preparation" not found