Financial ML Systems

Building AI systems for finance: fraud detection, algorithmic trading, and risk management

⚠️ Regulatory Compliance Notice

Financial ML systems are subject to strict regulatory oversight, including SEC and FINRA rules, GDPR, and Basel III requirements. This content is for educational purposes only. Production systems must comply with applicable financial regulations, model governance frameworks, and risk management protocols. Always consult your compliance and legal teams before deployment.

Financial ML Domains

Fraud Detection Systems

Production Fraud Detection System

import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest, RandomForestClassifier
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn as nn
from typing import Dict, List, Tuple, Optional
import redis
import json
from datetime import datetime, timedelta
import logging
import warnings

class FraudDetectionSystem:
    """
    Production-grade fraud detection system for financial transactions

    Features:
    - Real-time transaction scoring
    - Multi-model ensemble approach
    - Behavioral analytics
    - Graph-based network analysis
    - Continuous learning and adaptation

    Note: many private helpers called below (model loaders, feature
    pipeline construction, per-feature risk lookups, score combination,
    decision making, ...) are referenced but not defined in this listing;
    they must be supplied elsewhere for the class to run.
    """

    def __init__(self, config: Dict):
        """
        Build the detector from a configuration mapping.

        Args:
            config: Settings read by this class. Keys used in this listing:
                'redis_host', 'redis_port', 'ensemble_weights',
                'network_weights' and the optional 'batch_size'.
        """
        self.config = config
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # Initialize models
        self.ensemble_models = self._load_ensemble_models()
        self.behavioral_model = self._load_behavioral_model()
        self.graph_model = self._load_graph_model()

        # Initialize feature engineering pipeline
        self.feature_pipeline = self._init_feature_pipeline()

        # Initialize real-time data connections
        self.redis_client = redis.Redis(
            host=config['redis_host'],
            port=config['redis_port']
        )

        # Initialize logging
        self.logger = self._setup_logging()

        # Performance metrics tracking
        self.metrics = {
            'total_transactions': 0,
            'fraud_detected': 0,
            'false_positives': 0,
            'model_performance': {}
        }

    def score_transaction(self, transaction: Dict) -> Dict:
        """
        Score a single transaction for fraud risk

        Args:
            transaction: Transaction data. This method reads 'user_id'
                (required) and 'transaction_id' (optional); the network
                analysis additionally reads 'merchant_id', 'ip_address'
                and 'device_id'.

        Returns:
            On success, a dict with 'transaction_id', 'fraud_score',
            'risk_level', 'decision', 'explanations', 'model_scores',
            'processing_time_ms' and 'timestamp'.
            On any exception, a dict with 'transaction_id', 'error' (True),
            'message' and 'timestamp' -- note that it carries no 'decision'
            key, so consumers must not index it unconditionally.
        """

        transaction_id = transaction.get('transaction_id')
        self.logger.info("Scoring transaction %s", transaction_id)

        try:
            # Extract and engineer features
            features = self.feature_pipeline.transform(transaction)

            # Get real-time user context
            user_context = self._get_user_context(transaction['user_id'])

            # Behavioral analysis
            behavioral_score = self._analyze_user_behavior(
                transaction, user_context
            )

            # Ensemble model scoring
            ensemble_scores = self._score_with_ensemble(features)

            # Graph-based network analysis
            network_score = self._analyze_transaction_network(transaction)

            # Combine scores
            final_score = self._combine_scores(
                ensemble_scores, behavioral_score, network_score
            )

            # Risk assessment
            risk_level = self._assess_risk_level(final_score)

            # Generate explanations
            explanations = self._generate_explanations(
                features, ensemble_scores, behavioral_score, network_score
            )

            # Update user profile
            self._update_user_profile(transaction['user_id'], transaction, final_score)

            result = {
                'transaction_id': transaction_id,
                'fraud_score': float(final_score),
                'risk_level': risk_level,
                'decision': self._make_decision(final_score, risk_level),
                'explanations': explanations,
                'model_scores': {
                    'ensemble': float(ensemble_scores['weighted_average']),
                    'behavioral': float(behavioral_score),
                    'network': float(network_score)
                },
                'processing_time_ms': self._get_processing_time(),
                'timestamp': datetime.utcnow().isoformat()
            }

            # Log and update metrics
            self._log_transaction_result(result)
            self._update_metrics(result)

            return result

        except Exception as e:
            # Top-level boundary: never let a scoring failure propagate to
            # the caller, but keep the traceback so the cause is diagnosable.
            self.logger.error(
                "Error scoring transaction %s: %s", transaction_id, e,
                exc_info=True
            )
            return {
                'transaction_id': transaction_id,
                'error': True,
                'message': 'Fraud scoring failed - manual review required',
                'timestamp': datetime.utcnow().isoformat()
            }

    def _score_with_ensemble(self, features: np.ndarray) -> Dict:
        """
        Score transaction using ensemble of ML models

        Args:
            features: Feature array for one transaction; it is reshaped to
                a single row before being passed to the sklearn-style models.

        Returns:
            Dict with one score per model ('random_forest',
            'gradient_boosting', 'neural_network', 'anomaly_detection')
            plus 'weighted_average', the sum of each score multiplied by
            its entry in config['ensemble_weights'].

        Raises:
            KeyError: if a model or its weight is missing.
        """

        models = self.ensemble_models
        # All sklearn-style models take the same single-row input.
        row = features.reshape(1, -1)

        scores = {}

        # Random Forest
        scores['random_forest'] = models['random_forest'].predict_proba(row)[0][1]

        # Gradient Boosting
        scores['gradient_boosting'] = models['gradient_boosting'].predict_proba(row)[0][1]

        # Neural Network
        with torch.no_grad():
            features_tensor = torch.FloatTensor(features).unsqueeze(0).to(self.device)
            nn_score = torch.sigmoid(models['neural_network'](features_tensor)).cpu().item()
        scores['neural_network'] = nn_score

        # Isolation Forest (anomaly detection)
        anomaly_score = models['isolation_forest'].decision_function(row)[0]
        # Convert to a 0-1 scale with sigmoid(-score): lower decision
        # values (more anomalous, per scikit-learn's convention) map to
        # values closer to 1.
        scores['anomaly_detection'] = 1 / (1 + np.exp(anomaly_score))

        # Weighted ensemble (computed before 'weighted_average' is stored,
        # so only the four model scores take part in the sum)
        weights = self.config['ensemble_weights']
        weighted_score = sum(score * weights[name] for name, score in scores.items())
        scores['weighted_average'] = weighted_score

        return scores

    def _analyze_user_behavior(self, transaction: Dict, user_context: Dict) -> float:
        """
        Analyze user behavior patterns for anomaly detection

        Args:
            transaction: Transaction data; 'user_id' is required.
            user_context: Accepted for interface compatibility; it is not
                read by this method.

        Returns:
            0.3 when fewer than five historical transactions are available
            for the last 30 days; otherwise the positive-class probability
            from the behavioral model.
        """

        user_id = transaction['user_id']

        # Get user's historical behavior
        historical_data = self._get_user_transaction_history(user_id, days=30)

        if len(historical_data) < 5:  # Not enough history
            return 0.3  # Moderate risk for new users

        # Behavioral features.  The insertion order of this dict defines
        # the column order of the vector handed to the behavioral model.
        behavioral_features = {
            'amount_deviation': self._calculate_amount_deviation(transaction, historical_data),
            'time_deviation': self._calculate_time_deviation(transaction, historical_data),
            'merchant_novelty': self._calculate_merchant_novelty(transaction, historical_data),
            'location_deviation': self._calculate_location_deviation(transaction, historical_data),
            'velocity_score': self._calculate_transaction_velocity(user_id),
            'spending_pattern_change': self._detect_spending_pattern_change(historical_data)
        }

        # Use behavioral model to score
        feature_vector = np.array(list(behavioral_features.values())).reshape(1, -1)
        behavioral_score = self.behavioral_model.predict_proba(feature_vector)[0][1]

        return behavioral_score

    def _analyze_transaction_network(self, transaction: Dict) -> float:
        """
        Analyze transaction in context of network patterns

        Args:
            transaction: Transaction data; 'merchant_id' and 'user_id' are
                required, 'ip_address' and 'device_id' are optional.

        Returns:
            Weighted sum of the five network risk features, capped at 1.0.
            Weights come from config['network_weights']; a feature without
            an entry there gets a weight of 0.2.
        """

        # Build transaction graph features
        graph_features = {
            'merchant_risk_score': self._get_merchant_risk_score(transaction['merchant_id']),
            'ip_reputation_score': self._get_ip_reputation(transaction.get('ip_address')),
            'device_risk_score': self._get_device_risk_score(transaction.get('device_id')),
            'connected_accounts_risk': self._analyze_connected_accounts(transaction['user_id']),
            'transaction_cluster_risk': self._analyze_transaction_cluster(transaction)
        }

        # Network-based scoring
        network_weights = self.config['network_weights']
        network_score = 0.0
        for feature, value in graph_features.items():
            network_score += value * network_weights.get(feature, 0.2)

        return min(network_score, 1.0)  # Cap at 1.0

    def batch_score_transactions(self, transactions: List[Dict]) -> List[Dict]:
        """
        Efficiently score multiple transactions in batch

        Feature extraction and ensemble scoring run once per batch;
        behavioral and network analysis run per transaction.

        Args:
            transactions: Transactions to score. Batch size comes from
                config['batch_size'] (default 100).

        Returns:
            One result dict per transaction, in input order. A transaction
            whose individual analysis fails yields an error record
            ('transaction_id', 'error', 'message', 'timestamp') instead of
            a score.

        Raises:
            ValueError: if the configured batch size is below 1.
        """

        batch_size = self.config.get('batch_size', 100)
        if batch_size < 1:
            # A negative step would make range() empty and silently drop
            # every transaction; zero would raise an opaque range() error.
            raise ValueError(
                f"batch_size must be a positive integer, got {batch_size!r}"
            )

        results = []

        # Process in batches for efficiency
        for start in range(0, len(transactions), batch_size):
            batch = transactions[start:start + batch_size]

            # Parallel feature extraction
            batch_features = self.feature_pipeline.transform_batch(batch)

            # Batch model scoring
            batch_ensemble_scores = self._score_batch_with_ensemble(batch_features)

            # Individual analysis for each transaction
            for j, transaction in enumerate(batch):
                try:
                    # Get individual scores
                    ensemble_scores = {k: v[j] for k, v in batch_ensemble_scores.items()}

                    # Behavioral and network analysis (individual)
                    user_context = self._get_user_context(transaction['user_id'])
                    behavioral_score = self._analyze_user_behavior(transaction, user_context)
                    network_score = self._analyze_transaction_network(transaction)

                    # Combine scores
                    final_score = self._combine_scores(
                        ensemble_scores, behavioral_score, network_score
                    )

                    # Assess once and reuse for both fields
                    risk_level = self._assess_risk_level(final_score)

                    results.append({
                        'transaction_id': transaction['transaction_id'],
                        'fraud_score': float(final_score),
                        'risk_level': risk_level,
                        'decision': self._make_decision(final_score, risk_level),
                        'batch_processed': True,
                        'timestamp': datetime.utcnow().isoformat()
                    })

                except Exception as e:
                    # Isolate the failure to this transaction so the rest
                    # of the batch is still scored.
                    self.logger.error(
                        "Batch processing error for transaction %s: %s",
                        transaction.get('transaction_id'), e,
                        exc_info=True
                    )
                    results.append({
                        'transaction_id': transaction.get('transaction_id'),
                        'error': True,
                        'message': 'Batch processing failed',
                        # Same shape as the single-transaction error record
                        'timestamp': datetime.utcnow().isoformat()
                    })

        return results

    def retrain_models(self, new_data: pd.DataFrame, labels: pd.Series):
        """
        Retrain fraud detection models with new data

        Splits the data 80/20 (stratified on the labels), retrains the
        random forest, gradient boosting and neural network models,
        evaluates them on the validation split and swaps them into
        ``self.ensemble_models`` only if ``_should_deploy_retrained_models``
        approves.

        Args:
            new_data: Raw training rows.
            labels: Labels aligned with ``new_data``.

        Returns:
            The metrics produced by ``_evaluate_model_performance``.
        """
        # Function-scope imports, matching the existing local sklearn import
        # style in this method; train_test_split is not imported at file level.
        from sklearn.ensemble import GradientBoostingClassifier
        from sklearn.model_selection import train_test_split

        self.logger.info("Starting model retraining")

        # Prepare data
        X_train, X_val, y_train, y_val = train_test_split(
            new_data, labels, test_size=0.2, stratify=labels, random_state=42
        )

        # Feature engineering
        # NOTE(review): this fits the live feature pipeline before the
        # deployment gate below. If fit_transform refits in place (the usual
        # convention -- confirm against the pipeline implementation), a
        # rejected retrain still leaves the old models running on a re-fitted
        # pipeline.
        X_train_features = self.feature_pipeline.fit_transform(X_train)
        X_val_features = self.feature_pipeline.transform(X_val)

        # Retrain ensemble models
        retrained_models = {}

        # Random Forest
        rf_model = RandomForestClassifier(
            n_estimators=100,
            max_depth=10,
            random_state=42
        )
        rf_model.fit(X_train_features, y_train)
        retrained_models['random_forest'] = rf_model

        # Gradient Boosting
        gb_model = GradientBoostingClassifier(
            n_estimators=100,
            learning_rate=0.1,
            max_depth=6,
            random_state=42
        )
        gb_model.fit(X_train_features, y_train)
        retrained_models['gradient_boosting'] = gb_model

        # Neural Network
        nn_model = self._retrain_neural_network(X_train_features, y_train, X_val_features, y_val)
        retrained_models['neural_network'] = nn_model

        # Evaluate performance
        performance_metrics = self._evaluate_model_performance(
            retrained_models, X_val_features, y_val
        )

        # Deploy if performance is satisfactory.  Only the three retrained
        # entries are replaced; the isolation forest is not retrained here.
        if self._should_deploy_retrained_models(performance_metrics):
            self.ensemble_models.update(retrained_models)
            self.logger.info("Models successfully retrained and deployed")
        else:
            self.logger.warning("Retrained models did not meet performance criteria")

        return performance_metrics

