Advanced Multi-Agent Coordination
This document covers advanced techniques for coordinating multiple agents within the Safeguards framework, focusing on complex coordination patterns, dynamic task allocation, and conflict resolution.
Advanced Coordination Patterns
Hierarchical Agent Organization
Implement a hierarchical agent structure for complex tasks:
from safeguards.types.agent import Agent
from typing import Dict, Any, List
from decimal import Decimal
class ManagerAgent(Agent):
    """Manager agent that coordinates a pool of worker agents.

    The manager splits a list of task definitions across its workers, runs
    each worker on its share, and merges the per-worker outputs into a single
    aggregated result.
    """

    def __init__(self, name: str, worker_agents: List[Agent] = None):
        super().__init__(name)
        self.worker_agents = worker_agents or []
        self.task_assignments: Dict[str, str] = {}  # task_id -> worker id

    def add_worker(self, worker: Agent) -> None:
        """Add a worker agent to be managed."""
        self.worker_agents.append(worker)

    def assign_tasks(self, tasks: List[Dict[str, Any]]) -> Dict[str, List[str]]:
        """Distribute tasks across workers in round-robin order.

        Task ``i`` goes to worker ``i % len(self.worker_agents)``, so every task
        is assigned even when there are more tasks than workers. This is
        example logic; replace it with capability-aware assignment as needed.

        Args:
            tasks: Task definitions; each must contain a ``"task_id"`` key.

        Returns:
            Mapping of worker ``id`` -> list of task ids assigned to it. Empty
            when the manager has no workers.
        """
        assignments: Dict[str, List[str]] = {}
        if not self.worker_agents:
            return assignments
        worker_count = len(self.worker_agents)
        for index, task in enumerate(tasks):
            # Wrap around so tasks beyond the number of workers are not dropped.
            worker = self.worker_agents[index % worker_count]
            task_id = task["task_id"]
            assignments.setdefault(worker.id, []).append(task_id)
            self.task_assignments[task_id] = worker.id
        return assignments

    def run(self, **kwargs: Any) -> Dict[str, Any]:
        """Coordinate worker agents to complete a complex task.

        Keyword Args:
            tasks: List of task definitions (each with a ``"task_id"``).
            context: Dict forwarded to every worker, extended with
                ``"manager_id"`` set to this manager's ``id``.

        Returns:
            Dict with ``status``, the raw per-worker ``worker_results``, the
            merged ``aggregated_result`` and ``task_count`` (number of tasks).
        """
        tasks = kwargs.get("tasks", [])
        context = kwargs.get("context", {})

        # Step 1: assign tasks to workers.
        assignments = self.assign_tasks(tasks)

        # Step 2: run each worker on its share and collect the results.
        results: Dict[str, Any] = {}
        for worker in self.worker_agents:
            worker_tasks = set(assignments.get(worker.id, []))
            if not worker_tasks:
                continue
            worker_task_defs = [t for t in tasks if t["task_id"] in worker_tasks]
            results[worker.id] = worker.run(
                tasks=worker_task_defs,
                context={**context, "manager_id": self.id},
            )

        # Step 3: merge the per-worker outputs.
        aggregated_result = self._aggregate_results(results)
        return {
            "status": "completed",
            "worker_results": results,
            "aggregated_result": aggregated_result,
            "task_count": len(tasks),
        }

    def _aggregate_results(self, results: Dict[str, Any]) -> Dict[str, Any]:
        """Merge worker outputs key by key.

        Each worker result is expected to be a dict. For every key, list values
        are concatenated and non-list values are appended, so each merged
        value is a list.
        """
        combined_output: Dict[str, List[Any]] = {}
        for result in results.values():
            for key, value in result.items():
                bucket = combined_output.setdefault(key, [])
                if isinstance(value, list):
                    bucket.extend(value)
                else:
                    bucket.append(value)
        return combined_output
