Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ use std::sync::Arc;
use temporal_client::{ConfiguredClient, TemporalServiceClientWithMetrics};
use temporal_sdk_core_api::{
errors::{CompleteActivityError, PollActivityError, PollWfError},
telemetry::{metrics::TemporalMeter, TelemetryOptions},
telemetry::{
metrics::{CoreMeter, TemporalMeter},
TelemetryOptions,
},
Worker as WorkerTrait,
};
use temporal_sdk_core_protos::coresdk::ActivityHeartbeat;
Expand Down Expand Up @@ -278,6 +281,12 @@ impl CoreRuntime {
pub fn telemetry(&self) -> &TelemetryInstance {
&self.telemetry
}

/// Some metric meters cannot be initialized until after a tokio runtime has started and after
/// other telemetry has initted (ex: prometheus). They can be attached here.
pub fn attach_late_init_metrics(&mut self, meter: Arc<dyn CoreMeter + 'static>) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason I shouldn't always use this? It's easier for my code if I don't have some special knowledge of which CoreMeter impl needs late binding and which doesn't.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel free to always do it

self.telemetry.attach_late_init_metrics(meter)
}
}

impl Drop for CoreRuntime {
Expand Down
6 changes: 6 additions & 0 deletions core/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ impl TelemetryInstance {
self.trace_subscriber.clone()
}

/// Some metric meters cannot be initialized until after a tokio runtime has started and after
/// other telemetry has initted (ex: prometheus). They can be attached here.
pub fn attach_late_init_metrics(&mut self, meter: Arc<dyn CoreMeter + 'static>) {
self.metrics = Some(meter);
}

/// Returns our wrapper for metric meters, can be used to, ex: initialize clients
pub fn get_metric_meter(&self) -> Option<TemporalMeter> {
self.metrics.clone().map(|m| {
Expand Down
27 changes: 27 additions & 0 deletions tests/integ_tests/metrics_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,3 +423,30 @@ async fn query_of_closed_workflow_doesnt_tick_terminal_metric(
.expect("Must find matching metric");
assert!(matching_line.ends_with('1'));
}

#[test]
fn runtime_new() {
let mut rt = CoreRuntime::new(
get_integ_telem_options(),
tokio::runtime::Builder::new_multi_thread(),
)
.unwrap();
let handle = rt.tokio_handle();
let _rt = handle.enter();
let (telemopts, addr, _aborter) = prom_metrics();
rt.attach_late_init_metrics(telemopts.metrics.unwrap());
let opts = get_integ_server_options();
handle.block_on(async {
let mut raw_client = opts
.connect_no_namespace(rt.metric_meter(), None)
.await
.unwrap();
assert!(raw_client.get_client().capabilities().is_some());
let _ = raw_client
.list_namespaces(ListNamespacesRequest::default())
.await
.unwrap();
let body = get_text(format!("http://{addr}/metrics")).await;
assert!(body.contains("temporal_request"));
});
}