Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(platform): Support opening graphs with version and execution id #9332

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions autogpt_platform/backend/backend/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ def event():
@test.command()
@click.argument("server_address")
@click.argument("graph_id")
def websocket(server_address: str, graph_id: str):
@click.argument("graph_version")
def websocket(server_address: str, graph_id: str, graph_version: int):
"""
Tests the websocket connection.
"""
Expand All @@ -237,7 +238,9 @@ async def send_message(server_address: str):
try:
msg = WsMessage(
method=Methods.SUBSCRIBE,
data=ExecutionSubscription(graph_id=graph_id).model_dump(),
data=ExecutionSubscription(
graph_id=graph_id, graph_version=graph_version
).model_dump(),
).model_dump_json()
await websocket.send(msg)
print(f"Sending: {msg}")
Expand Down
1 change: 1 addition & 0 deletions autogpt_platform/backend/backend/data/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class GraphExecutionEntry(BaseModel):
user_id: str
graph_exec_id: str
graph_id: str
graph_version: int
start_node_execs: list["NodeExecutionEntry"]


Expand Down
3 changes: 2 additions & 1 deletion autogpt_platform/backend/backend/executor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -791,9 +791,9 @@ def db_client(self) -> "DatabaseManager":
def add_execution(
self,
graph_id: str,
graph_version: int,
data: BlockInput,
user_id: str,
graph_version: int | None = None,
) -> GraphExecutionEntry:
graph: GraphModel | None = self.db_client.get_graph(
graph_id=graph_id, user_id=user_id, version=graph_version
Expand Down Expand Up @@ -861,6 +861,7 @@ def add_execution(
graph_exec = GraphExecutionEntry(
user_id=user_id,
graph_id=graph_id,
graph_version=graph_version,
graph_exec_id=graph_exec_id,
start_node_execs=starting_node_execs,
)
Expand Down
2 changes: 1 addition & 1 deletion autogpt_platform/backend/backend/executor/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def execute_graph(**kwargs):
try:
log(f"Executing recurring job for graph #{args.graph_id}")
get_execution_client().add_execution(
args.graph_id, args.input_data, args.user_id
args.graph_id, args.graph_version, args.input_data, args.user_id
)
except Exception as e:
logger.exception(f"Error executing graph {args.graph_id}: {e}")
Expand Down
36 changes: 21 additions & 15 deletions autogpt_platform/backend/backend/server/conn_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
class ConnectionManager:
def __init__(self):
self.active_connections: Set[WebSocket] = set()
self.subscriptions: Dict[str, Set[WebSocket]] = {}
self.subscriptions: Dict[int, Set[WebSocket]] = {}

async def connect(self, websocket: WebSocket):
await websocket.accept()
Expand All @@ -20,24 +20,30 @@ def disconnect(self, websocket: WebSocket):
for subscribers in self.subscriptions.values():
subscribers.discard(websocket)

async def subscribe(self, graph_id: str, websocket: WebSocket):
if graph_id not in self.subscriptions:
self.subscriptions[graph_id] = set()
self.subscriptions[graph_id].add(websocket)

async def unsubscribe(self, graph_id: str, websocket: WebSocket):
if graph_id in self.subscriptions:
self.subscriptions[graph_id].discard(websocket)
if not self.subscriptions[graph_id]:
del self.subscriptions[graph_id]
async def subscribe(self, graph_id: str, graph_version: int, websocket: WebSocket):
key = hash((graph_id, graph_version))
if key not in self.subscriptions:
self.subscriptions[key] = set()
self.subscriptions[key].add(websocket)

async def unsubscribe(
self, graph_id: str, graph_version: int, websocket: WebSocket
):
key = hash((graph_id, graph_version))
if key in self.subscriptions:
self.subscriptions[key].discard(websocket)
if not self.subscriptions[key]:
del self.subscriptions[key]

async def send_execution_result(self, result: execution.ExecutionResult):
graph_id = result.graph_id
if graph_id in self.subscriptions:
key = hash((result.graph_id, result.graph_version))
if key in self.subscriptions:
message = WsMessage(
method=Methods.EXECUTION_EVENT,
channel=graph_id,
channel=str(
key
), # todo kcze verify that frontend can handle this (cannot hash the same way)
data=result.model_dump(),
).model_dump_json()
for connection in self.subscriptions[graph_id]:
for connection in self.subscriptions[key]:
await connection.send_text(message)
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,18 @@ def execute_graph_block(


@v1_router.post(
path="/graphs/{graph_id}/execute",
path="/graphs/{graph_id}/execute/{graph_version}",
tags=["graphs"],
)
def execute_graph(
graph_id: str,
graph_version: int,
node_input: dict[Any, Any],
api_key: APIKey = Depends(require_permission(APIKeyPermission.EXECUTE_GRAPH)),
) -> dict[str, Any]:
try:
graph_exec = execution_manager_client().add_execution(
graph_id, node_input, user_id=api_key.user_id
graph_id, graph_version, node_input, user_id=api_key.user_id
)
return {"id": graph_exec.graph_exec_id}
except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ async def webhook_ingress_generic(
logger.debug(f"Executing graph #{node.graph_id} node #{node.id}")
executor.add_execution(
node.graph_id,
node.graph_version,
data={f"webhook_{webhook_id}_payload": payload},
user_id=webhook.user_id,
)
Expand Down
7 changes: 3 additions & 4 deletions autogpt_platform/backend/backend/server/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ class WsMessage(pydantic.BaseModel):

class ExecutionSubscription(pydantic.BaseModel):
graph_id: str
graph_version: int


class SubscriptionDetails(pydantic.BaseModel):
event_type: str
channel: str
graph_id: str
class ExecuteGraphResponse(pydantic.BaseModel):
graph_exec_id: str


class CreateGraph(pydantic.BaseModel):
Expand Down
13 changes: 9 additions & 4 deletions autogpt_platform/backend/backend/server/routers/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
CreateAPIKeyRequest,
CreateAPIKeyResponse,
CreateGraph,
ExecuteGraphResponse,
RequestTopUp,
SetGraphActiveVersion,
UpdatePermissionsRequest,
Expand Down Expand Up @@ -430,20 +431,24 @@ def get_credentials(credentials_id: str) -> "Credentials | None":


@v1_router.post(
path="/graphs/{graph_id}/execute",
path="/graphs/{graph_id}/execute/{graph_version}",
tags=["graphs"],
dependencies=[Depends(auth_middleware)],
)
def execute_graph(
graph_id: str,
graph_version: int,
node_input: dict[Any, Any],
user_id: Annotated[str, Depends(get_user_id)],
) -> dict[str, Any]: # FIXME: add proper return type
) -> ExecuteGraphResponse:
try:
graph_exec = execution_manager_client().add_execution(
graph_id, node_input, user_id=user_id
graph_id,
graph_version,
node_input,
user_id,
)
return {"id": graph_exec.graph_exec_id}
return ExecuteGraphResponse(graph_exec_id=graph_exec.graph_exec_id)
except Exception as e:
msg = e.__str__().encode().decode("unicode_escape")
raise HTTPException(status_code=400, detail=msg)
Expand Down
4 changes: 2 additions & 2 deletions autogpt_platform/backend/backend/server/ws_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ async def handle_subscribe(
)
else:
ex_sub = ExecutionSubscription.model_validate(message.data)
await manager.subscribe(ex_sub.graph_id, websocket)
await manager.subscribe(ex_sub.graph_id, ex_sub.graph_version, websocket)
logger.debug(f"New execution subscription for graph {ex_sub.graph_id}")
await websocket.send_text(
WsMessage(
Expand All @@ -110,7 +110,7 @@ async def handle_unsubscribe(
)
else:
ex_sub = ExecutionSubscription.model_validate(message.data)
await manager.unsubscribe(ex_sub.graph_id, websocket)
await manager.unsubscribe(ex_sub.graph_id, ex_sub.graph_version, websocket)
logger.debug(f"Removed execution subscription for graph {ex_sub.graph_id}")
await websocket.send_text(
WsMessage(
Expand Down
16 changes: 9 additions & 7 deletions autogpt_platform/backend/test/server/test_con_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,29 +34,31 @@ def test_disconnect(
connection_manager: ConnectionManager, mock_websocket: AsyncMock
) -> None:
connection_manager.active_connections.add(mock_websocket)
connection_manager.subscriptions["test_graph"] = {mock_websocket}
connection_manager.subscriptions[hash(("test_graph", 1))] = {mock_websocket}

connection_manager.disconnect(mock_websocket)

assert mock_websocket not in connection_manager.active_connections
assert mock_websocket not in connection_manager.subscriptions["test_graph"]
assert (
mock_websocket not in connection_manager.subscriptions[hash(("test_graph", 1))]
)


@pytest.mark.asyncio
async def test_subscribe(
connection_manager: ConnectionManager, mock_websocket: AsyncMock
) -> None:
await connection_manager.subscribe("test_graph", mock_websocket)
assert mock_websocket in connection_manager.subscriptions["test_graph"]
await connection_manager.subscribe("test_graph", 1, mock_websocket)
assert mock_websocket in connection_manager.subscriptions[hash(("test_graph", 1))]


@pytest.mark.asyncio
async def test_unsubscribe(
connection_manager: ConnectionManager, mock_websocket: AsyncMock
) -> None:
connection_manager.subscriptions["test_graph"] = {mock_websocket}
connection_manager.subscriptions[hash(("test_graph", 1))] = {mock_websocket}

await connection_manager.unsubscribe("test_graph", mock_websocket)
await connection_manager.unsubscribe("test_graph", 1, mock_websocket)

assert "test_graph" not in connection_manager.subscriptions

Expand All @@ -65,7 +67,7 @@ async def test_unsubscribe(
async def test_send_execution_result(
connection_manager: ConnectionManager, mock_websocket: AsyncMock
) -> None:
connection_manager.subscriptions["test_graph"] = {mock_websocket}
connection_manager.subscriptions[hash(("test_graph", 1))] = {mock_websocket}
result: ExecutionResult = ExecutionResult(
graph_id="test_graph",
graph_version=1,
Expand Down
2 changes: 2 additions & 0 deletions autogpt_platform/frontend/src/app/build/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ export default function Home() {
<FlowEditor
className="flow-container"
flowID={query.get("flowID") ?? undefined}
flowVersion={query.get("flowVersion") ?? undefined}
flowExecutionID={query.get("flowExecutionID") ?? undefined}
/>
);
}
11 changes: 9 additions & 2 deletions autogpt_platform/frontend/src/components/Flow.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@ export const FlowContext = createContext<FlowContextType | null>(null);

const FlowEditor: React.FC<{
flowID?: string;
flowVersion?: string;
flowExecutionID?: string;
className?: string;
}> = ({ flowID, className }) => {
}> = ({ flowID, flowVersion, flowExecutionID, className }) => {
const {
addNodes,
addEdges,
Expand Down Expand Up @@ -107,7 +109,12 @@ const FlowEditor: React.FC<{
setNodes,
edges,
setEdges,
} = useAgentGraph(flowID, visualizeBeads !== "no");
} = useAgentGraph(
flowID,
flowVersion ? parseInt(flowVersion) : undefined,
flowExecutionID,
visualizeBeads !== "no",
);

const router = useRouter();
const pathname = usePathname();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export const FlowInfo: React.FC<
setNodes,
edges,
setEdges,
} = useAgentGraph(flow.id, false);
} = useAgentGraph(flow.id, flow.version, undefined, false);

const api = useBackendAPI();
const { toast } = useToast();
Expand Down Expand Up @@ -224,7 +224,7 @@ export const FlowInfo: React.FC<
)}
<Link
className={buttonVariants({ variant: "default" })}
href={`/build?flowID=${flow.id}`}
href={`/build?flowID=${flow.id}&flowVersion=${flow.version}`}
>
<Pencil2Icon className="mr-2" />
Open in Builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ export const FlowRunInfo: React.FC<
</Button>
<Link
className={buttonVariants({ variant: "default" })}
href={`/build?flowID=${flow.id}`}
href={`/build?flowID=${flow.id}&flowVersion=${execution.graph_version}&flowExecutionID=${execution.execution_id}`} //todo kcze executionID
>
<Pencil2Icon className="mr-2" /> Open in Builder
</Link>
Expand Down
Loading
Loading