diff --git a/ipa-core/Cargo.toml b/ipa-core/Cargo.toml index ecb68cc33..38e8e6b03 100644 --- a/ipa-core/Cargo.toml +++ b/ipa-core/Cargo.toml @@ -44,7 +44,7 @@ web-app = [ "http-body", "http-body-util", ] -test-fixture = ["weak-field"] +test-fixture = ["weak-field", "ipa-metrics/partitions", "ipa-metrics-tracing"] # Include observability instruments that detect lack of progress inside MPC. If there is a bug that leads to helper # miscommunication, this feature helps to detect it. Turning it on has some cost. # If "shuttle" feature is enabled, turning this on has no effect. @@ -78,6 +78,8 @@ ipa-prf = [] [dependencies] ipa-step = { version = "*", path = "../ipa-step" } ipa-step-derive = { version = "*", path = "../ipa-step-derive" } +ipa-metrics = { version = "*", path = "../ipa-metrics"} +ipa-metrics-tracing = { optional = true, version = "*", path = "../ipa-metrics-tracing" } aes = "0.8.3" async-trait = "0.1.79" @@ -163,6 +165,8 @@ serde = { version = "1.0", features = ["derive"] } command-fds = "0.2.2" hex = "0.4" ipa-step = { version = "*", path = "../ipa-step", features = ["build", "string-step"] } +ipa-metrics = { version = "*", path = "../ipa-metrics", features = ["partitions"] } +ipa-metrics-tracing = { version = "*", path = "../ipa-metrics-tracing" } permutation = "0.4.1" proptest = "1.4" rustls = { version = "0.23" } diff --git a/ipa-core/src/helpers/gateway/send.rs b/ipa-core/src/helpers/gateway/send.rs index fc73caf5d..761c1dffc 100644 --- a/ipa-core/src/helpers/gateway/send.rs +++ b/ipa-core/src/helpers/gateway/send.rs @@ -158,13 +158,21 @@ impl SendingEnd { ))] pub async fn send>(&self, record_id: RecordId, msg: B) -> Result<(), Error> { let r = self.inner.send(record_id, msg).await; - metrics::increment_counter!(RECORDS_SENT, - STEP => self.inner.channel_id.gate.as_ref().to_string(), - ROLE => self.sender_id.as_str(), + // metrics::increment_counter!(RECORDS_SENT, + // STEP => self.inner.channel_id.gate.as_ref().to_string(), + // ROLE => self.sender_id.as_str(), + // ); + // metrics::counter!(BYTES_SENT, M::Size::U64, + // STEP => self.inner.channel_id.gate.as_ref().to_string(), + // ROLE => self.sender_id.as_str(), + // ); + ipa_metrics::counter!(BYTES_SENT, M::Size::U64, + STEP => &self.inner.channel_id.gate, + ROLE => &self.sender_id, ); - metrics::counter!(BYTES_SENT, M::Size::U64, - STEP => self.inner.channel_id.gate.as_ref().to_string(), - ROLE => self.sender_id.as_str(), + ipa_metrics::counter!(RECORDS_SENT, 1, + STEP => &self.inner.channel_id.gate, + ROLE => &self.sender_id, ); r diff --git a/ipa-core/src/helpers/mod.rs b/ipa-core/src/helpers/mod.rs index e33a2ec99..e9ccc2732 100644 --- a/ipa-core/src/helpers/mod.rs +++ b/ipa-core/src/helpers/mod.rs @@ -5,7 +5,7 @@ use std::{ convert::Infallible, - fmt::{Debug, Display, Formatter}, + fmt::{Debug, Display, Formatter, Write}, num::NonZeroUsize, }; @@ -61,6 +61,7 @@ pub use gateway::{ MpcTransportError, MpcTransportImpl, RoleResolvingTransport, ShardTransportImpl, }; pub use gateway_exports::{Gateway, MpcReceivingEnd, SendingEnd, ShardReceivingEnd}; +use ipa_metrics::LabelValue; pub use prss_protocol::negotiate as negotiate_prss; #[cfg(feature = "web-app")] pub use transport::WrappedAxumBodyStream; @@ -158,6 +159,12 @@ impl Debug for HelperIdentity { } } +impl Display for HelperIdentity { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.as_str()) + } +} + #[cfg(feature = "web-app")] impl From for hyper::header::HeaderValue { fn from(id: HelperIdentity) -> Self { @@ -233,6 +240,16 @@ impl IndexMut for Vec { } } +impl LabelValue for HelperIdentity { + fn hash(&self) -> u64 { + todo!() + } + + fn boxed(&self) -> Box { + todo!() + } +} + /// Represents a unique role of the helper inside the MPC circuit. Each helper may have different /// roles in queries it processes in parallel. For some queries it can be `H1` and for others it /// may be `H2` or `H3`. @@ -386,6 +403,22 @@ impl IndexMut for Vec { } } +impl Display for Role { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_static_str()) + } +} + +impl LabelValue for Role { + fn hash(&self) -> u64 { + u64::from(*self as u32) + } + + fn boxed(&self) -> Box { + Box::new(*self) + } +} + impl RoleAssignment { #[must_use] pub const fn new(helper_roles: [HelperIdentity; 3]) -> Self { diff --git a/ipa-core/src/helpers/transport/mod.rs b/ipa-core/src/helpers/transport/mod.rs index c3bb307d8..fe7c9e65a 100644 --- a/ipa-core/src/helpers/transport/mod.rs +++ b/ipa-core/src/helpers/transport/mod.rs @@ -25,6 +25,7 @@ pub use handler::{ }; #[cfg(feature = "in-memory-infra")] pub use in_memory::{config, InMemoryMpcNetwork, InMemoryShardNetwork, InMemoryTransport}; +use ipa_metrics::LabelValue; pub use receive::{LogErrors, ReceiveRecords}; #[cfg(feature = "web-app")] pub use stream::WrappedAxumBodyStream; @@ -41,7 +42,7 @@ use crate::{ /// An identity of a peer that can be communicated with using [`Transport`]. There are currently two /// types of peers - helpers and shards. pub trait Identity: - Copy + Clone + Debug + PartialEq + Eq + PartialOrd + Ord + Hash + Send + Sync + 'static + Copy + Clone + Debug + PartialEq + Eq + PartialOrd + Ord + Hash + Send + Sync + LabelValue + 'static { fn as_str(&self) -> Cow<'static, str>; } diff --git a/ipa-core/src/net/server/mod.rs b/ipa-core/src/net/server/mod.rs index 87d7ee2cd..c743f7b4f 100644 --- a/ipa-core/src/net/server/mod.rs +++ b/ipa-core/src/net/server/mod.rs @@ -30,7 +30,7 @@ use futures::{ future::{ready, BoxFuture, Either, Ready}, Future, FutureExt, }; -use hyper::{body::Incoming, header::HeaderName, Request}; +use hyper::{body::Incoming, header::HeaderName, Request, Version}; use metrics::increment_counter; use rustls::{server::WebPkiClientVerifier, RootCertStore}; use rustls_pki_types::CertificateDer; @@ -131,11 +131,12 @@ impl MpcHelperServer { const BIND_ADDRESS: Ipv4Addr = Ipv4Addr::LOCALHOST; #[cfg(not(test))] const BIND_ADDRESS: Ipv4Addr = Ipv4Addr::UNSPECIFIED; - let svc = self.router().layer( TraceLayer::new_for_http() .make_span_with(move |_request: &hyper::Request<_>| tracing.make_span()) .on_request(|request: &hyper::Request<_>, _: &Span| { + ipa_metrics::counter!(RequestProtocolVersion::from(request.version()), 1); + ipa_metrics::counter!(REQUESTS_RECEIVED, 1); increment_counter!(RequestProtocolVersion::from(request.version())); increment_counter!(REQUESTS_RECEIVED); }), @@ -226,6 +227,8 @@ where { tokio::spawn({ async move { + eprintln!("server started on {:?}", std::thread::current().id()); + tracing::warn!("server started on {:?}", std::thread::current().id()); // Apply configuration HttpServerConfig::apply(&mut server.http_builder().http2()); // Start serving diff --git a/ipa-core/src/protocol/context/prss.rs b/ipa-core/src/protocol/context/prss.rs index d06564234..b94c61e16 100644 --- a/ipa-core/src/protocol/context/prss.rs +++ b/ipa-core/src/protocol/context/prss.rs @@ -75,11 +75,11 @@ impl<'a, I: Iterator> Iterator for InstrumentedChunkIter<'a, I> { type Item = ::Item; fn next(&mut self) -> Option { - let step = self.instrumented.step.as_ref().to_string(); + let step = self.instrumented.step; // TODO: what we really want here is a gauge indicating the maximum index used to generate // PRSS. Gauge infrastructure is not supported yet, `Metrics` struct needs to be able to // handle gauges - metrics::increment_counter!(INDEXED_PRSS_GENERATED, STEP => step, ROLE => self.instrumented.role.as_static_str()); + ipa_metrics::counter!(INDEXED_PRSS_GENERATED, 1, STEP => step, ROLE => &self.instrumented.role); self.inner.next() } } @@ -97,11 +97,11 @@ impl Iterator for InstrumentedChunksIter<'_, IndexedSharedRandom let l = self.left.next()?; let r = self.right.next()?; - let step = self.instrumented.step.as_ref().to_string(); + let step = self.instrumented.step; // TODO: what we really want here is a gauge indicating the maximum index used to generate // PRSS. Gauge infrastructure is not supported yet, `Metrics` struct needs to be able to // handle gauges - metrics::increment_counter!(INDEXED_PRSS_GENERATED, STEP => step, ROLE => self.instrumented.role.as_static_str()); + ipa_metrics::counter!(INDEXED_PRSS_GENERATED, 1, STEP => step, ROLE => &self.instrumented.role); Some((l, r)) } @@ -132,8 +132,8 @@ impl RngCore for InstrumentedSequentialSharedRandomness<'_> { } fn next_u64(&mut self) -> u64 { - let step = self.step.as_ref().to_string(); - metrics::increment_counter!(SEQUENTIAL_PRSS_GENERATED, STEP => step, ROLE => self.role.as_static_str()); + let step = self.step; + ipa_metrics::counter!(SEQUENTIAL_PRSS_GENERATED, 1, STEP => step, ROLE => &self.role); self.inner.next_u64() } diff --git a/ipa-core/src/protocol/mod.rs b/ipa-core/src/protocol/mod.rs index 9401cec8d..10492e4ce 100644 --- a/ipa-core/src/protocol/mod.rs +++ b/ipa-core/src/protocol/mod.rs @@ -23,6 +23,17 @@ pub type Gate = step::ProtocolGate; #[cfg(descriptive_gate)] pub type Gate = ipa_step::descriptive::Descriptive; +#[cfg(compact_gate)] +impl ipa_metrics::LabelValue for step::ProtocolGate { + fn hash(&self) -> u64 { + u64::from(self.index()) + } + + fn boxed(&self) -> Box { + Box::new(self.clone()) + } +} + /// Unique identifier of the MPC query requested by report collectors /// TODO(615): Generating this unique id may be tricky as it may involve communication between helpers and /// them collaborating on constructing this unique id. These details haven't been flushed out yet, diff --git a/ipa-core/src/sharding.rs b/ipa-core/src/sharding.rs index 625f724e6..a8791362a 100644 --- a/ipa-core/src/sharding.rs +++ b/ipa-core/src/sharding.rs @@ -3,6 +3,8 @@ use std::{ num::TryFromIntError, }; +use ipa_metrics::LabelValue; + /// A unique zero-based index of the helper shard. #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct ShardIndex(u32); @@ -29,6 +31,16 @@ impl Display for ShardIndex { } } +impl LabelValue for ShardIndex { + fn hash(&self) -> u64 { + u64::from(self.0) + } + + fn boxed(&self) -> Box { + Box::new(*self) + } +} + /// Shard-specific configuration required by sharding API. Each shard must know its own index and /// the total number of shards in the system. pub trait ShardConfiguration { diff --git a/ipa-core/src/telemetry/mod.rs b/ipa-core/src/telemetry/mod.rs index 27be318f6..b1d02e037 100644 --- a/ipa-core/src/telemetry/mod.rs +++ b/ipa-core/src/telemetry/mod.rs @@ -27,25 +27,31 @@ pub mod metrics { /// Metric that records the version of HTTP protocol used for a particular request. pub struct RequestProtocolVersion(Version); - impl From for RequestProtocolVersion { - fn from(v: Version) -> Self { - RequestProtocolVersion(v) - } - } - - impl From for KeyName { - fn from(v: RequestProtocolVersion) -> Self { + impl From for &'static str { + fn from(value: RequestProtocolVersion) -> Self { const HTTP11: &str = "request.protocol.HTTP/1.1"; const HTTP2: &str = "request.protocol.HTTP/2"; const HTTP3: &str = "request.protocol.HTTP/3"; const UNKNOWN: &str = "request.protocol.HTTP/UNKNOWN"; - KeyName::from_const_str(match v.0 { + match value.0 { Version::HTTP_11 => HTTP11, Version::HTTP_2 => HTTP2, Version::HTTP_3 => HTTP3, _ => UNKNOWN, - }) + } + } + } + + impl From for RequestProtocolVersion { + fn from(v: Version) -> Self { + RequestProtocolVersion(v) + } + } + + impl From for KeyName { + fn from(v: RequestProtocolVersion) -> Self { + KeyName::from(<&'static str>::from(v)) } } } diff --git a/ipa-core/src/telemetry/stats.rs b/ipa-core/src/telemetry/stats.rs index e6febddb3..c7237dd51 100644 --- a/ipa-core/src/telemetry/stats.rs +++ b/ipa-core/src/telemetry/stats.rs @@ -1,9 +1,13 @@ use std::{ - collections::{hash_map::Iter, HashMap}, + collections::{ + hash_map::{Entry, Iter}, + HashMap, + }, fmt::Debug, }; -use metrics::{KeyName, Label, SharedString}; +use ipa_metrics::{MetricPartition, MetricsStore}; +use metrics::{Key, KeyName, Label, SharedString}; use metrics_util::{ debugging::{DebugValue, Snapshot}, CompositeKey, MetricKind, @@ -32,6 +36,7 @@ pub struct CounterDetails { /// or POST requests. /// /// X1 and X2 cannot be greater than X, but these values may overlap, i.e. X1 + X2 >= X +#[derive(Default)] pub struct Metrics { pub counters: HashMap, pub metric_description: HashMap, @@ -68,6 +73,38 @@ impl<'a> IntoIterator for &'a CounterDetails { } impl Metrics { + pub fn from_partition(metrics_store: &MetricsStore, partition: MetricPartition) -> Self { + let v = metrics_store.with_partition(partition, |store| { + let mut this = Self::default(); + for (counter, value) in store.counters() { + let key = Key::from_parts( + counter.key(), + counter + .labels() + .map(|l| Label::new(l.name(), l.str_value())) + .collect::>(), + ); + let composite_key = CompositeKey::new(MetricKind::Counter, key); + match this.counters.entry(counter.key().into()) { + Entry::Occupied(mut entry) => { + entry + .get_mut() + .add(&composite_key, &DebugValue::Counter(value)); + } + Entry::Vacant(entry) => { + let mut counter_details = CounterDetails::default(); + counter_details.add(&composite_key, &DebugValue::Counter(value)); + entry.insert(counter_details); + } + } + } + + this + }); + + v.expect(&format!("Partition {partition} does not exist")) + } + pub fn from_snapshot(snapshot: Snapshot) -> Self { const ALWAYS_TRUE: fn(&[Label]) -> bool = |_| true; Self::with_filter(snapshot, ALWAYS_TRUE) diff --git a/ipa-core/src/test_fixture/logging.rs b/ipa-core/src/test_fixture/logging.rs index c69686fc1..455fdbd25 100644 --- a/ipa-core/src/test_fixture/logging.rs +++ b/ipa-core/src/test_fixture/logging.rs @@ -44,6 +44,7 @@ pub fn setup() { ) .with(fmt_layer) .with(MetricsLayer::new()) + .with(ipa_metrics_tracing::MetricsPartitioningLayer) .init(); } }); diff --git a/ipa-core/src/test_fixture/metrics.rs b/ipa-core/src/test_fixture/metrics.rs index 80acf4620..4c12c1f79 100644 --- a/ipa-core/src/test_fixture/metrics.rs +++ b/ipa-core/src/test_fixture/metrics.rs @@ -1,3 +1,7 @@ +use ipa_metrics::{ + MetricsCollector, MetricsCollectorController, MetricsContext, MetricsCurrentThreadContext, + MetricsProducer, +}; use metrics::KeyName; use metrics_tracing_context::TracingContextLayer; use metrics_util::{ @@ -15,7 +19,7 @@ use crate::{ }; // TODO: move to OnceCell from std once it is stabilized -static ONCE: OnceCell = OnceCell::new(); +static ONCE: OnceCell<(MetricsProducer, MetricsCollectorController)> = OnceCell::new(); fn setup() { // logging is required to import span fields as metric values @@ -26,6 +30,18 @@ fn setup() { metrics::try_recorder().is_none(), "metric recorder has already been installed" ); + let (collector, producer, controller) = ipa_metrics::installer(); + + // we can't set up the current thread as metric collector. It is possible + // that it will be shut down right after this call and we will lose metrics. + std::thread::Builder::new() + .name("ipa-metric-collector".to_string()) + .spawn(move || { + // todo: expose the join handle to drop the thread + collector.install(); + MetricsCollector::wait_for_shutdown(); + }) + .unwrap(); // no null bytes let recorder = DebuggingRecorder::new(); let snapshotter = recorder.snapshotter(); @@ -40,7 +56,7 @@ fn setup() { // register metrics register(); - snapshotter + (producer, controller) }); } @@ -78,6 +94,14 @@ impl MetricsHandle { pub fn span(&self) -> Span { setup(); + // safety: we call setup that initializes metrics right above this. + let (producer, _) = ONCE.get().unwrap(); + // connect current thread to the metrics collector, if not installed yet + if !MetricsCurrentThreadContext::is_connected() { + tracing::warn!("metrics started on {:?}", std::thread::current().id()); + producer.install(); + } + // Metrics collection with attributes/labels is expensive. Enabling it for all tests // resulted in doubling the time it takes to finish them. Tests must explicitly opt-in to // use this feature. @@ -87,10 +111,18 @@ impl MetricsHandle { // print them. match self.level { Level::INFO => { - tracing::info_span!("", "metrics_id" = self.id) + tracing::info_span!( + "", + "metrics_id" = self.id, + "metrics-partition" = to_u128(&self.id) + ) } Level::DEBUG => { - tracing::debug_span!("", "metrics_id" = self.id) + tracing::debug_span!( + "", + "metrics_id" = self.id, + "metrics-partition" = to_u128(&self.id) + ) } _ => { panic!("Only Info and Debug levels are supported") @@ -104,11 +136,22 @@ impl MetricsHandle { /// if metrics recorder is not installed #[must_use] pub fn snapshot(&self) -> Metrics { - let snapshot = ONCE.get().unwrap().snapshot(); - - Metrics::with_filter(snapshot, |labels| { - labels.iter().any(|label| label.value().eq(&self.id)) - }) + let (_, controller) = ONCE.get().expect("metrics must be installed"); + let store = controller + .snapshot() + .expect("metrics snapshot must be available"); + + // TODO: this is plain wrong, we need to get the snapshot from the collector thread. + // because we may use parallel seq_join + // let snapshot = MetricsContext::current_thread(|ctx| { + let metrics = Metrics::from_partition(&store, to_u128(&self.id)); + metrics + + // let snapshot = ONCE.get().unwrap().snapshot(); + // + // Metrics::with_filter(snapshot, |labels| { + // labels.iter().any(|label| label.value().eq(&self.id)) + // }) } pub fn get_counter_value>(&self, key_name: K) -> Option { @@ -120,6 +163,13 @@ impl MetricsHandle { } } +fn to_u128(input: &str) -> u128 { + let mut c = [0_u8; 16]; + c[0..8].copy_from_slice(input.as_bytes()); + + u128::from_le_bytes(c) +} + #[cfg(feature = "web-app")] impl crate::net::TracingSpanMaker for MetricsHandle { fn make_span(&self) -> Span { diff --git a/ipa-step-derive/src/lib.rs b/ipa-step-derive/src/lib.rs index 0ffdd7b1c..3b632010b 100644 --- a/ipa-step-derive/src/lib.rs +++ b/ipa-step-derive/src/lib.rs @@ -165,6 +165,17 @@ fn derive_gate_impl(ast: &DeriveInput) -> TokenStream { ::fmt(self, f) } } + + impl #name { + /// Returns the current index. It matches the index of the latest step + /// this gate has been narrowed to. + /// + /// If gate hasn't been narrowed yet, it returns the index of the default value. + #[must_use] + pub fn index(&self) -> ::ipa_step::CompactGateIndex { + self.0 + } + } }; // This environment variable is set by build scripts, diff --git a/ipa-step/Cargo.toml b/ipa-step/Cargo.toml index 05b06abbf..9b61c059c 100644 --- a/ipa-step/Cargo.toml +++ b/ipa-step/Cargo.toml @@ -10,8 +10,12 @@ name = [] string-step = [] [dependencies] +ipa-metrics = { version = "*", path = "../ipa-metrics" } + prettyplease = { version = "0.2", optional = true } proc-macro2 = { version = "1", optional = true } +# Used for descriptive gate hashing for metrics +rustc-hash = "2.0.0" quote = { version = "1.0.36", optional = true } serde = { version = "1.0", features = ["derive"] } syn = { version = "2.0.61", optional = true, features = ["full", "extra-traits"] } diff --git a/ipa-step/src/descriptive.rs b/ipa-step/src/descriptive.rs index 3cc8f0482..22ed24ff6 100644 --- a/ipa-step/src/descriptive.rs +++ b/ipa-step/src/descriptive.rs @@ -1,5 +1,10 @@ -use std::fmt::{Debug, Display, Formatter}; +use std::{ + fmt::{Debug, Display, Formatter}, + hash::Hasher, +}; +use ipa_metrics::LabelValue; +use rustc_hash::FxHasher; use serde::Deserialize; use crate::{Gate, Step, StepNarrow}; @@ -92,3 +97,19 @@ impl StepNarrow for Descriptive { Self { id } } } + +impl LabelValue for Descriptive { + fn hash(&self) -> u64 { + fn hash_str(input: &str) -> u64 { + let mut hasher = FxHasher::default(); + hasher.write(input.as_bytes()); + hasher.finish() + } + + hash_str(self.as_ref()) + } + + fn boxed(&self) -> Box { + Box::new(self.clone()) + } +} diff --git a/ipa-step/src/lib.rs b/ipa-step/src/lib.rs index a5a1f9590..27f98858d 100644 --- a/ipa-step/src/lib.rs +++ b/ipa-step/src/lib.rs @@ -73,7 +73,7 @@ pub trait CompactStep: Step { } } -/// A `Gate` implementation is a marker trait for a type that can be used to identify +/// A `Gate` implementation is provided for a type that can be used to identify /// gates in a protocol. It can be mapped to and from strings and has a default value. /// In most cases, implementations will also implement `StepNarrow` for different types, /// but this is not strictly required.