Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/docs/exosphere/create-graph.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ async def create_graph_template():
}

try:
# Create or update the graph template
# Create or update the graph template (with optional store, beta)
result = await state_manager.upsert_graph(
graph_name="my-workflow",
graph_nodes=graph_nodes,
Expand All @@ -186,6 +186,9 @@ async def create_graph_template():
"strategy": "EXPONENTIAL",
"backoff_factor": 2000,
"exponent": 2
},
store_config={ # beta
"ttl": 7200 # seconds to keep key/values
}
)
print("Graph template created successfully!")
Expand Down
20 changes: 7 additions & 13 deletions docs/docs/exosphere/trigger-graph.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,14 @@ The recommended way to trigger graphs is using the Exosphere Python SDK, which p
state_manager_uri=EXOSPHERE_STATE_MANAGER_URI,
key=EXOSPHERE_API_KEY
)

# Create trigger state
trigger_state = TriggerState(
identifier="data_loader", # Must match a node identifier in your graph
inputs={
"source": "/path/to/data.csv",
"format": "csv",
"batch_size": "1000"
}
)


try:
# Trigger the graph
result = await state_manager.trigger("my-graph", state=trigger_state)
# Trigger the graph with optional store (beta)
result = await state_manager.trigger(
"my-graph",
inputs={"user_id": "123"},
store={"cursor": "0"} # persisted across nodes (beta)
)
print(f"Graph triggered successfully!")
print(f"Run ID: {result['run_id']}")
return result
Expand Down
99 changes: 17 additions & 82 deletions python-sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export EXOSPHERE_API_KEY="your-api-key"
- **Async Support**: Native async/await support for high-performance operations
- **Error Handling**: Built-in retry mechanisms and error recovery
- **Scalability**: Designed for high-volume batch processing and workflows
- **Graph Store (beta)**: Strings-only key-value store with per-run scope for sharing data across nodes (not durable across separate runs or clusters)

## Architecture

Expand Down Expand Up @@ -241,95 +242,29 @@ trigger_state = TriggerState(
}
)

# Trigger a single state
result = await state_manager.trigger("my-graph", state=trigger_state)

# Or trigger multiple states
trigger_states = [
TriggerState(identifier="trigger1", inputs={"key1": "value1"}),
TriggerState(identifier="trigger2", inputs={"key2": "value2"})
]

result = await state_manager.trigger("my-graph", states=trigger_states)
```

**Parameters:**
- `graph_name` (str): The name of the graph to trigger
- `state` (TriggerState, optional): A single trigger state
- `states` (list[TriggerState], optional): A list of trigger states

**Returns:**
- `dict`: The JSON response from the state manager API

**Raises:**
- `ValueError`: If neither `state` nor `states` is provided, if both are provided, or if `states` is an empty list
- `Exception`: If the API request fails with a non-200 status code

### TriggerState Class

The `TriggerState` class represents a trigger state for graph execution. It contains an identifier and a set of input parameters that will be passed to the graph when it is triggered.

#### Creating Trigger States

```python
from exospherehost import TriggerState

# Basic trigger state
trigger_state = TriggerState(
identifier="data-processing",
inputs={
"file_path": "/path/to/data.csv",
"batch_size": "1000",
"priority": "high"
}
)

# Trigger state with complex data (serialized as JSON)
import json

complex_data = {
"filters": ["active", "verified"],
"date_range": {"start": "2024-01-01", "end": "2024-01-31"},
"options": {"include_metadata": True, "format": "json"}
}

