Unified AI Platform Architecture
Master end-to-end ML infrastructure, multi-tenant systems, platform engineering, and enterprise AI deployment
What is a Unified AI Platform Architecture?
A unified AI platform architecture is an integrated, end-to-end infrastructure that provides all necessary components for developing, deploying, and managing AI/ML applications at enterprise scale with multi-tenancy support.
Unified AI Platform Components
Data & Feature Layer
- Data ingestion & ETL pipelines
- Feature stores & feature serving (see the sketch after this list)
- Data cataloging & lineage tracking
- Data quality monitoring
- Multi-tenant data isolation
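For the feature-serving and data-isolation items, a minimal sketch of tenant-isolated online feature lookups is shown below. It assumes features are cached in Redis under tenant-prefixed keys; the TenantFeatureClient name and key layout are illustrative and not part of the platform code later on this page.

import json
from typing import Any, Dict, Optional

import redis

class TenantFeatureClient:
    """Illustrative online feature lookup with per-tenant key isolation."""

    def __init__(self, redis_host: str = "localhost", tenant_id: str = "tenant-demo"):
        self.tenant_id = tenant_id
        self.redis = redis.Redis(host=redis_host, port=6379, decode_responses=True)

    def _key(self, entity_id: str) -> str:
        # Prefixing every key with the tenant id gives cheap logical isolation;
        # stricter isolation would use separate databases or clusters per tenant tier.
        return f"features:{self.tenant_id}:{entity_id}"

    def write_features(self, entity_id: str, features: Dict[str, Any]) -> None:
        self.redis.set(self._key(entity_id), json.dumps(features))

    def read_features(self, entity_id: str) -> Optional[Dict[str, Any]]:
        raw = self.redis.get(self._key(entity_id))
        return json.loads(raw) if raw else None

# Usage:
# client = TenantFeatureClient(tenant_id="tenant-a1b2c3d4")
# client.write_features("user-42", {"txn_count_7d": 18, "avg_amount": 52.3})
# client.read_features("user-42")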
Model Development Layer
- Notebook-as-a-Service environments
- Distributed training infrastructure
- Experiment tracking & versioning (see the sketch after this list)
- Model registry & artifact management
- AutoML & hyperparameter tuning
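To make the experiment tracking and model registry items concrete, here is a minimal in-memory sketch of a tracked run and a registry lookup. ExperimentRun and InMemoryModelRegistry are hypothetical stand-ins for a real tracking backend (MLflow, W&B, or the registry service deployed later on this page).

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List

@dataclass
class ExperimentRun:
    """One tracked training run."""
    run_id: str
    model_name: str
    parameters: Dict[str, Any]
    metrics: Dict[str, float] = field(default_factory=dict)
    artifact_url: str = ""
    created_at: datetime = field(default_factory=datetime.utcnow)

class InMemoryModelRegistry:
    """Toy registry: stores runs per model and finds the best one by a metric."""

    def __init__(self) -> None:
        self._runs: Dict[str, List[ExperimentRun]] = {}

    def log_run(self, run: ExperimentRun) -> None:
        self._runs.setdefault(run.model_name, []).append(run)

    def best_run(self, model_name: str, metric: str) -> ExperimentRun:
        # Highest value of the chosen metric wins; real registries also track stages/aliases
        return max(self._runs[model_name], key=lambda r: r.metrics.get(metric, float("-inf")))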
Deployment & Serving Layer
- Multi-model serving infrastructure
- Auto-scaling & load balancing
- A/B testing & canary deployments (see the sketch after this list)
- Edge deployment capabilities
- API gateway & rate limiting
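The A/B testing and canary items amount to weighted traffic splitting between model versions. A minimal routing sketch is below; the version labels and weights are made up for illustration.

import random
from typing import Dict

def choose_variant(weights: Dict[str, float]) -> str:
    """Weighted random routing between model versions, e.g. a 90/10 canary."""
    versions = list(weights)
    return random.choices(versions, weights=[weights[v] for v in versions], k=1)[0]

# Route ~10% of traffic to the canary while it is being evaluated:
# choose_variant({"fraud-detector:v1.2.0": 0.9, "fraud-detector:v1.3.0-canary": 0.1})

In practice the split usually lives in the serving mesh or ingress layer (weighted routes) rather than in application code, but the logic is the same.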
Operations & Governance Layer
- Comprehensive monitoring & alerting
- Model performance tracking
- Resource usage analytics
- Compliance & audit trails
- Cost optimization & chargeback (see the sketch after this list)
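Cost optimization and chargeback start with turning metered usage into per-tenant line items. A minimal sketch follows, assuming flat unit prices; the rates are placeholders, not real cloud pricing.

from typing import Dict

# Placeholder unit prices; real chargeback would pull rates from billing exports.
UNIT_PRICES = {
    "cpu_core_hours": 0.04,
    "memory_gb_hours": 0.005,
    "gpu_hours": 2.50,
    "storage_gb_month": 0.10,
}

def chargeback(usage: Dict[str, float]) -> Dict[str, float]:
    """Convert a tenant's metered usage into line-item costs plus a total."""
    line_items = {k: round(usage.get(k, 0.0) * price, 2) for k, price in UNIT_PRICES.items()}
    line_items["total"] = round(sum(line_items.values()), 2)
    return line_items

# chargeback({"cpu_core_hours": 1200, "memory_gb_hours": 4800, "gpu_hours": 30, "storage_gb_month": 500})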
Platform Core Infrastructure
Multi-Tenant Platform Controller
from typing import Dict, List, Any, Optional, Union
from dataclasses import dataclass, asdict
from enum import Enum
import asyncio
import logging
import time
import uuid
import json
from datetime import datetime, timedelta
import kubernetes
from kubernetes import client, config
import boto3
import redis
import psycopg2
from contextlib import asynccontextmanager
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TenantTier(Enum):
BASIC = "basic"
PREMIUM = "premium"
ENTERPRISE = "enterprise"
class ResourceType(Enum):
CPU = "cpu"
MEMORY = "memory"
GPU = "gpu"
STORAGE = "storage"
@dataclass
class ResourceQuota:
cpu_cores: float
memory_gb: float
gpu_count: int
storage_gb: float
max_models: int
max_deployments: int
max_requests_per_hour: int
@dataclass
class TenantConfig:
tenant_id: str
tenant_name: str
tier: TenantTier
quota: ResourceQuota
created_at: datetime
active: bool
metadata: Dict[str, Any]
@dataclass
class ModelDeployment:
deployment_id: str
tenant_id: str
model_name: str
model_version: str
replicas: int
resource_requirements: Dict[ResourceType, float]
endpoint_url: str
status: str
created_at: datetime
updated_at: datetime
class UnifiedAIPlatformController:
"""Core controller for unified AI platform with multi-tenancy"""
def __init__(self,
kubeconfig_path: Optional[str] = None,
redis_host: str = "localhost",
postgres_host: str = "localhost"):
# Kubernetes setup
if kubeconfig_path:
config.load_kube_config(config_file=kubeconfig_path)
else:
config.load_incluster_config()
self.k8s_apps_v1 = client.AppsV1Api()
self.k8s_core_v1 = client.CoreV1Api()
self.k8s_custom = client.CustomObjectsApi()
# Database connections (in production, pull credentials from a secrets manager rather than hardcoding them)
self.redis_client = redis.Redis(host=redis_host, port=6379, decode_responses=True)
self.postgres_conn = psycopg2.connect(
host=postgres_host,
database="ai_platform",
user="platform_user",
password="secure_password"
)
# AWS services (for cloud resources)
self.s3_client = boto3.client('s3')
self.sagemaker_client = boto3.client('sagemaker')
# Platform state
self.tenants: Dict[str, TenantConfig] = {}
self.deployments: Dict[str, ModelDeployment] = {}
# Resource monitoring
self.resource_usage_cache = {}
# Default tier configurations
self.tier_quotas = {
TenantTier.BASIC: ResourceQuota(
cpu_cores=16, memory_gb=64, gpu_count=0, storage_gb=100,
max_models=10, max_deployments=5, max_requests_per_hour=10000
),
TenantTier.PREMIUM: ResourceQuota(
cpu_cores=64, memory_gb=256, gpu_count=2, storage_gb=500,
max_models=50, max_deployments=20, max_requests_per_hour=100000
),
TenantTier.ENTERPRISE: ResourceQuota(
cpu_cores=256, memory_gb=1024, gpu_count=8, storage_gb=2000,
max_models=200, max_deployments=100, max_requests_per_hour=1000000
)
}
# Initialize platform (the controller must be constructed inside a running event loop, since this schedules an async task)
asyncio.create_task(self._initialize_platform())
async def _initialize_platform(self):
"""Initialize platform infrastructure"""
try:
# Create platform namespaces
await self._create_platform_namespaces()
# Set up shared services
await self._deploy_shared_services()
# Load existing tenants
await self._load_tenants_from_db()
logger.info("AI Platform Controller initialized successfully")
except Exception as e:
logger.error(f"Platform initialization failed: {e}")
raise
async def _create_platform_namespaces(self):
"""Create necessary Kubernetes namespaces"""
namespaces = [
"ai-platform-system",
"ai-platform-shared",
"ai-platform-monitoring"
]
for ns in namespaces:
try:
namespace = client.V1Namespace(
metadata=client.V1ObjectMeta(name=ns)
)
self.k8s_core_v1.create_namespace(namespace)
logger.info(f"Created namespace: {ns}")
except kubernetes.client.exceptions.ApiException as e:
if e.status != 409: # Already exists
raise
async def _deploy_shared_services(self):
"""Deploy shared platform services"""
shared_services = [
("feature-store", self._deploy_feature_store),
("model-registry", self._deploy_model_registry),
("monitoring-stack", self._deploy_monitoring_stack),
("api-gateway", self._deploy_api_gateway)
]
for service_name, deploy_func in shared_services:
try:
await deploy_func()
logger.info(f"Deployed shared service: {service_name}")
except Exception as e:
logger.error(f"Failed to deploy {service_name}: {e}")
async def create_tenant(self,
tenant_name: str,
tier: TenantTier,
metadata: Optional[Dict[str, Any]] = None) -> TenantConfig:
"""Create new tenant with isolated resources"""
tenant_id = f"tenant-{uuid.uuid4().hex[:8]}"
# Create tenant configuration
tenant_config = TenantConfig(
tenant_id=tenant_id,
tenant_name=tenant_name,
tier=tier,
quota=self.tier_quotas[tier],
created_at=datetime.utcnow(),
active=True,
metadata=metadata or {}
)
try:
# Create tenant namespace
await self._create_tenant_namespace(tenant_id)
# Set up resource quotas
await self._configure_resource_quotas(tenant_id, tenant_config.quota)
# Create tenant-specific services
await self._setup_tenant_services(tenant_id, tenant_config)
# Store in database
await self._store_tenant_config(tenant_config)
# Cache tenant configuration
self.tenants[tenant_id] = tenant_config
logger.info(f"Created tenant: {tenant_id} ({tenant_name}) with tier {tier.value}")
return tenant_config
except Exception as e:
logger.error(f"Failed to create tenant {tenant_name}: {e}")
# Cleanup on failure
await self._cleanup_tenant_resources(tenant_id)
raise
async def _create_tenant_namespace(self, tenant_id: str):
"""Create Kubernetes namespace for tenant"""
namespace = client.V1Namespace(
metadata=client.V1ObjectMeta(
name=f"tenant-{tenant_id}",
labels={
"tenant-id": tenant_id,
"ai-platform/tenant": "true"
}
)
)
self.k8s_core_v1.create_namespace(namespace)
async def _configure_resource_quotas(self, tenant_id: str, quota: ResourceQuota):
"""Configure Kubernetes resource quotas for tenant"""
resource_quota = client.V1ResourceQuota(
metadata=client.V1ObjectMeta(name="tenant-quota"),
spec=client.V1ResourceQuotaSpec(
hard={
"requests.cpu": str(quota.cpu_cores),
"requests.memory": f"{quota.memory_gb}Gi",
"requests.nvidia.com/gpu": str(quota.gpu_count),
"persistentvolumeclaims": "10",
"pods": "100"
}
)
)
self.k8s_core_v1.create_namespaced_resource_quota(
namespace=f"tenant-{tenant_id}",
body=resource_quota
)
async def deploy_model(self,
tenant_id: str,
model_name: str,
model_version: str,
model_artifact_url: str,
resource_requirements: Optional[Dict[ResourceType, float]] = None) -> ModelDeployment:
"""Deploy ML model for tenant"""
if tenant_id not in self.tenants:
raise ValueError(f"Tenant {tenant_id} not found")
tenant = self.tenants[tenant_id]
# Check quota limits
current_deployments = sum(1 for d in self.deployments.values() if d.tenant_id == tenant_id)
if current_deployments >= tenant.quota.max_deployments:
raise ValueError(f"Tenant {tenant_id} has reached deployment limit")
# Default resource requirements
if resource_requirements is None:
resource_requirements = {
ResourceType.CPU: 1.0,
ResourceType.MEMORY: 2.0,
ResourceType.GPU: 0,
ResourceType.STORAGE: 10.0
}
deployment_id = f"deploy-{uuid.uuid4().hex[:8]}"
try:
# Create Kubernetes deployment
k8s_deployment = await self._create_model_deployment_k8s(
tenant_id, deployment_id, model_name, model_version,
model_artifact_url, resource_requirements
)
# Create service for the deployment
service = await self._create_model_service_k8s(
tenant_id, deployment_id, model_name
)
# Configure ingress/route
endpoint_url = await self._configure_model_endpoint(
tenant_id, deployment_id, model_name
)
# Create deployment record
deployment = ModelDeployment(
deployment_id=deployment_id,
tenant_id=tenant_id,
model_name=model_name,
model_version=model_version,
replicas=1, # Start with 1 replica
resource_requirements=resource_requirements,
endpoint_url=endpoint_url,
status="deploying",
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
# Store deployment
await self._store_deployment_record(deployment)
self.deployments[deployment_id] = deployment
# Set up monitoring for this deployment
await self._setup_deployment_monitoring(deployment)
logger.info(f"Started deployment {deployment_id} for tenant {tenant_id}")
return deployment
except Exception as e:
logger.error(f"Failed to deploy model {model_name}: {e}")
# Cleanup on failure
await self._cleanup_deployment_resources(tenant_id, deployment_id)
raise
async def _create_model_deployment_k8s(self,
tenant_id: str,
deployment_id: str,
model_name: str,
model_version: str,
model_artifact_url: str,
resource_requirements: Dict[ResourceType, float]) -> client.V1Deployment:
"""Create Kubernetes deployment for ML model"""
# Container specifications
container = client.V1Container(
name="model-server",
image="ai-platform/model-server:latest",
env=[
client.V1EnvVar(name="MODEL_NAME", value=model_name),
client.V1EnvVar(name="MODEL_VERSION", value=model_version),
client.V1EnvVar(name="MODEL_ARTIFACT_URL", value=model_artifact_url),
client.V1EnvVar(name="TENANT_ID", value=tenant_id)
],
resources=client.V1ResourceRequirements(
requests={
"cpu": str(resource_requirements[ResourceType.CPU]),
"memory": f"{resource_requirements[ResourceType.MEMORY]}Gi"
},
limits={
"cpu": str(resource_requirements[ResourceType.CPU] * 2),
"memory": f"{resource_requirements[ResourceType.MEMORY] * 1.5}Gi"
}
),
ports=[client.V1ContainerPort(container_port=8080)]
)
# Add GPU resources if needed
if resource_requirements[ResourceType.GPU] > 0:
container.resources.requests["nvidia.com/gpu"] = str(int(resource_requirements[ResourceType.GPU]))
container.resources.limits["nvidia.com/gpu"] = str(int(resource_requirements[ResourceType.GPU]))
# Pod template
pod_template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(
labels={
"app": f"model-{deployment_id}",
"tenant-id": tenant_id,
"model-name": model_name,
"model-version": model_version
}
),
spec=client.V1PodSpec(
containers=[container],
restart_policy="Always"
)
)
# Deployment specification
deployment_spec = client.V1DeploymentSpec(
replicas=1,
selector=client.V1LabelSelector(
match_labels={"app": f"model-{deployment_id}"}
),
template=pod_template
)
deployment = client.V1Deployment(
api_version="apps/v1",
kind="Deployment",
metadata=client.V1ObjectMeta(
name=f"model-{deployment_id}",
namespace=f"tenant-{tenant_id}"
),
spec=deployment_spec
)
return self.k8s_apps_v1.create_namespaced_deployment(
namespace=f"tenant-{tenant_id}",
body=deployment
)
async def _create_model_service_k8s(self,
tenant_id: str,
deployment_id: str,
model_name: str) -> client.V1Service:
"""Create Kubernetes service for model deployment"""
service = client.V1Service(
metadata=client.V1ObjectMeta(
name=f"model-{deployment_id}-service",
namespace=f"tenant-{tenant_id}"
),
spec=client.V1ServiceSpec(
selector={"app": f"model-{deployment_id}"},
ports=[client.V1ServicePort(port=80, target_port=8080)],
type="ClusterIP"
)
)
return self.k8s_core_v1.create_namespaced_service(
namespace=f"tenant-{tenant_id}",
body=service
)
async def _configure_model_endpoint(self,
tenant_id: str,
deployment_id: str,
model_name: str) -> str:
"""Configure external endpoint for model"""
# This would typically involve creating an Ingress or configuring
# a load balancer. For simplicity, we'll return a constructed URL.
base_url = "https://api.ai-platform.com"
endpoint_url = f"{base_url}/tenants/{tenant_id}/models/{model_name}/predict"
# Store endpoint mapping in Redis
self.redis_client.hset(
f"endpoints:{tenant_id}",
deployment_id,
endpoint_url
)
return endpoint_url
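# Illustrative sketch (assumption, not wired into deploy_model above): exposing the model
# service through a Kubernetes Ingress instead of only returning a constructed URL.
# Host, ingress class, and annotations depend on the cluster's ingress controller.
async def _create_model_ingress(self, tenant_id: str, deployment_id: str, model_name: str) -> str:
    path = f"/tenants/{tenant_id}/models/{model_name}"
    ingress = client.V1Ingress(
        metadata=client.V1ObjectMeta(
            name=f"model-{deployment_id}-ingress",
            namespace=f"tenant-{tenant_id}"
        ),
        spec=client.V1IngressSpec(
            rules=[client.V1IngressRule(
                host="api.ai-platform.com",
                http=client.V1HTTPIngressRuleValue(paths=[client.V1HTTPIngressPath(
                    path=path,
                    path_type="Prefix",
                    backend=client.V1IngressBackend(
                        service=client.V1IngressServiceBackend(
                            name=f"model-{deployment_id}-service",
                            port=client.V1ServiceBackendPort(number=80)
                        )
                    )
                )])
            )]
        )
    )
    client.NetworkingV1Api().create_namespaced_ingress(
        namespace=f"tenant-{tenant_id}", body=ingress
    )
    return f"https://api.ai-platform.com{path}/predict"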
async def scale_deployment(self,
deployment_id: str,
target_replicas: int) -> bool:
"""Scale model deployment"""
if deployment_id not in self.deployments:
raise ValueError(f"Deployment {deployment_id} not found")
deployment = self.deployments[deployment_id]
try:
# Update Kubernetes deployment
self.k8s_apps_v1.patch_namespaced_deployment_scale(
name=f"model-{deployment_id}",
namespace=f"tenant-{deployment.tenant_id}",
body=client.V1Scale(
spec=client.V1ScaleSpec(replicas=target_replicas)
)
)
# Update deployment record
deployment.replicas = target_replicas
deployment.updated_at = datetime.utcnow()
await self._store_deployment_record(deployment)
logger.info(f"Scaled deployment {deployment_id} to {target_replicas} replicas")
return True
except Exception as e:
logger.error(f"Failed to scale deployment {deployment_id}: {e}")
return False
async def get_resource_usage(self, tenant_id: str) -> Dict[str, Any]:
"""Get current resource usage for tenant"""
# Get CPU and memory usage from Kubernetes metrics
try:
# This would typically query Prometheus or Kubernetes metrics API
# For demonstration, we'll return mock data
usage = {
"cpu_cores_used": 4.2,
"memory_gb_used": 16.5,
"gpu_count_used": 1,
"storage_gb_used": 45.3,
"active_deployments": len([d for d in self.deployments.values() if d.tenant_id == tenant_id]),
"total_requests_last_hour": 15420,
"timestamp": time.time()
}
# Cache the results
self.resource_usage_cache[tenant_id] = usage
return usage
except Exception as e:
logger.error(f"Failed to get resource usage for {tenant_id}: {e}")
return {}
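# Illustrative sketch (assumption): pulling real CPU usage from Prometheus instead of the
# mock data above. Assumes a Prometheus server scraping cAdvisor metrics and the
# "tenant-<id>" namespace convention used elsewhere in this controller.
async def _query_prometheus_cpu_usage(self, tenant_id: str, prometheus_url: str = "http://prometheus.ai-platform-monitoring:9090") -> float:
    import requests  # synchronous client used for brevity; aiohttp would avoid blocking the loop
    query = f'sum(rate(container_cpu_usage_seconds_total{{namespace="tenant-{tenant_id}"}}[5m]))'
    resp = requests.get(f"{prometheus_url}/api/v1/query", params={"query": query}, timeout=10)
    resp.raise_for_status()
    results = resp.json()["data"]["result"]
    return float(results[0]["value"][1]) if results else 0.0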
async def cleanup_tenant(self, tenant_id: str) -> bool:
"""Clean up all tenant resources"""
if tenant_id not in self.tenants:
raise ValueError(f"Tenant {tenant_id} not found")
try:
# Delete all deployments for this tenant
tenant_deployments = [d for d in self.deployments.values() if d.tenant_id == tenant_id]
for deployment in tenant_deployments:
await self._cleanup_deployment_resources(tenant_id, deployment.deployment_id)
# Delete tenant namespace (this removes all resources)
self.k8s_core_v1.delete_namespace(name=f"tenant-{tenant_id}")
# Clean up database records
await self._delete_tenant_from_db(tenant_id)
# Remove from cache
del self.tenants[tenant_id]
logger.info(f"Successfully cleaned up tenant {tenant_id}")
return True
except Exception as e:
logger.error(f"Failed to cleanup tenant {tenant_id}: {e}")
return False
# Database operations (simplified)
async def _store_tenant_config(self, config: TenantConfig):
"""Store tenant configuration in database"""
# Implementation would store in PostgreSQL
pass
async def _store_deployment_record(self, deployment: ModelDeployment):
"""Store deployment record in database"""
# Implementation would store in PostgreSQL
pass
async def _load_tenants_from_db(self):
"""Load existing tenants from database"""
# Implementation would load from PostgreSQL
pass
# Monitoring and shared service setup (simplified)
async def _deploy_feature_store(self):
"""Deploy shared feature store service"""
pass
async def _deploy_model_registry(self):
"""Deploy shared model registry service"""
pass
async def _deploy_monitoring_stack(self):
"""Deploy monitoring stack (Prometheus, Grafana, etc.)"""
pass
async def _deploy_api_gateway(self):
"""Deploy API gateway for unified access"""
pass
async def _setup_tenant_services(self, tenant_id: str, config: TenantConfig):
"""Set up tenant-specific services"""
pass
async def _setup_deployment_monitoring(self, deployment: ModelDeployment):
"""Set up monitoring for deployment"""
pass
async def _cleanup_tenant_resources(self, tenant_id: str):
"""Cleanup tenant resources on failure"""
pass
async def _cleanup_deployment_resources(self, tenant_id: str, deployment_id: str):
"""Cleanup deployment resources"""
pass
async def _delete_tenant_from_db(self, tenant_id: str):
"""Delete tenant from database"""
pass
# Usage example
async def main():
"""Example usage of the AI Platform Controller"""
platform = UnifiedAIPlatformController()
# Create a new tenant
tenant = await platform.create_tenant(
tenant_name="Acme Corp ML Team",
tier=TenantTier.PREMIUM,
metadata={"department": "data-science", "cost_center": "engineering"}
)
print(f"Created tenant: {tenant.tenant_id}")
# Deploy a model
deployment = await platform.deploy_model(
tenant_id=tenant.tenant_id,
model_name="fraud-detector",
model_version="v1.2.0",
model_artifact_url="s3://models/fraud-detector-v1.2.0.tar.gz",
resource_requirements={
ResourceType.CPU: 2.0,
ResourceType.MEMORY: 4.0,
ResourceType.GPU: 0,
ResourceType.STORAGE: 20.0
}
)
print(f"Deployed model: {deployment.endpoint_url}")
# Scale deployment
success = await platform.scale_deployment(deployment.deployment_id, 3)
print(f"Scaling result: {success}")
# Get resource usage
usage = await platform.get_resource_usage(tenant.tenant_id)
print(f"Resource usage: {usage}")
if __name__ == "__main__":
asyncio.run(main())
Self-Service Developer Portal
Developer Experience API
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field
from typing import Dict, List, Any, Optional
import asyncio
import json
import time
import logging
import uuid
from datetime import datetime, timedelta
import jwt
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Request/Response models
class ModelDeployRequest(BaseModel):
model_name: str = Field(..., description="Name of the model")
model_version: str = Field(..., description="Version of the model")
model_artifact_url: str = Field(..., description="URL to model artifacts")
resource_requirements: Optional[Dict[str, float]] = None
environment_variables: Optional[Dict[str, str]] = None
scaling_config: Optional[Dict[str, Any]] = None
class DeploymentResponse(BaseModel):
deployment_id: str
endpoint_url: str
status: str
estimated_ready_time: int # seconds
class ExperimentRequest(BaseModel):
experiment_name: str
description: Optional[str] = None
parameters: Dict[str, Any]
dataset_config: Dict[str, Any]
compute_requirements: Optional[Dict[str, float]] = None
class FeatureRequest(BaseModel):
feature_name: str
feature_definition: str # SQL or Python code
feature_type: str # "batch", "streaming", "on_demand"
dependencies: Optional[List[str]] = None
schedule: Optional[str] = None # Cron expression
class TenantInfo(BaseModel):
tenant_id: str
tenant_name: str
tier: str
quota: Dict[str, Any]
current_usage: Dict[str, Any]
# Authentication and authorization
security = HTTPBearer()
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> Dict[str, Any]:
"""Verify JWT token and extract tenant information"""
try:
# In production, use proper JWT secret and validation
payload = jwt.decode(credentials.credentials, "secret", algorithms=["HS256"])
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
class DeveloperPortalService:
"""Self-service developer portal for AI platform"""
def __init__(self, platform_controller):
self.platform = platform_controller
self.experiment_queue = asyncio.Queue()
self.feature_pipeline_jobs = {}
# Start background workers
asyncio.create_task(self._experiment_worker())
asyncio.create_task(self._feature_pipeline_worker())
async def deploy_model(self,
tenant_info: Dict[str, Any],
request: ModelDeployRequest) -> DeploymentResponse:
"""Deploy model with simplified interface"""
tenant_id = tenant_info["tenant_id"]
try:
# Validate request against tenant quotas
await self._validate_deployment_request(tenant_id, request)
# Deploy model using platform controller
deployment = await self.platform.deploy_model(
tenant_id=tenant_id,
model_name=request.model_name,
model_version=request.model_version,
model_artifact_url=request.model_artifact_url,
# Map the request's string keys onto the controller's ResourceType enum
resource_requirements={ResourceType(k): v for k, v in (request.resource_requirements or {
"cpu": 1.0, "memory": 2.0, "gpu": 0, "storage": 10.0
}).items()}
)
# Set up auto-scaling if configured
if request.scaling_config:
await self._configure_auto_scaling(deployment.deployment_id, request.scaling_config)
# Estimate ready time based on model size and resources
estimated_ready_time = self._estimate_deployment_time(request)
return DeploymentResponse(
deployment_id=deployment.deployment_id,
endpoint_url=deployment.endpoint_url,
status="deploying",
estimated_ready_time=estimated_ready_time
)
except Exception as e:
logger.error(f"Model deployment failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
async def create_experiment(self,
tenant_info: Dict[str, Any],
request: ExperimentRequest,
background_tasks: BackgroundTasks) -> Dict[str, Any]:
"""Create and queue ML experiment"""
experiment_id = f"exp-{uuid.uuid4().hex[:8]}"
tenant_id = tenant_info["tenant_id"]
experiment_config = {
"experiment_id": experiment_id,
"tenant_id": tenant_id,
"experiment_name": request.experiment_name,
"description": request.description,
"parameters": request.parameters,
"dataset_config": request.dataset_config,
"compute_requirements": request.compute_requirements or {"cpu": 2, "memory": 4},
"status": "queued",
"created_at": datetime.utcnow().isoformat(),
"estimated_duration": self._estimate_experiment_duration(request)
}
# Add to experiment queue
await self.experiment_queue.put(experiment_config)
# Start monitoring in background
background_tasks.add_task(self._monitor_experiment, experiment_id)
return {
"experiment_id": experiment_id,
"status": "queued",
"queue_position": self.experiment_queue.qsize(),
"estimated_start_time": time.time() + (self.experiment_queue.qsize() * 300) # 5 min per experiment
}
async def create_feature(self,
tenant_info: Dict[str, Any],
request: FeatureRequest) -> Dict[str, Any]:
"""Create feature pipeline"""
feature_id = f"feature-{uuid.uuid4().hex[:8]}"
tenant_id = tenant_info["tenant_id"]
try:
# Validate feature definition
validation_result = await self._validate_feature_definition(request)
if not validation_result["valid"]:
raise HTTPException(status_code=400, detail=validation_result["errors"])
# Create feature pipeline configuration
pipeline_config = {
"feature_id": feature_id,
"tenant_id": tenant_id,
"feature_name": request.feature_name,
"feature_definition": request.feature_definition,
"feature_type": request.feature_type,
"dependencies": request.dependencies or [],
"schedule": request.schedule,
"status": "creating",
"created_at": datetime.utcnow().isoformat()
}
# Deploy feature pipeline
await self._deploy_feature_pipeline(pipeline_config)
# Store in feature registry
await self._register_feature(pipeline_config)
return {
"feature_id": feature_id,
"feature_name": request.feature_name,
"status": "active",
"endpoint_url": f"https://api.ai-platform.com/tenants/{tenant_id}/features/{request.feature_name}"
}
except Exception as e:
logger.error(f"Feature creation failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
async def get_tenant_dashboard(self, tenant_info: Dict[str, Any]) -> Dict[str, Any]:
"""Get comprehensive tenant dashboard data"""
tenant_id = tenant_info["tenant_id"]
try:
# Get tenant configuration
tenant_config = self.platform.tenants.get(tenant_id)
if not tenant_config:
raise HTTPException(status_code=404, detail="Tenant not found")
# Get current resource usage
resource_usage = await self.platform.get_resource_usage(tenant_id)
# Get active deployments
active_deployments = [
{
"deployment_id": d.deployment_id,
"model_name": d.model_name,
"model_version": d.model_version,
"status": d.status,
"endpoint_url": d.endpoint_url,
"replicas": d.replicas,
"created_at": d.created_at.isoformat()
}
for d in self.platform.deployments.values()
if d.tenant_id == tenant_id
]
# Get recent experiments
recent_experiments = await self._get_recent_experiments(tenant_id)
# Get feature pipeline status
feature_pipelines = await self._get_feature_pipelines(tenant_id)
# Calculate cost and usage trends
cost_analysis = await self._calculate_cost_trends(tenant_id)
return {
"tenant_info": {
"tenant_id": tenant_config.tenant_id,
"tenant_name": tenant_config.tenant_name,
"tier": tenant_config.tier.value,
"created_at": tenant_config.created_at.isoformat()
},
"quota_and_usage": {
"quota": {
"cpu_cores": tenant_config.quota.cpu_cores,
"memory_gb": tenant_config.quota.memory_gb,
"gpu_count": tenant_config.quota.gpu_count,
"max_models": tenant_config.quota.max_models,
"max_deployments": tenant_config.quota.max_deployments
},
"current_usage": resource_usage,
"utilization_percentage": {
"cpu": (resource_usage.get("cpu_cores_used", 0) / tenant_config.quota.cpu_cores) * 100,
"memory": (resource_usage.get("memory_gb_used", 0) / tenant_config.quota.memory_gb) * 100,
"deployments": (len(active_deployments) / tenant_config.quota.max_deployments) * 100
}
},
"active_deployments": active_deployments,
"recent_experiments": recent_experiments,
"feature_pipelines": feature_pipelines,
"cost_analysis": cost_analysis,
"platform_status": await self._get_platform_health()
}
except Exception as e:
logger.error(f"Dashboard data retrieval failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
async def get_model_logs(self,
tenant_info: Dict[str, Any],
deployment_id: str,
lines: int = 100) -> Dict[str, Any]:
"""Get logs for model deployment"""
tenant_id = tenant_info["tenant_id"]
# Verify deployment belongs to tenant
deployment = self.platform.deployments.get(deployment_id)
if not deployment or deployment.tenant_id != tenant_id:
raise HTTPException(status_code=404, detail="Deployment not found")
try:
# Get logs from Kubernetes
logs = await self._get_kubernetes_logs(tenant_id, deployment_id, lines)
# Parse and structure logs
structured_logs = self._parse_logs(logs)
return {
"deployment_id": deployment_id,
"model_name": deployment.model_name,
"logs": structured_logs,
"retrieved_at": datetime.utcnow().isoformat(),
"lines_returned": len(structured_logs)
}
except Exception as e:
logger.error(f"Log retrieval failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
# Helper methods
async def _validate_deployment_request(self, tenant_id: str, request: ModelDeployRequest):
"""Validate deployment request against quotas"""
tenant_config = self.platform.tenants.get(tenant_id)
if not tenant_config:
raise ValueError("Tenant not found")
current_deployments = sum(1 for d in self.platform.deployments.values() if d.tenant_id == tenant_id)
if current_deployments >= tenant_config.quota.max_deployments:
raise ValueError("Deployment quota exceeded")
async def _configure_auto_scaling(self, deployment_id: str, scaling_config: Dict[str, Any]):
"""Configure auto-scaling for deployment"""
# Implementation would set up HorizontalPodAutoscaler in Kubernetes
pass
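# Illustrative sketch (assumption) of what _configure_auto_scaling might create with the
# official kubernetes client: an autoscaling/v2 HorizontalPodAutoscaler targeting the
# tenant's model Deployment, with bounds and CPU target taken from scaling_config.
async def _configure_auto_scaling_hpa(self, tenant_id: str, deployment_id: str, scaling_config: Dict[str, Any]):
    from kubernetes import client  # assumes the kubernetes client is available to the portal
    hpa = client.V2HorizontalPodAutoscaler(
        metadata=client.V1ObjectMeta(name=f"model-{deployment_id}-hpa", namespace=f"tenant-{tenant_id}"),
        spec=client.V2HorizontalPodAutoscalerSpec(
            scale_target_ref=client.V2CrossVersionObjectReference(
                api_version="apps/v1", kind="Deployment", name=f"model-{deployment_id}"),
            min_replicas=int(scaling_config.get("min_replicas", 1)),
            max_replicas=int(scaling_config.get("max_replicas", 5)),
            metrics=[client.V2MetricSpec(
                type="Resource",
                resource=client.V2ResourceMetricSource(
                    name="cpu",
                    target=client.V2MetricTarget(
                        type="Utilization",
                        average_utilization=int(scaling_config.get("target_cpu_percent", 70)))))]
        )
    )
    client.AutoscalingV2Api().create_namespaced_horizontal_pod_autoscaler(
        namespace=f"tenant-{tenant_id}", body=hpa)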
def _estimate_deployment_time(self, request: ModelDeployRequest) -> int:
"""Estimate deployment ready time"""
# Base time + model size factor + resource complexity
base_time = 60 # 1 minute base
size_factor = len(request.model_artifact_url) // 1000 # Placeholder heuristic; a real estimate would use artifact size and image pull time
return base_time + size_factor
async def _experiment_worker(self):
"""Background worker to process experiments"""
while True:
try:
experiment_config = await self.experiment_queue.get()
await self._run_experiment(experiment_config)
except Exception as e:
logger.error(f"Experiment worker error: {e}")
await asyncio.sleep(1)
async def _feature_pipeline_worker(self):
"""Background worker for feature pipeline management"""
while True:
try:
# Check and maintain feature pipelines
await self._maintain_feature_pipelines()
except Exception as e:
logger.error(f"Feature pipeline worker error: {e}")
await asyncio.sleep(30) # Check every 30 seconds
async def _run_experiment(self, experiment_config: Dict[str, Any]):
"""Execute ML experiment"""
# Implementation would create training job, monitor progress, store results
logger.info(f"Running experiment: {experiment_config['experiment_id']}")
async def _validate_feature_definition(self, request: FeatureRequest) -> Dict[str, Any]:
"""Validate feature definition syntax and dependencies"""
# Implementation would validate SQL/Python code, check dependencies
return {"valid": True, "errors": []}
async def _deploy_feature_pipeline(self, pipeline_config: Dict[str, Any]):
"""Deploy feature computation pipeline"""
# Implementation would create Apache Beam/Spark jobs
pass
async def _register_feature(self, pipeline_config: Dict[str, Any]):
"""Register feature in feature store"""
# Implementation would register in feature store (Feast, Tecton, etc.)
pass
def _estimate_experiment_duration(self, request: ExperimentRequest) -> int:
"""Estimate experiment duration in minutes"""
base_duration = 30 # 30 minutes base
complexity_factor = len(request.parameters) * 5 # 5 min per parameter
return base_duration + complexity_factor
async def _monitor_experiment(self, experiment_id: str):
"""Monitor experiment progress"""
# Implementation would track experiment status and send notifications
pass
async def _get_recent_experiments(self, tenant_id: str) -> List[Dict[str, Any]]:
"""Get recent experiments for tenant"""
# Implementation would query experiment database
return []
async def _get_feature_pipelines(self, tenant_id: str) -> List[Dict[str, Any]]:
"""Get feature pipelines for tenant"""
# Implementation would query feature store
return []
async def _calculate_cost_trends(self, tenant_id: str) -> Dict[str, Any]:
"""Calculate cost trends and forecasts"""
# Implementation would analyze resource usage and costs
return {"current_month_cost": 1250.50, "trend": "increasing", "forecast": 1400.00}
async def _get_platform_health(self) -> Dict[str, Any]:
"""Get overall platform health status"""
return {
"status": "healthy",
"services": {
"api_gateway": "healthy",
"feature_store": "healthy",
"model_registry": "healthy",
"monitoring": "healthy"
},
"last_updated": datetime.utcnow().isoformat()
}
async def _get_kubernetes_logs(self, tenant_id: str, deployment_id: str, lines: int) -> str:
"""Get logs from Kubernetes"""
# Implementation would call Kubernetes API
return f"Mock logs for deployment {deployment_id}"
def _parse_logs(self, raw_logs: str) -> List[Dict[str, Any]]:
"""Parse raw logs into structured format"""
# Implementation would parse and structure logs
return [{"timestamp": "2025-01-05T10:00:00Z", "level": "INFO", "message": "Model loaded successfully"}]
async def _maintain_feature_pipelines(self):
"""Maintain and monitor feature pipelines"""
# Implementation would check pipeline health, restart if needed
pass
# FastAPI application
app = FastAPI(
title="AI Platform Developer Portal",
description="Self-service portal for AI/ML platform",
version="1.0.0"
)
# Initialize services (would be dependency injected in production; note that both
# constructors schedule background asyncio tasks, so they should be created inside a
# running event loop, e.g. a FastAPI startup handler, rather than at import time)
from .platform_controller import UnifiedAIPlatformController, ResourceType
platform_controller = UnifiedAIPlatformController()
portal_service = DeveloperPortalService(platform_controller)
@app.post("/models/deploy", response_model=DeploymentResponse)
async def deploy_model(
request: ModelDeployRequest,
tenant_info: Dict[str, Any] = Depends(verify_token)
):
"""Deploy ML model with self-service interface"""
return await portal_service.deploy_model(tenant_info, request)
@app.post("/experiments")
async def create_experiment(
request: ExperimentRequest,
background_tasks: BackgroundTasks,
tenant_info: Dict[str, Any] = Depends(verify_token)
):
"""Create ML experiment"""
return await portal_service.create_experiment(tenant_info, request, background_tasks)
@app.post("/features")
async def create_feature(
request: FeatureRequest,
tenant_info: Dict[str, Any] = Depends(verify_token)
):
"""Create feature pipeline"""
return await portal_service.create_feature(tenant_info, request)
@app.get("/dashboard")
async def get_dashboard(
tenant_info: Dict[str, Any] = Depends(verify_token)
):
"""Get tenant dashboard data"""
return await portal_service.get_tenant_dashboard(tenant_info)
@app.get("/models/{deployment_id}/logs")
async def get_model_logs(
deployment_id: str,
lines: int = 100,
tenant_info: Dict[str, Any] = Depends(verify_token)
):
"""Get model deployment logs"""
return await portal_service.get_model_logs(tenant_info, deployment_id, lines)
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Real-World Examples
Uber Michelangelo
End-to-end ML platform serving 1000+ models with self-service deployment, feature stores, and multi-tenant architecture.
- 10M+ predictions per second
- Self-service model deployment
- Unified feature store
Netflix Metaflow
Human-friendly ML platform with built-in versioning, orchestration, and compute scaling for data scientists.
- Python-native workflows
- Automatic scaling to cloud
- Built-in experiment tracking
Airbnb Bighead
Unified ML platform with feature stores, model training, serving, and monitoring supporting 150+ ML use cases.
- Unified feature pipeline
- Multi-model serving
- Cost optimization tools
Unified Platform Best Practices
✅ Do's
- Design for multi-tenancy from the beginning
- Implement comprehensive resource quotas and monitoring
- Provide self-service capabilities with guardrails
- Build in cost optimization and chargeback
- Standardize on container orchestration
- Implement unified logging and monitoring
❌ Don'ts
- Don't build single-tenant solutions
- Don't ignore resource isolation and security
- Don't skip proper authentication and authorization
- Don't over-engineer the initial platform
- Don't neglect developer experience and documentation
- Don't forget about disaster recovery and backups