ML Data Pipelines
ML data pipelines automate the flow of data from raw sources through cleaning, transformation, feature engineering, and model training. They ensure reproducible, reliable, and scalable machine learning workflows.
Pipeline Reality: Practitioners commonly report that the majority of ML engineering time goes into data work, and pipeline problems are a leading cause of ML production failures. Investment here pays significant dividends.
Ad-hoc Data Processing
- Manual data extraction and cleaning
- Inconsistent feature engineering
- No data validation or monitoring
- Difficult to reproduce results
Production ML Pipelines
- Automated, scheduled data processing
- Consistent, versioned transformations
- Built-in quality checks and monitoring
- Reproducible and scalable workflows
Pipeline Architectures
Batch Processing
Process large volumes of data at scheduled intervals.
Best for: daily/weekly model training, large dataset processing, ETL jobs
Strengths: cost effective, high throughput, simple to implement, resource efficient
Implementation
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, stddev, count
import logging

class BatchDataPipeline:
    def __init__(self, spark_config=None):
        # spark_config is accepted for custom overrides; sensible defaults are set below
        self.spark = SparkSession.builder \
            .appName("MLBatchPipeline") \
            .config("spark.sql.adaptive.enabled", "true") \
            .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
            .getOrCreate()
        self.logger = logging.getLogger(__name__)

    def run_daily_pipeline(self, date: str):
        """Execute daily batch processing pipeline"""
        try:
            # 1. Extract - Read raw data from the data lake
            raw_data = self.spark.read \
                .parquet(f"s3://data-lake/raw/events/date={date}/") \
                .filter(col("event_type").isNotNull())

            # 2. Transform - Clean and feature engineer
            cleaned_data = self.clean_data(raw_data)
            features = self.extract_features(cleaned_data)

            # 3. Load - Write to feature store
            features.write \
                .mode("overwrite") \
                .parquet(f"s3://feature-store/daily/date={date}/")

            # 4. Data quality checks
            quality_metrics = self.run_quality_checks(features)

            # 5. Trigger model training only if data quality passes
            if quality_metrics["data_quality_score"] > 0.8:
                self.trigger_model_training(date)

            self.logger.info(f"Pipeline completed successfully for {date}")
        except Exception as e:
            self.logger.error(f"Pipeline failed: {str(e)}")
            self.send_alert(f"Batch pipeline failed: {str(e)}")
            raise

    def clean_data(self, df):
        """Clean and validate data"""
        return df.filter(
            col("user_id").isNotNull() &
            col("timestamp").isNotNull() &
            (col("value") > 0)
        ).dropDuplicates(["user_id", "timestamp"])

    def extract_features(self, df):
        """Extract ML features"""
        return df.groupBy("user_id") \
            .agg(
                avg("value").alias("avg_value"),
                stddev("value").alias("std_value"),
                count("*").alias("event_count")
            )

    def run_quality_checks(self, df):
        """Run data quality validation"""
        total_rows = df.count()
        null_rows = df.filter(col("avg_value").isNull()).count()
        return {
            "total_rows": total_rows,
            "null_percentage": null_rows / total_rows if total_rows > 0 else 0,
            "data_quality_score": 1 - (null_rows / total_rows) if total_rows > 0 else 0
        }

    def trigger_model_training(self, date: str):
        """Kick off downstream model training (stub for illustration)."""
        self.logger.info(f"Triggering model training for {date}")

    def send_alert(self, message: str):
        """Notify the on-call channel (stub for illustration)."""
        self.logger.error(message)
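For context, here is a minimal sketch of how the class above might be invoked from a daily cron job or an orchestrator task. The date handling and the seven-day backfill range are illustrative assumptions, not part of the class itself.

```python
from datetime import date, timedelta

# Process yesterday's partition (ISO date matches the date= partition format).
pipeline = BatchDataPipeline()  # default Spark config
yesterday = (date.today() - timedelta(days=1)).isoformat()
pipeline.run_daily_pipeline(yesterday)

# Backfilling a historical range reuses the same daily job, since each run
# overwrites only its own date partition.
for offset in range(1, 8):
    day = (date.today() - timedelta(days=offset)).isoformat()
    pipeline.run_daily_pipeline(day)
```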
Orchestration Tools
Apache Airflow
Workflow orchestration and scheduling (a minimal DAG sketch follows the lists below)
Strengths
- Visual DAGs
- Rich scheduling
- Great UI
- Extensive plugins
Limitations
- Python-centric
- Resource heavy
- Learning curve
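To make the orchestration concrete, here is a minimal sketch of an Airflow 2.4+ DAG that could schedule the daily batch job described above. The dag_id, task names, and the callables are hypothetical; in a real deployment they would import and call BatchDataPipeline.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Hypothetical callables standing in for the batch pipeline and its checks.
def run_batch_pipeline(ds, **kwargs):
    # `ds` is Airflow's built-in execution date string (YYYY-MM-DD)
    print(f"Running batch pipeline for {ds}")

def run_quality_checks(ds, **kwargs):
    print(f"Running data quality checks for {ds}")

with DAG(
    dag_id="ml_daily_batch_pipeline",  # hypothetical name
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,                     # set True to enable backfills
    tags=["ml", "batch"],
) as dag:
    process = PythonOperator(task_id="process_daily_data", python_callable=run_batch_pipeline)
    validate = PythonOperator(task_id="validate_output", python_callable=run_quality_checks)

    process >> validate  # validation runs only after processing succeeds
```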
Implementation Examples
# Basic Pipeline with Python
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
import joblib
from datetime import datetime

class BasicMLPipeline:
    def __init__(self):
        self.scaler = StandardScaler()
        self.model = RandomForestClassifier(n_estimators=100)
        self.is_trained = False

    def load_data(self, data_path: str) -> pd.DataFrame:
        """Load data from various sources"""
        if data_path.endswith('.csv'):
            return pd.read_csv(data_path)
        elif data_path.endswith('.parquet'):
            return pd.read_parquet(data_path)
        else:
            raise ValueError("Unsupported file format")

    def clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Basic data cleaning"""
        # Remove duplicates
        df = df.drop_duplicates()

        # Handle missing values
        numeric_columns = df.select_dtypes(include=['number']).columns
        df[numeric_columns] = df[numeric_columns].fillna(df[numeric_columns].median())

        # Remove outliers (simple IQR method)
        for col in numeric_columns:
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            df = df[(df[col] >= lower_bound) & (df[col] <= upper_bound)]

        return df

    def feature_engineering(self, df: pd.DataFrame) -> pd.DataFrame:
        """Basic feature engineering"""
        # Create interaction features
        if 'feature1' in df.columns and 'feature2' in df.columns:
            df['feature1_x_feature2'] = df['feature1'] * df['feature2']

        # Create time-based features if a timestamp column exists
        if 'timestamp' in df.columns:
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            df['hour'] = df['timestamp'].dt.hour
            df['day_of_week'] = df['timestamp'].dt.dayofweek
            df['is_weekend'] = df['day_of_week'].isin([5, 6])

        return df

    def train_pipeline(self, data_path: str, target_column: str):
        """Full training pipeline"""
        print(f"Starting training pipeline at {datetime.now()}")

        # 1. Load data
        df = self.load_data(data_path)
        print(f"Loaded {len(df)} records")

        # 2. Clean data
        df = self.clean_data(df)
        print(f"After cleaning: {len(df)} records")

        # 3. Feature engineering
        df = self.feature_engineering(df)

        # 4. Prepare features and target (keep only numeric/boolean columns,
        #    dropping raw datetime columns the model cannot consume directly)
        X = df.drop(columns=[target_column]).select_dtypes(include=['number', 'bool'])
        y = df[target_column]

        # 5. Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )

        # 6. Scale features
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        # 7. Train model
        self.model.fit(X_train_scaled, y_train)
        self.is_trained = True

        # 8. Evaluate
        train_score = self.model.score(X_train_scaled, y_train)
        test_score = self.model.score(X_test_scaled, y_test)
        print("Training completed:")
        print(f"Train accuracy: {train_score:.3f}")
        print(f"Test accuracy: {test_score:.3f}")

        # 9. Save model
        self.save_model()

        return {
            'train_score': train_score,
            'test_score': test_score,
            'model_path': 'model.joblib'
        }

    def save_model(self, path: str = 'model.joblib'):
        """Save trained model and scaler"""
        if not self.is_trained:
            raise ValueError("Model must be trained before saving")

        model_data = {
            'model': self.model,
            'scaler': self.scaler,
            'trained_at': datetime.now().isoformat()
        }
        joblib.dump(model_data, path)
        print(f"Model saved to {path}")
Data Quality & Validation
Validation typically spans three layers; a minimal sketch combining them follows the lists below.
Schema Validation
- Data type checking
- Column presence validation
- Range and constraint checks
- Format validation
Statistical Validation
- Distribution drift detection
- Outlier identification
- Missing value thresholds
- Correlation analysis
Business Logic Validation
- Domain-specific rules
- Cross-field validation
- Temporal consistency
- Referential integrity
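As referenced above, here is a minimal plain-pandas sketch of how the three validation layers might be combined into a single check for a batch of events. The expected schema, null threshold, and business rules are illustrative assumptions rather than any specific library's API.

```python
import pandas as pd

# Hypothetical expected schema and thresholds for an events dataset.
EXPECTED_SCHEMA = {"user_id": "int64", "value": "float64", "timestamp": "datetime64[ns]"}
MAX_NULL_FRACTION = 0.05

def validate(df: pd.DataFrame) -> list:
    errors = []

    # 1. Schema validation: column presence and data types
    for column, dtype in EXPECTED_SCHEMA.items():
        if column not in df.columns:
            errors.append(f"missing column: {column}")
        elif str(df[column].dtype) != dtype:
            errors.append(f"{column}: expected {dtype}, got {df[column].dtype}")

    # 2. Statistical validation: missing-value threshold per column
    null_fractions = df.isna().mean()
    for column, fraction in null_fractions.items():
        if fraction > MAX_NULL_FRACTION:
            errors.append(f"{column}: {fraction:.1%} nulls exceeds threshold")

    # 3. Business logic validation: domain rules and temporal consistency
    if "value" in df.columns and (df["value"] < 0).any():
        errors.append("value column contains negative amounts")
    if "timestamp" in df.columns and (df["timestamp"] > pd.Timestamp.now()).any():
        errors.append("timestamp column contains future events")

    return errors  # an empty list means the batch passed all checks
```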
Pipeline Best Practices
Design Principles
- Idempotency: Same input = same output (see the retry sketch after these lists)
- Modularity: Small, reusable components
- Monitoring: Comprehensive observability
- Error handling: Graceful failure recovery
- Versioning: Track data and code changes
Operational Excellence
- Testing: Unit, integration, and data tests
- Documentation: Clear data lineage
- Alerting: Proactive issue detection
- Backfill: Historical data processing
- Security: Data access controls
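To illustrate the idempotency and error-handling principles together, here is a small sketch of a retry wrapper around a job that overwrites its own date partition. The retry count, delay, and job wiring are assumptions made for the example.

```python
import time
import logging

logger = logging.getLogger(__name__)

def run_with_retries(job, date: str, max_attempts: int = 3, delay_seconds: int = 60):
    """Re-run a daily job safely: because the job overwrites only its own
    date partition (idempotent output), retries cannot duplicate data."""
    for attempt in range(1, max_attempts + 1):
        try:
            job(date)
            logger.info(f"Job succeeded for {date} on attempt {attempt}")
            return
        except Exception as exc:
            logger.warning(f"Attempt {attempt} failed for {date}: {exc}")
            if attempt == max_attempts:
                raise  # surface the failure for alerting after the final attempt
            time.sleep(delay_seconds)

# Example wiring with the batch pipeline defined earlier (hypothetical):
# pipeline = BatchDataPipeline()
# run_with_retries(pipeline.run_daily_pipeline, "2024-01-01")
```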
Key Takeaways
- Choose the right architecture: batch for cost efficiency, streaming for low latency, a lambda architecture for flexibility (a brief streaming sketch follows this list)
- Data quality is critical: implement comprehensive validation at every stage
- Design for failure: pipelines will break, so build in retries and monitoring
- Version everything: data, code, and model artifacts, for reproducibility
- Start simple: begin with batch processing and evolve to streaming as needed
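Since only the batch path is shown in code above, here is a minimal Spark Structured Streaming sketch of the low-latency alternative mentioned in the takeaways. The Kafka topic, broker address, and S3 paths are hypothetical, and the job assumes the spark-sql-kafka connector package is available on the cluster.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("MLStreamingPipeline").getOrCreate()

# Read a continuous stream of raw events from Kafka (topic and broker are hypothetical).
events = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "user-events")
    .load()
    .select(col("value").cast("string").alias("raw_event")))

# Continuously append micro-batches to the feature store path, with checkpointing
# so the query can recover exactly where it left off after a failure.
query = (events.writeStream
    .format("parquet")
    .option("path", "s3://feature-store/streaming/events/")
    .option("checkpointLocation", "s3://feature-store/checkpoints/events/")
    .trigger(processingTime="1 minute")
    .start())

query.awaitTermination()
```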
Related Technologies for Data Pipelines
- Apache Spark: Distributed data processing engine for large-scale batch and streaming
- Apache Kafka: Distributed streaming platform for real-time data pipelines
- Docker: Containerization for portable pipeline deployment
- Kubernetes: Container orchestration for scalable pipeline infrastructure
- Prometheus: Monitoring and alerting for pipeline observability
- Apache Airflow: Workflow orchestration platform for complex pipelines