📊 Training Data Management
Training data management is the backbone of successful ML systems. It spans the entire lifecycle, from data collection and ingestion through versioning, quality assurance, and labeling, so that models are trained on high-quality, reliable data.
Collection
Batch and streaming data ingestion with privacy preservation
Versioning
Data lineage tracking and reproducible dataset management
Quality
Validation pipelines and anomaly detection systems
Labeling
Active learning and weak supervision techniques
Data Management Aspects
Data Collection & Ingestion
Key Concepts:
- • Batch vs. streaming collection
- • Data source integration
- • Sampling strategies
- • Privacy-preserving collection
# Third-party dependencies: pandas, requests (kafka-python and numpy are imported lazily where used)
import hashlib
import os

import pandas as pd
import requests


class DataCollectionFramework:
    """
    Comprehensive data collection and ingestion system
    for ML training pipelines
    """

    def __init__(self, data_sources, privacy_config=None):
        self.data_sources = data_sources
        self.privacy_config = privacy_config or {}
        self.collection_metrics = {}
    def batch_collection_pipeline(self):
        """Batch data collection from multiple sources"""

        class BatchCollector:
            def __init__(self, sources, schedule='daily'):
                self.sources = sources
                self.schedule = schedule
                self.last_collection = {}

            def collect_from_database(self, config):
                """Collect data from relational databases"""
                query = f"""
                    SELECT * FROM {config['table']}
                    WHERE created_at >= '{self.last_collection.get(config['source_id'], '2024-01-01')}'
                      AND created_at < CURRENT_DATE
                    ORDER BY created_at
                    LIMIT {config.get('batch_size', 10000)}
                """
                # Connection handling -- create_connection is assumed to be a
                # project-specific helper (e.g. wrapping sqlalchemy.create_engine)
                conn = create_connection(config['connection_string'])
                data = pd.read_sql(query, conn)
                # Update collection timestamp so the next run only fetches new rows
                if len(data) > 0:
                    self.last_collection[config['source_id']] = data['created_at'].max()
                return data

            def collect_from_api(self, config):
                """Collect data from REST APIs with pagination"""
                all_data = []
                page = 1
                has_more = True
                while has_more and page <= config.get('max_pages', 100):
                    response = requests.get(
                        config['endpoint'],
                        params={
                            'page': page,
                            'per_page': config.get('per_page', 100),
                            'since': self.last_collection.get(config['source_id'], '2024-01-01')
                        },
                        headers=config.get('headers', {})
                    )
                    if response.status_code == 200:
                        batch = response.json()
                        all_data.extend(batch['data'])
                        has_more = batch.get('has_more', False)
                        page += 1
                    else:
                        raise Exception(f"API error: {response.status_code}")
                return pd.DataFrame(all_data)

            def collect_from_files(self, config):
                """Collect data from file systems or object storage"""
                import glob
                from pathlib import Path

                file_pattern = config['pattern']
                base_path = Path(config['base_path'])
                # Find new files since last collection
                all_files = glob.glob(str(base_path / file_pattern))
                last_timestamp = self.last_collection.get(config['source_id'], 0)
                new_files = [
                    f for f in all_files
                    if os.path.getmtime(f) > last_timestamp
                ]
                # Read and combine data, skipping unsupported file types
                data_frames = []
                for file_path in new_files:
                    if file_path.endswith('.csv'):
                        df = pd.read_csv(file_path)
                    elif file_path.endswith('.parquet'):
                        df = pd.read_parquet(file_path)
                    elif file_path.endswith('.json'):
                        df = pd.read_json(file_path)
                    else:
                        continue
                    data_frames.append(df)
                if data_frames:
                    combined = pd.concat(data_frames, ignore_index=True)
                    self.last_collection[config['source_id']] = max(
                        os.path.getmtime(f) for f in new_files
                    )
                    return combined
                return pd.DataFrame()

        return BatchCollector(self.data_sources)
    def streaming_collection_pipeline(self, stream_config=None):
        """Real-time streaming data collection"""

        class StreamCollector:
            def __init__(self, stream_config):
                self.config = stream_config
                self.buffer = []
                self.buffer_size = stream_config.get('buffer_size', 1000)

            def collect_from_kafka(self, topic, consumer_group):
                """Collect streaming data from Kafka, yielding a DataFrame per full buffer"""
                import json
                from kafka import KafkaConsumer  # requires the kafka-python package

                consumer = KafkaConsumer(
                    topic,
                    bootstrap_servers=self.config['kafka_servers'],
                    group_id=consumer_group,
                    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                    auto_offset_reset='latest'
                )
                for message in consumer:
                    data = message.value
                    # Apply real-time transformations; invalid records come back as None
                    processed = self.process_streaming_record(data)
                    if processed is not None:
                        self.buffer.append(processed)
                    # Flush buffer when full
                    if len(self.buffer) >= self.buffer_size:
                        yield self.flush_buffer()

            def process_streaming_record(self, record):
                """Process individual streaming records"""
                # Data validation -- validate_record and extract_features are
                # application-specific hooks expected to be supplied by the pipeline
                if not self.validate_record(record):
                    return None
                # Feature extraction
                features = {
                    'timestamp': record.get('timestamp'),
                    'user_id': record.get('user_id'),
                    'event_type': record.get('event_type'),
                    'features': self.extract_features(record)
                }
                return features

            def flush_buffer(self):
                """Flush buffered records to a DataFrame and reset the buffer"""
                if not self.buffer:
                    return None
                df = pd.DataFrame(self.buffer)
                self.buffer = []
                return df

        # Streaming settings such as 'kafka_servers' and 'buffer_size' are passed
        # explicitly to keep them separate from the privacy configuration
        return StreamCollector(stream_config or {})
    def privacy_preserving_collection(self):
        """Implement privacy-preserving data collection"""

        class PrivacyPreserver:
            def __init__(self, privacy_level='medium'):
                self.privacy_level = privacy_level
                # Smaller epsilon means stronger privacy (more noise)
                self.epsilon = {'low': 10.0, 'medium': 1.0, 'high': 0.1}[privacy_level]

            def differential_privacy_aggregation(self, data, query_function, sensitivity=1.0):
                """Apply differential privacy to aggregated queries"""
                import numpy as np

                # Add Laplace noise scaled by the query's sensitivity and epsilon;
                # the caller supplies sensitivity (1.0 is correct for counting queries)
                true_result = query_function(data)
                noise = np.random.laplace(0, sensitivity / self.epsilon)
                return true_result + noise

            def k_anonymity_transform(self, data, k=5, quasi_identifiers=('age', 'zip_code', 'gender')):
                """Ensure k-anonymity in collected data"""
                quasi_identifiers = list(quasi_identifiers)
                # Group by quasi-identifiers
                grouped = data.groupby(quasi_identifiers).size().reset_index(name='count')
                # Suppress records belonging to groups with fewer than k members
                safe_groups = grouped[grouped['count'] >= k][quasi_identifiers]
                anonymized = data.merge(safe_groups, on=quasi_identifiers, how='inner')
                return anonymized

            def data_minimization(self, data, required_features):
                """Collect only necessary features"""
                # Keep only the required columns
                minimal_data = data[required_features].copy()
                # Hash or remove direct identifiers
                if 'email' in minimal_data.columns:
                    minimal_data['email_hash'] = minimal_data['email'].apply(
                        lambda x: hashlib.sha256(x.encode()).hexdigest()
                    )
                    minimal_data.drop('email', axis=1, inplace=True)
                return minimal_data

        return PrivacyPreserver(self.privacy_config.get('level', 'medium'))
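As a quick, self-contained illustration of the privacy utilities above, the snippet below runs them on a toy DataFrame. The column names and values are made up for the example; the only assumption is the DataCollectionFramework class defined above.

import pandas as pd

# Toy records with made-up values matching the default quasi-identifiers above
df = pd.DataFrame({
    'age': [34, 34, 34, 34, 34, 51],
    'zip_code': ['94110'] * 5 + ['10001'],
    'gender': ['F'] * 5 + ['M'],
    'email': [f'user{i}@example.com' for i in range(6)],
    'clicks': [3, 1, 4, 1, 5, 9],
})

framework = DataCollectionFramework(data_sources={}, privacy_config={'level': 'high'})
privacy = framework.privacy_preserving_collection()

# Rows whose (age, zip_code, gender) group has fewer than k=5 members are suppressed
anonymized = privacy.k_anonymity_transform(df, k=5)

# Keep only the required columns and replace raw emails with SHA-256 hashes
minimal = privacy.data_minimization(anonymized, required_features=['email', 'clicks'])

# Differentially private count using the default sensitivity of 1.0
noisy_count = privacy.differential_privacy_aggregation(df, lambda d: len(d))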
Training Data Lifecycle
Collection & Ingestion
Gather data from various sources, apply privacy measures, validate schemas
Validation & Cleaning
Check quality, detect anomalies, clean and preprocess data (a minimal validation sketch follows these steps)
Labeling & Annotation
Apply labels using active learning, weak supervision, or human annotation
Versioning & Storage
Version datasets, track lineage, manage schema evolution
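To make the Validation & Cleaning step concrete, here is a minimal sketch of a batch validation pass built directly on pandas rather than a dedicated framework; the expectation keys, column names, and thresholds are illustrative only.

import pandas as pd


def validate_batch(df: pd.DataFrame, expectations: dict) -> list[str]:
    """Return a list of human-readable validation failures for one batch."""
    failures = []

    # Schema check: every expected column must be present
    missing = set(expectations['required_columns']) - set(df.columns)
    if missing:
        failures.append(f"missing columns: {sorted(missing)}")

    # Null-rate check per column
    for col, max_null_rate in expectations.get('max_null_rate', {}).items():
        if col in df.columns:
            null_rate = df[col].isna().mean()
            if null_rate > max_null_rate:
                failures.append(f"{col}: null rate {null_rate:.2%} exceeds {max_null_rate:.2%}")

    # Range checks for numeric columns
    for col, (lo, hi) in expectations.get('ranges', {}).items():
        if col in df.columns and not df[col].dropna().between(lo, hi).all():
            failures.append(f"{col}: values outside [{lo}, {hi}]")

    return failures


# Illustrative expectations for a hypothetical events table
expectations = {
    'required_columns': ['user_id', 'event_type', 'timestamp'],
    'max_null_rate': {'user_id': 0.0, 'event_type': 0.01},
    'ranges': {'session_length_s': (0, 86400)},
}

A batch that comes back with a non-empty failure list can be quarantined for review instead of being appended to the training set.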
Data Management Best Practices
🔒 Privacy & Security
- • Implement differential privacy for sensitive data
- • Apply k-anonymity and data minimization
- • Encrypt data at rest and in transit
- • Maintain audit logs for compliance
📈 Quality Control
- • Automate data validation pipelines
- • Monitor distribution shifts continuously (see the drift-check sketch below)
- • Implement anomaly detection systems
- • Regular quality audits and reports
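As one way to act on the distribution-shift bullet above, the sketch below runs a per-feature two-sample Kolmogorov-Smirnov test with SciPy; the significance threshold and the synthetic data are purely illustrative.

import numpy as np
from scipy.stats import ks_2samp


def detect_feature_drift(reference: np.ndarray, current: np.ndarray, alpha: float = 0.01) -> dict:
    """Compare a current feature sample against a reference (training-time) sample."""
    statistic, p_value = ks_2samp(reference, current)
    return {
        'ks_statistic': float(statistic),
        'p_value': float(p_value),
        # A small p-value suggests the two samples come from different distributions
        'drift_detected': p_value < alpha,
    }


# Illustrative check: reference data vs. a shifted current window
rng = np.random.default_rng(0)
reference = rng.normal(loc=0.0, scale=1.0, size=5000)
current = rng.normal(loc=0.3, scale=1.0, size=5000)
print(detect_feature_drift(reference, current))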
🔄 Versioning Strategy
- • Use semantic versioning for datasets (a manifest sketch follows this list)
- • Track complete data lineage
- • Maintain reproducible pipelines
- • Document schema changes thoroughly
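The versioning bullets above can be followed without committing to a particular tool by writing a content-hashed manifest next to each dataset snapshot; the manifest layout below is an illustrative convention, not a standard.

import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional


def file_sha256(path: Path) -> str:
    """Content hash of one data file, read in chunks."""
    digest = hashlib.sha256()
    with path.open('rb') as f:
        for chunk in iter(lambda: f.read(1 << 20), b''):
            digest.update(chunk)
    return digest.hexdigest()


def write_manifest(data_dir: str, version: str, parent_version: Optional[str] = None) -> dict:
    """Record a semantic version, per-file hashes, and lineage for a dataset snapshot."""
    files = sorted(Path(data_dir).glob('**/*.parquet'))
    manifest = {
        'version': version,                # e.g. '1.3.0'
        'parent_version': parent_version,  # lineage: which snapshot this was derived from
        'created_at': datetime.now(timezone.utc).isoformat(),
        'files': {str(p): file_sha256(p) for p in files},
    }
    Path(data_dir, f'manifest-{version}.json').write_text(json.dumps(manifest, indent=2))
    return manifest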
🏷️ Efficient Labeling
- • Leverage active learning to reduce costs (sketched below)
- • Combine weak supervision with human review
- • Track inter-annotator agreement
- • Prioritize ambiguous samples for review
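To illustrate the active-learning bullet above, here is a minimal uncertainty-sampling loop with scikit-learn; the model choice, margin criterion, and batch size are arbitrary for the sketch.

import numpy as np
from sklearn.linear_model import LogisticRegression


def select_for_labeling(model, unlabeled_X: np.ndarray, batch_size: int = 20) -> np.ndarray:
    """Pick the unlabeled examples the current model is least certain about."""
    probabilities = model.predict_proba(unlabeled_X)
    # Margin between the top two class probabilities; small margin = high uncertainty
    sorted_probs = np.sort(probabilities, axis=1)
    margins = sorted_probs[:, -1] - sorted_probs[:, -2]
    return np.argsort(margins)[:batch_size]


# Illustrative loop on synthetic data
rng = np.random.default_rng(0)
X_labeled = rng.normal(size=(100, 5))
y_labeled = (X_labeled[:, 0] > 0).astype(int)
X_pool = rng.normal(size=(1000, 5))

model = LogisticRegression().fit(X_labeled, y_labeled)
to_label = select_for_labeling(model, X_pool, batch_size=20)
# `to_label` holds indices to route to human annotators; their answers are appended
# to the labeled set and the model is refit on the next iteration.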
Common Data Management Challenges
| Challenge | Impact | Solution |
| --- | --- | --- |
| Label Noise | Reduced model accuracy | Multi-annotator consensus, confidence filtering |
| Data Drift | Performance degradation | Continuous monitoring, adaptive sampling |
| Class Imbalance | Biased predictions | Oversampling, SMOTE, weighted loss (sketched below) |
| Storage Costs | Budget overruns | Data compression, tiered storage, sampling |
| Privacy Compliance | Legal risks | Differential privacy, federated learning |
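For the class-imbalance row in the table above, the lightest-weight remedy is usually class weighting; the sketch below shows it with scikit-learn on synthetic labels (SMOTE would require the separate imbalanced-learn package).

import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.utils.class_weight import compute_class_weight

# Synthetic, heavily imbalanced labels for illustration (~5% positive class)
rng = np.random.default_rng(0)
X = rng.normal(size=(2000, 8))
y = (rng.random(2000) < 0.05).astype(int)

# Inverse-frequency weights so minority-class errors cost more in the loss
weights = compute_class_weight(class_weight='balanced', classes=np.array([0, 1]), y=y)
print(dict(zip([0, 1], weights)))

# Equivalent shortcut: let the estimator derive the same weights itself
model = LogisticRegression(class_weight='balanced', max_iter=1000).fit(X, y)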
Essential Technologies for Data Management
DVC
Data version control for ML projects (see the sketch at the end of this section)
Apache Kafka
Stream processing for real-time data collection
MLflow
Tracking data lineage and experiments
Apache Spark
Large-scale data processing and validation
Label Studio
Data labeling and annotation platform
Great Expectations
Data quality validation framework
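As a small follow-up to the DVC entry above, the sketch below reads a specific tagged revision of a tracked file through DVC's Python API (dvc.api); the repository URL, file path, and tag are placeholders.

import io

import pandas as pd
import dvc.api

# Placeholders: point these at a real DVC-tracked repository, path, and revision
REPO = 'https://github.com/example-org/example-datasets'
PATH = 'data/train.csv'
REV = 'v1.2.0'   # a Git tag, branch, or commit that pins the dataset version

# Read the file contents exactly as tracked at that revision
csv_text = dvc.api.read(PATH, repo=REPO, rev=REV)
train_df = pd.read_csv(io.StringIO(csv_text))

# Alternatively, stream it without holding the whole string in memory
with dvc.api.open(PATH, repo=REPO, rev=REV) as f:
    train_df = pd.read_csv(f)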