Skip to content

Conversation

@NiveditJain
Copy link
Member

  • Added croniter as a dependency in pyproject.toml to support cron expressions.
  • Updated upsert_graph_template to handle triggers in graph templates.
  • Introduced Trigger and CronTrigger models to validate and manage triggers.
  • Enhanced GraphTemplate and request/response models to include triggers.

This update improves the flexibility of graph templates by allowing scheduled executions based on cron expressions.

…templates

- Added `croniter` as a dependency in `pyproject.toml` to support cron expressions.
- Updated `upsert_graph_template` to handle triggers in graph templates.
- Introduced `Trigger` and `CronTrigger` models to validate and manage triggers.
- Enhanced `GraphTemplate` and request/response models to include triggers.

This update improves the flexibility of graph templates by allowing scheduled executions based on cron expressions.
@safedep
Copy link

safedep bot commented Sep 26, 2025

SafeDep Report Summary

Green Malicious Packages Badge Green Vulnerable Packages Badge Green Risky License Badge

Package Details
Package Malware Vulnerability Risky License Report
icon apscheduler @ 3.11.0
state-manager/uv.lock
ok icon
ok icon
ok icon
🔗
icon croniter @ 6.0.0
state-manager/uv.lock
ok icon
ok icon
ok icon
🔗
icon python-dateutil @ 2.9.0.post0
state-manager/uv.lock
ok icon
ok icon
ok icon
🔗
icon pytz @ 2025.2
state-manager/uv.lock
ok icon
ok icon
ok icon
🔗
icon six @ 1.17.0
state-manager/uv.lock
ok icon
ok icon
ok icon
🔗
icon tzdata @ 2025.2
state-manager/uv.lock
ok icon
ok icon
ok icon
🔗
icon tzlocal @ 5.3.1
state-manager/uv.lock
ok icon
ok icon
ok icon
🔗

This report is generated by SafeDep Github App.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Sep 26, 2025

Warning

Rate limit exceeded

@NiveditJain has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 2 minutes and 19 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between 155f7cc and 5042c7a.

📒 Files selected for processing (1)
  • state-manager/app/tasks/verify_graph.py (3 hunks)

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

📝 Walkthrough

Summary by CodeRabbit

  • New Features

    • Added cron-based triggers for graph templates, including validation, scheduling, and cancellation on updates.
    • Background scheduler processes due triggers periodically with configurable worker count.
    • API for creating/updating templates now accepts and returns triggers.
  • Chores

    • Added dependencies for scheduling: apscheduler and croniter.
  • Tests

    • Updated tests for new trigger workflow, scheduler integration, and extended verify_graph signature.

Walkthrough

Adds CRON trigger support: request/response and DB GraphTemplate include triggers; new trigger models and a DatabaseTriggers document are introduced; cron expressions validated; upsert captures old_triggers and schedules background verify_graph(graph_template, old_triggers) which reconciles CRON schedules; a scheduler runs trigger_cron to execute due triggers.

Changes

Cohort / File(s) Summary
Upsert flow & Controller
state-manager/app/controller/upsert_graph_template.py
Wire triggers through upsert: accept, persist, include in response; capture old_triggers and pass them to scheduled background verify_graph.
API models
state-manager/app/models/graph_models.py
Add triggers: List[Trigger] to UpsertGraphTemplateRequest and UpsertGraphTemplateResponse.
DB GraphTemplate model
state-manager/app/models/db/graph_template_model.py
Add triggers: List[Trigger] field to GraphTemplate and import Trigger.
Trigger models & validation
state-manager/app/models/trigger_models.py
Add CronTrigger, Trigger, TriggerTypeEnum, TriggerStatusEnum with validation (cron expression checked via croniter).
Triggers DB document
state-manager/app/models/db/trigger.py
New Beanie DatabaseTriggers document with fields and indexes (idx_trigger_time, uniq_graph_type_expr_time).
Verification & scheduling tasks
state-manager/app/tasks/verify_graph.py
verify_graph now accepts old_triggers; added cancel_crons and create_crons to reconcile CRON triggers and schedule DB entries.
Cron worker & scheduler integration
state-manager/app/tasks/trigger_cron.py, state-manager/app/main.py
New trigger_cron worker and helpers (get_due_triggers, call_trigger_graph, create_next_triggers, mark_as_triggered/failed); register DatabaseTriggers in DOCUMENT_MODELS; initialize APScheduler to run trigger_cron periodically.
Config & dependencies
state-manager/app/config/settings.py, state-manager/pyproject.toml
Add trigger_workers setting (default 1); add dependencies croniter>=6.0.0, apscheduler>=3.11.0.
Imports, routes & controllers
state-manager/app/routes.py, state-manager/app/controller/trigger_graph.py
Update import paths for TriggerGraph models.
Tests & fixtures
tests/*, state-manager/tests/unit/with_database/conftest.py
Update tests to the new verify_graph signature, scheduler mocking, and loosen some background task assertions; mock scheduler in lifespan fixture.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Client
  participant Controller as UpsertGraphTemplateController
  participant ReqModel as Request (validates triggers)
  participant DB as DB (GraphTemplate & DatabaseTriggers)
  participant Verifier as Background verify_graph
  note right of Verifier #e6f7ff: reconcile CRON triggers (cancel/create)

  Client->>Controller: POST/PUT /graph-template (body includes triggers)
  Controller->>ReqModel: validate body (cron expressions validated)
  alt validation succeeds
    Controller->>DB: insert/update GraphTemplate(..., triggers=body.triggers)
    DB-->>Controller: saved GraphTemplate (provides old_triggers on update)
    Controller->>Verifier: schedule verify_graph(graph_template, old_triggers)
    Verifier->>DB: cancel_crons(old_triggers, graph_template.triggers)
    Verifier->>DB: create_crons(graph_template.triggers, ahead_window)
    DB-->>Verifier: persist DatabaseTriggers entries
    Controller-->>Client: 200/201 with response (includes triggers)
  else validation fails
    Controller-->>Client: 4xx error
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • nk-ag

Poem

I twitched my whiskers at the cron-lit sky,
New triggers hop where schedules lie.
I check each expression, neat and spry—
Templates wake, pending jobs reply.
Hop on, little tasks, we’ll nibble time. 🐇⏰

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Title Check ✅ Passed The title “Adding triggers to graphs” clearly and concisely summarizes the primary change of the pull request, which is to introduce trigger functionality into graph templates without extraneous detail or file lists. It directly reflects the main objective of enhancing graph templates with scheduled execution capabilities.
Description Check ✅ Passed The pull request description accurately outlines the key changes—adding croniter as a dependency, updating the controller to handle triggers, and introducing new trigger and cron models—directly matching the modifications in the code. It remains on topic by describing how graph templates are extended to include triggers for scheduled execution.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @NiveditJain, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the system by introducing scheduled execution capabilities for graph templates. It integrates cron-based triggering mechanisms, allowing users to define specific schedules for their graph templates. This functionality is supported by new data models for triggers, robust validation logic for cron expressions, and updates to existing graph template models and API endpoints to accommodate these new scheduling capabilities, ultimately improving automation and flexibility.

Highlights

  • Croniter Dependency: The croniter library has been added as a new dependency to enable robust validation of cron expressions, which are essential for defining scheduled tasks.
  • Trigger Models Introduced: New Trigger and CronTrigger Pydantic models have been introduced. These models define and validate scheduled execution patterns, specifically supporting cron expressions with built-in validation logic.
  • Graph Template Integration: The GraphTemplate database model, along with the UpsertGraphTemplateRequest and UpsertGraphTemplateResponse API models, have been updated to include a list of Trigger objects. This allows graph templates to be associated with and managed by scheduled executions.
  • Controller Update: The upsert_graph_template controller has been modified to correctly handle the persistence and retrieval of trigger configurations when creating or updating graph templates.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@NiveditJain NiveditJain changed the title Add croniter dependency and implement trigger functionality in graph templates Adding triggers to graphs Sep 26, 2025
@codecov
Copy link

codecov bot commented Sep 26, 2025

Codecov Report

❌ Patch coverage is 67.42424% with 43 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
state-manager/app/tasks/trigger_cron.py 48.71% 20 Missing ⚠️
state-manager/app/tasks/verify_graph.py 42.85% 16 Missing ⚠️
state-manager/app/models/trigger_models.py 75.86% 7 Missing ⚠️

📢 Thoughts on this report? Let us know!

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces trigger functionality for graph templates, allowing for scheduled executions via cron expressions. A new dependency croniter is added for this purpose. The changes include new Pydantic models for triggers and updates to the graph template models and the upsert controller to handle triggers.

The implementation of the trigger models is functional, but I've provided a couple of suggestions in app/models/trigger_models.py to improve robustness and extensibility, particularly around type hinting and validation logic. These changes will make the system easier to maintain as new trigger types are added in the future.

Overall, the changes are well-structured and address the feature requirement effectively.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
state-manager/app/controller/upsert_graph_template.py (2)

25-33: Reset internal caches after updating nodes to avoid stale graph metadata.

Assigning graph_template.nodes replaces the graph, but cached PrivateAttr structures (_node_by_identifier, _parents_by_identifier, _root_node, _path_by_identifier) are not invalidated. Subsequent validation or background verification can use stale caches and yield incorrect results.

Suggested fix here (minimal, local invalidation):

                 graph_template.retry_policy = body.retry_policy
                 graph_template.store_config = body.store_config
-                graph_template.nodes = body.nodes
+                graph_template.nodes = body.nodes
+                # Invalidate internal caches so validators and lookups rebuild correctly
+                graph_template._node_by_identifier = None
+                graph_template._parents_by_identifier = None
+                graph_template._root_node = None
+                graph_template._path_by_identifier = None
                 graph_template.triggers = body.triggers
                 await graph_template.save()

Alternatively, add a public method on GraphTemplate to reset caches and call it here (preferred API design). I can draft that method if you want.


59-67: Avoid decrypting secrets just to expose presence flags.

get_secrets() decrypts values only to fetch keys. Use the stored encrypted keys instead to reduce work and avoid unnecessary crypto operations.

Apply:

-            secrets={secret_name: True for secret_name in graph_template.get_secrets().keys()},
+            secrets={secret_name: True for secret_name in graph_template.secrets.keys()},
📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8a9972f and 19bc778.

⛔ Files ignored due to path filters (1)
  • state-manager/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (5)
  • state-manager/app/controller/upsert_graph_template.py (3 hunks)
  • state-manager/app/models/db/graph_template_model.py (2 hunks)
  • state-manager/app/models/graph_models.py (1 hunks)
  • state-manager/app/models/trigger_models.py (1 hunks)
  • state-manager/pyproject.toml (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
state-manager/app/models/db/graph_template_model.py (1)
state-manager/app/models/trigger_models.py (1)
  • Trigger (19-27)
state-manager/app/controller/upsert_graph_template.py (1)
state-manager/app/models/db/graph_template_model.py (1)
  • set_secrets (269-271)
state-manager/app/models/graph_models.py (1)
state-manager/app/models/trigger_models.py (1)
  • Trigger (19-27)
🔇 Additional comments (4)
state-manager/app/models/db/graph_template_model.py (1)

16-16: Triggers field addition looks correct and consistent.

Import and Field(default_factory=list) are appropriate; Pydantic will validate Trigger instances. No DB index changes needed.

Also applies to: 25-25

state-manager/app/models/graph_models.py (2)

16-16: Request model addition for triggers looks good.

Defaulting to an empty list with proper typing is correct and aligns with controller/DB changes.


24-24: Response model addition for triggers looks good.

Matches the persisted field and controller response.

state-manager/pyproject.toml (1)

13-13: croniter>=6.0.0 is valid
croniter 6.0.0 is available on PyPI; no change needed.

@coderabbitai coderabbitai bot added the enhancement New feature or request label Sep 28, 2025
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 19bc778 and 37b4843.

📒 Files selected for processing (6)
  • state-manager/app/config/settings.py (2 hunks)
  • state-manager/app/controller/upsert_graph_template.py (4 hunks)
  • state-manager/app/models/db/trigger.py (1 hunks)
  • state-manager/app/models/trigger_models.py (1 hunks)
  • state-manager/app/routes.py (1 hunks)
  • state-manager/app/tasks/verify_graph.py (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
state-manager/app/models/db/trigger.py (1)
state-manager/app/models/trigger_models.py (2)
  • TriggerTypeEnum (6-7)
  • TriggerStatusEnum (9-13)
state-manager/app/controller/upsert_graph_template.py (1)
state-manager/app/tasks/verify_graph.py (1)
  • verify_graph (152-184)
state-manager/app/tasks/verify_graph.py (5)
state-manager/app/models/db/graph_template_model.py (1)
  • GraphTemplate (18-347)
state-manager/app/models/db/registered_node.py (1)
  • RegisteredNode (8-44)
state-manager/app/models/trigger_models.py (4)
  • Trigger (25-33)
  • CronTrigger (15-23)
  • TriggerStatusEnum (9-13)
  • TriggerTypeEnum (6-7)
state-manager/app/models/db/trigger.py (1)
  • Triggers (9-24)
state-manager/app/config/settings.py (1)
  • get_settings (32-36)
state-manager/app/routes.py (1)
state-manager/app/models/trigger_graph_model.py (2)
  • TriggerGraphRequestModel (4-7)
  • TriggerGraphResponseModel (9-11)
🪛 GitHub Actions: State Manager Unit Tests
state-manager/app/config/settings.py

[error] 1-1: ModuleNotFoundError: No module named 'config'

state-manager/app/tasks/verify_graph.py

[error] 1-1: ModuleNotFoundError: No module named 'config' while importing verify_graph (from config.settings import get_settings)

🔇 Additional comments (1)
state-manager/app/models/trigger_models.py (1)

26-27: Use default_factory for mutable trigger payload

Echoing the earlier feedback: Field(default={}) reuses the same dict across Trigger instances under pydantic v2, so one trigger mutating value will leak into others. Switch to default_factory=dict so each trigger gets its own payload container.

-    value: dict[str, str] = Field(default={}, description="Value of the trigger")
+    value: dict[str, str] = Field(default_factory=dict, description="Value of the trigger")

- Updated the import path for `get_settings` to align with the new structure.
- Modified the `create_crons` function to ensure at least one event is inserted for each cron trigger, improving the handling of trigger times and preventing unnecessary database inserts.
- Changed import path for TriggerGraphRequestModel and TriggerGraphResponseModel to reflect new model structure.
- Introduced a new file for trigger_cron task to manage scheduled trigger executions.
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 37b4843 and 72ff4b1.

📒 Files selected for processing (1)
  • state-manager/app/tasks/verify_graph.py (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
state-manager/app/tasks/verify_graph.py (4)
state-manager/app/models/db/registered_node.py (1)
  • RegisteredNode (8-44)
state-manager/app/models/trigger_models.py (4)
  • Trigger (25-33)
  • CronTrigger (15-23)
  • TriggerStatusEnum (9-13)
  • TriggerTypeEnum (6-7)
state-manager/app/models/db/trigger.py (1)
  • Triggers (9-24)
state-manager/app/config/settings.py (1)
  • get_settings (32-36)
🔇 Additional comments (5)
state-manager/app/tasks/verify_graph.py (5)

12-14: Import root fixed — good catch.

Switching to from app.config.settings import get_settings resolves the prior module resolution error.


154-156: Empty bulk insert guard — good.

Avoids insert_many([]) failures.


105-123: Sets of Pydantic models are unhashable; diff on validated strings + skip no‑ops.

Current set([CronTrigger(...)]) raises TypeError. Build sets on the validated expression string and return early when nothing to cancel.

 async def cancel_crons(graph_template: GraphTemplate, old_triggers: list[Trigger]):
-    old_crons = set([CronTrigger(**trigger.value) for trigger in old_triggers if trigger.type == TriggerTypeEnum.CRON])
-    new_crons = set([CronTrigger(**trigger.value) for trigger in graph_template.triggers if trigger.type == TriggerTypeEnum.CRON])
+    old_crons = {
+        CronTrigger(**trigger.value).expression
+        for trigger in (old_triggers or [])
+        if trigger.type == TriggerTypeEnum.CRON
+    }
+    new_crons = {
+        CronTrigger(**trigger.value).expression
+        for trigger in (graph_template.triggers or [])
+        if trigger.type == TriggerTypeEnum.CRON
+    }
 
-    removed = old_crons - new_crons
+    removed = old_crons - new_crons
+    if not removed:
+        return
 
     await DatabaseTriggers.find(
         DatabaseTriggers.graph_name == graph_template.name,
         DatabaseTriggers.trigger_status == TriggerStatusEnum.PENDING,
         DatabaseTriggers.type == TriggerTypeEnum.CRON,
-        In(DatabaseTriggers.expression, [cron.expression for cron in removed])
+        In(DatabaseTriggers.expression, list(removed))
     ).update(
         {
             "$set": {
                 "trigger_status": TriggerStatusEnum.CANCELLED
             }
         }
     ) # type: ignore

124-129: Same unhashable-set bug in create path.

Mirror the fix: diff on expression strings and guard graph_template.triggers being None.

 async def create_crons(graph_template: GraphTemplate, old_triggers: list[Trigger]):
-    old_crons = set([CronTrigger(**trigger.value) for trigger in old_triggers if trigger.type == TriggerTypeEnum.CRON])
-    new_crons = set([CronTrigger(**trigger.value) for trigger in graph_template.triggers if trigger.type == TriggerTypeEnum.CRON])
+    old_crons = {
+        CronTrigger(**trigger.value).expression
+        for trigger in (old_triggers or [])
+        if trigger.type == TriggerTypeEnum.CRON
+    }
+    new_crons = {
+        CronTrigger(**trigger.value).expression
+        for trigger in (graph_template.triggers or [])
+        if trigger.type == TriggerTypeEnum.CRON
+    }
 
     crons_to_create = new_crons - old_crons

105-156: No None guard needed for graph_template.triggers
GraphTemplate.triggers is defined with Field(default_factory=list) and will never be None, so wrapping it in (graph_template.triggers or []) is unnecessary.

Likely an incorrect or invalid review comment.

NiveditJain and others added 3 commits September 28, 2025 18:58
- Introduced the DatabaseTriggers model to manage trigger data in the database.
- Updated the DOCUMENT_MODELS list in main.py to include DatabaseTriggers.
- Adjusted import statements in verify_graph.py to reflect the new model name.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
…tion logic

- Added a new 'namespace' field to the DatabaseTriggers model to specify the graph's namespace.
- Updated the 'create_crons' function in verify_graph.py to include the namespace when creating cron triggers, ensuring better organization and management of triggers.
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 54ff19b and eb5acab.

📒 Files selected for processing (4)
  • state-manager/app/main.py (2 hunks)
  • state-manager/app/models/db/trigger.py (1 hunks)
  • state-manager/app/models/trigger_models.py (1 hunks)
  • state-manager/app/tasks/verify_graph.py (3 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: NiveditJain
PR: exospherehost/exospherehost#428
File: state-manager/app/tasks/verify_graph.py:4-5
Timestamp: 2025-09-28T13:35:42.851Z
Learning: In the cron trigger scheduling logic for state-manager/app/tasks/verify_graph.py, the system intentionally schedules at least one trigger beyond the trigger_ahead_time window to ensure continuity of scheduled executions. The current logic of appending an event then breaking is by design to guarantee "at least one next" trigger.
📚 Learning: 2025-09-28T13:35:42.851Z
Learnt from: NiveditJain
PR: exospherehost/exospherehost#428
File: state-manager/app/tasks/verify_graph.py:4-5
Timestamp: 2025-09-28T13:35:42.851Z
Learning: In the cron trigger scheduling logic for state-manager/app/tasks/verify_graph.py, the system intentionally schedules at least one trigger beyond the trigger_ahead_time window to ensure continuity of scheduled executions. The current logic of appending an event then breaking is by design to guarantee "at least one next" trigger.

Applied to files:

  • state-manager/app/tasks/verify_graph.py
🧬 Code graph analysis (3)
state-manager/app/main.py (1)
state-manager/app/models/db/trigger.py (1)
  • DatabaseTriggers (9-36)
state-manager/app/models/db/trigger.py (1)
state-manager/app/models/trigger_models.py (2)
  • TriggerTypeEnum (6-7)
  • TriggerStatusEnum (9-13)
state-manager/app/tasks/verify_graph.py (5)
state-manager/app/models/db/graph_template_model.py (1)
  • GraphTemplate (18-347)
state-manager/app/models/db/registered_node.py (1)
  • RegisteredNode (8-44)
state-manager/app/models/trigger_models.py (4)
  • Trigger (25-35)
  • CronTrigger (15-23)
  • TriggerStatusEnum (9-13)
  • TriggerTypeEnum (6-7)
state-manager/app/models/db/trigger.py (1)
  • DatabaseTriggers (9-36)
state-manager/app/config/settings.py (1)
  • get_settings (32-36)
🔇 Additional comments (7)
state-manager/app/main.py (2)

25-25: Wiring DatabaseTriggers into startup looks correct.

Import is consistent with other app-local models.


38-38: Beanie model registration added — good.

This ensures indexes are created and health checks include triggers. Verify your PyMongo/async-pymongo version supports AsyncMongoClient initialization used above.

state-manager/app/tasks/verify_graph.py (3)

2-2: croniter dependency risk (tracking).

croniter is unmaintained; keep the pin tight and plan migration (issue already opened). No action blocking this PR.


105-116: Unhashable CronTrigger in sets; missing namespace in cancellation filter.

  • Pydantic models are unhashable → TypeError at runtime.
  • Cancellation query lacks namespace and can cancel other tenants’ triggers.

Apply:

 async def cancel_crons(graph_template: GraphTemplate, old_triggers: list[Trigger]):
-    old_crons = set([CronTrigger(**trigger.value) for trigger in old_triggers if trigger.type == TriggerTypeEnum.CRON])
-    new_crons = set([CronTrigger(**trigger.value) for trigger in graph_template.triggers if trigger.type == TriggerTypeEnum.CRON])
+    old_crons = {
+        CronTrigger(**t.value).expression
+        for t in old_triggers
+        if t.type == TriggerTypeEnum.CRON
+    }
+    new_crons = {
+        CronTrigger(**t.value).expression
+        for t in graph_template.triggers
+        if t.type == TriggerTypeEnum.CRON
+    }
 
-    removed = old_crons - new_crons
+    removed = old_crons - new_crons
+    if not removed:
+        return
 
     await DatabaseTriggers.find(
         DatabaseTriggers.graph_name == graph_template.name,
+        DatabaseTriggers.namespace == graph_template.namespace,
         DatabaseTriggers.trigger_status == TriggerStatusEnum.PENDING,
         DatabaseTriggers.type == TriggerTypeEnum.CRON,
-        In(DatabaseTriggers.expression, [cron.expression for cron in removed])
+        In(DatabaseTriggers.expression, list(removed))
     ).update(
         {
             "$set": {
                 "trigger_status": TriggerStatusEnum.CANCELLED
             }
         }
     ) # type: ignore

179-191: Don’t flip a VALID template to INVALID due to scheduling errors. Save first; gather with return_exceptions.

Keeps validation result independent of CRON persistence issues; log errors instead.

-        graph_template.validation_status = GraphTemplateValidationStatus.VALID
-        graph_template.validation_errors = []
-
-        await asyncio.gather(*[cancel_crons(graph_template, old_triggers), create_crons(graph_template, old_triggers)])
-
-        await graph_template.save()
+        graph_template.validation_status = GraphTemplateValidationStatus.VALID
+        graph_template.validation_errors = []
+        await graph_template.save()
+
+        results = await asyncio.gather(
+            cancel_crons(graph_template, old_triggers),
+            create_crons(graph_template, old_triggers),
+            return_exceptions=True,
+        )
+        for r in results:
+            if isinstance(r, Exception):
+                logger.error(
+                    f"CRON scheduling error for graph template {graph_template.id}: {r}",
+                    exc_info=True,
+                )
state-manager/app/models/db/trigger.py (1)

9-36: Model and indexes look solid for idempotent scheduling.

Unique compound index includes namespace to avoid cross-tenant collisions; time index supports scanning upcoming triggers.

state-manager/app/models/trigger_models.py (1)

18-23: Trim and clarify cron expression validation.

Improves error messages and rejects whitespace-only inputs.

     @field_validator("expression")
     @classmethod
     def validate_expression(cls, v: str) -> str:
-        if not croniter.is_valid(v):
-            raise ValueError("Invalid cron expression")
-        return v
+        v = v.strip()
+        if not v:
+            raise ValueError("Cron expression must not be empty")
+        if not croniter.is_valid(v):
+            raise ValueError(f"Invalid cron expression: {v!r}")
+        return v

- Introduced APScheduler as a dependency in `pyproject.toml` to facilitate scheduled tasks.
- Implemented a cron job in `main.py` that triggers the `trigger_cron` function every minute.
- Created a new `trigger_cron.py` file to define the task executed by the scheduler.

This enhancement allows for periodic execution of tasks, improving the application's scheduling capabilities.
…ality

- Updated import statements in multiple test files to replace `TriggerGraphRequestModel` with the new `trigger_graph_model`.
- Enhanced the `test_lifespan_init_beanie_with_correct_models` to include `DatabaseTriggers` in the expected document models.
- Improved mock setups in `test_verify_graph.py` to handle asynchronous operations and ensure proper validation checks.
- Adjusted background task assertions in `test_upsert_graph_template.py` for clarity and correctness.
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
state-manager/app/main.py (1)

8-8: Wrong Mongo client and incorrect close semantics (will crash at runtime).

  • PyMongo does not expose AsyncMongoClient here; Beanie expects Motor’s AsyncIOMotorClient.
  • Motor’s close() is sync; remove await.
-from pymongo import AsyncMongoClient
+from motor.motor_asyncio import AsyncIOMotorClient
@@
-    await client.close()
+    client.close()

Also applies to: 82-82

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between eb5acab and b83fc22.

⛔ Files ignored due to path filters (1)
  • state-manager/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (3)
  • state-manager/app/main.py (3 hunks)
  • state-manager/app/tasks/trigger_cron.py (1 hunks)
  • state-manager/pyproject.toml (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: NiveditJain
PR: exospherehost/exospherehost#428
File: state-manager/app/tasks/verify_graph.py:4-5
Timestamp: 2025-09-28T13:35:42.851Z
Learning: In the cron trigger scheduling logic for state-manager/app/tasks/verify_graph.py, the system intentionally schedules at least one trigger beyond the trigger_ahead_time window to ensure continuity of scheduled executions. The current logic of appending an event then breaking is by design to guarantee "at least one next" trigger.
🧬 Code graph analysis (1)
state-manager/app/main.py (3)
state-manager/app/models/db/trigger.py (1)
  • DatabaseTriggers (9-36)
state-manager/app/models/trigger_models.py (1)
  • CronTrigger (15-23)
state-manager/app/tasks/trigger_cron.py (1)
  • trigger_cron (3-4)
🔇 Additional comments (1)
state-manager/app/main.py (1)

70-75: Confirm that this every-minute job aligns with the trigger reconciliation flow.

If verify_graph now owns CRON reconciliation (create/cancel within ahead window), ensure this heartbeat job either invokes that path or is removed to avoid duplicate scheduling logic. Based on learnings.

- Added functions to retrieve and update due triggers in the database, changing their status from PENDING to TRIGGERED.
- Implemented the `call_trigger_graph` function to execute the trigger graph with a unique request ID.
- Enhanced the `trigger_cron` function to log the start time and process due triggers in a loop, calling the necessary functions for each trigger.
- Introduced a placeholder for `create_next_triggers` to facilitate future trigger creation logic.
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
state-manager/tests/unit/tasks/test_verify_graph.py (1)

542-567: Verify cron helpers run in the “valid graph” path.

Same Ruff F841 issue here: the newly patched cron mocks aren’t touched. Assert they were awaited with the expected arguments to confirm the cron scheduling branch executes and to unblock the lint job.

         graph_template.triggers = []
         graph_template.name = "test_graph"

         # This should mark the graph as valid
         await verify_graph(graph_template, [])
+
+        mock_cancel_crons.assert_awaited_once_with(graph_template, [])
+        mock_create_crons.assert_awaited_once_with(graph_template, [])
 
     # Verify that the graph was processed (status may vary based on actual validation)
     # The specific status depends on the actual validation logic
     assert graph_template.save.called
📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b83fc22 and 9fdda88.

📒 Files selected for processing (7)
  • state-manager/app/tasks/trigger_cron.py (1 hunks)
  • state-manager/tests/unit/controller/test_trigger_graph.py (1 hunks)
  • state-manager/tests/unit/controller/test_upsert_graph_template.py (2 hunks)
  • state-manager/tests/unit/tasks/test_verify_graph.py (7 hunks)
  • state-manager/tests/unit/test_main.py (2 hunks)
  • state-manager/tests/unit/test_routes.py (1 hunks)
  • state-manager/tests/unit/with_database/conftest.py (2 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: NiveditJain
PR: exospherehost/exospherehost#428
File: state-manager/app/tasks/verify_graph.py:4-5
Timestamp: 2025-09-28T13:35:42.851Z
Learning: In the cron trigger scheduling logic for state-manager/app/tasks/verify_graph.py, the system intentionally schedules at least one trigger beyond the trigger_ahead_time window to ensure continuity of scheduled executions. The current logic of appending an event then breaking is by design to guarantee "at least one next" trigger.
📚 Learning: 2025-09-28T13:35:42.851Z
Learnt from: NiveditJain
PR: exospherehost/exospherehost#428
File: state-manager/app/tasks/verify_graph.py:4-5
Timestamp: 2025-09-28T13:35:42.851Z
Learning: In the cron trigger scheduling logic for state-manager/app/tasks/verify_graph.py, the system intentionally schedules at least one trigger beyond the trigger_ahead_time window to ensure continuity of scheduled executions. The current logic of appending an event then breaking is by design to guarantee "at least one next" trigger.

Applied to files:

  • state-manager/app/tasks/trigger_cron.py
🧬 Code graph analysis (6)
state-manager/tests/unit/test_routes.py (1)
state-manager/app/models/trigger_graph_model.py (1)
  • TriggerGraphRequestModel (4-7)
state-manager/tests/unit/controller/test_upsert_graph_template.py (2)
state-manager/tests/unit/test_routes.py (1)
  • mock_background_tasks (361-363)
state-manager/tests/unit/controller/test_executed_state.py (1)
  • mock_background_tasks (27-28)
state-manager/app/tasks/trigger_cron.py (5)
state-manager/app/models/db/trigger.py (1)
  • DatabaseTriggers (9-36)
state-manager/app/models/trigger_models.py (1)
  • TriggerStatusEnum (9-13)
state-manager/app/singletons/logs_manager.py (2)
  • LogsManager (9-66)
  • get_logger (65-66)
state-manager/app/controller/trigger_graph.py (1)
  • trigger_graph (31-115)
state-manager/app/models/trigger_graph_model.py (1)
  • TriggerGraphRequestModel (4-7)
state-manager/tests/unit/tasks/test_verify_graph.py (3)
state-manager/app/tasks/verify_graph.py (1)
  • verify_graph (158-191)
state-manager/app/models/db/registered_node.py (1)
  • list_nodes_by_templates (34-44)
state-manager/app/models/graph_template_validation_status.py (1)
  • GraphTemplateValidationStatus (4-8)
state-manager/tests/unit/controller/test_trigger_graph.py (1)
state-manager/app/models/trigger_graph_model.py (1)
  • TriggerGraphRequestModel (4-7)
state-manager/tests/unit/test_main.py (1)
state-manager/app/models/db/trigger.py (1)
  • DatabaseTriggers (9-36)
🪛 GitHub Actions: Ruff check on changed files only
state-manager/tests/unit/tasks/test_verify_graph.py

[error] 388-388: F841 Local variable 'mock_cancel_crons' is assigned to but never used (remove assignment).


[error] 389-389: F841 Local variable 'mock_create_crons' is assigned to but never used (remove assignment).


[error] 502-502: F841 Local variable 'mock_cancel_crons' is assigned to but never used (remove assignment).


[error] 503-503: F841 Local variable 'mock_create_crons' is assigned to but never used (remove assignment).


[error] 542-542: F841 Local variable 'mock_cancel_crons' is assigned to but never used (remove assignment).


[error] 543-543: F841 Local variable 'mock_create_crons' is assigned to but never used (remove assignment).

- Updated the Settings model to replace `trigger_ahead_time` with `trigger_workers`, allowing for configurable worker count for trigger processing.
- Added a new status `TRIGGERING` to the TriggerStatusEnum to better represent the trigger lifecycle.
- Enhanced the `trigger_cron` function to utilize the new worker configuration, enabling concurrent processing of due triggers.
- Refined the `create_next_triggers` function to calculate the next trigger time based on the cron expression, improving trigger scheduling logic.
@NiveditJain
Copy link
Member Author

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces cron-based triggers for graphs, a significant feature enhancement. The implementation correctly uses apscheduler for scheduling and croniter for cron expression handling. The database models and API endpoints are updated accordingly. My review focuses on improving the robustness of the new trigger handling logic, particularly around error handling and race conditions, to prevent triggers from getting into a stuck state. I've also included a suggestion to strengthen a unit test.

- Removed unused imports from verify_graph.py to streamline the code.
- Updated mock setups in test_verify_graph.py to improve clarity by replacing mock variable names with underscores, enhancing readability without changing functionality.
@NiveditJain
Copy link
Member Author

@coderabbitai review this

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Sep 28, 2025

@NiveditJain I'll review the changes in this PR which introduces cron-based trigger functionality for graph templates.

This is a significant enhancement that adds scheduled execution capabilities to your graph system. The implementation includes:

  • New trigger models (Trigger, CronTrigger, TriggerTypeEnum, TriggerStatusEnum) with proper validation
  • Database integration with DatabaseTriggers document model and appropriate indexes
  • APScheduler integration for cron job execution
  • Enhanced GraphTemplate to include triggers field
  • Background task updates to handle trigger lifecycle management

The changes look comprehensive and well-structured. I'll provide a detailed review of all the modifications.

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

- Introduced a new function `mark_as_failed` to update the trigger status to FAILED in case of errors during processing.
- Enhanced the `handle_trigger` function to include a try-except block, ensuring that exceptions are logged and the trigger status is updated appropriately when errors occur.
- This change improves the robustness of the trigger processing logic by handling failures gracefully.
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6b2bf85 and 155f7cc.

📒 Files selected for processing (1)
  • state-manager/app/tasks/trigger_cron.py (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-28T13:35:42.851Z
Learnt from: NiveditJain
PR: exospherehost/exospherehost#428
File: state-manager/app/tasks/verify_graph.py:4-5
Timestamp: 2025-09-28T13:35:42.851Z
Learning: In the cron trigger scheduling logic for state-manager/app/tasks/verify_graph.py, the system intentionally schedules at least one trigger beyond the trigger_ahead_time window to ensure continuity of scheduled executions. The current logic of appending an event then breaking is by design to guarantee "at least one next" trigger.

Applied to files:

  • state-manager/app/tasks/trigger_cron.py
🧬 Code graph analysis (1)
state-manager/app/tasks/trigger_cron.py (6)
state-manager/app/models/db/trigger.py (1)
  • DatabaseTriggers (9-36)
state-manager/app/models/trigger_models.py (2)
  • TriggerStatusEnum (9-14)
  • TriggerTypeEnum (6-7)
state-manager/app/singletons/logs_manager.py (2)
  • LogsManager (9-66)
  • get_logger (65-66)
state-manager/app/controller/trigger_graph.py (1)
  • trigger_graph (31-115)
state-manager/app/models/trigger_graph_model.py (1)
  • TriggerGraphRequestModel (4-7)
state-manager/app/config/settings.py (1)
  • get_settings (32-36)
🔇 Additional comments (2)
state-manager/app/tasks/trigger_cron.py (2)

47-54: Handle duplicate next-trigger inserts without failing the run

DatabaseTriggers has a unique index on (type, expression, graph_name, namespace, trigger_time). When verify_graph or another worker has already enqueued the same next occurrence, this insert raises DuplicateKeyError. The exception bubbles, we mark the already-fired trigger as FAILED, and the run looks broken even though the graph invocation succeeded. Catch the duplicate, log at info/debug, and continue so we still mark the trigger as TRIGGERED.

-from pymongo import ReturnDocument
+from pymongo import ReturnDocument
+from pymongo.errors import DuplicateKeyError
@@
-    await DatabaseTriggers(
-        type=TriggerTypeEnum.CRON,
-        expression=trigger.expression,
-        graph_name=trigger.graph_name,
-        namespace=trigger.namespace,
-        trigger_time=next_trigger_time,
-        trigger_status=TriggerStatusEnum.PENDING
-    ).insert()
+    try:
+        await DatabaseTriggers(
+            type=TriggerTypeEnum.CRON,
+            expression=trigger.expression,
+            graph_name=trigger.graph_name,
+            namespace=trigger.namespace,
+            trigger_time=next_trigger_time,
+            trigger_status=TriggerStatusEnum.PENDING,
+        ).insert()
+    except DuplicateKeyError:
+        logger.info(
+            "next trigger already scheduled",
+            trigger_id=str(trigger.id),
+            next_trigger_time=next_trigger_time.isoformat(),
+        )

73-74: Make cron_time UTC-aware before querying Mongo

We still compute cron_time = datetime.now() (naive local time). Database trigger timestamps are stored in UTC, so on any host not set to UTC we start missing or delaying firings. Please switch to datetime.now(timezone.utc) (and add the import) and log with structured fields instead of string interpolation.

-from datetime import datetime
+from datetime import datetime, timezone
@@
-    cron_time = datetime.now()
-    logger.info(f"starting trigger_cron: {cron_time}")
+    cron_time = datetime.now(timezone.utc)
+    logger.info("starting trigger_cron", cron_time=cron_time.isoformat())

- Removed the CronTrigger class usage and replaced it with direct handling of cron expressions for improved clarity and performance.
- Updated the logic in `cancel_crons` and `create_crons` functions to work with cron expressions instead of CronTrigger instances, streamlining the trigger management process.
- Enhanced the readability of the code by simplifying the data structures used for managing cron triggers.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant