Skip to content
Merged
74 changes: 72 additions & 2 deletions backend/app/agents/devrel/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from functools import partial
from langgraph.graph import StateGraph, END
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.checkpoint.memory import InMemorySaver
from ..shared.base_agent import BaseAgent, AgentState
from ..shared.classification_router import MessageCategory
from .tools.search_tool import TavilySearchTool
Expand All @@ -14,6 +15,7 @@
from .nodes.handle_technical_support_node import handle_technical_support_node
from .nodes.handle_onboarding_node import handle_onboarding_node
from .nodes.generate_response_node import generate_response_node
from .nodes.summarization_node import check_summarization_needed, summarize_conversation_node, store_summary_to_database

logger = logging.getLogger(__name__)

Expand All @@ -29,6 +31,7 @@ def __init__(self, config: Dict[str, Any] = None):
)
self.search_tool = TavilySearchTool()
self.faq_tool = FAQTool()
self.checkpointer = InMemorySaver()
super().__init__("DevRelAgent", self.config)
Comment on lines +34 to 35
Copy link
Contributor

@coderabbitai coderabbitai bot Jun 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Share a single InMemorySaver instead of instantiating per-agent

self.checkpointer = InMemorySaver() creates a brand-new store for every DevRelAgent instance.
If the application spins up a fresh agent per request (very common in serverless / web frameworks), each request will operate on an isolated store and no memory carry-over will occur, defeating the whole purpose of the saver.

Create one saver at module level (or inject it from the orchestrator) and reuse it across agents:

-        self.checkpointer = InMemorySaver()
+# backend/app/agents/devrel/agent.py
+CHECKPOINTER = InMemorySaver()   # shared singleton
+
+        self.checkpointer = CHECKPOINTER

No other code changes are required because workflow.compile(checkpointer=…) already accepts the instance.
This minor refactor preserves true cross-session memory and avoids silent data loss.

Also applies to: 91-92

🤖 Prompt for AI Agents
In backend/app/agents/devrel/agent.py around lines 34 to 35, the code creates a
new InMemorySaver instance for each DevRelAgent, causing isolated memory stores
per agent. To fix this, move the InMemorySaver instantiation to the module level
so that a single shared instance is reused across all DevRelAgent instances.
This change preserves memory across sessions without modifying other code since
the saver instance is already accepted by workflow.compile. Apply the same
refactor to lines 91-92 as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a problem: only one DevRelAgent instance lives for the entire application lifetime, and the thread-based InMemorySaver system remembers user conversations throughout.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@smokeyScraper, thank you for the clarification! That makes perfect sense - with a singleton DevRelAgent instance living for the entire application lifetime, the InMemorySaver created in the constructor will indeed be shared across all conversations through the thread-based system. This architecture effectively achieves memory persistence without the isolation concerns I raised. I appreciate you explaining the application's design pattern.


✏️ Learnings added
Learnt from: smokeyScraper
PR: AOSSIE-Org/Devr.AI#75
File: backend/app/agents/devrel/agent.py:34-35
Timestamp: 2025-06-13T21:56:19.160Z
Learning: In the Devr.AI backend, the DevRelAgent follows a singleton pattern where only one instance exists for the entire application lifetime, using InMemorySaver with thread-based conversation management to persist user conversations across sessions.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


def _build_graph(self):
Expand All @@ -43,6 +46,8 @@ def _build_graph(self):
workflow.add_node("handle_technical_support", handle_technical_support_node)
workflow.add_node("handle_onboarding", handle_onboarding_node)
workflow.add_node("generate_response", partial(generate_response_node, llm=self.llm))
workflow.add_node("check_summarization", check_summarization_needed)
workflow.add_node("summarize_conversation", partial(summarize_conversation_node, llm=self.llm))

