-
toy example: class PreSchema(TypedDict):
inp: str
class PostSchema(TypedDict):
work: Annotated[List[str], add]
tasks: List[str]
outp: str
class MesoSchema(TypedDict):
via: List[ChatMessage]
def input(state: PreSchema):
return {"tasks": state["inp"].split(", ")}
def output(state: PostSchema):
return {"outp": "-> ".join(state["work"])}
def continue_to_intermezzo(state: PostSchema):
return [Send("busy_beaver",
{
"via": [ChatMessage(content=s, role="instructor")],
"context": state["tasks"]
}) \
for s in state["tasks"]]
def worker(state: MesoSchema):
return {"work": [str(int(state["via"][-1].content)*2)]}
graph_builder = StateGraph(input=PreSchema, output=PostSchema)
graph_builder.add_node("input", input)
graph_builder.add_node("output", output)
graph_builder.add_node("busy_beaver", worker)
graph_builder.add_edge(START, "input")
graph_builder.add_conditional_edges("input", continue_to_intermezzo, ["busy_beaver"])
graph_builder.add_edge("busy_beaver", "output")
graph_builder.add_edge("output", END)
graph = graph_builder.compile()
for s in graph.stream({"inp": "1, 2, 3, 4"}):
print(s) outputs:
Is there some equivalent that lets us dynamically build to the node in serial rather than parallel -- so that work(B) can benefit from the results of work(A) ? In the toy example, I'd like to add the previous_value, or 0, to the task value, so I'd get a result like "1 -> 3 -> 6 -> 10". Clearly I could replace the conditional edge with a direct edge and do the work directly in this graph, but that would tightly couple this graph to that work. |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment
-
I see that my approach at first was lacking .. What I can do is instead of passing worker in as a node from the current graph, I can make the worker a separate graph, like: class PreSchema(TypedDict):
inp: str
class PostSchema(TypedDict):
work: List[str]
tasks: List[str]
outp: str
class MesoSchema(TypedDict):
via: List[ChatMessage]
context: List[str]
work: str
def input(state: PreSchema):
return {"tasks": state["inp"].split(", ")}
def output(state: PostSchema):
return {"outp": "-> ".join(state["work"])}
def worker(state: MesoSchema):
curr = int(state["via"][-1].content)
prev = int(state["context"][-1] if state["context"] else 0)
return {"work": str(curr + prev)}
worker_builder = StateGraph(MesoSchema)
worker_builder.add_node("worker", worker)
worker_builder.add_edge(START, "worker")
worker_builder.add_edge("worker", END)
worker_graph = worker_builder.compile()
def scheduler(state: PostSchema):
context = []
output = []
for work in state["tasks"]:
task = {
"via": [ChatMessage(content=work, role="instructor")],
"context": context
}
result = worker_graph.invoke(task)
context.append(result["work"])
output.append(result["work"])
return {"work": output}
graph_builder = StateGraph(input=PreSchema, output=PostSchema)
graph_builder.add_node("input", input)
graph_builder.add_node("output", output)
graph_builder.add_node("busy_beaver", scheduler)
graph_builder.add_edge(START, "input")
graph_builder.add_edge("input", "busy_beaver")
graph_builder.add_edge("busy_beaver", "output")
graph_builder.add_edge("output", END)
graph = graph_builder.compile()
for s in graph.stream({"inp": "1, 2, 3, 4"}):
print(s)
This almost achieves the desired result. If I could add the worker_graph to the graph as a node "worker", and have the scheduler call "worker" by name, that would fully decouple the logic inside of the scheduler from the solver. I guess that isn't possible though, so I will just mark this as the answer and require a factory_function to construct the scheduler |
Beta Was this translation helpful? Give feedback.
I see that my approach at first was lacking .. What I can do is instead of passing worker in as a node from the current graph, I can make the worker a separate graph, like: