langgraph/how-tos/persistence_postgres/ #894
Replies: 38 comments 65 replies
-
How do I pass config and the recursion limit together while streaming the graph? Currently, I'm getting this error: `TypeError: Pregel.stream() takes from 2 to 3 positional arguments but 4 were given`
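A hedged sketch of one way around that error: `recursion_limit` belongs in the same `RunnableConfig` dict as `configurable`, so `stream()` only ever needs the input plus one config argument (the graph name, thread id, and input below are placeholders).

```python
# Assumption: `graph` is an already-compiled LangGraph graph.
# recursion_limit lives in the same config dict as "configurable",
# so stream() receives exactly two arguments: the input and the config.
config = {
    "configurable": {"thread_id": "1"},
    "recursion_limit": 50,
}

for chunk in graph.stream({"messages": [("human", "hi")]}, config):
    print(chunk)
```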
-
Can you also update this guide for the JS version, and maybe add a separate general guide explaining in detail how to create a custom checkpointer and each component a checkpointer should have? I feel like this is a really important feature for bringing LangGraph to production, and there should be more guidance for it.
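In the meantime, a very rough sketch of the shape such a guide might cover. The base-class location and the method names/signatures below are assumptions from recent `langgraph-checkpoint` releases, so verify them against `BaseCheckpointSaver` in your installed version before relying on them.

```python
# Rough skeleton only -- method set and signatures are assumptions;
# check BaseCheckpointSaver in your installed langgraph-checkpoint version.
from langgraph.checkpoint.base import BaseCheckpointSaver, CheckpointTuple


class MyCustomSaver(BaseCheckpointSaver):
    """Persist checkpoints to a backend of your choice."""

    def get_tuple(self, config) -> CheckpointTuple | None:
        # Look up the latest checkpoint for config["configurable"]["thread_id"].
        raise NotImplementedError

    def list(self, config, *, filter=None, before=None, limit=None):
        # Yield CheckpointTuple objects for a thread, newest first.
        raise NotImplementedError

    def put(self, config, checkpoint, metadata, new_versions):
        # Store one checkpoint and return the config that identifies it.
        raise NotImplementedError

    def put_writes(self, config, writes, task_id):
        # Store pending writes produced between checkpoints.
        raise NotImplementedError
```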
-
Please add a specific section on accessing and modifying the state variables of nodes from outside the node functions.
-
William,
Thanks for the reply, but this is not exactly what I am looking for. Consider a simple scenario where an attribute in the state object has to be modified. I have a state object and want to modify a string parameter and set it to a value, e.g. the `plan` attribute. A simple document on modifying state attributes, with examples, would be useful.
```python
from typing import List, TypedDict

from langgraph.graph import StateGraph


class AgentState(TypedDict):
    task: str
    plan: str
    draft: str
    critique: str
    content: List[str]
    revision_number: int
    max_revisions: int


workflow = StateGraph(AgentState)
```
rampi
________________________________
From: William FH
Sent: 09 July 2024 03:47
To: langchain-ai/langgraph
Cc: Ramprasad; Comment
Subject: Re: [langchain-ai/langgraph] langgraph/how-tos/persistence_postgres/ (Discussion #894)
You mean like this? https://langchain-ai.github.io/langgraph/how-tos/human_in_the_loop/edit-graph-state/
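For reference, a minimal sketch of changing a single attribute such as `plan` from outside a node, assuming the `workflow` above has been compiled with a checkpointer and a thread config (the compiled graph name, thread id, and plan text are placeholders):

```python
# Assumes the graph above was compiled with a checkpointer, e.g.:
# graph = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}

# Overwrite just the `plan` attribute in the stored state for this thread.
graph.update_state(config, {"plan": "1. Research topic\n2. Draft outline"})

# Read it back to confirm.
print(graph.get_state(config).values["plan"])
```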
-
Hello, how can I limit the memory so the graph doesn't continuously accumulate more history in its prompt? E.g., by limiting it to the last 10 messages.
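One hedged sketch of a pattern for this, assuming a `MessagesState`-style graph: a node that emits `RemoveMessage` updates so only the 10 most recent messages are kept (the node name and the cutoff are placeholders).

```python
from langchain_core.messages import RemoveMessage
from langgraph.graph import MessagesState


def trim_history(state: MessagesState):
    # Emit RemoveMessage for everything except the 10 most recent messages;
    # the add_messages reducer treats these as deletions.
    messages = state["messages"]
    return {"messages": [RemoveMessage(id=m.id) for m in messages[:-10]]}
```

You would wire `trim_history` in as a node before (or after) your chat node, or run the same logic at the start of the chat node itself.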
-
I am currently implementing a customer support bot and have been exploring the use of persistent memory to manage user interactions. My initial approach was to load the memory for a specific user from persistent storage, ensuring that all interactions from different users remain isolated. This seemed straightforward for maintaining user-specific contexts across sessions.

However, I am a bit confused about the specific utility of threads in this context. I understand that threads can provide additional granularity by keeping separate interactions within the same user session, but I am struggling to come up with concrete use cases where this granularity would be beneficial. One example I am considering is within a single chat interaction. Even with persistent memory loaded through a checkpointer, it seems possible to add more granularity by using threads to manage multiple sub-interactions within a single user session. For instance, in a customer support bot scenario, a user might request multiple tasks, and these could be managed as separate threads within the same interaction.

My question is about the practical management of this granularity. How does the agent determine which thread to use at any given time within an ongoing interaction? Are there specific strategies or examples where combining both checkpointers and threads has proven particularly effective? I would greatly appreciate insights from those with more experience in this area.
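For what it's worth, a hedged illustration of the granularity in question: the checkpointer never decides which thread to use, the thread is simply whatever `thread_id` the application puts in the config, so per-user vs. per-task isolation is just a naming choice your code makes (the ID scheme below is made up, and `graph` is assumed to be an already-compiled graph).

```python
user_id, task_id = "42", "refund-order-123"   # placeholders

# One thread per user: every session appends to the same history.
per_user = {"configurable": {"thread_id": f"user-{user_id}"}}

# One thread per task inside a session: sub-interactions stay isolated.
per_task = {"configurable": {"thread_id": f"user-{user_id}:{task_id}"}}

result = graph.invoke({"messages": [("human", "please refund order 123")]}, per_task)
```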
-
Any plans to add an explanation of the "state" structure once it's persisted to the documentation? E.g., to understand what each of those values here represents :)
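Until that exists, a hedged way to poke at the persisted structure yourself, using the sync saver for brevity; the field and key names below reflect recent `langgraph-checkpoint` versions and may differ in yours.

```python
config = {"configurable": {"thread_id": "1"}}

# get_tuple returns a CheckpointTuple; the checkpoint dict holds the raw
# persisted state, e.g. "channel_values" contains your graph's state keys.
tup = checkpointer.get_tuple(config)
print(tup.checkpoint["channel_values"])   # your state, keyed by channel
print(tup.metadata)                       # step, source, writes, etc.
```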
-
Why we need the
-
It should have docs about the way checkpointers work.
-
I get this error from get_tuple: `CheckpointTuple.__new__() got an unexpected keyword argument 'pending_writes'`, and sure enough the `class CheckpointTuple(NamedTuple)` in my installed version doesn't have that field. Anybody got any ideas?
-
Hi folks! We just released a new Postgres checkpointer library -- you can install it from PyPI. Please let me know if you run into any issues with the library!
-
```python
from psycopg_pool import AsyncConnectionPool
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.prebuilt import create_react_agent

async with AsyncConnectionPool(
    # Example configuration
    conninfo=DB_URI,
    max_size=20,
    kwargs=connection_kwargs,
) as pool, pool.connection() as conn:
    checkpointer = AsyncPostgresSaver(conn)
    # NOTE: you need to call .setup() the first time you're using your checkpointer
    # await checkpointer.setup()
    graph = create_react_agent(model, tools=tools, checkpointer=checkpointer)
    config = {"configurable": {"thread_id": "4"}}
    res = await graph.ainvoke(
        {"messages": [("human", "what's the weather in nyc")]}, config
    )
    checkpoint = await checkpointer.aget(config)
```

Regarding this piece of pool code: the pool has 20 connections available, but the code only takes one connection from the pool and assigns it to `conn`. Does this mean that an AsyncPostgresSaver will always have only one connection? Additionally, I have a similar question to @GaryFail: how exactly should I compile the graph and the checkpointer in the API?
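A hedged sketch of one long-lived setup for an API process, assuming FastAPI as the framework and assuming AsyncPostgresSaver also accepts a connection pool rather than only a single connection (it did in the versions I've used, but check your release). `DB_URI`, `model`, and `tools` are the same placeholders as in the snippet above.

```python
# Sketch: build pool, saver, and graph once at startup and reuse them per request.
from contextlib import asynccontextmanager

from fastapi import FastAPI
from psycopg_pool import AsyncConnectionPool
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.prebuilt import create_react_agent


@asynccontextmanager
async def lifespan(app: FastAPI):
    async with AsyncConnectionPool(conninfo=DB_URI, max_size=20) as pool:
        checkpointer = AsyncPostgresSaver(pool)   # assumes a pool is accepted here
        await checkpointer.setup()                # first run only
        app.state.graph = create_react_agent(model, tools=tools, checkpointer=checkpointer)
        yield                                     # pool stays open for the app's lifetime


app = FastAPI(lifespan=lifespan)
```

With this layout each request just invokes `app.state.graph` with its own `thread_id` config, and the saver draws connections from the pool as needed instead of being pinned to one.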
-
Any idea which implementation works best for a production environment?
-
Hey, when I run my agent outside of LangGraph Studio it accesses the DB fine. I've even used ngrok to expose the TCP port. But from the LangGraph Studio docker container it won't connect. Any ideas?
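One hedged guess at the usual culprit: inside the container, `localhost` refers to the container itself, not your machine. On Docker Desktop the host is normally reachable via the `host.docker.internal` alias (on Linux it may need an extra-hosts mapping); the credentials and database name below are placeholders.

```python
# Works from the host machine:
DB_URI = "postgresql://postgres:postgres@localhost:5432/postgres"  # placeholder credentials

# From inside a container, "localhost" is the container itself; on Docker
# Desktop the host is usually reachable via this alias instead:
DB_URI = "postgresql://postgres:postgres@host.docker.internal:5432/postgres"
```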
-
I am getting an error on imports:
-
Are there any examples that don't use async context managers? The examples are good for simple graphs, but creating complex graphs every time we call a query eats a lot of time. I'm also having connection leaks and errors after running a large number of queries in a short period.
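A hedged sketch of the non-context-manager style, assuming psycopg_pool's explicit `open()`/`close()` API and assuming AsyncPostgresSaver accepts a pool: open one pool at process startup, build the saver once, and only close the pool at shutdown, so nothing is reconstructed per query. `DB_URI` and the startup/shutdown hooks are placeholders for whatever your app framework provides.

```python
from psycopg_pool import AsyncConnectionPool
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

pool = AsyncConnectionPool(conninfo=DB_URI, max_size=20, open=False)  # DB_URI assumed defined
checkpointer = None


async def startup() -> None:
    global checkpointer
    # Open the pool once; reuse the same saver (and compiled graph) for every request.
    await pool.open()
    checkpointer = AsyncPostgresSaver(pool)   # assumes the saver accepts a pool
    await checkpointer.setup()


async def shutdown() -> None:
    # Closing the pool here, not per query, is what avoids connection leaks.
    await pool.close()
```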
-
Why use psycopg3 instead of asyncpg? Is there any motivation behind that?
-
How can we do memory management in this case? Let's say I want the last 3 checkpoints to work as the chat_history of the agent. If this cannot be done, then how does the current implementation work under the hood? Kindly share if someone has an idea.
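Not sure this is exactly what you're after, but a hedged sketch of reading back the most recent checkpoints, assuming `graph` is compiled with a checkpointer (in the versions I've used, `get_state_history` yields snapshots newest first):

```python
from itertools import islice

config = {"configurable": {"thread_id": "1"}}   # placeholder thread

# Take the three most recent checkpoints for this thread.
recent = list(islice(graph.get_state_history(config), 3))
for snapshot in recent:
    print(snapshot.values.get("messages", []))
```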
-
Even if I add something to the checkpoint namespace under
In the sample code above, the checkpoint is initialised with the correct threadId, but checkpoint_ns is still empty in the database.
-
Is there any way we can use SQLAlchemy?
-
When I use AsyncPostgresStore (version 2.0.8) to build a graph, an await store.age call inside an async node deadlocks, like this.
But if I write it like this, there is no problem.
Adding a sleep also makes the problem go away.
Why???
Changing the pool also makes the problem go away.
-
How do I use a checkpointer if I am using LangGraph Cloud?
-
I keep getting the following error in checkpointer.setup(); what could be the problem? `InvalidSchemaName: no schema has been selected to create in`
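A hedged guess: that Postgres error usually means the role's search_path doesn't resolve to any schema it is allowed to create in. One way to test that is to point the connection at an existing, writable schema via libpq options; the schema name below is a placeholder and `DB_URI` is assumed to be defined.

```python
from psycopg_pool import AsyncConnectionPool

# "my_schema" is a placeholder; it must already exist and be writable by this role.
connection_kwargs = {"options": "-c search_path=my_schema"}

pool = AsyncConnectionPool(conninfo=DB_URI, kwargs=connection_kwargs)
```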
-
Looks like a lot of documentation is missing. For example, a table
-
I am using Supabase and the Postgres SQL saver. It seems to work OK. My use case is a sales agent that collects orders. In my UI I would like to present the conversation history, so I am planning to create a view in Supabase. Do you have any suggestions on how to do it, as the structure is not clearly documented?
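Not a view, but a hedged alternative, since the on-disk layout isn't a documented contract: read the history back through the graph/checkpointer API and expose that to the UI instead of querying the tables directly. This assumes `graph` is your compiled graph and `thread_id` comes from your application.

```python
config = {"configurable": {"thread_id": thread_id}}  # thread_id supplied by your app

# Latest snapshot for the conversation; its "messages" channel is the history
# exactly as the agent sees it, independent of the table layout.
snapshot = graph.get_state(config)
history = [(m.type, m.content) for m in snapshot.values["messages"]]
```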
-
What's the best way to do data retention for the Postgres checkpoint saver? I want to make sure the data is deleted from the Postgres DB after 30 days.
-
How can I get the rephrased question (based on user_history) from the response? Are there any callbacks or techniques to do that?
-
Hi! I'm using the PostgresSaver to persist our conversations. The agent is connected to a frontend that retrieves the list of previous messages from the checkpoint associated with each conversation. Currently we are storing all the messages in the state. We're considering using message trimming or deletion to ensure that the conversation fits within the model's context window. However, this approach would modify the state and, in turn, change how the user sees their conversation history. Questions:
More info can be found here
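One hedged pattern that keeps the persisted history intact: trim only the copy of the messages that goes to the model inside the node, rather than deleting them from the state. The node name, the token budget, and the `llm` object below are placeholders.

```python
from langchain_core.messages import trim_messages


def chat_node(state):
    # The full history stays in the checkpointed state; only the prompt is trimmed.
    prompt_messages = trim_messages(
        state["messages"],
        strategy="last",
        token_counter=llm,       # assumes `llm` is your chat model
        max_tokens=4000,         # placeholder budget
        include_system=True,
    )
    response = llm.invoke(prompt_messages)
    return {"messages": [response]}
```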
-
Hello everyone! After prototyping with a simple graph (a chat node powered by a chat agent + ToolNode), we now want to build a multi-agent system. To get into that, I've created a simple agent team (a supervisor plus 2 worker nodes built with create_react_agent()). When using the in-RAM MemorySaver() on the outer graph, it works absolutely fine: the supervisor works, and the workers do their work and are able to recall the previous messages. When trying to use the AsyncPostgresSaver however (meaning I literally just gave the outer graph that one instead of the MemorySaver), the supervisor fires, but after that nothing happens. No worker is fired, nor do I get any errors or exceptions of any kind. Anyone got an idea what the problem is here? Any and all help is greatly appreciated!

```python
async def build_testteam_graph() -> StateGraph:
    from langchain_core.messages import HumanMessage
    from langchain_anthropic import ChatAnthropic
    from langchain_openai import ChatOpenAI
    from langgraph.prebuilt import create_react_agent
    from langchain_core.tools import tool
    from typing import List, Optional, Literal, TypedDict
    from langchain_core.language_models.chat_models import BaseChatModel
    from langgraph.graph import StateGraph, MessagesState, START, END
    from langgraph.types import Command
    from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

    global pool
    async with pool.connection() as conn:
        # Initialize persistent chat memory
        memory = AsyncPostgresSaver(conn)
        await memory.setup()

        class State(MessagesState):
            next: str

        def make_supervisor_node(llm: BaseChatModel, members: list[str]) -> str:
            options = ["FINISH"] + members
            print(options)
            system_prompt = (
                "You are a supervisor tasked with completing the task from the User below. As the first step, you will assign a task to analyze the request and then based on the returned workplan, assign the next workers from"
                f" one of the following workers: {members}. They can write Poems,or analyze user requests accordingly. They will fullfill their task and respond with their"
                " respond with the worker to act next. Each worker will perform a"
                " results and status. When you think the task is completely fullfilled,"
                " respond with FINISH."
            )

            class Router(TypedDict):
                """Worker to route to next. If no workers needed, route to FINISH."""
                next: Literal[*options]

            def supervisor_node(state: State) -> Command[Literal[*members, "__end__"]]:
                """An LLM-based router."""
                messages = [
                    {"role": "system", "content": system_prompt},
                ] + state["messages"]
                response = llm.with_structured_output(Router).invoke(messages)
                goto = response["next"]
                if goto == "FINISH":
                    goto = END
                return Command(goto=goto, update={"next": goto})

            return supervisor_node

        @tool
        def create_poem() -> str:
            """Use this to return the poem. There is only one poem, it is always the same."""
            return "ich mag mein haus, das sieht gut aus und in ihm wohnt ne' maus!"

        llm = ChatOpenAI(model="gpt-4o", streaming=True, stream_usage=True)
        create_poem_agent = create_react_agent(llm, tools=[create_poem], state_modifier="Your task is to create a poem! After that you are done, do not create anything else!", checkpointer=memory)

        def create_poem_node(state: State) -> Command[Literal["supervisor"]]:
            result = create_poem_agent.invoke(state)
            return Command(
                update={
                    "messages": [
                        HumanMessage(content=result["messages"][-1].content, name="poem_creator")
                    ]
                },
                # We want our workers to ALWAYS "report back" to the supervisor when done
                goto="supervisor",
            )

        @tool
        def analyze_request(list_of_steps: List) -> str:
            """Use this tool to pack the request into a list of tuples of required steps with instructuions and the respective worker from these: [poem_creator, DKR_Searcher].! After that you are done, do not create anything else!"""
            return list_of_steps

        analyze_request_agent = create_react_agent(llm, tools=[analyze_request], state_modifier=f"Your task is to analyze the request and put it into a list of tuples of required steps with instructuions and the respective worker from these: [poem_creator].! After that you are done, do not create anything else!", checkpointer=memory)

        def analyze_request_node(state: State) -> Command[Literal["supervisor"]]:
            result = analyze_request_agent.invoke(state)
            return Command(
                update={
                    "messages": [
                        HumanMessage(content=result["messages"][-1].content, name="request_analyzer")
                    ]
                },
                goto="supervisor",
            )

        supervisor_node = make_supervisor_node(llm, ["poem_creator", "request_analyzer"])

        builder = StateGraph(State)
        builder.add_edge(START, "supervisor")
        builder.add_node("supervisor", supervisor_node)
        builder.add_node("poem_creator", create_poem_node)
        builder.add_node("request_analyzer", analyze_request_node)
        graph = builder.compile(checkpointer=memory)

        from PIL import Image
        import io
        im = Image.open(io.BytesIO(graph.get_graph().draw_mermaid_png()))
        im.show()
        return graph
```
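A hedged observation rather than a confirmed diagnosis: the graph is built inside `async with pool.connection() as conn:`, so as soon as `build_testteam_graph()` returns, that connection goes back to the pool while the saver still references it, and a single psycopg async connection also can't be shared by several concurrently running nodes. A sketch of what I'd try instead, assuming AsyncPostgresSaver accepts the pool directly (check your installed version) and letting only the outer graph own the checkpointer:

```python
# Sketch under assumptions: AsyncPostgresSaver can wrap the pool directly,
# and the sub-agents don't each need their own checkpointer.
memory = AsyncPostgresSaver(pool)      # no `async with pool.connection()` around the build
await memory.setup()

create_poem_agent = create_react_agent(
    llm,
    tools=[create_poem],
    state_modifier="Your task is to create a poem! After that you are done, do not create anything else!",
    # note: no checkpointer= here
)

graph = builder.compile(checkpointer=memory)   # only the outer graph gets the saver
```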
-
langgraph/how-tos/persistence_postgres/
Build language agents as graphs
https://langchain-ai.github.io/langgraph/how-tos/persistence_postgres/