Skip to content

Commit

Permalink
feat(s2n-quic-core): add aggregate metrics support
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft committed Nov 1, 2024
1 parent 65d55a4 commit 9a46404
Show file tree
Hide file tree
Showing 20 changed files with 3,366 additions and 956 deletions.
1 change: 1 addition & 0 deletions dc/s2n-quic-dc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ insta = "1"
s2n-codec = { path = "../../common/s2n-codec", features = ["testing"] }
s2n-quic-core = { path = "../../quic/s2n-quic-core", features = ["testing"] }
tokio = { version = "1", features = ["full"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

[lints.rust.unexpected_cfgs]
level = "warn"
Expand Down
10 changes: 10 additions & 0 deletions dc/s2n-quic-dc/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,13 @@ impl Meta for api::EndpointMeta {

mod generated;
pub use generated::*;

pub mod metrics {
pub use crate::event::generated::metrics::*;
pub use s2n_quic_core::event::metrics::Recorder;

pub mod aggregate {
pub use crate::event::generated::metrics::aggregate::*;
pub use s2n_quic_core::event::metrics::aggregate::{info, probe, Info, Recorder, Registry};
}
}
77 changes: 1 addition & 76 deletions dc/s2n-quic-dc/src/event/generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// changes should be made there.

use super::*;
pub(crate) mod metrics;
pub mod api {
#![doc = r" This module contains events that are emitted to the [`Subscriber`](crate::event::Subscriber)"]
use super::*;
Expand Down Expand Up @@ -2673,82 +2674,6 @@ mod traits {
}
}
}
pub mod metrics {
use super::*;
use core::sync::atomic::{AtomicU32, Ordering};
use s2n_quic_core::event::metrics::Recorder;
#[derive(Debug)]
pub struct Subscriber<S: super::Subscriber>
where
S::ConnectionContext: Recorder,
{
subscriber: S,
}
impl<S: super::Subscriber> Subscriber<S>
where
S::ConnectionContext: Recorder,
{
pub fn new(subscriber: S) -> Self {
Self { subscriber }
}
}
pub struct Context<R: Recorder> {
recorder: R,
application_write: AtomicU32,
application_read: AtomicU32,
}
impl<S: super::Subscriber> super::Subscriber for Subscriber<S>
where
S::ConnectionContext: Recorder,
{
type ConnectionContext = Context<S::ConnectionContext>;
fn create_connection_context(
&self,
meta: &api::ConnectionMeta,
info: &api::ConnectionInfo,
) -> Self::ConnectionContext {
Context {
recorder: self.subscriber.create_connection_context(meta, info),
application_write: AtomicU32::new(0),
application_read: AtomicU32::new(0),
}
}
#[inline]
fn on_application_write(
&self,
context: &Self::ConnectionContext,
meta: &api::ConnectionMeta,
event: &api::ApplicationWrite,
) {
context.application_write.fetch_add(1, Ordering::Relaxed);
self.subscriber
.on_application_write(&context.recorder, meta, event);
}
#[inline]
fn on_application_read(
&self,
context: &Self::ConnectionContext,
meta: &api::ConnectionMeta,
event: &api::ApplicationRead,
) {
context.application_read.fetch_add(1, Ordering::Relaxed);
self.subscriber
.on_application_read(&context.recorder, meta, event);
}
}
impl<R: Recorder> Drop for Context<R> {
fn drop(&mut self) {
self.recorder.increment_counter(
"application_write",
self.application_write.load(Ordering::Relaxed) as _,
);
self.recorder.increment_counter(
"application_read",
self.application_read.load(Ordering::Relaxed) as _,
);
}
}
}
#[cfg(any(test, feature = "testing"))]
pub mod testing {
use super::*;
Expand Down
81 changes: 81 additions & 0 deletions dc/s2n-quic-dc/src/event/generated/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// DO NOT MODIFY THIS FILE
// This file was generated with the `s2n-quic-events` crate and any required
// changes should be made there.

use crate::event::{self, api, metrics::Recorder};
use core::sync::atomic::{AtomicU32, Ordering};
pub(crate) mod aggregate;
#[derive(Debug)]
pub struct Subscriber<S: event::Subscriber>
where
S::ConnectionContext: Recorder,
{
subscriber: S,
}
impl<S: event::Subscriber> Subscriber<S>
where
S::ConnectionContext: Recorder,
{
pub fn new(subscriber: S) -> Self {
Self { subscriber }
}
}
pub struct Context<R: Recorder> {
recorder: R,
application_write: AtomicU32,
application_read: AtomicU32,
}
impl<S: event::Subscriber> event::Subscriber for Subscriber<S>
where
S::ConnectionContext: Recorder,
{
type ConnectionContext = Context<S::ConnectionContext>;
fn create_connection_context(
&self,
meta: &api::ConnectionMeta,
info: &api::ConnectionInfo,
) -> Self::ConnectionContext {
Context {
recorder: self.subscriber.create_connection_context(meta, info),
application_write: AtomicU32::new(0),
application_read: AtomicU32::new(0),
}
}
#[inline]
fn on_application_write(
&self,
context: &Self::ConnectionContext,
meta: &api::ConnectionMeta,
event: &api::ApplicationWrite,
) {
context.application_write.fetch_add(1, Ordering::Relaxed);
self.subscriber
.on_application_write(&context.recorder, meta, event);
}
#[inline]
fn on_application_read(
&self,
context: &Self::ConnectionContext,
meta: &api::ConnectionMeta,
event: &api::ApplicationRead,
) {
context.application_read.fetch_add(1, Ordering::Relaxed);
self.subscriber
.on_application_read(&context.recorder, meta, event);
}
}
impl<R: Recorder> Drop for Context<R> {
fn drop(&mut self) {
self.recorder.increment_counter(
"application_write",
self.application_write.load(Ordering::Relaxed) as _,
);
self.recorder.increment_counter(
"application_read",
self.application_read.load(Ordering::Relaxed) as _,
);
}
}
Loading

0 comments on commit 9a46404

Please sign in to comment.