Skip to content

Commit

Permalink
feat(platform): Support opening graphs with version and execution id (#…
Browse files Browse the repository at this point in the history
…9332)

Currently it's only possible to open latest graph from monitor and see
the node execution results only when manually running. This PR adds
ability to open running and finished graphs in builder.

### Changes 🏗️

Builder now handles graph version and execution ID in addition to graph
ID when opening a graph. When an execution ID is provided, node
execution results are fetched and subscribed to in real time. This makes
it possible to open a graph that is already executing and see both
existing node execution data and real-time updates (if it's still
running).

- Use graph version and execution id on the builder page and in
`useAgentGraph`
- Use graph version on the `execute_graph` endpoint
- Use graph version on the websockets to distinguish between versions
- Move `formatEdgeID` to utils; it's used in `useAgentGraph.ts` and in
`Flow.tsx`

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Opening finished execution restores node results
- [x] Opening running execution restores results and continues to run
properly
  - [x] Results are separate for each graph across multiple tabs

#### For configuration changes:
- [ ] `.env.example` is updated or already compatible with my changes
- [ ] `docker-compose.yml` is updated or already compatible with my
changes
- [ ] I have included a list of my configuration changes in the PR
description (under **Changes**)

<details>
  <summary>Examples of configuration changes</summary>

  - Changing ports
  - Adding new services that need to communicate with each other
  - Secrets or environment variable changes
  - New or infrastructure changes such as databases
</details>

---------

Co-authored-by: Zamil Majdy <zamil.majdy@agpt.co>
  • Loading branch information
kcze and majdyz authored Feb 7, 2025
1 parent c693875 commit 1a1fe7c
Show file tree
Hide file tree
Showing 22 changed files with 283 additions and 235 deletions.
7 changes: 5 additions & 2 deletions autogpt_platform/backend/backend/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ def event():
@test.command()
@click.argument("server_address")
@click.argument("graph_id")
def websocket(server_address: str, graph_id: str):
@click.argument("graph_version")
def websocket(server_address: str, graph_id: str, graph_version: int):
"""
Tests the websocket connection.
"""
Expand All @@ -237,7 +238,9 @@ async def send_message(server_address: str):
try:
msg = WsMessage(
method=Methods.SUBSCRIBE,
data=ExecutionSubscription(graph_id=graph_id).model_dump(),
data=ExecutionSubscription(
graph_id=graph_id, graph_version=graph_version
).model_dump(),
).model_dump_json()
await websocket.send(msg)
print(f"Sending: {msg}")
Expand Down
1 change: 1 addition & 0 deletions autogpt_platform/backend/backend/data/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class GraphExecutionEntry(BaseModel):
user_id: str
graph_exec_id: str
graph_id: str
graph_version: int
start_node_execs: list["NodeExecutionEntry"]


