diff --git a/exosphere-runtimes/cloud-storage-runtime/.gitignore b/exosphere-runtimes/cloud-storage-runtime/.gitignore new file mode 100644 index 00000000..ea0fb619 --- /dev/null +++ b/exosphere-runtimes/cloud-storage-runtime/.gitignore @@ -0,0 +1,66 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual Environment +venv/ +env/ +ENV/ +.env +.venv/ + +# IDE +.vscode/ +*.swp +*.swo +.idea/ +*.iws +*.iml +*.ipr + + +# Local development +.env.local +.env.development.local +.env.test.local +.env.production.local + +# Database +*.db +*.sqlite3 + +# OS generated files +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db + +#logs +*.log +logs/*.* +!logs/.gitkeep + +# local files +files/ +!files/.gitkeep \ No newline at end of file diff --git a/exosphere-runtimes/cloud-storage-runtime/.python-version b/exosphere-runtimes/cloud-storage-runtime/.python-version new file mode 100644 index 00000000..e4fba218 --- /dev/null +++ b/exosphere-runtimes/cloud-storage-runtime/.python-version @@ -0,0 +1 @@ +3.12 diff --git a/exosphere-runtimes/cloud-storage-runtime/README.md b/exosphere-runtimes/cloud-storage-runtime/README.md new file mode 100644 index 00000000..e69de29b diff --git a/exosphere-runtimes/cloud-storage-runtime/main.py b/exosphere-runtimes/cloud-storage-runtime/main.py new file mode 100644 index 00000000..dc8f68be --- /dev/null +++ b/exosphere-runtimes/cloud-storage-runtime/main.py @@ -0,0 +1,14 @@ +from dotenv import load_dotenv +from exospherehost import Runtime +from nodes.list_s3_files import ListS3FilesNode + +# Load environment variables from .env file +# EXOSPHERE_STATE_MANAGER_URI is the URI of the state manager +# EXOSPHERE_API_KEY is the key of the runtime +load_dotenv() + +Runtime( + name="cloud-storage-runtime", + namespace="exospherehost", + nodes=[ListS3FilesNode] +).start() \ No newline at end of file diff --git a/exosphere-runtimes/cloud-storage-runtime/nodes/__init__.py b/exosphere-runtimes/cloud-storage-runtime/nodes/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/exosphere-runtimes/cloud-storage-runtime/nodes/list_s3_files.py b/exosphere-runtimes/cloud-storage-runtime/nodes/list_s3_files.py new file mode 100644 index 00000000..26155580 --- /dev/null +++ b/exosphere-runtimes/cloud-storage-runtime/nodes/list_s3_files.py @@ -0,0 +1,39 @@ +import boto3 +import os + +from exospherehost import BaseNode +from typing import List +from pydantic import BaseModel + + +class ListS3FilesNode(BaseNode): + + class Inputs(BaseModel): + bucket_name: str + prefix: str = '' + files_only: bool = False + recursive: bool = False + + class Outputs(BaseModel): + key: str + + class Secrets(BaseModel): + aws_access_key_id: str = os.getenv("S3_ACCESS_KEY_ID") + aws_secret_access_key: str = os.getenv("S3_SECRET_ACCESS_KEY") + aws_region: str = os.getenv("S3_REGION") + + async def execute(self) -> List[Outputs]: + print(self.inputs) + + s3_client = boto3.client( + 's3', + aws_access_key_id=os.getenv("S3_ACCESS_KEY_ID"), + aws_secret_access_key=os.getenv("S3_SECRET_ACCESS_KEY"), + region_name=os.getenv("S3_REGION") + ) + response = s3_client.list_objects_v2(Bucket=self.inputs.bucket_name, Prefix=self.inputs.prefix) + + return [ + self.Outputs(key=data['Key']) + for data in response['Contents'] + ] diff --git a/exosphere-runtimes/cloud-storage-runtime/pyproject.toml 
b/exosphere-runtimes/cloud-storage-runtime/pyproject.toml new file mode 100644 index 00000000..860a3693 --- /dev/null +++ b/exosphere-runtimes/cloud-storage-runtime/pyproject.toml @@ -0,0 +1,10 @@ +[project] +name = "cloud-storage-runtime" +version = "0.1.0" +description = "Add your description here" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "boto3>=1.40.1", + "python-dotenv>=1.1.1", +] diff --git a/exosphere-runtimes/cloud-storage-runtime/uv.lock b/exosphere-runtimes/cloud-storage-runtime/uv.lock new file mode 100644 index 00000000..80c17b28 --- /dev/null +++ b/exosphere-runtimes/cloud-storage-runtime/uv.lock @@ -0,0 +1,106 @@ +version = 1 +revision = 2 +requires-python = ">=3.12" + +[[package]] +name = "boto3" +version = "1.40.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "botocore" }, + { name = "jmespath" }, + { name = "s3transfer" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/48/4d/70d209fdebf0377db233f80dfdf26ca2bc25d2b2e89d4882e0edccd2227f/boto3-1.40.1.tar.gz", hash = "sha256:985ed4bf64729807f870eadbc46ad98baf93096917f7194ec39d743ff75b3f1d", size = 111817, upload-time = "2025-08-01T19:24:18.017Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/97/0e/f0cb4f71c40ba07e6ed5b47699a737a080d3c4f4b7b26657d5671de48621/boto3-1.40.1-py3-none-any.whl", hash = "sha256:7c007d5c8ee549e9fcad0927536502da199b27891006ef515330f429aca9671f", size = 139880, upload-time = "2025-08-01T19:24:16.581Z" }, +] + +[[package]] +name = "botocore" +version = "1.40.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "jmespath" }, + { name = "python-dateutil" }, + { name = "urllib3" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c6/d2/d914999f4a128f0f840f2a9cc8327cd98aa661d6b33b331a81a8111ab970/botocore-1.40.1.tar.gz", hash = "sha256:bdf30e2c0e8cdb939d81fc243182a6d1dd39c416694b406c5f2ea079b1c2f3f5", size = 14280398, upload-time = "2025-08-01T19:24:08.599Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d4/c1/aa7922c9bf74b6d6594d2430af6f854d234faff23187e269aaba89c326c8/botocore-1.40.1-py3-none-any.whl", hash = "sha256:e039774b55fbd6fe59f0f4fea51d156a2433bd4d8faa64fc1b87aee9a03f415d", size = 13940950, upload-time = "2025-08-01T19:24:03.889Z" }, +] + +[[package]] +name = "cloud-storage-runtime" +version = "0.1.0" +source = { virtual = "." 
} +dependencies = [ + { name = "boto3" }, + { name = "python-dotenv" }, +] + +[package.metadata] +requires-dist = [ + { name = "boto3", specifier = ">=1.40.1" }, + { name = "python-dotenv", specifier = ">=1.1.1" }, +] + +[[package]] +name = "jmespath" +version = "1.0.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/00/2a/e867e8531cf3e36b41201936b7fa7ba7b5702dbef42922193f05c8976cd6/jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe", size = 25843, upload-time = "2022-06-17T18:00:12.224Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/31/b4/b9b800c45527aadd64d5b442f9b932b00648617eb5d63d2c7a6587b7cafc/jmespath-1.0.1-py3-none-any.whl", hash = "sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980", size = 20256, upload-time = "2022-06-17T18:00:10.251Z" }, +] + +[[package]] +name = "python-dateutil" +version = "2.9.0.post0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "six" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/66/c0/0c8b6ad9f17a802ee498c46e004a0eb49bc148f2fd230864601a86dcf6db/python-dateutil-2.9.0.post0.tar.gz", hash = "sha256:37dd54208da7e1cd875388217d5e00ebd4179249f90fb72437e91a35459a0ad3", size = 342432, upload-time = "2024-03-01T18:36:20.211Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ec/57/56b9bcc3c9c6a792fcbaf139543cee77261f3651ca9da0c93f5c1221264b/python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427", size = 229892, upload-time = "2024-03-01T18:36:18.57Z" }, +] + +[[package]] +name = "python-dotenv" +version = "1.1.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/f6/b0/4bc07ccd3572a2f9df7e6782f52b0c6c90dcbb803ac4a167702d7d0dfe1e/python_dotenv-1.1.1.tar.gz", hash = "sha256:a8a6399716257f45be6a007360200409fce5cda2661e3dec71d23dc15f6189ab", size = 41978, upload-time = "2025-06-24T04:21:07.341Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5f/ed/539768cf28c661b5b068d66d96a2f155c4971a5d55684a514c1a0e0dec2f/python_dotenv-1.1.1-py3-none-any.whl", hash = "sha256:31f23644fe2602f88ff55e1f5c79ba497e01224ee7737937930c448e4d0e24dc", size = 20556, upload-time = "2025-06-24T04:21:06.073Z" }, +] + +[[package]] +name = "s3transfer" +version = "0.13.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "botocore" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/6d/05/d52bf1e65044b4e5e27d4e63e8d1579dbdec54fce685908ae09bc3720030/s3transfer-0.13.1.tar.gz", hash = "sha256:c3fdba22ba1bd367922f27ec8032d6a1cf5f10c934fb5d68cf60fd5a23d936cf", size = 150589, upload-time = "2025-07-18T19:22:42.31Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/6d/4f/d073e09df851cfa251ef7840007d04db3293a0482ce607d2b993926089be/s3transfer-0.13.1-py3-none-any.whl", hash = "sha256:a981aa7429be23fe6dfc13e80e4020057cbab622b08c0315288758d67cabc724", size = 85308, upload-time = "2025-07-18T19:22:40.947Z" }, +] + +[[package]] +name = "six" +version = "1.17.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/94/e7/b2c673351809dca68a0e064b6af791aa332cf192da575fd474ed7d6f16a2/six-1.17.0.tar.gz", hash = "sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81", size = 34031, upload-time = "2024-12-04T17:35:28.174Z" } 
+wheels = [ + { url = "https://files.pythonhosted.org/packages/b7/ce/149a00dd41f10bc29e5921b496af8b574d8413afcd5e30dfa0ed46c2cc5e/six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274", size = 11050, upload-time = "2024-12-04T17:35:26.475Z" }, +] + +[[package]] +name = "urllib3" +version = "2.5.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/15/22/9ee70a2574a4f4599c47dd506532914ce044817c7752a79b6a51286319bc/urllib3-2.5.0.tar.gz", hash = "sha256:3fc47733c7e419d4bc3f6b3dc2b4f890bb743906a30d56ba4a5bfa4bbff92760", size = 393185, upload-time = "2025-06-18T14:07:41.644Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a7/c2/fe1e52489ae3122415c51f387e221dd0773709bad6c6cdaa599e8a2c5185/urllib3-2.5.0-py3-none-any.whl", hash = "sha256:e6b01673c0fa6a13e374b50871808eb3bf7046c4b125b216f6bf1cc604cff0dc", size = 129795, upload-time = "2025-06-18T14:07:40.39Z" }, +] diff --git a/python-sdk/README.md b/python-sdk/README.md index 74f94e42..5c0439d4 100644 --- a/python-sdk/README.md +++ b/python-sdk/README.md @@ -1,8 +1,31 @@ # ExosphereHost Python SDK -This is the official Python SDK for ExosphereHost and for interacting with ExosphereHost. -## Node Creation -You can simply connect to exosphere state manager and start creating your nodes, as shown in sample below: +[![PyPI version](https://badge.fury.io/py/exospherehost.svg)](https://badge.fury.io/py/exospherehost) +[![Python 3.12+](https://img.shields.io/badge/python-3.12+-blue.svg)](https://www.python.org/downloads/) +[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) + +The official Python SDK for [ExosphereHost](https://exosphere.host) - an open-source infrastructure layer for background AI workflows and agents. This SDK enables you to create distributed, stateful applications using a node-based architecture. + +## Overview + +ExosphereHost provides a robust, affordable, and effortless infrastructure for building scalable AI workflows and agents. The Python SDK allows you to: + +- Create distributed workflows using a simple node-based architecture. +- Build stateful applications that can scale across multiple compute resources. +- Execute complex AI workflows with automatic state management. +- Integrate with the ExosphereHost platform for optimized performance. 
+ +## Installation + +```bash +pip install exospherehost +``` + +## Quick Start + +### Basic Node Creation + +Create a simple node that processes data: ```python from exospherehost import Runtime, BaseNode @@ -11,23 +34,126 @@ from pydantic import BaseModel class SampleNode(BaseNode): class Inputs(BaseModel): name: str + data: dict class Outputs(BaseModel): message: str + processed_data: dict + + async def execute(self) -> Outputs: + print(f"Processing data for: {self.inputs.name}") + # Your processing logic here + processed_data = {"status": "completed", "input": self.inputs.data} + return self.Outputs( + message="success", + processed_data=processed_data + ) + +# Initialize the runtime +Runtime( + namespace="MyProject", + name="DataProcessor", + nodes=[SampleNode] +).start() +``` + +## Environment Configuration + +The SDK requires the following environment variables for authentication with ExosphereHost: + +```bash +export EXOSPHERE_STATE_MANAGER_URI="your-state-manager-uri" +export EXOSPHERE_API_KEY="your-api-key" +``` + +## Key Features + +- **Distributed Execution**: Run nodes across multiple compute resources +- **State Management**: Automatic state persistence and recovery +- **Type Safety**: Full Pydantic integration for input/output validation +- **Async Support**: Native async/await support for high-performance operations +- **Error Handling**: Built-in retry mechanisms and error recovery +- **Scalability**: Designed for high-volume batch processing and workflows + +## Architecture + +The SDK is built around two core concepts: + +### Runtime + +The `Runtime` class manages the execution environment and coordinates with the ExosphereHost state manager. It handles: + +- Node lifecycle management +- State coordination +- Error handling and recovery +- Resource allocation - async def execute(self, inputs: Inputs) -> Outputs: - print(inputs) - return self.Outputs(message="success") +### Nodes +Nodes are the building blocks of your workflows. Each node: +- Defines input/output schemas using Pydantic models +- Implements an `execute` method for processing logic +- Can be connected to other nodes to form workflows +- Automatically handles state persistence -# EXOSPHERE_STATE_MANAGER_URI and EXOSPHERE_API_KEY are required to be set in the environment variables for authentication with exospherehost -runtime = Runtime( - namespace="SampleNamespace", - name="SampleNode" -) +## Advanced Usage -runtime.connect([SampleNode()]) -runtime.start() +### Custom Node Configuration + +```python +class ConfigurableNode(BaseNode): + class Inputs(BaseModel): + text: str + max_length: int = 100 + + class Outputs(BaseModel): + result: str + length: int + + async def execute(self) -> Outputs: + result = self.inputs.text[:self.inputs.max_length] + return self.Outputs(result=result, length=len(result)) +``` + +### Error Handling + +```python +class RobustNode(BaseNode): + class Inputs(BaseModel): + data: str + + class Outputs(BaseModel): + success: bool + result: str + + async def execute(self) -> Outputs: + raise Exception("This is a test error") ``` +Error handling is automatically handled by the runtime and the state manager. 
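### Returning Multiple Outputs

`execute` may return either a single `Outputs` instance or a `List[Outputs]`. When a list is returned, the runtime reports every item back to the state manager for that state, which makes fan-out steps (for example, emitting one output per object found in a listing) straightforward. A minimal sketch; the node and field names below are illustrative, not part of the SDK:

```python
from typing import List

from exospherehost import BaseNode
from pydantic import BaseModel


class SplitTextNode(BaseNode):
    class Inputs(BaseModel):
        text: str

    class Outputs(BaseModel):
        word: str

    async def execute(self) -> List[Outputs]:
        # Every Outputs instance in the returned list is sent back to the
        # state manager as part of this state's execution result.
        return [self.Outputs(word=word) for word in self.inputs.text.split()]
```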
+ +## Integration with ExosphereHost Platform + +The Python SDK integrates seamlessly with the ExosphereHost platform, providing: + +- **Cost Optimization**: Leverage ExosphereHost's optimized infrastructure for significant cost savings +- **Reliability**: Built-in fault tolerance and automatic recovery +- **Scalability**: Automatic scaling based on workload demands +- **Monitoring**: Integrated logging and monitoring capabilities + +## Documentation + +For more detailed information, visit our [documentation](https://docs.exosphere.host). + +## Contributing + +We welcome contributions! Please see our [contributing guidelines](https://github.com/exospherehost/exospherehost/blob/main/CONTRIBUTING.md) for details. ## Support -For first-party support and questions, do not hesitate to reach out to us at . \ No newline at end of file + +For support and questions: +- **Email**: [nivedit@exosphere.host](mailto:nivedit@exosphere.host) +- **Documentation**: [https://docs.exosphere.host](https://docs.exosphere.host) +- **GitHub Issues**: [https://github.com/exospherehost/exospherehost/issues](https://github.com/exospherehost/exospherehost/issues) + +## License + +This Python SDK is licensed under the MIT License. The main ExosphereHost project is licensed under the Elastic License 2.0. \ No newline at end of file diff --git a/python-sdk/exospherehost/_version.py b/python-sdk/exospherehost/_version.py index cdd88a41..91aeb08a 100644 --- a/python-sdk/exospherehost/_version.py +++ b/python-sdk/exospherehost/_version.py @@ -1 +1 @@ -version = "0.0.7b0" +version = "0.0.7b1" \ No newline at end of file diff --git a/python-sdk/exospherehost/node/BaseNode.py b/python-sdk/exospherehost/node/BaseNode.py index 48281dd2..ae563e26 100644 --- a/python-sdk/exospherehost/node/BaseNode.py +++ b/python-sdk/exospherehost/node/BaseNode.py @@ -1,85 +1,73 @@ from abc import ABC, abstractmethod -from typing import Any, Optional, List +from typing import Optional, List from pydantic import BaseModel class BaseNode(ABC): """ Abstract base class for all nodes in the exospherehost system. - - BaseNode provides the foundation for creating executable nodes that can be - connected to a Runtime for distributed processing. Each node must implement - the execute method and can optionally define Inputs and Outputs models. - + + This class defines the interface and structure for executable nodes that can be + managed by an Exosphere Runtime. Subclasses should define their own `Inputs` and + `Outputs` models (as subclasses of pydantic.BaseModel) to specify the input and + output schemas for the node, and must implement the `execute` method containing + the node's main logic. + Attributes: - unique_name (Optional[str]): A unique identifier for this node instance. - If None, the class name will be used as the unique name. - state (dict[str, Any]): A dictionary for storing node state between executions. + inputs (Optional[BaseNode.Inputs]): The validated input data for the node execution. """ - def __init__(self, unique_name: Optional[str] = None): + def __init__(self): """ Initialize a BaseNode instance. - - Args: - unique_name (Optional[str], optional): A unique identifier for this node. - If None, the class name will be used as the unique name. Defaults to None. + + Sets the `inputs` attribute to None. The `inputs` attribute will be populated + with validated input data before execution. 
""" - self.unique_name: Optional[str] = unique_name - self.state: dict[str, Any] = {} + self.inputs: Optional[BaseNode.Inputs] = None class Inputs(BaseModel): """ - Pydantic model for defining the input schema of a node. - - Subclasses should override this class to define the expected input structure. - This ensures type safety and validation of inputs before execution. + Input schema for the node. + + Subclasses should override this class to define the expected input fields. """ pass class Outputs(BaseModel): """ - Pydantic model for defining the output schema of a node. - - Subclasses should override this class to define the expected output structure. - This ensures type safety and validation of outputs after execution. + Output schema for the node. + + Subclasses should override this class to define the expected output fields. """ pass - @abstractmethod - async def execute(self, inputs: Inputs) -> Outputs | List[Outputs]: + async def _execute(self, inputs: Inputs) -> Outputs | List[Outputs]: """ - Execute the node's main logic. - - This is the core method that must be implemented by all concrete node classes. - It receives inputs, processes them according to the node's logic, and returns - outputs. The method can return either a single Outputs instance or a list - of Outputs instances for batch processing. - + Internal method to execute the node with validated inputs. + Args: - inputs (Inputs): The input data for this execution, validated against - the Inputs model defined by the node. - + inputs (Inputs): The validated input data for this execution. + Returns: - Outputs | List[Outputs]: The output data from this execution. Can be - a single Outputs instance or a list of Outputs instances. - - Raises: - Exception: Any exception that occurs during execution will be caught - by the Runtime and reported as an error state. + Outputs | List[Outputs]: The output(s) produced by the node. """ - pass + self.inputs = inputs + return await self.execute() - def get_unique_name(self) -> str: + @abstractmethod + async def execute(self) -> Outputs | List[Outputs]: """ - Get the unique name for this node instance. - - Returns the unique_name if it was provided during initialization, - otherwise returns the class name. - + Main logic for the node. + + This method must be implemented by all subclasses. It should use `self.inputs` + (populated with validated input data) to perform the node's computation and + return either a single Outputs instance or a list of Outputs instances. + Returns: - str: The unique identifier for this node instance + Outputs | List[Outputs]: The output(s) produced by the node. + + Raises: + Exception: Any exception raised here will be caught and reported as an error state by the Runtime. """ - if self.unique_name is not None: - return self.unique_name - return self.__class__.__name__ \ No newline at end of file + raise NotImplementedError("execute method must be implemented by all concrete node classes") diff --git a/python-sdk/exospherehost/runtime.py b/python-sdk/exospherehost/runtime.py index f202afaf..e8c1bda3 100644 --- a/python-sdk/exospherehost/runtime.py +++ b/python-sdk/exospherehost/runtime.py @@ -2,6 +2,8 @@ import os from asyncio import Queue, sleep from typing import List + +from pydantic import BaseModel, ValidationError from .node.BaseNode import BaseNode from aiohttp import ClientSession from logging import getLogger @@ -10,66 +12,65 @@ class Runtime: """ - A runtime environment for executing nodes connected to exospherehost. 
- - The Runtime class manages the execution of BaseNode instances in a distributed - environment. It handles state management, worker coordination, and communication - with the state manager service. - - Attributes: - _name (str): The name of this runtime instance - _namespace (str): The namespace this runtime operates in - _key (str): API key for authentication with the state manager - _batch_size (int): Number of states to process in each batch - _connected (bool): Whether the runtime is connected to nodes - _state_queue (Queue): Queue for managing state processing - _workers (int): Number of worker tasks to spawn - _nodes (List[BaseNode]): List of connected node instances - _node_names (List[str]): List of node unique names - _state_manager_uri (str): URI of the state manager service - _state_manager_version (str): Version of the state manager API - _poll_interval (int): Interval between polling operations in seconds - _node_mapping (dict): Mapping of node names to node instances + Runtime for distributed execution of Exosphere nodes. + + The `Runtime` class manages the lifecycle and execution of a set of `BaseNode` subclasses + in a distributed environment. It handles node registration, state polling, execution, + and communication with a remote state manager service. + + Key Features: + - Registers node schemas and runtime metadata with the state manager. + - Polls for new states to process and enqueues them for execution. + - Spawns worker tasks to execute node logic asynchronously. + - Notifies the state manager of successful or failed executions. + - Handles configuration via constructor arguments or environment variables. + + Args: + namespace (str): Namespace for this runtime instance. + name (str): Name of this runtime instance. + nodes (List[type[BaseNode]]): List of node classes to register and execute. + state_manager_uri (str | None, optional): URI of the state manager service. + If not provided, will use the EXOSPHERE_STATE_MANAGER_URI environment variable. + key (str | None, optional): API key for authentication. + If not provided, will use the EXOSPHERE_API_KEY environment variable. + batch_size (int, optional): Number of states to fetch per poll. Defaults to 16. + workers (int, optional): Number of concurrent worker tasks. Defaults to 4. + state_manage_version (str, optional): State manager API version. Defaults to "v0". + poll_interval (int, optional): Seconds between polling for new states. Defaults to 1. + + Raises: + ValueError: If configuration is invalid (e.g., missing URI or key, batch_size/workers < 1). + ValidationError: If node classes are invalid or duplicate. + + Usage: + runtime = Runtime(namespace="myspace", name="myruntime", nodes=[MyNode]) + runtime.start() """ - def __init__(self, namespace: str, name: str, state_manager_uri: str | None = None, key: str | None = None, batch_size: int = 16, workers: int = 4, state_manage_version: str = "v0", poll_interval: int = 1): - """ - Initialize the Runtime instance. - - Args: - namespace (str): The namespace this runtime operates in - name (str): The name of this runtime instance - state_manager_uri (str | None, optional): URI of the state manager service. - If None, will be read from EXOSPHERE_STATE_MANAGER_URI environment variable. - key (str | None, optional): API key for authentication. - If None, will be read from EXOSPHERE_API_KEY environment variable. - batch_size (int, optional): Number of states to process in each batch. Defaults to 16. - workers (int, optional): Number of worker tasks to spawn. 
Defaults to 4. - state_manage_version (str, optional): Version of the state manager API. Defaults to "v0". - poll_interval (int, optional): Interval between polling operations in seconds. Defaults to 1. - - Raises: - ValueError: If batch_size or workers is less than 1, or if required - configuration (state_manager_uri, key) is not provided. - """ + def __init__(self, namespace: str, name: str, nodes: List[type[BaseNode]], state_manager_uri: str | None = None, key: str | None = None, batch_size: int = 16, workers: int = 4, state_manage_version: str = "v0", poll_interval: int = 1): self._name = name self._namespace = namespace self._key = key self._batch_size = batch_size self._state_queue = Queue(maxsize=2*batch_size) self._workers = workers - self._nodes = [] - self._node_names = [] + self._nodes = nodes + self._node_names = [node.__name__ for node in nodes] self._state_manager_uri = state_manager_uri self._state_manager_version = state_manage_version self._poll_interval = poll_interval - self._node_mapping = {} + self._node_mapping = { + node.__name__: node for node in nodes + } self._set_config_from_env() self._validate_runtime() + self._validate_nodes() def _set_config_from_env(self): - """Set configuration from environment variables if not provided.""" + """ + Set configuration from environment variables if not provided. + """ if self._state_manager_uri is None: self._state_manager_uri = os.environ.get("EXOSPHERE_STATE_MANAGER_URI") if self._key is None: @@ -78,7 +79,7 @@ def _set_config_from_env(self): def _validate_runtime(self): """ Validate runtime configuration. - + Raises: ValueError: If batch_size or workers is less than 1, or if required configuration (state_manager_uri, key) is not provided. @@ -93,23 +94,36 @@ def _validate_runtime(self): raise ValueError("API key is not set") def _get_enque_endpoint(self): - """Get the endpoint URL for enqueueing states.""" + """ + Construct the endpoint URL for enqueueing states. + """ return f"{self._state_manager_uri}/{str(self._state_manager_version)}/namespace/{self._namespace}/states/enqueue" def _get_executed_endpoint(self, state_id: str): - """Get the endpoint URL for notifying executed states.""" + """ + Construct the endpoint URL for notifying executed states. + """ return f"{self._state_manager_uri}/{str(self._state_manager_version)}/namespace/{self._namespace}/states/{state_id}/executed" def _get_errored_endpoint(self, state_id: str): - """Get the endpoint URL for notifying errored states.""" + """ + Construct the endpoint URL for notifying errored states. + """ return f"{self._state_manager_uri}/{str(self._state_manager_version)}/namespace/{self._namespace}/states/{state_id}/errored" def _get_register_endpoint(self): - """Get the endpoint URL for registering nodes with runtime""" + """ + Construct the endpoint URL for registering nodes with the runtime. + """ return f"{self._state_manager_uri}/{str(self._state_manager_version)}/namespace/{self._namespace}/nodes/" - async def _register_nodes(self): - """Register nodes with the runtime""" + async def _register(self): + """ + Register node schemas and runtime metadata with the state manager. + + Raises: + RuntimeError: If registration fails. 
+ """ async with ClientSession() as session: endpoint = self._get_register_endpoint() body = { @@ -117,7 +131,7 @@ async def _register_nodes(self): "runtime_namespace": self._namespace, "nodes": [ { - "name": node.get_unique_name(), + "name": node.__name__, "namespace": self._namespace, "inputs_schema": node.Inputs.model_json_schema(), "outputs_schema": node.Outputs.model_json_schema(), @@ -133,34 +147,13 @@ async def _register_nodes(self): raise RuntimeError(f"Failed to register nodes: {res}") return res - - - async def _register(self, nodes: List[BaseNode]): - """ - Connect nodes to the runtime. - - This method validates and registers the provided nodes with the runtime. - The nodes will be available for execution when the runtime starts. - - Args: - nodes (List[BaseNode]): List of BaseNode instances to connect - - Raises: - ValueError: If any node does not inherit from BaseNode - """ - self._nodes = self._validate_nodes(nodes) - self._node_names = [node.get_unique_name() for node in nodes] - self._node_mapping = {node.get_unique_name(): node for node in self._nodes} - - await self._register_nodes() - async def _enqueue_call(self): """ - Make an API call to enqueue states from the state manager. - + Request a batch of states to process from the state manager. + Returns: - dict: Response from the state manager containing states to process + dict: Response from the state manager containing states to process. """ async with ClientSession() as session: endpoint = self._get_enque_endpoint() @@ -177,16 +170,15 @@ async def _enqueue_call(self): async def _enqueue(self): """ - Continuously enqueue states from the state manager. - - This method runs in a loop, polling the state manager for new states - to process and adding them to the internal queue. + Poll the state manager for new states and enqueue them for processing. + + This runs continuously, polling at the configured interval. """ while True: try: if self._state_queue.qsize() < self._batch_size: data = await self._enqueue_call() - for state in data["states"]: + for state in data.get("states", []): await self._state_queue.put(state) except Exception as e: logger.error(f"Error enqueuing states: {e}") @@ -195,11 +187,11 @@ async def _enqueue(self): async def _notify_executed(self, state_id: str, outputs: List[BaseNode.Outputs]): """ - Notify the state manager that a state has been executed successfully. - + Notify the state manager that a state was executed successfully. + Args: - state_id (str): The ID of the executed state - outputs (List[BaseNode.Outputs]): The outputs from the node execution + state_id (str): The ID of the executed state. + outputs (List[BaseNode.Outputs]): Outputs from the node execution. """ async with ClientSession() as session: endpoint = self._get_executed_endpoint(state_id) @@ -214,11 +206,11 @@ async def _notify_executed(self, state_id: str, outputs: List[BaseNode.Outputs]) async def _notify_errored(self, state_id: str, error: str): """ - Notify the state manager that a state has encountered an error. - + Notify the state manager that a state execution failed. + Args: - state_id (str): The ID of the errored state - error (str): The error message + state_id (str): The ID of the errored state. + error (str): The error message. 
""" async with ClientSession() as session: endpoint = self._get_errored_endpoint(state_id) @@ -231,43 +223,55 @@ async def _notify_errored(self, state_id: str, error: str): if response.status != 200: logger.error(f"Failed to notify errored state {state_id}: {res}") - def _validate_nodes(self, nodes: List[BaseNode]): + def _validate_nodes(self): """ - Validate that all nodes inherit from BaseNode. - + Validate that all provided nodes are valid BaseNode subclasses. + Args: - nodes (List[BaseNode]): List of nodes to validate - + nodes (List[type[BaseNode]]): List of node classes to validate. + Returns: - List[BaseNode]: The validated list of nodes - + List[type[BaseNode]]: The validated list of node classes. + Raises: - ValueError: If any node does not inherit from BaseNode + ValidationError: If any node is invalid or duplicate class names are found. """ - invalid_nodes = [] - - for node in nodes: - if not isinstance(node, BaseNode): - invalid_nodes.append(f"{node.__class__.__name__}") - - if invalid_nodes: - raise ValueError(f"Following nodes do not inherit from exospherehost.node.BaseNode: {invalid_nodes}") + errors = [] + + for node in self._nodes: + if not issubclass(node, BaseNode): + errors.append(f"{node.__name__} does not inherit from exospherehost.BaseNode") + if not hasattr(node, "Inputs"): + errors.append(f"{node.__name__} does not have an Inputs class") + if not hasattr(node, "Outputs"): + errors.append(f"{node.__name__} does not have an Outputs class") + if not issubclass(node.Inputs, BaseModel): + errors.append(f"{node.__name__} does not have an Inputs class that inherits from pydantic.BaseModel") + if not issubclass(node.Outputs, BaseModel): + errors.append(f"{node.__name__} does not have an Outputs class that inherits from pydantic.BaseModel") + + # Find nodes with the same __class__.__name__ + class_names = [node.__name__ for node in self._nodes] + duplicate_class_names = [name for name in set(class_names) if class_names.count(name) > 1] + if duplicate_class_names: + errors.append(f"Duplicate node class names found: {duplicate_class_names}") + + if len(errors) > 0: + raise ValidationError("Following errors while validating nodes: " + "\n".join(errors)) - return nodes - async def _worker(self): """ Worker task that processes states from the queue. - - This method runs in a loop, taking states from the queue and executing - the corresponding node. It handles both successful execution and errors. + + Continuously fetches states from the queue, executes the corresponding node, + and notifies the state manager of the result. """ while True: state = await self._state_queue.get() try: node = self._node_mapping[state["node_name"]] - outputs = await node.execute(state["inputs"]) # type: ignore + outputs = await node()._execute(node.Inputs(**state["inputs"])) if outputs is None: outputs = [] @@ -282,36 +286,34 @@ async def _worker(self): self._state_queue.task_done() # type: ignore - async def _start(self, nodes: List[BaseNode]): + async def _start(self): """ - Start the runtime execution. - - This method starts the enqueue polling task and spawns worker tasks - to process states from the queue. - + Start the runtime event loop. + + Registers nodes, starts the polling and worker tasks, and runs until stopped. + Raises: - RuntimeError: If the runtime is not connected (no nodes registered) + RuntimeError: If the runtime is not connected (no nodes registered). 
""" - await self._register(nodes) + await self._register() poller = asyncio.create_task(self._enqueue()) worker_tasks = [asyncio.create_task(self._worker()) for _ in range(self._workers)] await asyncio.gather(poller, *worker_tasks) - def start(self, nodes: List[BaseNode]): + def start(self): """ - Start the runtime execution. - - This method starts the runtime in the current event loop or creates - a new one if none exists. It returns a task that can be awaited - or runs the runtime until completion. - + Start the runtime in the current or a new asyncio event loop. + + If called from within an existing event loop, returns a task for the runtime. + Otherwise, runs the runtime until completion. + Returns: - asyncio.Task: The runtime task if running in an existing event loop + asyncio.Task | None: The runtime task if running in an existing event loop, else None. """ try: loop = asyncio.get_running_loop() - return loop.create_task(self._start(nodes)) + return loop.create_task(self._start()) except RuntimeError: - asyncio.run(self._start(nodes)) + asyncio.run(self._start()) diff --git a/python-sdk/sample.py b/python-sdk/sample.py index 4ef9f6ab..592d672d 100644 --- a/python-sdk/sample.py +++ b/python-sdk/sample.py @@ -8,14 +8,12 @@ class Inputs(BaseModel): class Outputs(BaseModel): message: str - async def execute(self, inputs: Inputs) -> Outputs: - print(inputs) + async def execute(self) -> Outputs: + print(self.inputs) return self.Outputs(message="success") -runtime = Runtime( +Runtime( namespace="SampleNamespace", - name="SampleNode" -) - -runtime.connect([SampleNode()]) -runtime.start() \ No newline at end of file + name="SampleRuntime", + nodes=[SampleNode] +).start() \ No newline at end of file diff --git a/state-manager/app/controller/create_states.py b/state-manager/app/controller/create_states.py index ad9177e1..19f443af 100644 --- a/state-manager/app/controller/create_states.py +++ b/state-manager/app/controller/create_states.py @@ -20,7 +20,8 @@ async def create_states(namespace_name: str, body: CreateRequestModel, x_exosphe namespace_name=namespace_name, status=StateStatusEnum.CREATED, inputs=state.inputs, - outputs={} + outputs={}, + error=None ) )