Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions python-sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,24 @@ You can simply connect to exosphere state manager and start creating your nodes,

```python
from exospherehost import Runtime, BaseNode
from typing import Any
import os
from pydantic import BaseModel

class SampleNode(BaseNode):
async def execute(self, inputs: dict[str, Any]) -> dict[str, Any]:
class Inputs(BaseModel):
name: str

class Outputs(BaseModel):
message: str

async def execute(self, inputs: Inputs) -> Outputs:
print(inputs)
return {"message": "success"}
return self.Outputs(message="success")

runtime = Runtime("SampleNamespace", os.getenv("EXOSPHERE_STATE_MANAGER_URI", "http://localhost:8000"), os.getenv("EXOSPHERE_API_KEY", ""))
# EXOSPHERE_STATE_MANAGER_URI and EXOSPHERE_API_KEY are required to be set in the environment variables for authentication with exospherehost
runtime = Runtime(
namespace="SampleNamespace",
name="SampleNode"
)

runtime.connect([SampleNode()])
runtime.start()
Expand Down
36 changes: 36 additions & 0 deletions python-sdk/exospherehost/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,39 @@
"""
ExosphereHost Python SDK

A distributed workflow execution framework for building scalable, stateful applications.

This package provides the core components for creating and executing distributed
workflows using a node-based architecture. The main components are:

- Runtime: Manages the execution environment and coordinates with the state manager
- BaseNode: Abstract base class for creating executable nodes
- Status constants: Define the various states in the workflow lifecycle

Example usage:
from exospherehost import Runtime, BaseNode
from pydantic import BaseModel

class SampleNode(BaseNode):
class Inputs(BaseModel):
name: str

class Outputs(BaseModel):
message: str

async def execute(self, inputs: Inputs) -> Outputs:
print(inputs)
return self.Outputs(message="success")

runtime = Runtime(
namespace="SampleNamespace",
name="SampleNode"
)

runtime.connect([SampleNode()])
runtime.start()
"""

from ._version import version as __version__
from .runtime import Runtime
from .node.BaseNode import BaseNode
Expand Down
2 changes: 1 addition & 1 deletion python-sdk/exospherehost/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "0.0.5b"
version = "0.0.6b0"
72 changes: 70 additions & 2 deletions python-sdk/exospherehost/node/BaseNode.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,85 @@
from abc import ABC, abstractmethod
from typing import Optional, Any, List
from typing import Any, Optional, List
from pydantic import BaseModel


class BaseNode(ABC):
"""
Abstract base class for all nodes in the exospherehost system.

BaseNode provides the foundation for creating executable nodes that can be
connected to a Runtime for distributed processing. Each node must implement
the execute method and can optionally define Inputs and Outputs models.

Attributes:
unique_name (Optional[str]): A unique identifier for this node instance.
If None, the class name will be used as the unique name.
state (dict[str, Any]): A dictionary for storing node state between executions.
"""

def __init__(self, unique_name: Optional[str] = None):
"""
Initialize a BaseNode instance.

Args:
unique_name (Optional[str], optional): A unique identifier for this node.
If None, the class name will be used as the unique name. Defaults to None.
"""
self.unique_name: Optional[str] = unique_name
self.state: dict[str, Any] = {}

class Inputs(BaseModel):
"""
Pydantic model for defining the input schema of a node.

Subclasses should override this class to define the expected input structure.
This ensures type safety and validation of inputs before execution.
"""
pass

class Outputs(BaseModel):
"""
Pydantic model for defining the output schema of a node.

Subclasses should override this class to define the expected output structure.
This ensures type safety and validation of outputs after execution.
"""
pass

@abstractmethod
async def execute(self, inputs: dict[str, Any]) -> dict[str, Any] | List[dict[str, Any]]:
async def execute(self, inputs: Inputs) -> Outputs | List[Outputs]:
"""
Execute the node's main logic.

This is the core method that must be implemented by all concrete node classes.
It receives inputs, processes them according to the node's logic, and returns
outputs. The method can return either a single Outputs instance or a list
of Outputs instances for batch processing.

Args:
inputs (Inputs): The input data for this execution, validated against
the Inputs model defined by the node.

Returns:
Outputs | List[Outputs]: The output data from this execution. Can be
a single Outputs instance or a list of Outputs instances.

Raises:
Exception: Any exception that occurs during execution will be caught
by the Runtime and reported as an error state.
"""
pass

def get_unique_name(self) -> str:
"""
Get the unique name for this node instance.

Returns the unique_name if it was provided during initialization,
otherwise returns the class name.

Returns:
str: The unique identifier for this node instance
"""
if self.unique_name is not None:
return self.unique_name
return self.__class__.__name__
16 changes: 16 additions & 0 deletions python-sdk/exospherehost/node/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""
Node module for the exospherehost package.

This module contains the core node-related components for building executable
workflow nodes. The main component is BaseNode, which provides the foundation
for creating custom nodes that can be executed by the Runtime.

Components:
- BaseNode: Abstract base class for all executable nodes
- Status constants: Define the various states in workflow execution
"""

from .BaseNode import BaseNode
from .status import Status

__all__ = ["BaseNode", "Status"]
53 changes: 44 additions & 9 deletions python-sdk/exospherehost/node/status.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,44 @@
CREATED = 'CREATED'
QUEUED = 'QUEUED'
EXECUTED = 'EXECUTED'
NEXT_CREATED = 'NEXT_CREATED'
RETRY_CREATED = 'RETRY_CREATED'
TIMEDOUT = 'TIMEDOUT'
ERRORED = 'ERRORED'
CANCELLED = 'CANCELLED'
SUCCESS = 'SUCCESS'
"""
Status constants for state management in the exospherehost system.

These constants represent the various states that a workflow state can be in
during its lifecycle from creation to completion or failure.
"""

from enum import Enum


class Status(str, Enum):
"""
Enumeration of workflow state status values.

This enum provides type-safe constants for the various states that a workflow
state can be in during its lifecycle from creation to completion or failure.
"""

# State has been created but not yet queued for execution
CREATED = 'CREATED'

# State has been queued and is waiting to be picked up by a worker
QUEUED = 'QUEUED'

# State has been successfully executed by a worker
EXECUTED = 'EXECUTED'

# Next state in the workflow has been created based on successful execution
NEXT_CREATED = 'NEXT_CREATED'

# A retry state has been created due to a previous failure
RETRY_CREATED = 'RETRY_CREATED'

# State execution has timed out
TIMEDOUT = 'TIMEDOUT'

# State execution has failed with an error
ERRORED = 'ERRORED'

# State execution has been cancelled
CANCELLED = 'CANCELLED'

# State has completed successfully (final state)
SUCCESS = 'SUCCESS'
Loading