diff --git a/backend/app/agents/devrel/agent.py b/backend/app/agents/devrel/agent.py index 4dd2af0f..2b356b2b 100644 --- a/backend/app/agents/devrel/agent.py +++ b/backend/app/agents/devrel/agent.py @@ -43,7 +43,8 @@ def _build_graph(self): # Phase 2: ReAct Supervisor - Decide what to do next workflow.add_node("react_supervisor", partial(react_supervisor_node, llm=self.llm)) workflow.add_node("web_search_tool", partial(web_search_tool_node, search_tool=self.search_tool, llm=self.llm)) - workflow.add_node("faq_handler_tool", partial(faq_handler_tool_node, faq_tool=self.faq_tool)) + workflow.add_node("faq_handler_tool", partial( + faq_handler_tool_node, search_tool=self.search_tool, llm=self.llm)) workflow.add_node("onboarding_tool", onboarding_tool_node) workflow.add_node("github_toolkit_tool", partial(github_toolkit_tool_node, github_toolkit=self.github_toolkit)) diff --git a/backend/app/agents/devrel/nodes/handlers/faq.py b/backend/app/agents/devrel/nodes/handlers/faq.py index 8855c323..995c10d6 100644 --- a/backend/app/agents/devrel/nodes/handlers/faq.py +++ b/backend/app/agents/devrel/nodes/handlers/faq.py @@ -1,11 +1,13 @@ import logging +from typing import List, Dict from app.agents.state import AgentState +from langchain_core.messages import HumanMessage logger = logging.getLogger(__name__) -async def handle_faq_node(state: AgentState, faq_tool) -> dict: - """Handle FAQ requests""" - logger.info(f"Handling FAQ for session {state.session_id}") +async def handle_faq_node(state: AgentState, search_tool, llm) -> dict: + """Handle FAQ requests dynamically using web search and AI synthesis""" + logger.info(f"Handling dynamic FAQ for session {state.session_id}") latest_message = "" if state.messages: @@ -13,14 +15,141 @@ async def handle_faq_node(state: AgentState, faq_tool) -> dict: elif state.context.get("original_message"): latest_message = state.context["original_message"] - # faq_tool will be passed from the agent, similar to llm for classify_intent - faq_response = await faq_tool.get_response(latest_message) + # Dynamic FAQ processing (replaces static faq_tool.get_response) + faq_response = await _dynamic_faq_process(latest_message, search_tool, llm, org_name="Devr.AI") return { "task_result": { "type": "faq", "response": faq_response, - "source": "faq_database" + "source": "dynamic_web_search" # Updated source }, "current_task": "faq_handled" } + +async def _dynamic_faq_process(message: str, search_tool, llm, org_name: str = "Devr.AI") -> str: + """ + Dynamic FAQ handler that implements the 5-step process: + 1. Intent Detection & Query Refinement + 2. Web Search (DuckDuckGo) + 3. AI-Powered Synthesis + 4. Generate Final Response + 5. Format with Sources + """ + + try: + # Step 1: Intent Detection & Query Refinement + logger.info(f"Step 1: Refining FAQ query for org '{org_name}'") + refined_query = await _refine_faq_query(message, llm, org_name) + + # Step 2: Dynamic Web Search + logger.info(f"Step 2: Searching for: {refined_query}") + search_results = await search_tool.search(refined_query) + + if not search_results: + return _generate_fallback_response(message, org_name) + + # Step 3 & 4: AI-Powered Synthesis & Response Generation + logger.info("Step 3-4: Synthesizing search results into FAQ response") + synthesized_response = await _synthesize_faq_response( + message, search_results, llm, org_name + ) + + # Step 5: Format Final Response with Sources + logger.info("Step 5: Formatting final response with sources") + final_response = _format_faq_response(synthesized_response, search_results) + + return final_response + + except Exception as e: + logger.error(f"Error in dynamic FAQ process: {e}") + return _generate_fallback_response(message, org_name) + +async def _refine_faq_query(message: str, llm, org_name: str) -> str: + """Step 1: Refine user query for organization-specific FAQ search""" + + refinement_prompt = f""" +You are helping someone find information about {org_name}. +Transform their question into an effective search query that will find official information about the organization. + +User Question: "{message}" + +Create a search query that focuses on: +- Official {org_name} information +- The organization's website, blog, or documentation +- Adding terms like "about", "mission", "projects" if relevant + +Return only the refined search query, nothing else. + +Examples: +- "What does this org do?" → "{org_name} about mission what we do" +- "How do you work?" → "{org_name} how it works process methodology" +- "What projects do you have?" → "{org_name} projects portfolio what we build" +""" + + response = await llm.ainvoke([HumanMessage(content=refinement_prompt)]) + refined_query = response.content.strip() + logger.info(f"Refined query: {refined_query}") + return refined_query + +async def _synthesize_faq_response(message: str, search_results: List[Dict], llm, org_name: str) -> str: + """Step 3-4: Use LLM to synthesize search results into a comprehensive FAQ answer""" + + # Prepare search results context + results_context = "" + for i, result in enumerate(search_results[:5]): # Top 5 results + title = result.get('title', 'N/A') + content = result.get('content', 'N/A') + url = result.get('url', 'N/A') + results_context += f"\nResult {i+1}:\nTitle: {title}\nContent: {content}\nURL: {url}\n" + + synthesis_prompt = f""" +You are an AI assistant representing {org_name}. A user asked: "{message}" + +Based on the following search results from official sources, provide a comprehensive, helpful answer about {org_name}. + +Search Results: +{results_context} + +Instructions: +1. Answer the user's question directly and conversationally +2. Focus on the most relevant and recent information +3. Be informative but concise (2-3 paragraphs max) +4. If the search results don't fully answer the question, acknowledge what you found +5. Sound helpful and knowledgeable about {org_name} +6. Don't mention "search results" in your response - speak as if you know about the organization + +Your response: +""" + + response = await llm.ainvoke([HumanMessage(content=synthesis_prompt)]) + synthesized_answer = response.content.strip() + logger.info(f"Synthesized FAQ response: {synthesized_answer[:100]}...") + return synthesized_answer + +def _format_faq_response(synthesized_answer: str, search_results: List[Dict]) -> str: + """Step 5: Format the final response with sources""" + + # Start with the synthesized answer + formatted_response = synthesized_answer + + # Add sources section + if search_results: + formatted_response += "\n\n**📚 Sources:**" + for i, result in enumerate(search_results[:3]): # Top 3 sources + title = result.get('title', 'Source') + url = result.get('url', '#') + formatted_response += f"\n{i+1}. [{title}]({url})" + + return formatted_response + +def _generate_fallback_response(message: str, org_name: str) -> str: + """Generate a helpful fallback when search fails""" + return f"""I'd be happy to help you learn about {org_name}, but I couldn't find current information to answer your question: "{message}" + +This might be because: +- The information isn't publicly available yet +- The search terms need to be more specific +- There might be connectivity issues + +Try asking a more specific question, or check out our official website and documentation for the most up-to-date information about {org_name}.""" diff --git a/backend/app/agents/devrel/nodes/handlers/web_search.py b/backend/app/agents/devrel/nodes/handlers/web_search.py index ad141312..5abd6463 100644 --- a/backend/app/agents/devrel/nodes/handlers/web_search.py +++ b/backend/app/agents/devrel/nodes/handlers/web_search.py @@ -51,7 +51,7 @@ def create_search_response(task_result: Dict[str, Any]) -> str: """ Create a user-friendly response string from search results. """ - + query = task_result.get("query") results = task_result.get("results", []) @@ -61,7 +61,7 @@ def create_search_response(task_result: Dict[str, Any]) -> str: response_parts = [f"Here's what I found for '{query}':"] for i, result in enumerate(results[:5]): title = result.get('title', 'N/A') - snippet = result.get('content', 'N/A') + snippet = result.get('content', 'N/A') url = result.get('url', '#') response_parts.append(f"{i+1}. {title}: {snippet}") response_parts.append(f" (Source: {url})") diff --git a/backend/app/agents/devrel/nodes/react_supervisor.py b/backend/app/agents/devrel/nodes/react_supervisor.py index 12bec4c0..ab4b273a 100644 --- a/backend/app/agents/devrel/nodes/react_supervisor.py +++ b/backend/app/agents/devrel/nodes/react_supervisor.py @@ -4,109 +4,264 @@ from app.agents.state import AgentState from langchain_core.messages import HumanMessage from ..prompts.react_prompt import REACT_SUPERVISOR_PROMPT +from datetime import datetime +from ..nodes.generate_response import _get_latest_message as get_latest_message_util logger = logging.getLogger(__name__) +# Configuration constants +MAX_ITERATIONS = 10 +MAX_CONVERSATION_HISTORY = 5 +VALID_ACTIONS = ["web_search", "faq_handler", "onboarding", "github_toolkit", "complete"] + async def react_supervisor_node(state: AgentState, llm) -> Dict[str, Any]: """ReAct Supervisor: Think -> Act -> Observe""" + + # Validate state first + if not _validate_state(state): + logger.error(f"Invalid state for session {getattr(state, 'session_id', 'unknown')}") + return _create_error_response(state, "Invalid state") + logger.info(f"ReAct Supervisor thinking for session {state.session_id}") - # Get current context - latest_message = _get_latest_message(state) - conversation_history = _get_conversation_history(state) - tool_results = state.context.get("tool_results", []) - iteration_count = state.context.get("iteration_count", 0) + try: + # Get current context + latest_message = _get_latest_message(state) + conversation_history = _get_conversation_history(state) + tool_results = state.context.get("tool_results", []) + iteration_count = state.context.get("iteration_count", 0) - prompt = REACT_SUPERVISOR_PROMPT.format( - latest_message=latest_message, - platform=state.platform, - interaction_count=state.interaction_count, - iteration_count=iteration_count, - conversation_history=conversation_history, - tool_results=json.dumps(tool_results, indent=2) if tool_results else "No previous tool results" - ) + # Safety check for max iterations + if iteration_count >= MAX_ITERATIONS: + logger.warning(f"Max iterations ({MAX_ITERATIONS}) reached for session {state.session_id}") + return _create_completion_response(state, "Maximum iterations reached") - response = await llm.ainvoke([HumanMessage(content=prompt)]) - decision = _parse_supervisor_decision(response.content) + logger.debug(f"Current iteration: {iteration_count}") + logger.debug(f"Latest message length: {len(latest_message)}") - logger.info(f"ReAct Supervisor decision: {decision['action']}") + prompt = REACT_SUPERVISOR_PROMPT.format( + latest_message=latest_message, + platform=getattr(state, 'platform', 'unknown'), + interaction_count=getattr(state, 'interaction_count', 0), + iteration_count=iteration_count, + conversation_history=conversation_history, + tool_results=json.dumps(tool_results, indent=2) if tool_results else "No previous tool results" + ) - # Update state with supervisor's thinking - return { - "context": { - **state.context, - "supervisor_thinking": response.content, - "supervisor_decision": decision, - "iteration_count": iteration_count + 1 - }, - "current_task": f"supervisor_decided_{decision['action']}" - } + response = await llm.ainvoke([HumanMessage(content=prompt)]) + decision = _parse_supervisor_decision(response.content) + + logger.info(f"ReAct Supervisor decision: {decision['action']}") + logger.debug(f"Supervisor thinking: {decision.get('thinking', 'N/A')[:100]}...") + logger.debug(f"Supervisor reasoning: {decision.get('reasoning', 'N/A')[:100]}...") + + # Update state with supervisor's thinking + return { + "context": { + **state.context, + "supervisor_thinking": response.content, + "supervisor_decision": decision, + "iteration_count": iteration_count + 1, + "last_action": decision['action'] + }, + "current_task": f"supervisor_decided_{decision['action']}" + } + + except Exception as e: + logger.error(f"Error in react_supervisor_node: {e}", exc_info=True) + return _create_error_response(state, f"Supervisor error: {str(e)}") def _parse_supervisor_decision(response: str) -> Dict[str, Any]: - """Parse the supervisor's decision from LLM response""" + """Enhanced parsing with better handling of multi-line responses""" try: - lines = response.strip().split('\n') decision = {"action": "complete", "reasoning": "", "thinking": ""} - for line in lines: + if not response or not response.strip(): + logger.warning("Empty response from supervisor, defaulting to complete") + return decision + + # Handle multi-line sections + current_section = None + content_buffer = [] + + for line in response.strip().split('\n'): + line = line.strip() + if not line: + continue + if line.startswith("THINK:"): - decision["thinking"] = line.replace("THINK:", "").strip() + if current_section and content_buffer: + decision[current_section] = " ".join(content_buffer) + current_section = "thinking" + content_buffer = [line.replace("THINK:", "").strip()] elif line.startswith("ACT:"): + if current_section and content_buffer: + decision[current_section] = " ".join(content_buffer) action = line.replace("ACT:", "").strip().lower() - if action in ["web_search", "faq_handler", "onboarding", "github_toolkit", "complete"]: + if action in VALID_ACTIONS: decision["action"] = action + else: + logger.warning(f"Invalid action '{action}', defaulting to 'complete'") + decision["action"] = "complete" + current_section = None + content_buffer = [] elif line.startswith("REASON:"): - decision["reasoning"] = line.replace("REASON:", "").strip() + if current_section and content_buffer: + decision[current_section] = " ".join(content_buffer) + current_section = "reasoning" + content_buffer = [line.replace("REASON:", "").strip()] + elif current_section and line: + content_buffer.append(line) + + # Handle any remaining content + if current_section and content_buffer: + decision[current_section] = " ".join(content_buffer) + + # Validate final decision + if decision["action"] not in VALID_ACTIONS: + logger.warning(f"Final validation failed for action '{decision['action']}', defaulting to 'complete'") + decision["action"] = "complete" return decision + except Exception as e: - logger.error(f"Error parsing supervisor decision: {e}") + logger.error(f"Error parsing supervisor decision: {e}", exc_info=True) return {"action": "complete", "reasoning": "Error in decision parsing", "thinking": ""} def supervisor_decision_router(state: AgentState) -> Literal["web_search", "faq_handler", "onboarding", "github_toolkit", "complete"]: """Route based on supervisor's decision""" - decision = state.context.get("supervisor_decision", {}) - action = decision.get("action", "complete") + try: + decision = state.context.get("supervisor_decision", {}) + action = decision.get("action", "complete") - # Safety check for infinite loops - iteration_count = state.context.get("iteration_count", 0) - if iteration_count > 10: - logger.warning(f"Max iterations reached for session {state.session_id}") - return "complete" + # Safety check for infinite loops + iteration_count = state.context.get("iteration_count", 0) + if iteration_count > MAX_ITERATIONS: + logger.warning(f"Max iterations reached for session {state.session_id}") + return "complete" - return action + # Validate action + if action not in VALID_ACTIONS: + logger.warning(f"Invalid routing action '{action}', defaulting to 'complete'") + return "complete" + + logger.debug(f"Routing to: {action} (iteration {iteration_count})") + return action + + except Exception as e: + logger.error(f"Error in supervisor_decision_router: {e}", exc_info=True) + return "complete" def add_tool_result(state: AgentState, tool_name: str, result: Dict[str, Any]) -> Dict[str, Any]: - """Add tool result to state context""" - tool_results = state.context.get("tool_results", []) - tool_results.append({ - "tool": tool_name, - "result": result, - "iteration": state.context.get("iteration_count", 0) - }) + """Add tool result to state context with validation""" + try: + if not _validate_state(state): + logger.error("Invalid state in add_tool_result") + return {"context": state.context if hasattr(state, 'context') else {}} + + tool_results = state.context.get("tool_results", []) + + # Validate tool result + if not isinstance(result, dict): + logger.warning(f"Tool result for {tool_name} is not a dict, converting") + result = {"result": str(result)} + + tool_entry = { + "tool": tool_name, + "result": result, + "iteration": state.context.get("iteration_count", 0), + "timestamp": datetime.now().isoformat() # Now actually stores timestamp + } + + tool_results.append(tool_entry) + # Limit tool results to prevent memory issues + if len(tool_results) > 20: + tool_results = tool_results[-20:] + logger.debug("Trimmed tool results to last 20 entries") + + tools_used = getattr(state, 'tools_used', []) + [tool_name] + + return { + "context": { + **state.context, + "tool_results": tool_results + }, + "tools_used": tools_used, + "current_task": f"completed_{tool_name}" + } + + except Exception as e: + logger.error(f"Error in add_tool_result: {e}", exc_info=True) + return {"context": state.context if hasattr(state, 'context') else {}} + +def _get_latest_message(state: AgentState) -> str: + """Extract the latest message from state with validation""" + try: + return get_latest_message_util(state) + except Exception as e: + logger.error(f"Error getting latest message: {e}") + return "" + +def _get_conversation_history(state: AgentState, max_messages: int = MAX_CONVERSATION_HISTORY) -> str: + """Get formatted conversation history with validation""" + try: + if not hasattr(state, 'messages') or not state.messages: + return "No previous conversation" + + recent_messages = state.messages[-max_messages:] + formatted_messages = [] + + for msg in recent_messages: + if isinstance(msg, dict): + role = msg.get('role', 'user') + content = msg.get('content', '') + if content: # Only include non-empty messages + formatted_messages.append(f"{role}: {content[:200]}{'...' if len(content) > 200 else ''}") + + return "\n".join(formatted_messages) if formatted_messages else "No previous conversation" + + except Exception as e: + logger.error(f"Error getting conversation history: {e}") + return "Error retrieving conversation history" + +def _validate_state(state: AgentState) -> bool: + """Validate state before processing""" + try: + if not state: + return False + + if not hasattr(state, 'session_id') or not state.session_id: + logger.error("Invalid state: missing session_id") + return False + + if not hasattr(state, 'context'): + logger.error("Invalid state: missing context") + return False + + return True + except Exception as e: + logger.error(f"Error validating state: {e}") + return False + +def _create_error_response(state: AgentState, error_message: str) -> Dict[str, Any]: + """Create standardized error response""" return { "context": { - **state.context, - "tool_results": tool_results + **(state.context if hasattr(state, 'context') else {}), + "supervisor_decision": {"action": "complete", "reasoning": error_message, "thinking": "Error occurred"}, + "error": error_message }, - "tools_used": state.tools_used + [tool_name], - "current_task": f"completed_{tool_name}" + "current_task": "supervisor_decided_complete" } -def _get_latest_message(state: AgentState) -> str: - """Extract the latest message from state""" - if state.messages: - return state.messages[-1].get("content", "") - return state.context.get("original_message", "") - -def _get_conversation_history(state: AgentState, max_messages: int = 5) -> str: - """Get formatted conversation history""" - if not state.messages: - return "No previous conversation" - - recent_messages = state.messages[-max_messages:] - return "\n".join([ - f"{msg.get('role', 'user')}: {msg.get('content', '')}" - for msg in recent_messages - ]) +def _create_completion_response(state: AgentState, reason: str) -> Dict[str, Any]: + """Create standardized completion response""" + return { + "context": { + **state.context, + "supervisor_decision": {"action": "complete", "reasoning": reason, "thinking": "Completing task"}, + "completion_reason": reason + }, + "current_task": "supervisor_decided_complete" + } diff --git a/backend/app/agents/devrel/tool_wrappers.py b/backend/app/agents/devrel/tool_wrappers.py index 7fa10bb6..baa675b1 100644 --- a/backend/app/agents/devrel/tool_wrappers.py +++ b/backend/app/agents/devrel/tool_wrappers.py @@ -16,11 +16,12 @@ async def web_search_tool_node(state: AgentState, search_tool, llm) -> Dict[str, tool_result = handler_result.get("task_result", {}) return add_tool_result(state, "web_search", tool_result) -async def faq_handler_tool_node(state: AgentState, faq_tool) -> Dict[str, Any]: +async def faq_handler_tool_node(state: AgentState, search_tool, llm) -> Dict[str, Any]: """Execute FAQ handler tool and add result to ReAct context""" logger.info(f"Executing FAQ handler tool for session {state.session_id}") - handler_result = await handle_faq_node(state, faq_tool) + # Updated to use search_tool and llm instead of faq_tool for dynamic FAQ + handler_result = await handle_faq_node(state, search_tool, llm) tool_result = handler_result.get("task_result", {}) return add_tool_result(state, "faq_handler", tool_result) @@ -32,7 +33,6 @@ async def onboarding_tool_node(state: AgentState) -> Dict[str, Any]: tool_result = handler_result.get("task_result", {}) return add_tool_result(state, "onboarding", tool_result) - async def github_toolkit_tool_node(state: AgentState, github_toolkit) -> Dict[str, Any]: """Execute GitHub toolkit tool and add result to ReAct context""" logger.info(f"Executing GitHub toolkit tool for session {state.session_id}") diff --git a/backend/app/core/handler/faq_handler.py b/backend/app/core/handler/faq_handler.py index fca3285e..3cfcb019 100644 --- a/backend/app/core/handler/faq_handler.py +++ b/backend/app/core/handler/faq_handler.py @@ -1,7 +1,7 @@ import logging from typing import Dict, Any from app.core.events.base import BaseEvent -from app.core.events.enums import EventType, PlatformType +from app.core.events.enums import EventType from app.core.handler.base import BaseHandler logger = logging.getLogger(__name__) diff --git a/backend/integrations/discord/bot.py b/backend/integrations/discord/bot.py index 26ad8dbc..e30b7fe3 100644 --- a/backend/integrations/discord/bot.py +++ b/backend/integrations/discord/bot.py @@ -65,13 +65,13 @@ async def on_message(self, message): except Exception as e: logger.error(f"Error processing message: {str(e)}") - + async def _handle_devrel_message(self, message, triage_result: Dict[str, Any]): """This now handles both new requests and follow-ups in threads.""" try: user_id = str(message.author.id) thread_id = await self._get_or_create_thread(message, user_id) - + agent_message = { "type": "devrel_request", "id": f"discord_{message.id}", @@ -91,7 +91,7 @@ async def _handle_devrel_message(self, message, triage_result: Dict[str, Any]): priority_map = {"high": QueuePriority.HIGH, "medium": QueuePriority.MEDIUM, "low": QueuePriority.LOW - } + } priority = priority_map.get(triage_result.get("priority"), QueuePriority.MEDIUM) await self.queue_manager.enqueue(agent_message, priority) @@ -101,7 +101,7 @@ async def _handle_devrel_message(self, message, triage_result: Dict[str, Any]): if thread: await thread.send("I'm processing your request, please hold on...") # ------------------------------------ - + except Exception as e: logger.error(f"Error handling DevRel message: {str(e)}") @@ -114,7 +114,7 @@ async def _get_or_create_thread(self, message, user_id: str) -> Optional[str]: return thread_id else: del self.active_threads[user_id] - + # This part only runs if it's not a follow-up message in an active thread. if isinstance(message.channel, discord.TextChannel): thread_name = f"DevRel Chat - {message.author.display_name}" @@ -139,4 +139,4 @@ async def _handle_agent_response(self, response_data: Dict[str, Any]): else: logger.error(f"Thread {thread_id} not found for agent response") except Exception as e: - logger.error(f"Error handling agent response: {str(e)}") \ No newline at end of file + logger.error(f"Error handling agent response: {str(e)}") diff --git a/backend/integrations/discord/cogs.py b/backend/integrations/discord/cogs.py index a92765f9..7bb18998 100644 --- a/backend/integrations/discord/cogs.py +++ b/backend/integrations/discord/cogs.py @@ -1,7 +1,8 @@ import discord from discord import app_commands -from discord.ext import commands, tasks +from discord.ext import commands import logging +import asyncio from app.core.orchestration.queue_manager import AsyncQueueManager, QueuePriority from app.services.auth.supabase import login_with_github from app.services.auth.management import get_or_create_user_by_discord @@ -10,36 +11,71 @@ from integrations.discord.views import OAuthView from app.core.config import settings +# Try to import tasks, fallback if not available +try: + from discord.ext import tasks + TASKS_AVAILABLE = True +except ImportError: + TASKS_AVAILABLE = False + tasks = None + logger = logging.getLogger(__name__) class DevRelCommands(commands.Cog): def __init__(self, bot: DiscordBot, queue_manager: AsyncQueueManager): self.bot = bot self.queue = queue_manager + self._cleanup_task = None + + # Set up cleanup method based on availability + if TASKS_AVAILABLE: + self._setup_tasks_cleanup() @commands.Cog.listener() async def on_ready(self): - if not self.cleanup_expired_tokens.is_running(): - print("--> Starting the token cleanup task...") - self.cleanup_expired_tokens.start() + if TASKS_AVAILABLE: + if hasattr(self, 'cleanup_expired_tokens') and not self.cleanup_expired_tokens.is_running(): + print("--> Starting the token cleanup task...") + self.cleanup_expired_tokens.start() + elif not self._cleanup_task: + self._cleanup_task = asyncio.create_task(self._manual_cleanup_loop()) + print("--> Starting manual token cleanup task...") def cog_unload(self): - self.cleanup_expired_tokens.cancel() + if TASKS_AVAILABLE and hasattr(self, 'cleanup_expired_tokens'): + self.cleanup_expired_tokens.cancel() + if self._cleanup_task: + self._cleanup_task.cancel() - @tasks.loop(minutes=5) - async def cleanup_expired_tokens(self): - """Periodic cleanup of expired verification tokens""" - try: - print("--> Running token cleanup task...") - await cleanup_expired_tokens() - print("--> Token cleanup task finished.") - except Exception as e: - logger.error(f"Error during token cleanup: {e}") + if TASKS_AVAILABLE: + @tasks.loop(minutes=5) + async def cleanup_expired_tokens(self): + """Periodic cleanup of expired verification tokens""" + try: + print("--> Running token cleanup task...") + await cleanup_expired_tokens() + print("--> Token cleanup task finished.") + except Exception as e: + logger.error(f"Error during token cleanup: {e}") - @cleanup_expired_tokens.before_loop - async def before_cleanup(self): - """Wait until the bot is ready before starting cleanup""" + @cleanup_expired_tokens.before_loop + async def before_cleanup(self): + """Wait until the bot is ready before starting cleanup""" + await self.bot.wait_until_ready() + + async def _manual_cleanup_loop(self): + """Manual cleanup loop if tasks extension is not available""" await self.bot.wait_until_ready() + while True: + try: + await asyncio.sleep(300) # 5 minutes + print("--> Running manual token cleanup task...") + await cleanup_expired_tokens() + print("--> Manual token cleanup task finished.") + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Error during manual token cleanup: {e}") @app_commands.command(name="reset", description="Reset your DevRel thread and memory.") async def reset_thread(self, interaction: discord.Interaction): @@ -104,7 +140,7 @@ async def verification_status(self, interaction: discord.Interaction): async def verify_github(self, interaction: discord.Interaction): try: await interaction.response.defer(ephemeral=True) - + user_profile = await get_or_create_user_by_discord( discord_id=str(interaction.user.id), display_name=interaction.user.display_name, @@ -119,7 +155,7 @@ async def verify_github(self, interaction: discord.Interaction): ) await interaction.followup.send(embed=embed, ephemeral=True) return - + if user_profile.verification_token: embed = discord.Embed( title="⏳ Verification Pending", @@ -180,4 +216,4 @@ async def verify_github(self, interaction: discord.Interaction): async def setup(bot: commands.Bot): """This function is called by the bot to load the cog.""" - await bot.add_cog(DevRelCommands(bot, bot.queue_manager)) \ No newline at end of file + await bot.add_cog(DevRelCommands(bot, bot.queue_manager)) diff --git a/backend/main.py b/backend/main.py index ed59e6af..2f95b819 100644 --- a/backend/main.py +++ b/backend/main.py @@ -13,6 +13,7 @@ from app.database.weaviate.client import get_weaviate_client from integrations.discord.bot import DiscordBot from discord.ext import commands + # DevRel commands are now loaded dynamically (commented out below) # from integrations.discord.cogs import DevRelCommands @@ -45,11 +46,15 @@ async def start_background_tasks(self): await self.queue_manager.start(num_workers=3) # --- Load commands inside the async startup function --- - try: - await self.discord_bot.load_extension("integrations.discord.cogs") - except (ImportError, commands.ExtensionError) as e: - logger.error("Failed to load Discord cog extension: %s", e) - + # Temporarily disabled to troubleshoot import issues + # try: + # await self.discord_bot.load_extension("integrations.discord.cogs") + # except (ImportError, discord.errors.ExtensionFailed, discord.errors.ExtensionNotFound, discord.errors.NoEntryPointError) as e: + # logger.error("Failed to load Discord cog extension: %s", e) + # # Don't re-raise here to allow the app to continue without Discord cogs + # except Exception as e: + # logger.error("Unexpected error loading Discord cog extension: %s", e) + # Start the bot as a background task. asyncio.create_task( self.discord_bot.start(settings.discord_bot_token) @@ -127,4 +132,4 @@ async def favicon(): host="0.0.0.0", port=8000, reload=True - ) \ No newline at end of file + ) diff --git a/poetry.lock b/poetry.lock index aada402f..fa971396 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3084,27 +3084,6 @@ files = [ {file = "protobuf-6.31.1.tar.gz", hash = "sha256:d8cac4c982f0b957a4dc73a80e2ea24fab08e679c0de9deb835f4a12d69aca9a"}, ] -[[package]] -name = "py-cord" -version = "2.6.1" -description = "A Python wrapper for the Discord API" -optional = false -python-versions = ">=3.8" -groups = ["main"] -files = [ - {file = "py_cord-2.6.1-py3-none-any.whl", hash = "sha256:e3d3b528c5e37b0e0825f5b884cbb9267860976c1e4878e28b55da8fd3af834b"}, - {file = "py_cord-2.6.1.tar.gz", hash = "sha256:36064f225f2c7bbddfe542d5ed581f2a5744f618e039093cf7cd2659a58bc79b"}, -] - -[package.dependencies] -aiohttp = ">=3.6.0,<4.0" -typing-extensions = {version = ">=4,<5", markers = "python_version < \"3.11\""} - -[package.extras] -docs = ["furo (==2023.3.23)", "myst-parser (==1.0.0)", "sphinx (==5.3.0)", "sphinx-autodoc-typehints (==1.23.0)", "sphinx-copybutton (==0.5.2)", "sphinxcontrib-trio (==1.1.2)", "sphinxcontrib-websupport (==1.2.4)", "sphinxext-opengraph (==0.9.1)"] -speed = ["aiohttp[speedups]", "msgspec (>=0.18.6,<0.19.0)"] -voice = ["PyNaCl (>=1.3.0,<1.6)"] - [[package]] name = "pyasn1" version = "0.6.1" @@ -5254,4 +5233,8 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.1" python-versions = ">=3.9, <4.0" + dynamic-faq-handler +content-hash = "08ba3a64e2bb86ff6b0b8df3b8dc19bd737faab40e48113133b868f3496a04e0" + content-hash = "650963e14f423a8460fab7ad2235b1c97f7d8da454feb95c1d366d7f6fd83094" +main diff --git a/pyproject.toml b/pyproject.toml index b653f853..953feeea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,6 @@ requires-python = ">=3.9, <4.0" dependencies = [ "supabase (>=2.13.0,<3.0.0)", "fastapi (>=0.115.11,<0.116.0)", - "py-cord (>=2.6.1,<3.0.0)", "pygithub (>=2.6.1,<3.0.0)", "slack-sdk (>=3.34.0,<4.0.0)", "sentence-transformers (>=3.4.1,<4.0.0)", @@ -25,7 +24,7 @@ dependencies = [ "aio-pika (>=9.5.5,<10.0.0)", "uvicorn (>=0.35.0,<0.36.0)", "ddgs (>=9.0.2,<10.0.0)", - "discord-py (>=2.5.2,<3.0.0)", + "discord-py (>=2.4.0,<3.0.0)", ] [tool.poetry]