A modular, distributed task processing system built using FastAPI, Celery, and Redis, designed to handle background jobs asynchronously and improve application responsiveness.
This system offloads time-consuming tasks (like sending emails, resizing images, generating PDFs, etc.) to background workers so that your API remains fast and responsive.
In traditional synchronous systems, long-running operations block the user request until completion. This hurts performance and user experience.
Example:
When a user signs up and you want to send a welcome email:
- ❌ Bad UX: Wait for email to be sent before responding
- ✅ Better UX: Respond immediately, send email in background via this task queue
- FastAPI: Modern async web framework for task enqueuing & status APIs
- Celery: Distributed task queue to run background jobs
- Redis: In-memory data store used as a message broker & result backend
- Python 3.10+: Programming language
- Logging: Logs tasks to `logs.txt` and to the console
- Docker (optional): For containerized deployment
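Redis plays both roles here. A minimal sketch of the Celery wiring in `app/celery_app.py`, assuming Redis on its default local port (the attribute name `celery_app` matches the worker command shown later):

```python
# app/celery_app.py — Redis as both message broker and result backend (sketch)
from celery import Celery

celery_app = Celery(
    "task_queue",                         # app name (illustrative)
    broker="redis://localhost:6379/0",    # where enqueued jobs are stored
    backend="redis://localhost:6379/0",   # where task states/results are stored
)
```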
This project consists of two independent services: a FastAPI producer and a Celery worker.

The producer exposes an `/enqueue` route to add jobs to the queue. The request body accepts:

- `type`: Required. Task type (e.g., `send_email`)
- `retries`: Number of retry attempts if the task fails
- `payload`: Task data as key-value pairs (flexible schema)
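A minimal sketch of how the producer could model and enqueue these requests; the class name and field defaults are illustrative, not taken from the repo:

```python
# Sketch combining app/models.py and app/main.py; names are illustrative
from fastapi import FastAPI
from pydantic import BaseModel, Field

from app.tasks import process_task  # Celery task (see worker sketch below)

class TaskRequest(BaseModel):
    type: str                                    # e.g., "send_email"
    retries: int = 3                             # retry attempts on failure
    payload: dict = Field(default_factory=dict)  # flexible key-value task data

app = FastAPI()

@app.post("/enqueue")
def enqueue(req: TaskRequest):
    # .delay() pushes the job onto the Redis queue and returns immediately
    result = process_task.delay(req.type, req.payload)
    return {
        "task_id": result.id,
        "status": "enqueued",
        "message": f"Task {req.type} has been added to the queue",
    }
```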
Example response:

```json
{
  "task_id": "f42c6f3e-0bb0-4cb0-8222-...",
  "status": "enqueued",
  "message": "Task send_email has been added to the queue"
}
```

The worker executes tasks from the Redis queue reliably and logs the results.
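A sketch of what the worker's entry task might look like; the handler bodies are illustrative, but the dispatch-on-`type` shape matches the extension point described later:

```python
# Sketch of app/tasks.py; handler logic is illustrative
import logging

from app.celery_app import celery_app

logger = logging.getLogger(__name__)

@celery_app.task(name="process_task")
def process_task(task_type: str, payload: dict):
    logger.info("Processing %s with payload %s", task_type, payload)
    if task_type == "send_email":
        return f"Email sent to {payload.get('to')}"
    elif task_type == "resize_image":
        return f"Image resized: {payload.get('path')}"
    elif task_type == "generate_pdf":
        return f"PDF generated: {payload.get('doc_id')}"
    raise ValueError(f"Unknown task type: {task_type}")
```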
Access `/metrics` to get:

```json
{
  "total_jobs_in_queue": 1,
  "jobs_done": 120,
  "jobs_failed": 3
}
```
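One way such a route could be backed; this sketch assumes the worker increments `jobs_done` / `jobs_failed` counters in Redis (the counter names are assumptions) and that Celery uses its default `celery` list as the queue key:

```python
# Sketch of a /metrics route; counter names and queue key are assumptions
import redis
from fastapi import FastAPI

app = FastAPI()
r = redis.Redis(host="localhost", port=6379, db=0)

@app.get("/metrics")
def metrics():
    return {
        "total_jobs_in_queue": r.llen("celery"),    # default Celery queue key
        "jobs_done": int(r.get("jobs_done") or 0),
        "jobs_failed": int(r.get("jobs_failed") or 0),
    }
```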
Call `/task/{task_id}` to get:

```json
{
  "task_id": "f42c6f3e...",
  "status": "SUCCESS",
  "result": "Email sent to worldisweird2020@gmail.com"
}
```
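The status route can be a thin wrapper over Celery's result backend; a minimal sketch using `AsyncResult`:

```python
# Sketch of a /task/{task_id} status route using Celery's result backend
from celery.result import AsyncResult
from fastapi import FastAPI

from app.celery_app import celery_app

app = FastAPI()

@app.get("/task/{task_id}")
def task_status(task_id: str):
    res = AsyncResult(task_id, app=celery_app)
    return {
        "task_id": task_id,
        "status": res.status,  # PENDING / STARTED / SUCCESS / FAILURE
        "result": str(res.result) if res.ready() else None,  # exception text on failure
    }
```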
Current tasks (inside `app/tasks.py`) include:

- ✅ `send_email`
- ✅ `resize_image`
- ✅ `generate_pdf`
Easily extensible: just add new logic inside the `process_task()` function's if/elif block:
```python
if task_type == "new_type":
    return do_new_type(payload)
```

- All task events are logged to `logs.txt`
- Console output for live debugging
- Includes task type, payload, and error stack trace (if failed)
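A minimal setup that produces this behavior, assuming the standard-library `logging` module with both a file and a stream handler (the format string is illustrative):

```python
# Sketch: log to logs.txt and the console with timestamps
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    handlers=[
        logging.FileHandler("logs.txt"),   # persistent task log
        logging.StreamHandler(),           # live console output
    ],
)
```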
Run the FastAPI producer:

```bash
uvicorn app.main:app --reload
```

Run the Celery worker:

```bash
celery -A app.celery_app.celery_app worker --loglevel=info
```

Use docker-compose to run everything together:

```bash
docker-compose up --build
```
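For reference, a minimal `docker-compose.yml` matching this layout might look like the following sketch; service names, image tags, and ports are assumptions:

```yaml
version: "3.9"
services:
  redis:
    image: redis:7
  api:
    build: .
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000
    ports:
      - "8000:8000"
    depends_on:
      - redis
  worker:
    build: .
    command: celery -A app.celery_app.celery_app worker --loglevel=info
    depends_on:
      - redis
# Inside containers the broker URL would point at the service name,
# e.g. redis://redis:6379/0 rather than localhost.
```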
Project structure:

```
.
├── app/
│   ├── main.py          # FastAPI producer
│   ├── tasks.py         # Celery task handlers
│   ├── celery_app.py    # Celery configuration
│   ├── models.py        # Pydantic models for request/response
│
├── logs.txt             # Task logs
├── Dockerfile
├── docker-compose.yml
└── README.md
```
Features:

- Redis-backed asynchronous job queue
- Task retry policy with exponential backoff (see the sketch after this list)
- Modular task structure (easy to extend)
- Metrics for job queue and outcomes
- Structured logs with timestamps
- Docker-compatible
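The retry policy can lean on Celery's built-in task options; a sketch, assuming `ConnectionError` stands in for whatever transient failures a task may hit:

```python
# Sketch: automatic retries with exponential backoff via Celery task options
from app.celery_app import celery_app

@celery_app.task(
    autoretry_for=(ConnectionError,),  # retry on transient failures (assumption)
    retry_backoff=True,                # exponential backoff: 1s, 2s, 4s, ...
    retry_backoff_max=600,             # cap backoff at 10 minutes
    retry_jitter=True,                 # randomize delays to avoid thundering herds
    max_retries=5,
)
def flaky_task(payload: dict):
    """Illustrative task body; real work would go here."""
```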
Typical use cases:

- Email & notification systems
- Image or video processing
- Report generation
- API rate throttling / queuing
- Offloading costly operations in microservices
Feel free to fork, add new job types, or extend the metrics system. PRs welcome!
MIT License