Federated Decision Making
Implement consensus-based decision making across agents:
from typing import List, Dict, Any, Callable
from enum import Enum
class ConsensusMethod(Enum):
    """Strategies for combining individual agent votes into one decision."""

    MAJORITY_VOTE = "majority_vote"  # option with the most votes (plurality)
    WEIGHTED_VOTE = "weighted_vote"  # option with the largest summed agent weight
    UNANIMOUS = "unanimous"  # decide only when every agent chose the same option


class FederatedDecisionSystem:
    """Collects votes from registered agents and applies a consensus rule."""

    def __init__(self, consensus_method: ConsensusMethod = ConsensusMethod.MAJORITY_VOTE):
        self.consensus_method = consensus_method
        self.agents: Dict[str, Any] = {}  # agent_id -> agent object
        self.weights: Dict[str, float] = {}  # agent_id -> weight (for weighted voting)

    def register_agent(self, agent_id: str, agent: Any, weight: float = 1.0) -> None:
        """Register an agent (and its voting weight) with the federated system."""
        self.agents[agent_id] = agent
        self.weights[agent_id] = weight

    def make_decision(self, question: str, options: List[Any],
                      decision_fn: Callable = None) -> Dict[str, Any]:
        """Poll every registered agent and combine the votes.

        Args:
            question: The question put to the agents.
            options: The valid answers. Votes for anything else are ignored.
            decision_fn: Optional ``decision_fn(agent, question, options)``
                returning the agent's vote. If omitted, each agent is called as
                ``agent.run(question=..., options=...)`` and the vote is read
                from the ``"selected_option"`` key of its result.

        Returns:
            Dict with the ``decision`` (``None`` if no decision could be
            reached), the raw ``votes``, the consensus method name and the
            question.

        Raises:
            ValueError: If ``consensus_method`` is not a known method.
        """
        votes: Dict[str, Any] = {}
        for agent_id, agent in self.agents.items():
            if decision_fn is not None:
                vote = decision_fn(agent, question, options)
            else:
                result = agent.run(question=question, options=options)
                vote = result.get("selected_option")
            votes[agent_id] = vote

        if self.consensus_method == ConsensusMethod.MAJORITY_VOTE:
            decision = self._apply_majority_vote(votes, options)
        elif self.consensus_method == ConsensusMethod.WEIGHTED_VOTE:
            decision = self._apply_weighted_vote(votes, options)
        elif self.consensus_method == ConsensusMethod.UNANIMOUS:
            decision = self._apply_unanimous(votes, options)
        else:
            raise ValueError(f"Unknown consensus method: {self.consensus_method}")

        return {
            "decision": decision,
            "votes": votes,
            "consensus_method": self.consensus_method.value,
            "question": question,
        }

    def _apply_majority_vote(self, votes: Dict[str, Any], options: List[Any]) -> Any:
        """Return the option with the most votes, or None if no valid vote was cast."""
        return self._tally_winner(votes, options, weighted=False)

    def _apply_weighted_vote(self, votes: Dict[str, Any], options: List[Any]) -> Any:
        """Return the option with the highest summed weight, or None if no valid vote."""
        return self._tally_winner(votes, options, weighted=True)

    def _tally_winner(self, votes: Dict[str, Any], options: List[Any], weighted: bool) -> Any:
        """Tally votes per option and return the winner.

        Votes for values not in ``options`` are ignored. Each vote counts 1, or
        the agent's registered weight (default 1.0) when ``weighted`` is true.
        Ties go to the option listed first in ``options``. Returns ``None``
        when no agent voted for a listed option (including empty ``options``),
        rather than inventing a decision nobody voted for.
        """
        tally = {option: 0 for option in options}
        counted = False
        for agent_id, vote in votes.items():
            if vote in tally:
                tally[vote] += self.weights.get(agent_id, 1.0) if weighted else 1
                counted = True
        if not counted:
            return None
        # max() returns the first maximal item, i.e. the earliest tied option.
        return max(tally.items(), key=lambda item: item[1])[0]

    def _apply_unanimous(self, votes: Dict[str, Any], options: List[Any]) -> Any:
        """Return the shared vote if every agent chose the same listed option, else None."""
        unique_votes = set(votes.values())
        if len(unique_votes) == 1:
            (vote,) = unique_votes
            # Agreement on a value outside ``options`` is not a valid decision.
            if vote in options:
                return vote
        return None
