From b9bcf329ff21399901db4ed33ed5b7d676c1ba55 Mon Sep 17 00:00:00 2001 From: Appelmans Date: Thu, 12 Sep 2024 22:05:07 -0700 Subject: [PATCH 1/8] generate &mut --- dc/s2n-quic-dc/src/event/events.rs | 10 + dc/s2n-quic-dc/src/event/generated.rs | 2033 +++++++++++++++++++++++++ quic/s2n-quic-events/src/main.rs | 218 ++- quic/s2n-quic-events/src/parser.rs | 59 +- 4 files changed, 2229 insertions(+), 91 deletions(-) create mode 100644 dc/s2n-quic-dc/src/event/events.rs create mode 100644 dc/s2n-quic-dc/src/event/generated.rs diff --git a/dc/s2n-quic-dc/src/event/events.rs b/dc/s2n-quic-dc/src/event/events.rs new file mode 100644 index 0000000000..1e31524484 --- /dev/null +++ b/dc/s2n-quic-dc/src/event/events.rs @@ -0,0 +1,10 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +#[event("transport:frame_sent")] +/// Frame was sent +struct FrameSent { + packet_header: PacketHeader, + path_id: u64, + frame: Frame, +} diff --git a/dc/s2n-quic-dc/src/event/generated.rs b/dc/s2n-quic-dc/src/event/generated.rs new file mode 100644 index 0000000000..f0c484fb90 --- /dev/null +++ b/dc/s2n-quic-dc/src/event/generated.rs @@ -0,0 +1,2033 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +// DO NOT MODIFY THIS FILE +// This file was generated with the `s2n-quic-events` crate and any required +// changes should be made there. + +use super::*; +pub mod api { + #![doc = r" This module contains events that are emitted to the [`Subscriber`](crate::event::Subscriber)"] + use super::*; + pub use traits::Subscriber; + #[derive(Clone, Debug)] + #[non_exhaustive] + #[doc = " Frame was sent"] + pub struct FrameSent { + pub packet_header: PacketHeader, + pub path_id: u64, + pub frame: Frame, + } + impl Event for FrameSent { + const NAME: &'static str = "transport:frame_sent"; + } + use super::*; + pub mod api { + #![doc = r" This module contains events that are emitted to the [`Subscriber`](crate::event::Subscriber)"] + use super::*; + pub use traits::Subscriber; + #[derive(Clone, Debug)] + #[non_exhaustive] + #[doc = " Frame was sent"] + pub struct FrameSent { + pub packet_header: PacketHeader, + pub path_id: u64, + pub frame: Frame, + } + impl Event for FrameSent { + const NAME: &'static str = "transport:frame_sent"; + } + use super::*; + pub mod api { + #![doc = r" This module contains events that are emitted to the [`Subscriber`](crate::event::Subscriber)"] + use super::*; + pub use traits::Subscriber; + #[derive(Clone, Debug)] + #[non_exhaustive] + #[doc = " Frame was sent"] + pub struct FrameSent { + pub packet_header: PacketHeader, + pub path_id: u64, + pub frame: Frame, + } + impl Event for FrameSent { + const NAME: &'static str = "transport:frame_sent"; + } + } + #[cfg(feature = "event-tracing")] + pub mod tracing { + #![doc = r" This module contains event integration with [`tracing`](https://docs.rs/tracing)"] + use super::api; + #[doc = r" Emits events with [`tracing`](https://docs.rs/tracing)"] + #[derive(Clone, Debug)] + pub struct Subscriber { + client: tracing::Span, + server: tracing::Span, + } + impl Default for Subscriber { + fn default() -> Self { + let root = tracing :: span ! (target : "s2n_quic" , tracing :: Level :: DEBUG , "s2n_quic"); + let client = tracing :: span ! (parent : root . id () , tracing :: Level :: DEBUG , "client"); + let server = tracing :: span ! (parent : root . id () , tracing :: Level :: DEBUG , "server"); + Self { client, server } + } + } + impl super::Subscriber for Subscriber { + type ConnectionContext = tracing::Span; + fn create_connection_context( + &self, + meta: &api::ConnectionMeta, + _info: &api::ConnectionInfo, + ) -> Self::ConnectionContext { + let parent = match meta.endpoint_type { + api::EndpointType::Client {} => self.client.id(), + api::EndpointType::Server {} => self.server.id(), + }; + tracing :: span ! (target : "s2n_quic" , parent : parent , tracing :: Level :: DEBUG , "conn" , id = meta . id) + } + #[inline] + fn on_frame_sent( + &self, + context: &Self::ConnectionContext, + _meta: &api::ConnectionMeta, + event: &api::FrameSent, + ) { + let id = context.id(); + let api::FrameSent { + packet_header, + path_id, + frame, + } = event; + tracing :: event ! (target : "frame_sent" , parent : id , tracing :: Level :: DEBUG , packet_header = tracing :: field :: debug (packet_header) , path_id = tracing :: field :: debug (path_id) , frame = tracing :: field :: debug (frame)); + } + } + } + pub mod builder { + use super::*; + #[derive(Clone, Debug)] + #[doc = " Frame was sent"] + pub struct FrameSent { + pub packet_header: PacketHeader, + pub path_id: u64, + pub frame: Frame, + } + impl IntoEvent for FrameSent { + #[inline] + fn into_event(self) -> api::FrameSent { + let FrameSent { + packet_header, + path_id, + frame, + } = self; + api::FrameSent { + packet_header: packet_header.into_event(), + path_id: path_id.into_event(), + frame: frame.into_event(), + } + } + } + } + pub mod supervisor { + #![doc = r" This module contains the `supervisor::Outcome` and `supervisor::Context` for use"] + #![doc = r" when implementing [`Subscriber::supervisor_timeout`](crate::event::Subscriber::supervisor_timeout) and"] + #![doc = r" [`Subscriber::on_supervisor_timeout`](crate::event::Subscriber::on_supervisor_timeout)"] + #![doc = r" on a Subscriber."] + use crate::{ + application, + event::{builder::SocketAddress, IntoEvent}, + }; + #[non_exhaustive] + #[derive(Clone, Debug, Eq, PartialEq)] + pub enum Outcome { + #[doc = r" Allow the connection to remain open"] + Continue, + #[doc = r" Close the connection and notify the peer"] + Close { error_code: application::Error }, + #[doc = r" Close the connection without notifying the peer"] + ImmediateClose { reason: &'static str }, + } + impl Default for Outcome { + fn default() -> Self { + Self::Continue + } + } + #[non_exhaustive] + #[derive(Debug)] + pub struct Context<'a> { + #[doc = r" Number of handshakes that have begun but not completed"] + pub inflight_handshakes: usize, + #[doc = r" Number of open connections"] + pub connection_count: usize, + #[doc = r" The address of the peer"] + pub remote_address: SocketAddress<'a>, + #[doc = r" True if the connection is in the handshake state, false otherwise"] + pub is_handshaking: bool, + } + impl<'a> Context<'a> { + pub fn new( + inflight_handshakes: usize, + connection_count: usize, + remote_address: &'a crate::inet::SocketAddress, + is_handshaking: bool, + ) -> Self { + Self { + inflight_handshakes, + connection_count, + remote_address: remote_address.into_event(), + is_handshaking, + } + } + } + } + pub use traits::*; + mod traits { + use super::*; + use crate::query; + use api::*; + use core::fmt; + #[doc = r" Provides metadata related to an event"] + pub trait Meta: fmt::Debug { + #[doc = r" Returns whether the local endpoint is a Client or Server"] + fn endpoint_type(&self) -> &EndpointType; + #[doc = r" A context from which the event is being emitted"] + #[doc = r""] + #[doc = r" An event can occur in the context of an Endpoint or Connection"] + fn subject(&self) -> Subject; + #[doc = r" The time the event occurred"] + fn timestamp(&self) -> &crate::event::Timestamp; + } + impl Meta for ConnectionMeta { + fn endpoint_type(&self) -> &EndpointType { + &self.endpoint_type + } + fn subject(&self) -> Subject { + Subject::Connection { id: self.id } + } + fn timestamp(&self) -> &crate::event::Timestamp { + &self.timestamp + } + } + impl Meta for EndpointMeta { + fn endpoint_type(&self) -> &EndpointType { + &self.endpoint_type + } + fn subject(&self) -> Subject { + Subject::Endpoint {} + } + fn timestamp(&self) -> &crate::event::Timestamp { + &self.timestamp + } + } + #[doc = r" Allows for events to be subscribed to"] + pub trait Subscriber: 'static + Send { + #[doc = r" An application provided type associated with each connection."] + #[doc = r""] + #[doc = r" The context provides a mechanism for applications to provide a custom type"] + #[doc = r" and update it on each event, e.g. computing statistics. Each event"] + #[doc = r" invocation (e.g. [`Subscriber::on_packet_sent`]) also provides mutable"] + #[doc = r" access to the context `&mut ConnectionContext` and allows for updating the"] + #[doc = r" context."] + #[doc = r""] + #[doc = r" ```no_run"] + #[doc = r" # mod s2n_quic { pub mod provider { pub mod event {"] + #[doc = r" # pub use s2n_quic_core::event::{api as events, api::ConnectionInfo, api::ConnectionMeta, Subscriber};"] + #[doc = r" # }}}"] + #[doc = r" use s2n_quic::provider::event::{"] + #[doc = r" ConnectionInfo, ConnectionMeta, Subscriber, events::PacketSent"] + #[doc = r" };"] + #[doc = r""] + #[doc = r" pub struct MyEventSubscriber;"] + #[doc = r""] + #[doc = r" pub struct MyEventContext {"] + #[doc = r" packet_sent: u64,"] + #[doc = r" }"] + #[doc = r""] + #[doc = r" impl Subscriber for MyEventSubscriber {"] + #[doc = r" type ConnectionContext = MyEventContext;"] + #[doc = r""] + #[doc = r" fn create_connection_context("] + #[doc = r" &mut self, _meta: &ConnectionMeta,"] + #[doc = r" _info: &ConnectionInfo,"] + #[doc = r" ) -> Self::ConnectionContext {"] + #[doc = r" MyEventContext { packet_sent: 0 }"] + #[doc = r" }"] + #[doc = r""] + #[doc = r" fn on_packet_sent("] + #[doc = r" &mut self,"] + #[doc = r" context: &mut Self::ConnectionContext,"] + #[doc = r" _meta: &ConnectionMeta,"] + #[doc = r" _event: &PacketSent,"] + #[doc = r" ) {"] + #[doc = r" context.packet_sent += 1;"] + #[doc = r" }"] + #[doc = r" }"] + #[doc = r" ```"] + type ConnectionContext: 'static + Send; + #[doc = r" Creates a context to be passed to each connection-related event"] + fn create_connection_context( + &self, + meta: &ConnectionMeta, + info: &ConnectionInfo, + ) -> Self::ConnectionContext; + #[doc = r" The period at which `on_supervisor_timeout` is called"] + #[doc = r""] + #[doc = r" If multiple `event::Subscriber`s are composed together, the minimum `supervisor_timeout`"] + #[doc = r" across all `event::Subscriber`s will be used."] + #[doc = r""] + #[doc = r" If the `supervisor_timeout()` is `None` across all `event::Subscriber`s, connection supervision"] + #[doc = r" will cease for the remaining lifetime of the connection and `on_supervisor_timeout` will no longer"] + #[doc = r" be called."] + #[doc = r""] + #[doc = r" It is recommended to avoid setting this value less than ~100ms, as short durations"] + #[doc = r" may lead to higher CPU utilization."] + #[allow(unused_variables)] + fn supervisor_timeout( + &self, + conn_context: &mut Self::ConnectionContext, + meta: &ConnectionMeta, + context: &supervisor::Context, + ) -> Option { + None + } + #[doc = r" Called for each `supervisor_timeout` to determine any action to take on the connection based on the `supervisor::Outcome`"] + #[doc = r""] + #[doc = r" If multiple `event::Subscriber`s are composed together, the minimum `supervisor_timeout`"] + #[doc = r" across all `event::Subscriber`s will be used, and thus `on_supervisor_timeout` may be called"] + #[doc = r" earlier than the `supervisor_timeout` for a given `event::Subscriber` implementation."] + #[allow(unused_variables)] + fn on_supervisor_timeout( + &self, + conn_context: &mut Self::ConnectionContext, + meta: &ConnectionMeta, + context: &supervisor::Context, + ) -> supervisor::Outcome { + supervisor::Outcome::default() + } + #[doc = "Called when the `FrameSent` event is triggered"] + #[inline] + fn on_frame_sent( + &self, + context: &Self::ConnectionContext, + meta: &ConnectionMeta, + event: &FrameSent, + ) { + let _ = context; + let _ = meta; + let _ = event; + } + #[doc = r" Called for each event that relates to the endpoint and all connections"] + #[inline] + fn on_event(&self, meta: &M, event: &E) { + let _ = meta; + let _ = event; + } + #[doc = r" Called for each event that relates to a connection"] + #[inline] + fn on_connection_event( + &self, + context: &Self::ConnectionContext, + meta: &ConnectionMeta, + event: &E, + ) { + let _ = context; + let _ = meta; + let _ = event; + } + #[doc = r" Used for querying the `Subscriber::ConnectionContext` on a Subscriber"] + #[inline] + fn query( + context: &Self::ConnectionContext, + query: &mut dyn query::Query, + ) -> query::ControlFlow { + query.execute(context) + } + #[doc = r" Used for querying and mutating the `Subscriber::ConnectionContext` on a Subscriber"] + #[inline] + fn query_mut( + context: &mut Self::ConnectionContext, + query: &mut dyn query::QueryMut, + ) -> query::ControlFlow { + query.execute_mut(context) + } + } + #[doc = r" Subscriber is implemented for a 2-element tuple to make it easy to compose multiple"] + #[doc = r" subscribers."] + impl Subscriber for (A, B) + where + A: Subscriber, + B: Subscriber, + { + type ConnectionContext = (A::ConnectionContext, B::ConnectionContext); + #[inline] + fn create_connection_context( + &self, + meta: &ConnectionMeta, + info: &ConnectionInfo, + ) -> Self::ConnectionContext { + ( + self.0.create_connection_context(meta, info), + self.1.create_connection_context(meta, info), + ) + } + #[inline] + fn supervisor_timeout( + &self, + conn_context: &mut Self::ConnectionContext, + meta: &ConnectionMeta, + context: &supervisor::Context, + ) -> Option { + let timeout_a = self + .0 + .supervisor_timeout(&mut conn_context.0, meta, context); + let timeout_b = self + .1 + .supervisor_timeout(&mut conn_context.1, meta, context); + match (timeout_a, timeout_b) { + (None, None) => None, + (None, Some(timeout)) | (Some(timeout), None) => Some(timeout), + (Some(a), Some(b)) => Some(a.min(b)), + } + } + #[inline] + fn on_supervisor_timeout( + &self, + conn_context: &mut Self::ConnectionContext, + meta: &ConnectionMeta, + context: &supervisor::Context, + ) -> supervisor::Outcome { + let outcome_a = + self.0 + .on_supervisor_timeout(&mut conn_context.0, meta, context); + let outcome_b = + self.1 + .on_supervisor_timeout(&mut conn_context.1, meta, context); + match (outcome_a, outcome_b) { + (supervisor::Outcome::ImmediateClose { reason }, _) + | (_, supervisor::Outcome::ImmediateClose { reason }) => { + supervisor::Outcome::ImmediateClose { reason } + } + (supervisor::Outcome::Close { error_code }, _) + | (_, supervisor::Outcome::Close { error_code }) => { + supervisor::Outcome::Close { error_code } + } + _ => supervisor::Outcome::Continue, + } + } + #[inline] + fn on_frame_sent( + &self, + context: &Self::ConnectionContext, + meta: &ConnectionMeta, + event: &FrameSent, + ) { + (self.0).on_frame_sent(&context.0, meta, event); + (self.1).on_frame_sent(&context.1, meta, event); + } + #[inline] + fn on_event(&self, meta: &M, event: &E) { + self.0.on_event(meta, event); + self.1.on_event(meta, event); + } + #[inline] + fn on_connection_event( + &self, + context: &Self::ConnectionContext, + meta: &ConnectionMeta, + event: &E, + ) { + self.0.on_connection_event(&context.0, meta, event); + self.1.on_connection_event(&context.1, meta, event); + } + #[inline] + fn query( + context: &Self::ConnectionContext, + query: &mut dyn query::Query, + ) -> query::ControlFlow { + query + .execute(context) + .and_then(|| A::query(&context.0, query)) + .and_then(|| B::query(&context.1, query)) + } + #[inline] + fn query_mut( + context: &mut Self::ConnectionContext, + query: &mut dyn query::QueryMut, + ) -> query::ControlFlow { + query + .execute_mut(context) + .and_then(|| A::query_mut(&mut context.0, query)) + .and_then(|| B::query_mut(&mut context.1, query)) + } + } + pub trait EndpointPublisher { + #[doc = r" Returns the QUIC version, if any"] + fn quic_version(&self) -> Option; + } + pub struct EndpointPublisherSubscriber<'a, Sub: Subscriber> { + meta: EndpointMeta, + quic_version: Option, + subscriber: &'a mut Sub, + } + impl<'a, Sub: Subscriber> fmt::Debug for EndpointPublisherSubscriber<'a, Sub> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("ConnectionPublisherSubscriber") + .field("meta", &self.meta) + .field("quic_version", &self.quic_version) + .finish() + } + } + impl<'a, Sub: Subscriber> EndpointPublisherSubscriber<'a, Sub> { + #[inline] + pub fn new( + meta: builder::EndpointMeta, + quic_version: Option, + subscriber: &'a mut Sub, + ) -> Self { + Self { + meta: meta.into_event(), + quic_version, + subscriber, + } + } + } + impl<'a, Sub: Subscriber> EndpointPublisher for EndpointPublisherSubscriber<'a, Sub> { + #[inline] + fn quic_version(&self) -> Option { + self.quic_version + } + } + pub trait ConnectionPublisher { + #[doc = "Publishes a `FrameSent` event to the publisher's subscriber"] + fn on_frame_sent(&self, event: builder::FrameSent); + #[doc = r" Returns the QUIC version negotiated for the current connection, if any"] + fn quic_version(&self) -> u32; + #[doc = r" Returns the [`Subject`] for the current publisher"] + fn subject(&self) -> Subject; + } + pub struct ConnectionPublisherSubscriber<'a, Sub: Subscriber> { + meta: ConnectionMeta, + quic_version: u32, + subscriber: &'a Sub, + context: &'a Sub::ConnectionContext, + } + impl<'a, Sub: Subscriber> fmt::Debug for ConnectionPublisherSubscriber<'a, Sub> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("ConnectionPublisherSubscriber") + .field("meta", &self.meta) + .field("quic_version", &self.quic_version) + .finish() + } + } + impl<'a, Sub: Subscriber> ConnectionPublisherSubscriber<'a, Sub> { + #[inline] + pub fn new( + meta: builder::ConnectionMeta, + quic_version: u32, + subscriber: &'a Sub, + context: &'a Sub::ConnectionContext, + ) -> Self { + Self { + meta: meta.into_event(), + quic_version, + subscriber, + context, + } + } + } + impl<'a, Sub: Subscriber> ConnectionPublisher for ConnectionPublisherSubscriber<'a, Sub> { + #[inline] + fn on_frame_sent(&self, event: builder::FrameSent) { + let event = event.into_event(); + self.subscriber + .on_frame_sent(self.context, &self.meta, &event); + self.subscriber + .on_connection_event(self.context, &self.meta, &event); + self.subscriber.on_event(&self.meta, &event); + } + #[inline] + fn quic_version(&self) -> u32 { + self.quic_version + } + #[inline] + fn subject(&self) -> api::Subject { + self.meta.subject() + } + } + } + #[cfg(any(test, feature = "testing"))] + pub mod testing { + use super::*; + #[derive(Clone, Debug)] + pub struct Subscriber { + location: Option, + output: Arc>>, + pub frame_sent: Arc, + } + impl Drop for Subscriber { + fn drop(&mut self) { + if std::thread::panicking() { + return; + } + if let Some(location) = self.location.as_ref() { + location.snapshot(&self.output.lock().unwrap()); + } + } + } + impl Subscriber { + #[doc = r" Creates a subscriber with snapshot assertions enabled"] + #[track_caller] + pub fn snapshot() -> Self { + let mut sub = Self::no_snapshot(); + sub.location = Location::try_new(); + sub + } + #[doc = r" Creates a subscriber with snapshot assertions disabled"] + pub fn no_snapshot() -> Self { + Self { + location: None, + output: Default::default(), + frame_sent: Arc::new(AtomicU32::new(0)), + } + } + } + impl super::Subscriber for Subscriber { + type ConnectionContext = (); + fn create_connection_context( + &self, + _meta: &api::ConnectionMeta, + _info: &api::ConnectionInfo, + ) -> Self::ConnectionContext { + } + fn on_frame_sent( + &self, + _context: &Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::FrameSent, + ) { + self.frame_sent.fetch_add(1, Ordering::SeqCst); + if self.location.is_some() { + self.output + .lock() + .unwrap() + .push(format!("{meta:?} {event:?}")); + } + } + } + #[derive(Clone, Debug)] + pub struct Publisher { + location: Option, + output: Arc>>, + pub frame_sent: Arc, + } + impl Publisher { + #[doc = r" Creates a publisher with snapshot assertions enabled"] + #[track_caller] + pub fn snapshot() -> Self { + let mut sub = Self::no_snapshot(); + sub.location = Location::try_new(); + sub + } + #[doc = r" Creates a publisher with snapshot assertions disabled"] + pub fn no_snapshot() -> Self { + Self { + location: None, + output: Default::default(), + frame_sent: Arc::new(AtomicU32::new(0)), + } + } + } + impl super::EndpointPublisher for Publisher { + fn quic_version(&self) -> Option { + Some(1) + } + } + impl super::ConnectionPublisher for Publisher { + fn on_frame_sent(&self, event: builder::FrameSent) { + self.frame_sent.fetch_add(1, Ordering::SeqCst); + let event = event.into_event(); + if self.location.is_some() { + self.output.lock().unwrap().push(format!("{event:?}")); + } + } + fn quic_version(&self) -> u32 { + 1 + } + fn subject(&self) -> api::Subject { + api::Subject::Connection { id: 0 } + } + } + impl Drop for Publisher { + fn drop(&mut self) { + if std::thread::panicking() { + return; + } + if let Some(location) = self.location.as_ref() { + location.snapshot(&self.output.lock().unwrap()); + } + } + } + #[derive(Clone, Debug)] + struct Location(&'static core::panic::Location<'static>); + impl Location { + #[track_caller] + fn try_new() -> Option { + let thread = std::thread::current(); + if thread.name().map_or(false, |name| name != "main") { + Some(Self(core::panic::Location::caller())) + } else { + None + } + } + fn snapshot(&self, output: &[String]) { + if cfg!(miri) { + return; + } + use std::path::{Component, Path}; + let value = output.join("\n"); + let thread = std::thread::current(); + let function_name = thread.name().unwrap(); + let test_path = Path::new(self.0.file().trim_end_matches(".rs")); + let module_path = test_path + .components() + .filter_map(|comp| match comp { + Component::Normal(comp) => comp.to_str(), + _ => Some("_"), + }) + .chain(Some("events")) + .collect::>() + .join("::"); + let current_dir = std::env::current_dir().unwrap(); + insta::_macro_support::assert_snapshot( + insta::_macro_support::AutoName.into(), + &value, + current_dir.to_str().unwrap(), + function_name, + &module_path, + self.0.file(), + self.0.line(), + "", + ) + .unwrap() + } + } + } + } + #[cfg(feature = "event-tracing")] + pub mod tracing { + #![doc = r" This module contains event integration with [`tracing`](https://docs.rs/tracing)"] + use super::api; + #[doc = r" Emits events with [`tracing`](https://docs.rs/tracing)"] + #[derive(Clone, Debug)] + pub struct Subscriber { + client: tracing::Span, + server: tracing::Span, + } + impl Default for Subscriber { + fn default() -> Self { + let root = tracing :: span ! (target : "s2n_quic" , tracing :: Level :: DEBUG , "s2n_quic"); + let client = tracing :: span ! (parent : root . id () , tracing :: Level :: DEBUG , "client"); + let server = tracing :: span ! (parent : root . id () , tracing :: Level :: DEBUG , "server"); + Self { client, server } + } + } + impl super::Subscriber for Subscriber { + type ConnectionContext = tracing::Span; + fn create_connection_context( + &self, + meta: &api::ConnectionMeta, + _info: &api::ConnectionInfo, + ) -> Self::ConnectionContext { + let parent = match meta.endpoint_type { + api::EndpointType::Client {} => self.client.id(), + api::EndpointType::Server {} => self.server.id(), + }; + tracing :: span ! (target : "s2n_quic" , parent : parent , tracing :: Level :: DEBUG , "conn" , id = meta . id) + } + #[inline] + fn on_frame_sent( + &self, + context: &Self::ConnectionContext, + _meta: &api::ConnectionMeta, + event: &api::FrameSent, + ) { + let id = context.id(); + let api::FrameSent { + packet_header, + path_id, + frame, + } = event; + tracing :: event ! (target : "frame_sent" , parent : id , tracing :: Level :: DEBUG , packet_header = tracing :: field :: debug (packet_header) , path_id = tracing :: field :: debug (path_id) , frame = tracing :: field :: debug (frame)); + } + } + } + pub mod builder { + use super::*; + #[derive(Clone, Debug)] + #[doc = " Frame was sent"] + pub struct FrameSent { + pub packet_header: PacketHeader, + pub path_id: u64, + pub frame: Frame, + } + impl IntoEvent for FrameSent { + #[inline] + fn into_event(self) -> api::FrameSent { + let FrameSent { + packet_header, + path_id, + frame, + } = self; + api::FrameSent { + packet_header: packet_header.into_event(), + path_id: path_id.into_event(), + frame: frame.into_event(), + } + } + } + } + pub mod supervisor { + #![doc = r" This module contains the `supervisor::Outcome` and `supervisor::Context` for use"] + #![doc = r" when implementing [`Subscriber::supervisor_timeout`](crate::event::Subscriber::supervisor_timeout) and"] + #![doc = r" [`Subscriber::on_supervisor_timeout`](crate::event::Subscriber::on_supervisor_timeout)"] + #![doc = r" on a Subscriber."] + use crate::{ + application, + event::{builder::SocketAddress, IntoEvent}, + }; + #[non_exhaustive] + #[derive(Clone, Debug, Eq, PartialEq)] + pub enum Outcome { + #[doc = r" Allow the connection to remain open"] + Continue, + #[doc = r" Close the connection and notify the peer"] + Close { error_code: application::Error }, + #[doc = r" Close the connection without notifying the peer"] + ImmediateClose { reason: &'static str }, + } + impl Default for Outcome { + fn default() -> Self { + Self::Continue + } + } + #[non_exhaustive] + #[derive(Debug)] + pub struct Context<'a> { + #[doc = r" Number of handshakes that have begun but not completed"] + pub inflight_handshakes: usize, + #[doc = r" Number of open connections"] + pub connection_count: usize, + #[doc = r" The address of the peer"] + pub remote_address: SocketAddress<'a>, + #[doc = r" True if the connection is in the handshake state, false otherwise"] + pub is_handshaking: bool, + } + impl<'a> Context<'a> { + pub fn new( + inflight_handshakes: usize, + connection_count: usize, + remote_address: &'a crate::inet::SocketAddress, + is_handshaking: bool, + ) -> Self { + Self { + inflight_handshakes, + connection_count, + remote_address: remote_address.into_event(), + is_handshaking, + } + } + } + } + pub use traits::*; + mod traits { + use super::*; + use crate::query; + use api::*; + use core::fmt; + #[doc = r" Provides metadata related to an event"] + pub trait Meta: fmt::Debug { + #[doc = r" Returns whether the local endpoint is a Client or Server"] + fn endpoint_type(&self) -> &EndpointType; + #[doc = r" A context from which the event is being emitted"] + #[doc = r""] + #[doc = r" An event can occur in the context of an Endpoint or Connection"] + fn subject(&self) -> Subject; + #[doc = r" The time the event occurred"] + fn timestamp(&self) -> &crate::event::Timestamp; + } + impl Meta for ConnectionMeta { + fn endpoint_type(&self) -> &EndpointType { + &self.endpoint_type + } + fn subject(&self) -> Subject { + Subject::Connection { id: self.id } + } + fn timestamp(&self) -> &crate::event::Timestamp { + &self.timestamp + } + } + impl Meta for EndpointMeta { + fn endpoint_type(&self) -> &EndpointType { + &self.endpoint_type + } + fn subject(&self) -> Subject { + Subject::Endpoint {} + } + fn timestamp(&self) -> &crate::event::Timestamp { + &self.timestamp + } + } + #[doc = r" Allows for events to be subscribed to"] + pub trait Subscriber: 'static + Send { + #[doc = r" An application provided type associated with each connection."] + #[doc = r""] + #[doc = r" The context provides a mechanism for applications to provide a custom type"] + #[doc = r" and update it on each event, e.g. computing statistics. Each event"] + #[doc = r" invocation (e.g. [`Subscriber::on_packet_sent`]) also provides mutable"] + #[doc = r" access to the context `&mut ConnectionContext` and allows for updating the"] + #[doc = r" context."] + #[doc = r""] + #[doc = r" ```no_run"] + #[doc = r" # mod s2n_quic { pub mod provider { pub mod event {"] + #[doc = r" # pub use s2n_quic_core::event::{api as events, api::ConnectionInfo, api::ConnectionMeta, Subscriber};"] + #[doc = r" # }}}"] + #[doc = r" use s2n_quic::provider::event::{"] + #[doc = r" ConnectionInfo, ConnectionMeta, Subscriber, events::PacketSent"] + #[doc = r" };"] + #[doc = r""] + #[doc = r" pub struct MyEventSubscriber;"] + #[doc = r""] + #[doc = r" pub struct MyEventContext {"] + #[doc = r" packet_sent: u64,"] + #[doc = r" }"] + #[doc = r""] + #[doc = r" impl Subscriber for MyEventSubscriber {"] + #[doc = r" type ConnectionContext = MyEventContext;"] + #[doc = r""] + #[doc = r" fn create_connection_context("] + #[doc = r" &mut self, _meta: &ConnectionMeta,"] + #[doc = r" _info: &ConnectionInfo,"] + #[doc = r" ) -> Self::ConnectionContext {"] + #[doc = r" MyEventContext { packet_sent: 0 }"] + #[doc = r" }"] + #[doc = r""] + #[doc = r" fn on_packet_sent("] + #[doc = r" &mut self,"] + #[doc = r" context: &mut Self::ConnectionContext,"] + #[doc = r" _meta: &ConnectionMeta,"] + #[doc = r" _event: &PacketSent,"] + #[doc = r" ) {"] + #[doc = r" context.packet_sent += 1;"] + #[doc = r" }"] + #[doc = r" }"] + #[doc = r" ```"] + type ConnectionContext: 'static + Send; + #[doc = r" Creates a context to be passed to each connection-related event"] + fn create_connection_context( + &self, + meta: &ConnectionMeta, + info: &ConnectionInfo, + ) -> Self::ConnectionContext; + #[doc = r" The period at which `on_supervisor_timeout` is called"] + #[doc = r""] + #[doc = r" If multiple `event::Subscriber`s are composed together, the minimum `supervisor_timeout`"] + #[doc = r" across all `event::Subscriber`s will be used."] + #[doc = r""] + #[doc = r" If the `supervisor_timeout()` is `None` across all `event::Subscriber`s, connection supervision"] + #[doc = r" will cease for the remaining lifetime of the connection and `on_supervisor_timeout` will no longer"] + #[doc = r" be called."] + #[doc = r""] + #[doc = r" It is recommended to avoid setting this value less than ~100ms, as short durations"] + #[doc = r" may lead to higher CPU utilization."] + #[allow(unused_variables)] + fn supervisor_timeout( + &self, + conn_context: &mut Self::ConnectionContext, + meta: &ConnectionMeta, + context: &supervisor::Context, + ) -> Option { + None + } + #[doc = r" Called for each `supervisor_timeout` to determine any action to take on the connection based on the `supervisor::Outcome`"] + #[doc = r""] + #[doc = r" If multiple `event::Subscriber`s are composed together, the minimum `supervisor_timeout`"] + #[doc = r" across all `event::Subscriber`s will be used, and thus `on_supervisor_timeout` may be called"] + #[doc = r" earlier than the `supervisor_timeout` for a given `event::Subscriber` implementation."] + #[allow(unused_variables)] + fn on_supervisor_timeout( + &self, + conn_context: &mut Self::ConnectionContext, + meta: &ConnectionMeta, + context: &supervisor::Context, + ) -> supervisor::Outcome { + supervisor::Outcome::default() + } + #[doc = "Called when the `FrameSent` event is triggered"] + #[inline] + fn on_frame_sent( + &self, + context: &Self::ConnectionContext, + meta: &ConnectionMeta, + event: &FrameSent, + ) { + let _ = context; + let _ = meta; + let _ = event; + } + #[doc = r" Called for each event that relates to the endpoint and all connections"] + #[inline] + fn on_event(&self, meta: &M, event: &E) { + let _ = meta; + let _ = event; + } + #[doc = r" Called for each event that relates to a connection"] + #[inline] + fn on_connection_event( + &self, + context: &Self::ConnectionContext, + meta: &ConnectionMeta, + event: &E, + ) { + let _ = context; + let _ = meta; + let _ = event; + } + #[doc = r" Used for querying the `Subscriber::ConnectionContext` on a Subscriber"] + #[inline] + fn query( + context: &Self::ConnectionContext, + query: &mut dyn query::Query, + ) -> query::ControlFlow { + query.execute(context) + } + #[doc = r" Used for querying and mutating the `Subscriber::ConnectionContext` on a Subscriber"] + #[inline] + fn query_mut( + context: &mut Self::ConnectionContext, + query: &mut dyn query::QueryMut, + ) -> query::ControlFlow { + query.execute_mut(context) + } + } + #[doc = r" Subscriber is implemented for a 2-element tuple to make it easy to compose multiple"] + #[doc = r" subscribers."] + impl Subscriber for (A, B) + where + A: Subscriber, + B: Subscriber, + { + type ConnectionContext = (A::ConnectionContext, B::ConnectionContext); + #[inline] + fn create_connection_context( + &self, + meta: &ConnectionMeta, + info: &ConnectionInfo, + ) -> Self::ConnectionContext { + ( + self.0.create_connection_context(meta, info), + self.1.create_connection_context(meta, info), + ) + } + #[inline] + fn supervisor_timeout( + &self, + conn_context: &mut Self::ConnectionContext, + meta: &ConnectionMeta, + context: &supervisor::Context, + ) -> Option { + let timeout_a = self + .0 + .supervisor_timeout(&mut conn_context.0, meta, context); + let timeout_b = self + .1 + .supervisor_timeout(&mut conn_context.1, meta, context); + match (timeout_a, timeout_b) { + (None, None) => None, + (None, Some(timeout)) | (Some(timeout), None) => Some(timeout), + (Some(a), Some(b)) => Some(a.min(b)), + } + } + #[inline] + fn on_supervisor_timeout( + &self, + conn_context: &mut Self::ConnectionContext, + meta: &ConnectionMeta, + context: &supervisor::Context, + ) -> supervisor::Outcome { + let outcome_a = self + .0 + .on_supervisor_timeout(&mut conn_context.0, meta, context); + let outcome_b = self + .1 + .on_supervisor_timeout(&mut conn_context.1, meta, context); + match (outcome_a, outcome_b) { + (supervisor::Outcome::ImmediateClose { reason }, _) + | (_, supervisor::Outcome::ImmediateClose { reason }) => { + supervisor::Outcome::ImmediateClose { reason } + } + (supervisor::Outcome::Close { error_code }, _) + | (_, supervisor::Outcome::Close { error_code }) => { + supervisor::Outcome::Close { error_code } + } + _ => supervisor::Outcome::Continue, + } + } + #[inline] + fn on_frame_sent( + &self, + context: &Self::ConnectionContext, + meta: &ConnectionMeta, + event: &FrameSent, + ) { + (self.0).on_frame_sent(&context.0, meta, event); + (self.1).on_frame_sent(&context.1, meta, event); + } + #[inline] + fn on_event(&self, meta: &M, event: &E) { + self.0.on_event(meta, event); + self.1.on_event(meta, event); + } + #[inline] + fn on_connection_event( + &self, + context: &Self::ConnectionContext, + meta: &ConnectionMeta, + event: &E, + ) { + self.0.on_connection_event(&context.0, meta, event); + self.1.on_connection_event(&context.1, meta, event); + } + #[inline] + fn query( + context: &Self::ConnectionContext, + query: &mut dyn query::Query, + ) -> query::ControlFlow { + query + .execute(context) + .and_then(|| A::query(&context.0, query)) + .and_then(|| B::query(&context.1, query)) + } + #[inline] + fn query_mut( + context: &mut Self::ConnectionContext, + query: &mut dyn query::QueryMut, + ) -> query::ControlFlow { + query + .execute_mut(context) + .and_then(|| A::query_mut(&mut context.0, query)) + .and_then(|| B::query_mut(&mut context.1, query)) + } + } + pub trait EndpointPublisher { + #[doc = r" Returns the QUIC version, if any"] + fn quic_version(&self) -> Option; + } + pub struct EndpointPublisherSubscriber<'a, Sub: Subscriber> { + meta: EndpointMeta, + quic_version: Option, + subscriber: &'a mut Sub, + } + impl<'a, Sub: Subscriber> fmt::Debug for EndpointPublisherSubscriber<'a, Sub> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("ConnectionPublisherSubscriber") + .field("meta", &self.meta) + .field("quic_version", &self.quic_version) + .finish() + } + } + impl<'a, Sub: Subscriber> EndpointPublisherSubscriber<'a, Sub> { + #[inline] + pub fn new( + meta: builder::EndpointMeta, + quic_version: Option, + subscriber: &'a mut Sub, + ) -> Self { + Self { + meta: meta.into_event(), + quic_version, + subscriber, + } + } + } + impl<'a, Sub: Subscriber> EndpointPublisher for EndpointPublisherSubscriber<'a, Sub> { + #[inline] + fn quic_version(&self) -> Option { + self.quic_version + } + } + pub trait ConnectionPublisher { + #[doc = "Publishes a `FrameSent` event to the publisher's subscriber"] + fn on_frame_sent(&self, event: builder::FrameSent); + #[doc = r" Returns the QUIC version negotiated for the current connection, if any"] + fn quic_version(&self) -> u32; + #[doc = r" Returns the [`Subject`] for the current publisher"] + fn subject(&self) -> Subject; + } + pub struct ConnectionPublisherSubscriber<'a, Sub: Subscriber> { + meta: ConnectionMeta, + quic_version: u32, + subscriber: &'a Sub, + context: &'a Sub::ConnectionContext, + } + impl<'a, Sub: Subscriber> fmt::Debug for ConnectionPublisherSubscriber<'a, Sub> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("ConnectionPublisherSubscriber") + .field("meta", &self.meta) + .field("quic_version", &self.quic_version) + .finish() + } + } + impl<'a, Sub: Subscriber> ConnectionPublisherSubscriber<'a, Sub> { + #[inline] + pub fn new( + meta: builder::ConnectionMeta, + quic_version: u32, + subscriber: &'a Sub, + context: &'a Sub::ConnectionContext, + ) -> Self { + Self { + meta: meta.into_event(), + quic_version, + subscriber, + context, + } + } + } + impl<'a, Sub: Subscriber> ConnectionPublisher for ConnectionPublisherSubscriber<'a, Sub> { + #[inline] + fn on_frame_sent(&self, event: builder::FrameSent) { + let event = event.into_event(); + self.subscriber + .on_frame_sent(self.context, &self.meta, &event); + self.subscriber + .on_connection_event(self.context, &self.meta, &event); + self.subscriber.on_event(&self.meta, &event); + } + #[inline] + fn quic_version(&self) -> u32 { + self.quic_version + } + #[inline] + fn subject(&self) -> api::Subject { + self.meta.subject() + } + } + } + #[cfg(any(test, feature = "testing"))] + pub mod testing { + use super::*; + #[derive(Clone, Debug)] + pub struct Subscriber { + location: Option, + output: Arc>>, + pub frame_sent: Arc, + } + impl Drop for Subscriber { + fn drop(&mut self) { + if std::thread::panicking() { + return; + } + if let Some(location) = self.location.as_ref() { + location.snapshot(&self.output.lock().unwrap()); + } + } + } + impl Subscriber { + #[doc = r" Creates a subscriber with snapshot assertions enabled"] + #[track_caller] + pub fn snapshot() -> Self { + let mut sub = Self::no_snapshot(); + sub.location = Location::try_new(); + sub + } + #[doc = r" Creates a subscriber with snapshot assertions disabled"] + pub fn no_snapshot() -> Self { + Self { + location: None, + output: Default::default(), + frame_sent: Arc::new(AtomicU32::new(0)), + } + } + } + impl super::Subscriber for Subscriber { + type ConnectionContext = (); + fn create_connection_context( + &self, + _meta: &api::ConnectionMeta, + _info: &api::ConnectionInfo, + ) -> Self::ConnectionContext { + } + fn on_frame_sent( + &self, + _context: &Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::FrameSent, + ) { + self.frame_sent.fetch_add(1, Ordering::SeqCst); + if self.location.is_some() { + self.output + .lock() + .unwrap() + .push(format!("{meta:?} {event:?}")); + } + } + } + #[derive(Clone, Debug)] + pub struct Publisher { + location: Option, + output: Arc>>, + pub frame_sent: Arc, + } + impl Publisher { + #[doc = r" Creates a publisher with snapshot assertions enabled"] + #[track_caller] + pub fn snapshot() -> Self { + let mut sub = Self::no_snapshot(); + sub.location = Location::try_new(); + sub + } + #[doc = r" Creates a publisher with snapshot assertions disabled"] + pub fn no_snapshot() -> Self { + Self { + location: None, + output: Default::default(), + frame_sent: Arc::new(AtomicU32::new(0)), + } + } + } + impl super::EndpointPublisher for Publisher { + fn quic_version(&self) -> Option { + Some(1) + } + } + impl super::ConnectionPublisher for Publisher { + fn on_frame_sent(&self, event: builder::FrameSent) { + self.frame_sent.fetch_add(1, Ordering::SeqCst); + let event = event.into_event(); + if self.location.is_some() { + self.output.lock().unwrap().push(format!("{event:?}")); + } + } + fn quic_version(&self) -> u32 { + 1 + } + fn subject(&self) -> api::Subject { + api::Subject::Connection { id: 0 } + } + } + impl Drop for Publisher { + fn drop(&mut self) { + if std::thread::panicking() { + return; + } + if let Some(location) = self.location.as_ref() { + location.snapshot(&self.output.lock().unwrap()); + } + } + } + #[derive(Clone, Debug)] + struct Location(&'static core::panic::Location<'static>); + impl Location { + #[track_caller] + fn try_new() -> Option { + let thread = std::thread::current(); + if thread.name().map_or(false, |name| name != "main") { + Some(Self(core::panic::Location::caller())) + } else { + None + } + } + fn snapshot(&self, output: &[String]) { + if cfg!(miri) { + return; + } + use std::path::{Component, Path}; + let value = output.join("\n"); + let thread = std::thread::current(); + let function_name = thread.name().unwrap(); + let test_path = Path::new(self.0.file().trim_end_matches(".rs")); + let module_path = test_path + .components() + .filter_map(|comp| match comp { + Component::Normal(comp) => comp.to_str(), + _ => Some("_"), + }) + .chain(Some("events")) + .collect::>() + .join("::"); + let current_dir = std::env::current_dir().unwrap(); + insta::_macro_support::assert_snapshot( + insta::_macro_support::AutoName.into(), + &value, + current_dir.to_str().unwrap(), + function_name, + &module_path, + self.0.file(), + self.0.line(), + "", + ) + .unwrap() + } + } + } +} +#[cfg(feature = "event-tracing")] +pub mod tracing { + #![doc = r" This module contains event integration with [`tracing`](https://docs.rs/tracing)"] + use super::api; + #[doc = r" Emits events with [`tracing`](https://docs.rs/tracing)"] + #[derive(Clone, Debug)] + pub struct Subscriber { + client: tracing::Span, + server: tracing::Span, + } + impl Default for Subscriber { + fn default() -> Self { + let root = + tracing :: span ! (target : "s2n_quic" , tracing :: Level :: DEBUG , "s2n_quic"); + let client = + tracing :: span ! (parent : root . id () , tracing :: Level :: DEBUG , "client"); + let server = + tracing :: span ! (parent : root . id () , tracing :: Level :: DEBUG , "server"); + Self { client, server } + } + } + impl super::Subscriber for Subscriber { + type ConnectionContext = tracing::Span; + fn create_connection_context( + &self, + meta: &api::ConnectionMeta, + _info: &api::ConnectionInfo, + ) -> Self::ConnectionContext { + let parent = match meta.endpoint_type { + api::EndpointType::Client {} => self.client.id(), + api::EndpointType::Server {} => self.server.id(), + }; + tracing :: span ! (target : "s2n_quic" , parent : parent , tracing :: Level :: DEBUG , "conn" , id = meta . id) + } + #[inline] + fn on_frame_sent( + &self, + context: &Self::ConnectionContext, + _meta: &api::ConnectionMeta, + event: &api::FrameSent, + ) { + let id = context.id(); + let api::FrameSent { + packet_header, + path_id, + frame, + } = event; + tracing :: event ! (target : "frame_sent" , parent : id , tracing :: Level :: DEBUG , packet_header = tracing :: field :: debug (packet_header) , path_id = tracing :: field :: debug (path_id) , frame = tracing :: field :: debug (frame)); + } + } +} +pub mod builder { + use super::*; + #[derive(Clone, Debug)] + #[doc = " Frame was sent"] + pub struct FrameSent { + pub packet_header: PacketHeader, + pub path_id: u64, + pub frame: Frame, + } + impl IntoEvent for FrameSent { + #[inline] + fn into_event(self) -> api::FrameSent { + let FrameSent { + packet_header, + path_id, + frame, + } = self; + api::FrameSent { + packet_header: packet_header.into_event(), + path_id: path_id.into_event(), + frame: frame.into_event(), + } + } + } +} +pub mod supervisor { + #![doc = r" This module contains the `supervisor::Outcome` and `supervisor::Context` for use"] + #![doc = r" when implementing [`Subscriber::supervisor_timeout`](crate::event::Subscriber::supervisor_timeout) and"] + #![doc = r" [`Subscriber::on_supervisor_timeout`](crate::event::Subscriber::on_supervisor_timeout)"] + #![doc = r" on a Subscriber."] + use crate::{ + application, + event::{builder::SocketAddress, IntoEvent}, + }; + #[non_exhaustive] + #[derive(Clone, Debug, Eq, PartialEq)] + pub enum Outcome { + #[doc = r" Allow the connection to remain open"] + Continue, + #[doc = r" Close the connection and notify the peer"] + Close { error_code: application::Error }, + #[doc = r" Close the connection without notifying the peer"] + ImmediateClose { reason: &'static str }, + } + impl Default for Outcome { + fn default() -> Self { + Self::Continue + } + } + #[non_exhaustive] + #[derive(Debug)] + pub struct Context<'a> { + #[doc = r" Number of handshakes that have begun but not completed"] + pub inflight_handshakes: usize, + #[doc = r" Number of open connections"] + pub connection_count: usize, + #[doc = r" The address of the peer"] + pub remote_address: SocketAddress<'a>, + #[doc = r" True if the connection is in the handshake state, false otherwise"] + pub is_handshaking: bool, + } + impl<'a> Context<'a> { + pub fn new( + inflight_handshakes: usize, + connection_count: usize, + remote_address: &'a crate::inet::SocketAddress, + is_handshaking: bool, + ) -> Self { + Self { + inflight_handshakes, + connection_count, + remote_address: remote_address.into_event(), + is_handshaking, + } + } + } +} +pub use traits::*; +mod traits { + use super::*; + use crate::query; + use api::*; + use core::fmt; + #[doc = r" Provides metadata related to an event"] + pub trait Meta: fmt::Debug { + #[doc = r" Returns whether the local endpoint is a Client or Server"] + fn endpoint_type(&self) -> &EndpointType; + #[doc = r" A context from which the event is being emitted"] + #[doc = r""] + #[doc = r" An event can occur in the context of an Endpoint or Connection"] + fn subject(&self) -> Subject; + #[doc = r" The time the event occurred"] + fn timestamp(&self) -> &crate::event::Timestamp; + } + impl Meta for ConnectionMeta { + fn endpoint_type(&self) -> &EndpointType { + &self.endpoint_type + } + fn subject(&self) -> Subject { + Subject::Connection { id: self.id } + } + fn timestamp(&self) -> &crate::event::Timestamp { + &self.timestamp + } + } + impl Meta for EndpointMeta { + fn endpoint_type(&self) -> &EndpointType { + &self.endpoint_type + } + fn subject(&self) -> Subject { + Subject::Endpoint {} + } + fn timestamp(&self) -> &crate::event::Timestamp { + &self.timestamp + } + } + #[doc = r" Allows for events to be subscribed to"] + pub trait Subscriber: 'static + Send { + #[doc = r" An application provided type associated with each connection."] + #[doc = r""] + #[doc = r" The context provides a mechanism for applications to provide a custom type"] + #[doc = r" and update it on each event, e.g. computing statistics. Each event"] + #[doc = r" invocation (e.g. [`Subscriber::on_packet_sent`]) also provides mutable"] + #[doc = r" access to the context `&mut ConnectionContext` and allows for updating the"] + #[doc = r" context."] + #[doc = r""] + #[doc = r" ```no_run"] + #[doc = r" # mod s2n_quic { pub mod provider { pub mod event {"] + #[doc = r" # pub use s2n_quic_core::event::{api as events, api::ConnectionInfo, api::ConnectionMeta, Subscriber};"] + #[doc = r" # }}}"] + #[doc = r" use s2n_quic::provider::event::{"] + #[doc = r" ConnectionInfo, ConnectionMeta, Subscriber, events::PacketSent"] + #[doc = r" };"] + #[doc = r""] + #[doc = r" pub struct MyEventSubscriber;"] + #[doc = r""] + #[doc = r" pub struct MyEventContext {"] + #[doc = r" packet_sent: u64,"] + #[doc = r" }"] + #[doc = r""] + #[doc = r" impl Subscriber for MyEventSubscriber {"] + #[doc = r" type ConnectionContext = MyEventContext;"] + #[doc = r""] + #[doc = r" fn create_connection_context("] + #[doc = r" &mut self, _meta: &ConnectionMeta,"] + #[doc = r" _info: &ConnectionInfo,"] + #[doc = r" ) -> Self::ConnectionContext {"] + #[doc = r" MyEventContext { packet_sent: 0 }"] + #[doc = r" }"] + #[doc = r""] + #[doc = r" fn on_packet_sent("] + #[doc = r" &mut self,"] + #[doc = r" context: &mut Self::ConnectionContext,"] + #[doc = r" _meta: &ConnectionMeta,"] + #[doc = r" _event: &PacketSent,"] + #[doc = r" ) {"] + #[doc = r" context.packet_sent += 1;"] + #[doc = r" }"] + #[doc = r" }"] + #[doc = r" ```"] + type ConnectionContext: 'static + Send; + #[doc = r" Creates a context to be passed to each connection-related event"] + fn create_connection_context( + &self, + meta: &ConnectionMeta, + info: &ConnectionInfo, + ) -> Self::ConnectionContext; + #[doc = r" The period at which `on_supervisor_timeout` is called"] + #[doc = r""] + #[doc = r" If multiple `event::Subscriber`s are composed together, the minimum `supervisor_timeout`"] + #[doc = r" across all `event::Subscriber`s will be used."] + #[doc = r""] + #[doc = r" If the `supervisor_timeout()` is `None` across all `event::Subscriber`s, connection supervision"] + #[doc = r" will cease for the remaining lifetime of the connection and `on_supervisor_timeout` will no longer"] + #[doc = r" be called."] + #[doc = r""] + #[doc = r" It is recommended to avoid setting this value less than ~100ms, as short durations"] + #[doc = r" may lead to higher CPU utilization."] + #[allow(unused_variables)] + fn supervisor_timeout( + &self, + conn_context: &mut Self::ConnectionContext, + meta: &ConnectionMeta, + context: &supervisor::Context, + ) -> Option { + None + } + #[doc = r" Called for each `supervisor_timeout` to determine any action to take on the connection based on the `supervisor::Outcome`"] + #[doc = r""] + #[doc = r" If multiple `event::Subscriber`s are composed together, the minimum `supervisor_timeout`"] + #[doc = r" across all `event::Subscriber`s will be used, and thus `on_supervisor_timeout` may be called"] + #[doc = r" earlier than the `supervisor_timeout` for a given `event::Subscriber` implementation."] + #[allow(unused_variables)] + fn on_supervisor_timeout( + &self, + conn_context: &mut Self::ConnectionContext, + meta: &ConnectionMeta, + context: &supervisor::Context, + ) -> supervisor::Outcome { + supervisor::Outcome::default() + } + #[doc = "Called when the `FrameSent` event is triggered"] + #[inline] + fn on_frame_sent( + &self, + context: &Self::ConnectionContext, + meta: &ConnectionMeta, + event: &FrameSent, + ) { + let _ = context; + let _ = meta; + let _ = event; + } + #[doc = r" Called for each event that relates to the endpoint and all connections"] + #[inline] + fn on_event(&self, meta: &M, event: &E) { + let _ = meta; + let _ = event; + } + #[doc = r" Called for each event that relates to a connection"] + #[inline] + fn on_connection_event( + &self, + context: &Self::ConnectionContext, + meta: &ConnectionMeta, + event: &E, + ) { + let _ = context; + let _ = meta; + let _ = event; + } + #[doc = r" Used for querying the `Subscriber::ConnectionContext` on a Subscriber"] + #[inline] + fn query( + context: &Self::ConnectionContext, + query: &mut dyn query::Query, + ) -> query::ControlFlow { + query.execute(context) + } + #[doc = r" Used for querying and mutating the `Subscriber::ConnectionContext` on a Subscriber"] + #[inline] + fn query_mut( + context: &mut Self::ConnectionContext, + query: &mut dyn query::QueryMut, + ) -> query::ControlFlow { + query.execute_mut(context) + } + } + #[doc = r" Subscriber is implemented for a 2-element tuple to make it easy to compose multiple"] + #[doc = r" subscribers."] + impl Subscriber for (A, B) + where + A: Subscriber, + B: Subscriber, + { + type ConnectionContext = (A::ConnectionContext, B::ConnectionContext); + #[inline] + fn create_connection_context( + &self, + meta: &ConnectionMeta, + info: &ConnectionInfo, + ) -> Self::ConnectionContext { + ( + self.0.create_connection_context(meta, info), + self.1.create_connection_context(meta, info), + ) + } + #[inline] + fn supervisor_timeout( + &self, + conn_context: &mut Self::ConnectionContext, + meta: &ConnectionMeta, + context: &supervisor::Context, + ) -> Option { + let timeout_a = self + .0 + .supervisor_timeout(&mut conn_context.0, meta, context); + let timeout_b = self + .1 + .supervisor_timeout(&mut conn_context.1, meta, context); + match (timeout_a, timeout_b) { + (None, None) => None, + (None, Some(timeout)) | (Some(timeout), None) => Some(timeout), + (Some(a), Some(b)) => Some(a.min(b)), + } + } + #[inline] + fn on_supervisor_timeout( + &self, + conn_context: &mut Self::ConnectionContext, + meta: &ConnectionMeta, + context: &supervisor::Context, + ) -> supervisor::Outcome { + let outcome_a = self + .0 + .on_supervisor_timeout(&mut conn_context.0, meta, context); + let outcome_b = self + .1 + .on_supervisor_timeout(&mut conn_context.1, meta, context); + match (outcome_a, outcome_b) { + (supervisor::Outcome::ImmediateClose { reason }, _) + | (_, supervisor::Outcome::ImmediateClose { reason }) => { + supervisor::Outcome::ImmediateClose { reason } + } + (supervisor::Outcome::Close { error_code }, _) + | (_, supervisor::Outcome::Close { error_code }) => { + supervisor::Outcome::Close { error_code } + } + _ => supervisor::Outcome::Continue, + } + } + #[inline] + fn on_frame_sent( + &self, + context: &Self::ConnectionContext, + meta: &ConnectionMeta, + event: &FrameSent, + ) { + (self.0).on_frame_sent(&context.0, meta, event); + (self.1).on_frame_sent(&context.1, meta, event); + } + #[inline] + fn on_event(&self, meta: &M, event: &E) { + self.0.on_event(meta, event); + self.1.on_event(meta, event); + } + #[inline] + fn on_connection_event( + &self, + context: &Self::ConnectionContext, + meta: &ConnectionMeta, + event: &E, + ) { + self.0.on_connection_event(&context.0, meta, event); + self.1.on_connection_event(&context.1, meta, event); + } + #[inline] + fn query( + context: &Self::ConnectionContext, + query: &mut dyn query::Query, + ) -> query::ControlFlow { + query + .execute(context) + .and_then(|| A::query(&context.0, query)) + .and_then(|| B::query(&context.1, query)) + } + #[inline] + fn query_mut( + context: &mut Self::ConnectionContext, + query: &mut dyn query::QueryMut, + ) -> query::ControlFlow { + query + .execute_mut(context) + .and_then(|| A::query_mut(&mut context.0, query)) + .and_then(|| B::query_mut(&mut context.1, query)) + } + } + pub trait EndpointPublisher { + #[doc = r" Returns the QUIC version, if any"] + fn quic_version(&self) -> Option; + } + pub struct EndpointPublisherSubscriber<'a, Sub: Subscriber> { + meta: EndpointMeta, + quic_version: Option, + subscriber: &'a mut Sub, + } + impl<'a, Sub: Subscriber> fmt::Debug for EndpointPublisherSubscriber<'a, Sub> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("ConnectionPublisherSubscriber") + .field("meta", &self.meta) + .field("quic_version", &self.quic_version) + .finish() + } + } + impl<'a, Sub: Subscriber> EndpointPublisherSubscriber<'a, Sub> { + #[inline] + pub fn new( + meta: builder::EndpointMeta, + quic_version: Option, + subscriber: &'a mut Sub, + ) -> Self { + Self { + meta: meta.into_event(), + quic_version, + subscriber, + } + } + } + impl<'a, Sub: Subscriber> EndpointPublisher for EndpointPublisherSubscriber<'a, Sub> { + #[inline] + fn quic_version(&self) -> Option { + self.quic_version + } + } + pub trait ConnectionPublisher { + #[doc = "Publishes a `FrameSent` event to the publisher's subscriber"] + fn on_frame_sent(&self, event: builder::FrameSent); + #[doc = r" Returns the QUIC version negotiated for the current connection, if any"] + fn quic_version(&self) -> u32; + #[doc = r" Returns the [`Subject`] for the current publisher"] + fn subject(&self) -> Subject; + } + pub struct ConnectionPublisherSubscriber<'a, Sub: Subscriber> { + meta: ConnectionMeta, + quic_version: u32, + subscriber: &'a Sub, + context: &'a Sub::ConnectionContext, + } + impl<'a, Sub: Subscriber> fmt::Debug for ConnectionPublisherSubscriber<'a, Sub> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("ConnectionPublisherSubscriber") + .field("meta", &self.meta) + .field("quic_version", &self.quic_version) + .finish() + } + } + impl<'a, Sub: Subscriber> ConnectionPublisherSubscriber<'a, Sub> { + #[inline] + pub fn new( + meta: builder::ConnectionMeta, + quic_version: u32, + subscriber: &'a Sub, + context: &'a Sub::ConnectionContext, + ) -> Self { + Self { + meta: meta.into_event(), + quic_version, + subscriber, + context, + } + } + } + impl<'a, Sub: Subscriber> ConnectionPublisher for ConnectionPublisherSubscriber<'a, Sub> { + #[inline] + fn on_frame_sent(&self, event: builder::FrameSent) { + let event = event.into_event(); + self.subscriber + .on_frame_sent(self.context, &self.meta, &event); + self.subscriber + .on_connection_event(self.context, &self.meta, &event); + self.subscriber.on_event(&self.meta, &event); + } + #[inline] + fn quic_version(&self) -> u32 { + self.quic_version + } + #[inline] + fn subject(&self) -> api::Subject { + self.meta.subject() + } + } +} +#[cfg(any(test, feature = "testing"))] +pub mod testing { + use super::*; + use std::sync::{Arc, Mutex}; + #[derive(Clone, Debug)] + pub struct Subscriber { + location: Option, + output: Arc>>, + pub frame_sent: Arc, + } + impl Drop for Subscriber { + fn drop(&mut self) { + if std::thread::panicking() { + return; + } + if let Some(location) = self.location.as_ref() { + location.snapshot(&self.output.lock().unwrap()); + } + } + } + impl Subscriber { + #[doc = r" Creates a subscriber with snapshot assertions enabled"] + #[track_caller] + pub fn snapshot() -> Self { + let mut sub = Self::no_snapshot(); + sub.location = Location::try_new(); + sub + } + #[doc = r" Creates a subscriber with snapshot assertions disabled"] + pub fn no_snapshot() -> Self { + Self { + location: None, + output: Default::default(), + frame_sent: Arc::new(AtomicU32::new(0)), + } + } + } + impl super::Subscriber for Subscriber { + type ConnectionContext = (); + fn create_connection_context( + &self, + _meta: &api::ConnectionMeta, + _info: &api::ConnectionInfo, + ) -> Self::ConnectionContext { + } + fn on_frame_sent( + &self, + _context: &Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::FrameSent, + ) { + self.frame_sent.fetch_add(1, Ordering::SeqCst); + if self.location.is_some() { + self.output + .lock() + .unwrap() + .push(format!("{meta:?} {event:?}")); + } + } + } + #[derive(Clone, Debug)] + pub struct Publisher { + location: Option, + output: Arc>>, + pub frame_sent: Arc, + } + impl Publisher { + #[doc = r" Creates a publisher with snapshot assertions enabled"] + #[track_caller] + pub fn snapshot() -> Self { + let mut sub = Self::no_snapshot(); + sub.location = Location::try_new(); + sub + } + #[doc = r" Creates a publisher with snapshot assertions disabled"] + pub fn no_snapshot() -> Self { + Self { + location: None, + output: Default::default(), + frame_sent: Arc::new(AtomicU32::new(0)), + } + } + } + impl super::EndpointPublisher for Publisher { + fn quic_version(&self) -> Option { + Some(1) + } + } + impl super::ConnectionPublisher for Publisher { + fn on_frame_sent(&self, event: builder::FrameSent) { + self.frame_sent.fetch_add(1, Ordering::SeqCst); + let event = event.into_event(); + if self.location.is_some() { + self.output.lock().unwrap().push(format!("{event:?}")); + } + } + fn quic_version(&self) -> u32 { + 1 + } + fn subject(&self) -> api::Subject { + api::Subject::Connection { id: 0 } + } + } + impl Drop for Publisher { + fn drop(&mut self) { + if std::thread::panicking() { + return; + } + if let Some(location) = self.location.as_ref() { + location.snapshot(&self.output.lock().unwrap()); + } + } + } + #[derive(Clone, Debug)] + struct Location(&'static core::panic::Location<'static>); + impl Location { + #[track_caller] + fn try_new() -> Option { + let thread = std::thread::current(); + if thread.name().map_or(false, |name| name != "main") { + Some(Self(core::panic::Location::caller())) + } else { + None + } + } + fn snapshot(&self, output: &[String]) { + if cfg!(miri) { + return; + } + use std::path::{Component, Path}; + let value = output.join("\n"); + let thread = std::thread::current(); + let function_name = thread.name().unwrap(); + let test_path = Path::new(self.0.file().trim_end_matches(".rs")); + let module_path = test_path + .components() + .filter_map(|comp| match comp { + Component::Normal(comp) => comp.to_str(), + _ => Some("_"), + }) + .chain(Some("events")) + .collect::>() + .join("::"); + let current_dir = std::env::current_dir().unwrap(); + insta::_macro_support::assert_snapshot( + insta::_macro_support::AutoName.into(), + &value, + current_dir.to_str().unwrap(), + function_name, + &module_path, + self.0.file(), + self.0.line(), + "", + ) + .unwrap() + } + } +} diff --git a/quic/s2n-quic-events/src/main.rs b/quic/s2n-quic-events/src/main.rs index 21d4742e7a..9c04620b0e 100644 --- a/quic/s2n-quic-events/src/main.rs +++ b/quic/s2n-quic-events/src/main.rs @@ -9,6 +9,69 @@ type Result = core::result::Result; mod parser; +#[derive(Debug, Default)] +enum OutputMode { + Ref, + #[default] + Mut, +} + +impl OutputMode { + fn receiver(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(), + OutputMode::Mut => quote!(mut), + } + } + fn counter_type(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(Arc), + OutputMode::Mut => quote!(u32), + } + } + + fn counter_init(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(Arc::new(AtomicU32::new(0))), + OutputMode::Mut => quote!(0), + } + } + + fn counter_increment(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(.fetch_add(1, Ordering::SeqCst)), + OutputMode::Mut => quote!(+= 1), + } + } + + fn lock(&self) ->TokenStream { + match self { + OutputMode::Ref => quote!(.lock().unwrap()), + OutputMode::Mut => quote!(), + } + } + + fn imports(&self) ->TokenStream { + match self { + OutputMode::Ref => quote!(use std::sync::{Arc, Mutex};), + OutputMode::Mut => quote!(), + } + } + + fn testing_output_type(&self) ->TokenStream { + match self { + OutputMode::Ref => quote!(Arc>>), + OutputMode::Mut => quote!(Vec), + } + } +} + +impl ToTokens for OutputMode { + fn to_tokens(&self, tokens: &mut TokenStream) { + tokens.extend(self.receiver()); + } +} + #[derive(Debug, Default)] struct Output { pub subscriber: TokenStream, @@ -26,6 +89,7 @@ struct Output { pub endpoint_publisher_testing: TokenStream, pub connection_publisher_testing: TokenStream, pub extra: TokenStream, + pub mode: OutputMode, } impl ToTokens for Output { @@ -46,7 +110,12 @@ impl ToTokens for Output { endpoint_publisher_testing, connection_publisher_testing, extra, + mode, } = self; + + let imports = self.mode.imports(); + let testing_output_type = self.mode.testing_output_type(); + let lock = self.mode.lock(); tokens.extend(quote!( use super::*; @@ -90,7 +159,7 @@ impl ToTokens for Output { impl super::Subscriber for Subscriber { type ConnectionContext = tracing::Span; - fn create_connection_context(&mut self, meta: &api::ConnectionMeta, _info: &api::ConnectionInfo) -> Self::ConnectionContext { + fn create_connection_context(&#mode self, meta: &api::ConnectionMeta, _info: &api::ConnectionInfo) -> Self::ConnectionContext { let parent = match meta.endpoint_type { api::EndpointType::Client {} => { self.client.id() @@ -272,7 +341,7 @@ impl ToTokens for Output { type ConnectionContext: 'static + Send; /// Creates a context to be passed to each connection-related event - fn create_connection_context(&mut self, meta: &ConnectionMeta, info: &ConnectionInfo) -> Self::ConnectionContext; + fn create_connection_context(&#mode self, meta: &ConnectionMeta, info: &ConnectionInfo) -> Self::ConnectionContext; /// The period at which `on_supervisor_timeout` is called /// @@ -286,7 +355,7 @@ impl ToTokens for Output { /// It is recommended to avoid setting this value less than ~100ms, as short durations /// may lead to higher CPU utilization. #[allow(unused_variables)] - fn supervisor_timeout(&mut self, conn_context: &mut Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> Option { + fn supervisor_timeout(&#mode self, conn_context: &mut Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> Option { None } @@ -296,7 +365,7 @@ impl ToTokens for Output { /// across all `event::Subscriber`s will be used, and thus `on_supervisor_timeout` may be called /// earlier than the `supervisor_timeout` for a given `event::Subscriber` implementation. #[allow(unused_variables)] - fn on_supervisor_timeout(&mut self, conn_context: &mut Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> supervisor::Outcome { + fn on_supervisor_timeout(&#mode self, conn_context: &mut Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> supervisor::Outcome { supervisor::Outcome::default() } @@ -304,14 +373,14 @@ impl ToTokens for Output { /// Called for each event that relates to the endpoint and all connections #[inline] - fn on_event(&mut self, meta: &M, event: &E) { + fn on_event(&#mode self, meta: &M, event: &E) { let _ = meta; let _ = event; } /// Called for each event that relates to a connection #[inline] - fn on_connection_event(&mut self, context: &mut Self::ConnectionContext, meta: &ConnectionMeta, event: &E) { + fn on_connection_event(&#mode self, context: &#mode Self::ConnectionContext, meta: &ConnectionMeta, event: &E) { let _ = context; let _ = meta; let _ = event; @@ -340,12 +409,12 @@ impl ToTokens for Output { type ConnectionContext = (A::ConnectionContext, B::ConnectionContext); #[inline] - fn create_connection_context(&mut self, meta: &ConnectionMeta, info: &ConnectionInfo) -> Self::ConnectionContext { + fn create_connection_context(&#mode self, meta: &ConnectionMeta, info: &ConnectionInfo) -> Self::ConnectionContext { (self.0.create_connection_context(meta, info), self.1.create_connection_context(meta, info)) } #[inline] - fn supervisor_timeout(&mut self, conn_context: &mut Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> Option { + fn supervisor_timeout(&#mode self, conn_context: &mut Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> Option { let timeout_a = self.0.supervisor_timeout(&mut conn_context.0, meta, context); let timeout_b = self.1.supervisor_timeout(&mut conn_context.1, meta, context); match (timeout_a, timeout_b) { @@ -356,7 +425,7 @@ impl ToTokens for Output { } #[inline] - fn on_supervisor_timeout(&mut self, conn_context: &mut Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> supervisor::Outcome { + fn on_supervisor_timeout(&#mode self, conn_context: &mut Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> supervisor::Outcome { let outcome_a = self.0.on_supervisor_timeout(&mut conn_context.0, meta, context); let outcome_b = self.1.on_supervisor_timeout(&mut conn_context.1, meta, context); match (outcome_a, outcome_b) { @@ -369,15 +438,15 @@ impl ToTokens for Output { #tuple_subscriber #[inline] - fn on_event(&mut self, meta: &M, event: &E) { + fn on_event(&#mode self, meta: &M, event: &E) { self.0.on_event(meta, event); self.1.on_event(meta, event); } #[inline] - fn on_connection_event(&mut self, context: &mut Self::ConnectionContext, meta: &ConnectionMeta, event: &E) { - self.0.on_connection_event(&mut context.0, meta, event); - self.1.on_connection_event(&mut context.1, meta, event); + fn on_connection_event(&#mode self, context: &#mode Self::ConnectionContext, meta: &ConnectionMeta, event: &E) { + self.0.on_connection_event(&#mode context.0, meta, event); + self.1.on_connection_event(&#mode context.1, meta, event); } #[inline] @@ -454,8 +523,8 @@ impl ToTokens for Output { pub struct ConnectionPublisherSubscriber<'a, Sub: Subscriber> { meta: ConnectionMeta, quic_version: u32, - subscriber: &'a mut Sub, - context: &'a mut Sub::ConnectionContext, + subscriber: &'a #mode Sub, + context: &'a #mode Sub::ConnectionContext, } impl<'a, Sub: Subscriber> fmt::Debug for ConnectionPublisherSubscriber<'a, Sub> { @@ -472,8 +541,8 @@ impl ToTokens for Output { pub fn new( meta: builder::ConnectionMeta, quic_version: u32, - subscriber: &'a mut Sub, - context: &'a mut Sub::ConnectionContext + subscriber: &'a #mode Sub, + context: &'a #mode Sub::ConnectionContext ) -> Self { Self { meta: meta.into_event(), @@ -502,11 +571,11 @@ impl ToTokens for Output { #[cfg(any(test, feature = "testing"))] pub mod testing { use super::*; - + #imports #[derive(Clone, Debug)] pub struct Subscriber { location: Option, - output: Vec, + output: #testing_output_type, #testing_fields } @@ -518,7 +587,7 @@ impl ToTokens for Output { } if let Some(location) = self.location.as_ref() { - location.snapshot(&self.output); + location.snapshot(&self.output #lock); } } } @@ -545,7 +614,7 @@ impl ToTokens for Output { impl super::Subscriber for Subscriber { type ConnectionContext = (); - fn create_connection_context(&mut self, _meta: &api::ConnectionMeta, _info: &api::ConnectionInfo) -> Self::ConnectionContext {} + fn create_connection_context(&#mode self, _meta: &api::ConnectionMeta, _info: &api::ConnectionInfo) -> Self::ConnectionContext {} #subscriber_testing } @@ -553,7 +622,7 @@ impl ToTokens for Output { #[derive(Clone, Debug)] pub struct Publisher { location: Option, - output: Vec, + output: #testing_output_type, #testing_fields } @@ -604,7 +673,7 @@ impl ToTokens for Output { } if let Some(location) = self.location.as_ref() { - location.snapshot(&self.output); + location.snapshot(&self.output #lock); } } } @@ -669,50 +738,69 @@ impl ToTokens for Output { } } -fn main() -> Result<()> { - let mut files = vec![]; - - for path in glob::glob(concat!(env!("CARGO_MANIFEST_DIR"), "/events/**/*.rs"))? { - let path = path?; - let file = std::fs::read_to_string(path)?; - files.push(parser::parse(&file).unwrap()); - } - - let mut output = Output::default(); - - for file in &files { - file.to_tokens(&mut output); - } - - let generated = concat!( - env!("CARGO_MANIFEST_DIR"), - "/../s2n-quic-core/src/event/generated.rs" - ); - - let mut o = std::fs::File::create(generated)?; +struct EventInfo<'a> { + input_path: &'a str, + output_path: &'a str, + output_mode: OutputMode, +} - macro_rules! put { - ($($arg:tt)*) => {{ - use std::io::Write; - writeln!(o, $($arg)*)?; - }} +fn main() -> Result<()> { + let event_paths = [ + EventInfo { + input_path: concat!(env!("CARGO_MANIFEST_DIR"), "/../../dc/s2n-quic-dc/src/event/**/*.rs"), + output_path: concat!(env!("CARGO_MANIFEST_DIR"), "/../../dc/s2n-quic-dc/src/event/generated.rs"), + output_mode: OutputMode::Ref, + }, + EventInfo { + input_path: concat!(env!("CARGO_MANIFEST_DIR"),"/events/**/*.rs"), + output_path: concat!(env!("CARGO_MANIFEST_DIR"),"/../s2n-quic-core/src/event/generated.rs"), + output_mode: OutputMode::Mut, + }, + ]; + + for event_info in event_paths { + let mut files = vec![]; + + for path in glob::glob(event_info.input_path)? { + let path = path?; + let file = std::fs::read_to_string(path)?; + files.push(parser::parse(&file).unwrap()); + } + + let mut output = Output::default(); + output.mode = event_info.output_mode; + + for file in &files { + file.to_tokens(&mut output); + } + + let generated = event_info.output_path; + + let mut o = std::fs::File::create(generated)?; + + macro_rules! put { + ($($arg:tt)*) => {{ + use std::io::Write; + writeln!(o, $($arg)*)?; + }} + } + + put!("// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved."); + put!("// SPDX-License-Identifier: Apache-2.0"); + put!(); + put!("// DO NOT MODIFY THIS FILE"); + put!("// This file was generated with the `s2n-quic-events` crate and any required"); + put!("// changes should be made there."); + put!(); + put!("{}", output.to_token_stream()); + + let status = std::process::Command::new("rustfmt") + .arg(generated) + .spawn()? + .wait()?; + + assert!(status.success()); } - put!("// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved."); - put!("// SPDX-License-Identifier: Apache-2.0"); - put!(); - put!("// DO NOT MODIFY THIS FILE"); - put!("// This file was generated with the `s2n-quic-events` crate and any required"); - put!("// changes should be made there."); - put!(); - put!("{}", output.to_token_stream()); - - let status = std::process::Command::new("rustfmt") - .arg(generated) - .spawn()? - .wait()?; - - assert!(status.success()); - Ok(()) } diff --git a/quic/s2n-quic-events/src/parser.rs b/quic/s2n-quic-events/src/parser.rs index fdd12b2df4..948bee45b5 100644 --- a/quic/s2n-quic-events/src/parser.rs +++ b/quic/s2n-quic-events/src/parser.rs @@ -148,14 +148,21 @@ impl Struct { let publisher_doc = format!("Publishes a `{ident_str}` event to the publisher's subscriber"); + let counter_type = output.mode.counter_type(); + let counter_init = output.mode.counter_init(); + // add a counter for testing structs output.testing_fields.extend(quote!( - pub #counter: u32, + pub #counter: #counter_type, )); output.testing_fields_init.extend(quote!( - #counter: 0, + #counter: #counter_init, )); + let receiver = output.mode.receiver(); + let counter_increment = output.mode.counter_increment(); + let lock = output.mode.lock(); + match attrs.subject { Subject::Endpoint => { output.subscriber.extend(quote!( @@ -163,7 +170,7 @@ impl Struct { #[inline] #deprecated #allow_deprecated - fn #function(&mut self, meta: &EndpointMeta, event: &#ident) { + fn #function(&#receiver self, meta: &EndpointMeta, event: &#ident) { let _ = meta; let _ = event; } @@ -172,7 +179,7 @@ impl Struct { output.tuple_subscriber.extend(quote!( #[inline] #allow_deprecated - fn #function(&mut self, meta: &EndpointMeta, event: &#ident) { + fn #function(&#receiver self, meta: &EndpointMeta, event: &#ident) { (self.0).#function(meta, event); (self.1).#function(meta, event); } @@ -181,7 +188,7 @@ impl Struct { output.tracing_subscriber.extend(quote!( #[inline] #allow_deprecated - fn #function(&mut self, meta: &api::EndpointMeta, event: &api::#ident) { + fn #function(&#receiver self, meta: &api::EndpointMeta, event: &api::#ident) { let parent = match meta.endpoint_type { api::EndpointType::Client {} => { self.client.id() @@ -197,13 +204,13 @@ impl Struct { output.endpoint_publisher.extend(quote!( #[doc = #publisher_doc] - fn #function(&mut self, event: builder::#ident); + fn #function(&#receiver self, event: builder::#ident); )); output.endpoint_publisher_subscriber.extend(quote!( #[inline] #allow_deprecated - fn #function(&mut self, event: builder::#ident) { + fn #function(&#receiver self, event: builder::#ident) { let event = event.into_event(); self.subscriber.#function(&self.meta, &event); self.subscriber.on_event(&self.meta, &event); @@ -212,18 +219,18 @@ impl Struct { output.subscriber_testing.extend(quote!( #allow_deprecated - fn #function(&mut self, meta: &api::EndpointMeta, event: &api::#ident) { - self.#counter += 1; - self.output.push(format!("{meta:?} {event:?}")); + fn #function(&#receiver self, meta: &api::EndpointMeta, event: &api::#ident) { + self.#counter #counter_increment; + self.output #lock.push(format!("{meta:?} {event:?}")); } )); output.endpoint_publisher_testing.extend(quote!( #allow_deprecated - fn #function(&mut self, event: builder::#ident) { - self.#counter += 1; + fn #function(&#receiver self, event: builder::#ident) { + self.#counter #counter_increment; let event = event.into_event(); - self.output.push(format!("{event:?}")); + self.output #lock.push(format!("{event:?}")); } )); } @@ -233,7 +240,7 @@ impl Struct { #[inline] #deprecated #allow_deprecated - fn #function(&mut self, context: &mut Self::ConnectionContext, meta: &ConnectionMeta, event: &#ident) { + fn #function(&#receiver self, context: &#receiver Self::ConnectionContext, meta: &ConnectionMeta, event: &#ident) { let _ = context; let _ = meta; let _ = event; @@ -243,16 +250,16 @@ impl Struct { output.tuple_subscriber.extend(quote!( #[inline] #allow_deprecated - fn #function(&mut self, context: &mut Self::ConnectionContext, meta: &ConnectionMeta, event: &#ident) { - (self.0).#function(&mut context.0, meta, event); - (self.1).#function(&mut context.1, meta, event); + fn #function(&#receiver self, context: &#receiver Self::ConnectionContext, meta: &ConnectionMeta, event: &#ident) { + (self.0).#function(&#receiver context.0, meta, event); + (self.1).#function(&#receiver context.1, meta, event); } )); output.tracing_subscriber.extend(quote!( #[inline] #allow_deprecated - fn #function(&mut self, context: &mut Self::ConnectionContext, _meta: &api::ConnectionMeta, event: &api::#ident) { + fn #function(&#receiver self, context: &#receiver Self::ConnectionContext, _meta: &api::ConnectionMeta, event: &api::#ident) { let id = context.id(); let api::#ident { #(#destructure_fields),* } = event; tracing::event!(target: #snake, parent: id, tracing::Level::DEBUG, #(#destructure_fields = tracing::field::debug(#destructure_fields)),*); @@ -261,13 +268,13 @@ impl Struct { output.connection_publisher.extend(quote!( #[doc = #publisher_doc] - fn #function(&mut self, event: builder::#ident); + fn #function(&#receiver self, event: builder::#ident); )); output.connection_publisher_subscriber.extend(quote!( #[inline] #allow_deprecated - fn #function(&mut self, event: builder::#ident) { + fn #function(&#receiver self, event: builder::#ident) { let event = event.into_event(); self.subscriber.#function(self.context, &self.meta, &event); self.subscriber.on_connection_event(self.context, &self.meta, &event); @@ -277,21 +284,21 @@ impl Struct { output.subscriber_testing.extend(quote!( #allow_deprecated - fn #function(&mut self, _context: &mut Self::ConnectionContext, meta: &api::ConnectionMeta, event: &api::#ident) { - self.#counter += 1; + fn #function(&#receiver self, _context: &#receiver Self::ConnectionContext, meta: &api::ConnectionMeta, event: &api::#ident) { + self.#counter #counter_increment; if self.location.is_some() { - self.output.push(format!("{meta:?} {event:?}")); + self.output #lock.push(format!("{meta:?} {event:?}")); } } )); output.connection_publisher_testing.extend(quote!( #allow_deprecated - fn #function(&mut self, event: builder::#ident) { - self.#counter += 1; + fn #function(&#receiver self, event: builder::#ident) { + self.#counter #counter_increment; let event = event.into_event(); if self.location.is_some() { - self.output.push(format!("{event:?}")); + self.output #lock.push(format!("{event:?}")); } } )); From 1a40b770b0df01385eeb14c94241181e439dcd96 Mon Sep 17 00:00:00 2001 From: Appelmans Date: Tue, 17 Sep 2024 11:03:48 -0700 Subject: [PATCH 2/8] Fixed imports --- dc/s2n-quic-dc/src/event/generated.rs | 677 +--------------------- quic/s2n-quic-core/src/event/generated.rs | 2 +- quic/s2n-quic-events/src/main.rs | 3 +- 3 files changed, 6 insertions(+), 676 deletions(-) diff --git a/dc/s2n-quic-dc/src/event/generated.rs b/dc/s2n-quic-dc/src/event/generated.rs index f0c484fb90..7c5eaf2f49 100644 --- a/dc/s2n-quic-dc/src/event/generated.rs +++ b/dc/s2n-quic-dc/src/event/generated.rs @@ -37,680 +37,6 @@ pub mod api { impl Event for FrameSent { const NAME: &'static str = "transport:frame_sent"; } - use super::*; - pub mod api { - #![doc = r" This module contains events that are emitted to the [`Subscriber`](crate::event::Subscriber)"] - use super::*; - pub use traits::Subscriber; - #[derive(Clone, Debug)] - #[non_exhaustive] - #[doc = " Frame was sent"] - pub struct FrameSent { - pub packet_header: PacketHeader, - pub path_id: u64, - pub frame: Frame, - } - impl Event for FrameSent { - const NAME: &'static str = "transport:frame_sent"; - } - } - #[cfg(feature = "event-tracing")] - pub mod tracing { - #![doc = r" This module contains event integration with [`tracing`](https://docs.rs/tracing)"] - use super::api; - #[doc = r" Emits events with [`tracing`](https://docs.rs/tracing)"] - #[derive(Clone, Debug)] - pub struct Subscriber { - client: tracing::Span, - server: tracing::Span, - } - impl Default for Subscriber { - fn default() -> Self { - let root = tracing :: span ! (target : "s2n_quic" , tracing :: Level :: DEBUG , "s2n_quic"); - let client = tracing :: span ! (parent : root . id () , tracing :: Level :: DEBUG , "client"); - let server = tracing :: span ! (parent : root . id () , tracing :: Level :: DEBUG , "server"); - Self { client, server } - } - } - impl super::Subscriber for Subscriber { - type ConnectionContext = tracing::Span; - fn create_connection_context( - &self, - meta: &api::ConnectionMeta, - _info: &api::ConnectionInfo, - ) -> Self::ConnectionContext { - let parent = match meta.endpoint_type { - api::EndpointType::Client {} => self.client.id(), - api::EndpointType::Server {} => self.server.id(), - }; - tracing :: span ! (target : "s2n_quic" , parent : parent , tracing :: Level :: DEBUG , "conn" , id = meta . id) - } - #[inline] - fn on_frame_sent( - &self, - context: &Self::ConnectionContext, - _meta: &api::ConnectionMeta, - event: &api::FrameSent, - ) { - let id = context.id(); - let api::FrameSent { - packet_header, - path_id, - frame, - } = event; - tracing :: event ! (target : "frame_sent" , parent : id , tracing :: Level :: DEBUG , packet_header = tracing :: field :: debug (packet_header) , path_id = tracing :: field :: debug (path_id) , frame = tracing :: field :: debug (frame)); - } - } - } - pub mod builder { - use super::*; - #[derive(Clone, Debug)] - #[doc = " Frame was sent"] - pub struct FrameSent { - pub packet_header: PacketHeader, - pub path_id: u64, - pub frame: Frame, - } - impl IntoEvent for FrameSent { - #[inline] - fn into_event(self) -> api::FrameSent { - let FrameSent { - packet_header, - path_id, - frame, - } = self; - api::FrameSent { - packet_header: packet_header.into_event(), - path_id: path_id.into_event(), - frame: frame.into_event(), - } - } - } - } - pub mod supervisor { - #![doc = r" This module contains the `supervisor::Outcome` and `supervisor::Context` for use"] - #![doc = r" when implementing [`Subscriber::supervisor_timeout`](crate::event::Subscriber::supervisor_timeout) and"] - #![doc = r" [`Subscriber::on_supervisor_timeout`](crate::event::Subscriber::on_supervisor_timeout)"] - #![doc = r" on a Subscriber."] - use crate::{ - application, - event::{builder::SocketAddress, IntoEvent}, - }; - #[non_exhaustive] - #[derive(Clone, Debug, Eq, PartialEq)] - pub enum Outcome { - #[doc = r" Allow the connection to remain open"] - Continue, - #[doc = r" Close the connection and notify the peer"] - Close { error_code: application::Error }, - #[doc = r" Close the connection without notifying the peer"] - ImmediateClose { reason: &'static str }, - } - impl Default for Outcome { - fn default() -> Self { - Self::Continue - } - } - #[non_exhaustive] - #[derive(Debug)] - pub struct Context<'a> { - #[doc = r" Number of handshakes that have begun but not completed"] - pub inflight_handshakes: usize, - #[doc = r" Number of open connections"] - pub connection_count: usize, - #[doc = r" The address of the peer"] - pub remote_address: SocketAddress<'a>, - #[doc = r" True if the connection is in the handshake state, false otherwise"] - pub is_handshaking: bool, - } - impl<'a> Context<'a> { - pub fn new( - inflight_handshakes: usize, - connection_count: usize, - remote_address: &'a crate::inet::SocketAddress, - is_handshaking: bool, - ) -> Self { - Self { - inflight_handshakes, - connection_count, - remote_address: remote_address.into_event(), - is_handshaking, - } - } - } - } - pub use traits::*; - mod traits { - use super::*; - use crate::query; - use api::*; - use core::fmt; - #[doc = r" Provides metadata related to an event"] - pub trait Meta: fmt::Debug { - #[doc = r" Returns whether the local endpoint is a Client or Server"] - fn endpoint_type(&self) -> &EndpointType; - #[doc = r" A context from which the event is being emitted"] - #[doc = r""] - #[doc = r" An event can occur in the context of an Endpoint or Connection"] - fn subject(&self) -> Subject; - #[doc = r" The time the event occurred"] - fn timestamp(&self) -> &crate::event::Timestamp; - } - impl Meta for ConnectionMeta { - fn endpoint_type(&self) -> &EndpointType { - &self.endpoint_type - } - fn subject(&self) -> Subject { - Subject::Connection { id: self.id } - } - fn timestamp(&self) -> &crate::event::Timestamp { - &self.timestamp - } - } - impl Meta for EndpointMeta { - fn endpoint_type(&self) -> &EndpointType { - &self.endpoint_type - } - fn subject(&self) -> Subject { - Subject::Endpoint {} - } - fn timestamp(&self) -> &crate::event::Timestamp { - &self.timestamp - } - } - #[doc = r" Allows for events to be subscribed to"] - pub trait Subscriber: 'static + Send { - #[doc = r" An application provided type associated with each connection."] - #[doc = r""] - #[doc = r" The context provides a mechanism for applications to provide a custom type"] - #[doc = r" and update it on each event, e.g. computing statistics. Each event"] - #[doc = r" invocation (e.g. [`Subscriber::on_packet_sent`]) also provides mutable"] - #[doc = r" access to the context `&mut ConnectionContext` and allows for updating the"] - #[doc = r" context."] - #[doc = r""] - #[doc = r" ```no_run"] - #[doc = r" # mod s2n_quic { pub mod provider { pub mod event {"] - #[doc = r" # pub use s2n_quic_core::event::{api as events, api::ConnectionInfo, api::ConnectionMeta, Subscriber};"] - #[doc = r" # }}}"] - #[doc = r" use s2n_quic::provider::event::{"] - #[doc = r" ConnectionInfo, ConnectionMeta, Subscriber, events::PacketSent"] - #[doc = r" };"] - #[doc = r""] - #[doc = r" pub struct MyEventSubscriber;"] - #[doc = r""] - #[doc = r" pub struct MyEventContext {"] - #[doc = r" packet_sent: u64,"] - #[doc = r" }"] - #[doc = r""] - #[doc = r" impl Subscriber for MyEventSubscriber {"] - #[doc = r" type ConnectionContext = MyEventContext;"] - #[doc = r""] - #[doc = r" fn create_connection_context("] - #[doc = r" &mut self, _meta: &ConnectionMeta,"] - #[doc = r" _info: &ConnectionInfo,"] - #[doc = r" ) -> Self::ConnectionContext {"] - #[doc = r" MyEventContext { packet_sent: 0 }"] - #[doc = r" }"] - #[doc = r""] - #[doc = r" fn on_packet_sent("] - #[doc = r" &mut self,"] - #[doc = r" context: &mut Self::ConnectionContext,"] - #[doc = r" _meta: &ConnectionMeta,"] - #[doc = r" _event: &PacketSent,"] - #[doc = r" ) {"] - #[doc = r" context.packet_sent += 1;"] - #[doc = r" }"] - #[doc = r" }"] - #[doc = r" ```"] - type ConnectionContext: 'static + Send; - #[doc = r" Creates a context to be passed to each connection-related event"] - fn create_connection_context( - &self, - meta: &ConnectionMeta, - info: &ConnectionInfo, - ) -> Self::ConnectionContext; - #[doc = r" The period at which `on_supervisor_timeout` is called"] - #[doc = r""] - #[doc = r" If multiple `event::Subscriber`s are composed together, the minimum `supervisor_timeout`"] - #[doc = r" across all `event::Subscriber`s will be used."] - #[doc = r""] - #[doc = r" If the `supervisor_timeout()` is `None` across all `event::Subscriber`s, connection supervision"] - #[doc = r" will cease for the remaining lifetime of the connection and `on_supervisor_timeout` will no longer"] - #[doc = r" be called."] - #[doc = r""] - #[doc = r" It is recommended to avoid setting this value less than ~100ms, as short durations"] - #[doc = r" may lead to higher CPU utilization."] - #[allow(unused_variables)] - fn supervisor_timeout( - &self, - conn_context: &mut Self::ConnectionContext, - meta: &ConnectionMeta, - context: &supervisor::Context, - ) -> Option { - None - } - #[doc = r" Called for each `supervisor_timeout` to determine any action to take on the connection based on the `supervisor::Outcome`"] - #[doc = r""] - #[doc = r" If multiple `event::Subscriber`s are composed together, the minimum `supervisor_timeout`"] - #[doc = r" across all `event::Subscriber`s will be used, and thus `on_supervisor_timeout` may be called"] - #[doc = r" earlier than the `supervisor_timeout` for a given `event::Subscriber` implementation."] - #[allow(unused_variables)] - fn on_supervisor_timeout( - &self, - conn_context: &mut Self::ConnectionContext, - meta: &ConnectionMeta, - context: &supervisor::Context, - ) -> supervisor::Outcome { - supervisor::Outcome::default() - } - #[doc = "Called when the `FrameSent` event is triggered"] - #[inline] - fn on_frame_sent( - &self, - context: &Self::ConnectionContext, - meta: &ConnectionMeta, - event: &FrameSent, - ) { - let _ = context; - let _ = meta; - let _ = event; - } - #[doc = r" Called for each event that relates to the endpoint and all connections"] - #[inline] - fn on_event(&self, meta: &M, event: &E) { - let _ = meta; - let _ = event; - } - #[doc = r" Called for each event that relates to a connection"] - #[inline] - fn on_connection_event( - &self, - context: &Self::ConnectionContext, - meta: &ConnectionMeta, - event: &E, - ) { - let _ = context; - let _ = meta; - let _ = event; - } - #[doc = r" Used for querying the `Subscriber::ConnectionContext` on a Subscriber"] - #[inline] - fn query( - context: &Self::ConnectionContext, - query: &mut dyn query::Query, - ) -> query::ControlFlow { - query.execute(context) - } - #[doc = r" Used for querying and mutating the `Subscriber::ConnectionContext` on a Subscriber"] - #[inline] - fn query_mut( - context: &mut Self::ConnectionContext, - query: &mut dyn query::QueryMut, - ) -> query::ControlFlow { - query.execute_mut(context) - } - } - #[doc = r" Subscriber is implemented for a 2-element tuple to make it easy to compose multiple"] - #[doc = r" subscribers."] - impl Subscriber for (A, B) - where - A: Subscriber, - B: Subscriber, - { - type ConnectionContext = (A::ConnectionContext, B::ConnectionContext); - #[inline] - fn create_connection_context( - &self, - meta: &ConnectionMeta, - info: &ConnectionInfo, - ) -> Self::ConnectionContext { - ( - self.0.create_connection_context(meta, info), - self.1.create_connection_context(meta, info), - ) - } - #[inline] - fn supervisor_timeout( - &self, - conn_context: &mut Self::ConnectionContext, - meta: &ConnectionMeta, - context: &supervisor::Context, - ) -> Option { - let timeout_a = self - .0 - .supervisor_timeout(&mut conn_context.0, meta, context); - let timeout_b = self - .1 - .supervisor_timeout(&mut conn_context.1, meta, context); - match (timeout_a, timeout_b) { - (None, None) => None, - (None, Some(timeout)) | (Some(timeout), None) => Some(timeout), - (Some(a), Some(b)) => Some(a.min(b)), - } - } - #[inline] - fn on_supervisor_timeout( - &self, - conn_context: &mut Self::ConnectionContext, - meta: &ConnectionMeta, - context: &supervisor::Context, - ) -> supervisor::Outcome { - let outcome_a = - self.0 - .on_supervisor_timeout(&mut conn_context.0, meta, context); - let outcome_b = - self.1 - .on_supervisor_timeout(&mut conn_context.1, meta, context); - match (outcome_a, outcome_b) { - (supervisor::Outcome::ImmediateClose { reason }, _) - | (_, supervisor::Outcome::ImmediateClose { reason }) => { - supervisor::Outcome::ImmediateClose { reason } - } - (supervisor::Outcome::Close { error_code }, _) - | (_, supervisor::Outcome::Close { error_code }) => { - supervisor::Outcome::Close { error_code } - } - _ => supervisor::Outcome::Continue, - } - } - #[inline] - fn on_frame_sent( - &self, - context: &Self::ConnectionContext, - meta: &ConnectionMeta, - event: &FrameSent, - ) { - (self.0).on_frame_sent(&context.0, meta, event); - (self.1).on_frame_sent(&context.1, meta, event); - } - #[inline] - fn on_event(&self, meta: &M, event: &E) { - self.0.on_event(meta, event); - self.1.on_event(meta, event); - } - #[inline] - fn on_connection_event( - &self, - context: &Self::ConnectionContext, - meta: &ConnectionMeta, - event: &E, - ) { - self.0.on_connection_event(&context.0, meta, event); - self.1.on_connection_event(&context.1, meta, event); - } - #[inline] - fn query( - context: &Self::ConnectionContext, - query: &mut dyn query::Query, - ) -> query::ControlFlow { - query - .execute(context) - .and_then(|| A::query(&context.0, query)) - .and_then(|| B::query(&context.1, query)) - } - #[inline] - fn query_mut( - context: &mut Self::ConnectionContext, - query: &mut dyn query::QueryMut, - ) -> query::ControlFlow { - query - .execute_mut(context) - .and_then(|| A::query_mut(&mut context.0, query)) - .and_then(|| B::query_mut(&mut context.1, query)) - } - } - pub trait EndpointPublisher { - #[doc = r" Returns the QUIC version, if any"] - fn quic_version(&self) -> Option; - } - pub struct EndpointPublisherSubscriber<'a, Sub: Subscriber> { - meta: EndpointMeta, - quic_version: Option, - subscriber: &'a mut Sub, - } - impl<'a, Sub: Subscriber> fmt::Debug for EndpointPublisherSubscriber<'a, Sub> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("ConnectionPublisherSubscriber") - .field("meta", &self.meta) - .field("quic_version", &self.quic_version) - .finish() - } - } - impl<'a, Sub: Subscriber> EndpointPublisherSubscriber<'a, Sub> { - #[inline] - pub fn new( - meta: builder::EndpointMeta, - quic_version: Option, - subscriber: &'a mut Sub, - ) -> Self { - Self { - meta: meta.into_event(), - quic_version, - subscriber, - } - } - } - impl<'a, Sub: Subscriber> EndpointPublisher for EndpointPublisherSubscriber<'a, Sub> { - #[inline] - fn quic_version(&self) -> Option { - self.quic_version - } - } - pub trait ConnectionPublisher { - #[doc = "Publishes a `FrameSent` event to the publisher's subscriber"] - fn on_frame_sent(&self, event: builder::FrameSent); - #[doc = r" Returns the QUIC version negotiated for the current connection, if any"] - fn quic_version(&self) -> u32; - #[doc = r" Returns the [`Subject`] for the current publisher"] - fn subject(&self) -> Subject; - } - pub struct ConnectionPublisherSubscriber<'a, Sub: Subscriber> { - meta: ConnectionMeta, - quic_version: u32, - subscriber: &'a Sub, - context: &'a Sub::ConnectionContext, - } - impl<'a, Sub: Subscriber> fmt::Debug for ConnectionPublisherSubscriber<'a, Sub> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("ConnectionPublisherSubscriber") - .field("meta", &self.meta) - .field("quic_version", &self.quic_version) - .finish() - } - } - impl<'a, Sub: Subscriber> ConnectionPublisherSubscriber<'a, Sub> { - #[inline] - pub fn new( - meta: builder::ConnectionMeta, - quic_version: u32, - subscriber: &'a Sub, - context: &'a Sub::ConnectionContext, - ) -> Self { - Self { - meta: meta.into_event(), - quic_version, - subscriber, - context, - } - } - } - impl<'a, Sub: Subscriber> ConnectionPublisher for ConnectionPublisherSubscriber<'a, Sub> { - #[inline] - fn on_frame_sent(&self, event: builder::FrameSent) { - let event = event.into_event(); - self.subscriber - .on_frame_sent(self.context, &self.meta, &event); - self.subscriber - .on_connection_event(self.context, &self.meta, &event); - self.subscriber.on_event(&self.meta, &event); - } - #[inline] - fn quic_version(&self) -> u32 { - self.quic_version - } - #[inline] - fn subject(&self) -> api::Subject { - self.meta.subject() - } - } - } - #[cfg(any(test, feature = "testing"))] - pub mod testing { - use super::*; - #[derive(Clone, Debug)] - pub struct Subscriber { - location: Option, - output: Arc>>, - pub frame_sent: Arc, - } - impl Drop for Subscriber { - fn drop(&mut self) { - if std::thread::panicking() { - return; - } - if let Some(location) = self.location.as_ref() { - location.snapshot(&self.output.lock().unwrap()); - } - } - } - impl Subscriber { - #[doc = r" Creates a subscriber with snapshot assertions enabled"] - #[track_caller] - pub fn snapshot() -> Self { - let mut sub = Self::no_snapshot(); - sub.location = Location::try_new(); - sub - } - #[doc = r" Creates a subscriber with snapshot assertions disabled"] - pub fn no_snapshot() -> Self { - Self { - location: None, - output: Default::default(), - frame_sent: Arc::new(AtomicU32::new(0)), - } - } - } - impl super::Subscriber for Subscriber { - type ConnectionContext = (); - fn create_connection_context( - &self, - _meta: &api::ConnectionMeta, - _info: &api::ConnectionInfo, - ) -> Self::ConnectionContext { - } - fn on_frame_sent( - &self, - _context: &Self::ConnectionContext, - meta: &api::ConnectionMeta, - event: &api::FrameSent, - ) { - self.frame_sent.fetch_add(1, Ordering::SeqCst); - if self.location.is_some() { - self.output - .lock() - .unwrap() - .push(format!("{meta:?} {event:?}")); - } - } - } - #[derive(Clone, Debug)] - pub struct Publisher { - location: Option, - output: Arc>>, - pub frame_sent: Arc, - } - impl Publisher { - #[doc = r" Creates a publisher with snapshot assertions enabled"] - #[track_caller] - pub fn snapshot() -> Self { - let mut sub = Self::no_snapshot(); - sub.location = Location::try_new(); - sub - } - #[doc = r" Creates a publisher with snapshot assertions disabled"] - pub fn no_snapshot() -> Self { - Self { - location: None, - output: Default::default(), - frame_sent: Arc::new(AtomicU32::new(0)), - } - } - } - impl super::EndpointPublisher for Publisher { - fn quic_version(&self) -> Option { - Some(1) - } - } - impl super::ConnectionPublisher for Publisher { - fn on_frame_sent(&self, event: builder::FrameSent) { - self.frame_sent.fetch_add(1, Ordering::SeqCst); - let event = event.into_event(); - if self.location.is_some() { - self.output.lock().unwrap().push(format!("{event:?}")); - } - } - fn quic_version(&self) -> u32 { - 1 - } - fn subject(&self) -> api::Subject { - api::Subject::Connection { id: 0 } - } - } - impl Drop for Publisher { - fn drop(&mut self) { - if std::thread::panicking() { - return; - } - if let Some(location) = self.location.as_ref() { - location.snapshot(&self.output.lock().unwrap()); - } - } - } - #[derive(Clone, Debug)] - struct Location(&'static core::panic::Location<'static>); - impl Location { - #[track_caller] - fn try_new() -> Option { - let thread = std::thread::current(); - if thread.name().map_or(false, |name| name != "main") { - Some(Self(core::panic::Location::caller())) - } else { - None - } - } - fn snapshot(&self, output: &[String]) { - if cfg!(miri) { - return; - } - use std::path::{Component, Path}; - let value = output.join("\n"); - let thread = std::thread::current(); - let function_name = thread.name().unwrap(); - let test_path = Path::new(self.0.file().trim_end_matches(".rs")); - let module_path = test_path - .components() - .filter_map(|comp| match comp { - Component::Normal(comp) => comp.to_str(), - _ => Some("_"), - }) - .chain(Some("events")) - .collect::>() - .join("::"); - let current_dir = std::env::current_dir().unwrap(); - insta::_macro_support::assert_snapshot( - insta::_macro_support::AutoName.into(), - &value, - current_dir.to_str().unwrap(), - function_name, - &module_path, - self.0.file(), - self.0.line(), - "", - ) - .unwrap() - } - } - } } #[cfg(feature = "event-tracing")] pub mod tracing { @@ -1215,6 +541,8 @@ pub mod api { #[cfg(any(test, feature = "testing"))] pub mod testing { use super::*; + use core::sync::atomic::{AtomicU32, Ordering}; + use std::sync::{Arc, Mutex}; #[derive(Clone, Debug)] pub struct Subscriber { location: Option, @@ -1876,6 +1204,7 @@ mod traits { #[cfg(any(test, feature = "testing"))] pub mod testing { use super::*; + use core::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex}; #[derive(Clone, Debug)] pub struct Subscriber { diff --git a/quic/s2n-quic-core/src/event/generated.rs b/quic/s2n-quic-core/src/event/generated.rs index 5561dfa15a..c447a0e95c 100644 --- a/quic/s2n-quic-core/src/event/generated.rs +++ b/quic/s2n-quic-core/src/event/generated.rs @@ -1282,7 +1282,7 @@ pub mod api { } } macro_rules! impl_conn_id { - ($name:ident) => { + ($ name : ident) => { impl<'a> IntoEvent> for &'a crate::connection::id::$name { #[inline] fn into_event(self) -> builder::ConnectionId<'a> { diff --git a/quic/s2n-quic-events/src/main.rs b/quic/s2n-quic-events/src/main.rs index 9c04620b0e..34eae272c4 100644 --- a/quic/s2n-quic-events/src/main.rs +++ b/quic/s2n-quic-events/src/main.rs @@ -53,7 +53,8 @@ impl OutputMode { fn imports(&self) ->TokenStream { match self { - OutputMode::Ref => quote!(use std::sync::{Arc, Mutex};), + OutputMode::Ref => quote!(use core::sync::atomic::{AtomicU32, Ordering}; + use std::sync::{Arc, Mutex}; ), OutputMode::Mut => quote!(), } } From 098e677112ac0e85f6bfafe6b05aa0403f4a6c94 Mon Sep 17 00:00:00 2001 From: Appelmans Date: Tue, 17 Sep 2024 11:06:11 -0700 Subject: [PATCH 3/8] More tweaks --- dc/s2n-quic-dc/src/event/generated.rs | 676 -------------------------- 1 file changed, 676 deletions(-) diff --git a/dc/s2n-quic-dc/src/event/generated.rs b/dc/s2n-quic-dc/src/event/generated.rs index 7c5eaf2f49..4fab64e7d3 100644 --- a/dc/s2n-quic-dc/src/event/generated.rs +++ b/dc/s2n-quic-dc/src/event/generated.rs @@ -21,682 +21,6 @@ pub mod api { impl Event for FrameSent { const NAME: &'static str = "transport:frame_sent"; } - use super::*; - pub mod api { - #![doc = r" This module contains events that are emitted to the [`Subscriber`](crate::event::Subscriber)"] - use super::*; - pub use traits::Subscriber; - #[derive(Clone, Debug)] - #[non_exhaustive] - #[doc = " Frame was sent"] - pub struct FrameSent { - pub packet_header: PacketHeader, - pub path_id: u64, - pub frame: Frame, - } - impl Event for FrameSent { - const NAME: &'static str = "transport:frame_sent"; - } - } - #[cfg(feature = "event-tracing")] - pub mod tracing { - #![doc = r" This module contains event integration with [`tracing`](https://docs.rs/tracing)"] - use super::api; - #[doc = r" Emits events with [`tracing`](https://docs.rs/tracing)"] - #[derive(Clone, Debug)] - pub struct Subscriber { - client: tracing::Span, - server: tracing::Span, - } - impl Default for Subscriber { - fn default() -> Self { - let root = tracing :: span ! (target : "s2n_quic" , tracing :: Level :: DEBUG , "s2n_quic"); - let client = tracing :: span ! (parent : root . id () , tracing :: Level :: DEBUG , "client"); - let server = tracing :: span ! (parent : root . id () , tracing :: Level :: DEBUG , "server"); - Self { client, server } - } - } - impl super::Subscriber for Subscriber { - type ConnectionContext = tracing::Span; - fn create_connection_context( - &self, - meta: &api::ConnectionMeta, - _info: &api::ConnectionInfo, - ) -> Self::ConnectionContext { - let parent = match meta.endpoint_type { - api::EndpointType::Client {} => self.client.id(), - api::EndpointType::Server {} => self.server.id(), - }; - tracing :: span ! (target : "s2n_quic" , parent : parent , tracing :: Level :: DEBUG , "conn" , id = meta . id) - } - #[inline] - fn on_frame_sent( - &self, - context: &Self::ConnectionContext, - _meta: &api::ConnectionMeta, - event: &api::FrameSent, - ) { - let id = context.id(); - let api::FrameSent { - packet_header, - path_id, - frame, - } = event; - tracing :: event ! (target : "frame_sent" , parent : id , tracing :: Level :: DEBUG , packet_header = tracing :: field :: debug (packet_header) , path_id = tracing :: field :: debug (path_id) , frame = tracing :: field :: debug (frame)); - } - } - } - pub mod builder { - use super::*; - #[derive(Clone, Debug)] - #[doc = " Frame was sent"] - pub struct FrameSent { - pub packet_header: PacketHeader, - pub path_id: u64, - pub frame: Frame, - } - impl IntoEvent for FrameSent { - #[inline] - fn into_event(self) -> api::FrameSent { - let FrameSent { - packet_header, - path_id, - frame, - } = self; - api::FrameSent { - packet_header: packet_header.into_event(), - path_id: path_id.into_event(), - frame: frame.into_event(), - } - } - } - } - pub mod supervisor { - #![doc = r" This module contains the `supervisor::Outcome` and `supervisor::Context` for use"] - #![doc = r" when implementing [`Subscriber::supervisor_timeout`](crate::event::Subscriber::supervisor_timeout) and"] - #![doc = r" [`Subscriber::on_supervisor_timeout`](crate::event::Subscriber::on_supervisor_timeout)"] - #![doc = r" on a Subscriber."] - use crate::{ - application, - event::{builder::SocketAddress, IntoEvent}, - }; - #[non_exhaustive] - #[derive(Clone, Debug, Eq, PartialEq)] - pub enum Outcome { - #[doc = r" Allow the connection to remain open"] - Continue, - #[doc = r" Close the connection and notify the peer"] - Close { error_code: application::Error }, - #[doc = r" Close the connection without notifying the peer"] - ImmediateClose { reason: &'static str }, - } - impl Default for Outcome { - fn default() -> Self { - Self::Continue - } - } - #[non_exhaustive] - #[derive(Debug)] - pub struct Context<'a> { - #[doc = r" Number of handshakes that have begun but not completed"] - pub inflight_handshakes: usize, - #[doc = r" Number of open connections"] - pub connection_count: usize, - #[doc = r" The address of the peer"] - pub remote_address: SocketAddress<'a>, - #[doc = r" True if the connection is in the handshake state, false otherwise"] - pub is_handshaking: bool, - } - impl<'a> Context<'a> { - pub fn new( - inflight_handshakes: usize, - connection_count: usize, - remote_address: &'a crate::inet::SocketAddress, - is_handshaking: bool, - ) -> Self { - Self { - inflight_handshakes, - connection_count, - remote_address: remote_address.into_event(), - is_handshaking, - } - } - } - } - pub use traits::*; - mod traits { - use super::*; - use crate::query; - use api::*; - use core::fmt; - #[doc = r" Provides metadata related to an event"] - pub trait Meta: fmt::Debug { - #[doc = r" Returns whether the local endpoint is a Client or Server"] - fn endpoint_type(&self) -> &EndpointType; - #[doc = r" A context from which the event is being emitted"] - #[doc = r""] - #[doc = r" An event can occur in the context of an Endpoint or Connection"] - fn subject(&self) -> Subject; - #[doc = r" The time the event occurred"] - fn timestamp(&self) -> &crate::event::Timestamp; - } - impl Meta for ConnectionMeta { - fn endpoint_type(&self) -> &EndpointType { - &self.endpoint_type - } - fn subject(&self) -> Subject { - Subject::Connection { id: self.id } - } - fn timestamp(&self) -> &crate::event::Timestamp { - &self.timestamp - } - } - impl Meta for EndpointMeta { - fn endpoint_type(&self) -> &EndpointType { - &self.endpoint_type - } - fn subject(&self) -> Subject { - Subject::Endpoint {} - } - fn timestamp(&self) -> &crate::event::Timestamp { - &self.timestamp - } - } - #[doc = r" Allows for events to be subscribed to"] - pub trait Subscriber: 'static + Send { - #[doc = r" An application provided type associated with each connection."] - #[doc = r""] - #[doc = r" The context provides a mechanism for applications to provide a custom type"] - #[doc = r" and update it on each event, e.g. computing statistics. Each event"] - #[doc = r" invocation (e.g. [`Subscriber::on_packet_sent`]) also provides mutable"] - #[doc = r" access to the context `&mut ConnectionContext` and allows for updating the"] - #[doc = r" context."] - #[doc = r""] - #[doc = r" ```no_run"] - #[doc = r" # mod s2n_quic { pub mod provider { pub mod event {"] - #[doc = r" # pub use s2n_quic_core::event::{api as events, api::ConnectionInfo, api::ConnectionMeta, Subscriber};"] - #[doc = r" # }}}"] - #[doc = r" use s2n_quic::provider::event::{"] - #[doc = r" ConnectionInfo, ConnectionMeta, Subscriber, events::PacketSent"] - #[doc = r" };"] - #[doc = r""] - #[doc = r" pub struct MyEventSubscriber;"] - #[doc = r""] - #[doc = r" pub struct MyEventContext {"] - #[doc = r" packet_sent: u64,"] - #[doc = r" }"] - #[doc = r""] - #[doc = r" impl Subscriber for MyEventSubscriber {"] - #[doc = r" type ConnectionContext = MyEventContext;"] - #[doc = r""] - #[doc = r" fn create_connection_context("] - #[doc = r" &mut self, _meta: &ConnectionMeta,"] - #[doc = r" _info: &ConnectionInfo,"] - #[doc = r" ) -> Self::ConnectionContext {"] - #[doc = r" MyEventContext { packet_sent: 0 }"] - #[doc = r" }"] - #[doc = r""] - #[doc = r" fn on_packet_sent("] - #[doc = r" &mut self,"] - #[doc = r" context: &mut Self::ConnectionContext,"] - #[doc = r" _meta: &ConnectionMeta,"] - #[doc = r" _event: &PacketSent,"] - #[doc = r" ) {"] - #[doc = r" context.packet_sent += 1;"] - #[doc = r" }"] - #[doc = r" }"] - #[doc = r" ```"] - type ConnectionContext: 'static + Send; - #[doc = r" Creates a context to be passed to each connection-related event"] - fn create_connection_context( - &self, - meta: &ConnectionMeta, - info: &ConnectionInfo, - ) -> Self::ConnectionContext; - #[doc = r" The period at which `on_supervisor_timeout` is called"] - #[doc = r""] - #[doc = r" If multiple `event::Subscriber`s are composed together, the minimum `supervisor_timeout`"] - #[doc = r" across all `event::Subscriber`s will be used."] - #[doc = r""] - #[doc = r" If the `supervisor_timeout()` is `None` across all `event::Subscriber`s, connection supervision"] - #[doc = r" will cease for the remaining lifetime of the connection and `on_supervisor_timeout` will no longer"] - #[doc = r" be called."] - #[doc = r""] - #[doc = r" It is recommended to avoid setting this value less than ~100ms, as short durations"] - #[doc = r" may lead to higher CPU utilization."] - #[allow(unused_variables)] - fn supervisor_timeout( - &self, - conn_context: &mut Self::ConnectionContext, - meta: &ConnectionMeta, - context: &supervisor::Context, - ) -> Option { - None - } - #[doc = r" Called for each `supervisor_timeout` to determine any action to take on the connection based on the `supervisor::Outcome`"] - #[doc = r""] - #[doc = r" If multiple `event::Subscriber`s are composed together, the minimum `supervisor_timeout`"] - #[doc = r" across all `event::Subscriber`s will be used, and thus `on_supervisor_timeout` may be called"] - #[doc = r" earlier than the `supervisor_timeout` for a given `event::Subscriber` implementation."] - #[allow(unused_variables)] - fn on_supervisor_timeout( - &self, - conn_context: &mut Self::ConnectionContext, - meta: &ConnectionMeta, - context: &supervisor::Context, - ) -> supervisor::Outcome { - supervisor::Outcome::default() - } - #[doc = "Called when the `FrameSent` event is triggered"] - #[inline] - fn on_frame_sent( - &self, - context: &Self::ConnectionContext, - meta: &ConnectionMeta, - event: &FrameSent, - ) { - let _ = context; - let _ = meta; - let _ = event; - } - #[doc = r" Called for each event that relates to the endpoint and all connections"] - #[inline] - fn on_event(&self, meta: &M, event: &E) { - let _ = meta; - let _ = event; - } - #[doc = r" Called for each event that relates to a connection"] - #[inline] - fn on_connection_event( - &self, - context: &Self::ConnectionContext, - meta: &ConnectionMeta, - event: &E, - ) { - let _ = context; - let _ = meta; - let _ = event; - } - #[doc = r" Used for querying the `Subscriber::ConnectionContext` on a Subscriber"] - #[inline] - fn query( - context: &Self::ConnectionContext, - query: &mut dyn query::Query, - ) -> query::ControlFlow { - query.execute(context) - } - #[doc = r" Used for querying and mutating the `Subscriber::ConnectionContext` on a Subscriber"] - #[inline] - fn query_mut( - context: &mut Self::ConnectionContext, - query: &mut dyn query::QueryMut, - ) -> query::ControlFlow { - query.execute_mut(context) - } - } - #[doc = r" Subscriber is implemented for a 2-element tuple to make it easy to compose multiple"] - #[doc = r" subscribers."] - impl Subscriber for (A, B) - where - A: Subscriber, - B: Subscriber, - { - type ConnectionContext = (A::ConnectionContext, B::ConnectionContext); - #[inline] - fn create_connection_context( - &self, - meta: &ConnectionMeta, - info: &ConnectionInfo, - ) -> Self::ConnectionContext { - ( - self.0.create_connection_context(meta, info), - self.1.create_connection_context(meta, info), - ) - } - #[inline] - fn supervisor_timeout( - &self, - conn_context: &mut Self::ConnectionContext, - meta: &ConnectionMeta, - context: &supervisor::Context, - ) -> Option { - let timeout_a = self - .0 - .supervisor_timeout(&mut conn_context.0, meta, context); - let timeout_b = self - .1 - .supervisor_timeout(&mut conn_context.1, meta, context); - match (timeout_a, timeout_b) { - (None, None) => None, - (None, Some(timeout)) | (Some(timeout), None) => Some(timeout), - (Some(a), Some(b)) => Some(a.min(b)), - } - } - #[inline] - fn on_supervisor_timeout( - &self, - conn_context: &mut Self::ConnectionContext, - meta: &ConnectionMeta, - context: &supervisor::Context, - ) -> supervisor::Outcome { - let outcome_a = self - .0 - .on_supervisor_timeout(&mut conn_context.0, meta, context); - let outcome_b = self - .1 - .on_supervisor_timeout(&mut conn_context.1, meta, context); - match (outcome_a, outcome_b) { - (supervisor::Outcome::ImmediateClose { reason }, _) - | (_, supervisor::Outcome::ImmediateClose { reason }) => { - supervisor::Outcome::ImmediateClose { reason } - } - (supervisor::Outcome::Close { error_code }, _) - | (_, supervisor::Outcome::Close { error_code }) => { - supervisor::Outcome::Close { error_code } - } - _ => supervisor::Outcome::Continue, - } - } - #[inline] - fn on_frame_sent( - &self, - context: &Self::ConnectionContext, - meta: &ConnectionMeta, - event: &FrameSent, - ) { - (self.0).on_frame_sent(&context.0, meta, event); - (self.1).on_frame_sent(&context.1, meta, event); - } - #[inline] - fn on_event(&self, meta: &M, event: &E) { - self.0.on_event(meta, event); - self.1.on_event(meta, event); - } - #[inline] - fn on_connection_event( - &self, - context: &Self::ConnectionContext, - meta: &ConnectionMeta, - event: &E, - ) { - self.0.on_connection_event(&context.0, meta, event); - self.1.on_connection_event(&context.1, meta, event); - } - #[inline] - fn query( - context: &Self::ConnectionContext, - query: &mut dyn query::Query, - ) -> query::ControlFlow { - query - .execute(context) - .and_then(|| A::query(&context.0, query)) - .and_then(|| B::query(&context.1, query)) - } - #[inline] - fn query_mut( - context: &mut Self::ConnectionContext, - query: &mut dyn query::QueryMut, - ) -> query::ControlFlow { - query - .execute_mut(context) - .and_then(|| A::query_mut(&mut context.0, query)) - .and_then(|| B::query_mut(&mut context.1, query)) - } - } - pub trait EndpointPublisher { - #[doc = r" Returns the QUIC version, if any"] - fn quic_version(&self) -> Option; - } - pub struct EndpointPublisherSubscriber<'a, Sub: Subscriber> { - meta: EndpointMeta, - quic_version: Option, - subscriber: &'a mut Sub, - } - impl<'a, Sub: Subscriber> fmt::Debug for EndpointPublisherSubscriber<'a, Sub> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("ConnectionPublisherSubscriber") - .field("meta", &self.meta) - .field("quic_version", &self.quic_version) - .finish() - } - } - impl<'a, Sub: Subscriber> EndpointPublisherSubscriber<'a, Sub> { - #[inline] - pub fn new( - meta: builder::EndpointMeta, - quic_version: Option, - subscriber: &'a mut Sub, - ) -> Self { - Self { - meta: meta.into_event(), - quic_version, - subscriber, - } - } - } - impl<'a, Sub: Subscriber> EndpointPublisher for EndpointPublisherSubscriber<'a, Sub> { - #[inline] - fn quic_version(&self) -> Option { - self.quic_version - } - } - pub trait ConnectionPublisher { - #[doc = "Publishes a `FrameSent` event to the publisher's subscriber"] - fn on_frame_sent(&self, event: builder::FrameSent); - #[doc = r" Returns the QUIC version negotiated for the current connection, if any"] - fn quic_version(&self) -> u32; - #[doc = r" Returns the [`Subject`] for the current publisher"] - fn subject(&self) -> Subject; - } - pub struct ConnectionPublisherSubscriber<'a, Sub: Subscriber> { - meta: ConnectionMeta, - quic_version: u32, - subscriber: &'a Sub, - context: &'a Sub::ConnectionContext, - } - impl<'a, Sub: Subscriber> fmt::Debug for ConnectionPublisherSubscriber<'a, Sub> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("ConnectionPublisherSubscriber") - .field("meta", &self.meta) - .field("quic_version", &self.quic_version) - .finish() - } - } - impl<'a, Sub: Subscriber> ConnectionPublisherSubscriber<'a, Sub> { - #[inline] - pub fn new( - meta: builder::ConnectionMeta, - quic_version: u32, - subscriber: &'a Sub, - context: &'a Sub::ConnectionContext, - ) -> Self { - Self { - meta: meta.into_event(), - quic_version, - subscriber, - context, - } - } - } - impl<'a, Sub: Subscriber> ConnectionPublisher for ConnectionPublisherSubscriber<'a, Sub> { - #[inline] - fn on_frame_sent(&self, event: builder::FrameSent) { - let event = event.into_event(); - self.subscriber - .on_frame_sent(self.context, &self.meta, &event); - self.subscriber - .on_connection_event(self.context, &self.meta, &event); - self.subscriber.on_event(&self.meta, &event); - } - #[inline] - fn quic_version(&self) -> u32 { - self.quic_version - } - #[inline] - fn subject(&self) -> api::Subject { - self.meta.subject() - } - } - } - #[cfg(any(test, feature = "testing"))] - pub mod testing { - use super::*; - use core::sync::atomic::{AtomicU32, Ordering}; - use std::sync::{Arc, Mutex}; - #[derive(Clone, Debug)] - pub struct Subscriber { - location: Option, - output: Arc>>, - pub frame_sent: Arc, - } - impl Drop for Subscriber { - fn drop(&mut self) { - if std::thread::panicking() { - return; - } - if let Some(location) = self.location.as_ref() { - location.snapshot(&self.output.lock().unwrap()); - } - } - } - impl Subscriber { - #[doc = r" Creates a subscriber with snapshot assertions enabled"] - #[track_caller] - pub fn snapshot() -> Self { - let mut sub = Self::no_snapshot(); - sub.location = Location::try_new(); - sub - } - #[doc = r" Creates a subscriber with snapshot assertions disabled"] - pub fn no_snapshot() -> Self { - Self { - location: None, - output: Default::default(), - frame_sent: Arc::new(AtomicU32::new(0)), - } - } - } - impl super::Subscriber for Subscriber { - type ConnectionContext = (); - fn create_connection_context( - &self, - _meta: &api::ConnectionMeta, - _info: &api::ConnectionInfo, - ) -> Self::ConnectionContext { - } - fn on_frame_sent( - &self, - _context: &Self::ConnectionContext, - meta: &api::ConnectionMeta, - event: &api::FrameSent, - ) { - self.frame_sent.fetch_add(1, Ordering::SeqCst); - if self.location.is_some() { - self.output - .lock() - .unwrap() - .push(format!("{meta:?} {event:?}")); - } - } - } - #[derive(Clone, Debug)] - pub struct Publisher { - location: Option, - output: Arc>>, - pub frame_sent: Arc, - } - impl Publisher { - #[doc = r" Creates a publisher with snapshot assertions enabled"] - #[track_caller] - pub fn snapshot() -> Self { - let mut sub = Self::no_snapshot(); - sub.location = Location::try_new(); - sub - } - #[doc = r" Creates a publisher with snapshot assertions disabled"] - pub fn no_snapshot() -> Self { - Self { - location: None, - output: Default::default(), - frame_sent: Arc::new(AtomicU32::new(0)), - } - } - } - impl super::EndpointPublisher for Publisher { - fn quic_version(&self) -> Option { - Some(1) - } - } - impl super::ConnectionPublisher for Publisher { - fn on_frame_sent(&self, event: builder::FrameSent) { - self.frame_sent.fetch_add(1, Ordering::SeqCst); - let event = event.into_event(); - if self.location.is_some() { - self.output.lock().unwrap().push(format!("{event:?}")); - } - } - fn quic_version(&self) -> u32 { - 1 - } - fn subject(&self) -> api::Subject { - api::Subject::Connection { id: 0 } - } - } - impl Drop for Publisher { - fn drop(&mut self) { - if std::thread::panicking() { - return; - } - if let Some(location) = self.location.as_ref() { - location.snapshot(&self.output.lock().unwrap()); - } - } - } - #[derive(Clone, Debug)] - struct Location(&'static core::panic::Location<'static>); - impl Location { - #[track_caller] - fn try_new() -> Option { - let thread = std::thread::current(); - if thread.name().map_or(false, |name| name != "main") { - Some(Self(core::panic::Location::caller())) - } else { - None - } - } - fn snapshot(&self, output: &[String]) { - if cfg!(miri) { - return; - } - use std::path::{Component, Path}; - let value = output.join("\n"); - let thread = std::thread::current(); - let function_name = thread.name().unwrap(); - let test_path = Path::new(self.0.file().trim_end_matches(".rs")); - let module_path = test_path - .components() - .filter_map(|comp| match comp { - Component::Normal(comp) => comp.to_str(), - _ => Some("_"), - }) - .chain(Some("events")) - .collect::>() - .join("::"); - let current_dir = std::env::current_dir().unwrap(); - insta::_macro_support::assert_snapshot( - insta::_macro_support::AutoName.into(), - &value, - current_dir.to_str().unwrap(), - function_name, - &module_path, - self.0.file(), - self.0.line(), - "", - ) - .unwrap() - } - } - } } #[cfg(feature = "event-tracing")] pub mod tracing { From a4ef0c614b7c82048701967b29455c43e0573082 Mon Sep 17 00:00:00 2001 From: Appelmans Date: Tue, 17 Sep 2024 12:15:11 -0700 Subject: [PATCH 4/8] clippy rustfmt --- quic/s2n-quic-events/src/main.rs | 36 ++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/quic/s2n-quic-events/src/main.rs b/quic/s2n-quic-events/src/main.rs index 34eae272c4..96ff1899cf 100644 --- a/quic/s2n-quic-events/src/main.rs +++ b/quic/s2n-quic-events/src/main.rs @@ -44,26 +44,28 @@ impl OutputMode { } } - fn lock(&self) ->TokenStream { + fn lock(&self) -> TokenStream { match self { OutputMode::Ref => quote!(.lock().unwrap()), OutputMode::Mut => quote!(), } } - fn imports(&self) ->TokenStream { + fn imports(&self) -> TokenStream { match self { - OutputMode::Ref => quote!(use core::sync::atomic::{AtomicU32, Ordering}; - use std::sync::{Arc, Mutex}; ), + OutputMode::Ref => quote!( + use core::sync::atomic::{AtomicU32, Ordering}; + use std::sync::{Arc, Mutex}; + ), OutputMode::Mut => quote!(), } } - fn testing_output_type(&self) ->TokenStream { + fn testing_output_type(&self) -> TokenStream { match self { OutputMode::Ref => quote!(Arc>>), OutputMode::Mut => quote!(Vec), - } + } } } @@ -113,7 +115,7 @@ impl ToTokens for Output { extra, mode, } = self; - + let imports = self.mode.imports(); let testing_output_type = self.mode.testing_output_type(); let lock = self.mode.lock(); @@ -748,13 +750,22 @@ struct EventInfo<'a> { fn main() -> Result<()> { let event_paths = [ EventInfo { - input_path: concat!(env!("CARGO_MANIFEST_DIR"), "/../../dc/s2n-quic-dc/src/event/**/*.rs"), - output_path: concat!(env!("CARGO_MANIFEST_DIR"), "/../../dc/s2n-quic-dc/src/event/generated.rs"), + input_path: concat!( + env!("CARGO_MANIFEST_DIR"), + "/../../dc/s2n-quic-dc/src/event/**/*.rs" + ), + output_path: concat!( + env!("CARGO_MANIFEST_DIR"), + "/../../dc/s2n-quic-dc/src/event/generated.rs" + ), output_mode: OutputMode::Ref, }, EventInfo { - input_path: concat!(env!("CARGO_MANIFEST_DIR"),"/events/**/*.rs"), - output_path: concat!(env!("CARGO_MANIFEST_DIR"),"/../s2n-quic-core/src/event/generated.rs"), + input_path: concat!(env!("CARGO_MANIFEST_DIR"), "/events/**/*.rs"), + output_path: concat!( + env!("CARGO_MANIFEST_DIR"), + "/../s2n-quic-core/src/event/generated.rs" + ), output_mode: OutputMode::Mut, }, ]; @@ -768,8 +779,7 @@ fn main() -> Result<()> { files.push(parser::parse(&file).unwrap()); } - let mut output = Output::default(); - output.mode = event_info.output_mode; + let mut output = Output { mode: event_info.output_mode, ..Default::default() }; for file in &files { file.to_tokens(&mut output); From 3837bb3d42a6ecbfd22eca23b65311e6833b42ed Mon Sep 17 00:00:00 2001 From: Appelmans Date: Tue, 17 Sep 2024 12:27:19 -0700 Subject: [PATCH 5/8] more rustfmt --- quic/s2n-quic-core/src/event/generated.rs | 2 +- quic/s2n-quic-events/src/main.rs | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/quic/s2n-quic-core/src/event/generated.rs b/quic/s2n-quic-core/src/event/generated.rs index c447a0e95c..5561dfa15a 100644 --- a/quic/s2n-quic-core/src/event/generated.rs +++ b/quic/s2n-quic-core/src/event/generated.rs @@ -1282,7 +1282,7 @@ pub mod api { } } macro_rules! impl_conn_id { - ($ name : ident) => { + ($name:ident) => { impl<'a> IntoEvent> for &'a crate::connection::id::$name { #[inline] fn into_event(self) -> builder::ConnectionId<'a> { diff --git a/quic/s2n-quic-events/src/main.rs b/quic/s2n-quic-events/src/main.rs index 96ff1899cf..cc262ee77a 100644 --- a/quic/s2n-quic-events/src/main.rs +++ b/quic/s2n-quic-events/src/main.rs @@ -779,7 +779,10 @@ fn main() -> Result<()> { files.push(parser::parse(&file).unwrap()); } - let mut output = Output { mode: event_info.output_mode, ..Default::default() }; + let mut output = Output { + mode: event_info.output_mode, + ..Default::default() + }; for file in &files { file.to_tokens(&mut output); From 97c25b288b9dfa355f157c213f1106e17f6ef70b Mon Sep 17 00:00:00 2001 From: Appelmans Date: Tue, 17 Sep 2024 14:16:53 -0700 Subject: [PATCH 6/8] Changed event path --- quic/s2n-quic-events/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quic/s2n-quic-events/src/main.rs b/quic/s2n-quic-events/src/main.rs index cc262ee77a..31ef88ba5d 100644 --- a/quic/s2n-quic-events/src/main.rs +++ b/quic/s2n-quic-events/src/main.rs @@ -752,7 +752,7 @@ fn main() -> Result<()> { EventInfo { input_path: concat!( env!("CARGO_MANIFEST_DIR"), - "/../../dc/s2n-quic-dc/src/event/**/*.rs" + "/../../dc/s2n-quic-dc/src/event/events.rs" ), output_path: concat!( env!("CARGO_MANIFEST_DIR"), From 8485ffd11d32d9851e95270ce1e00aabdeb013b7 Mon Sep 17 00:00:00 2001 From: Appelmans Date: Wed, 18 Sep 2024 10:56:43 -0700 Subject: [PATCH 7/8] Tokenize all the things --- dc/s2n-quic-dc/src/event/generated.rs | 103 ++---------- quic/s2n-quic-events/src/main.rs | 215 ++++++++++++++++---------- 2 files changed, 142 insertions(+), 176 deletions(-) diff --git a/dc/s2n-quic-dc/src/event/generated.rs b/dc/s2n-quic-dc/src/event/generated.rs index 4fab64e7d3..891c89b972 100644 --- a/dc/s2n-quic-dc/src/event/generated.rs +++ b/dc/s2n-quic-dc/src/event/generated.rs @@ -34,8 +34,7 @@ pub mod tracing { } impl Default for Subscriber { fn default() -> Self { - let root = - tracing :: span ! (target : "s2n_quic" , tracing :: Level :: DEBUG , "s2n_quic"); + let root = tracing :: span ! (target : "s2n_quic_dc" , tracing :: Level :: DEBUG , "s2n_quic_dc"); let client = tracing :: span ! (parent : root . id () , tracing :: Level :: DEBUG , "client"); let server = @@ -54,7 +53,7 @@ pub mod tracing { api::EndpointType::Client {} => self.client.id(), api::EndpointType::Server {} => self.server.id(), }; - tracing :: span ! (target : "s2n_quic" , parent : parent , tracing :: Level :: DEBUG , "conn" , id = meta . id) + tracing :: span ! (target : "s2n_quic_dc" , parent : parent , tracing :: Level :: DEBUG , "conn" , id = meta . id) } #[inline] fn on_frame_sent( @@ -98,58 +97,6 @@ pub mod builder { } } } -pub mod supervisor { - #![doc = r" This module contains the `supervisor::Outcome` and `supervisor::Context` for use"] - #![doc = r" when implementing [`Subscriber::supervisor_timeout`](crate::event::Subscriber::supervisor_timeout) and"] - #![doc = r" [`Subscriber::on_supervisor_timeout`](crate::event::Subscriber::on_supervisor_timeout)"] - #![doc = r" on a Subscriber."] - use crate::{ - application, - event::{builder::SocketAddress, IntoEvent}, - }; - #[non_exhaustive] - #[derive(Clone, Debug, Eq, PartialEq)] - pub enum Outcome { - #[doc = r" Allow the connection to remain open"] - Continue, - #[doc = r" Close the connection and notify the peer"] - Close { error_code: application::Error }, - #[doc = r" Close the connection without notifying the peer"] - ImmediateClose { reason: &'static str }, - } - impl Default for Outcome { - fn default() -> Self { - Self::Continue - } - } - #[non_exhaustive] - #[derive(Debug)] - pub struct Context<'a> { - #[doc = r" Number of handshakes that have begun but not completed"] - pub inflight_handshakes: usize, - #[doc = r" Number of open connections"] - pub connection_count: usize, - #[doc = r" The address of the peer"] - pub remote_address: SocketAddress<'a>, - #[doc = r" True if the connection is in the handshake state, false otherwise"] - pub is_handshaking: bool, - } - impl<'a> Context<'a> { - pub fn new( - inflight_handshakes: usize, - connection_count: usize, - remote_address: &'a crate::inet::SocketAddress, - is_handshaking: bool, - ) -> Self { - Self { - inflight_handshakes, - connection_count, - remote_address: remote_address.into_event(), - is_handshaking, - } - } - } -} pub use traits::*; mod traits { use super::*; @@ -254,7 +201,7 @@ mod traits { #[allow(unused_variables)] fn supervisor_timeout( &self, - conn_context: &mut Self::ConnectionContext, + conn_context: &Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context, ) -> Option { @@ -268,7 +215,7 @@ mod traits { #[allow(unused_variables)] fn on_supervisor_timeout( &self, - conn_context: &mut Self::ConnectionContext, + conn_context: &Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context, ) -> supervisor::Outcome { @@ -312,14 +259,6 @@ mod traits { ) -> query::ControlFlow { query.execute(context) } - #[doc = r" Used for querying and mutating the `Subscriber::ConnectionContext` on a Subscriber"] - #[inline] - fn query_mut( - context: &mut Self::ConnectionContext, - query: &mut dyn query::QueryMut, - ) -> query::ControlFlow { - query.execute_mut(context) - } } #[doc = r" Subscriber is implemented for a 2-element tuple to make it easy to compose multiple"] #[doc = r" subscribers."] @@ -343,16 +282,12 @@ mod traits { #[inline] fn supervisor_timeout( &self, - conn_context: &mut Self::ConnectionContext, + conn_context: &Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context, ) -> Option { - let timeout_a = self - .0 - .supervisor_timeout(&mut conn_context.0, meta, context); - let timeout_b = self - .1 - .supervisor_timeout(&mut conn_context.1, meta, context); + let timeout_a = self.0.supervisor_timeout(&conn_context.0, meta, context); + let timeout_b = self.1.supervisor_timeout(&conn_context.1, meta, context); match (timeout_a, timeout_b) { (None, None) => None, (None, Some(timeout)) | (Some(timeout), None) => Some(timeout), @@ -362,16 +297,12 @@ mod traits { #[inline] fn on_supervisor_timeout( &self, - conn_context: &mut Self::ConnectionContext, + conn_context: &Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context, ) -> supervisor::Outcome { - let outcome_a = self - .0 - .on_supervisor_timeout(&mut conn_context.0, meta, context); - let outcome_b = self - .1 - .on_supervisor_timeout(&mut conn_context.1, meta, context); + let outcome_a = self.0.on_supervisor_timeout(&conn_context.0, meta, context); + let outcome_b = self.1.on_supervisor_timeout(&conn_context.1, meta, context); match (outcome_a, outcome_b) { (supervisor::Outcome::ImmediateClose { reason }, _) | (_, supervisor::Outcome::ImmediateClose { reason }) => { @@ -419,16 +350,6 @@ mod traits { .and_then(|| A::query(&context.0, query)) .and_then(|| B::query(&context.1, query)) } - #[inline] - fn query_mut( - context: &mut Self::ConnectionContext, - query: &mut dyn query::QueryMut, - ) -> query::ControlFlow { - query - .execute_mut(context) - .and_then(|| A::query_mut(&mut context.0, query)) - .and_then(|| B::query_mut(&mut context.1, query)) - } } pub trait EndpointPublisher { #[doc = r" Returns the QUIC version, if any"] @@ -437,7 +358,7 @@ mod traits { pub struct EndpointPublisherSubscriber<'a, Sub: Subscriber> { meta: EndpointMeta, quic_version: Option, - subscriber: &'a mut Sub, + subscriber: &'a Sub, } impl<'a, Sub: Subscriber> fmt::Debug for EndpointPublisherSubscriber<'a, Sub> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -452,7 +373,7 @@ mod traits { pub fn new( meta: builder::EndpointMeta, quic_version: Option, - subscriber: &'a mut Sub, + subscriber: &'a Sub, ) -> Self { Self { meta: meta.into_event(), diff --git a/quic/s2n-quic-events/src/main.rs b/quic/s2n-quic-events/src/main.rs index 31ef88ba5d..ccb9aa5931 100644 --- a/quic/s2n-quic-events/src/main.rs +++ b/quic/s2n-quic-events/src/main.rs @@ -67,6 +67,117 @@ impl OutputMode { OutputMode::Mut => quote!(Vec), } } + + fn target_crate(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!("s2n_quic_dc"), + OutputMode::Mut => quote!("s2n_quic"), + } + } + + fn query_mut(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(), + OutputMode::Mut => quote!( + /// Used for querying and mutating the `Subscriber::ConnectionContext` on a Subscriber + #[inline] + fn query_mut( + context: &mut Self::ConnectionContext, + query: &mut dyn query::QueryMut, + ) -> query::ControlFlow { + query.execute_mut(context) + } + ), + } + } + + fn query_mut_tuple(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(), + OutputMode::Mut => quote!( + #[inline] + fn query_mut( + context: &mut Self::ConnectionContext, + query: &mut dyn query::QueryMut, + ) -> query::ControlFlow { + query + .execute_mut(context) + .and_then(|| A::query_mut(&mut context.0, query)) + .and_then(|| B::query_mut(&mut context.1, query)) + } + ), + } + } + + fn supervisor(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(), + OutputMode::Mut => quote!( + pub mod supervisor { + //! This module contains the `supervisor::Outcome` and `supervisor::Context` for use + //! when implementing [`Subscriber::supervisor_timeout`](crate::event::Subscriber::supervisor_timeout) and + //! [`Subscriber::on_supervisor_timeout`](crate::event::Subscriber::on_supervisor_timeout) + //! on a Subscriber. + + use crate::{ + application, + event::{builder::SocketAddress, IntoEvent}, + }; + + #[non_exhaustive] + #[derive(Clone, Debug, Eq, PartialEq)] + pub enum Outcome { + /// Allow the connection to remain open + Continue, + + /// Close the connection and notify the peer + Close { error_code: application::Error }, + + /// Close the connection without notifying the peer + ImmediateClose { reason: &'static str }, + } + + impl Default for Outcome { + fn default() -> Self { + Self::Continue + } + } + + #[non_exhaustive] + #[derive(Debug)] + pub struct Context<'a> { + /// Number of handshakes that have begun but not completed + pub inflight_handshakes: usize, + + /// Number of open connections + pub connection_count: usize, + + /// The address of the peer + pub remote_address: SocketAddress<'a>, + + /// True if the connection is in the handshake state, false otherwise + pub is_handshaking: bool, + } + + impl<'a> Context<'a> { + pub fn new( + inflight_handshakes: usize, + connection_count: usize, + remote_address: &'a crate::inet::SocketAddress, + is_handshaking: bool, + ) -> Self { + Self { + inflight_handshakes, + connection_count, + remote_address: remote_address.into_event(), + is_handshaking, + } + } + } + } + ), + } + } } impl ToTokens for OutputMode { @@ -119,6 +230,10 @@ impl ToTokens for Output { let imports = self.mode.imports(); let testing_output_type = self.mode.testing_output_type(); let lock = self.mode.lock(); + let target_crate = self.mode.target_crate(); + let supervisor = self.mode.supervisor(); + let query_mut = self.mode.query_mut(); + let query_mut_tuple = self.mode.query_mut_tuple(); tokens.extend(quote!( use super::*; @@ -148,7 +263,7 @@ impl ToTokens for Output { impl Default for Subscriber { fn default() -> Self { - let root = tracing::span!(target: "s2n_quic", tracing::Level::DEBUG, "s2n_quic"); + let root = tracing::span!(target: #target_crate, tracing::Level::DEBUG, #target_crate); let client = tracing::span!(parent: root.id(), tracing::Level::DEBUG, "client"); let server = tracing::span!(parent: root.id(), tracing::Level::DEBUG, "server"); @@ -171,7 +286,7 @@ impl ToTokens for Output { self.server.id() } }; - tracing::span!(target: "s2n_quic", parent: parent, tracing::Level::DEBUG, "conn", id = meta.id) + tracing::span!(target: #target_crate, parent: parent, tracing::Level::DEBUG, "conn", id = meta.id) } #tracing_subscriber @@ -184,68 +299,7 @@ impl ToTokens for Output { #builders } - pub mod supervisor { - //! This module contains the `supervisor::Outcome` and `supervisor::Context` for use - //! when implementing [`Subscriber::supervisor_timeout`](crate::event::Subscriber::supervisor_timeout) and - //! [`Subscriber::on_supervisor_timeout`](crate::event::Subscriber::on_supervisor_timeout) - //! on a Subscriber. - - use crate::{ - application, - event::{builder::SocketAddress, IntoEvent}, - }; - - #[non_exhaustive] - #[derive(Clone, Debug, Eq, PartialEq)] - pub enum Outcome { - /// Allow the connection to remain open - Continue, - - /// Close the connection and notify the peer - Close {error_code: application::Error}, - - /// Close the connection without notifying the peer - ImmediateClose {reason: &'static str}, - } - - impl Default for Outcome { - fn default() -> Self { - Self::Continue - } - } - - #[non_exhaustive] - #[derive(Debug)] - pub struct Context<'a> { - /// Number of handshakes that have begun but not completed - pub inflight_handshakes: usize, - - /// Number of open connections - pub connection_count: usize, - - /// The address of the peer - pub remote_address: SocketAddress<'a>, - - /// True if the connection is in the handshake state, false otherwise - pub is_handshaking: bool, - } - - impl<'a> Context<'a> { - pub fn new( - inflight_handshakes: usize, - connection_count: usize, - remote_address: &'a crate::inet::SocketAddress, - is_handshaking: bool, - ) -> Self { - Self { - inflight_handshakes, - connection_count, - remote_address: remote_address.into_event(), - is_handshaking, - } - } - } - } + #supervisor pub use traits::*; mod traits { @@ -358,7 +412,7 @@ impl ToTokens for Output { /// It is recommended to avoid setting this value less than ~100ms, as short durations /// may lead to higher CPU utilization. #[allow(unused_variables)] - fn supervisor_timeout(&#mode self, conn_context: &mut Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> Option { + fn supervisor_timeout(&#mode self, conn_context: &#mode Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> Option { None } @@ -368,7 +422,7 @@ impl ToTokens for Output { /// across all `event::Subscriber`s will be used, and thus `on_supervisor_timeout` may be called /// earlier than the `supervisor_timeout` for a given `event::Subscriber` implementation. #[allow(unused_variables)] - fn on_supervisor_timeout(&#mode self, conn_context: &mut Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> supervisor::Outcome { + fn on_supervisor_timeout(&#mode self, conn_context: &#mode Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> supervisor::Outcome { supervisor::Outcome::default() } @@ -395,11 +449,7 @@ impl ToTokens for Output { query.execute(context) } - /// Used for querying and mutating the `Subscriber::ConnectionContext` on a Subscriber - #[inline] - fn query_mut(context: &mut Self::ConnectionContext, query: &mut dyn query::QueryMut) -> query::ControlFlow { - query.execute_mut(context) - } + #query_mut } /// Subscriber is implemented for a 2-element tuple to make it easy to compose multiple @@ -417,9 +467,9 @@ impl ToTokens for Output { } #[inline] - fn supervisor_timeout(&#mode self, conn_context: &mut Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> Option { - let timeout_a = self.0.supervisor_timeout(&mut conn_context.0, meta, context); - let timeout_b = self.1.supervisor_timeout(&mut conn_context.1, meta, context); + fn supervisor_timeout(&#mode self, conn_context: &#mode Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> Option { + let timeout_a = self.0.supervisor_timeout(&#mode conn_context.0, meta, context); + let timeout_b = self.1.supervisor_timeout(&#mode conn_context.1, meta, context); match (timeout_a, timeout_b) { (None, None) => None, (None, Some(timeout)) | (Some(timeout), None) => Some(timeout), @@ -428,9 +478,9 @@ impl ToTokens for Output { } #[inline] - fn on_supervisor_timeout(&#mode self, conn_context: &mut Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> supervisor::Outcome { - let outcome_a = self.0.on_supervisor_timeout(&mut conn_context.0, meta, context); - let outcome_b = self.1.on_supervisor_timeout(&mut conn_context.1, meta, context); + fn on_supervisor_timeout(&#mode self, conn_context: &#mode Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> supervisor::Outcome { + let outcome_a = self.0.on_supervisor_timeout(&#mode conn_context.0, meta, context); + let outcome_b = self.1.on_supervisor_timeout(&#mode conn_context.1, meta, context); match (outcome_a, outcome_b) { (supervisor::Outcome::ImmediateClose { reason }, _) | (_, supervisor::Outcome::ImmediateClose { reason }) => supervisor::Outcome::ImmediateClose { reason }, (supervisor::Outcome::Close { error_code }, _) | (_, supervisor::Outcome::Close { error_code }) => supervisor::Outcome::Close { error_code }, @@ -459,12 +509,7 @@ impl ToTokens for Output { .and_then(|| B::query(&context.1, query)) } - #[inline] - fn query_mut(context: &mut Self::ConnectionContext, query: &mut dyn query::QueryMut) -> query::ControlFlow { - query.execute_mut(context) - .and_then(|| A::query_mut(&mut context.0, query)) - .and_then(|| B::query_mut(&mut context.1, query)) - } + #query_mut_tuple } pub trait EndpointPublisher { @@ -477,7 +522,7 @@ impl ToTokens for Output { pub struct EndpointPublisherSubscriber<'a, Sub: Subscriber> { meta: EndpointMeta, quic_version: Option, - subscriber: &'a mut Sub, + subscriber: &'a #mode Sub, } impl<'a, Sub: Subscriber> fmt::Debug for EndpointPublisherSubscriber<'a, Sub> { @@ -494,7 +539,7 @@ impl ToTokens for Output { pub fn new( meta: builder::EndpointMeta, quic_version: Option, - subscriber: &'a mut Sub, + subscriber: &'a #mode Sub, ) -> Self { Self { meta: meta.into_event(), From 0a39346785aa76c4f48e5b8ceebd55c8c3886ad4 Mon Sep 17 00:00:00 2001 From: Appelmans Date: Wed, 18 Sep 2024 11:17:44 -0700 Subject: [PATCH 8/8] More tokenization --- dc/s2n-quic-dc/src/event/generated.rs | 70 ------------ quic/s2n-quic-events/src/main.rs | 146 ++++++++++++++++++-------- 2 files changed, 100 insertions(+), 116 deletions(-) diff --git a/dc/s2n-quic-dc/src/event/generated.rs b/dc/s2n-quic-dc/src/event/generated.rs index 891c89b972..7639587eea 100644 --- a/dc/s2n-quic-dc/src/event/generated.rs +++ b/dc/s2n-quic-dc/src/event/generated.rs @@ -187,40 +187,6 @@ mod traits { meta: &ConnectionMeta, info: &ConnectionInfo, ) -> Self::ConnectionContext; - #[doc = r" The period at which `on_supervisor_timeout` is called"] - #[doc = r""] - #[doc = r" If multiple `event::Subscriber`s are composed together, the minimum `supervisor_timeout`"] - #[doc = r" across all `event::Subscriber`s will be used."] - #[doc = r""] - #[doc = r" If the `supervisor_timeout()` is `None` across all `event::Subscriber`s, connection supervision"] - #[doc = r" will cease for the remaining lifetime of the connection and `on_supervisor_timeout` will no longer"] - #[doc = r" be called."] - #[doc = r""] - #[doc = r" It is recommended to avoid setting this value less than ~100ms, as short durations"] - #[doc = r" may lead to higher CPU utilization."] - #[allow(unused_variables)] - fn supervisor_timeout( - &self, - conn_context: &Self::ConnectionContext, - meta: &ConnectionMeta, - context: &supervisor::Context, - ) -> Option { - None - } - #[doc = r" Called for each `supervisor_timeout` to determine any action to take on the connection based on the `supervisor::Outcome`"] - #[doc = r""] - #[doc = r" If multiple `event::Subscriber`s are composed together, the minimum `supervisor_timeout`"] - #[doc = r" across all `event::Subscriber`s will be used, and thus `on_supervisor_timeout` may be called"] - #[doc = r" earlier than the `supervisor_timeout` for a given `event::Subscriber` implementation."] - #[allow(unused_variables)] - fn on_supervisor_timeout( - &self, - conn_context: &Self::ConnectionContext, - meta: &ConnectionMeta, - context: &supervisor::Context, - ) -> supervisor::Outcome { - supervisor::Outcome::default() - } #[doc = "Called when the `FrameSent` event is triggered"] #[inline] fn on_frame_sent( @@ -280,42 +246,6 @@ mod traits { ) } #[inline] - fn supervisor_timeout( - &self, - conn_context: &Self::ConnectionContext, - meta: &ConnectionMeta, - context: &supervisor::Context, - ) -> Option { - let timeout_a = self.0.supervisor_timeout(&conn_context.0, meta, context); - let timeout_b = self.1.supervisor_timeout(&conn_context.1, meta, context); - match (timeout_a, timeout_b) { - (None, None) => None, - (None, Some(timeout)) | (Some(timeout), None) => Some(timeout), - (Some(a), Some(b)) => Some(a.min(b)), - } - } - #[inline] - fn on_supervisor_timeout( - &self, - conn_context: &Self::ConnectionContext, - meta: &ConnectionMeta, - context: &supervisor::Context, - ) -> supervisor::Outcome { - let outcome_a = self.0.on_supervisor_timeout(&conn_context.0, meta, context); - let outcome_b = self.1.on_supervisor_timeout(&conn_context.1, meta, context); - match (outcome_a, outcome_b) { - (supervisor::Outcome::ImmediateClose { reason }, _) - | (_, supervisor::Outcome::ImmediateClose { reason }) => { - supervisor::Outcome::ImmediateClose { reason } - } - (supervisor::Outcome::Close { error_code }, _) - | (_, supervisor::Outcome::Close { error_code }) => { - supervisor::Outcome::Close { error_code } - } - _ => supervisor::Outcome::Continue, - } - } - #[inline] fn on_frame_sent( &self, context: &Self::ConnectionContext, diff --git a/quic/s2n-quic-events/src/main.rs b/quic/s2n-quic-events/src/main.rs index ccb9aa5931..97387e2167 100644 --- a/quic/s2n-quic-events/src/main.rs +++ b/quic/s2n-quic-events/src/main.rs @@ -178,6 +178,102 @@ impl OutputMode { ), } } + + fn supervisor_timeout(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(), + OutputMode::Mut => quote!( + /// The period at which `on_supervisor_timeout` is called + /// + /// If multiple `event::Subscriber`s are composed together, the minimum `supervisor_timeout` + /// across all `event::Subscriber`s will be used. + /// + /// If the `supervisor_timeout()` is `None` across all `event::Subscriber`s, connection supervision + /// will cease for the remaining lifetime of the connection and `on_supervisor_timeout` will no longer + /// be called. + /// + /// It is recommended to avoid setting this value less than ~100ms, as short durations + /// may lead to higher CPU utilization. + #[allow(unused_variables)] + fn supervisor_timeout( + &mut self, + conn_context: &mut Self::ConnectionContext, + meta: &ConnectionMeta, + context: &supervisor::Context, + ) -> Option { + None + } + + /// Called for each `supervisor_timeout` to determine any action to take on the connection based on the `supervisor::Outcome` + /// + /// If multiple `event::Subscriber`s are composed together, the minimum `supervisor_timeout` + /// across all `event::Subscriber`s will be used, and thus `on_supervisor_timeout` may be called + /// earlier than the `supervisor_timeout` for a given `event::Subscriber` implementation. + #[allow(unused_variables)] + fn on_supervisor_timeout( + &mut self, + conn_context: &mut Self::ConnectionContext, + meta: &ConnectionMeta, + context: &supervisor::Context, + ) -> supervisor::Outcome { + supervisor::Outcome::default() + } + ), + } + } + + fn supervisor_timeout_tuple(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(), + OutputMode::Mut => quote!( + #[inline] + fn supervisor_timeout( + &mut self, + conn_context: &mut Self::ConnectionContext, + meta: &ConnectionMeta, + context: &supervisor::Context, + ) -> Option { + let timeout_a = self + .0 + .supervisor_timeout(&mut conn_context.0, meta, context); + let timeout_b = self + .1 + .supervisor_timeout(&mut conn_context.1, meta, context); + match (timeout_a, timeout_b) { + (None, None) => None, + (None, Some(timeout)) | (Some(timeout), None) => Some(timeout), + (Some(a), Some(b)) => Some(a.min(b)), + } + } + + #[inline] + fn on_supervisor_timeout( + &mut self, + conn_context: &mut Self::ConnectionContext, + meta: &ConnectionMeta, + context: &supervisor::Context, + ) -> supervisor::Outcome { + let outcome_a = + self.0 + .on_supervisor_timeout(&mut conn_context.0, meta, context); + let outcome_b = + self.1 + .on_supervisor_timeout(&mut conn_context.1, meta, context); + match (outcome_a, outcome_b) { + (supervisor::Outcome::ImmediateClose { reason }, _) + | (_, supervisor::Outcome::ImmediateClose { reason }) => { + supervisor::Outcome::ImmediateClose { reason } + } + (supervisor::Outcome::Close { error_code }, _) + | (_, supervisor::Outcome::Close { error_code }) => { + supervisor::Outcome::Close { error_code } + } + _ => supervisor::Outcome::Continue, + } + } + ), + } + } } impl ToTokens for OutputMode { @@ -232,6 +328,8 @@ impl ToTokens for Output { let lock = self.mode.lock(); let target_crate = self.mode.target_crate(); let supervisor = self.mode.supervisor(); + let supervisor_timeout = self.mode.supervisor_timeout(); + let supervisor_timeout_tuple = self.mode.supervisor_timeout_tuple(); let query_mut = self.mode.query_mut(); let query_mut_tuple = self.mode.query_mut_tuple(); @@ -400,31 +498,7 @@ impl ToTokens for Output { /// Creates a context to be passed to each connection-related event fn create_connection_context(&#mode self, meta: &ConnectionMeta, info: &ConnectionInfo) -> Self::ConnectionContext; - /// The period at which `on_supervisor_timeout` is called - /// - /// If multiple `event::Subscriber`s are composed together, the minimum `supervisor_timeout` - /// across all `event::Subscriber`s will be used. - /// - /// If the `supervisor_timeout()` is `None` across all `event::Subscriber`s, connection supervision - /// will cease for the remaining lifetime of the connection and `on_supervisor_timeout` will no longer - /// be called. - /// - /// It is recommended to avoid setting this value less than ~100ms, as short durations - /// may lead to higher CPU utilization. - #[allow(unused_variables)] - fn supervisor_timeout(&#mode self, conn_context: &#mode Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> Option { - None - } - - /// Called for each `supervisor_timeout` to determine any action to take on the connection based on the `supervisor::Outcome` - /// - /// If multiple `event::Subscriber`s are composed together, the minimum `supervisor_timeout` - /// across all `event::Subscriber`s will be used, and thus `on_supervisor_timeout` may be called - /// earlier than the `supervisor_timeout` for a given `event::Subscriber` implementation. - #[allow(unused_variables)] - fn on_supervisor_timeout(&#mode self, conn_context: &#mode Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> supervisor::Outcome { - supervisor::Outcome::default() - } + #supervisor_timeout #subscriber @@ -466,27 +540,7 @@ impl ToTokens for Output { (self.0.create_connection_context(meta, info), self.1.create_connection_context(meta, info)) } - #[inline] - fn supervisor_timeout(&#mode self, conn_context: &#mode Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> Option { - let timeout_a = self.0.supervisor_timeout(&#mode conn_context.0, meta, context); - let timeout_b = self.1.supervisor_timeout(&#mode conn_context.1, meta, context); - match (timeout_a, timeout_b) { - (None, None) => None, - (None, Some(timeout)) | (Some(timeout), None) => Some(timeout), - (Some(a), Some(b)) => Some(a.min(b)), - } - } - - #[inline] - fn on_supervisor_timeout(&#mode self, conn_context: &#mode Self::ConnectionContext, meta: &ConnectionMeta, context: &supervisor::Context) -> supervisor::Outcome { - let outcome_a = self.0.on_supervisor_timeout(&#mode conn_context.0, meta, context); - let outcome_b = self.1.on_supervisor_timeout(&#mode conn_context.1, meta, context); - match (outcome_a, outcome_b) { - (supervisor::Outcome::ImmediateClose { reason }, _) | (_, supervisor::Outcome::ImmediateClose { reason }) => supervisor::Outcome::ImmediateClose { reason }, - (supervisor::Outcome::Close { error_code }, _) | (_, supervisor::Outcome::Close { error_code }) => supervisor::Outcome::Close { error_code }, - _ => supervisor::Outcome::Continue, - } - } + #supervisor_timeout_tuple #tuple_subscriber