Skip to content

Conversation

@ryanolson
Copy link
Contributor

@ryanolson ryanolson commented Aug 11, 2025

I needed something similar for kvbm, so I made it general.

Summary by CodeRabbit

  • New Features
    • Added robust management for critical background tasks, including monitoring, graceful shutdown, cancellation, and explicit failure propagation for improved reliability.
  • Refactor
    • Reorganized runtime utilities into clearer modules without changing the public interface or behavior.
  • Chores
    • Enabled additional runtime capabilities in async utilities to support improved task handling.
  • Tests
    • Added comprehensive tests covering success, failure, cancellation, panics, timeouts, and lifecycle enforcement to ensure stability.

@github-actions github-actions bot added the feat label Aug 11, 2025
@ryanolson ryanolson marked this pull request as ready for review August 11, 2025 22:14
@ryanolson ryanolson requested a review from a team as a code owner August 11, 2025 22:14
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Aug 11, 2025

Walkthrough

Adds Tokio tokio-util "rt" feature in Cargo.toml. Introduces utils::tasks module with submodules critical and tracker. Moves CriticalTask* implementation from utils/task.rs to utils/tasks/critical.rs, keeping public API via re-export. Adds CriticalTaskExecutionHandle with monitor, cancellation, join, and detach behavior; exposes new constructors and status methods.

Changes

Cohort / File(s) Summary
Dependency features
Cargo.toml
Updated [workspace.dependencies] for tokio-util to include feature rt alongside codec and net.
Module wiring
lib/runtime/src/utils.rs, lib/runtime/src/utils/tasks.rs
Exported new tasks module; added pub mod critical; and pub mod tracker; under utils::tasks.
API re-export
lib/runtime/src/utils/task.rs
Removed in-file implementation; now re-exports utils::tasks::critical::* to preserve public API paths.
Critical tasks implementation
lib/runtime/src/utils/tasks/critical.rs
Added CriticalTaskHandler and CriticalTaskExecutionHandle with spawning, monitoring, cancellation, join, detach, and Drop enforcement; includes tests.

Sequence Diagram(s)

sequenceDiagram
    participant Caller
    participant Runtime
    participant MainTask
    participant Monitor
    participant ParentToken
    participant Oneshoot as OneShot(Result)

    Caller->>Runtime: new()/new_with_runtime(task_fn, parent_token, desc)
    Runtime->>MainTask: spawn task_fn(graceful_token)
    Runtime->>Monitor: spawn monitor(MainTask, parent_token, OneShot)

    alt MainTask Ok(())
        MainTask-->>Monitor: Ok
        Monitor-->>Oneshoot: send Ok(())
    else MainTask Err or Panic
        MainTask-->>Monitor: Err/Panic
        Monitor->>ParentToken: cancel()
        Monitor-->>Oneshoot: send Err(...)
    end

    Caller->>Oneshoot: join()
    Oneshoot-->>Caller: Result<()>
    Caller->>Caller: detach() optional
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

Poem

A rabbit taps the runtime’s drum,
Spawns a task—then watchers come.
If it stumbles, cancel fast;
If it dances, all’s to last.
With a gentle token’s sway,
Join, detach—then hop away.
🐇🛠️✨


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (2)
lib/runtime/src/utils/tasks/critical.rs (1)

170-174: Doc nit: clarify subject of is_finished()

"Server's background event loop" looks like a leftover reference.

-    /// Check if the task awaiting on the [Server]s background event loop has finished.
+    /// Check if the critical task (monitored by this handle) has finished.
lib/runtime/src/utils/task.rs (1)

4-4: Optional: Soft-deprecate this legacy path to nudge callers to utils::tasks::critical

Keep the re-export for compatibility, but mark it deprecated to encourage gradual migration.

