ai-agent-orchestrator
30
总安装量
3
周安装量
#12331
全站排名
安装命令
npx skills add https://github.com/dengineproblem/agents-monorepo --skill ai-agent-orchestrator
Agent 安装分布
github-copilot
3
amp
2
claude-code
2
kimi-cli
2
gemini-cli
2
Skill 文档
AI Agent Orchestrator Expert
ÐкÑпеÑÑ Ð¿Ð¾ пÑоекÑиÑÐ¾Ð²Ð°Ð½Ð¸Ñ Ð¸ ÑеализаÑии многоагенÑнÑÑ ÑиÑÑем.
ÐÑновнÑе пÑинÑипÑ
ÐеÑаÑÑ Ð¸Ñ Ð°Ð³ÐµÐ½Ñов
- ÐÑкеÑÑÑаÑоÑ: ÐлавнÑй кооÑдинаÑоÑ, делегиÑÑÐµÑ Ð·Ð°Ð´Ð°Ñи
- СпеÑиализиÑованнÑе агенÑÑ: ÐÐ»Ñ ÐºÐ¾Ð½ÐºÑеÑнÑÑ Ð´Ð¾Ð¼ÐµÐ½Ð¾Ð² (иÑÑледованиÑ, анализ, код)
- УÑилиÑаÑнÑе агенÑÑ: ÐÑпомогаÑелÑнÑе ÑÑнкÑии (валидаÑиÑ, ÑоÑмаÑиÑование)
- ÐониÑоÑинг: ÐдоÑовÑе ÑиÑÑÐµÐ¼Ñ Ð¸ обÑабоÑка оÑибок
ÐаÑÑеÑÐ½Ñ Ð°ÑÑ Ð¸ÑекÑÑÑÑ
Hub and Spoke
class OrchestratorAgent:
def __init__(self):
self.agents = {
'researcher': ResearchAgent(),
'analyzer': AnalysisAgent(),
'writer': WritingAgent(),
'validator': ValidationAgent()
}
async def orchestrate_task(self, task):
subtasks = self.decompose_task(task)
results = []
for subtask in subtasks:
agent_type = self.route_task(subtask)
result = await self.agents[agent_type].execute(subtask)
results.append(result)
return self.synthesize_results(results)
Pipeline Pattern
class AgentPipeline:
def __init__(self):
self.stages = [
DataIngestionAgent(),
ProcessingAgent(),
AnalysisAgent(),
OutputAgent()
]
async def execute_pipeline(self, input_data):
data = input_data
for stage in self.stages:
try:
data = await stage.process(data)
except Exception as e:
return await self.handle_pipeline_error(stage, e, data)
return data
ÐаÑÑÑÑÑизаÑÐ¸Ñ Ð·Ð°Ð´Ð°Ñ
class TaskRouter:
def __init__(self):
self.agent_capabilities = {
'code_analysis': ['python', 'javascript', 'sql'],
'research': ['web_search', 'document_analysis'],
'writing': ['technical', 'creative', 'business']
}
self.agent_load = {}
def route_task(self, task):
required_skills = self.extract_skills(task)
capable_agents = [
agent_id for agent_id, skills in self.agent_capabilities.items()
if self.has_required_skills(skills, required_skills)
]
return self.select_least_loaded_agent(capable_agents)
ÐежагенÑÐ½Ð°Ñ ÐºÐ¾Ð¼Ð¼ÑникаÑиÑ
@dataclass
class AgentMessage:
sender_id: str
receiver_id: str
message_type: MessageType
payload: Dict[str, Any]
correlation_id: str
timestamp: float
priority: int = 5
class MessageBus:
async def send_message(self, message: AgentMessage):
await self.validate_message(message)
await self.route_message(message)
await self.log_message(message)
ÐбÑабоÑка оÑибок
Circuit Breaker
class AgentCircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.state = 'CLOSED'
async def call_agent(self, agent, task):
if self.state == 'OPEN':
if time.time() - self.last_failure_time > self.timeout:
self.state = 'HALF_OPEN'
else:
raise CircuitBreakerOpenError()
try:
result = await agent.execute(task)
if self.state == 'HALF_OPEN':
self.reset()
return result
except Exception as e:
self.record_failure()
raise
Graceful Degradation
class ResilientOrchestrator:
def __init__(self):
self.agent_priorities = {
'primary': ['gpt-4', 'claude-3'],
'fallback': ['gpt-3.5', 'local-model'],
'emergency': ['rule-based-agent']
}
async def execute_with_fallback(self, task):
for tier in ['primary', 'fallback', 'emergency']:
for agent_id in self.agent_priorities[tier]:
try:
if await self.is_agent_healthy(agent_id):
return await self.execute_on_agent(agent_id, task)
except Exception:
continue
raise AllAgentsFailedError()
ÐÑÑÑие пÑакÑики
- РеализÑйÑе комплекÑное логиÑование Ñ correlation ID
- ÐÑÑлеживайÑе меÑÑики пÑоизводиÑелÑноÑÑи агенÑов
- ÐÑполÑзÑйÑе ÑаÑпÑеделеннÑÑ ÑÑаÑÑиÑÐ¾Ð²ÐºÑ Ð´Ð»Ñ ÑложнÑÑ workflows
- ÐалидиÑÑйÑе вÑÑ Ð¼ÐµÐ¶Ð°Ð³ÐµÐ½ÑнÑÑ ÐºÐ¾Ð¼Ð¼ÑникаÑиÑ
- ÐÑоекÑиÑÑйÑе агенÑов как stateless когда возможно
- ÐÑполÑзÑйÑе оÑеÑеди ÑообÑений Ð´Ð»Ñ ÑазвÑзки