🟡 [P0] Implement background job status tracking for async operations #449

@manavgup

Description

Priority

🟡 P0 - Critical (Silent failures blocking production)

Problem

Background tasks (reindexing, document ingestion) fail silently with no user visibility. Users have no way to track progress or see error details.

Current Behavior

  1. User clicks "Reindex Collection"
  2. Request returns 200 OK immediately
  3. Background task fails → No UI feedback
  4. User assumes success → Data inconsistency

Impact

  • Complete breakdown of user trust
  • No way to debug production failures
  • Users don't know when ingestion completes

Solution - Phase 2 (8 hours)

1. Create Job Status Table

-- Schema backing backend/rag_solution/models/background_job.py
-- (PostgreSQL: JSONB is a PostgreSQL-specific type)
CREATE TABLE background_jobs (
    id UUID PRIMARY KEY,
    job_type VARCHAR(50),  -- 'reindex', 'ingest', 'export'
    collection_id UUID,
    user_id UUID,
    status VARCHAR(20),  -- 'queued', 'running', 'completed', 'failed'
    progress INT DEFAULT 0,  -- 0-100
    error_details JSONB,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    metadata JSONB
);
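The `status` comment lists four values but not which transitions are legal. A minimal sketch of how that could be enforced in application code, assuming a job only moves forward and that `completed` and `failed` are terminal. `JobStatus`, `ALLOWED_TRANSITIONS`, `can_transition` and `clamp_progress` are hypothetical names, not part of the existing codebase.

```python
from enum import Enum


class JobStatus(str, Enum):
    """Status values taken from the comment on the status column."""
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Assumed lifecycle (not stated in the issue): jobs only move forward,
# and completed/failed are terminal.
ALLOWED_TRANSITIONS: dict[JobStatus, set[JobStatus]] = {
    JobStatus.QUEUED: {JobStatus.RUNNING, JobStatus.FAILED},
    JobStatus.RUNNING: {JobStatus.RUNNING, JobStatus.COMPLETED, JobStatus.FAILED},
    JobStatus.COMPLETED: set(),
    JobStatus.FAILED: set(),
}


def can_transition(current: JobStatus, new: JobStatus) -> bool:
    """Return True if moving from current to new is allowed."""
    return new in ALLOWED_TRANSITIONS[current]


def clamp_progress(value: float) -> int:
    """Coerce a progress value into the 0-100 integer range of the column."""
    return max(0, min(100, int(value)))
```

Checking `can_transition` inside the status update helper would stop a late progress write from flipping a `failed` job back to `running`.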

2. Update Collection Service

# backend/rag_solution/services/collection_service.py
import traceback
from uuid import UUID, uuid4

async def reindex_collection(self, collection_id: UUID, user_id: UUID) -> UUID:
    """Reindex a collection, recording status and progress on a job record."""
    job_id = uuid4()
    
    # Create the job record before any work starts
    await self.job_repo.create_job(
        job_id=job_id,
        job_type="reindex",
        collection_id=collection_id,
        user_id=user_id,
        status="queued"
    )
    
    try:
        await self._update_job_status(job_id, "running", progress=0)
        
        # Process in batches with progress updates.
        # document_batches is assumed to be a list of batches already loaded for this collection.
        total_batches = len(document_batches)
        for i, batch in enumerate(document_batches, start=1):
            await self.process_batch(batch)
            progress = int(i / total_batches * 100)  # progress column is INT (0-100)
            await self._update_job_status(job_id, "running", progress=progress)
        
        await self._update_job_status(job_id, "completed", progress=100)
        
    except Exception as e:
        # Persist the failure so it is visible to the user, then re-raise
        await self._update_job_status(
            job_id, "failed",
            error_details={
                "error_type": type(e).__name__,
                "message": str(e),
                "traceback": traceback.format_exc()
            }
        )
        raise
    
    return job_id

3. Create Job Status API

# backend/rag_solution/router/job_router.py (NEW)
from uuid import UUID

from fastapi import APIRouter

router = APIRouter()

# job_service is assumed to be provided elsewhere (e.g. via dependency injection)
@router.get("/api/jobs/{job_id}")
async def get_job_status(job_id: UUID):
    return await job_service.get_job_status(job_id)

@router.get("/api/jobs")
async def list_jobs(user_id: UUID, status: str | None = None):
    return await job_service.list_jobs(user_id, status)
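The issue does not show what `job_service.list_jobs` does with its arguments. A minimal in-memory sketch of the filtering it implies, assuming jobs are scoped to one user and an unknown `status` value is rejected rather than silently returning nothing. The `Job` dataclass and `VALID_STATUSES` are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional
from uuid import UUID, uuid4

# Status values from the background_jobs schema comment.
VALID_STATUSES = {"queued", "running", "completed", "failed"}


@dataclass
class Job:
    id: UUID
    user_id: UUID
    job_type: str
    status: str
    progress: int = 0


def list_jobs(jobs: list[Job], user_id: UUID, status: Optional[str] = None) -> list[Job]:
    """Return the user's jobs, optionally narrowed to one status."""
    if status is not None and status not in VALID_STATUSES:
        raise ValueError(f"unknown status: {status!r}")
    return [
        job for job in jobs
        if job.user_id == user_id and (status is None or job.status == status)
    ]
```

In the real service the same filter would presumably be a `WHERE user_id = ... AND status = ...` query against `background_jobs`.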

4. Add WebSocket Notifications

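# backend/rag_solution/websocket/job_notifications.py (NEW)
import asyncio
from uuid import UUID

from fastapi import WebSocket, WebSocketDisconnect

@router.websocket("/ws/jobs/{job_id}")
async def job_status_websocket(websocket: WebSocket, job_id: UUID):
    await websocket.accept()
    
    try:
        while True:
            job = await job_service.get_job_status(job_id)
            # Assumes job is a Pydantic model (the original called job.dict()).
            # .json() handles UUID and datetime fields, which json.dumps cannot serialize.
            await websocket.send_text(job.json())
            
            if job.status in ("completed", "failed"):
                break
            
            await asyncio.sleep(1)  # Poll every second
        
        await websocket.close()  # Close explicitly once the job is finished
    except WebSocketDisconnect:
        pass  # Client went away; stop polling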
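The loop above sends a message every second even when nothing has changed. A minimal sketch of a polling helper that only yields when the job snapshot differs from the previous one, assuming the status lookup returns a plain dict. `watch_job` and `fetch` are hypothetical names.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable

TERMINAL_STATUSES = {"completed", "failed"}


async def watch_job(
    fetch: Callable[[], Awaitable[dict]],
    interval: float = 1.0,
) -> AsyncIterator[dict]:
    """Yield a job snapshot whenever it changes, stopping at a terminal status."""
    last = None
    while True:
        snapshot = await fetch()
        if snapshot != last:
            yield snapshot
            last = dict(snapshot)  # copy, in case the caller mutates the dict
        if snapshot["status"] in TERMINAL_STATUSES:
            return
        await asyncio.sleep(interval)
```

The WebSocket handler could then iterate with `async for snapshot in watch_job(...)` and send each snapshot, which keeps the polling logic testable without a live connection.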

Acceptance Criteria

  • background_jobs table created with migration
  • Job creation on all background tasks (reindex, ingest, export)
  • Progress updates at 10% increments minimum
  • Error details captured with full context
  • REST API endpoints for job status
  • WebSocket endpoint for real-time updates
  • Jobs automatically cleaned up after 7 days
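The 7-day cleanup criterion leaves open which timestamp counts and whether unfinished jobs are eligible. A minimal sketch of the expiry check, assuming age is measured from `completed_at` and only `completed` or `failed` jobs are ever removed. `is_expired` and `RETENTION` are hypothetical names.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

RETENTION = timedelta(days=7)
TERMINAL_STATUSES = {"completed", "failed"}


def is_expired(status: str, completed_at: Optional[datetime], now: datetime) -> bool:
    """True if a finished job is older than the retention window."""
    if status not in TERMINAL_STATUSES or completed_at is None:
        return False  # never expire jobs that are still queued or running
    return now - completed_at > RETENTION
```

Passing `now` in explicitly keeps the check deterministic in tests. A scheduled cleanup task would call it with `datetime.now(timezone.utc)`.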

Performance Considerations

  • Job updates batched (max 1 update per second)
  • WebSocket connections auto-close on completion
  • Old jobs auto-archived to prevent table bloat
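The "max 1 update per second" rule could be sketched as a small throttle in front of the status write. This assumes the first update and the final 100% update should always go through. `ProgressThrottle` is a hypothetical helper, with the clock injected so it can be tested without sleeping.

```python
import time
from typing import Callable, Optional


class ProgressThrottle:
    """Decide whether a progress update should be written to the database."""

    def __init__(
        self,
        min_interval: float = 1.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.min_interval = min_interval
        self.clock = clock
        self._last_write: Optional[float] = None

    def should_write(self, progress: int) -> bool:
        """Allow the first update, the final one, and at most one per interval."""
        now = self.clock()
        due = self._last_write is None or now - self._last_write >= self.min_interval
        if due or progress >= 100:
            self._last_write = now
            return True
        return False
```

The batch loop would call `should_write(progress)` before each status update and skip the write when it returns `False`.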

Testing

# Unit tests
make test testfile=tests/unit/test_job_service.py

# Integration test
make test testfile=tests/integration/test_background_jobs.py

Effort

8 hours

Files to Create/Modify

  • backend/rag_solution/models/background_job.py (new)
  • backend/rag_solution/repository/job_repository.py (new)
  • backend/rag_solution/services/job_service.py (new)
  • backend/rag_solution/router/job_router.py (new)
  • backend/rag_solution/websocket/job_notifications.py (new)
  • backend/rag_solution/services/collection_service.py
  • backend/alembic/versions/XXX_add_background_jobs.py (migration)

Metadata

    Labels

    backend (Backend/API related), enhancement (New feature or request), infrastructure (Infrastructure and deployment), priority:critical (Critical priority - blocks production)
