Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions python-sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,117 @@ class APINode(BaseNode):
- **Types**: Common secret types include API keys, database credentials, encryption keys, and authentication tokens
- **Injection**: Secrets are injected by the Runtime at execution time, so you don't need to handle them manually

## State Management

The SDK provides a `StateManager` class for programmatically triggering graph executions and managing workflow states. This is useful for integrating ExosphereHost workflows into existing applications or for building custom orchestration logic.

### StateManager Class

The `StateManager` class allows you to trigger graph executions with custom trigger states. It handles authentication and communication with the ExosphereHost state manager service.

#### Initialization

```python
from exospherehost import StateManager

# Initialize with explicit configuration
state_manager = StateManager(
namespace="MyProject",
state_manager_uri="https://your-state-manager.exosphere.host",
key="your-api-key",
state_manager_version="v0"
)

# Or initialize with environment variables
state_manager = StateManager(namespace="MyProject")
```

**Parameters:**
- `namespace` (str): The namespace for your project
- `state_manager_uri` (str, optional): The URI of the state manager service. If not provided, reads from `EXOSPHERE_STATE_MANAGER_URI` environment variable
- `key` (str, optional): Your API key. If not provided, reads from `EXOSPHERE_API_KEY` environment variable
- `state_manager_version` (str): The API version to use (default: "v0")

#### Triggering Graph Execution

```python
from exospherehost import StateManager, TriggerState

# Create a single trigger state
trigger_state = TriggerState(
identifier="user-login",
inputs={
"user_id": "12345",
"session_token": "abc123def456",
"timestamp": "2024-01-15T10:30:00Z"
}
)

# Trigger a single state
result = await state_manager.trigger("my-graph", state=trigger_state)

# Or trigger multiple states
trigger_states = [
TriggerState(identifier="trigger1", inputs={"key1": "value1"}),
TriggerState(identifier="trigger2", inputs={"key2": "value2"})
]

result = await state_manager.trigger("my-graph", states=trigger_states)
```

**Parameters:**
- `graph_name` (str): The name of the graph to trigger
- `state` (TriggerState, optional): A single trigger state
- `states` (list[TriggerState], optional): A list of trigger states

**Returns:**
- `dict`: The JSON response from the state manager API

**Raises:**
- `ValueError`: If neither `state` nor `states` is provided, if both are provided, or if `states` is an empty list
- `Exception`: If the API request fails with a non-200 status code

### TriggerState Class

The `TriggerState` class represents a trigger state for graph execution. It contains an identifier and a set of input parameters that will be passed to the graph when it is triggered.

#### Creating Trigger States

```python
from exospherehost import TriggerState

# Basic trigger state
trigger_state = TriggerState(
identifier="data-processing",
inputs={
"file_path": "/path/to/data.csv",
"batch_size": "1000",
"priority": "high"
}
)

# Trigger state with complex data (serialized as JSON)
import json

complex_data = {
"filters": ["active", "verified"],
"date_range": {"start": "2024-01-01", "end": "2024-01-31"},
"options": {"include_metadata": True, "format": "json"}
}

trigger_state = TriggerState(
identifier="complex-processing",
inputs={
"config": json.dumps(complex_data),
"user_id": "12345"
}
)
```

**Attributes:**
- `identifier` (str): A unique identifier for this trigger state. Used to distinguish between different trigger states and may be used by the graph to determine how to process the trigger
- `inputs` (dict[str, str]): A dictionary of input parameters that will be passed to the graph. The keys are parameter names and values are parameter values, both as strings

## Integration with ExosphereHost Platform

The Python SDK integrates seamlessly with the ExosphereHost platform, providing:
Expand Down
3 changes: 2 additions & 1 deletion python-sdk/exospherehost/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ async def execute(self, inputs: Inputs) -> Outputs:
from ._version import version as __version__
from .runtime import Runtime
from .node.BaseNode import BaseNode
from .statemanager import StateManager, TriggerState

VERSION = __version__

__all__ = ["Runtime", "BaseNode", "VERSION"]
__all__ = ["Runtime", "BaseNode", "StateManager", "TriggerState", "VERSION"]
2 changes: 1 addition & 1 deletion python-sdk/exospherehost/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "0.0.7b5"
version = "0.0.7b6"
123 changes: 123 additions & 0 deletions python-sdk/exospherehost/statemanager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import os
import aiohttp
from pydantic import BaseModel


class TriggerState(BaseModel):
"""
Represents a trigger state for graph execution.

A trigger state contains an identifier and a set of input parameters that
will be passed to the graph when it is triggered for execution.

Attributes:
identifier (str): A unique identifier for this trigger state. This is used
to distinguish between different trigger states and may be used by the
graph to determine how to process the trigger.
inputs (dict[str, str]): A dictionary of input parameters that will be
passed to the graph. The keys are parameter names and values are
parameter values, both as strings.

Example:
```python
# Create a trigger state with identifier and inputs
trigger_state = TriggerState(
identifier="user-login",
inputs={
"user_id": "12345",
"session_token": "abc123def456",
"timestamp": "2024-01-15T10:30:00Z"
}
)
```
"""
identifier: str
inputs: dict[str, str]


