Skip to content

EPIC-001: Agent Orchestration Framework #192

@manavgup

Description

@manavgup

EPIC-001: Agent Orchestration Framework

Epic Overview

Epic Title: Implement Multi-Agent Orchestration Framework for Specialized RAG Tasks

Epic Description:
Build a comprehensive multi-agent system where specialized agents can collaborate to handle complex RAG queries. This framework will enable Research Agents, Synthesis Agents, Validation Agents, and Task Planners to work together in a coordinated manner to provide more accurate and comprehensive responses.

Business Value:

  • Enhanced query handling capabilities for complex multi-step questions
  • Improved accuracy through specialized agent expertise
  • Better fact-checking and validation through agent collaboration
  • Foundation for advanced agentic workflows

Epic Priority: High
Epic Size: Large (Epic)
Target Release: Q4 2024


Technical Architecture

Current State Analysis

  • Existing single-threaded RAG pipeline in pipeline_service.py
  • Service-based architecture with clean separation of concerns
  • Models in backend/rag_solution/models/
  • Services in backend/rag_solution/services/
  • No current agent framework or orchestration

Target Architecture

Agent Orchestration Layer
├── Agent Registry & Discovery
├── Communication Bus
├── Task Orchestrator
├── Agent Lifecycle Management
└── Inter-Agent Communication Protocol

Database Schema Changes

New Tables Required

1. Agent Registry Table

CREATE TABLE agents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(255) NOT NULL,
    agent_type VARCHAR(100) NOT NULL, -- 'research', 'synthesis', 'validation', 'planner'
    description TEXT,
    capabilities JSONB, -- Array of capability strings
    configuration JSONB, -- Agent-specific config
    status VARCHAR(50) DEFAULT 'active', -- 'active', 'inactive', 'maintenance'
    version VARCHAR(50),
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

2. Agent Sessions Table

CREATE TABLE agent_sessions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    orchestration_id UUID REFERENCES orchestrations(id),
    agent_id UUID REFERENCES agents(id),
    status VARCHAR(50) DEFAULT 'pending', -- 'pending', 'running', 'completed', 'failed'
    input_data JSONB,
    output_data JSONB,
    error_message TEXT,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    created_at TIMESTAMP DEFAULT NOW()
);

3. Orchestrations Table

CREATE TABLE orchestrations (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID REFERENCES users(id),
    collection_id UUID REFERENCES collections(id),
    query TEXT NOT NULL,
    orchestration_plan JSONB, -- DAG of agent execution plan
    status VARCHAR(50) DEFAULT 'pending', -- 'pending', 'running', 'completed', 'failed'
    final_result JSONB,
    metadata JSONB,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

4. Agent Communications Table

CREATE TABLE agent_communications (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    orchestration_id UUID REFERENCES orchestrations(id),
    sender_agent_id UUID REFERENCES agents(id),
    receiver_agent_id UUID REFERENCES agents(id),
    message_type VARCHAR(100), -- 'task', 'result', 'query', 'error'
    message_data JSONB,
    timestamp TIMESTAMP DEFAULT NOW()
);

New Models Required

1. Agent Model (backend/rag_solution/models/agent.py)

from sqlalchemy import String, DateTime, JSON
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from typing import Dict, List, Any
import uuid
from datetime import datetime
from rag_solution.file_management.database import Base

class Agent(Base):
    __tablename__ = "agents"
    
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name: Mapped[str] = mapped_column(String(255), nullable=False)
    agent_type: Mapped[str] = mapped_column(String(100), nullable=False)
    description: Mapped[str] = mapped_column(String, nullable=True)
    capabilities: Mapped[Dict[str, Any]] = mapped_column(JSON, nullable=True)
    configuration: Mapped[Dict[str, Any]] = mapped_column(JSON, nullable=True)
    status: Mapped[str] = mapped_column(String(50), default="active")
    version: Mapped[str] = mapped_column(String(50), nullable=True)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now)
    updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now, onupdate=datetime.now)
    
    # Relationships
    sessions: Mapped[List["AgentSession"]] = relationship("AgentSession", back_populates="agent")

2. Orchestration Model (backend/rag_solution/models/orchestration.py)

class Orchestration(Base):
    __tablename__ = "orchestrations"
    
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("users.id"))
    collection_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("collections.id"))
    query: Mapped[str] = mapped_column(String, nullable=False)
    orchestration_plan: Mapped[Dict[str, Any]] = mapped_column(JSON, nullable=True)
    status: Mapped[str] = mapped_column(String(50), default="pending")
    final_result: Mapped[Dict[str, Any]] = mapped_column(JSON, nullable=True)
    metadata: Mapped[Dict[str, Any]] = mapped_column(JSON, nullable=True)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now)
    updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now, onupdate=datetime.now)
    
    # Relationships
    user: Mapped["User"] = relationship("User")
    collection: Mapped["Collection"] = relationship("Collection")
    sessions: Mapped[List["AgentSession"]] = relationship("AgentSession", back_populates="orchestration")

3. Agent Session Model (backend/rag_solution/models/agent_session.py)

4. Agent Communication Model (backend/rag_solution/models/agent_communication.py)


New Schemas Required

1. Agent Schemas (backend/rag_solution/schemas/agent_schema.py)

from pydantic import BaseModel, Field
from typing import Dict, Any, List, Optional
from datetime import datetime
import uuid

class AgentBase(BaseModel):
    name: str = Field(..., description="Agent name")
    agent_type: str = Field(..., description="Type of agent")
    description: Optional[str] = None
    capabilities: Optional[Dict[str, Any]] = None
    configuration: Optional[Dict[str, Any]] = None

class AgentCreate(AgentBase):
    pass

class AgentUpdate(BaseModel):
    name: Optional[str] = None
    description: Optional[str] = None
    capabilities: Optional[Dict[str, Any]] = None
    configuration: Optional[Dict[str, Any]] = None
    status: Optional[str] = None

class Agent(AgentBase):
    id: uuid.UUID
    status: str
    version: Optional[str]
    created_at: datetime
    updated_at: datetime
    
    class Config:
        from_attributes = True

2. Orchestration Schemas (backend/rag_solution/schemas/orchestration_schema.py)

3. Agent Session Schemas (backend/rag_solution/schemas/agent_session_schema.py)


New Services Required

1. Agent Registry Service (backend/rag_solution/services/agent_registry_service.py)

Responsibilities:

  • Register and discover agents
  • Manage agent lifecycle
  • Handle agent capabilities and configuration

Key Methods:

class AgentRegistryService:
    async def register_agent(self, agent_data: AgentCreate) -> Agent
    async def get_agent_by_id(self, agent_id: UUID) -> Agent
    async def get_agents_by_type(self, agent_type: str) -> List[Agent]
    async def get_agents_by_capability(self, capability: str) -> List[Agent]
    async def update_agent(self, agent_id: UUID, update_data: AgentUpdate) -> Agent
    async def deactivate_agent(self, agent_id: UUID) -> bool

2. Orchestration Service (backend/rag_solution/services/orchestration_service.py)

Responsibilities:

  • Plan and execute multi-agent workflows
  • Coordinate agent communication
  • Handle orchestration lifecycle

Key Methods:

class OrchestrationService:
    async def create_orchestration(self, user_id: UUID, collection_id: UUID, query: str) -> Orchestration
    async def plan_execution(self, orchestration_id: UUID) -> Dict[str, Any]
    async def execute_orchestration(self, orchestration_id: UUID) -> Dict[str, Any]
    async def get_orchestration_status(self, orchestration_id: UUID) -> str
    async def cancel_orchestration(self, orchestration_id: UUID) -> bool

3. Agent Communication Service (backend/rag_solution/services/agent_communication_service.py)

Responsibilities:

  • Handle inter-agent messaging
  • Maintain communication logs
  • Provide communication bus functionality

