Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
c32d6c9
plumb in memory metrics
yuandrew Sep 29, 2025
0ad6be2
simplify worker::new(), fix some heartbeat metrics, new test file
yuandrew Sep 30, 2025
d314a35
CounterImpl, final_heartbeat, more specific metric label dbg_panic ms…
yuandrew Oct 1, 2025
b00b80e
Support in-mem metrics when metrics aren't configured
yuandrew Oct 1, 2025
894d916
Move sys_info refresh to dedicated thread, use tuner's existing sys info
yuandrew Oct 3, 2025
ef96616
Format, AtomicCell
yuandrew Oct 5, 2025
0350f91
Fix unit test
yuandrew Oct 5, 2025
fe3956c
Set dynamic config for WorkerHeartbeatsEnabled and ListWorkersEnabled…
yuandrew Oct 7, 2025
d956784
Should not expect heartbeat nexus worker in metrics for non-heartbeat…
yuandrew Oct 7, 2025
98e778d
recv_timeout instead of thread::sleep, use WorkflowService::list_work…
yuandrew Oct 8, 2025
96e0186
MetricAttributes::NoOp, add mechanism to ignore dupe workers for test…
yuandrew Oct 8, 2025
e1ad0ae
More tests, sticky cache miss, plugins
yuandrew Oct 13, 2025
8f42387
Formatting, fix skip_client_worker_set_check
yuandrew Oct 14, 2025
d54a3eb
Cursor found a bug
yuandrew Oct 14, 2025
13d981c
Lower sleep time, add print for debugging
yuandrew Oct 14, 2025
f1a3634
more prints
yuandrew Oct 14, 2025
6d3a909
use semaphores for worker_heartbeat_failure_metrics
yuandrew Oct 15, 2025
a29d2df
skip_client_worker_set_check for all integ workers
yuandrew Oct 15, 2025
fc7f839
Can't use tokio semaphore in workflow code
yuandrew Oct 15, 2025
c73d7b8
use signal to test workflow_slots.last_interval_failure_tasks
yuandrew Oct 16, 2025
f52345e
Use Notify instead of semaphores, fix test flake
yuandrew Oct 16, 2025
1601f0e
Use eventually() instead of a manual sleep
yuandrew Oct 17, 2025
8f1ef5a
max_outstanding_workflow_tasks 2
yuandrew Oct 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[env]
# This temporarily overrides the version of the CLI used for integration tests, locally and in CI
CLI_VERSION_OVERRIDE = "v1.4.1-cloud-v1-29-0-139-2.0"
#CLI_VERSION_OVERRIDE = "v1.4.1-cloud-v1-29-0-139-2.0"

[alias]
# Not sure why --all-features doesn't work
Expand Down
18 changes: 18 additions & 0 deletions client/src/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1345,6 +1345,15 @@ proxier! {
r.extensions_mut().insert(labels);
}
);
(
describe_worker,
DescribeWorkerRequest,
DescribeWorkerResponse,
|r| {
let labels = namespaced_request!(r);
r.extensions_mut().insert(labels);
}
);
(
record_worker_heartbeat,
RecordWorkerHeartbeatRequest,
Expand Down Expand Up @@ -1382,6 +1391,15 @@ proxier! {
r.extensions_mut().insert(labels);
}
);
(
set_worker_deployment_manager,
SetWorkerDeploymentManagerRequest,
SetWorkerDeploymentManagerResponse,
|r| {
let labels = namespaced_request!(r);
r.extensions_mut().insert(labels);
}
);
}

proxier! {
Expand Down
63 changes: 29 additions & 34 deletions client/src/worker_registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,13 @@ impl ClientWorkerSetImpl {
fn register(
&mut self,
worker: Arc<dyn ClientWorker + Send + Sync>,
skip_client_worker_set_check: bool,
) -> Result<(), anyhow::Error> {
let slot_key = SlotKey::new(
worker.namespace().to_string(),
worker.task_queue().to_string(),
);
if self.slot_providers.contains_key(&slot_key) {
if self.slot_providers.contains_key(&slot_key) && !skip_client_worker_set_check {
bail!(
"Registration of multiple workers on the same namespace and task queue for the same client not allowed: {slot_key:?}, worker_instance_key: {:?}.",
worker.worker_instance_key()
Expand Down Expand Up @@ -133,14 +134,8 @@ impl ClientWorkerSetImpl {

if let Some(w) = self.shared_worker.get_mut(worker.namespace()) {
let (callback, is_empty) = w.unregister_callback(worker.worker_instance_key());
if let Some(cb) = callback {
if is_empty {
self.shared_worker.remove(worker.namespace());
}

// To maintain single ownership of the callback, we must re-register the callback
// back to the ClientWorker
worker.register_callback(cb);
if callback.is_some() && is_empty {
self.shared_worker.remove(worker.namespace());
}
}

Expand Down Expand Up @@ -212,6 +207,17 @@ impl ClientWorkerSet {
.try_reserve_wft_slot(namespace, task_queue)
}

/// Register a local worker that can provide WFT processing slots and potentially worker heartbeating.
pub fn register_worker(
&self,
worker: Arc<dyn ClientWorker + Send + Sync>,
skip_client_worker_set_check: bool,
) -> Result<(), anyhow::Error> {
self.worker_manager
.write()
.register(worker, skip_client_worker_set_check)
}

/// Unregisters a local worker, typically when that worker starts shutdown.
pub fn unregister_worker(
&self,
Expand All @@ -220,14 +226,6 @@ impl ClientWorkerSet {
self.worker_manager.write().unregister(worker_instance_key)
}

/// Register a local worker that can provide WFT processing slots and potentially worker heartbeating.
pub fn register_worker(
&self,
worker: Arc<dyn ClientWorker + Send + Sync>,
) -> Result<(), anyhow::Error> {
self.worker_manager.write().register(worker)
}

/// Returns the worker grouping key, which is unique for each worker.
pub fn worker_grouping_key(&self) -> Uuid {
self.worker_grouping_key
Expand Down Expand Up @@ -256,7 +254,7 @@ impl std::fmt::Debug for ClientWorkerSet {
}

/// Contains a worker heartbeat callback, wrapped for mocking
pub type HeartbeatCallback = Box<dyn Fn() -> WorkerHeartbeat + Send + Sync>;
pub type HeartbeatCallback = Arc<dyn Fn() -> WorkerHeartbeat + Send + Sync>;

/// Represents a complete worker that can handle both slot management
/// and worker heartbeat functionality.
Expand All @@ -276,7 +274,7 @@ pub trait ClientWorker: Send + Sync {
fn try_reserve_wft_slot(&self) -> Option<Box<dyn Slot + Send>>;

/// Unique identifier for this worker instance.
/// This must be stable across the worker's lifetime but unique per instance.
/// This must be stable across the worker's lifetime and unique per instance.
fn worker_instance_key(&self) -> Uuid;

/// Indicates if worker heartbeating is enabled for this client worker.
Expand All @@ -289,9 +287,6 @@ pub trait ClientWorker: Send + Sync {
fn new_shared_namespace_worker(
&self,
) -> Result<Box<dyn SharedNamespaceWorkerTrait + Send + Sync>, anyhow::Error>;

/// Registers a worker heartbeat callback, typically when a worker is unregistered from a client
fn register_callback(&self, callback: HeartbeatCallback);
}

#[cfg(test)]
Expand Down Expand Up @@ -350,7 +345,7 @@ mod tests {
new_mock_provider(namespace, "bar_q".to_string(), false, false, false);
let worker_instance_key = mock_provider.worker_instance_key();

let result = manager.register_worker(Arc::new(mock_provider));
let result = manager.register_worker(Arc::new(mock_provider), false);
if result.is_ok() {
successful_registrations += 1;
worker_keys.push(worker_instance_key);
Expand Down Expand Up @@ -453,7 +448,7 @@ mod tests {
if heartbeat_enabled {
mock_provider
.expect_heartbeat_callback()
.returning(|| Some(Box::new(WorkerHeartbeat::default)));
.returning(|| Some(Arc::new(WorkerHeartbeat::default)));

let namespace_clone = namespace.clone();
mock_provider
Expand All @@ -463,8 +458,6 @@ mod tests {
namespace_clone.clone(),
)))
});

mock_provider.expect_register_callback().returning(|_| {});
}

mock_provider
Expand All @@ -489,10 +482,10 @@ mod tests {
Uuid::new_v4(),
);

manager.register_worker(Arc::new(worker1)).unwrap();
manager.register_worker(Arc::new(worker1), false).unwrap();

// second worker register should fail due to duplicate namespace+task_queue
let result = manager.register_worker(Arc::new(worker2));
let result = manager.register_worker(Arc::new(worker2), false);
assert!(result.is_err());
assert!(
result
Expand Down Expand Up @@ -528,8 +521,8 @@ mod tests {
Uuid::new_v4(),
);

manager.register_worker(Arc::new(worker1)).unwrap();
manager.register_worker(Arc::new(worker2)).unwrap();
manager.register_worker(Arc::new(worker1), false).unwrap();
manager.register_worker(Arc::new(worker2), false).unwrap();

assert_eq!(2, manager.num_providers());
assert_eq!(manager.num_heartbeat_workers(), 2);
Expand Down Expand Up @@ -558,8 +551,8 @@ mod tests {
Uuid::new_v4(),
);

manager.register_worker(Arc::new(worker1)).unwrap();
manager.register_worker(Arc::new(worker2)).unwrap();
manager.register_worker(Arc::new(worker1), false).unwrap();
manager.register_worker(Arc::new(worker2), false).unwrap();

assert_eq!(2, manager.num_providers());
assert_eq!(manager.num_heartbeat_workers(), 2);
Expand Down Expand Up @@ -590,8 +583,10 @@ mod tests {
let worker_instance_key1 = worker1.worker_instance_key();
let worker_instance_key2 = worker2.worker_instance_key();

manager.register_worker(Arc::new(worker1)).unwrap();
manager.register_worker(Arc::new(worker2)).unwrap();
assert_ne!(worker_instance_key1, worker_instance_key2);

manager.register_worker(Arc::new(worker1), false).unwrap();
manager.register_worker(Arc::new(worker2), false).unwrap();

// Verify initial state: 2 slot providers, 2 heartbeat workers, 1 shared worker
assert_eq!(2, manager.num_providers());
Expand Down
1 change: 1 addition & 0 deletions core-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ tonic = { workspace = true }
tracing = "0.1"
tracing-core = "0.1"
url = "2.5"
uuid = { version = "1.18.1", features = ["v4"] }

[dependencies.temporal-sdk-core-protos]
path = "../sdk-core-protos"
Expand Down
9 changes: 9 additions & 0 deletions core-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use temporal_sdk_core_protos::coresdk::{
workflow_activation::WorkflowActivation,
workflow_completion::WorkflowActivationCompletion,
};
use uuid::Uuid;

/// This trait is the primary way by which language specific SDKs interact with the core SDK.
/// It represents one worker, which has a (potentially shared) client for connecting to the service
Expand Down Expand Up @@ -138,6 +139,10 @@ pub trait Worker: Send + Sync {
/// This should be called only after [Worker::shutdown] has resolved and/or both polling
/// functions have returned `ShutDown` errors.
async fn finalize_shutdown(self);

/// Unique identifier for this worker instance.
/// This must be stable across the worker's lifetime and unique per instance.
fn worker_instance_key(&self) -> Uuid;
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -205,6 +210,10 @@ where
async fn finalize_shutdown(self) {
panic!("Can't finalize shutdown on Arc'd worker")
}

fn worker_instance_key(&self) -> Uuid {
(**self).worker_instance_key()
}
}

macro_rules! dbg_panic {
Expand Down
Loading
Loading