-
DescriptionAlso discussed in the langchain server https://discord.com/channels/1038097195422978059/1257970369705545769 When the Example Codefrom typing import TypedDict, Annotated, Optional, Any, AsyncIterator
import asyncio
from langchain_core.runnables.schema import StreamEvent
from langgraph.checkpoint import MemorySaver
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage
from langchain_core.runnables import Runnable, RunnableConfig
from langgraph.graph import add_messages, StateGraph
checkpointer = MemorySaver()
class State(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
# A class that wraps a compiled graph and injects a configuration
class CompiledGraphProxy(Runnable):
def __init__(self, config: RunnableConfig):
graph_builder = StateGraph(State)
fake_ai_message = AIMessage(content="response")
graph_builder.add_node("llm", lambda state: State(messages=[fake_ai_message]))
graph_builder.set_entry_point("llm")
graph_builder.set_finish_point("llm")
graph = graph_builder.compile(checkpointer=checkpointer)
self.graph = graph
self.config = config
def invoke(
self,
input: State,
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> State:
del config
return self.graph.invoke(
input,
config=self.config,
**kwargs,
)
# Comment this method out and no InvalidUpdateError will be raised
def astream(
self,
input: State,
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> AsyncIterator[dict[str, Any]]:
del config
return self.graph.astream(
input,
config=self.config,
**kwargs,
)
def astream_events(
self,
input: Any,
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> AsyncIterator[StreamEvent]:
del config
return self.graph.astream_events(
input,
config=self.config,
**kwargs,
)
async def main():
init_state = State(messages=[HumanMessage("Hello")])
config = RunnableConfig(configurable={"thread_id": 1})
subgraph = CompiledGraphProxy(config)
result = subgraph.invoke(init_state) # No error is raised
# print(result)
async for event in subgraph.astream_events(init_state, version="v2"):
# print(event)
pass # No error is raised
nested_graph_builder = StateGraph(State)
nested_graph_builder.add_node("subgraph", subgraph)
nested_graph_builder.set_entry_point("subgraph")
nested_graph_builder.set_finish_point("subgraph")
nested_graph = nested_graph_builder.compile(checkpointer=checkpointer)
result = nested_graph.invoke(init_state, config) # No error is raised
print(result)
async for event in nested_graph.astream_events( # InvalidUpdateError
init_state,
config,
version="v2",
# debug=True,
):
# print(event)
pass
if __name__ == "__main__":
asyncio.run(main())
# Stacktrace:
# > async for event in nested_graph.astream_events( # InvalidUpdateError
# init_state,
# config,
# version="v2",
# # debug=True,
# ):
#
# <venv>/lib/python3.11/site-packages/langchain_core/runnables/base.py:1146: in astream_events
# async for event in event_stream:
# <venv>/lib/python3.11/site-packages/langchain_core/tracers/event_stream.py:947: in _astream_events_implementation_v2
# await task
# <venv>/lib/python3.11/site-packages/langchain_core/tracers/event_stream.py:907: in consume_astream
# async for _ in event_streamer.tap_output_aiter(run_id, stream):
# <venv>/lib/python3.11/site-packages/langchain_core/tracers/event_stream.py:153: in tap_output_aiter
# first = await py_anext(output, default=sentinel)
# <venv>/lib/python3.11/site-packages/langchain_core/utils/aiter.py:65: in anext_impl
# return await __anext__(iterator)
# <venv>/lib/python3.11/site-packages/langgraph/pregel/__init__.py:1431: in astream
# _panic_or_proceed(done, inflight, step)
# <venv>/lib/python3.11/site-packages/langgraph/pregel/__init__.py:1643: in _panic_or_proceed
# raise exc
# <venv>/lib/python3.11/site-packages/langgraph/pregel/retry.py:117: in arun_with_retry
# async for _ in task.proc.astream(task.input, task.config):
# <venv>/lib/python3.11/site-packages/langchain_core/runnables/base.py:2908: in astream
# async for chunk in self.atransform(input_aiter(), config, **kwargs):
# <venv>/lib/python3.11/site-packages/langchain_core/runnables/base.py:2891: in atransform
# async for chunk in self._atransform_stream_with_config(
# <venv>/lib/python3.11/site-packages/langchain_core/runnables/base.py:1974: in _atransform_stream_with_config
# chunk: Output = await asyncio.create_task( # type: ignore[call-arg]
# <venv>/lib/python3.11/site-packages/langchain_core/tracers/event_stream.py:153: in tap_output_aiter
# first = await py_anext(output, default=sentinel)
# <venv>/lib/python3.11/site-packages/langchain_core/utils/aiter.py:65: in anext_impl
# return await __anext__(iterator)
# <venv>/lib/python3.11/site-packages/langchain_core/runnables/base.py:2861: in _atransform
# async for output in final_pipeline:
# <venv>/lib/python3.11/site-packages/langchain_core/runnables/base.py:1215: in atransform
# async for output in self.astream(final, config, **kwargs):
# <venv>/lib/python3.11/site-packages/langchain_core/runnables/base.py:828: in astream
# yield await self.ainvoke(input, config, **kwargs)
# <venv>/lib/python3.11/site-packages/langgraph/utils.py:107: in ainvoke
# ret = await self._acall_with_config(
# <venv>/lib/python3.11/site-packages/langchain_core/runnables/base.py:1649: in _acall_with_config
# output: Output = await asyncio.create_task(coro, context=context) # type: ignore
# <venv>/lib/python3.11/site-packages/langgraph/pregel/write.py:141: in _awrite
# self.do_write(
# _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
#
# config = {'callbacks': <langchain_core.callbacks.manager.AsyncCallbackManager object at 0x7f28aa383750>, 'configurable': {'__pr...nggraph_step': 10, 'langgraph_task_idx': 0, 'langgraph_triggers': ['start:subgraph'], ...}, 'recursion_limit': 25, ...}
# values = [('subgraph', 'subgraph'), ('messages', <object object at 0x7f28bd359d60>)]
# require_at_least_one_of = ['messages']
#
# @staticmethod
# def do_write(
# config: RunnableConfig,
# values: List[Tuple[str, Any]],
# require_at_least_one_of: Optional[Sequence[str]] = None,
# ) -> None:
# filtered = [(chan, val) for chan, val in values if val is not SKIP_WRITE]
# if require_at_least_one_of is not None:
# if not {chan for chan, _ in filtered} & set(require_at_least_one_of):
# > raise InvalidUpdateError(
# f"Must write to at least one of {require_at_least_one_of}"
# )
# E langgraph.errors.InvalidUpdateError: Must write to at least one of ['messages']
#
# <venv>/lib/python3.11/site-packages/langgraph/pregel/write.py:157: InvalidUpdateError System InfoReproduced on two systems with identical package information System Information
System Information
Package Information
|
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 2 replies
-
In my real codebase, the InvalidUpdateError is slightly more informational. It seems like for whatever reason one of the node is returning
|
Beta Was this translation helpful? Give feedback.
-
@Glinte looking into this issue. in the meantime, if i understand what you're trying to do with the above example, i believe you can just do this for subgraph instead of your proxy: graph_builder = StateGraph(State)
graph_builder.add_node("llm", lambda state: State(messages=[AIMessage(content="response")]))
graph_builder.set_entry_point("llm")
graph_builder.set_finish_point("llm")
subgraph = graph_builder.compile()
config = {} # whichever config you're using, but probably one that has a thread ID
subgraph = graph_builder.compile(checkpointer=checkpointer).with_config(config) |
Beta Was this translation helpful? Give feedback.
-
if you still want to use proxy, you just need to pass let me know if that helps! |
Beta Was this translation helpful? Give feedback.
if you still want to use proxy, you just need to pass
stream_mode="values"
tograph.astream
. defaultgraph.astream
was instead returning state updates from each node instead of full state, which is why you were running into your issuelet me know if that helps!