Skip to content

Commit 5710c0f

Browse files
committed
First pass
1 parent c1e7897 commit 5710c0f

File tree

12 files changed

+464
-276
lines changed

12 files changed

+464
-276
lines changed

client/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ uuid = { version = "1.1", features = ["v4"] }
3333

3434
[dependencies.temporal-sdk-core-protos]
3535
path = "../sdk-core-protos"
36-
version = "0.1"
36+
37+
[dependencies.temporal-sdk-core-api]
38+
path = "../core-api"
3739

3840
[dev-dependencies]
3941
assert_matches = "1"

client/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ mod retry;
1313
mod workflow_handle;
1414

1515
pub use crate::retry::{CallType, RetryClient, RETRYABLE_ERROR_CODES};
16-
pub use metrics::ClientMetricProvider;
1716
pub use raw::{HealthService, OperatorService, TestService, WorkflowService};
1817
pub use temporal_sdk_core_protos::temporal::api::{
1918
enums::v1::ArchivalState,
@@ -44,6 +43,7 @@ use std::{
4443
sync::Arc,
4544
time::{Duration, Instant},
4645
};
46+
use temporal_sdk_core_api::telemetry::metrics::CoreMeter;
4747
use temporal_sdk_core_protos::{
4848
coresdk::{workflow_commands::QueryResult, IntoPayloadsExt},
4949
grpc::health::v1::health_client::HealthClient,
@@ -294,7 +294,7 @@ impl ClientOptions {
294294
pub async fn connect(
295295
&self,
296296
namespace: impl Into<String>,
297-
metrics_meter: Option<&dyn ClientMetricProvider>,
297+
metrics_meter: Option<&dyn CoreMeter>,
298298
headers: Option<Arc<RwLock<HashMap<String, String>>>>,
299299
) -> Result<RetryClient<Client>, ClientInitError> {
300300
let client = self
@@ -312,7 +312,7 @@ impl ClientOptions {
312312
/// See [RetryClient] for more
313313
pub async fn connect_no_namespace(
314314
&self,
315-
metrics_meter: Option<&dyn ClientMetricProvider>,
315+
metrics_meter: Option<&dyn CoreMeter>,
316316
headers: Option<Arc<RwLock<HashMap<String, String>>>>,
317317
) -> Result<RetryClient<ConfiguredClient<TemporalServiceClientWithMetrics>>, ClientInitError>
318318
{
@@ -327,7 +327,7 @@ impl ClientOptions {
327327
let service = ServiceBuilder::new()
328328
.layer_fn(|channel| GrpcMetricSvc {
329329
inner: channel,
330-
metrics: metrics_meter.map(|mm| MetricsContext::new(vec![], mm)),
330+
metrics: metrics_meter.map(|mm| MetricsContext::new(mm)),
331331
})
332332
.service(channel);
333333
let headers = headers.unwrap_or_default();

client/src/metrics.rs

Lines changed: 29 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,38 @@
11
use crate::{AttachMetricLabels, LONG_POLL_METHOD_NAMES};
22
use futures::{future::BoxFuture, FutureExt};
3-
use opentelemetry::{
4-
metrics::{Counter, Histogram},
5-
KeyValue,
6-
};
73
use std::{
84
sync::Arc,
95
task::{Context, Poll},
106
time::{Duration, Instant},
117
};
8+
use temporal_sdk_core_api::telemetry::metrics::{
9+
CoreMeter, Counter, Histogram, MetricAttributes, MetricKeyValue, MetricsAttributesOptions,
10+
};
1211
use tonic::{body::BoxBody, transport::Channel};
1312
use tower::Service;
1413

1514
/// Used to track context associated with metrics, and record/update them
1615
// Possible improvement: make generic over some type tag so that methods are only exposed if the
1716
// appropriate k/vs have already been set.
18-
#[derive(Clone, Debug)]
17+
#[derive(Clone, derive_more::DebugCustom)]
18+
#[debug(fmt = "MetricsContext {{ attribs: {kvs:?}, poll_is_long: {poll_is_long} }}")]
1919
pub struct MetricsContext {
20-
ctx: opentelemetry::Context,
21-
kvs: Arc<Vec<KeyValue>>,
20+
kvs: MetricAttributes,
2221
poll_is_long: bool,
2322

24-
svc_request: Counter<u64>,
25-
svc_request_failed: Counter<u64>,
26-
long_svc_request: Counter<u64>,
27-
long_svc_request_failed: Counter<u64>,
28-
29-
svc_request_latency: Histogram<u64>,
30-
long_svc_request_latency: Histogram<u64>,
31-
}
23+
svc_request: Arc<dyn Counter>,
24+
svc_request_failed: Arc<dyn Counter>,
25+
long_svc_request: Arc<dyn Counter>,
26+
long_svc_request_failed: Arc<dyn Counter>,
3227

33-
/// Things that can provide metrics for the client implement this. Trait exists to avoid having
34-
/// to make a whole new lower-level crate just for a tiny shared wrapper around OTel meters.
35-
pub trait ClientMetricProvider: Send + Sync {
36-
/// Construct a counter metric
37-
fn counter(&self, name: &'static str) -> Counter<u64>;
38-
/// Construct a histogram metric
39-
fn histogram(&self, name: &'static str) -> Histogram<u64>;
28+
svc_request_latency: Arc<dyn Histogram>,
29+
long_svc_request_latency: Arc<dyn Histogram>,
4030
}
4131

4232
impl MetricsContext {
43-
pub(crate) fn new(kvs: Vec<KeyValue>, metric_provider: &dyn ClientMetricProvider) -> Self {
33+
pub(crate) fn new(metric_provider: &dyn CoreMeter) -> Self {
4434
Self {
45-
ctx: opentelemetry::Context::current(),
46-
kvs: Arc::new(kvs),
35+
kvs: metric_provider.new_attributes(MetricsAttributesOptions::default()),
4736
poll_is_long: false,
4837
svc_request: metric_provider.counter("request"),
4938
svc_request_failed: metric_provider.counter("request_failure"),
@@ -55,15 +44,15 @@ impl MetricsContext {
5544
}
5645

5746
/// Extend an existing metrics context with new attributes, returning a new one
58-
pub(crate) fn with_new_attrs(&self, new_kvs: impl IntoIterator<Item = KeyValue>) -> Self {
47+
pub(crate) fn with_new_attrs(&self, new_kvs: impl IntoIterator<Item = MetricKeyValue>) -> Self {
5948
let mut r = self.clone();
6049
r.add_new_attrs(new_kvs);
6150
r
6251
}
6352

6453
/// Add new attributes to the context, mutating it
65-
pub(crate) fn add_new_attrs(&mut self, new_kvs: impl IntoIterator<Item = KeyValue>) {
66-
Arc::make_mut(&mut self.kvs).extend(new_kvs);
54+
pub(crate) fn add_new_attrs(&mut self, new_kvs: impl IntoIterator<Item = MetricKeyValue>) {
55+
self.kvs.add_new_attrs(new_kvs);
6756
}
6857

6958
pub(crate) fn set_is_long_poll(&mut self) {
@@ -73,29 +62,29 @@ impl MetricsContext {
7362
/// A request to the temporal service was made
7463
pub(crate) fn svc_request(&self) {
7564
if self.poll_is_long {
76-
self.long_svc_request.add(&self.ctx, 1, &self.kvs);
65+
self.long_svc_request.add(1, &self.kvs);
7766
} else {
78-
self.svc_request.add(&self.ctx, 1, &self.kvs);
67+
self.svc_request.add(1, &self.kvs);
7968
}
8069
}
8170

8271
/// A request to the temporal service failed
8372
pub(crate) fn svc_request_failed(&self) {
8473
if self.poll_is_long {
85-
self.long_svc_request_failed.add(&self.ctx, 1, &self.kvs);
74+
self.long_svc_request_failed.add(1, &self.kvs);
8675
} else {
87-
self.svc_request_failed.add(&self.ctx, 1, &self.kvs);
76+
self.svc_request_failed.add(1, &self.kvs);
8877
}
8978
}
9079

9180
/// Record service request latency
9281
pub(crate) fn record_svc_req_latency(&self, dur: Duration) {
9382
if self.poll_is_long {
9483
self.long_svc_request_latency
95-
.record(&self.ctx, dur.as_millis() as u64, &self.kvs);
84+
.record(dur.as_millis() as u64, &self.kvs);
9685
} else {
9786
self.svc_request_latency
98-
.record(&self.ctx, dur.as_millis() as u64, &self.kvs);
87+
.record(dur.as_millis() as u64, &self.kvs);
9988
}
10089
}
10190
}
@@ -104,16 +93,16 @@ const KEY_NAMESPACE: &str = "namespace";
10493
const KEY_SVC_METHOD: &str = "operation";
10594
const KEY_TASK_QUEUE: &str = "task_queue";
10695

107-
pub(crate) fn namespace_kv(ns: String) -> KeyValue {
108-
KeyValue::new(KEY_NAMESPACE, ns)
96+
pub(crate) fn namespace_kv(ns: String) -> MetricKeyValue {
97+
MetricKeyValue::new(KEY_NAMESPACE, ns)
10998
}
11099

111-
pub(crate) fn task_queue_kv(tq: String) -> KeyValue {
112-
KeyValue::new(KEY_TASK_QUEUE, tq)
100+
pub(crate) fn task_queue_kv(tq: String) -> MetricKeyValue {
101+
MetricKeyValue::new(KEY_TASK_QUEUE, tq)
113102
}
114103

115-
pub(crate) fn svc_operation(op: String) -> KeyValue {
116-
KeyValue::new(KEY_SVC_METHOD, op)
104+
pub(crate) fn svc_operation(op: String) -> MetricKeyValue {
105+
MetricKeyValue::new(KEY_SVC_METHOD, op)
117106
}
118107

119108
/// Implements metrics functionality for gRPC (really, any http) calls

client/src/raw.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::{
99
LONG_POLL_TIMEOUT,
1010
};
1111
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
12+
use temporal_sdk_core_api::telemetry::metrics::MetricKeyValue;
1213
use temporal_sdk_core_protos::{
1314
grpc::health::v1::{health_client::HealthClient, *},
1415
temporal::api::{
@@ -199,10 +200,10 @@ fn req_cloner<T: Clone>(cloneme: &Request<T>) -> Request<T> {
199200

200201
#[derive(Debug)]
201202
pub(super) struct AttachMetricLabels {
202-
pub(super) labels: Vec<opentelemetry::KeyValue>,
203+
pub(super) labels: Vec<MetricKeyValue>,
203204
}
204205
impl AttachMetricLabels {
205-
pub fn new(kvs: impl Into<Vec<opentelemetry::KeyValue>>) -> Self {
206+
pub fn new(kvs: impl Into<Vec<MetricKeyValue>>) -> Self {
206207
Self { labels: kvs.into() }
207208
}
208209
pub fn namespace(ns: impl Into<String>) -> Self {

core-api/Cargo.toml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,14 @@ categories = ["development-tools"]
1212

1313
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
1414

15+
[features]
16+
otel_impls = ["opentelemetry"]
17+
1518
[dependencies]
1619
async-trait = "0.1"
1720
derive_builder = "0.12"
21+
derive_more = "0.99"
22+
opentelemetry = { version = "0.18", optional = true }
1823
prost-types = "0.11"
1924
serde = { version = "1.0", default_features = false, features = ["derive"] }
2025
serde_json = "1.0"
@@ -26,8 +31,4 @@ url = "2.3"
2631

2732
[dependencies.temporal-sdk-core-protos]
2833
path = "../sdk-core-protos"
29-
version = "0.1"
30-
31-
[dependencies.temporal-client]
32-
path = "../client"
33-
version = "0.1"
34+
version = "0.1"

core-api/src/telemetry.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1+
pub mod metrics;
2+
3+
use crate::telemetry::metrics::CoreMeter;
14
use std::{
25
collections::HashMap,
36
net::SocketAddr,
7+
sync::Arc,
48
time::{Duration, SystemTime, UNIX_EPOCH},
59
};
610
use tracing_core::Level;
@@ -82,6 +86,9 @@ pub enum MetricsExporter {
8286
Otel(OtelCollectorOptions),
8387
/// Expose metrics directly via an embedded http server bound to the provided address.
8488
Prometheus(SocketAddr),
89+
/// Export metrics calls to lang where it can inject them into whatever metrics system the
90+
/// user likes
91+
Lang(Arc<dyn CoreMeter>),
8592
}
8693

8794
/// Control where logs go

0 commit comments

Comments
 (0)