Agent Coordination Guide
This guide explains how to coordinate multiple agents within the Safeguards framework, covering communication patterns, resource sharing, and task allocation.
Introduction to Agent Coordination
In multi-agent systems, coordination is essential for:
- Efficiently allocating resources among agents
- Managing dependencies between agent tasks
- Preventing conflicts and contention
- Enabling specialized agents to collaborate effectively
- Supporting graceful degradation when resources are constrained
The Safeguards framework provides several mechanisms to facilitate agent coordination.
Basic Multi-Agent Setup
Creating Multiple Agents
Start by creating different agents with appropriate priorities:
from decimal import Decimal
from safeguards.core.budget_coordination import BudgetCoordinator
from safeguards.core.notification_manager import NotificationManager
from safeguards.api import APIFactory, APIVersion
# Setup core components
notification_manager = NotificationManager()
budget_coordinator = BudgetCoordinator(notification_manager)
api_factory = APIFactory()
# Create APIs
budget_api = api_factory.create_budget_api(APIVersion.V1, budget_coordinator)
agent_api = api_factory.create_agent_api(APIVersion.V1, budget_coordinator)
# Create agents with different roles and priorities
research_agent = agent_api.create_agent(
    name="research_agent",
    initial_budget=Decimal("100.0"),
    priority=7
)
analysis_agent = agent_api.create_agent(
    name="analysis_agent",
    initial_budget=Decimal("80.0"),
    priority=5
)
summarization_agent = agent_api.create_agent(
    name="summarization_agent",
    initial_budget=Decimal("50.0"),
    priority=3
)
Creating Shared Budget Pools
For resource sharing, create budget pools that agents can draw from:
# Create shared pools for different agent groups
high_priority_pool = budget_api.create_budget_pool(
    name="high_priority_tasks",
    initial_budget=Decimal("500.0"),
    priority=8
)
general_pool = budget_api.create_budget_pool(
    name="general_tasks",
    initial_budget=Decimal("1000.0"),
    priority=5
)
Communication Patterns
Event-Based Communication
Implement communication between agents using the notification system:
from safeguards.types import AlertSeverity
def agent_communication_handler(notification):
    """Handle inter-agent communication."""
    if notification.agent_id and notification.metadata.get("message_type") == "agent_communication":
        target_agent_id = notification.metadata.get("target_agent_id")
        message = notification.metadata.get("message")
        print(f"Message from {notification.agent_id} to {target_agent_id}: {message}")
        # Process the message and take action
        # ...
        return True
    return False
# Register the handler
notification_manager.add_handler(agent_communication_handler)
# Send a message from one agent to another
notification_manager.send_alert(
    agent_id=research_agent.id,
    severity=AlertSeverity.INFORMATIONAL,
    message="Research results ready for analysis",
    metadata={
        "message_type": "agent_communication",
        "target_agent_id": analysis_agent.id,
        "message": "Research complete. Analysis required on data at path /tmp/research_data.json."
    }
)
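The handler above only prints the message. One way to "process the message" is to queue it for the target agent so that the agent can pick it up when it is ready. The sketch below is a standalone illustration of that idea; the `Mailboxes` class and its methods are hypothetical and are not part of the Safeguards API.

```python
from collections import defaultdict, deque


class Mailboxes:
    """Per-agent message queues (hypothetical helper, not a Safeguards class)."""

    def __init__(self):
        # One FIFO queue per target agent id, created on first use
        self._boxes = defaultdict(deque)

    def deliver(self, sender_id, target_id, message):
        """Queue a message for target_id."""
        self._boxes[target_id].append({"from": sender_id, "message": message})

    def receive(self, agent_id):
        """Return the oldest pending message for agent_id, or None."""
        box = self._boxes[agent_id]
        return box.popleft() if box else None


mailboxes = Mailboxes()
mailboxes.deliver("research_agent", "analysis_agent", "Research complete.")
next_message = mailboxes.receive("analysis_agent")
```

A communication handler could call `deliver` with the `target_agent_id` and `message` it reads from the notification metadata, which keeps message delivery decoupled from message processing.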
Shared State
For more direct coordination, implement a shared state service:
from safeguards.coordination.shared_state import SharedStateManager
# Create a shared state manager
state_manager = SharedStateManager()
# Agent 1 updates state
state_manager.update_state(
    owner_id=research_agent.id,
    key="research_data",
    value={
        "status": "complete",
        "timestamp": "2023-07-26T15:30:00Z",
        "location": "/tmp/research_data.json"
    }
)
# Agent 2 reads state
research_data = state_manager.get_state(
    reader_id=analysis_agent.id,
    key="research_data"
)
if research_data and research_data.get("status") == "complete":
    print(f"Analysis agent processing data from {research_data.get('location')}")
    # Process the data
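This guide does not show how `SharedStateManager` behaves internally. As a rough illustration of the idea, the sketch below implements an in-memory store with the same two method names. The class name `InMemorySharedState`, the single-writer-per-key rule, and the copy-on-read behavior are all assumptions made for this example.

```python
import copy
import threading


class InMemorySharedState:
    """Thread-safe key/value store with one owner per key (illustrative only)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._entries = {}  # key -> {"owner_id": ..., "value": ...}

    def update_state(self, owner_id, key, value):
        with self._lock:
            entry = self._entries.get(key)
            # Assumed rule: only the agent that first wrote a key may overwrite it
            if entry is not None and entry["owner_id"] != owner_id:
                raise PermissionError(f"{owner_id} does not own {key!r}")
            self._entries[key] = {"owner_id": owner_id, "value": copy.deepcopy(value)}

    def get_state(self, reader_id, key):
        # reader_id is unused here; a real service might use it for access
        # control or auditing
        with self._lock:
            entry = self._entries.get(key)
            return None if entry is None else copy.deepcopy(entry["value"])


state = InMemorySharedState()
state.update_state("research_agent", "research_data", {"status": "complete"})
snapshot = state.get_state("analysis_agent", "research_data")
```

Returning a copy means a reader cannot change the stored value by mutating what it received.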
Task Allocation Patterns
Pipeline Pattern
Implement a sequential processing pipeline where agents perform tasks in order:
def run_analysis_pipeline(input_data):
    """Execute a multi-stage pipeline of agent tasks."""
    results = {}
    # Stage 1: Research agent gathers information
    research_result = research_agent.run(input=input_data)
    research_cost = research_result.get("cost", Decimal("0"))
    # Update budget
    budget_api.update_budget(
        research_agent.id,
        budget_api.get_budget(research_agent.id) - research_cost
    )
    results["research"] = research_result
    # Stage 2: Analysis agent processes research data
    analysis_result = analysis_agent.run(
        input=research_result.get("output", "")
    )
    analysis_cost = analysis_result.get("cost", Decimal("0"))
    # Update budget
    budget_api.update_budget(
        analysis_agent.id,
        budget_api.get_budget(analysis_agent.id) - analysis_cost
    )
    results["analysis"] = analysis_result
    # Stage 3: Summarization agent creates final summary
    summary_result = summarization_agent.run(
        input=analysis_result.get("output", "")
    )
    summary_cost = summary_result.get("cost", Decimal("0"))
    # Update budget
    budget_api.update_budget(
        summarization_agent.id,
        budget_api.get_budget(summarization_agent.id) - summary_cost
    )
    results["summary"] = summary_result
    return results
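The three stages above repeat the same run, charge, and pass-along steps, so the pattern can also be written as a loop. The sketch below is a standalone version that uses stub callables in place of real agents and a plain dict in place of the budget API. `run_stages` and the stub stages are hypothetical names introduced for this example.

```python
from decimal import Decimal


def run_stages(stages, input_data, budgets):
    """Run (name, callable) stages in order, charging each stage's cost."""
    results = {}
    current = input_data
    for name, run in stages:
        result = run(current)
        cost = result.get("cost", Decimal("0"))
        if cost > budgets[name]:
            # The cost is only known after the stage has run, so an overspend
            # stops the later stages rather than undoing this one
            results[name] = {"error": "budget exceeded"}
            break
        budgets[name] -= cost
        results[name] = result
        # The output of one stage becomes the input of the next
        current = result.get("output", "")
    return results


# Stub stages standing in for real agents
stages = [
    ("research", lambda text: {"output": text + " -> researched", "cost": Decimal("2")}),
    ("analysis", lambda text: {"output": text + " -> analyzed", "cost": Decimal("1")}),
]
budgets = {"research": Decimal("5"), "analysis": Decimal("5")}
results = run_stages(stages, "topic", budgets)
```

Adding a stage then means appending one entry to `stages` rather than copying another block of budget bookkeeping.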
Fan-Out Pattern
Implement parallel processing for independent tasks:
import concurrent.futures
from typing import List, Dict, Any
def run_parallel_tasks(task_inputs: List[str]) -> List[Dict[str, Any]]:
    """Execute multiple independent tasks in parallel."""
    results = []
    # Use a thread pool for concurrent execution
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Map tasks to agents
        futures = [
            executor.submit(research_agent.run, input=task)
            for task in task_inputs
        ]
        # Collect results in completion order, which may differ from input order
        for future in concurrent.futures.as_completed(futures):
            try:
                result = future.result()
                results.append(result)
                # Update budget after task completion
                budget_api.update_budget(
                    research_agent.id,
                    budget_api.get_budget(research_agent.id) - result.get("cost", Decimal("0"))
                )
            except Exception as e:
                print(f"Task execution failed: {e}")
    return results
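In the function above, the budget is updated from the collecting thread, so the read-then-write on the budget is not racing with anything. If the worker threads charged the budget themselves, that read-then-write would need to be atomic. The sketch below shows one way to do that with a lock. `BudgetLedger` is a hypothetical in-memory stand-in, not a Safeguards class, and the per-task cost of 1.5 is made up.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from decimal import Decimal


class BudgetLedger:
    """In-memory budget with atomic charging (illustrative only)."""

    def __init__(self, balance):
        self._balance = balance
        self._lock = threading.Lock()

    def charge(self, cost):
        """Deduct cost if affordable; return whether the charge succeeded."""
        # The lock makes check-and-subtract a single atomic step
        with self._lock:
            if cost > self._balance:
                return False
            self._balance -= cost
            return True

    @property
    def balance(self):
        with self._lock:
            return self._balance


ledger = BudgetLedger(Decimal("10.0"))


def worker(task):
    # Each task charges a fixed, made-up cost
    return task, ledger.charge(Decimal("1.5"))


with ThreadPoolExecutor(max_workers=5) as executor:
    outcomes = list(executor.map(worker, [f"task-{i}" for i in range(8)]))
```

Which tasks are refused depends on thread scheduling, but the ledger can never go negative.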
Resource Sharing
Priority-Based Allocation
Implement resource allocation based on agent priorities:
def allocate_resources_by_priority(agents, available_budget):
    """Allocate resources based on agent priorities."""
    # Sort agents by priority (highest first)
    sorted_agents = sorted(
        agents,
        key=lambda a: budget_coordinator.get_agent_priority(a.id),
        reverse=True
    )
    total_priority = sum(budget_coordinator.get_agent_priority(a.id) for a in sorted_agents)
    # Guard against division by zero when no agent has a positive priority
    if total_priority <= 0:
        raise ValueError("Agents must have a positive total priority")
    # Calculate proportional allocation
    allocations = {}
    for agent in sorted_agents:
        priority = budget_coordinator.get_agent_priority(agent.id)
        proportion = Decimal(priority) / Decimal(total_priority)
        allocation = available_budget * proportion
        allocations[agent.id] = allocation
        # Update agent budget
        budget_api.update_budget(agent.id, allocation)
    return allocations
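To make the proportional split concrete, here is a standalone worked example that does not call the Safeguards API. The helper name `split_by_priority` and the rounding to two decimal places are assumptions for illustration. The priorities 7, 5, and 3 are those of the three agents created earlier.

```python
from decimal import Decimal, ROUND_DOWN


def split_by_priority(priorities, available_budget):
    """Split a budget in proportion to integer priorities (hypothetical helper)."""
    total = sum(priorities.values())
    if total <= 0:
        raise ValueError("at least one agent needs a positive priority")
    cent = Decimal("0.01")
    # Rounding down guarantees the shares never add up to more than the budget
    return {
        name: (available_budget * Decimal(priority) / Decimal(total)).quantize(
            cent, rounding=ROUND_DOWN
        )
        for name, priority in priorities.items()
    }


shares = split_by_priority(
    {"research_agent": 7, "analysis_agent": 5, "summarization_agent": 3},
    Decimal("150.00"),
)
# research_agent: 150 * 7/15 = 70.00
# analysis_agent: 150 * 5/15 = 50.00
# summarization_agent: 150 * 3/15 = 30.00
```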
Dynamic Load Balancing
Implement dynamic reallocation based on agent needs:
def balance_agent_resources(pool_id):
    """Balance resources among agents based on current workload."""
    # Assumes metrics_api was created elsewhere; its setup is not shown in this guide
    # Get all agents in the pool
    agents = budget_coordinator.get_pool_agents(pool_id)
    # Calculate workload metrics
    agent_workloads = {}
    for agent_id in agents:
        metrics = metrics_api.get_agent_metrics(agent_id)
        recent_tasks = metrics.get("recent_task_count", 0)
        pending_tasks = metrics.get("pending_task_count", 0)
        # Higher value means higher workload
        workload_score = recent_tasks * 0.3 + pending_tasks * 0.7
        agent_workloads[agent_id] = workload_score
    # Get the pool's remaining budget
    pool_metrics = metrics_api.get_pool_metrics(pool_id)
    available_budget = pool_metrics["remaining_budget"]
    # Calculate new allocations
    total_workload = sum(agent_workloads.values()) or 1  # Avoid division by zero
    min_budget = Decimal("10.0")
    allocations = {}
    for agent_id, workload in agent_workloads.items():
        # Higher workload gets proportionally more budget.
        # Convert via str so the float score does not carry binary rounding noise.
        proportion = Decimal(str(workload)) / Decimal(str(total_workload))
        new_allocation = available_budget * proportion
        # Ensure minimum budget (this floor can push the total above available_budget)
        allocations[agent_id] = max(new_allocation, min_budget)
        # Apply new budget
        budget_api.update_budget(agent_id, allocations[agent_id])
    return allocations
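Applying the minimum with `max()` after the proportional split can hand out more than the pool actually holds. One alternative, sketched below under that assumption, is to reserve the minimum for every agent first and split only the remainder by workload. `balance_with_floor` is a hypothetical standalone helper that takes plain workload scores rather than calling any metrics API.

```python
from decimal import Decimal


def balance_with_floor(workloads, available_budget, min_budget=Decimal("10.0")):
    """Give every agent min_budget, then split the rest by workload share."""
    reserved = min_budget * len(workloads)
    if reserved > available_budget:
        raise ValueError("budget cannot cover the minimum for every agent")
    remainder = available_budget - reserved
    total = sum(workloads.values())
    allocations = {}
    for agent_id, workload in workloads.items():
        if total:
            share = Decimal(str(workload)) / Decimal(str(total))
        else:
            # No agent has any workload: split the remainder evenly
            share = Decimal(1) / len(workloads)
        allocations[agent_id] = min_budget + remainder * share
    return allocations


allocations = balance_with_floor({"busy": 3.0, "idle": 1.0}, Decimal("100"))
# 20 is reserved as the floor and the remaining 80 is split 3:1,
# so busy gets 10 + 60 = 70 and idle gets 10 + 20 = 30
```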
Dependency Management
Task Dependencies
Manage dependencies between agent tasks:
from safeguards.coordination.dependency_manager import DependencyManager
# Create a dependency manager
dependency_manager = DependencyManager()
# Define task dependencies
dependency_manager.add_dependency(
    task_id="analyze_data",
    depends_on="gather_data",
    agent_id=analysis_agent.id,
    required_resources=["data_file"]
)
# Check if dependencies are met
can_execute = dependency_manager.check_dependencies(
    task_id="analyze_data",
    available_resources=["data_file", "config"]
)
if can_execute:
    # Execute the task
    result = analysis_agent.run(task="analyze_data")
else:
    # Handle missing dependencies
    missing = dependency_manager.get_missing_dependencies("analyze_data")
    print(f"Cannot execute task due to missing dependencies: {missing}")
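When several tasks depend on each other, it helps to compute a valid execution order up front and to detect circular dependencies before any agent runs. The sketch below uses `graphlib.TopologicalSorter` from the Python standard library (Python 3.9 and later) and is independent of `DependencyManager`. The `execution_order` helper and the `summarize` task id are hypothetical additions for this example.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(dependencies):
    """Return task ids in an order that respects the given dependencies.

    dependencies maps each task id to the set of task ids it depends on.
    """
    try:
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError as exc:
        # The second element of args holds the list of nodes forming the cycle
        raise ValueError(f"circular task dependency: {exc.args[1]}") from exc


tasks = {
    "gather_data": set(),
    "analyze_data": {"gather_data"},
    "summarize": {"analyze_data"},
}
order = execution_order(tasks)
```

For this linear chain the only valid order is `gather_data`, then `analyze_data`, then `summarize`.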
Agent Coordination Patterns
Supervisor Pattern
Implement a supervisor agent that coordinates other agents:
from safeguards.types.agent import Agent
from typing import Dict, Any, List
class SupervisorAgent(Agent):
    def __init__(self, name, worker_agents=None):
        super().__init__(name)
        self.worker_agents = worker_agents or []
    def run(self, **kwargs) -> Dict[str, Any]:
        """Coordinate multiple worker agents."""
        task = kwargs.get("task", "")
        # Step 1: Decompose the task
        subtasks = self._decompose_task(task)
        # Step 2: Assign subtasks to workers
        assignments = self._assign_subtasks(subtasks)
        # Step 3: Monitor and collect results
        results = {}
        for agent_id, subtask in assignments.items():
            # Find the agent
            agent = next((a for a in self.worker_agents if a.id == agent_id), None)
            if agent:
                result = agent.run(task=subtask)
                results[agent_id] = result
                # Update budget
                current_budget = budget_api.get_budget(agent.id)
                cost = result.get("cost", Decimal("0"))
                budget_api.update_budget(agent.id, current_budget - cost)
        # Step 4: Combine results
        final_result = self._combine_results(results)
        return {
            "result": final_result,
            "subtask_count": len(subtasks),
            "worker_count": len(self.worker_agents)
        }
    def _decompose_task(self, task) -> List[str]:
        """Break a task into subtasks."""
        # Implementation depends on task type
        return [f"{task} - part {i}" for i in range(3)]
    def _assign_subtasks(self, subtasks) -> Dict[str, str]:
        """Assign subtasks to worker agents."""
        # One subtask per worker; subtasks beyond the worker count are not assigned
        assignments = {}
        for i, subtask in enumerate(subtasks):
            if i < len(self.worker_agents):
                agent = self.worker_agents[i]
                assignments[agent.id] = subtask
        return assignments
    def _combine_results(self, results) -> Any:
        """Combine results from multiple agents."""
        # Implementation depends on result type
        combined = ""
        for agent_id, result in results.items():
            combined += f"{result.get('output', '')}\n"
        return combined
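The `_assign_subtasks` method above gives each worker at most one subtask, so any subtasks beyond the number of workers are never assigned. If every subtask must run, one option is a round-robin assignment that maps each worker to a list of subtasks. The sketch below is a standalone illustration; `assign_round_robin` is a hypothetical helper, and using it would also require the supervisor's result loop to iterate over lists.

```python
from itertools import cycle


def assign_round_robin(subtasks, worker_ids):
    """Distribute all subtasks across workers in turn (hypothetical helper)."""
    if not worker_ids:
        raise ValueError("no workers available")
    assignments = {worker_id: [] for worker_id in worker_ids}
    # cycle() repeats the worker ids for as long as there are subtasks left
    for subtask, worker_id in zip(subtasks, cycle(worker_ids)):
        assignments[worker_id].append(subtask)
    return assignments


plan = assign_round_robin(
    ["task - part 0", "task - part 1", "task - part 2"],
    ["worker_a", "worker_b"],
)
```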
Reactive Coordination
Implement event-driven coordination between agents:
# Setup event subscriptions
agent_events = {
    "data_available": [],
    "analysis_complete": [],
    "error_reported": []
}
def subscribe_to_event(agent_id, event_type, callback):
    """Subscribe an agent to an event type."""
    if event_type in agent_events:
        agent_events[event_type].append({
            "agent_id": agent_id,
            "callback": callback
        })
def publish_event(source_agent_id, event_type, data):
    """Publish an event to all subscribers."""
    if event_type in agent_events:
        for subscriber in agent_events[event_type]:
            try:
                subscriber["callback"](source_agent_id, data)
            except Exception as e:
                print(f"Error in event handler: {e}")
# Example event handler
def handle_data_available(source_agent_id, data):
    """Handle data availability events."""
    print(f"Data available from agent {source_agent_id}")
    # Trigger analysis agent
    analysis_result = analysis_agent.run(input=data)
    # Update budget
    current_budget = budget_api.get_budget(analysis_agent.id)
    cost = analysis_result.get("cost", Decimal("0"))
    budget_api.update_budget(analysis_agent.id, current_budget - cost)
    # Publish completion event
    publish_event(
        analysis_agent.id,
        "analysis_complete",
        analysis_result.get("output", "")
    )
# Subscribe analysis agent to data events
subscribe_to_event(
    analysis_agent.id,
    "data_available",
    handle_data_available
)
# Research agent publishes event when data is ready
publish_event(
    research_agent.id,
    "data_available",
    {"data": "Sample research data", "format": "json"}
)
Best Practices
Resource Efficiency
- Prioritize Critical Agents: Ensure critical agents have higher priority
- Use Shared Pools: Group related agents under shared budget pools
- Monitor Resource Usage: Track resource consumption across agents
- Implement Graceful Degradation: Plan for reduced functionality under resource constraints
Communication Efficiency
- Minimize Message Size: Keep coordination messages concise
- Use Appropriate Patterns: Choose the right coordination pattern for your use case
- Cache Common Data: Avoid redundant data transfers between agents
- Implement Timeouts: Don't let agents wait indefinitely for responses
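As an illustration of the timeout advice above, the sketch below waits for a callable for a bounded time using only the standard library. `call_with_timeout` and `slow_agent` are hypothetical names, and the callable stands in for an agent's `run` method. Note that Python cannot forcibly stop the worker thread, so the timeout only bounds how long the caller waits.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def call_with_timeout(fn, timeout_seconds, *args, **kwargs):
    """Run fn in a worker thread and stop waiting after timeout_seconds."""
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(fn, *args, **kwargs)
    try:
        return {"ok": True, "value": future.result(timeout=timeout_seconds)}
    except FutureTimeout:
        return {"ok": False, "error": "timed out"}
    finally:
        # Do not block on a straggler; its thread still runs to completion
        executor.shutdown(wait=False)


def slow_agent(task):
    # Stand-in for an agent call that takes too long
    time.sleep(0.5)
    return f"done: {task}"


fast = call_with_timeout(lambda: "ready", 1.0)
slow = call_with_timeout(slow_agent, 0.05, "big task")
```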
Error Handling
- Propagate Failures Appropriately: Ensure errors in one agent don't silently break others
- Implement Circuit Breakers: Stop calling failing agents after repeated errors
- Plan for Recovery: Design agents to recover from coordination failures
- Log Coordination Events: Maintain logs for debugging multi-agent interactions
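The circuit breaker item above can be sketched as a small wrapper around an agent call. The version below is a minimal illustration and not a Safeguards component: the class, its parameters, and the half-open behavior of allowing one trial call after the cool-down are assumptions.

```python
import time


class CircuitBreaker:
    """Stop calling a failing callable after repeated errors (illustrative only)."""

    def __init__(self, max_failures=3, reset_after=30.0, clock=time.monotonic):
        self.max_failures = max_failures
        self.reset_after = reset_after
        self._clock = clock
        self._failures = 0
        self._opened_at = None  # None means the circuit is closed

    def call(self, fn, *args, **kwargs):
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self.reset_after:
                raise RuntimeError("circuit open: agent call skipped")
            # Cool-down elapsed: allow one trial call; a single failure reopens
            self._opened_at = None
            self._failures = self.max_failures - 1
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self._failures += 1
            if self._failures >= self.max_failures:
                self._opened_at = self._clock()
            raise
        # Any success resets the failure count
        self._failures = 0
        return result
```

A caller would wrap each agent invocation, for example `breaker.call(analysis_agent.run, input=data)`, and treat `RuntimeError` as a signal to fall back or degrade gracefully.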
Advanced Coordination
Agent Teams
Create agent teams for specialized tasks:
from safeguards.coordination.team import AgentTeam
# Create a research team
research_team = AgentTeam(
    name="research_team",
    agents=[research_agent, analysis_agent, summarization_agent],
    budget_pool_id=high_priority_pool.id
)
# Assign team task
team_result = research_team.execute_task(
    task="research quantum computing",
    coordination_strategy="pipeline"
)
Dynamic Agent Discovery
Implement dynamic discovery of available agents:
from safeguards.coordination.discovery import AgentDiscoveryService
# Create discovery service
discovery_service = AgentDiscoveryService(budget_coordinator)
# Register agent capabilities
discovery_service.register_capability(
    agent_id=research_agent.id,
    capability="data_retrieval",
    metadata={"formats": ["json", "xml"], "sources": ["web", "database"]}
)
discovery_service.register_capability(
    agent_id=analysis_agent.id,
    capability="data_analysis",
    metadata={"algorithms": ["regression", "classification"], "formats": ["json"]}
)
# Find agents with specific capabilities
analysis_agents = discovery_service.find_agents_by_capability(
    capability="data_analysis",
    required_metadata={"algorithms": ["regression"]}
)
if analysis_agents:
    # Use the first available agent
    agent_id = analysis_agents[0]
    print(f"Using agent {agent_id} for regression analysis")
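This guide does not specify how `AgentDiscoveryService` matches `required_metadata`. One plausible reading is that every required value must appear in the agent's registered list for that key. The sketch below implements that subset rule in a standalone registry; the `CapabilityRegistry` class and the matching rule itself are assumptions, and metadata values are assumed to be lists.

```python
class CapabilityRegistry:
    """In-memory capability lookup with subset matching (illustrative only)."""

    def __init__(self):
        self._entries = {}  # capability -> {agent_id: metadata}

    def register_capability(self, agent_id, capability, metadata=None):
        self._entries.setdefault(capability, {})[agent_id] = metadata or {}

    def find_agents_by_capability(self, capability, required_metadata=None):
        required = required_metadata or {}
        matches = []
        for agent_id, metadata in self._entries.get(capability, {}).items():
            # Every required value must be present under the same metadata key
            if all(
                set(values) <= set(metadata.get(key, []))
                for key, values in required.items()
            ):
                matches.append(agent_id)
        return matches


registry = CapabilityRegistry()
registry.register_capability(
    "analysis_agent",
    "data_analysis",
    {"algorithms": ["regression", "classification"], "formats": ["json"]},
)
found = registry.find_agents_by_capability(
    "data_analysis", {"algorithms": ["regression"]}
)
```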
Conclusion
Effective agent coordination is essential for building robust multi-agent systems. By implementing appropriate coordination patterns, managing resource sharing, and handling dependencies correctly, you can create systems where agents collaborate effectively while respecting resource constraints.
For more information, see:
- Budget Management Guide
- Safeguards Guide
- Monitoring Guide
- API Reference