class StateManager:

def __init__(self, namespace: str, state_manager_uri: str | None = None, key: str | None = None, state_manager_version: str = "v0"):
self._state_manager_uri = state_manager_uri
self._key = key
self._state_manager_version = state_manager_version
self._namespace = namespace

self._set_config_from_env()

def _set_config_from_env(self):
"""
Set configuration from environment variables if not provided.
"""
if self._state_manager_uri is None:
self._state_manager_uri = os.environ.get("EXOSPHERE_STATE_MANAGER_URI")
if self._key is None:
self._key = os.environ.get("EXOSPHERE_API_KEY")
Comment on lines +48 to +55
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The configuration values _state_manager_uri and _key are fetched from environment variables if not provided, but there's no check to ensure they are actually set. This can lead to runtime errors later when API calls are made. It's crucial to validate that these have values after attempting to load from the environment, similar to how it's done in the Runtime class.

Suggested change
def _set_config_from_env(self):
"""
Set configuration from environment variables if not provided.
"""
if self._state_manager_uri is None:
self._state_manager_uri = os.environ.get("EXOSPHERE_STATE_MANAGER_URI")
if self._key is None:
self._key = os.environ.get("EXOSPHERE_API_KEY")
def _set_config_from_env(self):
"""
Set configuration from environment variables if not provided.
"""
if self._state_manager_uri is None:
self._state_manager_uri = os.environ.get("EXOSPHERE_STATE_MANAGER_URI")
if self._key is None:
self._key = os.environ.get("EXOSPHERE_API_KEY")
if self._state_manager_uri is None:
raise ValueError("State manager URI is not set. Provide it as an argument or set the EXOSPHERE_STATE_MANAGER_URI environment variable.")
if self._key is None:
raise ValueError("API key is not set. Provide it as an argument or set the EXOSPHERE_API_KEY environment variable.")


def _get_trigger_state_endpoint(self, graph_name: str):
return f"{self._state_manager_uri}/{self._state_manager_version}/namespace/{self._namespace}/graph/{graph_name}/trigger"

async def trigger(self, graph_name: str, state: TriggerState | None = None, states: list[TriggerState] | None = None):
"""
Trigger a graph execution with one or more trigger states.

This method sends trigger states to the specified graph endpoint to initiate
graph execution. It accepts either a single trigger state or a list of trigger
states, but not both simultaneously.

Args:
graph_name (str): The name of the graph to trigger execution for.
state (TriggerState | None, optional): A single trigger state to send.
Must be provided if `states` is None.
states (list[TriggerState] | None, optional): A list of trigger states to send.
Must be provided if `state` is None. Cannot be an empty list.

Returns:
dict: The JSON response from the state manager API containing the
result of the trigger operation.

Raises:
ValueError: If neither `state` nor `states` is provided, if both are provided,
or if `states` is an empty list.
Exception: If the API request fails with a non-200 status code. The exception
message includes the HTTP status code and response text for debugging.

Example:
```python
# Trigger with a single state
state = TriggerState(identifier="my-trigger", inputs={"key": "value"})
result = await state_manager.trigger("my-graph", state=state)

# Trigger with multiple states
states = [
TriggerState(identifier="trigger1", inputs={"key1": "value1"}),
TriggerState(identifier="trigger2", inputs={"key2": "value2"})
]
result = await state_manager.trigger("my-graph", states=states)
```
"""
if state is None and states is None:
raise ValueError("Either state or states must be provided")
if state is not None and states is not None:
raise ValueError("Only one of state or states must be provided")
if states is not None and len(states) == 0:
raise ValueError("States must be a non-empty list")

states_list = []
if state is not None:
states_list.append(state)
if states is not None:
states_list.extend(states)
Comment on lines +106 to +110
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for constructing states_list is correct but can be expressed more concisely using a conditional expression. This improves readability.

Suggested change
states_list = []
if state is not None:
states_list.append(state)
if states is not None:
states_list.extend(states)
states_list = [state] if state is not None else states


body = {
"states": [state.model_dump() for state in states_list]
}
headers = {
"x-api-key": self._key
}
endpoint = self._get_trigger_state_endpoint(graph_name)
async with aiohttp.ClientSession() as session:
async with session.post(endpoint, json=body, headers=headers) as response: # type: ignore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The # type: ignore comment suppresses a potential type error. The validation I suggested for _state_manager_uri and _key should ensure they are not None, which will likely resolve the underlying type issue for the endpoint and headers parameters. Once the validation is in place, please check if this type: ignore can be removed.

if response.status != 200:
raise Exception(f"Failed to trigger state: {response.status} {await response.text()}")
return await response.json()