Skip to content

Commit

Permalink
Migrate IPA to ipa_metrics crate
Browse files Browse the repository at this point in the history
This is an umbrella commit that will contain the migration logic
  • Loading branch information
akoshelev committed Sep 17, 2024
1 parent a54e0dd commit f48436e
Show file tree
Hide file tree
Showing 16 changed files with 242 additions and 40 deletions.
6 changes: 5 additions & 1 deletion ipa-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ web-app = [
"http-body",
"http-body-util",
]
test-fixture = ["weak-field"]
test-fixture = ["weak-field", "ipa-metrics/partitions", "ipa-metrics-tracing"]
# Include observability instruments that detect lack of progress inside MPC. If there is a bug that leads to helper
# miscommunication, this feature helps to detect it. Turning it on has some cost.
# If "shuttle" feature is enabled, turning this on has no effect.
Expand Down Expand Up @@ -78,6 +78,8 @@ ipa-prf = []
[dependencies]
ipa-step = { version = "*", path = "../ipa-step" }
ipa-step-derive = { version = "*", path = "../ipa-step-derive" }
ipa-metrics = { version = "*", path = "../ipa-metrics"}
ipa-metrics-tracing = { optional = true, version = "*", path = "../ipa-metrics-tracing" }

aes = "0.8.3"
async-trait = "0.1.79"
Expand Down Expand Up @@ -163,6 +165,8 @@ serde = { version = "1.0", features = ["derive"] }
command-fds = "0.2.2"
hex = "0.4"
ipa-step = { version = "*", path = "../ipa-step", features = ["build", "string-step"] }
ipa-metrics = { version = "*", path = "../ipa-metrics", features = ["partitions"] }
ipa-metrics-tracing = { version = "*", path = "../ipa-metrics-tracing" }
permutation = "0.4.1"
proptest = "1.4"
rustls = { version = "0.23" }
Expand Down
20 changes: 14 additions & 6 deletions ipa-core/src/helpers/gateway/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,21 @@ impl<I: TransportIdentity, M: Message> SendingEnd<I, M> {
))]
pub async fn send<B: Borrow<M>>(&self, record_id: RecordId, msg: B) -> Result<(), Error<I>> {
let r = self.inner.send(record_id, msg).await;
metrics::increment_counter!(RECORDS_SENT,
STEP => self.inner.channel_id.gate.as_ref().to_string(),
ROLE => self.sender_id.as_str(),
// metrics::increment_counter!(RECORDS_SENT,
// STEP => self.inner.channel_id.gate.as_ref().to_string(),
// ROLE => self.sender_id.as_str(),
// );
// metrics::counter!(BYTES_SENT, M::Size::U64,
// STEP => self.inner.channel_id.gate.as_ref().to_string(),
// ROLE => self.sender_id.as_str(),
// );
ipa_metrics::counter!(BYTES_SENT, M::Size::U64,
STEP => &self.inner.channel_id.gate,
ROLE => &self.sender_id,
);
metrics::counter!(BYTES_SENT, M::Size::U64,
STEP => self.inner.channel_id.gate.as_ref().to_string(),
ROLE => self.sender_id.as_str(),
ipa_metrics::counter!(RECORDS_SENT, 1,
STEP => &self.inner.channel_id.gate,
ROLE => &self.sender_id,
);

r
Expand Down
35 changes: 34 additions & 1 deletion ipa-core/src/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use std::{
convert::Infallible,
fmt::{Debug, Display, Formatter},
fmt::{Debug, Display, Formatter, Write},
num::NonZeroUsize,
};

Expand Down Expand Up @@ -61,6 +61,7 @@ pub use gateway::{
MpcTransportError, MpcTransportImpl, RoleResolvingTransport, ShardTransportImpl,
};
pub use gateway_exports::{Gateway, MpcReceivingEnd, SendingEnd, ShardReceivingEnd};
use ipa_metrics::LabelValue;
pub use prss_protocol::negotiate as negotiate_prss;
#[cfg(feature = "web-app")]
pub use transport::WrappedAxumBodyStream;
Expand Down Expand Up @@ -158,6 +159,12 @@ impl Debug for HelperIdentity {
}
}

impl Display for HelperIdentity {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_str())
}
}

