Improve Pipeline Association Architecture for Better UX and Flexibility #222

@manavgup

🎯 ARCHITECTURAL DECISION: Remove pipeline_id from SearchInput Schema

After comprehensive codebase analysis, the optimal approach is to remove pipeline_id from SearchInput entirely and implement automatic backend pipeline resolution based on user context.

Current vs. New Architecture

Current SearchInput (Complex):

from typing import Any

from pydantic import UUID4, BaseModel

class SearchInput(BaseModel):
    question: str
    collection_id: UUID4
    pipeline_id: UUID4           # ← CREATES API COMPLEXITY
    user_id: UUID4
    config_metadata: dict[str, Any] | None = None

New SearchInput (Simple):

class SearchInput(BaseModel):
    question: str
    collection_id: UUID4
    user_id: UUID4
    config_metadata: dict[str, Any] | None = None
    # NO pipeline_id - backend resolves automatically

Pipeline Resolution Strategy

Use Existing Architecture:

The PipelineConfig model already has the necessary fields:

class PipelineConfig(Base):
    user_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("users.id"), nullable=False)
    is_default: Mapped[bool] = mapped_column(Boolean, default=False)  # ← USE THIS

Resolution Hierarchy:

  1. User's default pipeline (query: user_id=X AND is_default=True)
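  2. System default pipeline (fallback when the user has no default; in the SearchService changes proposed in this issue, a default pipeline is then auto-created for the user from the default LLM provider)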

Pipelines are not coupled to collections, which keeps the architecture cleaner.
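
The resolution hierarchy can be sketched as a small pure function. This is a minimal illustration rather than the project's implementation: `PipelineRecord`, `resolve_pipeline`, and passing the system default in as an argument are assumptions made for the example. Only the `user_id` and `is_default` fields come from the `PipelineConfig` model.

```python
from __future__ import annotations

from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass(frozen=True)
class PipelineRecord:
    """Hypothetical stand-in for a PipelineConfig row."""

    id: UUID
    user_id: UUID | None  # None marks a system-level pipeline in this sketch
    is_default: bool = False


def resolve_pipeline(
    user_id: UUID,
    pipelines: list[PipelineRecord],
    system_default: PipelineRecord,
) -> PipelineRecord:
    """Return the user's default pipeline, else the system default."""
    for pipeline in pipelines:
        # Step 1: user_id=X AND is_default=True
        if pipeline.user_id == user_id and pipeline.is_default:
            return pipeline
    # Step 2: the user has no default, so fall back
    return system_default


alice, bob = uuid4(), uuid4()
system = PipelineRecord(id=uuid4(), user_id=None, is_default=True)
alice_default = PipelineRecord(id=uuid4(), user_id=alice, is_default=True)
alice_other = PipelineRecord(id=uuid4(), user_id=alice)
rows = [alice_other, alice_default]

chosen_for_alice = resolve_pipeline(alice, rows, system)  # alice_default
chosen_for_bob = resolve_pipeline(bob, rows, system)      # system
```

Note that `collection_id` never appears in the lookup, which is the point of dropping collection-pipeline coupling.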

COMPREHENSIVE IMPACT ANALYSIS

1. Service Layer Changes

📁 rag_solution/services/search_service.py

Current Issues:

  • Line 239: self._validate_pipeline(search_input.pipeline_id)
  • Expects explicit pipeline_id in search method

Required Changes:

class SearchService:
    def _resolve_user_default_pipeline(self, user_id: UUID4) -> UUID4:
        """Resolve pipeline using existing is_default flag approach."""
        default_pipeline = self.pipeline_service.get_default_pipeline(user_id)
        if default_pipeline:
            return default_pipeline.id

        # Auto-create default pipeline for user if none exists
        provider = self.llm_provider_service.get_default_provider()
        default_pipeline = self.pipeline_service.initialize_user_pipeline(user_id, provider.id)
        return default_pipeline.id

    async def search(self, search_input: SearchInput) -> SearchOutput:
        # Remove: self._validate_pipeline(search_input.pipeline_id)
        # Add: resolve the pipeline on the backend, then validate it
        resolved_pipeline_id = self._resolve_user_default_pipeline(search_input.user_id)
        self._validate_pipeline(resolved_pipeline_id)
        # Update: Pass resolved_pipeline_id to execute_pipeline
        ...  # rest of the existing search logic
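
To check the intended behavior in isolation, the resolve-then-execute flow can be exercised against in-memory fakes. The sketch below is an assumption-laden illustration: `FakePipelineService` and `SketchSearchService` are hypothetical, their methods return bare IDs and strings instead of the real `PipelineConfigOutput` and `SearchOutput`, and validation is omitted. It shows that a user with no pipeline gets one created on the first search and that later searches reuse it.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from uuid import UUID, uuid4


@dataclass
class FakePipelineService:
    """Hypothetical in-memory stand-in for PipelineService."""

    defaults: dict[UUID, UUID] = field(default_factory=dict)
    executed_with: list[UUID] = field(default_factory=list)

    def get_default_pipeline(self, user_id: UUID) -> UUID | None:
        return self.defaults.get(user_id)

    def initialize_user_pipeline(self, user_id: UUID, provider_id: UUID) -> UUID:
        # Create and remember a default pipeline for this user
        pipeline_id = uuid4()
        self.defaults[user_id] = pipeline_id
        return pipeline_id

    async def execute_pipeline(self, question: str, pipeline_id: UUID) -> str:
        self.executed_with.append(pipeline_id)
        return f"answer to {question!r}"


@dataclass
class SketchSearchService:
    pipeline_service: FakePipelineService
    default_provider_id: UUID

    def _resolve_user_default_pipeline(self, user_id: UUID) -> UUID:
        existing = self.pipeline_service.get_default_pipeline(user_id)
        if existing is not None:
            return existing
        # No default yet: auto-create one from the default provider
        return self.pipeline_service.initialize_user_pipeline(
            user_id, self.default_provider_id
        )

    async def search(self, question: str, user_id: UUID) -> str:
        pipeline_id = self._resolve_user_default_pipeline(user_id)
        return await self.pipeline_service.execute_pipeline(question, pipeline_id)


pipelines = FakePipelineService()
service = SketchSearchService(pipelines, default_provider_id=uuid4())
new_user = uuid4()

first = asyncio.run(service.search("what is RAG?", new_user))
second = asyncio.run(service.search("and again?", new_user))
```

Both calls run against the same pipeline ID, so the auto-creation path only fires once per user.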

📁 rag_solution/services/pipeline_service.py

Current Issues:

  • Line ~641: search_input.pipeline_id in execute_pipeline method ❌
  • get_default_pipeline(user_id, collection_id) has unnecessary collection_id parameter

Required Changes:

# 1. Simplify get_default_pipeline signature:
def get_default_pipeline(self, user_id: UUID4) -> PipelineConfigOutput | None:
    """Get user's default pipeline using is_default flag."""
    return self.pipeline_repository.get_user_default(user_id)

# 2. Update execute_pipeline to accept resolved pipeline_id:
async def execute_pipeline(
    self, 
    search_input: SearchInput, 
    collection_name: str,
    pipeline_id: UUID4  # ← ADD THIS PARAMETER
) -> PipelineResult:
    # Remove: search_input.pipeline_id references
    # Use: pipeline_id parameter instead
    pipeline_config, llm_parameters_input, provider = self._validate_configuration(
        pipeline_id, search_input.user_id  # ← Use parameter instead of search_input.pipeline_id
    )

2. CLI Layer Changes

📁 rag_solution/cli/commands/search.py

Current Issues:

  • Lines 56-72: Complex pipeline resolution logic in CLI ❌
  • Line 79: "pipeline_id": pipeline_id in request data ❌
  • CLI fetches user pipelines and resolves defaults client-side

Required Changes:

def query(self, collection_id: str, query: str, max_chunks: int = 5) -> CommandResult:
    # Remove: pipeline_id parameter entirely from method signature
    # Remove: Lines 56-72 pipeline fetching and resolution logic
    # Simplify: Let backend handle all pipeline resolution
    
    self._require_authentication()
    
    try:
        # Get current user ID
        current_user = self.api_client.get("/api/auth/me")
        user_id = current_user.get("uuid") or current_user.get("id")
        
        # Simple request - backend resolves pipeline
        data = {
            "question": query,
            "collection_id": collection_id,
            "user_id": user_id,
            "config_metadata": {"max_chunks": max_chunks},
            # NO pipeline_id - backend handles it automatically
        }
        
        response = self.api_client.post("/api/search", data=data)
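        return self._create_success_result(data=response, message="Search completed successfully")
    except Exception:
        # Existing error handling for this command goes here (not shown in this issue)
        raise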
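
The payload construction in the simplified command can be pulled into a helper that is testable without an API client. This is a sketch under assumptions: `extract_user_id` and `build_search_payload` are hypothetical names, and the explicit error on a missing user ID is an addition that the issue does not specify.

```python
from __future__ import annotations

from typing import Any


def extract_user_id(current_user: dict[str, Any]) -> str:
    """Read the user ID from an /api/auth/me style response."""
    user_id = current_user.get("uuid") or current_user.get("id")
    if not user_id:
        # Assumption: fail fast instead of sending user_id=None to the backend
        raise ValueError("auth response contains neither 'uuid' nor 'id'")
    return str(user_id)


def build_search_payload(
    current_user: dict[str, Any],
    collection_id: str,
    query: str,
    max_chunks: int = 5,
) -> dict[str, Any]:
    """Assemble the simplified search request body (no pipeline_id)."""
    return {
        "question": query,
        "collection_id": collection_id,
        "user_id": extract_user_id(current_user),
        "config_metadata": {"max_chunks": max_chunks},
    }


payload = build_search_payload({"id": "user-123"}, "collection-456", "What is RAG?")
```

A unit test can then assert that `"pipeline_id"` is absent from the payload, which guards against the client-side resolution logic creeping back in.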

Impact on Other CLI Methods:

  • batch_search() - Remove pipeline_id parameter and logic
  • explain(), semantic_search(), hybrid_search() - May need updates if they use SearchInput

3. Router/API Layer Changes

📁 rag_solution/router/search_router.py

Current State: No changes required
Reason: The router just passes SearchInput to the service, so it will work with the new schema automatically

4. Test Layer Changes (Extensive)

Files with SearchInput Usage (35+ files affected):

📁 tests/unit/test_simple_unit.py

  • MyPy error: Unexpected keyword argument "pipeline_id" for "SearchInput"

📁 tests/atomic/test_search_validation.py

  • Multiple SearchInput instantiations with pipeline_id parameter ❌

📁 tests/e2e/test_search_service_real.py

  • 6+ SearchInput instantiations with pipeline_id ❌

📁 tests/e2e/test_rag_search_functionality.py

  • 6+ SearchInput instantiations with pipeline_id ❌

📁 tests/e2e/test_pipeline_service_real.py

  • 3+ SearchInput instantiations with pipeline_id ❌

📁 tests/unit/test_search_service_tdd.py

  • SearchInput creation with pipeline_id in TDD tests ❌

Required Test Updates:

# OLD (Fails with new schema):
search_input = SearchInput(
    question="test",
    collection_id=uuid4(),
    pipeline_id=uuid4(),  # ← REMOVE THIS
    user_id=uuid4()
)

# NEW (Works with simplified schema):
search_input = SearchInput(
    question="test",
    collection_id=uuid4(),
    user_id=uuid4()
    # Backend resolves pipeline automatically
)
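
With 35+ files to touch, it may help to locate the offending calls mechanically before editing them. The following is one possible approach using the standard-library `ast` module; `find_pipeline_id_kwargs` is a hypothetical helper and is not part of the repository.

```python
from __future__ import annotations

import ast


def find_pipeline_id_kwargs(source: str, filename: str = "<string>") -> list[int]:
    """Return line numbers of SearchInput(...) calls that pass pipeline_id."""
    hits: list[int] = []
    for node in ast.walk(ast.parse(source, filename=filename)):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        # Match both SearchInput(...) and some_module.SearchInput(...)
        name = func.id if isinstance(func, ast.Name) else getattr(func, "attr", None)
        if name == "SearchInput" and any(kw.arg == "pipeline_id" for kw in node.keywords):
            hits.append(node.lineno)
    return sorted(hits)


sample = '''
old = SearchInput(
    question="test",
    collection_id=uuid4(),
    pipeline_id=uuid4(),
    user_id=uuid4(),
)
new = SearchInput(question="test", collection_id=uuid4(), user_id=uuid4())
'''

lines_to_fix = find_pipeline_id_kwargs(sample)
```

To scan the suite, the same function could be applied to the text of each file under `tests/` (for example via `pathlib.Path("tests").rglob("*.py")`). It only catches keyword arguments on direct calls, so `**kwargs` dictionaries and fixtures that build the input indirectly still need a manual pass.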

5. Repository Layer Updates

📁 rag_solution/repository/pipeline_repository.py

Verify/Update Methods:

def get_user_default(self, user_id: UUID4) -> PipelineConfigOutput | None:
    """Get user's default pipeline using is_default=True flag."""
    # Should query: user_id=user_id AND is_default=True
    # Verify this method works correctly
    ...

# Evaluate: get_collection_default method
def get_collection_default(self, collection_id: UUID4) -> PipelineConfigOutput | None:
    # This method may no longer be needed with new architecture
    # Collection-pipeline coupling removed
    ...
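
The query that `get_user_default` is expected to run can be verified against a throwaway database. The repository itself uses SQLAlchemy; the sketch below swaps in the standard-library `sqlite3` module to stay dependency-free, and the `pipeline_configs` table name and column types are assumptions.

```python
import sqlite3
from typing import Optional
from uuid import uuid4

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE pipeline_configs (
        id TEXT PRIMARY KEY,
        user_id TEXT NOT NULL,
        is_default INTEGER NOT NULL DEFAULT 0
    )
    """
)


def get_user_default(connection: sqlite3.Connection, user_id: str) -> Optional[str]:
    """Return the ID of the user's default pipeline, or None if there is none."""
    row = connection.execute(
        "SELECT id FROM pipeline_configs WHERE user_id = ? AND is_default = 1 LIMIT 1",
        (user_id,),
    ).fetchone()
    return row[0] if row else None


user_with_default, user_without_default = str(uuid4()), str(uuid4())
default_id, other_id = str(uuid4()), str(uuid4())
conn.executemany(
    "INSERT INTO pipeline_configs (id, user_id, is_default) VALUES (?, ?, ?)",
    [
        (default_id, user_with_default, 1),
        (other_id, user_with_default, 0),
        (str(uuid4()), user_without_default, 0),
    ],
)

found = get_user_default(conn, user_with_default)
missing = get_user_default(conn, user_without_default)
```

The `None` case is the one the service layer relies on to trigger auto-creation, so it deserves its own test in the real repository suite.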

IMPLEMENTATION PHASES

Phase 1: Core Schema & Service Changes

  1. ✅ Remove pipeline_id from SearchInput schema
  2. ❌ Update SearchService with _resolve_user_default_pipeline() method
  3. ❌ Update PipelineService.execute_pipeline signature/logic
  4. ❌ Simplify PipelineService.get_default_pipeline (remove collection_id param)

Phase 2: CLI Simplification

  1. ❌ Remove pipeline_id parameter from CLI search commands
  2. ❌ Remove client-side pipeline resolution logic (lines 56-72)
  3. ❌ Update CLI help documentation and method signatures

Phase 3: Comprehensive Test Updates

  1. ❌ Update 35+ test files that create SearchInput objects
  2. ❌ Remove pipeline_id from all SearchInput instantiations
  3. ❌ Add tests for new backend pipeline resolution flow
  4. ❌ Verify all existing functionality works with resolved pipelines

Phase 4: Validation & Documentation

  1. ❌ Ensure all MyPy errors are resolved
  2. ❌ Update API documentation to reflect simplified search
  3. ❌ Test end-to-end user workflows

FILES REQUIRING CHANGES

Core Service Files:

  • rag_solution/schemas/search_schema.py (pipeline_id removed)
  • rag_solution/services/search_service.py (add pipeline resolution)
  • rag_solution/services/pipeline_service.py (update execute_pipeline)

CLI Files:

  • rag_solution/cli/commands/search.py (remove pipeline_id logic)

Test Files (35+ files):

  • tests/unit/test_simple_unit.py
  • tests/unit/test_search_service_tdd.py
  • tests/atomic/test_search_validation.py
  • tests/e2e/test_search_service_real.py
  • tests/e2e/test_rag_search_functionality.py
  • tests/e2e/test_pipeline_service_real.py
  • All other test files that instantiate SearchInput

Repository Files (Verification):

  • 🤔 rag_solution/repository/pipeline_repository.py (verify get_user_default)

ARCHITECTURAL BENEFITS

🎯 Simplified User Experience

  • Immediate Search: Users can search right after uploading documents
  • No Pipeline Setup: No mandatory pipeline configuration
  • Clean API: Search becomes simply question + collection_id + user_id

🏗️ Better Architecture

  • Clear Separation: Search logic separated from pipeline management
  • Backend Resolution: Pipeline complexity hidden from API consumers
  • User-Centric: Pipelines belong to users, not collections

🔧 Developer Experience

  • Simpler CLI: No complex pipeline fetching in CLI commands
  • Easier Testing: No need to mock pipeline selection in tests
  • Cleaner API: Fewer parameters, more intuitive interface

RISK ASSESSMENT

Breaking Changes:

  • API Schema Change: SearchInput no longer accepts pipeline_id
  • CLI Interface Change: Search commands lose pipeline_id parameter
  • Test Updates: Extensive test file modifications required

Mitigation Strategy:

  • Comprehensive testing before deployment
  • Clear migration documentation for API consumers
  • Systematic test updates across all affected files
  • Validation that all existing workflows continue to work

SUCCESS METRICS

  • All MyPy type errors resolved (35+ current errors)
  • All existing tests pass with updated SearchInput schema
  • New users can search immediately without pipeline setup
  • CLI search works without explicit pipeline specification
  • Backend pipeline resolution handles edge cases (no default, etc.)
  • Performance remains consistent
  • API documentation reflects simplified interface

PRIORITY: HIGH

This architectural change eliminates the core UX friction while maintaining system flexibility. Its scope is broad, so it requires systematic coordination across the service layer, the CLI, and the test suite.

Key Insight: Removing pipeline_id from SearchInput forces the entire system to properly separate search concerns from pipeline configuration, resulting in better architecture and dramatically improved user experience.

Next Steps:

  1. Implement service layer pipeline resolution logic
  2. Update CLI to rely on backend resolution
  3. Systematically update all 35+ affected test files
  4. Validate that end-to-end user workflows work correctly