# Real-time fraud detection service
class RealTimeFraudService:
    """
    Real-time fraud detection service with stream processing

    Reads messages from a stream processor, scores each one with the
    wrapped fraud detector and dispatches on the resulting decision.
    The stream processor factory and the action helpers
    (``_block_transaction``, ``_queue_for_review``, ``_increase_monitoring``,
    ``_log_decision``) are referenced but not defined in this listing.
    """

    def __init__(self, fraud_detector: 'FraudDetectionSystem'):
        """
        Args:
            fraud_detector: Detector whose ``score_transaction`` is called
                for every incoming message.
        """
        self.fraud_detector = fraud_detector
        # The methods below log through self.logger; create it up front so
        # the first log call cannot fail with AttributeError.
        self.logger = logging.getLogger(__name__)
        self.stream_processor = self._init_stream_processor()

    def start_real_time_processing(self):
        """
        Start processing real-time transaction stream

        Iterates over the 'transactions' subscription. Each message's
        'data' field is parsed as JSON, scored and dispatched. A failure
        on one message is logged and does not stop the loop.
        """

        self.logger.info("Starting real-time fraud detection service")

        # Subscribe to transaction stream
        for message in self.stream_processor.subscribe(['transactions']):
            try:
                transaction_data = json.loads(message['data'])

                # Score transaction
                result = self.fraud_detector.score_transaction(transaction_data)

                # Take action based on risk level
                self._handle_fraud_decision(result)

            except Exception as e:
                # Keep consuming the stream; include the traceback so the
                # failing message can be diagnosed.
                self.logger.error("Stream processing error: %s", e, exc_info=True)

    def _handle_fraud_decision(self, result: Dict):
        """
        Handle fraud detection decision

        Args:
            result: Scoring result. Successful results carry 'decision'
                ('BLOCK', 'REVIEW', 'MONITOR', or another value that
                requires no action). Failed results carry 'error' and no
                'decision'; they are routed to manual review.
        """

        # Error results have no 'decision' key, so do not index directly.
        decision = result.get('decision')

        if decision == 'BLOCK':
            # Block transaction immediately
            self._block_transaction(result['transaction_id'])

        elif decision == 'REVIEW' or result.get('error'):
            # Send to manual review queue (also covers scoring failures,
            # whose message states that manual review is required)
            self._queue_for_review(result)

        elif decision == 'MONITOR':
            # Allow but increase monitoring
            self._increase_monitoring(result['transaction_id'])

        # Always log decision
        self._log_decision(result)

📝 Financial ML Systems Mastery Check

1 of 4 · Current: 0/4

What is the primary challenge in using ML for high-frequency trading?