Expand Down
1 change: 1 addition & 0 deletions autogpt_platform/backend/backend/executor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,7 @@ def add_execution(
graph_exec = GraphExecutionEntry(
user_id=user_id,
graph_id=graph_id,
graph_version=graph_version or 0,
graph_exec_id=graph_exec_id,
start_node_execs=starting_node_execs,
)
Expand Down
32 changes: 18 additions & 14 deletions autogpt_platform/backend/backend/server/conn_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,28 @@ def disconnect(self, websocket: WebSocket):
for subscribers in self.subscriptions.values():
subscribers.discard(websocket)

async def subscribe(self, graph_id: str, websocket: WebSocket):
if graph_id not in self.subscriptions:
self.subscriptions[graph_id] = set()
self.subscriptions[graph_id].add(websocket)

async def unsubscribe(self, graph_id: str, websocket: WebSocket):
if graph_id in self.subscriptions:
self.subscriptions[graph_id].discard(websocket)
if not self.subscriptions[graph_id]:
del self.subscriptions[graph_id]
async def subscribe(self, graph_id: str, graph_version: int, websocket: WebSocket):
key = f"{graph_id}_{graph_version}"
if key not in self.subscriptions:
self.subscriptions[key] = set()
self.subscriptions[key].add(websocket)

async def unsubscribe(
self, graph_id: str, graph_version: int, websocket: WebSocket
):
key = f"{graph_id}_{graph_version}"
if key in self.subscriptions:
self.subscriptions[key].discard(websocket)
if not self.subscriptions[key]:
del self.subscriptions[key]

async def send_execution_result(self, result: execution.ExecutionResult):
graph_id = result.graph_id
if graph_id in self.subscriptions:
key = f"{result.graph_id}_{result.graph_version}"
if key in self.subscriptions:
message = WsMessage(
method=Methods.EXECUTION_EVENT,
channel=graph_id,
channel=key,
data=result.model_dump(),
).model_dump_json()
for connection in self.subscriptions[graph_id]:
for connection in self.subscriptions[key]:
await connection.send_text(message)
8 changes: 6 additions & 2 deletions autogpt_platform/backend/backend/server/external/routes/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,21 @@ def execute_graph_block(


@v1_router.post(
path="/graphs/{graph_id}/execute",
path="/graphs/{graph_id}/execute/{graph_version}",
tags=["graphs"],
)
def execute_graph(
graph_id: str,
graph_version: int,
node_input: dict[Any, Any],
api_key: APIKey = Depends(require_permission(APIKeyPermission.EXECUTE_GRAPH)),
) -> dict[str, Any]:
try:
graph_exec = execution_manager_client().add_execution(
graph_id, node_input, user_id=api_key.user_id
graph_id,
graph_version=graph_version,
data=node_input,
user_id=api_key.user_id,
)
return {"id": graph_exec.graph_exec_id}
except Exception as e:
Expand Down
7 changes: 3 additions & 4 deletions autogpt_platform/backend/backend/server/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ class WsMessage(pydantic.BaseModel):

class ExecutionSubscription(pydantic.BaseModel):
graph_id: str
graph_version: int


class SubscriptionDetails(pydantic.BaseModel):
event_type: str
channel: str
graph_id: str
class ExecuteGraphResponse(pydantic.BaseModel):
graph_exec_id: str


class CreateGraph(pydantic.BaseModel):
Expand Down
7 changes: 4 additions & 3 deletions autogpt_platform/backend/backend/server/routers/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
CreateAPIKeyRequest,
CreateAPIKeyResponse,
CreateGraph,
ExecuteGraphResponse,
RequestTopUp,
SetGraphActiveVersion,
UpdatePermissionsRequest,
Expand Down Expand Up @@ -491,7 +492,7 @@ def get_credentials(credentials_id: str) -> "Credentials | None":


@v1_router.post(
path="/graphs/{graph_id}/execute",
path="/graphs/{graph_id}/execute/{graph_version}",
tags=["graphs"],
dependencies=[Depends(auth_middleware)],
)
Expand All @@ -500,12 +501,12 @@ def execute_graph(
node_input: dict[Any, Any],
user_id: Annotated[str, Depends(get_user_id)],
graph_version: Optional[int] = None,
) -> dict[str, Any]: # FIXME: add proper return type
) -> ExecuteGraphResponse:
try:
graph_exec = execution_manager_client().add_execution(
graph_id, node_input, user_id=user_id, graph_version=graph_version
)
return {"id": graph_exec.graph_exec_id}
return ExecuteGraphResponse(graph_exec_id=graph_exec.graph_exec_id)
except Exception as e:
msg = e.__str__().encode().decode("unicode_escape")
raise HTTPException(status_code=400, detail=msg)
Expand Down
8 changes: 4 additions & 4 deletions autogpt_platform/backend/backend/server/ws_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ async def handle_subscribe(
)
else:
ex_sub = ExecutionSubscription.model_validate(message.data)
await manager.subscribe(ex_sub.graph_id, websocket)
await manager.subscribe(ex_sub.graph_id, ex_sub.graph_version, websocket)
logger.debug(f"New execution subscription for graph {ex_sub.graph_id}")
await websocket.send_text(
WsMessage(
method=Methods.SUBSCRIBE,
success=True,
channel=ex_sub.graph_id,
channel=f"{ex_sub.graph_id}_{ex_sub.graph_version}",
).model_dump_json()
)

Expand All @@ -110,13 +110,13 @@ async def handle_unsubscribe(
)
else:
ex_sub = ExecutionSubscription.model_validate(message.data)
await manager.unsubscribe(ex_sub.graph_id, websocket)
await manager.unsubscribe(ex_sub.graph_id, ex_sub.graph_version, websocket)
logger.debug(f"Removed execution subscription for graph {ex_sub.graph_id}")
await websocket.send_text(
WsMessage(
method=Methods.UNSUBSCRIBE,
success=True,
channel=ex_sub.graph_id,
channel=f"{ex_sub.graph_id}_{ex_sub.graph_version}",
).model_dump_json()
)

Expand Down
2 changes: 1 addition & 1 deletion autogpt_platform/backend/backend/usecases/block_autogen.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ async def block_autogen_agent():
print(response)
result = await wait_execution(
graph_id=test_graph.id,
graph_exec_id=response["id"],
graph_exec_id=response.graph_exec_id,
timeout=1200,
user_id=test_user.id,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ async def reddit_marketing_agent():
test_graph.id, input_data, test_user.id
)
print(response)
result = await wait_execution(test_user.id, test_graph.id, response["id"], 120)
result = await wait_execution(
test_user.id, test_graph.id, response.graph_exec_id, 120
)
print(result)


Expand Down
4 changes: 3 additions & 1 deletion autogpt_platform/backend/backend/usecases/sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ async def sample_agent():
test_graph.id, input_data, test_user.id
)
print(response)
result = await wait_execution(test_user.id, test_graph.id, response["id"], 10)
result = await wait_execution(
test_user.id, test_graph.id, response.graph_exec_id, 10
)
print(result)


Expand Down
2 changes: 1 addition & 1 deletion autogpt_platform/backend/test/executor/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async def execute_graph(
graph_version=test_graph.version,
node_input=input_data,
)
graph_exec_id = response["id"]
graph_exec_id = response.graph_exec_id
logger.info(f"Created execution with ID: {graph_exec_id}")

# Execution queue should be empty
Expand Down
16 changes: 8 additions & 8 deletions autogpt_platform/backend/test/server/test_con_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,29 +34,29 @@ def test_disconnect(
connection_manager: ConnectionManager, mock_websocket: AsyncMock
) -> None:
connection_manager.active_connections.add(mock_websocket)
connection_manager.subscriptions["test_graph"] = {mock_websocket}
connection_manager.subscriptions["test_graph_1"] = {mock_websocket}

connection_manager.disconnect(mock_websocket)

assert mock_websocket not in connection_manager.active_connections
assert mock_websocket not in connection_manager.subscriptions["test_graph"]
assert mock_websocket not in connection_manager.subscriptions["test_graph_1"]


@pytest.mark.asyncio
async def test_subscribe(
connection_manager: ConnectionManager, mock_websocket: AsyncMock
) -> None:
await connection_manager.subscribe("test_graph", mock_websocket)
assert mock_websocket in connection_manager.subscriptions["test_graph"]
await connection_manager.subscribe("test_graph", 1, mock_websocket)
assert mock_websocket in connection_manager.subscriptions["test_graph_1"]


@pytest.mark.asyncio
async def test_unsubscribe(
connection_manager: ConnectionManager, mock_websocket: AsyncMock
) -> None:
connection_manager.subscriptions["test_graph"] = {mock_websocket}
connection_manager.subscriptions["test_graph_1"] = {mock_websocket}

await connection_manager.unsubscribe("test_graph", mock_websocket)
await connection_manager.unsubscribe("test_graph", 1, mock_websocket)

assert "test_graph" not in connection_manager.subscriptions

Expand All @@ -65,7 +65,7 @@ async def test_unsubscribe(
async def test_send_execution_result(
connection_manager: ConnectionManager, mock_websocket: AsyncMock
) -> None:
connection_manager.subscriptions["test_graph"] = {mock_websocket}
connection_manager.subscriptions["test_graph_1"] = {mock_websocket}
result: ExecutionResult = ExecutionResult(
graph_id="test_graph",
graph_version=1,
Expand All @@ -87,7 +87,7 @@ async def test_send_execution_result(
mock_websocket.send_text.assert_called_once_with(
WsMessage(
method=Methods.EXECUTION_EVENT,
channel="test_graph",
channel="test_graph_1",
data=result.model_dump(),
).model_dump_json()
)
Expand Down
22 changes: 14 additions & 8 deletions autogpt_platform/backend/test/server/test_ws_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ async def test_websocket_router_subscribe(
) -> None:
mock_websocket.receive_text.side_effect = [
WsMessage(
method=Methods.SUBSCRIBE, data={"graph_id": "test_graph"}
method=Methods.SUBSCRIBE,
data={"graph_id": "test_graph", "graph_version": 1},
).model_dump_json(),
WebSocketDisconnect(),
]
Expand All @@ -40,7 +41,7 @@ async def test_websocket_router_subscribe(
)

mock_manager.connect.assert_called_once_with(mock_websocket)
mock_manager.subscribe.assert_called_once_with("test_graph", mock_websocket)
mock_manager.subscribe.assert_called_once_with("test_graph", 1, mock_websocket)
mock_websocket.send_text.assert_called_once()
assert '"method":"subscribe"' in mock_websocket.send_text.call_args[0][0]
assert '"success":true' in mock_websocket.send_text.call_args[0][0]
Expand All @@ -53,7 +54,8 @@ async def test_websocket_router_unsubscribe(
) -> None:
mock_websocket.receive_text.side_effect = [
WsMessage(
method=Methods.UNSUBSCRIBE, data={"graph_id": "test_graph"}
method=Methods.UNSUBSCRIBE,
data={"graph_id": "test_graph", "graph_version": 1},
).model_dump_json(),
WebSocketDisconnect(),
]
Expand All @@ -63,7 +65,7 @@ async def test_websocket_router_unsubscribe(
)

mock_manager.connect.assert_called_once_with(mock_websocket)
mock_manager.unsubscribe.assert_called_once_with("test_graph", mock_websocket)
mock_manager.unsubscribe.assert_called_once_with("test_graph", 1, mock_websocket)
mock_websocket.send_text.assert_called_once()
assert '"method":"unsubscribe"' in mock_websocket.send_text.call_args[0][0]
assert '"success":true' in mock_websocket.send_text.call_args[0][0]
Expand Down Expand Up @@ -94,13 +96,15 @@ async def test_websocket_router_invalid_method(
async def test_handle_subscribe_success(
mock_websocket: AsyncMock, mock_manager: AsyncMock
) -> None:
message = WsMessage(method=Methods.SUBSCRIBE, data={"graph_id": "test_graph"})
message = WsMessage(
method=Methods.SUBSCRIBE, data={"graph_id": "test_graph", "graph_version": 1}
)

await handle_subscribe(
cast(WebSocket, mock_websocket), cast(ConnectionManager, mock_manager), message
)

mock_manager.subscribe.assert_called_once_with("test_graph", mock_websocket)
mock_manager.subscribe.assert_called_once_with("test_graph", 1, mock_websocket)
mock_websocket.send_text.assert_called_once()
assert '"method":"subscribe"' in mock_websocket.send_text.call_args[0][0]
assert '"success":true' in mock_websocket.send_text.call_args[0][0]
Expand All @@ -126,13 +130,15 @@ async def test_handle_subscribe_missing_data(
async def test_handle_unsubscribe_success(
mock_websocket: AsyncMock, mock_manager: AsyncMock
) -> None:
message = WsMessage(method=Methods.UNSUBSCRIBE, data={"graph_id": "test_graph"})
message = WsMessage(
method=Methods.UNSUBSCRIBE, data={"graph_id": "test_graph", "graph_version": 1}
)

await handle_unsubscribe(
cast(WebSocket, mock_websocket), cast(ConnectionManager, mock_manager), message
)

mock_manager.unsubscribe.assert_called_once_with("test_graph", mock_websocket)
mock_manager.unsubscribe.assert_called_once_with("test_graph", 1, mock_websocket)
mock_websocket.send_text.assert_called_once()
assert '"method":"unsubscribe"' in mock_websocket.send_text.call_args[0][0]
assert '"success":true' in mock_websocket.send_text.call_args[0][0]
Expand Down
1 change: 1 addition & 0 deletions autogpt_platform/frontend/src/app/build/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export default function Home() {
<FlowEditor
className="flow-container"
flowID={query.get("flowID") ?? undefined}
flowVersion={query.get("flowVersion") ?? undefined}
/>
);
}
Loading

0 comments on commit 1a1fe7c

Please sign in to comment.