Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions state-manager/app/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class Settings(BaseModel):
mongo_database_name: str = Field(default="exosphere-state-manager", description="MongoDB database name")
state_manager_secret: str = Field(..., description="Secret key for API authentication")
secrets_encryption_key: str = Field(..., description="Key for encrypting secrets")
trigger_workers: int = Field(default=1, description="Number of workers to run the trigger cron")

@classmethod
def from_env(cls) -> "Settings":
Expand All @@ -20,6 +21,7 @@ def from_env(cls) -> "Settings":
mongo_database_name=os.getenv("MONGO_DATABASE_NAME", "exosphere-state-manager"), # type: ignore
state_manager_secret=os.getenv("STATE_MANAGER_SECRET"), # type: ignore
secrets_encryption_key=os.getenv("SECRETS_ENCRYPTION_KEY"), # type: ignore
trigger_workers=int(os.getenv("TRIGGER_WORKERS", 1)) # type: ignore
)


Expand Down
2 changes: 1 addition & 1 deletion state-manager/app/controller/trigger_graph.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from fastapi import HTTPException

from app.singletons.logs_manager import LogsManager
from app.models.trigger_model import TriggerGraphRequestModel, TriggerGraphResponseModel
from app.models.trigger_graph_model import TriggerGraphRequestModel, TriggerGraphResponseModel
from app.models.state_status_enum import StateStatusEnum
from app.models.db.state import State
from app.models.db.store import Store
Expand Down
13 changes: 10 additions & 3 deletions state-manager/app/controller/upsert_graph_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@

async def upsert_graph_template(namespace_name: str, graph_name: str, body: UpsertGraphTemplateRequest, x_exosphere_request_id: str, background_tasks: BackgroundTasks) -> UpsertGraphTemplateResponse:
try:

old_triggers = []

graph_template = await GraphTemplate.find_one(
GraphTemplate.name == graph_name,
GraphTemplate.namespace == namespace_name
Expand All @@ -21,13 +24,15 @@ async def upsert_graph_template(namespace_name: str, graph_name: str, body: Upse
"Graph template already exists in namespace", graph_template=graph_template,
namespace_name=namespace_name,
x_exosphere_request_id=x_exosphere_request_id)
old_triggers = graph_template.triggers

graph_template.set_secrets(body.secrets)
graph_template.validation_status = GraphTemplateValidationStatus.PENDING
graph_template.validation_errors = []
graph_template.retry_policy = body.retry_policy
graph_template.store_config = body.store_config
graph_template.nodes = body.nodes
graph_template.triggers = body.triggers
await graph_template.save()

else:
Expand All @@ -45,14 +50,15 @@ async def upsert_graph_template(namespace_name: str, graph_name: str, body: Upse
validation_status=GraphTemplateValidationStatus.PENDING,
validation_errors=[],
retry_policy=body.retry_policy,
store_config=body.store_config
store_config=body.store_config,
triggers=body.triggers
).set_secrets(body.secrets)
)
except ValueError as e:
logger.error("Error validating graph template", error=e, x_exosphere_request_id=x_exosphere_request_id)
raise HTTPException(status_code=400, detail=f"Error validating graph template: {str(e)}")

background_tasks.add_task(verify_graph, graph_template)
background_tasks.add_task(verify_graph, graph_template, old_triggers)

