Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s2n-quic-core): add aggregate metrics support #2364

Merged
merged 2 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion dc/s2n-quic-dc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ exclude = ["corpus.tar.gz"]

[features]
default = ["tokio"]
testing = ["bolero-generator", "s2n-quic-core/testing"]
testing = ["bolero-generator", "s2n-quic-core/testing", "tracing-subscriber"]
tokio = ["tokio/io-util", "tokio/net", "tokio/rt-multi-thread", "tokio/time"]

[dependencies]
Expand Down Expand Up @@ -40,6 +40,7 @@ slotmap = "1"
thiserror = "1"
tokio = { version = "1", default-features = false, features = ["sync"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"], optional = true }
zerocopy = { version = "0.7", features = ["derive"] }
zeroize = "1"
parking_lot = "0.12"
Expand All @@ -51,6 +52,7 @@ insta = "1"
s2n-codec = { path = "../../common/s2n-codec", features = ["testing"] }
s2n-quic-core = { path = "../../quic/s2n-quic-core", features = ["testing"] }
tokio = { version = "1", features = ["full"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

[lints.rust.unexpected_cfgs]
level = "warn"
Expand Down
6 changes: 6 additions & 0 deletions dc/s2n-quic-dc/events/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,23 @@
#[event("application:write")]
pub struct ApplicationWrite {
/// The number of bytes that the application tried to write
#[measure("provided", "b")]
total_len: usize,

/// The amount that was written
#[measure("committed", "b")]
#[counter("committed.total", "b")]
write_len: usize,
}

#[event("application:read")]
pub struct ApplicationRead {
/// The number of bytes that the application tried to read
#[measure("capacity", "b")]
capacity: usize,

/// The amount that was read
#[measure("committed", "b")]
#[counter("committed.total", "b")]
read_len: usize,
}
4 changes: 4 additions & 0 deletions dc/s2n-quic-dc/events/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@
#[subject(endpoint)]
struct PathSecretMapInitialized {
/// The capacity of the path secret map
#[measure("capacity")]
capacity: usize,
}

#[event("path_secret_map:uninitialized")]
#[subject(endpoint)]
struct PathSecretMapUninitialized {
/// The capacity of the path secret map
#[measure("capacity")]
capacity: usize,

/// The number of entries in the map
#[measure("entries")]
entries: usize,
}

Expand Down Expand Up @@ -128,6 +131,7 @@ struct ReplayPotentiallyDetected<'a> {

key_id: u64,

#[measure("gap")]
gap: u64,
}

Expand Down
17 changes: 17 additions & 0 deletions dc/s2n-quic-dc/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,20 @@ impl Meta for api::EndpointMeta {

mod generated;
pub use generated::*;

pub mod metrics {
pub use crate::event::generated::metrics::*;
pub use s2n_quic_core::event::metrics::Recorder;

pub mod aggregate {
pub use crate::event::generated::metrics::aggregate::*;
pub use s2n_quic_core::event::metrics::aggregate::{
info, AsMetric, Info, Recorder, Registry,
};

pub mod probe {
pub use crate::event::generated::metrics::probe::*;
pub use s2n_quic_core::event::metrics::aggregate::probe::dynamic;
}
}
}
77 changes: 1 addition & 76 deletions dc/s2n-quic-dc/src/event/generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// changes should be made there.

use super::*;
pub(crate) mod metrics;
pub mod api {
#![doc = r" This module contains events that are emitted to the [`Subscriber`](crate::event::Subscriber)"]
use super::*;
Expand Down Expand Up @@ -2673,82 +2674,6 @@ mod traits {
}
}
}
pub mod metrics {
use super::*;
use core::sync::atomic::{AtomicU32, Ordering};
use s2n_quic_core::event::metrics::Recorder;
#[derive(Debug)]
pub struct Subscriber<S: super::Subscriber>
where
S::ConnectionContext: Recorder,
{
subscriber: S,
}
impl<S: super::Subscriber> Subscriber<S>
where
S::ConnectionContext: Recorder,
{
pub fn new(subscriber: S) -> Self {
Self { subscriber }
}
}
pub struct Context<R: Recorder> {
recorder: R,
application_write: AtomicU32,
application_read: AtomicU32,
}
impl<S: super::Subscriber> super::Subscriber for Subscriber<S>
where
S::ConnectionContext: Recorder,
{
type ConnectionContext = Context<S::ConnectionContext>;
fn create_connection_context(
&self,
meta: &api::ConnectionMeta,
info: &api::ConnectionInfo,
) -> Self::ConnectionContext {
Context {
recorder: self.subscriber.create_connection_context(meta, info),
application_write: AtomicU32::new(0),
application_read: AtomicU32::new(0),
}
}
#[inline]
fn on_application_write(
&self,
context: &Self::ConnectionContext,
meta: &api::ConnectionMeta,
event: &api::ApplicationWrite,
) {
context.application_write.fetch_add(1, Ordering::Relaxed);
self.subscriber
.on_application_write(&context.recorder, meta, event);
}
#[inline]
fn on_application_read(
&self,
context: &Self::ConnectionContext,
meta: &api::ConnectionMeta,
event: &api::ApplicationRead,
) {
context.application_read.fetch_add(1, Ordering::Relaxed);
self.subscriber
.on_application_read(&context.recorder, meta, event);
}
}
impl<R: Recorder> Drop for Context<R> {
fn drop(&mut self) {
self.recorder.increment_counter(
"application_write",
self.application_write.load(Ordering::Relaxed) as _,
);
self.recorder.increment_counter(
"application_read",
self.application_read.load(Ordering::Relaxed) as _,
);
}
}
}
#[cfg(any(test, feature = "testing"))]
pub mod testing {
use super::*;
Expand Down
82 changes: 82 additions & 0 deletions dc/s2n-quic-dc/src/event/generated/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// DO NOT MODIFY THIS FILE
// This file was generated with the `s2n-quic-events` crate and any required
// changes should be made there.

use crate::event::{self, api, metrics::Recorder};
use core::sync::atomic::{AtomicU32, Ordering};
pub(crate) mod aggregate;
pub(crate) mod probe;
#[derive(Debug)]
pub struct Subscriber<S: event::Subscriber>
where
S::ConnectionContext: Recorder,
{
subscriber: S,
}
impl<S: event::Subscriber> Subscriber<S>
where
S::ConnectionContext: Recorder,
{
pub fn new(subscriber: S) -> Self {
Self { subscriber }
}
}
pub struct Context<R: Recorder> {
recorder: R,
application_write: AtomicU32,
application_read: AtomicU32,
}
impl<S: event::Subscriber> event::Subscriber for Subscriber<S>
where
S::ConnectionContext: Recorder,
{
type ConnectionContext = Context<S::ConnectionContext>;
fn create_connection_context(
&self,
meta: &api::ConnectionMeta,
info: &api::ConnectionInfo,
) -> Self::ConnectionContext {
Context {
recorder: self.subscriber.create_connection_context(meta, info),
application_write: AtomicU32::new(0),
application_read: AtomicU32::new(0),
}
}
#[inline]
fn on_application_write(
&self,
context: &Self::ConnectionContext,
meta: &api::ConnectionMeta,
event: &api::ApplicationWrite,
) {
context.application_write.fetch_add(1, Ordering::Relaxed);
self.subscriber
.on_application_write(&context.recorder, meta, event);
}
#[inline]
fn on_application_read(
&self,
context: &Self::ConnectionContext,
meta: &api::ConnectionMeta,
event: &api::ApplicationRead,
) {
context.application_read.fetch_add(1, Ordering::Relaxed);
self.subscriber
.on_application_read(&context.recorder, meta, event);
}
}
impl<R: Recorder> Drop for Context<R> {
fn drop(&mut self) {
self.recorder.increment_counter(
"application_write",
self.application_write.load(Ordering::Relaxed) as _,
);
self.recorder.increment_counter(
"application_read",
self.application_read.load(Ordering::Relaxed) as _,
);
}
}
Loading
Loading