# Add edges
workflow.add_conditional_edges(
Expand All @@ -65,12 +70,26 @@ def _build_graph(self):
for node in ["handle_faq", "handle_web_search", "handle_technical_support", "handle_onboarding"]:
workflow.add_edge(node, "generate_response")

workflow.add_edge("generate_response", END)
workflow.add_edge("generate_response", "check_summarization")

# Conditional edge for summarization
workflow.add_conditional_edges(
"check_summarization",
self._should_summarize,
{
"summarize": "summarize_conversation",
"end": END
}
)

# End after summarization
workflow.add_edge("summarize_conversation", END)

# Set entry point
workflow.set_entry_point("gather_context")

self.graph = workflow.compile()
# Compile with InMemorySaver checkpointer
self.graph = workflow.compile(checkpointer=self.checkpointer)

def _route_to_handler(self, state: AgentState) -> str:
"""Route to the appropriate handler based on intent"""
Expand Down Expand Up @@ -98,3 +117,54 @@ def _route_to_handler(self, state: AgentState) -> str:
# Later to be changed to handle anomalies
logger.info(f"Unknown intent '{intent}', routing to technical support")
return MessageCategory.TECHNICAL_SUPPORT

def _should_summarize(self, state: AgentState) -> str:
    """Routing predicate for the post-response conditional edge.

    Returns the edge label "summarize" when the state flags that the
    conversation has grown enough to need summarizing, otherwise "end".
    """
    if not state.summarization_needed:
        return "end"
    # Only log when we actually take the summarization branch.
    logger.info(f"Summarization needed for session {state.session_id}")
    return "summarize"

async def get_thread_state(self, thread_id: str) -> Dict[str, Any]:
    """Return the checkpointed state values for *thread_id*.

    Looks the thread up in the compiled graph's checkpointer; any failure
    (or a missing snapshot) yields an empty dict rather than raising.
    """
    lookup = {"configurable": {"thread_id": thread_id}}
    try:
        snapshot = self.graph.get_state(lookup)
        if snapshot:
            return snapshot.values
        return {}
    except Exception as e:
        # Boundary method: log and degrade to an empty state.
        logger.error(f"Error getting thread state: {str(e)}")
        return {}

async def clear_thread_memory(self, thread_id: str, force_clear: bool = False) -> bool:
    """Clear the checkpointed memory for one conversation thread.

    The thread is cleared when its state carries the
    ``memory_timeout_reached`` flag, or when *force_clear* is True.
    Before deletion the final summary is persisted to the database.

    Returns True when memory was cleared (or there was nothing to
    clear), False when the thread is preserved or an error occurred.
    """
    try:
        lookup = {"configurable": {"thread_id": thread_id}}
        snapshot = self.graph.get_state(lookup)

        # Nothing checkpointed for this thread — treat as already clear.
        if not (snapshot and snapshot.values):
            logger.info(f"No state found for thread {thread_id}, nothing to clear")
            return True

        agent_state = AgentState(**snapshot.values)
        timed_out = agent_state.memory_timeout_reached

        # Preserve memory unless the timeout flag is set or caller forces it.
        if not (timed_out or force_clear):
            logger.info(f"Thread {thread_id} has not timed out, memory preserved")
            return False

        if timed_out:
            logger.info(f"Thread {thread_id} timeout flag set, storing final summary and clearing memory")
        else:
            logger.info(f"Force clearing memory for thread {thread_id}")

        # Persist the final summary first, then drop the checkpoint.
        await store_summary_to_database(agent_state)
        self.checkpointer.delete_thread(thread_id)
        logger.info(f"Successfully cleared memory for thread {thread_id}")
        return True

    except Exception as e:
        logger.error(f"Error clearing thread memory: {str(e)}")
        return False
Empty file.
22 changes: 18 additions & 4 deletions backend/app/agents/devrel/nodes/gather_context_node.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from datetime import datetime
from app.agents.shared.state import AgentState
from app.agents.shared.classification_router import MessageCategory

Expand All @@ -11,12 +12,25 @@ async def gather_context_node(state: AgentState) -> AgentState:
# TODO: Add context gathering from databases
# Currently, context is simple
# In production, query databases for user history, etc.

original_message = state.context.get("original_message", "")

new_message = {
"role": "user",
"content": original_message,
"timestamp": datetime.now().isoformat()
}

context_data = {
"user_profile": {"user_id": state.user_id, "platform": state.platform},
"conversation_context": len(state.messages),
"conversation_context": len(state.messages) + 1, # +1 for the new message
"session_info": {"session_id": state.session_id}
}

state.context.update(context_data)
state.current_task = "context_gathered"
return state
updated_context = {**state.context, **context_data}

return {
"messages": [new_message],
"context": updated_context,
"current_task": "context_gathered"
}
55 changes: 39 additions & 16 deletions backend/app/agents/devrel/nodes/generate_response_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,43 +34,66 @@ async def _create_llm_response(state: AgentState, task_result: Dict[str, Any], l
elif state.context.get("original_message"):
latest_message = state.context["original_message"]

conversation_summary = state.conversation_summary or "This is the beginning of our conversation."

recent_messages_count = min(10, len(state.messages))
conversation_history_str = "\n".join([
f"{msg.get('type', 'unknown')}: {msg.get('content', '')}"
for msg in state.conversation_history[-5:]
f"{msg.get('role', 'user')}: {msg.get('content', '')}"
for msg in state.messages[-recent_messages_count:]
])
current_context_str = str(state.context)
task_type_str = str(task_result.get("type", "N/A"))
task_details_str = str(task_result)

total_messages = len(state.messages)
if total_messages > recent_messages_count:
conversation_history_str = f"[Showing last {recent_messages_count} of {total_messages} messages]\n" + \
conversation_history_str

context_parts = [
f"Platform: {state.platform}",
f"Total interactions: {state.interaction_count}",
f"Session duration: {(state.last_interaction_time - state.session_start_time).total_seconds() / 60:.1f} minutes"
]

if state.key_topics:
context_parts.append(f"Key topics discussed: {', '.join(state.key_topics)}")

if state.user_profile:
context_parts.append(f"User profile: {state.user_profile}")

current_context_str = "\n".join(context_parts)

try:
prompt = GENERAL_LLM_RESPONSE_PROMPT.format(
conversation_summary=conversation_summary,
latest_message=latest_message,
conversation_history=conversation_history_str,
current_context=current_context_str,
task_type=task_type_str,
task_details=task_details_str
task_type=task_result.get("type", "general"),
task_details=str(task_result)
)

logger.info(f"Prompt includes summary: {len(conversation_summary)} chars, "
f"recent history: {recent_messages_count} messages, "
f"total history: {total_messages} messages")
except KeyError as e:
logger.error(f"Missing key in GENERAL_LLM_RESPONSE_PROMPT: {e}")
return "Error: Response template formatting error."

response = await llm.ainvoke([HumanMessage(content=prompt)])
return response.content.strip()

async def generate_response_node(state: AgentState, llm) -> AgentState:
async def generate_response_node(state: AgentState, llm) -> dict:
"""Generate final response to user"""
logger.info(f"Generating response for session {state.session_id}")
task_result = state.task_result or {}

if task_result.get("type") == "faq":
state.final_response = task_result.get("response", "I don't have a specific answer for that question.")
final_response = task_result.get("response", "I don't have a specific answer for that question.")
elif task_result.get("type") == "web_search":
response = await _create_search_response(task_result)
state.final_response = response
final_response = await _create_search_response(task_result)
else:
# Pass the llm instance to _create_llm_response
response = await _create_llm_response(state, task_result, llm)
state.final_response = response
final_response = await _create_llm_response(state, task_result, llm)

state.current_task = "response_generated"
return state
return {
"final_response": final_response,
"current_task": "response_generated"
}
16 changes: 8 additions & 8 deletions backend/app/agents/devrel/nodes/handle_faq_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

logger = logging.getLogger(__name__)

async def handle_faq_node(state: AgentState, faq_tool) -> AgentState:
async def handle_faq_node(state: AgentState, faq_tool) -> dict:
"""Handle FAQ requests"""
logger.info(f"Handling FAQ for session {state.session_id}")

Expand All @@ -16,11 +16,11 @@ async def handle_faq_node(state: AgentState, faq_tool) -> AgentState:
# faq_tool will be passed from the agent, similar to llm for classify_intent
faq_response = await faq_tool.get_response(latest_message)

state.task_result = {
"type": "faq",
"response": faq_response,
"source": "faq_database"
return {
"task_result": {
"type": "faq",
"response": faq_response,
"source": "faq_database"
},
"current_task": "faq_handled"
}

state.current_task = "faq_handled"
return state
14 changes: 7 additions & 7 deletions backend/app/agents/devrel/nodes/handle_onboarding_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ async def handle_onboarding_node(state: AgentState) -> AgentState:
"""Handle onboarding requests"""
logger.info(f"Handling onboarding for session {state.session_id}")

state.task_result = {
"type": "onboarding",
"action": "welcome_and_guide",
"next_steps": ["setup_environment", "first_contribution", "join_community"]
return {
"task_result": {
"type": "onboarding",
"action": "welcome_and_guide",
"next_steps": ["setup_environment", "first_contribution", "join_community"]
},
"current_task": "onboarding_handled"
}

state.current_task = "onboarding_handled"
return state
14 changes: 7 additions & 7 deletions backend/app/agents/devrel/nodes/handle_technical_support_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ async def handle_technical_support_node(state: AgentState) -> AgentState:
"""Handle technical support requests"""
logger.info(f"Handling technical support for session {state.session_id}")

state.task_result = {
"type": "technical_support",
"action": "provide_guidance",
"requires_human_review": False
return {
"task_result": {
"type": "technical_support",
"action": "provide_guidance",
"requires_human_review": False
},
"current_task": "technical_support_handled"
}

state.current_task = "technical_support_handled"
return state
19 changes: 10 additions & 9 deletions backend/app/agents/devrel/nodes/handle_web_search_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async def _extract_search_query(message: str, llm) -> str:
logger.info(f"Extracted search query: {search_query}")
return search_query

async def handle_web_search_node(state: AgentState, search_tool, llm) -> AgentState:
async def handle_web_search_node(state: AgentState, search_tool, llm) -> dict:
"""Handle web search requests"""
logger.info(f"Handling web search for session {state.session_id}")

Expand All @@ -31,12 +31,13 @@ async def handle_web_search_node(state: AgentState, search_tool, llm) -> AgentSt
search_query = await _extract_search_query(latest_message, llm)
search_results = await search_tool.search(search_query)

state.task_result = {
"type": "web_search",
"query": search_query,
"results": search_results,
"source": "tavily_search"
return {
"task_result": {
"type": "web_search",
"query": search_query,
"results": search_results,
"source": "tavily_search"
},
"tools_used": ["tavily_search"],
"current_task": "web_search_handled"
}
state.tools_used.append("tavily_search")
state.current_task = "web_search_handled"
return state
Empty file.
Loading