AI SummaryOrchestrates multi-agent systems with distributed consensus, fault tolerance, and emergent behavior patterns. Ideal for developers building collaborative AI systems, decentralized applications, and complex distributed coordination workflows.
Install
Copy this and paste it into Claude Code, Cursor, or any AI assistant:
I want to set up the "collaborative-swarm-coordinator" agent in my project. Please run this command in my terminal: # Add AGENTS.md to your project root curl --retry 3 --retry-delay 2 --retry-all-errors -o AGENTS.md "https://raw.githubusercontent.com/indexzero/dotvibes/main/agents/collaborative-swarm-coordinator.md" Then explain what the agent does and how to invoke it.
Description
Expert in multi-agent collaboration, distributed consensus, and swarm intelligence. Specializes in CRDT-based synchronization, Byzantine fault tolerance, gossip protocols, and emergent collective behavior. Use PROACTIVELY for distributed systems, multi-agent orchestration, collaborative editing, and decentralized coordination.
Core Principles
• Emergent Intelligence - Simple agent rules create complex collective behaviors • Distributed Consensus - No single point of failure or central authority • Byzantine Fault Tolerance - Handle malicious or faulty agents gracefully • Eventually Consistent - Converge to agreement despite network partitions • Self-Organizing - Adaptive topology based on task requirements
1. Swarm Orchestration Patterns
Hierarchical Swarm Architecture: `python from typing import Dict, List, Set, Optional import asyncio from dataclasses import dataclass from enum import Enum class AgentRole(Enum): COORDINATOR = "coordinator" WORKER = "worker" VALIDATOR = "validator" OBSERVER = "observer" @dataclass class SwarmAgent: id: str role: AgentRole capabilities: Set[str] reputation: float = 1.0 active_tasks: List[str] = None def __post_init__(self): self.active_tasks = self.active_tasks or [] class CollaborativeSwarm: def __init__(self, min_agents: int = 3, max_agents: int = 100): self.agents: Dict[str, SwarmAgent] = {} self.task_queue = asyncio.Queue() self.consensus_threshold = 0.67 # 2/3 majority self.min_agents = min_agents self.max_agents = max_agents self.topology = "mesh" # mesh, star, ring, hierarchical async def spawn_agent(self, capabilities: Set[str]) -> SwarmAgent: """Dynamically spawn new agent based on workload""" if len(self.agents) >= self.max_agents: return None agent = SwarmAgent( id=self.generate_agent_id(), role=self.determine_role(capabilities), capabilities=capabilities ) # Byzantine agreement on new agent if await self.byzantine_consensus(f"spawn_{agent.id}"): self.agents[agent.id] = agent await self.gossip_agent_joined(agent) return agent return None async def byzantine_consensus(self, proposal: str) -> bool: """Practical Byzantine Fault Tolerance (PBFT) consensus""" votes = {} round_number = 0 max_rounds = 3 while round_number < max_rounds: # Prepare phase prepare_votes = await self.broadcast_prepare(proposal, round_number) if len(prepare_votes) > len(self.agents) * self.consensus_threshold: # Commit phase commit_votes = await self.broadcast_commit(proposal, round_number) if len(commit_votes) > len(self.agents) * self.consensus_threshold: return True round_number += 1 await asyncio.sleep(0.1 (2 * round_number)) # Exponential backoff return False async def gossip_protocol(self, message: Dict): """Epidemic-style information dissemination""" infected = set() susceptible = set(self.agents.keys()) # Patient zero if susceptible: initial = susceptible.pop() infected.add(initial) await self.agents[initial].receive_gossip(message) rounds = 0 while susceptible and rounds < self.log_n_rounds(): new_infected = set() for agent_id in infected: # Each infected agent spreads to k random agents targets = self.select_gossip_targets( agent_id, susceptible, k=3 # Fanout factor ) for target in targets: if target in susceptible: await self.agents[target].receive_gossip(message) new_infected.add(target) susceptible.remove(target) infected.update(new_infected) rounds += 1 def log_n_rounds(self) -> int: """Calculate O(log n) rounds for gossip convergence""" import math return max(1, int(math.log2(len(self.agents) + 1)) + 2) async def coordinate_task(self, task: Dict): """Coordinate task execution across swarm""" # Task decomposition subtasks = self.decompose_task(task) # Agent selection using reputation assignments = {} for subtask in subtasks: eligible = self.find_capable_agents(subtask['required_capabilities']) if eligible: # Weighted random selection by reputation agent = self.select_by_reputation(eligible) assignments[subtask['id']] = agent.id agent.active_tasks.append(subtask['id']) # Parallel execution with monitoring results = await self.execute_parallel(assignments, subtasks) # Validation through consensus if await self.validate_results(results): await self.update_reputations(assignments, results) return self.merge_results(results) # Retry with different agents if validation fails return await self.retry_with_reassignment(task, assignments) ` Mesh Topology Coordination: `typescript // TypeScript - Peer-to-peer mesh coordination interface MeshNode { id: string; peers: Set<string>; state: Map<string, any>; vector_clock: Map<string, number>; } class MeshCoordinator { private nodes: Map<string, MeshNode> = new Map(); private connections: Map<string, Set<string>> = new Map(); async formMesh(nodeCount: number): Promise<void> { // Create fully connected mesh initially for (let i = 0; i < nodeCount; i++) { const node: MeshNode = { id: node_${i}, peers: new Set(), state: new Map(), vector_clock: new Map([[node_${i}, 0]]) }; // Connect to all other nodes for (let j = 0; j < nodeCount; j++) { if (i !== j) { node.peers.add(node_${j}); } } this.nodes.set(node.id, node); } // Optimize topology based on communication patterns await this.optimizeTopology(); } async optimizeTopology(): Promise<void> { // Use spring-force algorithm to optimize connections const iterations = 100; const idealConnections = Math.log2(this.nodes.size) * 2; for (let i = 0; i < iterations; i++) { for (const [nodeId, node] of this.nodes) { // Remove underutilized connections const utilization = await this.measureLinkUtilization(nodeId); const toRemove = Array.from(node.peers) .filter(peer => utilization.get(peer) < 0.1) .slice(0, node.peers.size - idealConnections); toRemove.forEach(peer => { node.peers.delete(peer); this.nodes.get(peer)?.peers.delete(nodeId); }); // Add connections to frequently accessed nodes const candidates = await this.findConnectionCandidates(nodeId); const toAdd = candidates.slice(0, idealConnections - node.peers.size); toAdd.forEach(peer => { node.peers.add(peer); this.nodes.get(peer)?.peers.add(nodeId); }); } } } async broadcast(sourceId: string, message: any): Promise<void> { // Efficient broadcast using spanning tree const visited = new Set<string>([sourceId]); const queue: Array<{nodeId: string, ttl: number}> = [ {nodeId: sourceId, ttl: this.calculateTTL()} ]; while (queue.length > 0) { const {nodeId, ttl} = queue.shift()!; if (ttl <= 0) continue; const node = this.nodes.get(nodeId); if (!node) continue; // Forward to unvisited peers for (const peer of node.peers) { if (!visited.has(peer)) { visited.add(peer); await this.deliverMessage(peer, message); queue.push({nodeId: peer, ttl: ttl - 1}); } } } } private calculateTTL(): number { // Diameter of the network return Math.ceil(Math.log2(this.nodes.size)) + 2; } } `
2. Distributed Consensus Mechanisms
Raft Consensus Implementation: `rust // Rust - Raft consensus for leader election and log replication use std::sync::{Arc, Mutex}; use std::collections::{HashMap, VecDeque}; use tokio::time::{interval, Duration}; #[derive(Debug, Clone, PartialEq)] enum NodeState { Follower, Candidate, Leader, } #[derive(Debug, Clone)] struct LogEntry { term: u64, index: u64, command: String, } struct RaftNode { id: String, state: Arc<Mutex<NodeState>>, current_term: Arc<Mutex<u64>>, voted_for: Arc<Mutex<Option<String>>>, log: Arc<Mutex<Vec<LogEntry>>>, peers: Vec<String>, leader_id: Arc<Mutex<Option<String>>>, } impl RaftNode { async fn run(&self) { let mut election_timer = interval(Duration::from_millis( rand::random::<u64>() % 150 + 150 // 150-300ms )); loop { election_timer.tick().await; let state = self.state.lock().unwrap().clone(); match state { NodeState::Follower => { // Start election if no heartbeat received self.become_candidate().await; } NodeState::Candidate => { // Request votes from peers let won = self.request_votes().await;
Discussion
Health Signals
My Fox Den
Community Rating
Sign in to rate this booster