return UpsertGraphTemplateResponse(
nodes=graph_template.nodes,
Expand All @@ -61,6 +67,7 @@ async def upsert_graph_template(namespace_name: str, graph_name: str, body: Upse
secrets={secret_name: True for secret_name in graph_template.get_secrets().keys()},
retry_policy=graph_template.retry_policy,
store_config=graph_template.store_config,
triggers=graph_template.triggers,
created_at=graph_template.created_at,
updated_at=graph_template.updated_at
)
Expand Down
19 changes: 18 additions & 1 deletion state-manager/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from .models.db.registered_node import RegisteredNode
from .models.db.store import Store
from .models.db.run import Run
from .models.db.trigger import DatabaseTriggers

# injecting routes
from .routes import router, global_router
Expand All @@ -32,9 +33,16 @@

# importing database health check function
from .utils.check_database_health import check_database_health

#scheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from .tasks.trigger_cron import trigger_cron

# Define models list
DOCUMENT_MODELS = [State, GraphTemplate, RegisteredNode, Store, Run]
DOCUMENT_MODELS = [State, GraphTemplate, RegisteredNode, Store, Run, DatabaseTriggers]

scheduler = AsyncIOScheduler()

@asynccontextmanager
async def lifespan(app: FastAPI):
Expand All @@ -59,11 +67,20 @@ async def lifespan(app: FastAPI):
# perform database health check
await check_database_health(DOCUMENT_MODELS)

scheduler.add_job(
trigger_cron,
CronTrigger.from_crontab("* * * * *"),
replace_existing=True,
id="every_minute_task"
)
scheduler.start()

# main logic of the server
yield

# end of the server
await client.close()
scheduler.shutdown()
logger.info("server stopped")


Expand Down
2 changes: 2 additions & 0 deletions state-manager/app/models/db/graph_template_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from app.models.dependent_string import DependentString
from app.models.retry_policy_model import RetryPolicyModel
from app.models.store_config_model import StoreConfig
from app.models.trigger_models import Trigger

class GraphTemplate(BaseDatabaseModel):
name: str = Field(..., description="Name of the graph")
Expand All @@ -21,6 +22,7 @@ class GraphTemplate(BaseDatabaseModel):
validation_status: GraphTemplateValidationStatus = Field(..., description="Validation status of the graph")
validation_errors: List[str] = Field(default_factory=list, description="Validation errors of the graph")
secrets: Dict[str, str] = Field(default_factory=dict, description="Secrets of the graph")
triggers: List[Trigger] = Field(default_factory=list, description="Triggers of the graph")
retry_policy: RetryPolicyModel = Field(default_factory=RetryPolicyModel, description="Retry policy of the graph")
store_config: StoreConfig = Field(default_factory=StoreConfig, description="Store config of the graph")

Expand Down
36 changes: 36 additions & 0 deletions state-manager/app/models/db/trigger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from pydantic import Field
from beanie import Document
from typing import Optional

from pymongo import IndexModel
from ..trigger_models import TriggerTypeEnum, TriggerStatusEnum
from datetime import datetime

class DatabaseTriggers(Document):
type: TriggerTypeEnum = Field(..., description="Type of the trigger")
expression: Optional[str] = Field(default=None, description="Expression of the trigger")
graph_name: str = Field(..., description="Name of the graph")
namespace: str = Field(..., description="Namespace of the graph")
trigger_time: datetime = Field(..., description="Trigger time of the trigger")
trigger_status: TriggerStatusEnum = Field(..., description="Status of the trigger")

class Settings:
indexes = [
IndexModel(
[
("trigger_time", -1),
],
name="idx_trigger_time"
),
IndexModel(
[
("type", 1),
("expression", 1),
("graph_name", 1),
("namespace", 1),
("trigger_time", 1),
],
name="uniq_graph_type_expr_time",
unique=True
)
]
3 changes: 3 additions & 0 deletions state-manager/app/models/graph_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,23 @@
from .graph_template_validation_status import GraphTemplateValidationStatus
from .retry_policy_model import RetryPolicyModel
from .store_config_model import StoreConfig
from .trigger_models import Trigger


class UpsertGraphTemplateRequest(BaseModel):
secrets: Dict[str, str] = Field(..., description="Dictionary of secrets that are used while graph execution")
nodes: List[NodeTemplate] = Field(..., description="List of node templates that define the graph structure")
retry_policy: RetryPolicyModel = Field(default_factory=RetryPolicyModel, description="Retry policy of the graph")
store_config: StoreConfig = Field(default_factory=StoreConfig, description="Store config of the graph")
triggers: List[Trigger] = Field(default_factory=list, description="Triggers of the graph")


class UpsertGraphTemplateResponse(BaseModel):
nodes: List[NodeTemplate] = Field(..., description="List of node templates that define the graph structure")
secrets: Dict[str, bool] = Field(..., description="Dictionary of secrets that are used while graph execution")
retry_policy: RetryPolicyModel = Field(default_factory=RetryPolicyModel, description="Retry policy of the graph")
store_config: StoreConfig = Field(default_factory=StoreConfig, description="Store config of the graph")
triggers: List[Trigger] = Field(default_factory=list, description="Triggers of the graph")
created_at: datetime = Field(..., description="Timestamp when the graph template was created")
updated_at: datetime = Field(..., description="Timestamp when the graph template was last updated")
validation_status: GraphTemplateValidationStatus = Field(..., description="Current validation status of the graph template")
Expand Down
36 changes: 36 additions & 0 deletions state-manager/app/models/trigger_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from pydantic import BaseModel, Field, field_validator, model_validator
from enum import Enum
from croniter import croniter
from typing import Self

class TriggerTypeEnum(str, Enum):
CRON = "CRON"

class TriggerStatusEnum(str, Enum):
PENDING = "PENDING"
FAILED = "FAILED"
CANCELLED = "CANCELLED"
TRIGGERED = "TRIGGERED"
TRIGGERING = "TRIGGERING"

class CronTrigger(BaseModel):
expression: str = Field(..., description="Cron expression for the trigger")

@field_validator("expression")
@classmethod
def validate_expression(cls, v: str) -> str:
if not croniter.is_valid(v):
raise ValueError("Invalid cron expression")
return v

class Trigger(BaseModel):
type: TriggerTypeEnum = Field(..., description="Type of the trigger")
value: dict = Field(default_factory=dict, description="Value of the trigger")

@model_validator(mode="after")
def validate_trigger(self) -> Self:
if self.type == TriggerTypeEnum.CRON:
CronTrigger.model_validate(self.value)
else:
raise ValueError(f"Unsupported trigger type: {self.type}")
return self
2 changes: 1 addition & 1 deletion state-manager/app/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from .models.enqueue_request import EnqueueRequestModel
from .controller.enqueue_states import enqueue_states

from .models.trigger_model import TriggerGraphRequestModel, TriggerGraphResponseModel
from .models.trigger_graph_model import TriggerGraphRequestModel, TriggerGraphResponseModel
from .controller.trigger_graph import trigger_graph

from .models.executed_models import ExecutedRequestModel, ExecutedResponseModel
Expand Down
75 changes: 75 additions & 0 deletions state-manager/app/tasks/trigger_cron.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from datetime import datetime
from uuid import uuid4
from app.models.db.trigger import DatabaseTriggers
from app.models.trigger_models import TriggerStatusEnum, TriggerTypeEnum
from app.singletons.logs_manager import LogsManager
from app.controller.trigger_graph import trigger_graph
from app.models.trigger_graph_model import TriggerGraphRequestModel
from pymongo import ReturnDocument
from app.config.settings import get_settings
import croniter
import asyncio

logger = LogsManager().get_logger()

async def get_due_triggers(cron_time: datetime) -> DatabaseTriggers | None:
data = await DatabaseTriggers.get_pymongo_collection().find_one_and_update(
{
"trigger_time": {"$lte": cron_time},
"trigger_status": TriggerStatusEnum.PENDING
},
{
"$set": {"trigger_status": TriggerStatusEnum.TRIGGERING}
},
return_document=ReturnDocument.AFTER
)
return DatabaseTriggers(**data) if data else None

async def call_trigger_graph(trigger: DatabaseTriggers):
await trigger_graph(
namespace_name=trigger.namespace,
graph_name=trigger.graph_name,
body=TriggerGraphRequestModel(),
x_exosphere_request_id=str(uuid4())
)

async def mark_as_failed(trigger: DatabaseTriggers):
await DatabaseTriggers.get_pymongo_collection().update_one(
{"_id": trigger.id},
{"$set": {"trigger_status": TriggerStatusEnum.FAILED}}
)

async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime):
assert trigger.expression is not None
iter = croniter.croniter(trigger.expression, cron_time)
next_trigger_time = iter.get_next(datetime)

await DatabaseTriggers(
type=TriggerTypeEnum.CRON,
expression=trigger.expression,
graph_name=trigger.graph_name,
namespace=trigger.namespace,
trigger_time=next_trigger_time,
trigger_status=TriggerStatusEnum.PENDING
).insert()

async def mark_as_triggered(trigger: DatabaseTriggers):
await DatabaseTriggers.get_pymongo_collection().update_one(
{"_id": trigger.id},
{"$set": {"trigger_status": TriggerStatusEnum.TRIGGERED}}
)

async def handle_trigger(cron_time: datetime):
while(trigger:= await get_due_triggers(cron_time)):
try:
await call_trigger_graph(trigger)
await create_next_triggers(trigger, cron_time)
await mark_as_triggered(trigger)
except Exception as e:
await mark_as_failed(trigger)
logger.error(f"Error calling trigger graph: {e}")

async def trigger_cron():
cron_time = datetime.now()
logger.info(f"starting trigger_cron: {cron_time}")
await asyncio.gather(*[handle_trigger(cron_time) for _ in range(get_settings().trigger_workers)])
Loading