ML Data Pipelines & Orchestration

Build robust, scalable data pipelines for machine learning with proper orchestration and monitoring.

35 min read • Intermediate

🔄 ML Data Pipelines

ML data pipelines automate the flow of data from raw sources through cleaning, transformation, feature engineering, and model training. They ensure reproducible, reliable, and scalable machine learning workflows.

Pipeline Reality: Engineers routinely report that the bulk of ML work goes into building and maintaining data pipelines, and poor pipeline design is a leading cause of ML production failures. Investment here pays massive dividends.
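
Conceptually, a pipeline is an ordered chain of deterministic stages that can be re-run end to end on any slice of data. A minimal sketch in plain pandas, where the stage functions and column names are hypothetical:

# Minimal sketch: a pipeline as an ordered chain of stages (names are illustrative).
from typing import Callable, List
import pandas as pd

def run_pipeline(df: pd.DataFrame,
                 stages: List[Callable[[pd.DataFrame], pd.DataFrame]]) -> pd.DataFrame:
    """Apply each stage in order; rerunning on the same input yields the same output."""
    for stage in stages:
        df = stage(df)
    return df

def drop_invalid_rows(df: pd.DataFrame) -> pd.DataFrame:
    # Cleaning stage: keep only rows with a user_id and a positive value
    return df.dropna(subset=["user_id"]).query("value > 0")

def add_basic_features(df: pd.DataFrame) -> pd.DataFrame:
    # Feature-engineering stage: normalize the raw value column
    out = df.copy()
    out["value_zscore"] = (out["value"] - out["value"].mean()) / out["value"].std()
    return out

# features = run_pipeline(raw_events, [drop_invalid_rows, add_basic_features])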

❌ Ad-hoc Data Processing

  • Manual data extraction and cleaning
  • Inconsistent feature engineering
  • No data validation or monitoring
  • Difficult to reproduce results

✅ Production ML Pipelines

  • Automated, scheduled data processing
  • Consistent, versioned transformations
  • Built-in quality checks and monitoring
  • Reproducible and scalable workflows

πŸ—οΈ Pipeline Architectures

Batch Processing

Process large volumes of data at scheduled intervals

Best for: Daily/weekly model training, large dataset processing, ETL jobs

Key traits: cost effective, high throughput, simple to implement, resource efficient

Implementation

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, stddev, count
import logging

class BatchDataPipeline:
    def __init__(self):
        self.spark = SparkSession.builder \
            .appName("MLBatchPipeline") \
            .config("spark.sql.adaptive.enabled", "true") \
            .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
            .getOrCreate()
        
        self.logger = logging.getLogger(__name__)
        
    def run_daily_pipeline(self, date: str):
        """Execute daily batch processing pipeline"""
        
        try:
            # 1. Extract - Read raw data from data lake
            raw_data = self.spark.read \
                .parquet(f"s3://data-lake/raw/events/date={date}/") \
                .filter(col("event_type").isNotNull())
            
            # 2. Transform - Clean and feature engineer
            cleaned_data = self.clean_data(raw_data)
            features = self.extract_features(cleaned_data)
            
            # 3. Load - Write to feature store
            features.write \
                .mode("overwrite") \
                .parquet(f"s3://feature-store/daily/date={date}/")
            
            # 4. Data quality checks
            quality_metrics = self.run_quality_checks(features)
            
            # 5. Update model if data quality passes
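            # (trigger_model_training and send_alert are assumed helper methods, not defined here)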
            if quality_metrics["data_quality_score"] > 0.8:
                self.trigger_model_training(date)
            
            self.logger.info(f"Pipeline completed successfully for {date}")
            
        except Exception as e:
            self.logger.error(f"Pipeline failed: {str(e)}")
            self.send_alert(f"Batch pipeline failed: {str(e)}")
            raise
    
    def clean_data(self, df):
        """Clean and validate data"""
        return df.filter(
            col("user_id").isNotNull() &
            col("timestamp").isNotNull() &
            (col("value") > 0)
        ).dropDuplicates(["user_id", "timestamp"])
    
    def extract_features(self, df):
        """Extract ML features"""
        return df.groupBy("user_id") \
            .agg(
                avg("value").alias("avg_value"),
                stddev("value").alias("std_value"),
                count("*").alias("event_count")
            )
    
    def run_quality_checks(self, df):
        """Run data quality validation"""
        total_rows = df.count()
        null_rows = df.filter(col("avg_value").isNull()).count()
        
        return {
            "total_rows": total_rows,
            "null_percentage": null_rows / total_rows if total_rows > 0 else 0,
            "data_quality_score": 1 - (null_rows / total_rows) if total_rows > 0 else 0
        }
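
A hypothetical invocation of the class above; the S3 paths inside it are placeholders, so a real run needs an actual bucket layout and credentials:

# Hypothetical usage of BatchDataPipeline; the date is an example value.
if __name__ == "__main__":
    pipeline = BatchDataPipeline()
    pipeline.run_daily_pipeline(date="2024-01-15")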

πŸ› οΈ Orchestration Tools

Apache Airflow

Workflow orchestration and scheduling

✅ Strengths

  • Visual DAGs
  • Rich scheduling
  • Great UI
  • Extensive plugins

⚠️ Limitations

  • Python-centric
  • Resource heavy
  • Learning curve
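
To make the orchestration concrete, below is a minimal sketch of an Airflow DAG that schedules three pipeline steps daily. The task callables are hypothetical stand-ins for the batch pipeline methods shown earlier; the syntax targets Airflow 2.4+ (older 2.x releases use schedule_interval instead of schedule).

# Minimal Airflow DAG sketch; task bodies are hypothetical placeholders.
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def extract(**context):
    print("extract raw events for", context["ds"])  # 'ds' = logical run date

def transform(**context):
    print("clean data and build features")

def validate(**context):
    print("run data quality checks")

with DAG(
    dag_id="ml_daily_feature_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",   # cron expressions are also accepted
    catchup=False,
) as dag:
    t_extract = PythonOperator(task_id="extract", python_callable=extract)
    t_transform = PythonOperator(task_id="transform", python_callable=transform)
    t_validate = PythonOperator(task_id="validate", python_callable=validate)

    # Linear dependency chain: extract -> transform -> validate
    t_extract >> t_transform >> t_validate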

💻 Implementation Examples

# Basic Pipeline with Python
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
import joblib
from datetime import datetime

class BasicMLPipeline:
    def __init__(self):
        self.scaler = StandardScaler()
        self.model = RandomForestClassifier(n_estimators=100)
        self.is_trained = False
    
    def load_data(self, data_path: str) -> pd.DataFrame:
        """Load data from various sources"""
        if data_path.endswith('.csv'):
            return pd.read_csv(data_path)
        elif data_path.endswith('.parquet'):
            return pd.read_parquet(data_path)
        else:
            raise ValueError("Unsupported file format")
    
    def clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Basic data cleaning"""
        # Remove duplicates
        df = df.drop_duplicates()
        
        # Handle missing values
        numeric_columns = df.select_dtypes(include=['number']).columns
        df[numeric_columns] = df[numeric_columns].fillna(df[numeric_columns].median())
        
        # Remove outliers (simple IQR method)
        for col in numeric_columns:
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            df = df[(df[col] >= lower_bound) & (df[col] <= upper_bound)]
        
        return df
    
    def feature_engineering(self, df: pd.DataFrame) -> pd.DataFrame:
        """Basic feature engineering"""
        # Create interaction features
        if 'feature1' in df.columns and 'feature2' in df.columns:
            df['feature1_x_feature2'] = df['feature1'] * df['feature2']
        
        # Create time-based features if timestamp exists
        if 'timestamp' in df.columns:
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            df['hour'] = df['timestamp'].dt.hour
            df['day_of_week'] = df['timestamp'].dt.dayofweek
            df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
            # Drop the raw datetime column so only numeric features reach the scaler/model
            df = df.drop(columns=['timestamp'])
        
        return df
    
    def train_pipeline(self, data_path: str, target_column: str):
        """Full training pipeline"""
        print(f"Starting training pipeline at {datetime.now()}")
        
        # 1. Load data
        df = self.load_data(data_path)
        print(f"Loaded {len(df)} records")
        
        # 2. Clean data
        df = self.clean_data(df)
        print(f"After cleaning: {len(df)} records")
        
        # 3. Feature engineering
        df = self.feature_engineering(df)
        
        # 4. Prepare features and target
        X = df.drop(columns=[target_column])
        y = df[target_column]
        
        # 5. Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        
        # 6. Scale features
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)
        
        # 7. Train model
        self.model.fit(X_train_scaled, y_train)
        self.is_trained = True
        
        # 8. Evaluate
        train_score = self.model.score(X_train_scaled, y_train)
        test_score = self.model.score(X_test_scaled, y_test)
        
        print(f"Training completed:")
        print(f"Train accuracy: {train_score:.3f}")
        print(f"Test accuracy: {test_score:.3f}")
        
        # 9. Save model
        self.save_model()
        
        return {
            'train_score': train_score,
            'test_score': test_score,
            'model_path': 'model.joblib'
        }
    
    def save_model(self, path: str = 'model.joblib'):
        """Save trained model and scaler"""
        if not self.is_trained:
            raise ValueError("Model must be trained before saving")
        
        model_data = {
            'model': self.model,
            'scaler': self.scaler,
            'trained_at': datetime.now().isoformat()
        }
        
        joblib.dump(model_data, path)
        print(f"Model saved to {path}")

✅ Data Quality & Validation

Schema Validation

  • Data type checking
  • Column presence validation
  • Range and constraint checks
  • Format validation

Statistical Validation

  • Distribution drift detection
  • Outlier identification
  • Missing value thresholds
  • Correlation analysis

Business Logic Validation

  • Domain-specific rules
  • Cross-field validation
  • Temporal consistency
  • Referential integrity
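
A minimal sketch of how the schema and statistical checks above might look in plain pandas. The expected schema, column names, and thresholds are assumptions for illustration; libraries such as Great Expectations or pandera offer richer, declarative versions of the same idea.

# Illustrative validation checks in plain pandas; schema and thresholds are assumptions.
import pandas as pd

EXPECTED_SCHEMA = {"user_id": "int64", "value": "float64", "timestamp": "datetime64[ns]"}
MAX_NULL_FRACTION = 0.05

def validate_schema(df: pd.DataFrame) -> list:
    errors = []
    for column, dtype in EXPECTED_SCHEMA.items():
        if column not in df.columns:
            errors.append(f"missing column: {column}")
        elif str(df[column].dtype) != dtype:
            errors.append(f"{column}: expected {dtype}, got {df[column].dtype}")
    return errors

def validate_statistics(df: pd.DataFrame, reference_mean: float) -> list:
    errors = []
    null_fraction = df["value"].isna().mean()
    if null_fraction > MAX_NULL_FRACTION:
        errors.append(f"value null fraction {null_fraction:.2%} exceeds threshold")
    # Crude drift check: flag if the mean moved more than 3 standard deviations
    if abs(df["value"].mean() - reference_mean) > 3 * df["value"].std():
        errors.append("value distribution drifted from reference")
    return errors

# errors = validate_schema(batch) + validate_statistics(batch, reference_mean=42.0)
# if errors: raise ValueError(f"Data quality checks failed: {errors}")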

🎯 Pipeline Best Practices

Design Principles

  • ✓ Idempotency: Same input = same output (see the sketch after these lists)
  • ✓ Modularity: Small, reusable components
  • ✓ Monitoring: Comprehensive observability
  • ✓ Error handling: Graceful failure recovery
  • ✓ Versioning: Track data and code changes

Operational Excellence

  • ✓ Testing: Unit, integration, and data tests
  • ✓ Documentation: Clear data lineage
  • ✓ Alerting: Proactive issue detection
  • ✓ Backfill: Historical data processing
  • ✓ Security: Data access controls
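
As a small example of the idempotency and error-handling principles above, here is a hedged sketch of a daily job that overwrites its output partition (so reruns replace rather than duplicate) and retries with backoff. Function names and the output path are hypothetical.

# Hypothetical sketch: idempotent, retrying daily job.
import time
import pandas as pd

def process_partition(date: str) -> pd.DataFrame:
    # Placeholder transformation: in practice, read raw data for `date` and build features
    return pd.DataFrame({"date": [date], "feature": [1.0]})

def write_partition(df: pd.DataFrame, date: str) -> None:
    # Overwriting the partition for this date makes reruns idempotent
    df.to_parquet(f"features/date={date}.parquet")

def run_with_retries(date: str, max_attempts: int = 3) -> None:
    for attempt in range(1, max_attempts + 1):
        try:
            write_partition(process_partition(date), date)
            return
        except Exception:
            if attempt == max_attempts:
                raise  # surface the failure to the scheduler / alerting
            time.sleep(2 ** attempt)  # exponential backoff before retrying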

🎯 Key Takeaways

✓ Choose the right architecture: Batch for cost efficiency, streaming for low latency, lambda for flexibility

✓ Data quality is critical: Implement comprehensive validation at every stage

✓ Design for failure: Pipelines will break - build with retries and monitoring

✓ Version everything: Data, code, and model artifacts for reproducibility

✓ Start simple: Begin with batch processing and evolve to streaming as needed

πŸ“ ML Data Pipelines Mastery Check

1 of 8Current: 0/8

What is the main advantage of batch processing over streaming for ML pipelines?