Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .DS_Store
Binary file not shown.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,4 @@ cython_debug/
.pypirc

# Local Netlify folder
.netlify
.netlify
3 changes: 3 additions & 0 deletions backend/app/core/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ class Settings(BaseSettings):
classification_agent_model: str = "gemini-1.5-flash"
agent_timeout: int = 30
max_retries: int = 3

# RabbitMQ configuration
rabbitmq_url: str = ""

# Backend URL
backend_url: str = ""
Expand Down
119 changes: 64 additions & 55 deletions backend/app/core/orchestration/queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
from typing import Dict, Any, Callable, Optional
from datetime import datetime
from enum import Enum
import aio_pika
import json
from app.core.config import settings

logger = logging.getLogger(__name__)

Expand All @@ -12,27 +15,45 @@ class QueuePriority(str, Enum):
LOW = "low"

class AsyncQueueManager:
"""AsyncIO-based queue manager for agent orchestration"""
"""Queue manager for agent orchestration"""

def __init__(self):
self.queues = {
QueuePriority.HIGH: asyncio.Queue(),
QueuePriority.MEDIUM: asyncio.Queue(),
QueuePriority.LOW: asyncio.Queue()
QueuePriority.HIGH: 'high_task_queue',
QueuePriority.MEDIUM: 'medium_task_queue',
QueuePriority.LOW: 'low_task_queue'
}
self.handlers: Dict[str, Callable] = {}
self.running = False
self.worker_tasks = []
self.connection: Optional[aio_pika.RobustConnection] = None
self.channel: Optional[aio_pika.abc.AbstractChannel] = None



async def connect(self):
try:
rabbitmq_url = getattr(settings, 'rabbitmq_url', 'amqp://guest:guest@localhost/')
self.connection = await aio_pika.connect_robust(rabbitmq_url)
self.channel = await self.connection.channel()
# Declare queues
for queue_name in self.queues.values():
await self.channel.declare_queue(queue_name, durable=True)
logger.info("Successfully connected to RabbitMQ")
except Exception as e:
logger.error(f"Failed to connect to RabbitMQ: {e}")
raise

async def start(self, num_workers: int = 3):
"""Start the queue processing workers"""
await self.connect()
self.running = True

for i in range(num_workers):
task = asyncio.create_task(self._worker(f"worker-{i}"))
self.worker_tasks.append(task)

logger.info(f"Started {num_workers} queue workers")
logger.info(f"Started {num_workers} async queue workers")

async def stop(self):
"""Stop the queue processing"""
Expand All @@ -43,7 +64,11 @@ async def stop(self):
task.cancel()

await asyncio.gather(*self.worker_tasks, return_exceptions=True)
logger.info("Stopped all queue workers")
if self.channel:
await self.channel.close()
if self.connection:
await self.connection.close()
logger.info("Stopped all queue workers and closed connection")

async def enqueue(self,
message: Dict[str, Any],
Expand All @@ -56,13 +81,15 @@ async def enqueue(self,

queue_item = {
"id": message.get("id", f"msg_{datetime.now().timestamp()}"),
"timestamp": datetime.now().isoformat(),
"priority": priority,
"data": message
}

await self.queues[priority].put(queue_item)
logger.debug(f"Enqueued message {queue_item['id']} with priority {priority}")
json_message = json.dumps(queue_item).encode()
await self.channel.default_exchange.publish(
aio_pika.Message(body=json_message),
routing_key=self.queues[priority]
)
logger.info(f"Enqueued message {queue_item['id']} with priority {priority}")

def register_handler(self, message_type: str, handler: Callable):
"""Register a handler for a specific message type"""
Expand All @@ -72,50 +99,29 @@ def register_handler(self, message_type: str, handler: Callable):
async def _worker(self, worker_name: str):
"""Worker coroutine to process queue items"""
logger.info(f"Started queue worker: {worker_name}")

# Each worker listens to all queues by priority
queues = [
await self.channel.declare_queue(self.queues[priority], durable=True)
for priority in [QueuePriority.HIGH, QueuePriority.MEDIUM, QueuePriority.LOW]
]
while self.running:
try:
# Process queues by priority
item = await self._get_next_item()

if item:
await self._process_item(item, worker_name)
else:
# No items available, wait a bit
await asyncio.sleep(0.1)

except asyncio.CancelledError:
logger.info(f"Worker {worker_name} cancelled")
break
except (ConnectionError, TimeoutError) as e:
logger.error(f"Connection error in worker {worker_name}: {str(e)}")
await asyncio.sleep(5) # Longer pause for connection issues
except Exception as e:
logger.error(f"Unexpected error in worker {worker_name}: {str(e)}")
await asyncio.sleep(1) # Brief pause on error

async def _get_next_item(self) -> Optional[Dict[str, Any]]:
"""Get the next item from queues (priority-based)"""

# Try high priority first
try:
return self.queues[QueuePriority.HIGH].get_nowait()
except asyncio.QueueEmpty:
pass

# Then medium priority
try:
return self.queues[QueuePriority.MEDIUM].get_nowait()
except asyncio.QueueEmpty:
pass

# Finally low priority
try:
return self.queues[QueuePriority.LOW].get_nowait()
except asyncio.QueueEmpty:
pass

return None
for queue in queues:
try:
message = await queue.get(no_ack=False, fail=False)
if message:
try:
item = json.loads(message.body.decode())
await self._process_item(item, worker_name)
await message.ack()
except Exception as e:
logger.error(f"Error processing message: {e}")
await message.nack(requeue=False)
except asyncio.CancelledError:
logger.info(f"Worker {worker_name} cancelled")
return
except Exception as e:
logger.error(f"Worker {worker_name} error: {e}")
await asyncio.sleep(0.1)

async def _process_item(self, item: Dict[str, Any], worker_name: str):
"""Process a queue item"""
Expand All @@ -127,9 +133,12 @@ async def _process_item(self, item: Dict[str, Any], worker_name: str):

if handler:
logger.debug(f"Worker {worker_name} processing {item['id']} (type: {message_type})")
await handler(message_data)
if asyncio.iscoroutinefunction(handler):
await handler(message_data)
else:
handler(message_data)
else:
logger.warning(f"No handler found for message type: {message_type}")

except Exception as e:
logger.error(f"Error processing item {item['id']}: {str(e)}")
logger.error(f"Error processing item {item.get('id', 'unknown')}: {str(e)}")
2 changes: 1 addition & 1 deletion backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,4 @@ async def favicon():
host="0.0.0.0",
port=8000,
reload=True
)
)
Loading