Financial ML Systems
Building AI systems for finance: fraud detection, algorithmic trading, and risk management
⚠️ Regulatory Compliance Notice
Financial ML systems are subject to strict regulatory oversight, including supervision by bodies such as the SEC and FINRA and requirements under frameworks such as GDPR and Basel III. This content is for educational purposes only. Production systems must comply with applicable financial regulations, model governance frameworks, and risk management protocols. Always consult your compliance and legal teams before deployment.
Financial ML Domains
Fraud Detection Systems
Production Fraud Detection System
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest, RandomForestClassifier
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn as nn
from typing import Dict, List, Tuple, Optional
import redis
import json
from datetime import datetime, timedelta
import logging
import warnings
class FraudDetectionSystem:
    """
    Production-grade fraud detection system for financial transactions

    Features:
    - Real-time transaction scoring
    - Multi-model ensemble approach
    - Behavioral analytics
    - Graph-based network analysis
    - Continuous learning and adaptation

    NOTE(review): many private helpers called below (model loaders,
    ``_combine_scores``, ``_assess_risk_level``, ``_make_decision``, the
    various risk look-ups, ...) are not defined in this listing and must be
    supplied elsewhere for the class to run.
    """

    def __init__(self, config: Dict):
        """
        Build the detector from a configuration mapping.

        Args:
            config: Settings read by this class: 'redis_host', 'redis_port',
                'ensemble_weights', 'network_weights' and, optionally,
                'batch_size' (defaults to 100).
        """
        self.config = config
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # Initialize models
        self.ensemble_models = self._load_ensemble_models()
        self.behavioral_model = self._load_behavioral_model()
        self.graph_model = self._load_graph_model()

        # Initialize feature engineering pipeline
        self.feature_pipeline = self._init_feature_pipeline()

        # Initialize real-time data connections
        self.redis_client = redis.Redis(
            host=config['redis_host'],
            port=config['redis_port']
        )

        # Initialize logging
        self.logger = self._setup_logging()

        # Performance metrics tracking
        self.metrics = {
            'total_transactions': 0,
            'fraud_detected': 0,
            'false_positives': 0,
            'model_performance': {}
        }

    def score_transaction(self, transaction: Dict) -> Dict:
        """
        Score a single transaction for fraud risk

        Args:
            transaction: Transaction data including amount, merchant, user, etc.
                'user_id' and 'merchant_id' are read with ``[]`` and are
                therefore required; 'transaction_id', 'ip_address' and
                'device_id' are read with ``.get``.

        Returns:
            Fraud risk score and detailed analysis. If anything raises while
            scoring, the exception is logged and an error record
            (``'error': True``) is returned instead of propagating.
        """
        transaction_id = transaction.get('transaction_id')
        self.logger.info(f"Scoring transaction {transaction_id}")

        try:
            # Extract and engineer features
            features = self.feature_pipeline.transform(transaction)

            # Get real-time user context
            user_context = self._get_user_context(transaction['user_id'])

            # Behavioral analysis
            behavioral_score = self._analyze_user_behavior(
                transaction, user_context
            )

            # Ensemble model scoring
            ensemble_scores = self._score_with_ensemble(features)

            # Graph-based network analysis
            network_score = self._analyze_transaction_network(transaction)

            # Combine scores
            final_score = self._combine_scores(
                ensemble_scores, behavioral_score, network_score
            )

            # Risk assessment
            risk_level = self._assess_risk_level(final_score)

            # Generate explanations
            explanations = self._generate_explanations(
                features, ensemble_scores, behavioral_score, network_score
            )

            # Update user profile
            self._update_user_profile(transaction['user_id'], transaction, final_score)

            result = {
                'transaction_id': transaction_id,
                'fraud_score': float(final_score),
                'risk_level': risk_level,
                'decision': self._make_decision(final_score, risk_level),
                'explanations': explanations,
                'model_scores': {
                    'ensemble': float(ensemble_scores['weighted_average']),
                    'behavioral': float(behavioral_score),
                    'network': float(network_score)
                },
                'processing_time_ms': self._get_processing_time(),
                'timestamp': datetime.utcnow().isoformat()
            }

            # Log and update metrics
            self._log_transaction_result(result)
            self._update_metrics(result)

            return result

        except Exception as e:
            # Deliberate catch-all: a scoring failure must not crash the
            # caller. logger.exception keeps the traceback for diagnosis.
            self.logger.exception(f"Error scoring transaction {transaction_id}: {str(e)}")
            return {
                'transaction_id': transaction_id,
                'error': True,
                'message': 'Fraud scoring failed - manual review required',
                'timestamp': datetime.utcnow().isoformat()
            }

    def _score_with_ensemble(self, features: np.ndarray) -> Dict:
        """
        Score transaction using ensemble of ML models

        Args:
            features: Feature vector for one transaction; it is reshaped to a
                single-row 2-D array for the scikit-learn style models.

        Returns:
            Per-model scores keyed by 'random_forest', 'gradient_boosting',
            'neural_network' and 'anomaly_detection', plus their
            'weighted_average'.

        Raises:
            KeyError: if ``config['ensemble_weights']`` lacks a weight for
                any of the four model keys above.
        """
        scores = {}
        sample = features.reshape(1, -1)  # one row, as predict_proba expects

        # Random Forest
        scores['random_forest'] = self.ensemble_models['random_forest'].predict_proba(sample)[0][1]

        # Gradient Boosting
        scores['gradient_boosting'] = self.ensemble_models['gradient_boosting'].predict_proba(sample)[0][1]

        # Neural Network
        with torch.no_grad():
            features_tensor = torch.FloatTensor(features).unsqueeze(0).to(self.device)
            nn_score = torch.sigmoid(self.ensemble_models['neural_network'](features_tensor)).cpu().item()
        scores['neural_network'] = nn_score

        # Isolation Forest (anomaly detection)
        anomaly_score = self.ensemble_models['isolation_forest'].decision_function(sample)[0]
        # Convert to probability (0-1 scale): a lower decision value gives a
        # result closer to 1.
        scores['anomaly_detection'] = 1 / (1 + np.exp(anomaly_score))

        # Weighted ensemble (weights are used as configured, not normalised)
        weights = self.config['ensemble_weights']
        scores['weighted_average'] = sum(scores[model] * weights[model] for model in scores)

        return scores

    def _analyze_user_behavior(self, transaction: Dict, user_context: Dict) -> float:
        """
        Analyze user behavior patterns for anomaly detection

        Args:
            transaction: Transaction being scored; 'user_id' is required.
            user_context: Accepted for interface compatibility; not read by
                this method.

        Returns:
            0.3 when fewer than 5 historical transactions are available,
            otherwise the behavioral model's positive-class probability.
        """
        user_id = transaction['user_id']

        # Get user's historical behavior
        historical_data = self._get_user_transaction_history(user_id, days=30)

        if len(historical_data) < 5:  # Not enough history
            return 0.3  # Moderate risk for new users

        # Behavioral features
        behavioral_features = {
            'amount_deviation': self._calculate_amount_deviation(transaction, historical_data),
            'time_deviation': self._calculate_time_deviation(transaction, historical_data),
            'merchant_novelty': self._calculate_merchant_novelty(transaction, historical_data),
            'location_deviation': self._calculate_location_deviation(transaction, historical_data),
            'velocity_score': self._calculate_transaction_velocity(user_id),
            'spending_pattern_change': self._detect_spending_pattern_change(historical_data)
        }

        # Use behavioral model to score; the column order is the dict's
        # insertion order above.
        feature_vector = np.array(list(behavioral_features.values())).reshape(1, -1)
        behavioral_score = self.behavioral_model.predict_proba(feature_vector)[0][1]

        return behavioral_score

    def _analyze_transaction_network(self, transaction: Dict) -> float:
        """
        Analyze transaction in context of network patterns

        Args:
            transaction: Transaction being scored; 'merchant_id' and
                'user_id' are required, 'ip_address' and 'device_id' are
                optional and passed to their look-ups as None when absent.

        Returns:
            Weighted sum of the graph features, capped at 1.0. Weights come
            from ``config['network_weights']``; a feature without a
            configured weight uses 0.2.
        """
        # Build transaction graph features
        graph_features = {
            'merchant_risk_score': self._get_merchant_risk_score(transaction['merchant_id']),
            'ip_reputation_score': self._get_ip_reputation(transaction.get('ip_address')),
            'device_risk_score': self._get_device_risk_score(transaction.get('device_id')),
            'connected_accounts_risk': self._analyze_connected_accounts(transaction['user_id']),
            'transaction_cluster_risk': self._analyze_transaction_cluster(transaction)
        }

        # Network-based scoring
        network_weights = self.config['network_weights']
        network_score = 0.0
        for feature, value in graph_features.items():
            network_score += value * network_weights.get(feature, 0.2)

        return min(network_score, 1.0)  # Cap at 1.0

    def batch_score_transactions(self, transactions: List[Dict]) -> List[Dict]:
        """
        Efficiently score multiple transactions in batch

        Feature extraction and ensemble scoring run once per chunk of
        ``config['batch_size']`` transactions (default 100); behavioral and
        network analysis run per transaction.

        Args:
            transactions: Transactions to score, in order.

        Returns:
            One result per transaction, in input order. A transaction whose
            individual analysis raises yields an error record
            (``'error': True``) instead of aborting the batch. Failures in
            the chunk-level feature extraction or ensemble scoring are not
            caught here and propagate to the caller.
        """
        results = []

        # Process in batches for efficiency
        batch_size = self.config.get('batch_size', 100)

        for i in range(0, len(transactions), batch_size):
            batch = transactions[i:i + batch_size]

            # Parallel feature extraction
            batch_features = self.feature_pipeline.transform_batch(batch)

            # Batch model scoring
            batch_ensemble_scores = self._score_batch_with_ensemble(batch_features)

            # Individual analysis for each transaction
            for j, transaction in enumerate(batch):
                try:
                    # Get individual scores
                    ensemble_scores = {k: v[j] for k, v in batch_ensemble_scores.items()}

                    # Behavioral and network analysis (individual)
                    user_context = self._get_user_context(transaction['user_id'])
                    behavioral_score = self._analyze_user_behavior(transaction, user_context)
                    network_score = self._analyze_transaction_network(transaction)

                    # Combine scores
                    final_score = self._combine_scores(
                        ensemble_scores, behavioral_score, network_score
                    )

                    # Assess once and reuse for both fields
                    risk_level = self._assess_risk_level(final_score)

                    results.append({
                        'transaction_id': transaction['transaction_id'],
                        'fraud_score': float(final_score),
                        'risk_level': risk_level,
                        'decision': self._make_decision(final_score, risk_level),
                        'batch_processed': True,
                        'timestamp': datetime.utcnow().isoformat()
                    })

                except Exception as e:
                    # Deliberate catch-all so one bad record does not abort
                    # the rest of the batch.
                    self.logger.exception(f"Batch processing error for transaction {transaction.get('transaction_id')}: {str(e)}")
                    results.append({
                        'transaction_id': transaction.get('transaction_id'),
                        'error': True,
                        'message': 'Batch processing failed',
                        # Same field as the single-transaction error record
                        'timestamp': datetime.utcnow().isoformat()
                    })

        return results

    def retrain_models(self, new_data: pd.DataFrame, labels: pd.Series):
        """
        Retrain fraud detection models with new data

        The data is split 80/20 (stratified on ``labels``), the feature
        pipeline is re-fitted on the training part, and the random forest,
        gradient boosting and neural network models are retrained. The new
        models replace the live ones only if
        ``_should_deploy_retrained_models`` accepts their validation metrics;
        the 'isolation_forest' entry is not retrained here.

        Args:
            new_data: Raw training rows accepted by the feature pipeline.
            labels: Target labels aligned with ``new_data``.

        Returns:
            The metrics produced by ``_evaluate_model_performance``.
        """
        # Local imports: neither name is imported at the top of the file.
        from sklearn.ensemble import GradientBoostingClassifier
        from sklearn.model_selection import train_test_split

        self.logger.info("Starting model retraining")

        # Prepare data
        X_train, X_val, y_train, y_val = train_test_split(
            new_data, labels, test_size=0.2, stratify=labels, random_state=42
        )

        # Feature engineering
        # NOTE(review): this re-fits the live feature pipeline even when the
        # retrained models are later rejected - confirm that is intended.
        X_train_features = self.feature_pipeline.fit_transform(X_train)
        X_val_features = self.feature_pipeline.transform(X_val)

        # Retrain ensemble models
        retrained_models = {}

        # Random Forest
        rf_model = RandomForestClassifier(
            n_estimators=100,
            max_depth=10,
            random_state=42
        )
        rf_model.fit(X_train_features, y_train)
        retrained_models['random_forest'] = rf_model

        # Gradient Boosting
        gb_model = GradientBoostingClassifier(
            n_estimators=100,
            learning_rate=0.1,
            max_depth=6,
            random_state=42
        )
        gb_model.fit(X_train_features, y_train)
        retrained_models['gradient_boosting'] = gb_model

        # Neural Network
        nn_model = self._retrain_neural_network(X_train_features, y_train, X_val_features, y_val)
        retrained_models['neural_network'] = nn_model

        # Evaluate performance
        performance_metrics = self._evaluate_model_performance(
            retrained_models, X_val_features, y_val
        )

        # Deploy if performance is satisfactory
        if self._should_deploy_retrained_models(performance_metrics):
            self.ensemble_models.update(retrained_models)
            self.logger.info("Models successfully retrained and deployed")
        else:
            self.logger.warning("Retrained models did not meet performance criteria")

        return performance_metrics