4. Specialized Agent Services:

  • Research Agent Service (backend/rag_solution/services/research_agent_service.py)
  • Synthesis Agent Service (backend/rag_solution/services/synthesis_agent_service.py)
  • Validation Agent Service (backend/rag_solution/services/validation_agent_service.py)
  • Task Planner Service (backend/rag_solution/services/task_planner_service.py)

New Router Endpoints Required

1. Agent Management Router (backend/rag_solution/router/agent_router.py)

# Agent CRUD operations
POST   /api/v1/agents                    # Register new agent
GET    /api/v1/agents                    # List all agents
GET    /api/v1/agents/{agent_id}         # Get agent details
PUT    /api/v1/agents/{agent_id}         # Update agent
DELETE /api/v1/agents/{agent_id}         # Deactivate agent
GET    /api/v1/agents/types/{agent_type} # Get agents by type
GET    /api/v1/agents/capabilities/{capability} # Get agents by capability

2. Orchestration Router (backend/rag_solution/router/orchestration_router.py)

# Orchestration management
POST   /api/v1/orchestrations           # Create new orchestration
GET    /api/v1/orchestrations           # List user orchestrations
GET    /api/v1/orchestrations/{orchestration_id} # Get orchestration details
POST   /api/v1/orchestrations/{orchestration_id}/execute # Execute orchestration
POST   /api/v1/orchestrations/{orchestration_id}/cancel  # Cancel orchestration
GET    /api/v1/orchestrations/{orchestration_id}/status  # Get execution status
GET    /api/v1/orchestrations/{orchestration_id}/logs    # Get execution logs

3. Agent Communication Router (backend/rag_solution/router/agent_communication_router.py)

# Agent communication
GET    /api/v1/communications/{orchestration_id} # Get communication logs
POST   /api/v1/communications/send               # Send inter-agent message

Frontend Changes Required

1. New React Components

Agent Management Interface (webui/src/components/AgentManagement/)

  • AgentList.jsx - Display registered agents
  • AgentDetails.jsx - Show agent capabilities and status
  • AgentRegistration.jsx - Register new agents
  • AgentConfiguration.jsx - Configure agent settings

Orchestration Interface (webui/src/components/Orchestration/)

  • OrchestrationDashboard.jsx - Main orchestration view
  • OrchestrationPlan.jsx - Visual DAG of execution plan
  • OrchestrationLogs.jsx - Real-time execution logs
  • AgentCommunicationView.jsx - Inter-agent communication viewer

Enhanced Search Interface

  • Update SearchInterface.jsx to include orchestration options
  • Add toggle for "Enable Multi-Agent Processing"
  • Display orchestration progress during query execution

2. New Context Providers

  • AgentContext.jsx - Manage agent state
  • OrchestrationContext.jsx - Manage orchestration state

3. New API Integration

  • api/agentService.js - Agent management API calls
  • api/orchestrationService.js - Orchestration API calls
  • Add WebSocket support for real-time orchestration updates

Database Migration Scripts

Migration Script: migrations/add_agent_orchestration_tables.sql

-- Create agent orchestration tables
-- (Include all CREATE TABLE statements from above)

-- Create indexes for performance
CREATE INDEX idx_agents_type ON agents(agent_type);
CREATE INDEX idx_agents_status ON agents(status);
CREATE INDEX idx_orchestrations_user_id ON orchestrations(user_id);
CREATE INDEX idx_orchestrations_status ON orchestrations(status);
CREATE INDEX idx_agent_sessions_orchestration_id ON agent_sessions(orchestration_id);
CREATE INDEX idx_agent_sessions_agent_id ON agent_sessions(agent_id);
CREATE INDEX idx_agent_communications_orchestration_id ON agent_communications(orchestration_id);

Testing Strategy

Atomic Tests

Create backend/tests/atomic/test_agent_orchestration.py:

  • test_agent_model_creation() - Test Agent model instantiation
  • test_orchestration_model_creation() - Test Orchestration model instantiation
  • test_agent_session_model_creation() - Test AgentSession model instantiation
  • test_agent_communication_model_creation() - Test AgentCommunication model instantiation
  • test_agent_schema_validation() - Test Pydantic schema validation
  • test_orchestration_schema_validation() - Test orchestration schema validation