trigger_state = TriggerState(
identifier="complex-processing",
# Trigger the graph (beta store support)
result = await state_manager.trigger(
"my-graph",
inputs={
"config": json.dumps(complex_data),
"user_id": "12345"
"user_id": "12345",
"session_token": "abc123def456"
},
store={
"cursor": "0" # persisted across nodes (beta)
}
)
```

**Attributes:**
- `identifier` (str): A unique identifier for this trigger state. Used to distinguish between different trigger states and may be used by the graph to determine how to process the trigger
- `inputs` (dict[str, str]): A dictionary of input parameters that will be passed to the graph. The keys are parameter names and values are parameter values, both as strings

## Integration with ExosphereHost Platform

The Python SDK integrates seamlessly with the ExosphereHost platform, providing:

- **Performance**: Optimized execution with intelligent resource allocation and parallel processing
- **Reliability**: Built-in fault tolerance, automatic recovery, and failover capabilities
- **Scalability**: Automatic scaling based on workload demands
- **Monitoring**: Integrated logging and monitoring capabilities

## Documentation

For more detailed information, visit our [documentation](https://docs.exosphere.host).

## Contributing
**Parameters:**

We welcome contributions! Please see our [contributing guidelines](https://github.com/exospherehost/exospherehost/blob/main/CONTRIBUTING.md) for details.
- `graph_name` (str): Name of the graph to execute
- `inputs` (dict[str, str] | None): Key/value inputs for the first node (strings only)
- `store` (dict[str, str] | None): Graph-level key/value store (beta) persisted across nodes

## Support
**Returns:**

For support and questions:
- **Email**: [nivedit@exosphere.host](mailto:nivedit@exosphere.host)
- **Documentation**: [https://docs.exosphere.host](https://docs.exosphere.host)
- **GitHub Issues**: [https://github.com/exospherehost/exospherehost/issues](https://github.com/exospherehost/exospherehost/issues)
- `dict`: JSON payload from the state manager

## License
**Raises:**

This Python SDK is licensed under the MIT License. The main ExosphereHost project is licensed under the Elastic License 2.0.
- `Exception`: If the HTTP request fails
2 changes: 1 addition & 1 deletion python-sdk/exospherehost/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "0.0.2b2"
version = "0.0.2b3"
116 changes: 54 additions & 62 deletions python-sdk/exospherehost/statemanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,60 +67,49 @@ def _get_upsert_graph_endpoint(self, graph_name: str):
def _get_get_graph_endpoint(self, graph_name: str):
return f"{self._state_manager_uri}/{self._state_manager_version}/namespace/{self._namespace}/graph/{graph_name}"

async def trigger(self, graph_name: str, state: TriggerState | None = None, states: list[TriggerState] | None = None):
async def trigger(self, graph_name: str, inputs: dict[str, str] | None = None, store: dict[str, str] | None = None):
"""
Trigger a graph execution with one or more trigger states.
Trigger execution of a graph.

This method sends trigger states to the specified graph endpoint to initiate
graph execution. It accepts either a single trigger state or a list of trigger
states, but not both simultaneously.
Beta: This method now supports an optional **store** parameter that lets you
pass a key-value map that is persisted for the lifetime of the graph run. All
keys **and** values must be strings in the current beta release – the schema
may change in future versions.

Args:
graph_name (str): The name of the graph to trigger execution for.
state (TriggerState | None, optional): A single trigger state to send.
Must be provided if `states` is None.
states (list[TriggerState] | None, optional): A list of trigger states to send.
Must be provided if `state` is None. Cannot be an empty list.
graph_name (str): Name of the graph you want to run.
inputs (dict[str, str] | None): Optional inputs for the first node in the
graph. Strings only.
store (dict[str, str] | None): Optional key-value store that will be merged
into the graph-level store before execution (beta).

Returns:
dict: The JSON response from the state manager API containing the
result of the trigger operation.
dict: JSON payload returned by the state-manager API.

Raises:
ValueError: If neither `state` nor `states` is provided, if both are provided,
or if `states` is an empty list.
Exception: If the API request fails with a non-200 status code. The exception
message includes the HTTP status code and response text for debugging.
Exception: If the request fails.

Example:
```python
# Trigger with a single state
state = TriggerState(identifier="my-trigger", inputs={"key": "value"})
result = await state_manager.trigger("my-graph", state=state)

# Trigger with multiple states
states = [
TriggerState(identifier="trigger1", inputs={"key1": "value1"}),
TriggerState(identifier="trigger2", inputs={"key2": "value2"})
]
result = await state_manager.trigger("my-graph", states=states)
# Trigger with inputs only
await state_manager.trigger("my-graph", inputs={"user_id": "123"})

# Trigger with inputs **and** a beta store
await state_manager.trigger(
"my-graph",
inputs={"user_id": "123"},
store={"cursor": "0"} # beta
)
```
"""
if state is None and states is None:
raise ValueError("Either state or states must be provided")
if state is not None and states is not None:
raise ValueError("Only one of state or states must be provided")
if states is not None and len(states) == 0:
raise ValueError("States must be a non-empty list")
if inputs is None:
inputs = {}
if store is None:
store = {}

states_list = []
if state is not None:
states_list.append(state)
if states is not None:
states_list.extend(states)

body = {
"states": [state.model_dump() for state in states_list]
"inputs": inputs,
"store": store
}
headers = {
"x-api-key": self._key
Expand Down Expand Up @@ -167,35 +156,32 @@ async def get_graph(self, graph_name: str):
raise Exception(f"Failed to get graph: {response.status} {await response.text()}")
return await response.json()

async def upsert_graph(self, graph_name: str, graph_nodes: list[dict[str, Any]], secrets: dict[str, str], validation_timeout: int = 60, polling_interval: int = 1):
async def upsert_graph(self, graph_name: str, graph_nodes: list[dict[str, Any]], secrets: dict[str, str], retry_policy: dict[str, Any] | None = None, store_config: dict[str, Any] | None = None, validation_timeout: int = 60, polling_interval: int = 1):
"""
Create or update a graph in the state manager with validation.
Create or update a graph definition.

Beta: `store_config` is a new field that allows you to configure a
namespaced key-value store that lives for the duration of a graph run. The
feature is in beta and the shape of `store_config` may change.

This method sends a graph definition to the state manager API for creation
or update. After submission, it polls the API to wait for graph validation
to complete, ensuring the graph is properly configured before returning.
After submitting the graph, this helper polls the state-manager until the
graph has been validated (or the timeout is hit).

Args:
graph_name (str): The name of the graph to create or update.
graph_nodes (list[dict[str, Any]]): A list of node definitions that make up
the graph. Each node should contain the necessary configuration for
the graph execution engine.
secrets (dict[str, str]): A dictionary of secret values that will be
available to the graph during execution. Keys are secret names and
values are the secret values.
validation_timeout (int, optional): Maximum time in seconds to wait for
graph validation to complete. Defaults to 60.
polling_interval (int, optional): Time in seconds between validation
status checks. Defaults to 1.

graph_name (str): Graph identifier.
graph_nodes (list[dict[str, Any]]): Graph node list.
secrets (dict[str, str]): Secrets available to all nodes.
retry_policy (dict[str, Any] | None): Optional per-node retry policy.
store_config (dict[str, Any] | None): Beta configuration for the
graph-level store (schema is subject to change).
validation_timeout (int): Seconds to wait for validation (default 60).
polling_interval (int): Polling interval in seconds (default 1).

Returns:
dict: The JSON response from the state manager API containing the
validated graph information.

dict: Validated graph object returned by the API.

Raises:
Exception: If the API request fails with a non-201 status code, if graph
validation times out, or if validation fails. The exception message
includes relevant error details for debugging.
Exception: If validation fails or times out.
"""
endpoint = self._get_upsert_graph_endpoint(graph_name)
headers = {
Expand All @@ -205,6 +191,12 @@ async def upsert_graph(self, graph_name: str, graph_nodes: list[dict[str, Any]],
"secrets": secrets,
"nodes": graph_nodes
}

if retry_policy is not None:
body["retry_policy"] = retry_policy
if store_config is not None:
body["store_config"] = store_config

async with aiohttp.ClientSession() as session:
async with session.put(endpoint, json=body, headers=headers) as response: # type: ignore
if response.status not in [200, 201]:
Expand Down
Loading