diff --git a/python-sdk/README.md b/python-sdk/README.md index be6e0fd1..7df6c3c2 100644 --- a/python-sdk/README.md +++ b/python-sdk/README.md @@ -195,6 +195,117 @@ class APINode(BaseNode): - **Types**: Common secret types include API keys, database credentials, encryption keys, and authentication tokens - **Injection**: Secrets are injected by the Runtime at execution time, so you don't need to handle them manually +## State Management + +The SDK provides a `StateManager` class for programmatically triggering graph executions and managing workflow states. This is useful for integrating ExosphereHost workflows into existing applications or for building custom orchestration logic. + +### StateManager Class + +The `StateManager` class allows you to trigger graph executions with custom trigger states. It handles authentication and communication with the ExosphereHost state manager service. + +#### Initialization + +```python +from exospherehost import StateManager + +# Initialize with explicit configuration +state_manager = StateManager( + namespace="MyProject", + state_manager_uri="https://your-state-manager.exosphere.host", + key="your-api-key", + state_manager_version="v0" +) + +# Or initialize with environment variables +state_manager = StateManager(namespace="MyProject") +``` + +**Parameters:** +- `namespace` (str): The namespace for your project +- `state_manager_uri` (str, optional): The URI of the state manager service. If not provided, reads from `EXOSPHERE_STATE_MANAGER_URI` environment variable +- `key` (str, optional): Your API key. If not provided, reads from `EXOSPHERE_API_KEY` environment variable +- `state_manager_version` (str): The API version to use (default: "v0") + +#### Triggering Graph Execution + +```python +from exospherehost import StateManager, TriggerState + +# Create a single trigger state +trigger_state = TriggerState( + identifier="user-login", + inputs={ + "user_id": "12345", + "session_token": "abc123def456", + "timestamp": "2024-01-15T10:30:00Z" + } +) + +# Trigger a single state +result = await state_manager.trigger("my-graph", state=trigger_state) + +# Or trigger multiple states +trigger_states = [ + TriggerState(identifier="trigger1", inputs={"key1": "value1"}), + TriggerState(identifier="trigger2", inputs={"key2": "value2"}) +] + +result = await state_manager.trigger("my-graph", states=trigger_states) +``` + +**Parameters:** +- `graph_name` (str): The name of the graph to trigger +- `state` (TriggerState, optional): A single trigger state +- `states` (list[TriggerState], optional): A list of trigger states + +**Returns:** +- `dict`: The JSON response from the state manager API + +**Raises:** +- `ValueError`: If neither `state` nor `states` is provided, if both are provided, or if `states` is an empty list +- `Exception`: If the API request fails with a non-200 status code + +### TriggerState Class + +The `TriggerState` class represents a trigger state for graph execution. It contains an identifier and a set of input parameters that will be passed to the graph when it is triggered. + +#### Creating Trigger States + +```python +from exospherehost import TriggerState + +# Basic trigger state +trigger_state = TriggerState( + identifier="data-processing", + inputs={ + "file_path": "/path/to/data.csv", + "batch_size": "1000", + "priority": "high" + } +) + +# Trigger state with complex data (serialized as JSON) +import json + +complex_data = { + "filters": ["active", "verified"], + "date_range": {"start": "2024-01-01", "end": "2024-01-31"}, + "options": {"include_metadata": True, "format": "json"} +} + +trigger_state = TriggerState( + identifier="complex-processing", + inputs={ + "config": json.dumps(complex_data), + "user_id": "12345" + } +) +``` + +**Attributes:** +- `identifier` (str): A unique identifier for this trigger state. Used to distinguish between different trigger states and may be used by the graph to determine how to process the trigger +- `inputs` (dict[str, str]): A dictionary of input parameters that will be passed to the graph. The keys are parameter names and values are parameter values, both as strings + ## Integration with ExosphereHost Platform The Python SDK integrates seamlessly with the ExosphereHost platform, providing: diff --git a/python-sdk/exospherehost/__init__.py b/python-sdk/exospherehost/__init__.py index 412aa05a..777181b9 100644 --- a/python-sdk/exospherehost/__init__.py +++ b/python-sdk/exospherehost/__init__.py @@ -37,7 +37,8 @@ async def execute(self, inputs: Inputs) -> Outputs: from ._version import version as __version__ from .runtime import Runtime from .node.BaseNode import BaseNode +from .statemanager import StateManager, TriggerState VERSION = __version__ -__all__ = ["Runtime", "BaseNode", "VERSION"] \ No newline at end of file +__all__ = ["Runtime", "BaseNode", "StateManager", "TriggerState", "VERSION"] diff --git a/python-sdk/exospherehost/_version.py b/python-sdk/exospherehost/_version.py index 8174f18d..de14f2ae 100644 --- a/python-sdk/exospherehost/_version.py +++ b/python-sdk/exospherehost/_version.py @@ -1 +1 @@ -version = "0.0.7b5" \ No newline at end of file +version = "0.0.7b6" \ No newline at end of file diff --git a/python-sdk/exospherehost/statemanager.py b/python-sdk/exospherehost/statemanager.py new file mode 100644 index 00000000..9a935d3d --- /dev/null +++ b/python-sdk/exospherehost/statemanager.py @@ -0,0 +1,123 @@ +import os +import aiohttp +from pydantic import BaseModel + + +class TriggerState(BaseModel): + """ + Represents a trigger state for graph execution. + + A trigger state contains an identifier and a set of input parameters that + will be passed to the graph when it is triggered for execution. + + Attributes: + identifier (str): A unique identifier for this trigger state. This is used + to distinguish between different trigger states and may be used by the + graph to determine how to process the trigger. + inputs (dict[str, str]): A dictionary of input parameters that will be + passed to the graph. The keys are parameter names and values are + parameter values, both as strings. + + Example: + ```python + # Create a trigger state with identifier and inputs + trigger_state = TriggerState( + identifier="user-login", + inputs={ + "user_id": "12345", + "session_token": "abc123def456", + "timestamp": "2024-01-15T10:30:00Z" + } + ) + ``` + """ + identifier: str + inputs: dict[str, str] + + +class StateManager: + + def __init__(self, namespace: str, state_manager_uri: str | None = None, key: str | None = None, state_manager_version: str = "v0"): + self._state_manager_uri = state_manager_uri + self._key = key + self._state_manager_version = state_manager_version + self._namespace = namespace + + self._set_config_from_env() + + def _set_config_from_env(self): + """ + Set configuration from environment variables if not provided. + """ + if self._state_manager_uri is None: + self._state_manager_uri = os.environ.get("EXOSPHERE_STATE_MANAGER_URI") + if self._key is None: + self._key = os.environ.get("EXOSPHERE_API_KEY") + + def _get_trigger_state_endpoint(self, graph_name: str): + return f"{self._state_manager_uri}/{self._state_manager_version}/namespace/{self._namespace}/graph/{graph_name}/trigger" + + async def trigger(self, graph_name: str, state: TriggerState | None = None, states: list[TriggerState] | None = None): + """ + Trigger a graph execution with one or more trigger states. + + This method sends trigger states to the specified graph endpoint to initiate + graph execution. It accepts either a single trigger state or a list of trigger + states, but not both simultaneously. + + Args: + graph_name (str): The name of the graph to trigger execution for. + state (TriggerState | None, optional): A single trigger state to send. + Must be provided if `states` is None. + states (list[TriggerState] | None, optional): A list of trigger states to send. + Must be provided if `state` is None. Cannot be an empty list. + + Returns: + dict: The JSON response from the state manager API containing the + result of the trigger operation. + + Raises: + ValueError: If neither `state` nor `states` is provided, if both are provided, + or if `states` is an empty list. + Exception: If the API request fails with a non-200 status code. The exception + message includes the HTTP status code and response text for debugging. + + Example: + ```python + # Trigger with a single state + state = TriggerState(identifier="my-trigger", inputs={"key": "value"}) + result = await state_manager.trigger("my-graph", state=state) + + # Trigger with multiple states + states = [ + TriggerState(identifier="trigger1", inputs={"key1": "value1"}), + TriggerState(identifier="trigger2", inputs={"key2": "value2"}) + ] + result = await state_manager.trigger("my-graph", states=states) + ``` + """ + if state is None and states is None: + raise ValueError("Either state or states must be provided") + if state is not None and states is not None: + raise ValueError("Only one of state or states must be provided") + if states is not None and len(states) == 0: + raise ValueError("States must be a non-empty list") + + states_list = [] + if state is not None: + states_list.append(state) + if states is not None: + states_list.extend(states) + + body = { + "states": [state.model_dump() for state in states_list] + } + headers = { + "x-api-key": self._key + } + endpoint = self._get_trigger_state_endpoint(graph_name) + async with aiohttp.ClientSession() as session: + async with session.post(endpoint, json=body, headers=headers) as response: # type: ignore + if response.status != 200: + raise Exception(f"Failed to trigger state: {response.status} {await response.text()}") + return await response.json()