Unit Tests

Agent Registry Service Tests (backend/tests/unit/test_agent_registry_service.py)

  • test_register_agent_success() - Test successful agent registration
  • test_register_agent_duplicate_name() - Test duplicate agent name handling
  • test_get_agent_by_id_success() - Test agent retrieval by ID
  • test_get_agent_by_id_not_found() - Test agent not found case
  • test_get_agents_by_type() - Test filtering agents by type
  • test_get_agents_by_capability() - Test filtering agents by capability
  • test_update_agent_success() - Test agent updates
  • test_deactivate_agent() - Test agent deactivation

Orchestration Service Tests (backend/tests/unit/test_orchestration_service.py)

  • test_create_orchestration_success() - Test orchestration creation
  • test_plan_execution_simple_query() - Test execution planning
  • test_plan_execution_complex_query() - Test complex query planning
  • test_execute_orchestration_single_agent() - Test single agent execution
  • test_execute_orchestration_multi_agent() - Test multi-agent execution
  • test_orchestration_status_tracking() - Test status updates
  • test_cancel_orchestration() - Test orchestration cancellation

Specialized Agent Service Tests

  • test_research_agent_service() - Test research agent functionality
  • test_synthesis_agent_service() - Test synthesis agent functionality
  • test_validation_agent_service() - Test validation agent functionality
  • test_task_planner_service() - Test task planning functionality

Integration Tests

Agent Communication Tests (backend/tests/integration/test_agent_communication.py)

  • test_inter_agent_messaging() - Test message passing between agents
  • test_communication_logging() - Test communication persistence
  • test_communication_bus_functionality() - Test communication bus
  • test_agent_discovery_and_selection() - Test agent discovery process

End-to-End Orchestration Tests (backend/tests/integration/test_orchestration_e2e.py)

  • test_simple_research_task() - Test single research agent task
  • test_multi_agent_synthesis() - Test research + synthesis workflow
  • test_validation_workflow() - Test research + synthesis + validation
  • test_complex_query_decomposition() - Test task planner functionality
  • test_orchestration_error_handling() - Test error propagation and recovery

Database Integration Tests (backend/tests/integration/test_agent_database.py)

  • test_agent_crud_operations() - Test agent CRUD through database
  • test_orchestration_persistence() - Test orchestration data persistence
  • test_agent_session_tracking() - Test session data integrity
  • test_communication_log_storage() - Test communication log persistence

E2E Tests

Frontend Integration Tests (backend/tests/e2e/test_agent_ui.py)

  • test_agent_registration_flow() - Test agent registration through UI
  • test_orchestration_creation_flow() - Test orchestration creation through UI
  • test_real_time_orchestration_monitoring() - Test WebSocket orchestration updates
  • test_agent_communication_visualization() - Test communication view

API Integration Tests (backend/tests/e2e/test_agent_api.py)

  • test_agent_management_api() - Test all agent management endpoints
  • test_orchestration_api() - Test all orchestration endpoints
  • test_communication_api() - Test communication endpoints
  • test_agent_orchestration_full_workflow() - Test complete workflow through API

Performance Tests

Load Testing (backend/tests/performance/test_agent_performance.py)

  • test_concurrent_orchestrations() - Test multiple orchestrations
  • test_agent_communication_throughput() - Test message throughput
  • test_large_scale_agent_registration() - Test agent registry performance
  • test_orchestration_memory_usage() - Test memory consumption during orchestration

Success Criteria & Milestones

Milestone 1: Core Infrastructure (Week 1-2)

Success Criteria:

  • All database tables created and migrated
  • All models implemented with proper relationships
  • All schemas defined with validation
  • Basic agent registry service functional
  • Atomic tests passing (100% coverage)

Deliverables:

  • Database migration scripts
  • Model classes with relationships
  • Pydantic schemas
  • Basic agent registry service
  • Atomic test suite

Milestone 2: Agent Framework (Week 3-4)

