Advanced AI Agents

Intelligent agents with planning, reasoning, tool use, and multi-agent coordination

🚀 Cutting-Edge Research Notice

Advanced AI agents are a rapidly evolving research area with significant technical and ethical considerations. These systems can exhibit emergent behaviors, make autonomous decisions, and interact with real-world systems. Production deployment requires careful attention to safety, alignment, controllability, and potential unintended consequences. This content is for educational and research purposes.

Agent Capabilities

Automated Planning Systems

Classical Planning

STRIPS, PDDL, state-space search

Hierarchical Planning

HTN planning, goal decomposition

Probabilistic Planning

MDPs, POMDPs, uncertainty handling
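
For the probabilistic case, the value-iteration planner implemented later in this section treats actions as deterministic and repeatedly applies a simplified Bellman update until values converge:

$$V(s) \leftarrow \max_{a\ \text{applicable in}\ s}\bigl[-\mathrm{cost}(a) + \gamma\, V(\mathrm{apply}(a, s))\bigr]$$

with goal states fixed at a terminal reward. A full MDP or POMDP solver would instead take an expectation over stochastic action outcomes.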

Intelligent Planning Agent

import asyncio
import heapq
import itertools
import numpy as np
from typing import Any, Dict, List, Optional, Set, Callable
from dataclasses import dataclass
from abc import ABC, abstractmethod

@dataclass
class State:
    """Represents a state in the planning domain"""
    predicates: Set[str]
    variables: Dict[str, Any]
    timestamp: float
    
    def satisfies(self, conditions: Set[str]) -> bool:
        """Check if state satisfies given conditions"""
        return conditions.issubset(self.predicates)
    
    def __eq__(self, other):
        # Identity is determined by the predicate set alone; timestamps and
        # auxiliary variables are ignored so that visited-state checks in the
        # planners recognize logically identical states
        return isinstance(other, State) and self.predicates == other.predicates
    
    def __hash__(self):
        return hash(frozenset(self.predicates))

@dataclass 
class Action:
    """Represents an action in the planning domain"""
    name: str
    preconditions: Set[str]
    effects: Set[str]
    negative_effects: Set[str]
    cost: float = 1.0
    duration: float = 1.0
    parameters: Optional[Dict[str, Any]] = None
    
    def is_applicable(self, state: State) -> bool:
        """Check if action can be applied in given state"""
        return state.satisfies(self.preconditions)
    
    def apply(self, state: State) -> State:
        """Apply action to state and return new state"""
        new_predicates = state.predicates.copy()
        new_predicates.update(self.effects)
        new_predicates -= self.negative_effects
        
        return State(
            predicates=new_predicates,
            variables=state.variables.copy(),
            timestamp=state.timestamp + self.duration
        )

class PlanningAlgorithm(ABC):
    """Abstract base class for planning algorithms"""
    
    @abstractmethod
    def plan(self, initial_state: State, goal: Set[str], 
             actions: List[Action]) -> Optional[List[Action]]:
        """Generate a plan from initial state to goal"""
        pass

class AStarPlanner(PlanningAlgorithm):
    """A* search-based planner with heuristics"""
    
    def __init__(self, heuristic_function: Optional[Callable] = None):
        self.heuristic = heuristic_function or self._default_heuristic
        
    def plan(self, initial_state: State, goal: Set[str], 
             actions: List[Action]) -> Optional[List[Action]]:
        """A* planning algorithm"""
        
        # Priority queue entries: (f_score, tie_breaker, g_score, state, path).
        # The monotonically increasing tie breaker keeps heapq from comparing
        # State objects (which define no ordering) when scores are equal
        counter = itertools.count()
        open_set = [(self.heuristic(initial_state, goal), next(counter), 0, initial_state, [])]
        closed_set = set()
        
        while open_set:
            f_score, _, g_score, current_state, path = heapq.heappop(open_set)
            
            # Goal check
            if current_state.satisfies(goal):
                return path
            
            if current_state in closed_set:
                continue
                
            closed_set.add(current_state)
            
            # Explore successors
            for action in actions:
                if action.is_applicable(current_state):
                    next_state = action.apply(current_state)
                    
                    if next_state not in closed_set:
                        new_g_score = g_score + action.cost
                        new_f_score = new_g_score + self.heuristic(next_state, goal)
                        new_path = path + [action]
                        
                        heapq.heappush(open_set, (new_f_score, next(counter), new_g_score, next_state, new_path))
        
        return None  # No plan found
    
    def _default_heuristic(self, state: State, goal: Set[str]) -> float:
        """Default heuristic: number of unsatisfied goal conditions"""
        return len(goal - state.predicates)

class HierarchicalTaskNetworkPlanner(PlanningAlgorithm):
    """Hierarchical Task Network (HTN) planner"""
    
    def __init__(self):
        self.task_decompositions = {}
        self.primitive_actions = {}
        
    def add_task_decomposition(self, task: str, subtasks: List[str], 
                             conditions: Set[str] = None):
        """Add a task decomposition rule"""
        self.task_decompositions[task] = {
            'subtasks': subtasks,
            'conditions': conditions or set()
        }
    
    def add_primitive_action(self, action: Action):
        """Add a primitive action"""
        self.primitive_actions[action.name] = action
    
    def plan(self, initial_state: State, goals: List[str], 
             actions: List[Action] = None) -> Optional[List[Action]]:
        """HTN planning algorithm: decompose tasks until only primitives remain"""
        
        # Initialize the task agenda with the top-level tasks; list() accepts
        # any iterable (the agent passes a set of goal names)
        task_agenda = list(goals)
        plan = []
        current_state = initial_state
        
        while task_agenda:
            current_task = task_agenda.pop(0)
            
            # Check if it's a primitive action
            if current_task in self.primitive_actions:
                action = self.primitive_actions[current_task]
                
                if action.is_applicable(current_state):
                    plan.append(action)
                    current_state = action.apply(current_state)
                else:
                    return None  # Cannot execute primitive action
            
            # Check if it's a compound task
            elif current_task in self.task_decompositions:
                decomposition = self.task_decompositions[current_task]
                
                # Check decomposition conditions
                if current_state.satisfies(decomposition['conditions']):
                    # Insert subtasks at the beginning of agenda
                    task_agenda = decomposition['subtasks'] + task_agenda
                else:
                    return None  # Cannot decompose task
            
            else:
                return None  # Unknown task
        
        return plan

class IntelligentPlanningAgent:
    """
    Advanced planning agent with multiple planning strategies
    
    Features:
    - Multiple planning algorithms (A*, HTN, probabilistic)
    - Dynamic replanning and plan repair
    - Learning from execution experience
    - Multi-objective optimization
    - Real-time planning and execution
    """
    
    def __init__(self, config: Dict):
        self.config = config
        
        # Planning algorithms
        self.planners = {
            'astar': AStarPlanner(),
            'htn': HierarchicalTaskNetworkPlanner(),
            'probabilistic': ProbabilisticPlanner()
        }
        
        # Current state and goals
        self.current_state = None
        self.current_goals = []
        self.current_plan = []
        self.plan_index = 0
        
        # Domain knowledge
        self.actions = []
        self.domain_predicates = set()
        self.learned_heuristics = {}
        
        # Execution monitoring
        self.execution_monitor = ExecutionMonitor()
        self.plan_repair = PlanRepair()
        
        # Learning components
        self.experience_buffer = []
        self.success_tracker = {}
        
    async def plan_and_execute(self, initial_state: State, 
                              goals: List[str]) -> Dict:
        """Plan and execute actions to achieve goals"""
        
        self.current_state = initial_state
        self.current_goals = goals
        
        # Generate initial plan
        plan_result = await self._generate_plan()
        
        if not plan_result['success']:
            return {'success': False, 'error': 'Planning failed'}
        
        self.current_plan = plan_result['plan']
        self.plan_index = 0
        
        # Execute plan with monitoring and replanning
        execution_result = await self._execute_with_monitoring()
        
        # Learn from experience
        self._update_experience(execution_result)
        
        return execution_result
    
    async def _generate_plan(self) -> Dict:
        """Generate plan using best available planner"""
        
        best_plan = None
        best_score = float('inf')
        planner_used = None
        
        for planner_name, planner in self.planners.items():
            try:
                plan = planner.plan(
                    self.current_state, 
                    set(self.current_goals), 
                    self.actions
                )
                
                if plan:
                    # Evaluate plan quality
                    score = self._evaluate_plan_quality(plan)
                    
                    if score < best_score:
                        best_plan = plan
                        best_score = score
                        planner_used = planner_name
                        
            except Exception as e:
                print(f"Planner {planner_name} failed: {e}")
                continue
        
        if best_plan:
            return {
                'success': True,
                'plan': best_plan,
                'planner_used': planner_used,
                'quality_score': best_score
            }
        else:
            return {'success': False, 'error': 'No planner found solution'}
    
    async def _execute_with_monitoring(self) -> Dict:
        """Execute plan with real-time monitoring and replanning"""
        
        executed_actions = []
        total_cost = 0
        
        while self.plan_index < len(self.current_plan):
            action = self.current_plan[self.plan_index]
            
            # Pre-execution validation
            if not action.is_applicable(self.current_state):
                # Plan repair needed
                repair_result = await self._repair_plan()
                
                if not repair_result['success']:
                    return {
                        'success': False,
                        'error': 'Plan execution failed, repair unsuccessful',
                        'executed_actions': executed_actions
                    }
                
                continue
            
            # Execute action
            execution_result = await self._execute_action(action)
            
            if execution_result['success']:
                # Update state
                self.current_state = execution_result['new_state']
                executed_actions.append(action)
                total_cost += action.cost
                self.plan_index += 1
                
                # Monitor for unexpected changes; with no expected state given,
                # the anomaly check is a no-op until a caller supplies one
                monitoring_result = self.execution_monitor.check_state(
                    self.current_state, expected_state=None
                )
                
                if monitoring_result['anomaly_detected']:
                    # Trigger replanning
                    replan_result = await self._replan()
                    
                    if not replan_result['success']:
                        return {
                            'success': False,
                            'error': 'Execution monitoring detected anomaly, replanning failed',
                            'executed_actions': executed_actions
                        }
            else:
                return {
                    'success': False,
                    'error': f'Action execution failed: {execution_result["error"]}',
                    'executed_actions': executed_actions
                }
        
        # Check goal satisfaction
        goals_achieved = self.current_state.satisfies(set(self.current_goals))
        
        return {
            'success': goals_achieved,
            'executed_actions': executed_actions,
            'final_state': self.current_state,
            'total_cost': total_cost,
            'goals_achieved': goals_achieved
        }
    
    async def _execute_action(self, action: Action) -> Dict:
        """Execute a single action with error handling"""
        
        try:
            # Simulate action execution (in real system, this would interact with environment)
            await asyncio.sleep(action.duration)
            
            # Apply action effects
            new_state = action.apply(self.current_state)
            
            # Add some probabilistic effects or failures
            success_probability = self._get_action_success_probability(action)
            
            if np.random.random() < success_probability:
                return {
                    'success': True,
                    'new_state': new_state,
                    'execution_time': action.duration
                }
            else:
                return {
                    'success': False,
                    'error': 'Action execution failed (probabilistic failure)',
                    'new_state': self.current_state
                }
                
        except Exception as e:
            return {
                'success': False,
                'error': f'Action execution exception: {str(e)}',
                'new_state': self.current_state
            }
    
    async def _repair_plan(self) -> Dict:
        """Repair current plan when preconditions are violated"""
        
        # Try to find alternative actions or insert corrective actions
        repair_actions = self.plan_repair.find_repair_actions(
            self.current_state,
            self.current_plan[self.plan_index],
            self.actions
        )
        
        if repair_actions:
            # Insert repair actions into current plan
            self.current_plan = (
                self.current_plan[:self.plan_index] + 
                repair_actions + 
                self.current_plan[self.plan_index:]
            )
            
            return {'success': True, 'repair_actions': repair_actions}
        else:
            # Full replanning needed
            return await self._replan()
    
    async def _replan(self) -> Dict:
        """Generate new plan from current state"""
        
        print("Replanning from current state...")
        
        # Save the unexecuted remainder of the old plan before discarding it
        old_plan = self.current_plan[self.plan_index:]
        
        # Generate new plan
        plan_result = await self._generate_plan()
        
        if plan_result['success']:
            self.current_plan = plan_result['plan']
            self.plan_index = 0
            
            return {
                'success': True,
                'new_plan': plan_result['plan'],
                'old_plan_discarded': old_plan
            }
        else:
            return {
                'success': False,
                'error': 'Replanning failed'
            }
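
    # --- Minimal helper implementations. These are illustrative stand-ins;
    # --- a production agent would learn these quantities from execution data
    
    def _evaluate_plan_quality(self, plan: List[Action]) -> float:
        """Score a plan (lower is better); here simply its total action cost"""
        return sum(action.cost for action in plan)
    
    def _get_action_success_probability(self, action: Action) -> float:
        """Estimate an action's success probability from tracked outcomes"""
        successes, attempts = self.success_tracker.get(action.name, (0, 0))
        if attempts == 0:
            return 0.95  # optimistic prior for actions never attempted
        return successes / attempts
    
    def _update_experience(self, execution_result: Dict):
        """Record an execution outcome for later learning"""
        self.experience_buffer.append(execution_result)
        for action in execution_result.get('executed_actions', []):
            successes, attempts = self.success_tracker.get(action.name, (0, 0))
            self.success_tracker[action.name] = (
                successes + (1 if execution_result.get('success') else 0),
                attempts + 1
            )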

class ProbabilisticPlanner(PlanningAlgorithm):
    """Probabilistic planner for uncertain domains"""
    
    def __init__(self, discount_factor: float = 0.95):
        self.gamma = discount_factor
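
    def _enumerate_reachable_states(self, initial_state: State,
                                    actions: List[Action],
                                    max_states: int = 5000) -> List[State]:
        """Enumerate states reachable from the initial state via breadth-first
        search, capped at max_states to keep the exponential space tractable.
        (A full MDP solver would use factored or sampled representations.)"""
        frontier = [initial_state]
        seen = {initial_state}
        while frontier and len(seen) < max_states:
            state = frontier.pop(0)
            for action in actions:
                if action.is_applicable(state):
                    next_state = action.apply(state)
                    if next_state not in seen:
                        seen.add(next_state)
                        frontier.append(next_state)
        return list(seen)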
        
    def plan(self, initial_state: State, goal: Set[str], 
             actions: List[Action]) -> Optional[List[Action]]:
        """Generate probabilistic plan using value iteration"""
        
        # Simplified probabilistic planning
        # In practice, would use full MDP/POMDP solvers
        
        states = self._enumerate_reachable_states(initial_state, actions)
        
        # Value iteration
        values = {state: 0.0 for state in states}
        policy = {}
        
        for iteration in range(100):  # Max iterations
            new_values = {}
            
            for state in states:
                if state.satisfies(goal):
                    new_values[state] = 100.0  # Goal reward
                    continue
                
                best_value = float('-inf')
                best_action = None
                
                for action in actions:
                    if action.is_applicable(state):
                        # Expected value of action
                        next_state = action.apply(state)
                        action_value = -action.cost + self.gamma * values.get(next_state, 0)
                        
                        if action_value > best_value:
                            best_value = action_value
                            best_action = action
                
                new_values[state] = best_value
                if best_action:
                    policy[state] = best_action
            
            # Check convergence, skipping dead-end states stuck at -inf
            # (their difference would otherwise evaluate to nan)
            if all(abs(new_values[s] - values[s]) < 0.01 
                   for s in states if new_values[s] != float('-inf')):
                break
                
            values = new_values
        
        # Extract plan by following policy
        plan = []
        current_state = initial_state
        
        while not current_state.satisfies(goal) and len(plan) < 50:
            if current_state in policy:
                action = policy[current_state]
                plan.append(action)
                current_state = action.apply(current_state)
            else:
                break
        
        return plan if current_state.satisfies(goal) else None

class ExecutionMonitor:
    """Monitor plan execution for anomalies and failures"""
    
    def __init__(self):
        self.state_history = []
        self.anomaly_threshold = 0.1
        
    def check_state(self, current_state: State, 
                   expected_state: Optional[State] = None) -> Dict:
        """Check current state for anomalies"""
        
        self.state_history.append(current_state)
        
        anomaly_detected = False
        anomaly_score = 0.0
        
        if expected_state:
            # Compare with expected state
            expected_predicates = expected_state.predicates
            actual_predicates = current_state.predicates
            
            # Calculate state difference
            missing_predicates = expected_predicates - actual_predicates
            extra_predicates = actual_predicates - expected_predicates
            
            anomaly_score = (len(missing_predicates) + len(extra_predicates)) / max(1, len(expected_predicates))
            anomaly_detected = anomaly_score > self.anomaly_threshold
        
        return {
            'anomaly_detected': anomaly_detected,
            'anomaly_score': anomaly_score,
            'state_consistent': not anomaly_detected
        }

class PlanRepair:
    """Plan repair strategies for handling execution failures"""
    
    def find_repair_actions(self, current_state: State, 
                           failed_action: Action, 
                           available_actions: List[Action]) -> List[Action]:
        """Find actions to repair plan after failure"""
        
        repair_actions = []
        
        # Find missing preconditions
        missing_preconditions = failed_action.preconditions - current_state.predicates
        
        # Find actions that can establish missing preconditions
        for precondition in missing_preconditions:
            for action in available_actions:
                if precondition in action.effects and action.is_applicable(current_state):
                    repair_actions.append(action)
                    # Update state for next action search
                    current_state = action.apply(current_state)
                    break
        
        return repair_actions

# Example usage
def create_blocks_world_domain():
    """Create a classic blocks world planning domain.
    
    Actions are generated as ground (fully instantiated) operators for each
    block, since the simple planners above match predicates as literal strings
    and do not unify variables.
    """
    blocks = ["A", "B", "C"]
    actions = []
    
    for x in blocks:
        actions.append(Action(
            name=f"pick_up({x})",
            preconditions={f"clear({x})", f"on_table({x})", "hand_empty"},
            effects={f"holding({x})"},
            negative_effects={f"clear({x})", f"on_table({x})", "hand_empty"}
        ))
        actions.append(Action(
            name=f"put_down({x})",
            preconditions={f"holding({x})"},
            effects={f"clear({x})", f"on_table({x})", "hand_empty"},
            negative_effects={f"holding({x})"}
        ))
        for y in blocks:
            if x == y:
                continue
            actions.append(Action(
                name=f"stack({x},{y})",
                preconditions={f"holding({x})", f"clear({y})"},
                effects={f"clear({x})", f"on({x},{y})", "hand_empty"},
                negative_effects={f"holding({x})", f"clear({y})"}
            ))
            actions.append(Action(
                name=f"unstack({x},{y})",
                preconditions={f"clear({x})", f"on({x},{y})", "hand_empty"},
                effects={f"holding({x})", f"clear({y})"},
                negative_effects={f"clear({x})", f"on({x},{y})", "hand_empty"}
            ))
    
    # Initial state: A on B, with B and C on the table
    initial_state = State(
        predicates={
            "on(A,B)", "on_table(B)", "on_table(C)",
            "clear(A)", "clear(C)", "hand_empty"
        },
        variables={},
        timestamp=0.0
    )
    
    # Goal: B on A on C
    goal = {"on(B,A)", "on(A,C)"}
    
    return actions, initial_state, goal

async def example_planning_execution():
    """Example of planning and execution"""
    
    config = {'planning_timeout': 30}
    agent = IntelligentPlanningAgent(config)
    
    # Set up blocks world domain
    actions, initial_state, goal = create_blocks_world_domain()
    agent.actions = actions
    
    # Plan and execute
    result = await agent.plan_and_execute(initial_state, list(goal))
    
    print(f"Planning and execution result: {result}")
    
    return result
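
# Entry point for running the demo as a script (a minimal sketch; inside an
# async context, await example_planning_execution() directly instead)
if __name__ == "__main__":
    asyncio.run(example_planning_execution())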

Agent Design Patterns