Skip to content

Conversation

yuandrew
Copy link
Contributor

@yuandrew yuandrew commented Sep 29, 2025

What was changed

NOTE: targeting worker-heartbeat feature branch.

New in memory mechanism to keep track of certain metrics for worker heartbeating.

Why?

Checklist

  1. Closes

  2. How was this tested:

  1. Any docs updates needed?

Note

Adds in-memory metrics backing and wires full worker heartbeat reporting (pollers, slots, cache, status, timing), plus a client API to list workers, with supporting poller/tuner and API changes.

  • Telemetry/metrics:
    • Introduce in-memory metric tracking (HeartbeatMetricType, WorkerHeartbeatMetrics) and dual-recording instruments (counter_with_in_memory, gauge_with_in_memory, histogram_duration_with_in_memory).
    • Extend MetricAttributes/NoOp meter to support label value access and in-memory attrs; add per-metric label extraction.
  • Worker heartbeat:
    • Collect and send rich heartbeat data (status, host CPU/mem, sticky cache stats, poller info with last successful poll times, slot supplier kind, per-interval processed/failed counts, plugins, deployment version).
    • Convert heartbeat callbacks to Arc, simplify shared-namespace worker callback handling.
    • Client populates heartbeat client fields and supports final heartbeat on shutdown.
  • Pollers:
    • Track last successful poll times for workflow (sticky/non-sticky), activity, and nexus pollers; expose poller autoscaling info.
  • Core API:
    • Add worker_instance_key() to Worker; import uuid.
  • Tuner/System info:
    • Add supplier kind reporting; move RealSysInfo to a background refresh thread; expose sys info via tuner.
  • Client API:
    • Add list_workers RPC wrapper; enhance worker client heartbeat/shutdown methods.
  • Misc/Tests:
    • Update worker registry types and ordering; numerous unit/integration tests for heartbeat/metrics; enable server dynamic configs for heartbeats/workers list.

Written by Cursor Bugbot for commit d956784. This will update automatically on new commits. Configure here.

}
pub type Counter = LazyBoundMetric<

pub type CounterType = LazyBoundMetric<
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding Type as the suffix here to these reads confusingly to me. Maybe Impl works better?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call, CounterType was lazy me literally seeing the word type and just appending it

kinda like https://www.youtube.com/watch?v=6haBMbtXSLg

metric: inner,
attributes: MetricAttributes::Empty,
bound_cache: OnceLock::new(),
primary: LazyBoundMetric {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably easier to just unify these constructors and take in_memory as optional

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This helps completely mask the concept of in memory metrics from the CoreMeter trait. Moving this into new() would mean callers would have to pass in None for the in_memory field, whereas how things currently are, there's no exposure of the concept of in_mem

shutdown: CancellationToken,
num_pollers_handler: Option<impl Fn(usize) + Send + Sync + 'static>,
options: WorkflowTaskOptions,
last_successful_poll_time: Arc<parking_lot::Mutex<Option<SystemTime>>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would invert this. Rather than passing these in, add a get_last_successful_poll_handle() -> Arc<AtomicCell<Option<SystemTime> (btw you can use https://docs.rs/crossbeam/latest/crossbeam/atomic/struct.AtomicCell.html since SystemTime is Copy here which will be better than a lock)

Which you can call after these are built in worker setup, and then pass them into the heartbeat manager. One fewer thing to add to these mega arg lists.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sold that this is better, style wise. To plumb the metric out to the caller new_with_pollers, we'd need to create a new struct return type to house

last_successful_poll_time: Arc<AtomicCell<Option<SystemTime>>>
sticky_last_successful_poll_time: Option<Arc<AtomicCell<Option<SystemTime>>>>
and

impl Stream<
    Item = Result<
        (
            ValidPollWFTQResponse,
            OwnedMeteredSemPermit<WorkflowSlotKind>,
        ),
        tonic::Status,
    >,
> + Sized
+ 'static,

Seems easier to pass in the params?

);
}
self.shutdown_token.cancel();
*self.status.lock() = WorkerStatus::ShuttingDown;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this might hold the lock for the rest of this scope. Maybe doesn't, but, better to be safe and just put this line inside it's own { }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can do!

Shouldn't the lock only last for this line, since we aren't assigning the lock() to any local var, once that line passes the MutexGuard drops and the lock unlocks?

@yuandrew yuandrew marked this pull request as ready for review October 5, 2025 03:34
@yuandrew yuandrew requested a review from a team as a code owner October 5, 2025 03:34
cursor[bot]

This comment was marked as outdated.

cursor[bot]

This comment was marked as outdated.

fn new() -> (Arc<Self>, Arc<AtomicU64>) {
let used = Arc::new(AtomicU64::new(0));
(Self { used: used.clone() }, used)
(Arc::new(Self { used: used.clone() }), used)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Test Helper Signature Change Causes Compilation Errors

The FakeMIS::new() test helper function's signature changed to return (Arc<Self>, Arc<AtomicU64>). This creates a type mismatch with existing call sites (likely in tests) that still expect (Self, Arc<AtomicU64>), leading to a compilation error.

Fix in Cursor Fix in Web

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants