Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tower::Service;
#[derive(Clone, derive_more::DebugCustom)]
#[debug(fmt = "MetricsContext {{ attribs: {kvs:?}, poll_is_long: {poll_is_long} }}")]
pub struct MetricsContext {
meter: Arc<dyn CoreMeter>,
kvs: MetricAttributes,
poll_is_long: bool,

Expand Down Expand Up @@ -66,19 +67,15 @@ impl MetricsContext {
unit: "ms".into(),
description: "Histogram of client long-poll request latencies".into(),
}),
meter,
}
}

/// Extend an existing metrics context with new attributes, returning a new one
pub(crate) fn with_new_attrs(&self, new_kvs: impl IntoIterator<Item = MetricKeyValue>) -> Self {
let mut r = self.clone();
r.add_new_attrs(new_kvs);
r
}

/// Add new attributes to the context, mutating it
pub(crate) fn add_new_attrs(&mut self, new_kvs: impl IntoIterator<Item = MetricKeyValue>) {
self.kvs.add_new_attrs(new_kvs);
/// Mutate this metrics context with new attributes
pub(crate) fn with_new_attrs(&mut self, new_kvs: impl IntoIterator<Item = MetricKeyValue>) {
self.kvs = self
.meter
.extend_attributes(self.kvs.clone(), new_kvs.into());
}

pub(crate) fn set_is_long_poll(&mut self) {
Expand Down Expand Up @@ -152,19 +149,18 @@ impl Service<http::Request<BoxBody>> for GrpcMetricSvc {
let metrics = self
.metrics
.clone()
.map(|m| {
.map(|mut m| {
// Attach labels from client wrapper
if let Some(other_labels) = req.extensions_mut().remove::<AttachMetricLabels>() {
m.with_new_attrs(other_labels.labels)
} else {
m
}
m
})
.and_then(|mut metrics| {
// Attach method name label if possible
req.uri().to_string().rsplit_once('/').map(|split_tup| {
let method_name = split_tup.1;
metrics.add_new_attrs([svc_operation(method_name.to_string())]);
metrics.with_new_attrs([svc_operation(method_name.to_string())]);
if LONG_POLL_METHOD_NAMES.contains(&method_name) {
metrics.set_is_long_poll();
}
Expand Down
225 changes: 139 additions & 86 deletions core-api/src/telemetry/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,26 @@
use std::{borrow::Cow, collections::HashSet, fmt::Debug, sync::Arc};
use std::{
any::Any,
borrow::Cow,
fmt::Debug,
sync::{Arc, OnceLock},
};

/// Implementors of this trait are expected to be defined in each language's bridge.
/// The implementor is responsible for the allocation/instantiation of new metric meters which
/// Core has requested.
pub trait CoreMeter: Send + Sync + Debug {
fn new_attributes(&self, attribs: MetricsAttributesOptions) -> MetricAttributes;
/// Given some k/v pairs, create a return a new instantiated instance of metric attributes.
/// Only [MetricAttributes] created by this meter can be used when calling record on instruments
/// created by this meter.
fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes;
/// Extend some existing attributes with new values. Implementations should create new instances
/// when doing so, rather than mutating whatever is backing the passed in `existing` attributes.
/// Ideally that new instance retains a ref to the extended old attribute, promoting re-use.
fn extend_attributes(
&self,
existing: MetricAttributes,
attribs: NewAttributes,
) -> MetricAttributes;
fn counter(&self, params: MetricParameters) -> Arc<dyn Counter>;
fn histogram(&self, params: MetricParameters) -> Arc<dyn Histogram>;
fn gauge(&self, params: MetricParameters) -> Arc<dyn Gauge>;
Expand Down Expand Up @@ -36,48 +52,22 @@ impl From<&'static str> for MetricParameters {
#[derive(derive_more::Constructor, Clone, Debug)]
pub struct TemporalMeter {
pub inner: Arc<dyn CoreMeter>,
pub default_attribs: MetricsAttributesOptions,
}

#[derive(Debug, Clone)]
pub enum MetricEvent {
Create {
params: MetricParameters,
id: u64,
kind: MetricKind,
},
CreateAttributes {
id: u64,
attributes: Vec<MetricKeyValue>,
},
Update {
id: u64,
attributes: LangMetricAttributes,
update: MetricUpdateVal,
},
}
#[derive(Debug, Clone, Copy)]
pub enum MetricKind {
Counter,
Gauge,
Histogram,
}
#[derive(Debug, Clone, Copy)]
pub enum MetricUpdateVal {
// Currently all deltas are natural numbers
Delta(u64),
// Currently all values are natural numbers
Value(u64),
}

pub trait MetricCallBufferer: Send + Sync {
fn retrieve(&self) -> Vec<MetricEvent>;
pub default_attribs: NewAttributes,
}

impl CoreMeter for Arc<dyn CoreMeter> {
fn new_attributes(&self, attribs: MetricsAttributesOptions) -> MetricAttributes {
fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes {
self.as_ref().new_attributes(attribs)
}

fn extend_attributes(
&self,
existing: MetricAttributes,
attribs: NewAttributes,
) -> MetricAttributes {
self.as_ref().extend_attributes(existing, attribs)
}

fn counter(&self, params: MetricParameters) -> Arc<dyn Counter> {
self.as_ref().counter(params)
}
Expand All @@ -98,60 +88,37 @@ pub enum MetricAttributes {
OTel {
kvs: Arc<Vec<opentelemetry::KeyValue>>,
},
Lang(LangMetricAttributes),
}
#[derive(Clone, Debug)]
pub struct LangMetricAttributes {
/// A set of references to attributes stored in lang memory. All referenced attributes should
/// be attached to the metric when recording.
pub ids: HashSet<u64>,
/// If populated, these key values should also be used in addition to the referred-to
/// existing attributes when recording
pub new_attributes: Vec<MetricKeyValue>,
Buffer(BufferAttributes),
Dynamic(Arc<dyn CustomMetricAttributes>),
}

impl MetricAttributes {
/// Extend existing metrics attributes with others, returning a new instance
pub fn merge(&self, other: MetricAttributes) -> Self {
let mut me = self.clone();
match (&mut me, other) {
#[cfg(feature = "otel_impls")]
(MetricAttributes::OTel { ref mut kvs }, MetricAttributes::OTel { kvs: other_kvs }) => {
Arc::make_mut(kvs).extend((*other_kvs).clone());
}
(MetricAttributes::Lang(ref mut l), MetricAttributes::Lang(ol)) => {
l.ids.extend(ol.ids);
l.new_attributes.extend(ol.new_attributes);
}
_ => panic!("Cannot merge metric attributes of different kinds"),
}
me
}

/// Mutate self to add new kvs
pub fn add_new_attrs(&mut self, new_kvs: impl IntoIterator<Item = MetricKeyValue>) {
match self {
#[cfg(feature = "otel_impls")]
MetricAttributes::OTel { ref mut kvs, .. } => {
Arc::make_mut(kvs).extend(new_kvs.into_iter().map(Into::into));
}
MetricAttributes::Lang(ref mut attrs, ..) => {
attrs.new_attributes.extend(new_kvs);
}
}
}
/// A reference to some attributes created lang side.
pub trait CustomMetricAttributes: Debug + Send + Sync {
/// Must be implemented to work around existing type system restrictions, see
/// [here](https://internals.rust-lang.org/t/downcast-not-from-any-but-from-any-trait/16736/12)
fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}

/// Options that are attached to metrics on a per-call basis
#[derive(Clone, Debug, Default, derive_more::Constructor)]
pub struct MetricsAttributesOptions {
pub struct NewAttributes {
pub attributes: Vec<MetricKeyValue>,
}
impl MetricsAttributesOptions {
impl NewAttributes {
pub fn extend(&mut self, new_kvs: impl IntoIterator<Item = MetricKeyValue>) {
self.attributes.extend(new_kvs)
}
}
impl<I> From<I> for NewAttributes
where
I: IntoIterator<Item = MetricKeyValue>,
{
fn from(value: I) -> Self {
Self {
attributes: value.into_iter().collect(),
}
}
}

/// A K/V pair that can be used to label a specific recording of a metric
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -197,14 +164,92 @@ pub trait Gauge: Send + Sync {
fn record(&self, value: u64, attributes: &MetricAttributes);
}

#[derive(Debug, Clone)]
pub enum MetricEvent<I: BufferInstrumentRef> {
Create {
params: MetricParameters,
/// One you receive this event, call `set` on this with the initialized instrument reference
populate_into: LazyBufferInstrument<I>,
kind: MetricKind,
},
CreateAttributes {
/// One you receive this event, call `set` on this with the initialized attributes
populate_into: BufferAttributes,
/// If not `None`, use these already-initialized attributes as the base (extended with
/// `attributes`) for the ones you are about to initialize.
append_from: Option<BufferAttributes>,
attributes: Vec<MetricKeyValue>,
},
Update {
instrument: LazyBufferInstrument<I>,
attributes: BufferAttributes,
update: MetricUpdateVal,
},
}
#[derive(Debug, Clone, Copy)]
pub enum MetricKind {
Counter,
Gauge,
Histogram,
}
#[derive(Debug, Clone, Copy)]
pub enum MetricUpdateVal {
// Currently all deltas are natural numbers
Delta(u64),
// Currently all values are natural numbers
Value(u64),
}

pub trait MetricCallBufferer<I: BufferInstrumentRef>: Send + Sync {
fn retrieve(&self) -> Vec<MetricEvent<I>>;
}

/// A lazy reference to some metrics buffer attributes
pub type BufferAttributes = LazyRef<Arc<dyn CustomMetricAttributes + 'static>>;

/// Types lang uses to contain references to its lang-side defined instrument references must
/// implement this marker trait
pub trait BufferInstrumentRef {}
/// A lazy reference to a metrics buffer instrument
pub type LazyBufferInstrument<T> = LazyRef<Arc<T>>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put a type constraint on T that it has to implement BufferInstrumentRef? Or maybe I can have that constraint above? Or rather, is it at least expected that T implements BufferInstrumentRef?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EDIT: Nevermind, I see this constraint is now available on the CoreMeter impl


#[derive(Debug, Clone)]
pub struct LazyRef<T> {
to_be_initted: Arc<OnceLock<T>>,
}
impl<T> LazyRef<T> {
pub fn hole() -> Self {
Self {
to_be_initted: Arc::new(OnceLock::new()),
}
}

/// Get the reference you previously initialized
///
/// # Panics
/// If `set` has not already been called. You must set the reference before using it.
pub fn get(&self) -> &T {
self.to_be_initted
.get()
.expect("You must initialize the reference before using it")
}

/// Assigns a value to fill this reference.
/// Returns according to semantics of [OnceLock].
pub fn set(&self, val: T) -> Result<(), T> {
self.to_be_initted.set(val)
}
}

#[derive(Debug)]
pub struct NoOpCoreMeter;
impl CoreMeter for NoOpCoreMeter {
fn new_attributes(&self, _: MetricsAttributesOptions) -> MetricAttributes {
MetricAttributes::Lang(LangMetricAttributes {
ids: HashSet::new(),
new_attributes: vec![],
})
fn new_attributes(&self, _: NewAttributes) -> MetricAttributes {
MetricAttributes::Dynamic(Arc::new(NoOpAttributes))
}

fn extend_attributes(&self, existing: MetricAttributes, _: NewAttributes) -> MetricAttributes {
existing
}

fn counter(&self, _: MetricParameters) -> Arc<dyn Counter> {
Expand All @@ -231,6 +276,14 @@ impl Gauge for NoOpInstrument {
fn record(&self, _: u64, _: &MetricAttributes) {}
}

#[derive(Debug, Clone)]
pub struct NoOpAttributes;
impl CustomMetricAttributes for NoOpAttributes {
fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
self as Arc<dyn Any + Send + Sync>
}
}

#[cfg(feature = "otel_impls")]
mod otel_impls {
use super::*;
Expand Down
Loading