Epic: Modern RAG Search Architecture - Re-architect from Ground Up
Type: Epic
Priority: P2 - Optional (Performance already achieved)
Timeline: 4 weeks
Status: Proposed - Awaiting decision
Related: MASTER_ISSUES_ROADMAP.md (Phase 2 Option A)
📋 Executive Summary
Proposal: Re-architect SearchService from scratch using a modern pipeline architecture while preserving all external contracts (zero breaking changes).
Context: We've achieved the <15s performance target (8-22s queries) through targeted fixes (PRs #542, #544, #546, #548). However, the current search_service.py has a 400+ line monolithic method that is difficult to maintain, test, and optimize.
Decision Point: Should we invest 4 weeks to build a clean, maintainable architecture, or continue with the current working implementation?
🎯 Why Re-architect? (Option A)
Current Problems
- ❌ Monolithic code: 400+ line `search()` method
- ❌ No separation of concerns: orchestration, business logic, and instrumentation are mixed together
- ❌ Hard to test: Components tightly coupled
- ❌ Hard to optimize: Can't easily swap/improve individual stages
- ❌ Hard to understand: New developers struggle with code complexity
Benefits of Re-architecture
- ✅ Clean slate: No technical debt, optimal design from day 1
- ✅ Faster timeline: 4 weeks vs 10-17 weeks incremental
- ✅ Better testing: Each stage designed for testing (90%+ coverage target)
- ✅ Better maintainability: Clear pipeline stages (<100 lines each)
- ✅ Better performance observability: Timing instrumentation built-in
- ✅ Easier onboarding: Fresh codebase with clear architecture
- ✅ Better for users: Faster time to stable, performant system
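The "timing instrumentation built-in" point can be made concrete with a small sketch. This decorator is illustrative only (the stage examples later in this epic record timings inline on the context instead):

```python
# Illustrative sketch only: record each stage's wall-clock duration into a
# shared dict. The stage examples in this epic do this inline; a decorator
# is one alternative shape for the same idea.
import time
from collections.abc import Awaitable, Callable
from typing import Any


def timed(name: str, timing: dict[str, float]) -> Callable:
    """Wrap an async callable and record its duration under `name`."""

    def decorator(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            try:
                return await fn(*args, **kwargs)
            finally:
                timing[name] = time.perf_counter() - start

        return wrapper

    return decorator
```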
🏗️ Architecture Overview
Key Principle: Zero Breaking Changes 🔒
Only internal SearchService implementation changes. All schemas, services, and APIs remain unchanged.
Current Architecture (Monolithic)
```python
class SearchService:
    async def search(self, search_input: SearchInput) -> SearchOutput:
        # 400+ lines doing EVERYTHING:
        # - Pipeline resolution
        # - Query enhancement
        # - Embedding generation
        # - Vector retrieval
        # - Reranking
        # - CoT reasoning
        # - LLM generation
        # - Response formatting
        # All in one giant method!
```
New Architecture (Pipeline Stages)
```python
class SearchService:
    async def search(self, search_input: SearchInput) -> SearchOutput:
        """~50-line orchestrator"""
        context = SearchContext(search_input, services...)
        stages = [
            PipelineResolutionStage(),
            QueryEnhancementStage(),
            RetrievalStage(),
            RerankingStage(),
            ReasoningStage(),  # CoT if needed
            GenerationStage(),
        ]
        for stage in stages:
            context = await stage.execute(context)
        return context.to_output()  # SearchOutput schema
```
External Contracts (100% UNCHANGED)
- ✅ `SearchInput` schema - no changes
- ✅ `SearchOutput` schema - no changes
- ✅ `SearchService.search()` signature - no changes
- ✅ All existing services - no changes
- ✅ Router layer - no changes
- ✅ All 947+ existing tests - pass without modification
📦 Epic Breakdown: 4 User Stories (4 Weeks)
User Story 1: Pipeline Framework (Week 1)
Goal: Build foundational pipeline architecture and data structures.
Acceptance Criteria:
- `BaseStage` abstract class implemented
- `SearchContext` dataclass implemented
- `SearchContext.to_output()` converts to `SearchOutput` correctly
- Pipeline executor implemented
- Unit tests for framework (90%+ coverage)
- All existing tests pass WITHOUT modification
- `SearchInput`/`SearchOutput` schemas UNCHANGED
Files Created:
```
backend/rag_solution/pipeline/
├── __init__.py
├── base_stage.py        # BaseStage abstract class
├── search_context.py    # SearchContext dataclass
└── executor.py          # Pipeline executor

tests/unit/pipeline/
├── test_base_stage.py
├── test_search_context.py
└── test_executor.py
```
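The "Pipeline executor" acceptance criterion above could take roughly this shape. This is a hypothetical sketch; `SearchContext` and `BaseStage` here are minimal stand-ins so the example is self-contained, not the final classes:

```python
# Hypothetical executor sketch. SearchContext and BaseStage are minimal
# stand-ins for illustration; they are not the final implementations.
from abc import ABC, abstractmethod
from dataclasses import dataclass, field


@dataclass
class SearchContext:
    visited: list[str] = field(default_factory=list)  # stand-in state


class BaseStage(ABC):
    @abstractmethod
    async def execute(self, context: SearchContext) -> SearchContext: ...

    def should_skip(self, context: SearchContext) -> bool:
        return False


class PipelineExecutor:
    """Runs stages in order, honoring each stage's should_skip()."""

    def __init__(self, stages: list[BaseStage]) -> None:
        self.stages = stages

    async def run(self, context: SearchContext) -> SearchContext:
        for stage in self.stages:
            if stage.should_skip(context):
                continue
            context = await stage.execute(context)
        return context
```

Keeping the loop this small is the point: all search behavior lives in the stages, so the executor never needs to change when a stage does.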
Code Examples:
```python
from dataclasses import dataclass, field


@dataclass
class SearchContext:
    """Internal state passed between pipeline stages."""

    search_input: SearchInput
    embedding_service: EmbeddingService  # Injected
    retrieval_service: RetrievalService  # Injected
    # ... other existing services

    # State accumulated during pipeline
    pipeline: Pipeline | None = None
    query_embedding: list[float] | None = None
    retrieval_results: list[QueryResult] | None = None
    reranked_results: list[QueryResult] | None = None
    answer: str | None = None
    timing: dict[str, float] = field(default_factory=dict)

    def to_output(self) -> SearchOutput:
        """Convert to external SearchOutput schema."""
        return SearchOutput(
            answer=self.answer,
            query_results=self.reranked_results,
            execution_time=sum(self.timing.values()),
            metadata={"timing_breakdown": self.timing},
            # ... other fields
        )
```

```python
from abc import ABC, abstractmethod


class BaseStage(ABC):
    """Base class for all pipeline stages."""

    @abstractmethod
    async def execute(self, context: SearchContext) -> SearchContext:
        """Execute stage, update context, return modified context."""

    def should_skip(self, context: SearchContext) -> bool:
        """Override to skip stage conditionally."""
        return False
```
User Story 2: Core Retrieval Pipeline (Week 2)
Goal: Implement first 3 pipeline stages (pipeline resolution, query enhancement, retrieval).
Acceptance Criteria:
- `PipelineResolutionStage` implemented (wraps existing `PipelineService`)
- `QueryEnhancementStage` implemented (wraps existing `EmbeddingService`)
- `RetrievalStage` implemented (wraps existing `RetrievalService`)
- Each stage <100 lines
- Unit tests for each stage (90%+ coverage)
- Integration tests for retrieval pipeline
- All existing services interfaces UNCHANGED
- All integration tests pass WITHOUT modification
Files Created:
```
backend/rag_solution/pipeline/stages/
├── __init__.py
├── pipeline_resolution_stage.py
├── query_enhancement_stage.py
└── retrieval_stage.py

tests/unit/pipeline/stages/
├── test_pipeline_resolution_stage.py
├── test_query_enhancement_stage.py
└── test_retrieval_stage.py

tests/integration/
└── test_retrieval_pipeline.py
```
Code Example (RetrievalStage):
```python
import logging
import time

logger = logging.getLogger(__name__)


class RetrievalStage(BaseStage):
    """Handles vector retrieval - wraps existing RetrievalService."""

    async def execute(self, context: SearchContext) -> SearchContext:
        start = time.time()
        # Call existing service (UNCHANGED)
        results = await context.retrieval_service.retrieve(
            query_embedding=context.query_embedding,
            collection_id=context.search_input.collection_id,
            top_k=context.config.get("top_k", 20),
        )
        context.retrieval_results = results
        context.timing["retrieval"] = time.time() - start
        logger.info("Retrieved %d documents in %.2fs",
                    len(results), context.timing["retrieval"])
        return context
```
User Story 3: Reranking & Generation (Week 3)
Goal: Implement final 3 pipeline stages (reranking, reasoning, generation).
Acceptance Criteria:
- `RerankingStage` implemented (uses cross-encoder)
- `ReasoningStage` implemented (wraps existing `ChainOfThoughtService`)
- `GenerationStage` implemented (wraps existing `GenerationService`)
- Each stage <100 lines
- Cross-encoder reranking works in new architecture
- CoT integration preserved
- Unit tests for each stage (90%+ coverage)
- Integration tests for full pipeline
- Performance equal or better (<15s p95)
- All existing services interfaces UNCHANGED
Files Created:
```
backend/rag_solution/pipeline/stages/
├── reranking_stage.py
├── reasoning_stage.py
└── generation_stage.py

tests/unit/pipeline/stages/
├── test_reranking_stage.py
├── test_reasoning_stage.py
└── test_generation_stage.py

tests/integration/
└── test_full_search_pipeline.py
```
Code Example (RerankingStage):
```python
import logging
import time

logger = logging.getLogger(__name__)


class RerankingStage(BaseStage):
    """Handles reranking - uses cross-encoder."""

    def should_skip(self, context: SearchContext) -> bool:
        return not context.config.get("reranking_enabled", True)

    async def execute(self, context: SearchContext) -> SearchContext:
        if self.should_skip(context):
            context.reranked_results = context.retrieval_results
            return context
        start = time.time()
        # Use cross-encoder reranker (~80ms)
        reranked = await context.reranker.rerank_async(
            query=context.search_input.question,
            results=context.retrieval_results,
            top_k=context.config.get("number_of_results", 5),
        )
        context.reranked_results = reranked
        context.timing["reranking"] = time.time() - start
        logger.info("Reranked %d → %d documents in %.2fms",
                    len(context.retrieval_results),
                    len(reranked),
                    context.timing["reranking"] * 1000)
        return context
```
User Story 4: Migration & Rollout (Week 4)
Goal: Deploy new architecture behind feature flag and gradually roll out to production.
Acceptance Criteria:
- Feature flag `USE_PIPELINE_ARCHITECTURE` implemented (default: false)
- Old and new implementations produce IDENTICAL output
- All 947+ tests pass with BOTH implementations
- Performance equal or better (<15s p95)
- Zero API breaking changes
- A/B testing framework implemented
- Gradual rollout plan: 5% → 25% → 50% → 100%
- Rollback plan documented
- Migration documentation complete
Files Modified:
- `backend/core/config.py` - add `USE_PIPELINE_ARCHITECTURE` flag
- `backend/rag_solution/services/search_service.py` - add conditional routing:

```python
if config.USE_PIPELINE_ARCHITECTURE:
    return await self._search_pipeline(search_input)
else:
    return await self._search_legacy(search_input)
```

- `tests/integration/test_feature_flag_migration.py`
- `tests/integration/test_output_equivalence.py`
- `docs/migration/pipeline-architecture-migration.md`
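A sketch of what `test_output_equivalence.py` might check, using the `_search_legacy`/`_search_pipeline` names from the routing snippet above; comparing only the externally visible `SearchOutput` fields is an assumption of this example:

```python
# Illustrative equivalence check: run both code paths and compare the
# externally visible fields. Method names follow the routing snippet above.
async def outputs_equivalent(search_service, search_input) -> bool:
    """True when both implementations return the same visible output."""
    legacy = await search_service._search_legacy(search_input)
    pipeline = await search_service._search_pipeline(search_input)
    return (
        legacy.answer == pipeline.answer
        and legacy.query_results == pipeline.query_results
    )
```

In practice this would run over a corpus of recorded production queries before each rollout step.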
Rollout Strategy:
```python
# Week 4 Day 1-2: Internal testing
USE_PIPELINE_ARCHITECTURE=true  # 100% internal traffic

# Week 4 Day 3: Canary rollout
if user_id % 100 < 5:  # 5% of users
    USE_PIPELINE_ARCHITECTURE=true

# Week 4 Day 4: Expanded rollout
if user_id % 4 == 0:  # 25% of users
    USE_PIPELINE_ARCHITECTURE=true

# Week 4 Day 5: Majority rollout
if user_id % 2 == 0:  # 50% of users
    USE_PIPELINE_ARCHITECTURE=true

# Week 4 Day 6-7: Full rollout (if no issues)
USE_PIPELINE_ARCHITECTURE=true  # 100%
```
⚠️ Risks & Mitigation Strategies
Risk 1: Big Bang Rewrite Could Introduce Bugs
Mitigation:
- ✅ TDD approach (tests first, then code)
- ✅ All existing tests must pass without modification
- ✅ Feature flag allows easy rollback
- ✅ Gradual rollout (5% → 25% → 50% → 100%)
Risk 2: Migration Complexity (Supporting 2 Implementations)
Mitigation:
- ✅ Feature flag makes it easy to switch
- ✅ Both implementations share same services (no duplication)
- ✅ Only orchestration logic differs
- ✅ Keep old code for 2-4 weeks post-migration, then remove
Risk 3: Resource Intensive (4 Weeks Focused Work)
Mitigation:
- ✅ Clear milestones (1 user story per week)
- ✅ Can pause/resume if higher priority work emerges
- ✅ Each week delivers testable component
- ✅ Faster than incremental approach (4 weeks vs 10-17 weeks)
✅ Success Criteria
Must Pass Before Merging Each User Story:
- All existing tests pass WITHOUT modification
- SearchInput/SearchOutput schemas UNCHANGED
- SearchService.search() signature UNCHANGED
- All existing service interfaces UNCHANGED
- Performance benchmarks pass (<15s p95)
- No API breaking changes
- Code review approved
- 90%+ code coverage for new code
Final Success Criteria (before 100% rollout):
- All 947+ tests pass with new architecture
- Performance equal or better than current (p95 < 15s)
- 90%+ code coverage overall
- <100 lines per stage
- Clear documentation
- Zero regression bugs
- Old code removed after 2-4 weeks
📊 Integration Points - No Changes Required
| Component | Current State | New Architecture | Strategy |
|---|---|---|---|
| SearchInput/Output schemas | `schemas/search_schema.py` | UNCHANGED | Pipeline produces same output |
| EmbeddingService | `services/embedding_service.py` | UNCHANGED | Injected, called by QueryEnhancementStage |
| RetrievalService | `services/retrieval_service.py` | UNCHANGED | Injected, called by RetrievalStage |
| CrossEncoderReranker | `retrieval/reranker.py` | UNCHANGED | Injected, called by RerankingStage |
| ChainOfThoughtService | `services/chain_of_thought_service.py` | UNCHANGED | Injected, called by ReasoningStage |
| GenerationService | `services/generation_service.py` | UNCHANGED | Injected, called by GenerationStage |
| PipelineService | `services/pipeline_service.py` | UNCHANGED | Called by PipelineResolutionStage |
| Router layer | `router/search.py` | UNCHANGED | Calls SearchService.search() |
| All existing tests | 947+ tests | UNCHANGED | Must pass without modification |
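The "Injected" strategy in the table can be sketched as a per-request wiring step; the field names below are illustrative stand-ins, not the final `SearchContext`:

```python
# Sketch of the injection strategy: unchanged shared services are handed to
# a fresh per-request context. Field names are illustrative stand-ins.
from dataclasses import dataclass, field
from typing import Any


@dataclass
class SearchContext:
    search_input: Any
    embedding_service: Any
    retrieval_service: Any
    reranker: Any
    timing: dict[str, float] = field(default_factory=dict)


def build_context(search_input: Any, **services: Any) -> SearchContext:
    """Create a per-request context wired with the shared services."""
    return SearchContext(search_input=search_input, **services)
```

Because the services themselves are long-lived and unchanged, only this thin wiring layer is new code.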
📅 Timeline
Week 1: Pipeline Framework
Week 2: Core Retrieval Pipeline
Week 3: Reranking & Generation
Week 4: Migration & Rollout
Total: 4 weeks (vs 10-17 weeks for incremental approach)
🔄 Migration Validation Checklist
Before merging each User Story:
- All existing tests pass WITHOUT modification
- SearchInput/SearchOutput schemas UNCHANGED
- SearchService.search() signature UNCHANGED
- All existing services interfaces UNCHANGED
- Performance benchmarks pass (<15s p95)
- No API breaking changes
- Code review approved
- Integration tests pass
📝 Related Documentation
- Roadmap: `MASTER_ISSUES_ROADMAP.md` (Section: Phase 2 Option A)
- Current Performance: PR #548 - feat: Add production-grade cross-encoder reranking
- Related Issues: #540 - 🔴 CRITICAL: RAG Accuracy - Multiple root causes identified (resolved)
- Architecture Reference: OpenAI ReAct, LangChain, LlamaIndex patterns
🤔 Decision Required
Should we proceed with this 4-week re-architecture?
Pros:
- ✅ Cleaner, more maintainable code
- ✅ Easier to test and optimize
- ✅ Faster than incremental (4 weeks vs 10-17 weeks)
- ✅ Better developer experience
- ✅ No breaking changes
Cons:
- ❌ 4 weeks of focused work
- ❌ Risk of introducing bugs (mitigated with TDD + feature flags)
- ❌ Current system already works well
Alternative: Skip Phase 2 entirely, use current implementation (already meets performance goals)
👥 Labels
`epic` `architecture` `refactoring` `optional` `p2-medium` `needs-decision`