- pub use super::tasks::critical::*;
+#[deprecated(
+    since = "0.4.0",
+    note = "Moved to utils::tasks::critical; this re-export remains for compatibility."
+)]
+pub use super::tasks::critical::*;
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fd35899 and 463cb9e.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • lib/bindings/python/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (5)
  • Cargo.toml (1 hunks)
  • lib/runtime/src/utils.rs (1 hunks)
  • lib/runtime/src/utils/task.rs (1 hunks)
  • lib/runtime/src/utils/tasks.rs (1 hunks)
  • lib/runtime/src/utils/tasks/critical.rs (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
lib/runtime/src/utils/tasks/critical.rs (3)
lib/runtime/src/utils/tasks/tracker.rs (4)
  • tokio (2571-2571)
  • tokio (2833-2833)
  • tokio (2878-2878)
  • tokio (2910-2910)
lib/runtime/src/pipeline/nodes/sources/base.rs (1)
  • oneshot (73-73)
lib/runtime/src/runtime.rs (1)
  • handle (123-128)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: Mirror Repository to GitLab
  • GitHub Check: Build and Test - dynamo
  • GitHub Check: pre-merge-rust (lib/runtime/examples)
  • GitHub Check: pre-merge-rust (lib/bindings/python)
  • GitHub Check: pre-merge-rust (.)
🔇 Additional comments (5)
Cargo.toml (1)

62-62: No feature gate needed for CancellationToken

According to the tokio-util v0.7 source (see https://docs.rs/tokio-util/latest/source/src/lib.rs), the sync module (and thus CancellationToken) is exposed unconditionally—there is no separate "sync" Cargo feature to enable. Your current dependency line already provides it:

tokio-util = { version = "0.7", features = ["codec", "net", "rt"] }

No change is required to use tokio_util::sync::CancellationToken.

Likely an incorrect or invalid review comment.

lib/runtime/src/utils/tasks/critical.rs (2)

49-60: Constructor ergonomics LGTM

new() delegating to new_with_runtime via Handle::try_current() is clean and fail-fast outside a runtime. API is clear.


104-160: Monitor task correctly propagates failures and panics to parent token

The monitor awaits the main task, cancels parent on error/panic, and forwards the result through oneshot. Good separation of concerns and observability via tracing.

lib/runtime/src/utils/tasks.rs (1)

4-5: Module layout LGTM

Publicly exposing critical and tracker modules under utils::tasks is clear and future-proof.

lib/runtime/src/utils.rs (1)

22-22: Export of utils::tasks is correct and non-breaking

Keeps existing modules and adds tasks; pairs well with the re-export in utils::task for compatibility.

@ryanolson ryanolson requested a review from jthomson04 August 11, 2025 22:21
@ryanolson
Copy link
Contributor Author

Some observations:

  1. TaskMetrics impl contains almost the exact same methods as HierarchicalTaskMetrics trait. Both have 16 identical methods with the same signatures (only difference is pub vs fn keywords). The only difference is that TaskMetrics is missing the success() method that HierarchicalTaskMetrics requires.

If you want to enforce parity in them, which I think you want to (esp. in future changes/refactors), then it's best to enforce that in Traits (on TaskMetrics).

  1. Question for you: what is the more intensive operation:
  • A) Very frequent updates to a bunch of RootTaskMetrics/DefaultRootTaskMetrics/ChildTaskMetrics, with infrequent Prometheus fmt output? Or...
  • B) Very very frequent Prometheus fmt output, with infrequent updates to the values?
  • I suspect it would be A). If that's the case, it would be better to use AtomicU64 only. Then, during these infrequent Prometheus fmt output, copy cover to a Prometheus data-structure (gauge). You don't need to keep the Prometheus data-structure persistently -- it's just used for output.
  1. This tree structure of RootTaskMetrics, DefaultRootTaskMetrics, and ChildTaskMetrics is very confusing to me.
  • RootTaskMetrics contains local_metrics as well as the Prometheus mirrors

  • DefaultRootTaskMetrics contains local_metrics

  • ChildTaskMetrics contains local_metrics and a parent, and the incr call will incr both

  • What do you think of simplifying it such that there is just one type of node in this tree structure:

    • TaskMetricsNode contains the 6 AtomicU64, and a Vec of children TaskMetricsNode. No Prometheus types. Trees have generic nodes: root_node->[child_node1, child_node2 -> [...], ...].
    • With this tree structure, you can still perform cancelled(), active(), success(), etc by traversing through the children and summing up the values. Traversing through the children should be a lot cheaper than constantly updating two parallel data-structures.

