diff --git a/exosphere-runtimes/cloud-storage-runtime/main.py b/exosphere-runtimes/cloud-storage-runtime/main.py
index dc8f68be..2e0e9679 100644
--- a/exosphere-runtimes/cloud-storage-runtime/main.py
+++ b/exosphere-runtimes/cloud-storage-runtime/main.py
@@ -1,14 +1,19 @@
 from dotenv import load_dotenv
 from exospherehost import Runtime
 from nodes.list_s3_files import ListS3FilesNode
+from nodes.download_s3_file import DownloadS3FileNode
 
 # Load environment variables from .env file
 # EXOSPHERE_STATE_MANAGER_URI is the URI of the state manager
 # EXOSPHERE_API_KEY is the key of the runtime
 load_dotenv()
 
+# Note on node ordering:
+# The order of node classes in the `nodes` list does not define execution sequence.
+# Nodes are registered with the state manager; orchestration and dependencies are handled externally.
+# `ListS3FilesNode` is listed before `DownloadS3FileNode` for readability only.
 Runtime(
     name="cloud-storage-runtime",
     namespace="exospherehost",
-    nodes=[ListS3FilesNode]
+    nodes=[ListS3FilesNode, DownloadS3FileNode]
 ).start()
\ No newline at end of file
diff --git a/exosphere-runtimes/cloud-storage-runtime/nodes/download_s3_file.py b/exosphere-runtimes/cloud-storage-runtime/nodes/download_s3_file.py
new file mode 100644
index 00000000..21bee568
--- /dev/null
+++ b/exosphere-runtimes/cloud-storage-runtime/nodes/download_s3_file.py
@@ -0,0 +1,46 @@
+import boto3
+from exospherehost import BaseNode
+from pydantic import BaseModel
+
+
+class DownloadS3FileNode(BaseNode):
+    """Download a single S3 object to a local file and report where it was saved."""
+
+    class Inputs(BaseModel):
+        bucket_name: str  # bucket that holds the object
+        key: str  # full object key; only its last '/'-separated segment names the local file
+
+    class Outputs(BaseModel):
+        file_path: str  # local path the object was written to (relative to the working directory)
+
+    class Secrets(BaseModel):
+        aws_access_key_id: str
+        aws_secret_access_key: str
+        aws_region: str
+
+    async def execute(self) -> Outputs:
+        """Download `inputs.key` from `inputs.bucket_name` and return the local path.
+
+        Returns:
+            Outputs with `file_path` set to the file the object was written to.
+
+        Raises:
+            ValueError: If `inputs.key` ends with '/' and so yields no file name.
+        """
+        s3_client = boto3.client(
+            's3',
+            aws_access_key_id=self.secrets.aws_access_key_id,
+            aws_secret_access_key=self.secrets.aws_secret_access_key,
+            region_name=self.secrets.aws_region
+        )
+
+        # Use only the last key segment so a nested key never needs local directories.
+        file_name = self.inputs.key.split('/')[-1]
+        if not file_name:
+            raise ValueError(f"S3 key {self.inputs.key!r} does not name a file")
+
+        # NOTE(review): download_file blocks; confirm the runtime tolerates a blocking call here.
+        s3_client.download_file(self.inputs.bucket_name, self.inputs.key, file_name)
+
+        # Return the path that was just written, not a value read back from outputs.
+        return self.Outputs(file_path=file_name)
diff --git a/python-sdk/README.md b/python-sdk/README.md
index 69e8a7ca..be6e0fd1 100644
--- a/python-sdk/README.md
+++ b/python-sdk/README.md
@@ -23,6 +23,8 @@ pip install exospherehost
 
 ## Quick Start
 
+> Important: In v1, all fields in `Inputs`, `Outputs`, and `Secrets` must be strings. If you need to pass complex data (e.g., JSON), serialize the data to a string first, then parse that string within your node.
+
 ### Basic Node Creation
 
 Create a simple node that processes data:
@@ -34,24 +36,24 @@ from pydantic import BaseModel
 class SampleNode(BaseNode):
     class Inputs(BaseModel):
         name: str
-        data: dict
+        data: str  # v1: strings only
 
     class Outputs(BaseModel):
         message: str
