Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python-sdk/exospherehost/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "0.0.7b6"
version = "0.0.7b7"
104 changes: 104 additions & 0 deletions python-sdk/exospherehost/statemanager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import os
import aiohttp
import asyncio
import time

from typing import Any
from pydantic import BaseModel


Expand Down Expand Up @@ -56,6 +60,12 @@ def _set_config_from_env(self):

def _get_trigger_state_endpoint(self, graph_name: str):
return f"{self._state_manager_uri}/{self._state_manager_version}/namespace/{self._namespace}/graph/{graph_name}/trigger"

def _get_upsert_graph_endpoint(self, graph_name: str):
return f"{self._state_manager_uri}/{self._state_manager_version}/namespace/{self._namespace}/graph/{graph_name}"

def _get_get_graph_endpoint(self, graph_name: str):
return f"{self._state_manager_uri}/{self._state_manager_version}/namespace/{self._namespace}/graph/{graph_name}"

async def trigger(self, graph_name: str, state: TriggerState | None = None, states: list[TriggerState] | None = None):
"""
Expand Down Expand Up @@ -121,3 +131,97 @@ async def trigger(self, graph_name: str, state: TriggerState | None = None, stat
if response.status != 200:
raise Exception(f"Failed to trigger state: {response.status} {await response.text()}")
return await response.json()

async def get_graph(self, graph_name: str):
"""
Retrieve information about a specific graph from the state manager.

This method fetches the current state and configuration of a graph,
including its validation status, nodes, and any validation errors.

Args:
graph_name (str): The name of the graph to retrieve.

Returns:
dict: The JSON response from the state manager API containing the
graph information, including validation status, nodes, and errors.

Raises:
Exception: If the API request fails with a non-200 status code. The exception
message includes the HTTP status code and response text for debugging.

Example:
```python
# Get information about a specific graph
graph_info = await state_manager.get_graph("my-workflow-graph")
print(f"Graph status: {graph_info['validation_status']}")
```
"""
endpoint = self._get_get_graph_endpoint(graph_name)
headers = {
"x-api-key": self._key
}
async with aiohttp.ClientSession() as session:
async with session.get(endpoint, headers=headers) as response: # type: ignore
if response.status != 200:
raise Exception(f"Failed to get graph: {response.status} {await response.text()}")
return await response.json()

async def upsert_graph(self, graph_name: str, graph_nodes: list[dict[str, Any]], secrets: dict[str, str], validation_timeout: int = 60, polling_interval: int = 1):
"""
Create or update a graph in the state manager with validation.

This method sends a graph definition to the state manager API for creation
or update. After submission, it polls the API to wait for graph validation
to complete, ensuring the graph is properly configured before returning.

Args:
graph_name (str): The name of the graph to create or update.
graph_nodes (list[dict[str, Any]]): A list of node definitions that make up
the graph. Each node should contain the necessary configuration for
the graph execution engine.
secrets (dict[str, str]): A dictionary of secret values that will be
available to the graph during execution. Keys are secret names and
values are the secret values.
validation_timeout (int, optional): Maximum time in seconds to wait for
graph validation to complete. Defaults to 60.
polling_interval (int, optional): Time in seconds between validation
status checks. Defaults to 1.

Returns:
dict: The JSON response from the state manager API containing the
validated graph information.

Raises:
Exception: If the API request fails with a non-201 status code, if graph
validation times out, or if validation fails. The exception message
includes relevant error details for debugging.
"""
endpoint = self._get_upsert_graph_endpoint(graph_name)
headers = {
"x-api-key": self._key
}
body = {
"secrets": secrets,
"nodes": graph_nodes
}
async with aiohttp.ClientSession() as session:
async with session.put(endpoint, json=body, headers=headers) as response: # type: ignore
if response.status not in [200, 201]:
raise Exception(f"Failed to upsert graph: {response.status} {await response.text()}")
graph = await response.json()

validation_state = graph["validation_status"]

start_time = time.monotonic()
while validation_state == "PENDING":
if time.monotonic() - start_time > validation_timeout:
raise Exception(f"Graph validation check timed out after {validation_timeout} seconds")
await asyncio.sleep(polling_interval)
graph = await self.get_graph(graph_name)
validation_state = graph["validation_status"]

if validation_state != "VALID":
raise Exception(f"Graph validation failed: {graph['validation_status']} and errors: {graph['validation_errors']}")

return graph
Loading
Loading