Dynamic Task Allocation
Workload-Based Allocation
Dynamically allocate tasks based on agent workload:
from typing import Dict, List, Any
from decimal import Decimal
import heapq
class WorkloadBalancer:
    """Balances workload across agents by relative utilization (load / capacity)."""

    def __init__(self):
        self.agent_workloads: Dict[str, float] = {}  # agent_id -> current workload
        self.agent_capacities: Dict[str, float] = {}  # agent_id -> maximum capacity
        self.task_assignments: Dict[str, str] = {}  # task_id -> agent_id
        self.task_loads: Dict[str, float] = {}  # task_id -> load charged at assignment

    def register_agent(self, agent_id: str, max_capacity: float) -> None:
        """Register an agent with zero workload and the given capacity."""
        self.agent_workloads[agent_id] = 0
        self.agent_capacities[agent_id] = max_capacity

    def assign_task(self, task_id: str, estimated_load: float) -> str:
        """Assign a task to the agent with the lowest workload percentage.

        Ties go to the earliest-registered agent. Agents with capacity <= 0 are
        never chosen.

        Returns:
            The chosen agent id, or ``None`` if no agent has positive capacity.

        Raises:
            ValueError: If no agents are registered.
        """
        if not self.agent_workloads:
            raise ValueError("No agents registered with the workload balancer")

        best_agent_id = None
        lowest_workload_pct = float('inf')
        for agent_id, current_load in self.agent_workloads.items():
            capacity = self.agent_capacities[agent_id]
            workload_pct = current_load / capacity if capacity > 0 else float('inf')
            if workload_pct < lowest_workload_pct:
                lowest_workload_pct = workload_pct
                best_agent_id = agent_id

        # ``is not None`` so that an agent id of "" is still a valid choice.
        if best_agent_id is not None:
            self.agent_workloads[best_agent_id] += estimated_load
            self.task_assignments[task_id] = best_agent_id
            # Remember the charged load so completion/rebalance can undo it exactly.
            self.task_loads[task_id] = estimated_load
        return best_agent_id

    def complete_task(self, task_id: str, actual_load: float = None) -> None:
        """Mark a task as completed and remove its load from the agent.

        Args:
            task_id: The completed task. Unknown ids are ignored.
            actual_load: Load to subtract. Defaults to the load charged when
                the task was assigned. For assignments recorded without a load
                (e.g. written directly into ``task_assignments``), an even share
                of the agent's current workload is used instead.
        """
        agent_id = self.task_assignments.get(task_id)
        recorded_load = self.task_loads.pop(task_id, None)
        if agent_id is None:
            return

        if actual_load is None:
            if recorded_load is None:
                share_count = sum(1 for aid in self.task_assignments.values() if aid == agent_id)
                recorded_load = self.agent_workloads[agent_id] / share_count
            actual_load = recorded_load

        self.agent_workloads[agent_id] = max(0, self.agent_workloads[agent_id] - actual_load)
        self.task_assignments.pop(task_id, None)

    def get_agent_utilization(self) -> Dict[str, float]:
        """Return utilization in percent per agent (0 for agents with capacity <= 0)."""
        return {
            agent_id: (load / self.agent_capacities[agent_id] * 100)
            if self.agent_capacities[agent_id] > 0 else 0
            for agent_id, load in self.agent_workloads.items()
        }

    def rebalance(self) -> Dict[str, str]:
        """Reassign every in-flight task from scratch.

        Workloads are reset and tasks are re-placed in their original
        assignment order, each with the load recorded when it was assigned
        (1.0 if none was recorded).

        Returns:
            Mapping of task_id -> newly assigned agent id.
        """
        pending = [(task_id, self.task_loads.get(task_id, 1.0)) for task_id in self.task_assignments]

        for agent_id in self.agent_workloads:
            self.agent_workloads[agent_id] = 0
        self.task_assignments = {}
        self.task_loads = {}

        return {task_id: self.assign_task(task_id, load) for task_id, load in pending}
Skill-Based Routing
Route tasks to agents based on their capabilities:
from typing import Dict, List, Set, Any
class AgentSkillRouter:
    """Routes tasks to agents based on their declared skills."""

    def __init__(self):
        self.agent_skills: Dict[str, Set[str]] = {}  # agent_id -> set of skills
        self.skill_agents: Dict[str, List[str]] = {}  # skill -> agent_ids in registration order

    def register_agent(self, agent_id: str, skills: List[str]) -> None:
        """Register an agent with the given skills.

        Re-registering an existing agent replaces its previous skill set, so
        the skill index holds no duplicate or stale entries.
        """
        for old_skill in self.agent_skills.get(agent_id, ()):
            holders = self.skill_agents.get(old_skill, [])
            if agent_id in holders:
                holders.remove(agent_id)
            if not holders:
                self.skill_agents.pop(old_skill, None)

        skill_set = set(skills)
        self.agent_skills[agent_id] = skill_set
        for skill in skill_set:
            self.skill_agents.setdefault(skill, []).append(agent_id)

    def find_agents_with_skill(self, skill: str) -> List[str]:
        """Return (a copy of) the ids of all agents that have ``skill``."""
        return list(self.skill_agents.get(skill, []))

    def find_agents_with_all_skills(self, required_skills: List[str]) -> List[str]:
        """Return agents having every required skill (all agents if none required)."""
        if not required_skills:
            return list(self.agent_skills.keys())
        skill_set = set(required_skills)
        return [
            agent_id
            for agent_id, agent_skill_set in self.agent_skills.items()
            if skill_set.issubset(agent_skill_set)
        ]

    def route_task(self, task_id: str, required_skills: List[str],
                   load_balancer = None) -> str:
        """Route a task to a qualified agent, optionally the least-loaded one.

        Args:
            task_id: Task being routed.
            required_skills: Skills the agent must have.
            load_balancer: Optional object exposing ``agent_workloads``
                (agent_id -> load) and ``task_assignments`` (task_id ->
                agent_id), e.g. a ``WorkloadBalancer``. When given, the
                qualified agent with the lowest raw workload is chosen (first
                qualified agent on ties; unknown agents count as 0). If the
                balancer tracks that agent, 1.0 unit of load and the assignment
                are recorded against it.

        Returns:
            The chosen agent id, or ``None`` if no agent has all required skills.
        """
        qualified_agents = self.find_agents_with_all_skills(required_skills)
        if not qualified_agents:
            return None
        if load_balancer is None:
            return qualified_agents[0]

        workloads = load_balancer.agent_workloads
        best_agent = min(qualified_agents, key=lambda agent_id: workloads.get(agent_id, 0))
        if best_agent in workloads:
            # Record against the agent actually chosen. Delegating to
            # load_balancer.assign_task() would let the balancer pick among ALL
            # agents, possibly one that lacks the required skills.
            workloads[best_agent] += 1.0  # simplified load of 1.0
            load_balancer.task_assignments[task_id] = best_agent
        return best_agent
Conflict Resolution
Resource Contention
Resolve conflicts when multiple agents need the same resources:
from enum import Enum
from typing import Dict, List, Any
import time
from threading import Lock
class ResourceType(Enum):
    """Categories of shared resources that agents may contend for."""

    COMPUTE = "compute"
    MEMORY = "memory"
    STORAGE = "storage"
    API_CALL = "api_call"
    DATA = "data"


class ResourceContention:
    """Manages and resolves resource contention between agents.

    Each resource has a fixed capacity. Requests that do not fit are queued as
    reservations and granted by priority when capacity is released.
    """

    def __init__(self):
        self.resources: Dict[str, Dict[str, Any]] = {}  # resource_id -> resource info dict
        self.locks: Dict[str, Lock] = {}  # resource_id -> Lock guarding that resource
        self.reservations: Dict[str, Dict[str, int]] = {}  # resource_id -> {agent_id: priority}
        self.reservation_amounts: Dict[str, Dict[str, float]] = {}  # resource_id -> {agent_id: requested amount}

    def register_resource(self, resource_id: str, resource_type: ResourceType,
                          capacity: float) -> None:
        """Register a resource with the contention manager."""
        self.resources[resource_id] = {
            "type": resource_type,
            "capacity": capacity,
            "allocated": 0.0,
            "allocations": {}  # agent_id -> amount
        }
        self.locks[resource_id] = Lock()

    def request_allocation(self, agent_id: str, resource_id: str,
                           amount: float, priority: int = 5) -> bool:
        """Request ``amount`` of a resource for an agent.

        Returns:
            True if granted immediately. Otherwise the request (amount and
            priority, replacing any earlier pending request from the same agent)
            is queued and False is returned. A successful request leaves any
            pending reservation for the same agent untouched.

        Raises:
            ValueError: If the resource is unknown.
        """
        if resource_id not in self.resources:
            raise ValueError(f"Unknown resource: {resource_id}")
        resource = self.resources[resource_id]
        with self.locks[resource_id]:
            if resource["allocated"] + amount <= resource["capacity"]:
                self._grant(resource, agent_id, amount)
                return True
            # Remember both priority AND amount so the eventual grant matches
            # what was actually requested.
            self.reservations.setdefault(resource_id, {})[agent_id] = priority
            self.reservation_amounts.setdefault(resource_id, {})[agent_id] = amount
            return False

    def release_allocation(self, agent_id: str, resource_id: str,
                           amount: float = None) -> float:
        """Release up to ``amount`` (default: all) of an agent's allocation.

        Freed capacity is immediately offered to queued reservations.

        Returns:
            The amount actually released (0.0 if the agent held nothing).

        Raises:
            ValueError: If the resource is unknown.
        """
        if resource_id not in self.resources:
            raise ValueError(f"Unknown resource: {resource_id}")
        resource = self.resources[resource_id]
        with self.locks[resource_id]:
            if agent_id not in resource["allocations"]:
                return 0.0
            current_allocation = resource["allocations"][agent_id]
            release_amount = amount if amount is not None else current_allocation
            release_amount = min(release_amount, current_allocation)

            resource["allocations"][agent_id] -= release_amount
            resource["allocated"] -= release_amount
            if resource["allocations"][agent_id] <= 0:
                resource["allocations"].pop(agent_id)

            self._process_reservations(resource_id)
            return release_amount

    @staticmethod
    def _grant(resource: Dict[str, Any], agent_id: str, amount: float) -> None:
        """Add ``amount`` to the agent's allocation (caller holds the resource lock)."""
        allocations = resource["allocations"]
        allocations[agent_id] = allocations.get(agent_id, 0) + amount
        resource["allocated"] += amount

    def _process_reservations(self, resource_id: str) -> None:
        """Grant queued reservations that now fit, highest priority first.

        Called with the resource lock held. Each reservation is granted its
        requested amount (1.0 if no amount was recorded). A reservation that
        does not fit stays queued, and later, lower-priority ones that do fit
        may still be granted. Equal priorities keep request order because
        ``sorted`` is stable.
        """
        if resource_id not in self.reservations:
            return
        resource = self.resources[resource_id]
        reservations = self.reservations[resource_id]
        amounts = self.reservation_amounts.get(resource_id, {})

        remaining_capacity = resource["capacity"] - resource["allocated"]
        still_waiting: Dict[str, int] = {}
        for agent_id, priority in sorted(reservations.items(), key=lambda item: item[1], reverse=True):
            wanted = amounts.get(agent_id, 1.0)
            if wanted <= remaining_capacity:
                self._grant(resource, agent_id, wanted)
                remaining_capacity -= wanted
            else:
                still_waiting[agent_id] = priority

        if still_waiting:
            self.reservations[resource_id] = still_waiting
            self.reservation_amounts[resource_id] = {
                agent_id: amounts.get(agent_id, 1.0) for agent_id in still_waiting
            }
        else:
            self.reservations.pop(resource_id, None)
            self.reservation_amounts.pop(resource_id, None)
Task Priority Arbitration
Resolve conflicts between tasks with competing priorities:
from enum import Enum
from typing import Dict, List, Any, Tuple
import time
import heapq
class TaskStatus(Enum):
    """Lifecycle states of a task managed by TaskArbitrator."""

    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    FAILED = "failed"


class TaskArbitrator:
    """Arbitrates between competing tasks based on priority.

    At most ``max_concurrent_tasks`` tasks run at once. A higher-priority task
    may preempt (pause) the lowest-priority running task. Preempted tasks are
    resumed automatically when a slot frees up; manually paused tasks are not.
    """

    def __init__(self, max_concurrent_tasks: int = 5):
        self.tasks: Dict[str, Dict[str, Any]] = {}  # task_id -> task info
        self.running_tasks = set()  # ids of tasks currently RUNNING
        self.max_concurrent_tasks = max_concurrent_tasks

    def register_task(self, task_id: str, agent_id: str, priority: int,
                      estimated_duration: float = None) -> None:
        """Register a PENDING task. Larger ``priority`` values win."""
        self.tasks[task_id] = {
            "agent_id": agent_id,
            "priority": priority,
            "status": TaskStatus.PENDING,
            "created_at": time.time(),
            "started_at": None,
            "completed_at": None,
            "estimated_duration": estimated_duration,
            "dependencies": [],
            "preempted": False,  # True while PAUSED because another task took its slot
        }

    def add_dependency(self, task_id: str, depends_on: str) -> None:
        """Require ``depends_on`` to be COMPLETED before ``task_id`` can start."""
        if task_id not in self.tasks or depends_on not in self.tasks:
            raise ValueError(f"Unknown task: {task_id} or {depends_on}")
        self.tasks[task_id]["dependencies"].append(depends_on)

    def start_task(self, task_id: str) -> bool:
        """Try to start a task, respecting dependencies, priority and concurrency.

        Returns:
            True if the task is (now) running, False if it cannot start.

        Raises:
            ValueError: If the task is unknown.
        """
        if task_id not in self.tasks:
            raise ValueError(f"Unknown task: {task_id}")
        task = self.tasks[task_id]

        if task["status"] == TaskStatus.RUNNING:
            return True
        if not self._can_start_task(task_id):
            return False

        if len(self.running_tasks) >= self.max_concurrent_tasks:
            victim = self._find_lowest_priority_running_task()
            # Only preempt a strictly lower-priority task.
            if victim is None or self.tasks[victim]["priority"] >= task["priority"]:
                return False
            self.pause_task(victim)
            # Flag it so _start_pending_tasks resumes it once capacity frees up.
            self.tasks[victim]["preempted"] = True

        task["status"] = TaskStatus.RUNNING
        task["started_at"] = time.time()
        task["preempted"] = False
        self.running_tasks.add(task_id)
        return True

    def pause_task(self, task_id: str) -> bool:
        """Pause a running task. Returns False if it was not running.

        A task paused through this method is not auto-resumed; call
        ``start_task`` to resume it.
        """
        if task_id not in self.tasks:
            raise ValueError(f"Unknown task: {task_id}")
        task = self.tasks[task_id]
        if task["status"] != TaskStatus.RUNNING:
            return False
        task["status"] = TaskStatus.PAUSED
        task["preempted"] = False
        self.running_tasks.remove(task_id)
        return True

    def complete_task(self, task_id: str, success: bool = True) -> None:
        """Mark a task COMPLETED (or FAILED) and fill freed slots."""
        if task_id not in self.tasks:
            raise ValueError(f"Unknown task: {task_id}")
        task = self.tasks[task_id]
        task["completed_at"] = time.time()
        task["status"] = TaskStatus.COMPLETED if success else TaskStatus.FAILED
        self.running_tasks.discard(task_id)
        self._start_pending_tasks()

    def _can_start_task(self, task_id: str) -> bool:
        """True if the task is not finished and all its dependencies are COMPLETED.

        Note: a task whose dependency FAILED can never start.
        """
        task = self.tasks[task_id]
        if task["status"] in (TaskStatus.COMPLETED, TaskStatus.FAILED):
            return False
        return all(
            dep_id in self.tasks and self.tasks[dep_id]["status"] == TaskStatus.COMPLETED
            for dep_id in task["dependencies"]
        )

    def _find_lowest_priority_running_task(self) -> str:
        """Return the running task with the lowest priority, or None if none run."""
        if not self.running_tasks:
            return None
        return min(self.running_tasks, key=lambda tid: self.tasks[tid]["priority"])

    def _start_pending_tasks(self) -> None:
        """Start PENDING and preempted PAUSED tasks until the concurrency limit.

        Candidates are ordered by highest priority first, then oldest
        ``created_at``, then task id.
        """
        if len(self.running_tasks) >= self.max_concurrent_tasks:
            return

        candidates = []
        for task_id, task in self.tasks.items():
            resumable = task["status"] == TaskStatus.PAUSED and task.get("preempted", False)
            if (task["status"] == TaskStatus.PENDING or resumable) and self._can_start_task(task_id):
                # Negate priority: heapq is a min-heap.
                heapq.heappush(candidates, (-task["priority"], task["created_at"], task_id))

        while candidates and len(self.running_tasks) < self.max_concurrent_tasks:
            _, _, task_id = heapq.heappop(candidates)
            self.start_task(task_id)
Best Practices
Fault Tolerance
- Design for Partial Failure: Assume any agent might fail and design your system to handle it
- Implement Timeouts: Never wait indefinitely for an agent to respond
- Use Circuit Breakers: Stop calling unreliable agents after repeated failures
- Maintain State Backups: Keep snapshots of important coordination state
- Implement Fallback Strategies: Define what happens when an agent is unavailable
Performance Optimization
- Batch Related Tasks: Group tasks that access similar data or resources
- Minimize Communication Overhead: Use efficient message formats
- Implement Caching: Cache frequently needed data across agents
- Use Asynchronous Processing: Don't block when waiting for slow operations
- Profile and Optimize Hotspots: Identify bottlenecks in your agent communication
Coordination Patterns
- Start Simple: Begin with basic coordination before implementing advanced patterns
- Use Consistent Interfaces: Make agent APIs compatible for easier coordination
- Design Clear Protocols: Define how agents should communicate and share data
- Implement Robust Logging: Track coordination events for troubleshooting
- Test Coordination Scenarios: Verify your coordination works under stress
Conclusion
Advanced multi-agent coordination enables powerful, resilient systems by leveraging the strengths of specialized agents working together. By implementing hierarchical organization, federated decision making, dynamic task allocation, and conflict resolution strategies, you can build agent systems that efficiently handle complex tasks while maintaining safety constraints.
For more information, see:
- Budget Management
- Safety Guardrails
- Agent Coordination Basics