I will go through your test cases and examples this weekend.

I got rid of the prom format/output method entirely since it's handled by the registry. we only every inc/dec/set counters/gauges

@ryanolson
Copy link
Contributor Author

5. This tree structure of RootTaskMetrics, DefaultRootTaskMetrics, and ChildTaskMetrics is very confusing to me.

  • RootTaskMetrics contains local_metrics as well as the Prometheus mirrors

  • DefaultRootTaskMetrics contains local_metrics

  • ChildTaskMetrics contains local_metrics and a parent, and the incr call will incr both

  • What do you think of simplifying it such that there is just one type of node in this tree structure:

    • TaskMetricsNode contains the 6 AtomicU64, and a Vec of children TaskMetricsNode. No Prometheus types. Trees have generic nodes: root_node->[child_node1, child_node2 -> [...], ...].
    • With this tree structure, you can still perform cancelled(), active(), success(), etc by traversing through the children and summing up the values. Traversing through the children should be a lot cheaper than constantly updating two parallel data-structures.

The idea is the root can be a prometheus root or a local root.

RootTaskMetrics should probably be RootMetricsWithPrometheus and DefaultRootTaskMetrics should be RootTaskMetricsWithoutPrometheus.

ChildTaskMetrics are always local counters with a link to a parent.

All are HierarchicalTaskMetrics.

@ryanolson ryanolson requested a review from keivenchang August 18, 2025 20:33
@ryanolson
Copy link
Contributor Author

@keivenchang - we can remove these:

        let active_gauge = registry.create_intgauge(
            &format!("{}_tasks_active", component_name),
            "Current number of active tasks",
            &[],
        )?;

        let queued_gauge = registry.create_intgauge(
            &format!("{}_tasks_queued", component_name),
            "Current number of tasks queued in scheduler",
            &[],
        )?;

as active = issued - completed and queued.

we could just have a started counter, so then we could get queues from active - started.

@ryanolson
Copy link
Contributor Author

@keivenchang - we can remove these:

        let active_gauge = registry.create_intgauge(
            &format!("{}_tasks_active", component_name),
            "Current number of active tasks",
            &[],
        )?;

        let queued_gauge = registry.create_intgauge(
            &format!("{}_tasks_queued", component_name),
            "Current number of tasks queued in scheduler",
            &[],
        )?;

as active = issued - completed and queued.

we could just have a started counter, so then we could get queues from active - started.

removed the active gauge in favor of only monotonic counters.

@grahamking
Copy link
Contributor

Approving without review, as requested. 7k lines. What can you do?

Signed-off-by: Ryan Olson <ryanolson@users.noreply.github.com>
@ryanolson ryanolson enabled auto-merge (squash) August 19, 2025 15:48
@ryanolson ryanolson merged commit a33033b into main Aug 19, 2025
13 of 14 checks passed
@ryanolson ryanolson deleted the ryan/task-scheduler branch August 19, 2025 18:35
@keivenchang
Copy link
Contributor

keivenchang commented Aug 19, 2025

Just looked through the metrics part. The refactor looks more understandable now. Let me regurgitate this as I type it, so that I can remember it (and for future reference). Basically:

  • trait HierarchicalTaskMetrics has increment_*, started/issued/success/cancelled/failed/rejected/… and this is implemented(extended) for the 3 structs below.
  • TaskMetrics struct uses AtomicU64
  • PrometheusTaskMetrics struct uses Prom IntCounters, and using component_name as prefix
  • ChildTaskMetrics struct contains TaskMetrics and Arc (composition)

Thank you for simplifying it (reducing the number of Metrics) and making it more readable.

hhzhang16 pushed a commit that referenced this pull request Aug 27, 2025
Signed-off-by: Ryan Olson <ryanolson@users.noreply.github.com>
Signed-off-by: Hannah Zhang <hannahz@nvidia.com>
nv-anants pushed a commit that referenced this pull request Aug 28, 2025
Signed-off-by: Ryan Olson <ryanolson@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants