Skip to content

Commit 6f11bbf

Browse files
authored
Fix Metrics prefixing & remove trace export configs (#599)
* Eliminate trace exporting from configs * Fix metrics prefixing & expose non-prefixed meter
1 parent e132dfe commit 6f11bbf

File tree

7 files changed

+65
-89
lines changed

7 files changed

+65
-89
lines changed

core-api/src/telemetry.rs

Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -29,25 +29,21 @@ pub trait CoreTelemetry {
2929
#[derive(Debug, Clone, derive_builder::Builder)]
3030
#[non_exhaustive]
3131
pub struct TelemetryOptions {
32-
/// Optional trace exporter - set as None to disable.
33-
#[builder(setter(into, strip_option), default)]
34-
pub tracing: Option<TraceExportConfig>,
3532
/// Optional logger - set as None to disable.
3633
#[builder(setter(into, strip_option), default)]
3734
pub logging: Option<Logger>,
3835
/// Optional metrics exporter - set as None to disable.
3936
#[builder(setter(into, strip_option), default)]
4037
pub metrics: Option<Arc<dyn CoreMeter>>,
41-
/// If set true, strip the prefix `temporal_` from metrics, if present. Will be removed
42-
/// eventually as the prefix is consistent with other SDKs.
43-
#[builder(default)]
44-
pub no_temporal_prefix_for_metrics: bool,
4538
/// If set true (the default) explicitly attach a `service_name` label to all metrics. Turn this
4639
/// off if your collection system supports the `target_info` metric from the OpenMetrics spec.
4740
/// For more, see
4841
/// [here](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)
4942
#[builder(default = "true")]
5043
pub attach_service_name: bool,
44+
/// A prefix to be applied to all core-created metrics. Defaults to "temporal_".
45+
#[builder(default = "METRIC_PREFIX.to_string()")]
46+
pub metric_prefix: String,
5147
}
5248

5349
/// Options for exporting to an OpenTelemetry Collector
@@ -67,9 +63,6 @@ pub struct OtelCollectorOptions {
6763
// A map of tags to be applied to all metrics
6864
#[builder(default)]
6965
pub global_tags: HashMap<String, String>,
70-
/// A prefix to be applied to all metrics. Defaults to "temporal_".
71-
#[builder(default = "METRIC_PREFIX")]
72-
pub metric_prefix: &'static str,
7366
}
7467

7568
/// Options for exporting metrics to Prometheus
@@ -79,9 +72,6 @@ pub struct PrometheusExporterOptions {
7972
// A map of tags to be applied to all metrics
8073
#[builder(default)]
8174
pub global_tags: HashMap<String, String>,
82-
/// A prefix to be applied to all metrics. Defaults to "temporal_".
83-
#[builder(default = "METRIC_PREFIX")]
84-
pub metric_prefix: &'static str,
8575
/// If set true, all counters will include a "_total" suffix
8676
#[builder(default = "false")]
8777
pub counters_total_suffix: bool,
@@ -91,23 +81,6 @@ pub struct PrometheusExporterOptions {
9181
pub unit_suffix: bool,
9282
}
9383

94-
/// Configuration for the external export of traces
95-
#[derive(Debug, Clone)]
96-
pub struct TraceExportConfig {
97-
/// An [EnvFilter](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/struct.EnvFilter.html) filter string.
98-
pub filter: String,
99-
/// Where they should go
100-
pub exporter: TraceExporter,
101-
}
102-
103-
/// Control where traces are exported.
104-
#[derive(Debug, Clone)]
105-
pub enum TraceExporter {
106-
// TODO: Remove
107-
/// Export traces to an OpenTelemetry Collector <https://opentelemetry.io/docs/collector/>.
108-
Otel(OtelCollectorOptions),
109-
}
110-
11184
/// Control where logs go
11285
#[derive(Debug, Clone)]
11386
pub enum Logger {

core/src/lib.rs

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,7 @@ use std::sync::Arc;
5555
use temporal_client::{ConfiguredClient, TemporalServiceClientWithMetrics};
5656
use temporal_sdk_core_api::{
5757
errors::{CompleteActivityError, PollActivityError, PollWfError},
58-
telemetry::{
59-
metrics::{CoreMeter, TemporalMeter},
60-
TelemetryOptions,
61-
},
58+
telemetry::TelemetryOptions,
6259
Worker as WorkerTrait,
6360
};
6461
use temporal_sdk_core_protos::coresdk::ActivityHeartbeat;
@@ -265,27 +262,14 @@ impl CoreRuntime {
265262
self.runtime_handle.clone()
266263
}
267264

268-
/// Returns the metric meter used for recording metrics, if they were enabled.
269-
pub fn metric_meter(&self) -> Option<TemporalMeter> {
270-
self.telemetry.get_metric_meter()
271-
}
272-
273-
/// Return the trace subscriber associated with the telemetry options/instance. Can be used
274-
/// to manually set the default for a thread or globally using the `tracing` crate, or with
275-
/// [set_trace_subscriber_for_current_thread]
276-
pub fn trace_subscriber(&self) -> Arc<dyn tracing::Subscriber + Send + Sync> {
277-
self.telemetry.trace_subscriber()
278-
}
279-
280265
/// Return a reference to the owned [TelemetryInstance]
281266
pub fn telemetry(&self) -> &TelemetryInstance {
282267
&self.telemetry
283268
}
284269

285-
/// Some metric meters cannot be initialized until after a tokio runtime has started and after
286-
/// other telemetry has initted (ex: prometheus). They can be attached here.
287-
pub fn attach_late_init_metrics(&mut self, meter: Arc<dyn CoreMeter + 'static>) {
288-
self.telemetry.attach_late_init_metrics(meter)
270+
/// Return a mutable reference to the owned [TelemetryInstance]
271+
pub fn telemetry_mut(&mut self) -> &mut TelemetryInstance {
272+
&mut self.telemetry
289273
}
290274
}
291275

core/src/telemetry/metrics.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ impl MetricsContext {
8989
}
9090

9191
pub(crate) fn top_level(namespace: String, tq: String, telemetry: &TelemetryInstance) -> Self {
92-
if let Some(mut meter) = telemetry.get_metric_meter() {
92+
if let Some(mut meter) = telemetry.get_temporal_metric_meter() {
9393
meter
9494
.default_attribs
9595
.attributes
@@ -778,7 +778,7 @@ impl Gauge for MemoryGaugeU64 {
778778

779779
#[derive(Debug, derive_more::Constructor)]
780780
pub(crate) struct PrefixedMetricsMeter<CM> {
781-
prefix: &'static str,
781+
prefix: String,
782782
meter: CM,
783783
}
784784
impl<CM: CoreMeter> CoreMeter for PrefixedMetricsMeter<CM> {
@@ -787,17 +787,17 @@ impl<CM: CoreMeter> CoreMeter for PrefixedMetricsMeter<CM> {
787787
}
788788

789789
fn counter(&self, mut params: MetricParameters) -> Arc<dyn Counter> {
790-
params.name = (self.prefix.to_string() + &*params.name).into();
790+
params.name = (self.prefix.clone() + &*params.name).into();
791791
self.meter.counter(params)
792792
}
793793

794794
fn histogram(&self, mut params: MetricParameters) -> Arc<dyn Histogram> {
795-
params.name = (self.prefix.to_string() + &*params.name).into();
795+
params.name = (self.prefix.clone() + &*params.name).into();
796796
self.meter.histogram(params)
797797
}
798798

799799
fn gauge(&self, mut params: MetricParameters) -> Arc<dyn Gauge> {
800-
params.name = (self.prefix.to_string() + &*params.name).into();
800+
params.name = (self.prefix.clone() + &*params.name).into();
801801
self.meter.gauge(params)
802802
}
803803
}
@@ -815,7 +815,7 @@ mod tests {
815815
let telem_instance = TelemetryInstance::new(
816816
no_op_subscriber,
817817
None,
818-
METRIC_PREFIX,
818+
METRIC_PREFIX.to_string(),
819819
Some(call_buffer.clone()),
820820
true,
821821
);

core/src/telemetry/mod.rs

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ pub fn construct_filter_string(core_level: Level, other_level: Level) -> String
4747

4848
/// Holds initialized tracing/metrics exporters, etc
4949
pub struct TelemetryInstance {
50-
metric_prefix: &'static str,
50+
metric_prefix: String,
5151
logs_out: Option<Mutex<CoreLogsOut>>,
5252
metrics: Option<Arc<dyn CoreMeter + 'static>>,
5353
trace_subscriber: Arc<dyn Subscriber + Send + Sync>,
@@ -58,7 +58,7 @@ impl TelemetryInstance {
5858
fn new(
5959
trace_subscriber: Arc<dyn Subscriber + Send + Sync>,
6060
logs_out: Option<Mutex<CoreLogsOut>>,
61-
metric_prefix: &'static str,
61+
metric_prefix: String,
6262
metrics: Option<Arc<dyn CoreMeter + 'static>>,
6363
attach_service_name: bool,
6464
) -> Self {
@@ -71,8 +71,9 @@ impl TelemetryInstance {
7171
}
7272
}
7373

74-
/// Returns a trace subscriber which can be used with the tracing crate, or with our own
75-
/// [set_trace_subscriber_for_current_thread] function.
74+
/// Return the trace subscriber associated with the telemetry options/instance. Can be used
75+
/// to manually set the default for a thread or globally using the `tracing` crate, or with
76+
/// [set_trace_subscriber_for_current_thread]
7677
pub fn trace_subscriber(&self) -> Arc<dyn Subscriber + Send + Sync> {
7778
self.trace_subscriber.clone()
7879
}
@@ -83,21 +84,37 @@ impl TelemetryInstance {
8384
self.metrics = Some(meter);
8485
}
8586

86-
/// Returns our wrapper for metric meters, can be used to, ex: initialize clients
87-
pub fn get_metric_meter(&self) -> Option<TemporalMeter> {
87+
/// Returns our wrapper for metric meters, including the `metric_prefix` from
88+
/// [TelemetryOptions]. This should be used to initialize clients or for any other
89+
/// temporal-owned metrics. User defined metrics should use [Self::get_metric_meter].
90+
pub fn get_temporal_metric_meter(&self) -> Option<TemporalMeter> {
8891
self.metrics.clone().map(|m| {
89-
let kvs = if self.attach_service_name {
90-
vec![MetricKeyValue::new("service_name", TELEM_SERVICE_NAME)]
91-
} else {
92-
vec![]
93-
};
92+
let kvs = self.default_kvs();
9493
let attribs = MetricsAttributesOptions::new(kvs);
9594
TemporalMeter::new(
96-
Arc::new(PrefixedMetricsMeter::new(self.metric_prefix, m)) as Arc<dyn CoreMeter>,
95+
Arc::new(PrefixedMetricsMeter::new(self.metric_prefix.clone(), m))
96+
as Arc<dyn CoreMeter>,
9797
attribs,
9898
)
9999
})
100100
}
101+
102+
/// Returns our wrapper for metric meters, including attaching the service name if enabled.
103+
pub fn get_metric_meter(&self) -> Option<TemporalMeter> {
104+
self.metrics.clone().map(|m| {
105+
let kvs = self.default_kvs();
106+
let attribs = MetricsAttributesOptions::new(kvs);
107+
TemporalMeter::new(m, attribs)
108+
})
109+
}
110+
111+
fn default_kvs(&self) -> Vec<MetricKeyValue> {
112+
if self.attach_service_name {
113+
vec![MetricKeyValue::new("service_name", TELEM_SERVICE_NAME)]
114+
} else {
115+
vec![]
116+
}
117+
}
101118
}
102119

103120
thread_local! {
@@ -122,14 +139,6 @@ pub fn remove_trace_subscriber_for_current_thread() {
122139
SUB_GUARD.with(|sg| sg.take());
123140
}
124141

125-
fn metric_prefix(opts: &TelemetryOptions) -> &'static str {
126-
if opts.no_temporal_prefix_for_metrics {
127-
""
128-
} else {
129-
"temporal_"
130-
}
131-
}
132-
133142
impl CoreTelemetry for TelemetryInstance {
134143
fn fetch_buffered_logs(&self) -> Vec<CoreLog> {
135144
if let Some(logs_out) = self.logs_out.as_ref() {
@@ -155,7 +164,6 @@ pub fn telemetry_init(opts: TelemetryOptions) -> Result<TelemetryInstance, anyho
155164
// way which is nice.
156165
// Parts of telem dat ====
157166
let mut logs_out = None;
158-
let metric_prefix = metric_prefix(&opts);
159167
// =======================
160168

161169
// Tracing subscriber layers =========
@@ -211,7 +219,7 @@ pub fn telemetry_init(opts: TelemetryOptions) -> Result<TelemetryInstance, anyho
211219
Ok(TelemetryInstance::new(
212220
Arc::new(reg),
213221
logs_out,
214-
metric_prefix,
222+
opts.metric_prefix,
215223
opts.metrics,
216224
opts.attach_service_name,
217225
))

test-utils/src/lib.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use temporal_sdk_core_api::{
4040
errors::{PollActivityError, PollWfError},
4141
telemetry::{
4242
metrics::CoreMeter, Logger, OtelCollectorOptionsBuilder, PrometheusExporterOptionsBuilder,
43-
TelemetryOptions, TelemetryOptionsBuilder, TraceExportConfig, TraceExporter,
43+
TelemetryOptions, TelemetryOptionsBuilder,
4444
},
4545
Worker as CoreWorker,
4646
};
@@ -146,7 +146,7 @@ pub fn init_integ_telem() {
146146
let telemetry_options = get_integ_telem_options();
147147
let rt =
148148
CoreRuntime::new_assume_tokio(telemetry_options).expect("Core runtime inits cleanly");
149-
let _ = tracing::subscriber::set_global_default(rt.trace_subscriber());
149+
let _ = tracing::subscriber::set_global_default(rt.telemetry().trace_subscriber());
150150
rt
151151
});
152152
}
@@ -590,10 +590,6 @@ pub fn get_integ_telem_options() -> TelemetryOptions {
590590
.url(url)
591591
.build()
592592
.unwrap();
593-
ob.tracing(TraceExportConfig {
594-
filter: filter_string.clone(),
595-
exporter: TraceExporter::Otel(opts.clone()),
596-
});
597593
ob.metrics(Arc::new(build_otlp_metric_exporter(opts).unwrap()) as Arc<dyn CoreMeter>);
598594
}
599595
if let Some(addr) = env::var(PROM_ENABLE_ENV_VAR)

tests/integ_tests/metrics_tests.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ use std::{net::SocketAddr, sync::Arc, time::Duration};
33
use temporal_client::{WorkflowClientTrait, WorkflowOptions, WorkflowService};
44
use temporal_sdk_core::{init_worker, telemetry::start_prometheus_metric_exporter, CoreRuntime};
55
use temporal_sdk_core_api::{
6-
telemetry::{metrics::CoreMeter, PrometheusExporterOptionsBuilder, TelemetryOptions},
6+
telemetry::{
7+
metrics::{CoreMeter, MetricAttributes, MetricParameters},
8+
PrometheusExporterOptionsBuilder, TelemetryOptions,
9+
},
710
worker::WorkerConfigBuilder,
811
Worker,
912
};
@@ -69,7 +72,7 @@ async fn prometheus_metrics_exported() {
6972
let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap();
7073
let opts = get_integ_server_options();
7174
let mut raw_client = opts
72-
.connect_no_namespace(rt.metric_meter(), None)
75+
.connect_no_namespace(rt.telemetry().get_temporal_metric_meter(), None)
7376
.await
7477
.unwrap();
7578
assert!(raw_client.get_client().capabilities().is_some());
@@ -88,6 +91,17 @@ async fn prometheus_metrics_exported() {
8891
));
8992
// Verify counter names are appropriate (don't end w/ '_total')
9093
assert!(body.contains("temporal_request{"));
94+
// Verify non-temporal metrics meter does not prefix
95+
let mm = rt.telemetry().get_metric_meter().unwrap();
96+
let g = mm.inner.gauge(MetricParameters::from("mygauge"));
97+
g.record(
98+
42,
99+
&MetricAttributes::OTel {
100+
kvs: Arc::new(vec![]),
101+
},
102+
);
103+
let body = get_text(format!("http://{addr}/metrics")).await;
104+
assert!(body.contains("\nmygauge 42"));
91105
}
92106

93107
#[tokio::test]
@@ -434,11 +448,12 @@ fn runtime_new() {
434448
let handle = rt.tokio_handle();
435449
let _rt = handle.enter();
436450
let (telemopts, addr, _aborter) = prom_metrics();
437-
rt.attach_late_init_metrics(telemopts.metrics.unwrap());
451+
rt.telemetry_mut()
452+
.attach_late_init_metrics(telemopts.metrics.unwrap());
438453
let opts = get_integ_server_options();
439454
handle.block_on(async {
440455
let mut raw_client = opts
441-
.connect_no_namespace(rt.metric_meter(), None)
456+
.connect_no_namespace(rt.telemetry().get_temporal_metric_meter(), None)
442457
.await
443458
.unwrap();
444459
assert!(raw_client.get_client().capabilities().is_some());

tests/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ mod integ_tests {
3737
let opts = get_integ_server_options();
3838
let runtime = CoreRuntime::new_assume_tokio(get_integ_telem_options()).unwrap();
3939
let mut retrying_client = opts
40-
.connect_no_namespace(runtime.metric_meter(), None)
40+
.connect_no_namespace(runtime.telemetry().get_temporal_metric_meter(), None)
4141
.await
4242
.unwrap();
4343

0 commit comments

Comments
 (0)