# Real-time fraud detection service
class RealTimeFraudService:
    """
    Real-time fraud detection service with stream processing

    Consumes messages from the 'transactions' stream, scores each one with
    the wrapped fraud detector and acts on the resulting decision.

    NOTE(review): ``_init_stream_processor``, ``_block_transaction``,
    ``_queue_for_review``, ``_increase_monitoring`` and ``_log_decision`` are
    not defined in this listing and must be supplied elsewhere.
    """

    def __init__(self, fraud_detector: "FraudDetectionSystem"):
        """
        Args:
            fraud_detector: Detector whose ``score_transaction`` is called
                for every message received.
        """
        self.fraud_detector = fraud_detector
        # The methods below log through self.logger, so it must exist before
        # anything else runs.
        self.logger = logging.getLogger(__name__)
        self.stream_processor = self._init_stream_processor()

    def start_real_time_processing(self):
        """
        Start processing real-time transaction stream

        Iterates over the stream subscription for as long as it yields
        messages. A failure on one message (bad JSON, scoring or handling
        error) is logged and the loop continues with the next message.
        """
        self.logger.info("Starting real-time fraud detection service")

        # Subscribe to transaction stream
        for message in self.stream_processor.subscribe(['transactions']):
            try:
                transaction_data = json.loads(message['data'])

                # Score transaction
                result = self.fraud_detector.score_transaction(transaction_data)

                # Take action based on risk level
                self._handle_fraud_decision(result)

            except Exception as e:
                # Deliberate catch-all: one bad message must not stop the
                # consumer. logger.exception keeps the traceback.
                self.logger.exception(f"Stream processing error: {str(e)}")

    def _handle_fraud_decision(self, result: Dict):
        """
        Handle fraud detection decision

        Args:
            result: Scoring result. Either a normal result carrying a
                'decision' of 'BLOCK', 'REVIEW' or 'MONITOR' (any other
                value is only logged), or an error record with
                ``'error': True`` and no 'decision' key.
        """
        if result.get('error'):
            # Scoring failed, so there is no decision to act on; send the
            # transaction to a human instead of dropping it.
            self._queue_for_review(result)
        else:
            decision = result.get('decision')
            if decision == 'BLOCK':
                # Block transaction immediately
                self._block_transaction(result['transaction_id'])
            elif decision == 'REVIEW':
                # Send to manual review queue
                self._queue_for_review(result)
            elif decision == 'MONITOR':
                # Allow but increase monitoring
                self._increase_monitoring(result['transaction_id'])

        # Always log decision
        self._log_decision(result)
Related Technologies
Apache Kafka→
Real-time data streaming for financial systems
Redis→
High-performance caching for fraud detection
Apache Spark→
Large-scale financial data processing
XGBoost→
Gradient boosting for financial ML models
TensorFlow→
Deep learning for trading and risk models
PostgreSQL→
Transaction and risk data storage
Prometheus→
Financial system monitoring and alerting
Kubernetes→
Container orchestration for trading systems