Phase 2: Early Modular RAG - Routing, Orchestration & Verification
Parent Epic: #256 - RAG Modulo Evolution
Depends On: #257 - Phase 1 (Advanced RAG)
Timeline: 6-8 weeks
Priority: High
Complexity: High
Overview
Phase 2 transitions RAG Modulo from Advanced RAG to Early Modular RAG by implementing intelligent routing, orchestration, and answer verification. This phase adds "intelligence" to the system - making smart decisions about when and how to retrieve, and ensuring answer quality through verification.
Current State (After Phase 1)
What We Have:
- ✅ Reranking and compression (Phase 1)
- ✅ Multi-query generation (Phase 1)
- ✅ Hierarchical chunking (Phase 1)
- ✅ Chain of Thought reasoning (#136 - Implement Chain of Thought (CoT) Reasoning for Enhanced RAG Search Quality)
- ✅ Automatic pipeline resolution (#222 - Improve Pipeline Association Architecture for Better UX and Flexibility)
What's Missing:
- ❌ Semantic routing (query → pipeline selection)
- ❌ Hard vs soft prompt detection
- ❌ Answer verification and fact-checking
- ❌ Hallucination detection
- ❌ External knowledge integration
- ❌ Knowledge graph foundation
Goals and Success Criteria
Goals
- Implement semantic router for intelligent pipeline selection
- Add query scheduler for hard/soft prompt distinction
- Build answer verification and hallucination detection
- Integrate external knowledge sources (web search fallback)
- Foundation for knowledge graph-based retrieval
Success Criteria
Quantitative:
- Routing Accuracy: >85% correct pipeline selection
- Hallucination Rate: <5% hallucinated facts
- Scheduling Efficiency: 30% reduction in unnecessary retrievals
- Verification Accuracy: >90% correct fact validation
- End-to-end Latency: <5s for complex queries
Qualitative:
- Smart resource allocation (avoid retrieval when not needed)
- Confident answers with source attribution
- Reduced false information in responses
- Better handling of out-of-domain queries
Implementation Plan
Week 7-9: Routing & Orchestration
Task 1: Semantic Router
New File: backend/rag_solution/orchestration/router.py
"""Semantic routing for pipeline selection."""
from enum import Enum
from typing import Any
from core.config import Settings
from core.logging_utils import get_logger
logger = get_logger("orchestration.router")
class PipelineStrategy(str, Enum):
"""Available pipeline strategies."""
SIMPLE = "simple" # Direct retrieval + generation
HYBRID = "hybrid" # Hybrid retrieval + reranking
CHAIN_OF_THOUGHT = "chain_of_thought" # CoT reasoning
MULTI_QUERY = "multi_query" # Multi-query expansion
class SemanticRouter:
"""Routes queries to appropriate pipeline strategy."""
def __init__(self, settings: Settings):
self.settings = settings
self.complexity_threshold = settings.routing_complexity_threshold
self.multi_query_threshold = settings.routing_multi_query_threshold
def route(self, query: str, context: dict[str, Any] | None = None) -> PipelineStrategy:
"""Determine best pipeline strategy for query."""
logger.info(f"Routing query: {query[:50]}...")
# Analyze query complexity
complexity = self._assess_complexity(query)
logger.debug(f"Query complexity: {complexity}")
# Check for multi-part questions
is_multi_part = self._is_multi_part_question(query)
# Check if CoT is needed
if complexity > self.complexity_threshold or is_multi_part:
logger.info("Routed to: CHAIN_OF_THOUGHT")
return PipelineStrategy.CHAIN_OF_THOUGHT
# Check if multi-query would help
if self._needs_multiple_perspectives(query):
logger.info("Routed to: MULTI_QUERY")
return PipelineStrategy.MULTI_QUERY
# Check if hybrid retrieval needed
if self._needs_hybrid_search(query):
logger.info("Routed to: HYBRID")
return PipelineStrategy.HYBRID
# Default to simple pipeline
logger.info("Routed to: SIMPLE")
return PipelineStrategy.SIMPLE
def _assess_complexity(self, query: str) -> float:
"""Score query complexity (0.0 = simple, 1.0 = complex)."""
complexity_score = 0.0
# Word count factor
word_count = len(query.split())
if word_count > 20:
complexity_score += 0.3
elif word_count > 10:
complexity_score += 0.1
# Question depth indicators
deep_keywords = ["why", "how", "explain", "compare", "analyze", "evaluate"]
if any(keyword in query.lower() for keyword in deep_keywords):
complexity_score += 0.3
# Multiple questions
question_marks = query.count("?")
if question_marks > 1:
complexity_score += 0.2
# Logical operators
if " and " in query.lower() or " or " in query.lower():
complexity_score += 0.2
return min(complexity_score, 1.0)
def _is_multi_part_question(self, query: str) -> bool:
"""Detect multi-part questions."""
indicators = [
query.count("?") > 1,
" and " in query.lower() and "?" in query,
any(x in query.lower() for x in ["first", "second", "also", "additionally"])
]
return any(indicators)
def _needs_multiple_perspectives(self, query: str) -> bool:
"""Check if query benefits from multiple query variations."""
# Ambiguous queries
ambiguous_terms = ["it", "this", "that", "he", "she", "they"]
has_ambiguity = any(word.strip("?.,!") in ambiguous_terms for word in query.lower().split())
# Broad queries
is_broad = len(query.split()) < 5 and "?" in query
return has_ambiguity or is_broad
def _needs_hybrid_search(self, query: str) -> bool:
"""Check if hybrid retrieval would help."""
# Specific technical terms or names (ignore the sentence-initial capital)
has_caps = any(word[0].isupper() for word in query.split()[1:] if len(word) > 1)
# Keyword-heavy queries
has_quotes = '"' in query or "'" in query
return has_caps or has_quotes

Integration: Update backend/rag_solution/services/search_service.py
from rag_solution.orchestration.router import SemanticRouter, PipelineStrategy
class SearchService:
def __init__(self, db: Session, settings: Settings) -> None:
# ... existing init ...
self._router: SemanticRouter | None = None
@property
def router(self) -> SemanticRouter:
"""Lazy initialization of semantic router."""
if self._router is None:
self._router = SemanticRouter(self.settings)
return self._router
async def search(self, search_input: SearchInput) -> SearchOutput:
# ROUTING STEP (NEW)
strategy = self.router.route(search_input.question)
logger.info(f"Selected strategy: {strategy}")
# Execute appropriate strategy
if strategy == PipelineStrategy.CHAIN_OF_THOUGHT:
return await self._search_with_cot(search_input)
elif strategy == PipelineStrategy.MULTI_QUERY:
return await self._search_with_multi_query(search_input)
elif strategy == PipelineStrategy.HYBRID:
return await self._search_with_hybrid(search_input)
else:
return await self._search_simple(search_input)
async def _search_with_cot(self, search_input: SearchInput) -> SearchOutput:
"""Execute CoT strategy."""
# Use existing CoT implementation
...
async def _search_with_multi_query(self, search_input: SearchInput) -> SearchOutput:
"""Execute multi-query strategy."""
# Use Phase 1 multi-query implementation
...
async def _search_with_hybrid(self, search_input: SearchInput) -> SearchOutput:
"""Execute hybrid retrieval strategy."""
# Use Phase 1 hybrid retrieval with reranking
...
async def _search_simple(self, search_input: SearchInput) -> SearchOutput:
"""Execute simple vector retrieval strategy."""
# Basic vector search + generation
...

Configuration: Update backend/core/config.py
class Settings(BaseSettings):
# ... existing settings ...
# Routing settings
enable_semantic_routing: bool = Field(default=False, description="Enable semantic routing")
routing_complexity_threshold: float = Field(default=0.7, ge=0.0, le=1.0, description="Complexity threshold for CoT")
routing_multi_query_threshold: float = Field(default=0.5, ge=0.0, le=1.0, description="Threshold for multi-query")

Testing: Create backend/tests/unit/test_semantic_router.py
import pytest
from rag_solution.orchestration.router import SemanticRouter, PipelineStrategy
class TestSemanticRouter:
@pytest.fixture
def router(self, settings):
settings.routing_complexity_threshold = 0.7
return SemanticRouter(settings)
def test_simple_query_routed_to_simple(self, router):
query = "What is AI?"
strategy = router.route(query)
assert strategy == PipelineStrategy.SIMPLE
def test_complex_query_routed_to_cot(self, router):
query = "How does machine learning work and what are the key differences between supervised and unsupervised learning?"
strategy = router.route(query)
assert strategy == PipelineStrategy.CHAIN_OF_THOUGHT
def test_multi_part_routed_to_cot(self, router):
query = "What is deep learning? How does it differ from traditional ML?"
strategy = router.route(query)
assert strategy == PipelineStrategy.CHAIN_OF_THOUGHT
def test_ambiguous_query_routed_to_multi_query(self, router):
query = "What is it?" # Ambiguous "it"
strategy = router.route(query)
assert strategy == PipelineStrategy.MULTI_QUERY
def test_technical_query_routed_to_hybrid(self, router):
query = "Find information about TensorFlow and PyTorch"
strategy = router.route(query)
assert strategy == PipelineStrategy.HYBRID

Task 2: Query Scheduler (Hard vs Soft Prompts)
New File: backend/rag_solution/orchestration/scheduler.py
"""Query scheduling for retrieval decisions."""
from typing import Any
from core.config import Settings
from core.logging_utils import get_logger
logger = get_logger("orchestration.scheduler")
class QueryScheduler:
"""Decides if retrieval is needed (hard vs soft prompt)."""
def __init__(self, settings: Settings):
self.settings = settings
def should_retrieve(
self,
query: str,
context: dict[str, Any] | None = None
) -> bool:
"""
Determine if retrieval is necessary.
Hard prompts: Need retrieval (domain-specific, factual)
Soft prompts: LLM knowledge sufficient (general, creative)
"""
logger.info(f"Scheduling query: {query[:50]}...")
# Check for explicit retrieval indicators
if self._has_retrieval_indicators(query):
logger.info("Decision: RETRIEVE (explicit indicators)")
return True
# Check for general knowledge questions
if self._is_general_knowledge(query, context):
logger.info("Decision: SKIP RETRIEVAL (general knowledge)")
return False
# Check for factual/specific questions
if self._is_factual_query(query):
logger.info("Decision: RETRIEVE (factual query)")
return True
# Check for creative/opinion questions
if self._is_creative_query(query):
logger.info("Decision: SKIP RETRIEVAL (creative query)")
return False
# Default: retrieve
logger.info("Decision: RETRIEVE (default)")
return True
def _has_retrieval_indicators(self, query: str) -> bool:
"""Check for explicit retrieval indicators."""
indicators = [
"according to the document",
"in the file",
"from the collection",
"search for",
"find information about",
"based on the data"
]
return any(ind in query.lower() for ind in indicators)
def _is_general_knowledge(self, query: str, context: dict[str, Any] | None = None) -> bool:
"""Check if query is general knowledge."""
general_patterns = [
"what is",
"define",
"who is",
"when was",
"where is"
]
# Only if short and starts with general pattern
is_short = len(query.split()) <= 10
starts_general = any(query.lower().startswith(p) for p in general_patterns)
# But not if it mentions specific domain terms
has_specific_terms = any(
term in query.lower()
for term in (context or {}).get("domain_terms", [])
)
return is_short and starts_general and not has_specific_terms
def _is_factual_query(self, query: str) -> bool:
"""Check if query requires factual data."""
factual_keywords = [
"how many", "how much", "when did", "what are the steps",
"procedure", "process", "statistics", "data", "numbers",
"specific", "details", "exact"
]
return any(kw in query.lower() for kw in factual_keywords)
def _is_creative_query(self, query: str) -> bool:
"""Check if query is creative/opinion-based."""
creative_keywords = [
"imagine", "suppose", "what if", "create", "generate",
"write a", "compose", "opinion", "think", "feel"
]
return any(kw in query.lower() for kw in creative_keywords)

Integration: Update SearchService
from rag_solution.orchestration.scheduler import QueryScheduler
class SearchService:
def __init__(self, db: Session, settings: Settings) -> None:
# ... existing init ...
self._scheduler: QueryScheduler | None = None
@property
def scheduler(self) -> QueryScheduler:
if self._scheduler is None:
self._scheduler = QueryScheduler(self.settings)
return self._scheduler
async def search(self, search_input: SearchInput) -> SearchOutput:
# SCHEDULING STEP (NEW)
should_retrieve = self.scheduler.should_retrieve(
search_input.question,
context={"domain_terms": self._get_collection_domain_terms(search_input.collection_id)}
)
if not should_retrieve:
logger.info("Skipping retrieval - using LLM knowledge only")
return await self._generate_without_retrieval(search_input)
# ... continue with routing and retrieval ...

Week 10-12: Verification & Knowledge Enhancement
Task 3: Answer Verification
New File: backend/rag_solution/generation/verifier.py
"""Answer verification and hallucination detection."""
from typing import Any
from vectordbs.data_types import QueryResult
from core.config import Settings
from core.logging_utils import get_logger
logger = get_logger("generation.verifier")
class AnswerVerifier:
"""Verify answer quality and detect hallucinations."""
def __init__(self, settings: Settings):
self.settings = settings
self.confidence_threshold = settings.verification_confidence_threshold
def verify(
self,
question: str,
answer: str,
sources: list[QueryResult]
) -> dict[str, Any]:
"""
Verify answer against sources.
Returns:
{
"is_verified": bool,
"confidence": float,
"supported_claims": list[str],
"unsupported_claims": list[str],
"attribution": dict[str, list[str]]
}
"""
logger.info("Verifying answer against sources")
# Extract claims from answer
claims = self._extract_claims(answer)
logger.debug(f"Extracted {len(claims)} claims")
# Check each claim against sources
verification_results = []
for claim in claims:
is_supported = self._check_claim_support(claim, sources)
verification_results.append({
"claim": claim,
"supported": is_supported,
"sources": self._find_supporting_sources(claim, sources) if is_supported else []
})
# Calculate overall confidence
supported_count = sum(1 for r in verification_results if r["supported"])
confidence = supported_count / len(claims) if claims else 1.0
is_verified = confidence >= self.confidence_threshold
return {
"is_verified": is_verified,
"confidence": confidence,
"supported_claims": [r["claim"] for r in verification_results if r["supported"]],
"unsupported_claims": [r["claim"] for r in verification_results if not r["supported"]],
"attribution": self._build_attribution(verification_results)
}
def _extract_claims(self, answer: str) -> list[str]:
"""Extract factual claims from answer."""
# Simple sentence splitting
import re
sentences = re.split(r'[.!?]+', answer)
# Filter out non-factual sentences
claims = []
for sentence in sentences:
sentence = sentence.strip()
if len(sentence) > 10 and not self._is_opinion(sentence):
claims.append(sentence)
return claims
def _is_opinion(self, sentence: str) -> bool:
"""Check if sentence is opinion vs fact."""
opinion_indicators = ["i think", "in my opinion", "arguably", "perhaps", "might", "could"]
return any(ind in sentence.lower() for ind in opinion_indicators)
def _check_claim_support(self, claim: str, sources: list[QueryResult]) -> bool:
"""Check if claim is supported by sources."""
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
if not sources:
return False
# Compute similarity between claim and all sources
source_texts = [s.text for s in sources]
all_texts = [claim] + source_texts
vectorizer = TfidfVectorizer()
tfidf_matrix = vectorizer.fit_transform(all_texts)
# Similarity between claim (index 0) and sources (indices 1+)
similarities = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:])[0]
# Check if any source has high similarity
max_similarity = max(similarities) if similarities.size > 0 else 0
return max_similarity > 0.5 # Threshold for support
def _find_supporting_sources(
self,
claim: str,
sources: list[QueryResult]
) -> list[str]:
"""Find which sources support a claim."""
supporting = []
for source in sources:
if self._check_claim_support(claim, [source]):
supporting.append(source.document_id)
return supporting
def _build_attribution(
self,
verification_results: list[dict[str, Any]]
) -> dict[str, list[str]]:
"""Build claim → source attribution mapping."""
attribution = {}
for result in verification_results:
if result["supported"] and result["sources"]:
attribution[result["claim"]] = result["sources"]
return attribution

Integration: Update SearchService
from rag_solution.generation.verifier import AnswerVerifier
class SearchService:
def __init__(self, db: Session, settings: Settings) -> None:
# ... existing init ...
self._verifier: AnswerVerifier | None = None
@property
def verifier(self) -> AnswerVerifier | None:
if self.settings.enable_verification and self._verifier is None:
self._verifier = AnswerVerifier(self.settings)
return self._verifier
async def search(self, search_input: SearchInput) -> SearchOutput:
# ... existing retrieval and generation ...
# VERIFICATION STEP (NEW)
if self.verifier:
logger.info("Verifying answer against sources")
verification = self.verifier.verify(
search_input.question,
answer_text,
query_results
)
if not verification["is_verified"]:
logger.warning(f"Answer not verified (confidence: {verification['confidence']})")
# Add warning to response
search_output.metadata["verification"] = verification
search_output.metadata["confidence"] = verification["confidence"]
# Add claim attribution to response
search_output.metadata["attribution"] = verification["attribution"]
return search_output

Configuration:
class Settings(BaseSettings):
# ... existing settings ...
# Verification settings
enable_verification: bool = Field(default=False, description="Enable answer verification")
verification_confidence_threshold: float = Field(default=0.7, ge=0.0, le=1.0, description="Minimum confidence for verified answers")

Testing: Create backend/tests/unit/test_verifier.py
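A minimal sketch of what these tests could cover. It assumes the same `settings` fixture used by the router tests above, and the fake sources are simple stand-ins that expose only the `text` and `document_id` attributes the verifier reads:

```python
import pytest
from types import SimpleNamespace

from rag_solution.generation.verifier import AnswerVerifier


class TestAnswerVerifier:
    @pytest.fixture
    def verifier(self, settings):
        settings.verification_confidence_threshold = 0.7
        return AnswerVerifier(settings)

    @pytest.fixture
    def sources(self):
        # Stand-ins for QueryResult: only .text and .document_id are accessed by the verifier
        return [SimpleNamespace(text="The Eiffel Tower is located in Paris, France.", document_id="doc-1")]

    def test_supported_answer_is_verified(self, verifier, sources):
        result = verifier.verify(
            question="Where is the Eiffel Tower?",
            answer="The Eiffel Tower is located in Paris, France.",
            sources=sources,
        )
        assert result["is_verified"]
        assert result["confidence"] >= 0.7
        assert not result["unsupported_claims"]

    def test_unsupported_answer_is_flagged(self, verifier, sources):
        result = verifier.verify(
            question="Where is the Eiffel Tower?",
            answer="The tower was built in 1066 by Roman engineers using bamboo.",
            sources=sources,
        )
        assert not result["is_verified"]
        assert result["unsupported_claims"]
```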
Files to Create/Modify
New Files (Phase 2)
- backend/rag_solution/orchestration/__init__.py
- backend/rag_solution/orchestration/router.py (~300 lines)
- backend/rag_solution/orchestration/scheduler.py (~200 lines)
- backend/rag_solution/generation/verifier.py (~300 lines)
- backend/rag_solution/knowledge_graph/__init__.py
- backend/rag_solution/knowledge_graph/graph_builder.py (~400 lines, see the sketch below)
- Test files (~1000 lines total):
  - backend/tests/unit/test_semantic_router.py
  - backend/tests/unit/test_query_scheduler.py
  - backend/tests/unit/test_answer_verifier.py
  - backend/tests/unit/test_knowledge_graph.py
  - backend/tests/integration/test_phase2_orchestration.py
  - backend/tests/integration/test_phase2_verification.py
  - backend/tests/performance/test_phase2_benchmarks.py
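graph_builder.py is only listed here and not specified elsewhere in this issue. A minimal sketch of the shape it could take, assuming networkx (the dependency listed under Modified Files below) and a naive capitalized-phrase entity heuristic; the GraphBuilder class and its method names are hypothetical:

```python
"""Hypothetical sketch: entity co-occurrence graph over document chunks."""
import re

import networkx as nx


class GraphBuilder:
    """Builds a lightweight knowledge graph by linking entities that co-occur in a chunk."""

    # Naive entity heuristic: capitalized words/phrases (e.g. "Eiffel Tower", "TensorFlow")
    ENTITY_PATTERN = re.compile(r"\b[A-Z][A-Za-z0-9]+(?:\s+[A-Z][A-Za-z0-9]+)*\b")

    def __init__(self) -> None:
        self.graph = nx.Graph()

    def add_chunk(self, chunk_id: str, text: str) -> None:
        """Extract entities from one chunk and connect every co-occurring pair."""
        entities = set(self.ENTITY_PATTERN.findall(text))
        for entity in entities:
            self.graph.add_node(entity)
            # Remember which chunks mention this entity
            self.graph.nodes[entity].setdefault("chunks", set()).add(chunk_id)
        for a in entities:
            for b in entities:
                if a < b:  # avoid self-loops and duplicate pairs
                    self.graph.add_edge(a, b)

    def related_entities(self, entity: str, depth: int = 1) -> set[str]:
        """Entities within `depth` hops; a starting point for graph-based query expansion."""
        if entity not in self.graph:
            return set()
        reachable = nx.single_source_shortest_path_length(self.graph, entity, cutoff=depth)
        return set(reachable) - {entity}
```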
Modified Files (Phase 2)
- backend/core/config.py - Add Phase 2 settings
- backend/rag_solution/services/search_service.py - Integrate routing, scheduling, verification
- backend/rag_solution/schemas/search_schema.py - Add verification metadata (see the sketch below)
- backend/pyproject.toml - Add dependencies (networkx for graphs)
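The search_schema.py change is only listed above; one possible shape for that metadata, mirroring the dict returned by AnswerVerifier.verify (the model name is hypothetical):

```python
from pydantic import BaseModel, Field


class VerificationMetadata(BaseModel):
    """Hypothetical schema for the verification payload attached to the search output metadata."""

    is_verified: bool
    confidence: float = Field(ge=0.0, le=1.0)
    supported_claims: list[str] = Field(default_factory=list)
    unsupported_claims: list[str] = Field(default_factory=list)
    attribution: dict[str, list[str]] = Field(default_factory=dict)  # claim -> supporting document IDs
```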
Total Estimate: ~2,500 lines of new code, ~400 lines of modifications
Testing Strategy
Unit Tests
- Router: Test all routing decisions
- Scheduler: Test hard/soft prompt classification (see the sketch after this list)
- Verifier: Test claim extraction and verification
- Knowledge graph: Test entity extraction and linking
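The scheduler tests are not sketched earlier in this issue; a minimal example of what backend/tests/unit/test_query_scheduler.py could assert, assuming the same settings fixture used by the router tests:

```python
import pytest

from rag_solution.orchestration.scheduler import QueryScheduler


class TestQueryScheduler:
    @pytest.fixture
    def scheduler(self, settings):
        return QueryScheduler(settings)

    def test_explicit_indicator_triggers_retrieval(self, scheduler):
        assert scheduler.should_retrieve("According to the document, what is the refund policy?")

    def test_general_knowledge_skips_retrieval(self, scheduler):
        assert not scheduler.should_retrieve("What is gravity?")

    def test_creative_prompt_skips_retrieval(self, scheduler):
        assert not scheduler.should_retrieve("Write a short poem about autumn")

    def test_factual_query_triggers_retrieval(self, scheduler):
        assert scheduler.should_retrieve("How many units were sold in Q3?")
```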
Integration Tests
- Full pipeline with routing
- Verification integration
- Scheduler integration
Performance Benchmarks
- Routing latency < 100ms (see the sketch after this list)
- Routing accuracy > 85%
- Verification accuracy > 90%
- Hallucination detection > 90%
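A possible shape for the routing-latency check in backend/tests/performance/test_phase2_benchmarks.py, assuming the settings fixture is also available to the performance suite:

```python
import time

from rag_solution.orchestration.router import SemanticRouter


def test_routing_latency_under_100ms(settings):
    """Routing is pure string analysis, so it should stay well under the 100 ms budget."""
    router = SemanticRouter(settings)
    queries = [
        "What is AI?",
        "How does backpropagation work and why does it matter?",
        "Find information about TensorFlow and PyTorch",
    ] * 100
    start = time.perf_counter()
    for query in queries:
        router.route(query)
    per_query = (time.perf_counter() - start) / len(queries)
    assert per_query < 0.1  # 100 ms per routing decision
```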
Acceptance Criteria
- All unit tests passing (>90% coverage)
- Routing accuracy > 85%
- Hallucination detection > 90%
- 30% reduction in unnecessary retrievals
- Documentation complete
- Phase 1 functionality preserved
Related Issues
- Parent: #256 - Epic: RAG Modulo Evolution (Naive → Advanced → Modular RAG Architecture)
- Depends On: #257 - Phase 1: Complete Advanced RAG (Reranking, Compression & Query Enhancement)
- Next: #TBD - Phase 3 (Full Modular RAG)
Next Phase
After Phase 2, proceed to Phase 3: Full Modular RAG with retriever fine-tuning and advanced indexing.