-        processed_data: dict
+        processed_data: str  # v1: strings only
 
     async def execute(self) -> Outputs:
         print(f"Processing data for: {self.inputs.name}")
-        # Your processing logic here
-        processed_data = {"status": "completed", "input": self.inputs.data}
+        # Your processing logic here; serialize complex data to strings (e.g., JSON)
+        processed_data = f"completed:{self.inputs.data}"
         return self.Outputs(
-            message="success",
+            message="success",
             processed_data=processed_data
         )
 
 # Initialize the runtime
 Runtime(
-    namespace="MyProject",
+    namespace="MyProject",
     name="DataProcessor",
     nodes=[SampleNode]
 ).start()
@@ -71,6 +73,7 @@ export EXOSPHERE_API_KEY="your-api-key"
 - **Distributed Execution**: Run nodes across multiple compute resources
 - **State Management**: Automatic state persistence and recovery
 - **Type Safety**: Full Pydantic integration for input/output validation
+- **String-only data model (v1)**: All `Inputs`, `Outputs`, and `Secrets` fields are strings. Serialize non-string data (e.g., JSON) as needed.
- **Async Support**: Native async/await support for high-performance operations - **Error Handling**: Built-in retry mechanisms and error recovery - **Scalability**: Designed for high-volume batch processing and workflows @@ -103,15 +106,16 @@ Nodes are the building blocks of your workflows. Each node: class ConfigurableNode(BaseNode): class Inputs(BaseModel): text: str - max_length: int = 100 + max_length: str = "100" # v1: strings only class Outputs(BaseModel): result: str - length: int + length: str # v1: strings only async def execute(self) -> Outputs: - result = self.inputs.text[:self.inputs.max_length] - return self.Outputs(result=result, length=len(result)) + max_length = int(self.inputs.max_length) + result = self.inputs.text[:max_length] + return self.Outputs(result=result, length=str(len(result))) ``` ### Error Handling @@ -122,7 +126,7 @@ class RobustNode(BaseNode): data: str class Outputs(BaseModel): - success: bool + success: str result: str async def execute(self) -> Outputs: @@ -137,6 +141,7 @@ Secrets allow you to securely manage sensitive configuration data like API keys, ```python from exospherehost import Runtime, BaseNode from pydantic import BaseModel +import json class APINode(BaseNode): class Inputs(BaseModel): @@ -144,7 +149,7 @@ class APINode(BaseNode): query: str class Outputs(BaseModel): - response: dict + response: str # v1: strings only status: str class Secrets(BaseModel): @@ -159,14 +164,24 @@ class APINode(BaseNode): # Use secrets for API calls import httpx async with httpx.AsyncClient() as client: - response = await client.post( + http_response = await client.post( f"{self.secrets.api_endpoint}/process", headers=headers, json={"user_id": self.inputs.user_id, "query": self.inputs.query} ) + # Serialize body: prefer JSON if valid; fallback to text or empty string + response_text = http_response.text or "" + if response_text: + try: + response_str = json.dumps(http_response.json()) + except Exception: + response_str = response_text + 
else: + response_str = "" + return self.Outputs( - response=response.json(), + response=response_str, status="success" ) ``` @@ -175,6 +190,7 @@ class APINode(BaseNode): - **Security**: Secrets are stored securely by the ExosphereHost Runtime and are never exposed in logs or error messages - **Validation**: The `Secrets` class uses Pydantic for automatic validation of secret values +- **String-only (v1)**: All `Secrets` fields must be strings. - **Access**: Secrets are available via `self.secrets` during node execution - **Types**: Common secret types include API keys, database credentials, encryption keys, and authentication tokens - **Injection**: Secrets are injected by the Runtime at execution time, so you don't need to handle them manually diff --git a/python-sdk/exospherehost/_version.py b/python-sdk/exospherehost/_version.py index a911248f..8174f18d 100644 --- a/python-sdk/exospherehost/_version.py +++ b/python-sdk/exospherehost/_version.py @@ -1 +1 @@ -version = "0.0.7b4" \ No newline at end of file +version = "0.0.7b5" \ No newline at end of file diff --git a/python-sdk/exospherehost/node/__init__.py b/python-sdk/exospherehost/node/__init__.py index a00cc39f..46e944b9 100644 --- a/python-sdk/exospherehost/node/__init__.py +++ b/python-sdk/exospherehost/node/__init__.py @@ -11,6 +11,5 @@ """ from .BaseNode import BaseNode -from .status import Status -__all__ = ["BaseNode", "Status"] +__all__ = ["BaseNode"] diff --git a/python-sdk/exospherehost/node/status.py b/python-sdk/exospherehost/node/status.py deleted file mode 100644 index 5a416d2b..00000000 --- a/python-sdk/exospherehost/node/status.py +++ /dev/null @@ -1,44 +0,0 @@ -""" -Status constants for state management in the exospherehost system. - -These constants represent the various states that a workflow state can be in -during its lifecycle from creation to completion or failure. -""" - -from enum import Enum - - -class Status(str, Enum): - """ - Enumeration of workflow state status values. 
- - This enum provides type-safe constants for the various states that a workflow - state can be in during its lifecycle from creation to completion or failure. - """ - - # State has been created but not yet queued for execution - CREATED = 'CREATED' - - # State has been queued and is waiting to be picked up by a worker - QUEUED = 'QUEUED' - - # State has been successfully executed by a worker - EXECUTED = 'EXECUTED' - - # Next state in the workflow has been created based on successful execution - NEXT_CREATED = 'NEXT_CREATED' - - # A retry state has been created due to a previous failure - RETRY_CREATED = 'RETRY_CREATED' - - # State execution has timed out - TIMEDOUT = 'TIMEDOUT' - - # State execution has failed with an error - ERRORED = 'ERRORED' - - # State execution has been cancelled - CANCELLED = 'CANCELLED' - - # State has completed successfully (final state) - SUCCESS = 'SUCCESS' \ No newline at end of file diff --git a/python-sdk/exospherehost/runtime.py b/python-sdk/exospherehost/runtime.py index a9594573..f2bb9010 100644 --- a/python-sdk/exospherehost/runtime.py +++ b/python-sdk/exospherehost/runtime.py @@ -279,6 +279,15 @@ def _validate_nodes(self): errors.append(f"{node.__name__} does not have an Secrets class") if not issubclass(node.Secrets, BaseModel): errors.append(f"{node.__name__} does not have an Secrets class that inherits from pydantic.BaseModel") + + # check all data objects are strings + for field_name, field_info in node.Inputs.model_fields.items(): + if field_info.annotation is not str: + errors.append(f"{node.__name__}.Inputs field '{field_name}' must be of type str, got {field_info.annotation}") + + for field_name, field_info in node.Outputs.model_fields.items(): + if field_info.annotation is not str: + errors.append(f"{node.__name__}.Outputs field '{field_name}' must be of type str, got {field_info.annotation}") for field_name, field_info in node.Secrets.model_fields.items(): if field_info.annotation is not str: diff --git 
a/python-sdk/sample.py b/python-sdk/sample.py deleted file mode 100644 index 592d672d..00000000 --- a/python-sdk/sample.py +++ /dev/null @@ -1,19 +0,0 @@ -from exospherehost import Runtime, BaseNode -from pydantic import BaseModel - -class SampleNode(BaseNode): - class Inputs(BaseModel): - name: str - - class Outputs(BaseModel): - message: str - - async def execute(self) -> Outputs: - print(self.inputs) - return self.Outputs(message="success") - -Runtime( - namespace="SampleNamespace", - name="SampleRuntime", - nodes=[SampleNode] -).start() \ No newline at end of file diff --git a/state-manager/app/models/node_template_model.py b/state-manager/app/models/node_template_model.py index 970662b7..4323e68d 100644 --- a/state-manager/app/models/node_template_model.py +++ b/state-manager/app/models/node_template_model.py @@ -7,5 +7,4 @@ class NodeTemplate(BaseModel): namespace: str = Field(..., description="Namespace of the node") identifier: str = Field(..., description="Identifier of the node") inputs: dict[str, Any] = Field(..., description="Inputs of the node") - store: dict[str, Any] = Field(..., description="Upsert data to store object for the node") next_nodes: Optional[List[str]] = Field(None, description="Next nodes to execute") \ No newline at end of file