#[cfg(feature = "web-app")]
impl From<HelperIdentity> for hyper::header::HeaderValue {
fn from(id: HelperIdentity) -> Self {
Expand Down Expand Up @@ -233,6 +240,16 @@ impl<T> IndexMut<HelperIdentity> for Vec<T> {
}
}

impl LabelValue for HelperIdentity {
fn hash(&self) -> u64 {
todo!()
}

fn boxed(&self) -> Box<dyn LabelValue> {
todo!()
}
}

/// Represents a unique role of the helper inside the MPC circuit. Each helper may have different
/// roles in queries it processes in parallel. For some queries it can be `H1` and for others it
/// may be `H2` or `H3`.
Expand Down Expand Up @@ -386,6 +403,22 @@ impl<T> IndexMut<Role> for Vec<T> {
}
}

impl Display for Role {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_static_str())
}
}

impl LabelValue for Role {
fn hash(&self) -> u64 {
u64::from(*self as u32)
}

fn boxed(&self) -> Box<dyn LabelValue> {
Box::new(*self)
}
}

impl RoleAssignment {
#[must_use]
pub const fn new(helper_roles: [HelperIdentity; 3]) -> Self {
Expand Down
3 changes: 2 additions & 1 deletion ipa-core/src/helpers/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub use handler::{
};
#[cfg(feature = "in-memory-infra")]
pub use in_memory::{config, InMemoryMpcNetwork, InMemoryShardNetwork, InMemoryTransport};
use ipa_metrics::LabelValue;
pub use receive::{LogErrors, ReceiveRecords};
#[cfg(feature = "web-app")]
pub use stream::WrappedAxumBodyStream;
Expand All @@ -41,7 +42,7 @@ use crate::{
/// An identity of a peer that can be communicated with using [`Transport`]. There are currently two
/// types of peers - helpers and shards.
pub trait Identity:
Copy + Clone + Debug + PartialEq + Eq + PartialOrd + Ord + Hash + Send + Sync + 'static
Copy + Clone + Debug + PartialEq + Eq + PartialOrd + Ord + Hash + Send + Sync + LabelValue + 'static
{
fn as_str(&self) -> Cow<'static, str>;
}
Expand Down
7 changes: 5 additions & 2 deletions ipa-core/src/net/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use futures::{
future::{ready, BoxFuture, Either, Ready},
Future, FutureExt,
};
use hyper::{body::Incoming, header::HeaderName, Request};
use hyper::{body::Incoming, header::HeaderName, Request, Version};
use metrics::increment_counter;
use rustls::{server::WebPkiClientVerifier, RootCertStore};
use rustls_pki_types::CertificateDer;
Expand Down Expand Up @@ -131,11 +131,12 @@ impl MpcHelperServer {
const BIND_ADDRESS: Ipv4Addr = Ipv4Addr::LOCALHOST;
#[cfg(not(test))]
const BIND_ADDRESS: Ipv4Addr = Ipv4Addr::UNSPECIFIED;

let svc = self.router().layer(
TraceLayer::new_for_http()
.make_span_with(move |_request: &hyper::Request<_>| tracing.make_span())
.on_request(|request: &hyper::Request<_>, _: &Span| {
ipa_metrics::counter!(RequestProtocolVersion::from(request.version()), 1);
ipa_metrics::counter!(REQUESTS_RECEIVED, 1);
increment_counter!(RequestProtocolVersion::from(request.version()));
increment_counter!(REQUESTS_RECEIVED);
}),
Expand Down Expand Up @@ -226,6 +227,8 @@ where
{
tokio::spawn({
async move {
eprintln!("server started on {:?}", std::thread::current().id());
tracing::warn!("server started on {:?}", std::thread::current().id());
// Apply configuration
HttpServerConfig::apply(&mut server.http_builder().http2());
// Start serving
Expand Down
12 changes: 6 additions & 6 deletions ipa-core/src/protocol/context/prss.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ impl<'a, I: Iterator> Iterator for InstrumentedChunkIter<'a, I> {
type Item = <I as Iterator>::Item;

fn next(&mut self) -> Option<Self::Item> {
let step = self.instrumented.step.as_ref().to_string();
let step = self.instrumented.step;
// TODO: what we really want here is a gauge indicating the maximum index used to generate
// PRSS. Gauge infrastructure is not supported yet, `Metrics` struct needs to be able to
// handle gauges
metrics::increment_counter!(INDEXED_PRSS_GENERATED, STEP => step, ROLE => self.instrumented.role.as_static_str());
ipa_metrics::counter!(INDEXED_PRSS_GENERATED, 1, STEP => step, ROLE => &self.instrumented.role);
self.inner.next()
}
}
Expand All @@ -97,11 +97,11 @@ impl<Z: ArrayLength> Iterator for InstrumentedChunksIter<'_, IndexedSharedRandom
let l = self.left.next()?;
let r = self.right.next()?;

let step = self.instrumented.step.as_ref().to_string();
let step = self.instrumented.step;
// TODO: what we really want here is a gauge indicating the maximum index used to generate
// PRSS. Gauge infrastructure is not supported yet, `Metrics` struct needs to be able to
// handle gauges
metrics::increment_counter!(INDEXED_PRSS_GENERATED, STEP => step, ROLE => self.instrumented.role.as_static_str());
ipa_metrics::counter!(INDEXED_PRSS_GENERATED, 1, STEP => step, ROLE => &self.instrumented.role);

Some((l, r))
}
Expand Down Expand Up @@ -132,8 +132,8 @@ impl RngCore for InstrumentedSequentialSharedRandomness<'_> {
}

fn next_u64(&mut self) -> u64 {
let step = self.step.as_ref().to_string();
metrics::increment_counter!(SEQUENTIAL_PRSS_GENERATED, STEP => step, ROLE => self.role.as_static_str());
let step = self.step;
ipa_metrics::counter!(SEQUENTIAL_PRSS_GENERATED, 1, STEP => step, ROLE => &self.role);
self.inner.next_u64()
}

Expand Down
11 changes: 11 additions & 0 deletions ipa-core/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ pub type Gate = step::ProtocolGate;
#[cfg(descriptive_gate)]
pub type Gate = ipa_step::descriptive::Descriptive;

#[cfg(compact_gate)]
impl ipa_metrics::LabelValue for step::ProtocolGate {
fn hash(&self) -> u64 {
u64::from(self.index())
}

fn boxed(&self) -> Box<dyn ipa_metrics::LabelValue> {
Box::new(self.clone())
}
}

/// Unique identifier of the MPC query requested by report collectors
/// TODO(615): Generating this unique id may be tricky as it may involve communication between helpers and
/// them collaborating on constructing this unique id. These details haven't been flushed out yet,
Expand Down
12 changes: 12 additions & 0 deletions ipa-core/src/sharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::{
num::TryFromIntError,
};

use ipa_metrics::LabelValue;

/// A unique zero-based index of the helper shard.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ShardIndex(u32);
Expand All @@ -29,6 +31,16 @@ impl Display for ShardIndex {
}
}

impl LabelValue for ShardIndex {
fn hash(&self) -> u64 {
u64::from(self.0)
}

fn boxed(&self) -> Box<dyn LabelValue> {
Box::new(*self)
}
}

/// Shard-specific configuration required by sharding API. Each shard must know its own index and
/// the total number of shards in the system.
pub trait ShardConfiguration {
Expand Down
26 changes: 16 additions & 10 deletions ipa-core/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,31 @@ pub mod metrics {
/// Metric that records the version of HTTP protocol used for a particular request.
pub struct RequestProtocolVersion(Version);

impl From<Version> for RequestProtocolVersion {
fn from(v: Version) -> Self {
RequestProtocolVersion(v)
}
}

impl From<RequestProtocolVersion> for KeyName {
fn from(v: RequestProtocolVersion) -> Self {
impl From<RequestProtocolVersion> for &'static str {
fn from(value: RequestProtocolVersion) -> Self {
const HTTP11: &str = "request.protocol.HTTP/1.1";
const HTTP2: &str = "request.protocol.HTTP/2";
const HTTP3: &str = "request.protocol.HTTP/3";
const UNKNOWN: &str = "request.protocol.HTTP/UNKNOWN";

KeyName::from_const_str(match v.0 {
match value.0 {
Version::HTTP_11 => HTTP11,
Version::HTTP_2 => HTTP2,
Version::HTTP_3 => HTTP3,
_ => UNKNOWN,
})
}
}
}

impl From<Version> for RequestProtocolVersion {
fn from(v: Version) -> Self {
RequestProtocolVersion(v)
}
}

impl From<RequestProtocolVersion> for KeyName {
fn from(v: RequestProtocolVersion) -> Self {
KeyName::from(<&'static str>::from(v))
}
}
}
Expand Down
41 changes: 39 additions & 2 deletions ipa-core/src/telemetry/stats.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::{
collections::{hash_map::Iter, HashMap},
collections::{
hash_map::{Entry, Iter},
HashMap,
},
fmt::Debug,
};

use metrics::{KeyName, Label, SharedString};
use ipa_metrics::{MetricPartition, MetricsStore};
use metrics::{Key, KeyName, Label, SharedString};
use metrics_util::{
debugging::{DebugValue, Snapshot},
CompositeKey, MetricKind,
Expand Down Expand Up @@ -32,6 +36,7 @@ pub struct CounterDetails {
/// or POST requests.
///
/// X1 and X2 cannot be greater than X, but these values may overlap, i.e. X1 + X2 >= X
#[derive(Default)]
pub struct Metrics {
pub counters: HashMap<KeyName, CounterDetails>,
pub metric_description: HashMap<KeyName, SharedString>,
Expand Down Expand Up @@ -68,6 +73,38 @@ impl<'a> IntoIterator for &'a CounterDetails {
}

impl Metrics {
pub fn from_partition(metrics_store: &MetricsStore, partition: MetricPartition) -> Self {
let v = metrics_store.with_partition(partition, |store| {
let mut this = Self::default();
for (counter, value) in store.counters() {
let key = Key::from_parts(
counter.key(),
counter
.labels()
.map(|l| Label::new(l.name(), l.str_value()))
.collect::<Vec<_>>(),
);
let composite_key = CompositeKey::new(MetricKind::Counter, key);
match this.counters.entry(counter.key().into()) {
Entry::Occupied(mut entry) => {
entry
.get_mut()
.add(&composite_key, &DebugValue::Counter(value));
}
Entry::Vacant(entry) => {
let mut counter_details = CounterDetails::default();
counter_details.add(&composite_key, &DebugValue::Counter(value));
entry.insert(counter_details);
}
}
}

this
});

v.expect(&format!("Partition {partition} does not exist"))
}

pub fn from_snapshot(snapshot: Snapshot) -> Self {
const ALWAYS_TRUE: fn(&[Label]) -> bool = |_| true;
Self::with_filter(snapshot, ALWAYS_TRUE)
Expand Down
1 change: 1 addition & 0 deletions ipa-core/src/test_fixture/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub fn setup() {
)
.with(fmt_layer)
.with(MetricsLayer::new())
.with(ipa_metrics_tracing::MetricsPartitioningLayer)
.init();
}
});
Expand Down
Loading

0 comments on commit f48436e

Please sign in to comment.