Success Criteria:

  • Agent registration and discovery working
  • Basic orchestration service implemented
  • Agent communication framework functional
  • Unit tests passing (90%+ coverage)
  • Agent lifecycle management working

Deliverables:

  • Complete agent registry service
  • Orchestration service with basic planning
  • Agent communication service
  • Unit test suite
  • Agent lifecycle management

Milestone 3: Specialized Agents (Week 5-6)

Success Criteria:

  • Research agent implemented and functional
  • Synthesis agent implemented and functional
  • Validation agent implemented and functional
  • Task planner agent implemented and functional
  • Integration tests passing

Deliverables:

  • Four specialized agent services
  • Agent capability framework
  • Integration test suite
  • Agent performance optimizations

Milestone 4: API & Communication (Week 7-8)

Success Criteria:

  • All REST API endpoints implemented
  • Agent communication bus fully functional
  • WebSocket support for real-time updates
  • API integration tests passing
  • Inter-agent messaging reliable

Deliverables:

  • Complete REST API for agents and orchestrations
  • WebSocket integration
  • Communication bus implementation
  • API documentation
  • Integration test suite

Milestone 5: Frontend Integration (Week 9-10)

Success Criteria:

  • Agent management interface functional
  • Orchestration dashboard implemented
  • Real-time orchestration monitoring working
  • Agent communication visualization complete
  • E2E tests passing

Deliverables:

  • Complete React components for agent management
  • Orchestration dashboard with real-time updates
  • Agent communication visualization
  • Enhanced search interface
  • E2E test suite

Milestone 6: Production Readiness (Week 11-12)

Success Criteria:

  • Performance tests passing
  • Error handling and recovery implemented
  • Documentation complete
  • Security review passed
  • Load testing successful

Deliverables:

  • Performance test suite
  • Error handling framework
  • Complete documentation
  • Security audit results
  • Load testing reports

Risk Assessment & Mitigation

High Risks

  1. Inter-agent communication complexity

    • Mitigation: Start with simple message passing, iterate to complex protocols
  2. Performance impact of orchestration overhead

    • Mitigation: Implement async processing, add performance monitoring
  3. Agent state management complexity

    • Mitigation: Use proven state management patterns, implement comprehensive logging

Medium Risks

  1. Database performance with complex queries

    • Mitigation: Implement proper indexing, consider query optimization
  2. Frontend complexity for real-time updates

    • Mitigation: Use established WebSocket patterns, implement fallback mechanisms

Low Risks

  1. API endpoint complexity
    • Mitigation: Follow existing patterns, implement comprehensive testing

Dependencies

Internal Dependencies

  • Existing user authentication system
  • Current RAG pipeline services
  • Database infrastructure
  • Frontend framework

External Dependencies

  • PostgreSQL for orchestration data
  • WebSocket support for real-time updates
  • Message queue for agent communication (consider Redis/RabbitMQ)

Post-Epic Considerations

Future Enhancements

  1. Agent learning and adaptation
  2. Dynamic agent deployment
  3. Agent marketplace/plugin system
  4. Advanced orchestration patterns (DAG workflows)
  5. Agent performance analytics

Technical Debt

  1. Consider microservice architecture for agents
  2. Implement proper agent versioning
  3. Add comprehensive monitoring and observability
  4. Consider event sourcing for orchestration history

Definition of Done

Epic-Level DoD

  • All user stories completed and accepted
  • All tests passing (atomic, unit, integration, E2E, performance)
  • Documentation complete and reviewed
  • Security review completed
  • Performance benchmarks met
  • Production deployment successful
  • User acceptance testing completed

Story-Level DoD

  • Feature implemented according to specifications
  • Unit tests written and passing
  • Integration tests written and passing
  • Code reviewed and approved
  • Documentation updated
  • Database migrations tested
  • API endpoints tested
  • Frontend components tested

Metadata

Metadata

Assignees

No one assigned

    Labels

    agenticAgentic AI featuresenhancementNew feature or request

    Projects

    No projects

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions