diff --git a/.github/config/cargo-deny.toml b/.github/config/cargo-deny.toml index f52df18a96..538211987c 100644 --- a/.github/config/cargo-deny.toml +++ b/.github/config/cargo-deny.toml @@ -11,10 +11,8 @@ ignore = [ multiple-versions = "deny" skip-tree = [ - { name = "docdiff" }, # docdiff is a private crate - { name = "duvet" }, # duvet is a private crate + { name = "bolero" }, # bolero is always going to be just a test dependency { name = "criterion" }, # criterion is always going to be just a test dependency - { name = "s2n-quic-integration" }, # s2n-quic-integration is a private crate ] [sources] diff --git a/quic/s2n-quic-integration/Cargo.toml b/quic/s2n-quic-integration/Cargo.toml deleted file mode 100644 index 2ca7477f5c..0000000000 --- a/quic/s2n-quic-integration/Cargo.toml +++ /dev/null @@ -1,26 +0,0 @@ -[package] -name = "s2n-quic-integration" -version = "0.1.0" -authors = ["AWS s2n"] -edition = "2018" -license = "Apache-2.0" -# this only contains internal tests and should not be published -publish = false - -[features] -default = [] - -[dependencies] -anyhow = "1" -bolero-generator = "0.6" -bytes = { version = "1", default-features = false } -futures = "0.3" -lazy_static = "1" -s2n-quic = { version = "=1.0.0", path = "../s2n-quic" } -s2n-quic-core = { version = "=0.1.1", path = "../s2n-quic-core", features = ["std", "testing"] } -s2n-quic-platform = { version = "=0.1.0", path = "../s2n-quic-platform" } -tokio = { version = "1", features = ["full"] } - -[dev-dependencies] -bolero = "0.6" -flume = { version = "0.10", features = ["async"] } diff --git a/quic/s2n-quic-integration/README.md b/quic/s2n-quic-integration/README.md deleted file mode 100644 index 1c99854b2b..0000000000 --- a/quic/s2n-quic-integration/README.md +++ /dev/null @@ -1,10 +0,0 @@ -# s2n-quic-integration - -This is an internal crate used by [s2n-quic](https://github.com/aws/s2n-quic). The API is not currently stable and should not be used directly. - -## License - -This project is licensed under the [Apache-2.0 License][license-url]. - -[license-badge]: https://img.shields.io/badge/license-apache-blue.svg -[license-url]: https://aws.amazon.com/apache-2-0/ diff --git a/quic/s2n-quic-integration/src/api.rs b/quic/s2n-quic-integration/src/api.rs deleted file mode 100644 index 7777652fd6..0000000000 --- a/quic/s2n-quic-integration/src/api.rs +++ /dev/null @@ -1,210 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 - -//! Abstractions over a QUIC connection and streams - -use bytes::Bytes; -use core::{ - future::Future, - pin::Pin, - task::{Context, Poll}, -}; - -#[cfg(test)] -pub(crate) mod testing; - -pub trait Connection { - type Acceptor: Acceptor; - type Handle: Handle; - - fn split(self) -> (Self::Handle, Self::Acceptor); -} - -pub trait Acceptor: Sized { - type ReceiveStreamAcceptor: ReceiveStreamAcceptor; - type BidirectionalStreamAcceptor: BidirectionalStreamAcceptor; - - fn split( - self, - ) -> ( - Self::BidirectionalStreamAcceptor, - Self::ReceiveStreamAcceptor, - ); -} - -pub trait ReceiveStreamAcceptor: Sized + Send { - type ReceiveStream: ReceiveStream; - type Error: Error; - - fn poll_accept_receive( - &mut self, - cx: &mut Context, - ) -> Poll, Self::Error>>; - - fn accept_receive(&mut self) -> AcceptReceiveFuture { - AcceptReceiveFuture(self) - } -} - -pub trait BidirectionalStreamAcceptor: Sized + Send { - type BidirectionalStream: BidirectionalStream; - type Error: Error; - - fn poll_accept_bidirectional( - &mut self, - cx: &mut Context, - ) -> Poll, Self::Error>>; - - fn accept_bidirectional(&mut self) -> AcceptBidirectionalFuture { - AcceptBidirectionalFuture(self) - } -} - -macro_rules! accept_future { - ($name:ident, $handle:ident, $ty:ident, $call:ident) => { - #[must_use] - pub struct $name<'a, H: $handle>(&'a mut H); - - impl<'a, H: $handle> Future for $name<'a, H> { - type Output = Result, H::Error>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - (self.0).$call(cx) - } - } - }; -} - -accept_future!( - AcceptReceiveFuture, - ReceiveStreamAcceptor, - ReceiveStream, - poll_accept_receive -); - -accept_future!( - AcceptBidirectionalFuture, - BidirectionalStreamAcceptor, - BidirectionalStream, - poll_accept_bidirectional -); - -pub trait Handle: Clone + Sized + Send { - type SendStream: SendStream; - type BidirectionalStream: BidirectionalStream; - type Error: Error; - - fn poll_open_send(&mut self, cx: &mut Context) -> Poll>; - - fn open_send(&mut self) -> OpenSendFuture { - OpenSendFuture(self) - } - - fn poll_open_bidirectional( - &mut self, - cx: &mut Context, - ) -> Poll>; - - fn open_bidirectional(&mut self) -> OpenBidirectionalFuture { - OpenBidirectionalFuture(self) - } - - fn close(&mut self); -} - -macro_rules! open_future { - ($name:ident, $ty:ident, $call:ident) => { - #[must_use] - pub struct $name<'a, H: Handle>(&'a mut H); - - impl<'a, H: Handle> Future for $name<'a, H> { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - (self.0).$call(cx) - } - } - }; -} - -open_future!(OpenSendFuture, SendStream, poll_open_send); -open_future!( - OpenBidirectionalFuture, - BidirectionalStream, - poll_open_bidirectional -); - -pub trait SendStream: Sized + Send { - type Error: Error; - - fn send(&mut self, data: Bytes) -> SendFuture { - SendFuture(self, data) - } - - fn poll_send(&mut self, data: &mut Bytes, cx: &mut Context) -> Poll>; - - fn finish(&mut self) -> FinishFuture { - FinishFuture(self) - } - - fn poll_finish(&mut self, cx: &mut Context) -> Poll>; - - fn reset(&mut self, code: u64); -} - -#[must_use] -pub struct SendFuture<'a, S: SendStream>(&'a mut S, Bytes); - -impl<'a, S: SendStream> Future for SendFuture<'a, S> { - type Output = Result<(), S::Error>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let Self(a, b) = &mut *self; - a.poll_send(b, cx) - } -} - -#[must_use] -pub struct FinishFuture<'a, S: SendStream>(&'a mut S); - -impl<'a, S: SendStream> Future for FinishFuture<'a, S> { - type Output = Result<(), S::Error>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - self.0.poll_finish(cx) - } -} - -pub trait ReceiveStream: Sized + Send { - type Error: Error; - - fn receive(&mut self) -> ReceiveFuture { - ReceiveFuture(self) - } - - fn poll_receive(&mut self, cx: &mut Context) -> Poll, Self::Error>>; - - fn stop_sending(&mut self, error_code: u64); -} - -#[must_use] -pub struct ReceiveFuture<'a, S: ReceiveStream>(&'a mut S); - -impl<'a, S: ReceiveStream> Future for ReceiveFuture<'a, S> { - type Output = Result, S::Error>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - self.0.poll_receive(cx) - } -} - -pub trait BidirectionalStream: Send { - type SendStream: SendStream; - type ReceiveStream: ReceiveStream; - - fn split(self) -> (Self::ReceiveStream, Self::SendStream); -} - -pub trait Error: 'static + Send + Sync + std::error::Error {} - -impl Error for T {} diff --git a/quic/s2n-quic-integration/src/api/testing.rs b/quic/s2n-quic-integration/src/api/testing.rs deleted file mode 100644 index 65d14e4698..0000000000 --- a/quic/s2n-quic-integration/src/api/testing.rs +++ /dev/null @@ -1,362 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 - -//! API implementation used to test the integration framework itself - -use crate::api; -use bytes::Bytes; -use core::{ - fmt, - pin::Pin, - task::{Context, Poll}, -}; -use futures::{ready, stream::Stream}; - -#[derive(Clone, Copy, Debug)] -pub enum Error { - Closed, - Reset(u64), -} - -impl fmt::Display for Error { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - fmt::Debug::fmt(self, f) - } -} - -impl std::error::Error for Error {} - -impl From> for Error { - fn from(_error: flume::SendError) -> Self { - Self::Closed - } -} - -#[derive(Clone)] -pub struct Connection { - handle: Handle, - acceptor: Acceptor, -} - -impl Connection { - pub fn pair() -> (Self, Self) { - let (handle_a, acceptor_a) = Self::side(); - let (handle_b, acceptor_b) = Self::side(); - let a = Self { - handle: handle_a, - acceptor: acceptor_b, - }; - let b = Self { - handle: handle_b, - acceptor: acceptor_a, - }; - (a, b) - } - - fn side() -> (Handle, Acceptor) { - let (send_recv, recv_recv) = flume::unbounded(); - let (send_bidi, recv_bidi) = flume::unbounded(); - - let handle = Handle::new(send_bidi, send_recv); - let acceptor = Acceptor { - receive: ReceiveStreamAcceptor { - streams: recv_recv.into_stream(), - }, - bidi: BidirectionalStreamAcceptor { - streams: recv_bidi.into_stream(), - }, - }; - (handle, acceptor) - } - - pub fn split_all(self) -> (Handle, BidirectionalStreamAcceptor, ReceiveStreamAcceptor) { - use api::*; - let (handle, acceptor) = self.split(); - let (bidi, recv) = acceptor.split(); - (handle, bidi, recv) - } -} - -impl api::Connection for Connection { - type Acceptor = Acceptor; - type Handle = Handle; - - fn split(self) -> (Self::Handle, Self::Acceptor) { - let Self { handle, acceptor } = self; - (handle, acceptor) - } -} - -#[derive(Clone)] -pub struct Acceptor { - receive: ReceiveStreamAcceptor, - bidi: BidirectionalStreamAcceptor, -} - -impl api::Acceptor for Acceptor { - type ReceiveStreamAcceptor = ReceiveStreamAcceptor; - type BidirectionalStreamAcceptor = BidirectionalStreamAcceptor; - - fn split( - self, - ) -> ( - Self::BidirectionalStreamAcceptor, - Self::ReceiveStreamAcceptor, - ) { - let Self { bidi, receive } = self; - (bidi, receive) - } -} - -#[derive(Debug)] -enum AcceptorMessage { - Stream(Stream), - Close, - Error(Error), -} - -#[derive(Clone)] -pub struct ReceiveStreamAcceptor { - streams: flume::r#async::RecvStream<'static, AcceptorMessage>, -} - -impl fmt::Debug for ReceiveStreamAcceptor { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("ReceiveStreamAcceptor") - .field("pending", &self.streams.size_hint().0) - .finish() - } -} - -impl api::ReceiveStreamAcceptor for ReceiveStreamAcceptor { - type ReceiveStream = ReceiveStream; - type Error = Error; - - fn poll_accept_receive( - &mut self, - cx: &mut Context, - ) -> Poll, Self::Error>> { - let msg = ready!(Pin::new(&mut self.streams).poll_next(cx)); - - match msg { - Some(AcceptorMessage::Stream(stream)) => Ok(Some(stream)), - Some(AcceptorMessage::Error(err)) => Err(err), - Some(AcceptorMessage::Close) | None => Ok(None), - } - .into() - } -} - -#[derive(Clone)] -pub struct BidirectionalStreamAcceptor { - streams: flume::r#async::RecvStream<'static, AcceptorMessage>, -} - -impl fmt::Debug for BidirectionalStreamAcceptor { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("BidirectionalStreamAcceptor") - .field("pending", &self.streams.size_hint().0) - .finish() - } -} - -impl api::BidirectionalStreamAcceptor for BidirectionalStreamAcceptor { - type BidirectionalStream = BidirectionalStream; - type Error = Error; - - fn poll_accept_bidirectional( - &mut self, - cx: &mut Context, - ) -> Poll, Self::Error>> { - let msg = ready!(Pin::new(&mut self.streams).poll_next(cx)); - - match msg { - Some(AcceptorMessage::Stream(stream)) => Ok(Some(stream)), - Some(AcceptorMessage::Error(err)) => Err(err), - Some(AcceptorMessage::Close) | None => Ok(None), - } - .into() - } -} - -#[derive(Clone)] -pub struct Handle { - recv_streams: flume::Sender>, - bidi_streams: flume::Sender>, -} - -impl Handle { - fn new( - bidi_streams: flume::Sender>, - recv_streams: flume::Sender>, - ) -> Self { - Self { - recv_streams, - bidi_streams, - } - } - - #[allow(dead_code)] - pub fn close(&mut self, error: Error) { - let _ = self.recv_streams.send(AcceptorMessage::Error(error)); - let _ = self.bidi_streams.send(AcceptorMessage::Error(error)); - } -} - -impl api::Handle for Handle { - type SendStream = SendStream; - type BidirectionalStream = BidirectionalStream; - type Error = Error; - - fn poll_open_send(&mut self, _cx: &mut Context) -> Poll> { - let (send, recv) = pair(); - - self.recv_streams.send(AcceptorMessage::Stream(recv))?; - - Ok(send).into() - } - - fn poll_open_bidirectional( - &mut self, - _cx: &mut Context, - ) -> Poll> { - let (send_a, recv_a) = pair(); - let (send_b, recv_b) = pair(); - - let a = BidirectionalStream { - send: send_a, - recv: recv_b, - }; - let b = BidirectionalStream { - send: send_b, - recv: recv_a, - }; - - self.bidi_streams.send(AcceptorMessage::Stream(a))?; - - Ok(b).into() - } - - fn close(&mut self) { - let _ = self.recv_streams.send(AcceptorMessage::Close); - let _ = self.bidi_streams.send(AcceptorMessage::Close); - } -} - -#[derive(Debug)] -enum StreamMessage { - Data(Bytes), - Finish, - Reset(u64), -} - -pub struct SendStream { - stream: flume::Sender, -} - -impl api::SendStream for SendStream { - type Error = Error; - - fn poll_send(&mut self, data: &mut Bytes, _cx: &mut Context) -> Poll> { - let data = core::mem::replace(data, Bytes::new()); - self.stream.send(StreamMessage::Data(data))?; - Ok(()).into() - } - - fn poll_finish(&mut self, _cx: &mut Context) -> Poll> { - self.stream.send(StreamMessage::Finish)?; - Ok(()).into() - } - - fn reset(&mut self, code: u64) { - let _ = self.stream.send(StreamMessage::Reset(code)); - } -} - -pub struct ReceiveStream { - stream: flume::r#async::RecvStream<'static, StreamMessage>, -} - -impl api::ReceiveStream for ReceiveStream { - type Error = Error; - - fn poll_receive(&mut self, cx: &mut Context) -> Poll, Self::Error>> { - let message = - ready!(Stream::poll_next(Pin::new(&mut self.stream), cx)).ok_or(Error::Closed)?; - - match message { - StreamMessage::Data(chunk) => Ok(Some(chunk)), - StreamMessage::Finish => Ok(None), - StreamMessage::Reset(code) => Err(Error::Reset(code)), - } - .into() - } - - fn stop_sending(&mut self, _code: u64) { - todo!() - } -} - -pub struct BidirectionalStream { - send: SendStream, - recv: ReceiveStream, -} - -impl api::BidirectionalStream for BidirectionalStream { - type SendStream = SendStream; - type ReceiveStream = ReceiveStream; - - fn split(self) -> (Self::ReceiveStream, Self::SendStream) { - let Self { send, recv } = self; - (recv, send) - } -} - -fn pair() -> (SendStream, ReceiveStream) { - let (send, recv) = flume::unbounded(); - - let send = SendStream { stream: send }; - - let recv = ReceiveStream { - stream: recv.into_stream(), - }; - - (send, recv) -} - -#[cfg(test)] -mod tests { - use crate::{api::*, rt::spawn}; - - #[tokio::test] - async fn accept_test() { - let (client, server) = super::Connection::pair(); - - let (mut client_handle, _client_bidi_acceptor, _client_recv_acceptor) = client.split_all(); - let (_server_handle, mut server_bidi_acceptor, mut server_recv_acceptor) = - server.split_all(); - - let server_recv_task = spawn(async move { server_recv_acceptor.accept_receive().await }); - - let mut client_send = client_handle.open_send().await.unwrap(); - - let mut server_recv = server_recv_task.await.unwrap().unwrap().unwrap(); - - client_send - .send(Bytes::from_static(&[1, 2, 3])) - .await - .unwrap(); - - assert_eq!( - server_recv.receive().await.unwrap().unwrap(), - Bytes::from_static(&[1, 2, 3]) - ); - - let server_bidi_task = - spawn(async move { server_bidi_acceptor.accept_bidirectional().await }); - - let _client_bidi = client_handle.open_bidirectional().await.unwrap(); - - server_bidi_task.await.unwrap().unwrap(); - } -} diff --git a/quic/s2n-quic-integration/src/lib.rs b/quic/s2n-quic-integration/src/lib.rs deleted file mode 100644 index 79d6f590cb..0000000000 --- a/quic/s2n-quic-integration/src/lib.rs +++ /dev/null @@ -1,10 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 - -#![recursion_limit = "256"] - -pub mod api; -pub mod packet; -pub mod rt; -pub mod s2n_quic; -pub mod stream; diff --git a/quic/s2n-quic-integration/src/packet.rs b/quic/s2n-quic-integration/src/packet.rs deleted file mode 100644 index a11e1d3f71..0000000000 --- a/quic/s2n-quic-integration/src/packet.rs +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 - -//! Represents a packet that is sent over a network - -use s2n_quic_core::{ - inet::{datagram, ExplicitCongestionNotification, SocketAddress}, - io::{rx, tx}, - path::{self, Handle as _}, -}; - -#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord)] -pub struct Packet { - pub destination_address: SocketAddress, - pub source_address: SocketAddress, - pub ecn: ExplicitCongestionNotification, - pub ipv6_flow_label: u32, - pub payload: Vec, -} - -impl tx::Entry for Packet { - type Handle = path::Tuple; - - fn set>( - &mut self, - mut message: M, - ) -> Result { - let handle = message.path_handle(); - self.destination_address = handle.remote_address().0; - self.source_address = handle.local_address().0; - self.ecn = message.ecn(); - self.ipv6_flow_label = message.ipv6_flow_label(); - - let len = message.write_payload(&mut self.payload[..], 0); - - if len == 0 { - return Err(tx::Error::EmptyPayload); - } - - self.payload.truncate(len); - - Ok(len) - } - - fn payload(&self) -> &[u8] { - &self.payload[..] - } - - fn payload_mut(&mut self) -> &mut [u8] { - &mut self.payload[..] - } -} - -impl rx::Entry for Packet { - type Handle = path::Tuple; - - fn read( - &mut self, - _local_address: &path::LocalAddress, - ) -> Option<(datagram::Header, &mut [u8])> { - let path = path::Tuple { - remote_address: self.source_address.into(), - local_address: self.destination_address.into(), - }; - let header = datagram::Header { - path, - ecn: self.ecn, - }; - Some((header, &mut self.payload[..])) - } -} diff --git a/quic/s2n-quic-integration/src/rt.rs b/quic/s2n-quic-integration/src/rt.rs deleted file mode 100644 index e788a49974..0000000000 --- a/quic/s2n-quic-integration/src/rt.rs +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 - -// TODO abstract over the current runtime -pub use tokio::spawn; - -pub use tokio::time::sleep as delay; diff --git a/quic/s2n-quic-integration/src/s2n_quic.rs b/quic/s2n-quic-integration/src/s2n_quic.rs deleted file mode 100644 index aef6d5fa52..0000000000 --- a/quic/s2n-quic-integration/src/s2n_quic.rs +++ /dev/null @@ -1,4 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 - -mod api; diff --git a/quic/s2n-quic-integration/src/s2n_quic/api.rs b/quic/s2n-quic-integration/src/s2n_quic/api.rs deleted file mode 100644 index 41dbbb383f..0000000000 --- a/quic/s2n-quic-integration/src/s2n_quic/api.rs +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 - -use crate::api; -use bytes::Bytes; -use core::{ - convert::TryInto, - task::{Context, Poll}, -}; -use s2n_quic::{ - connection::{self, Handle, StreamAcceptor}, - stream, Connection, -}; - -impl api::Connection for Connection { - type Acceptor = StreamAcceptor; - type Handle = Handle; - - fn split(self) -> (Self::Handle, Self::Acceptor) { - Connection::split(self) - } -} - -impl api::Acceptor for StreamAcceptor { - type ReceiveStreamAcceptor = connection::ReceiveStreamAcceptor; - type BidirectionalStreamAcceptor = connection::BidirectionalStreamAcceptor; - - fn split( - self, - ) -> ( - Self::BidirectionalStreamAcceptor, - Self::ReceiveStreamAcceptor, - ) { - Self::split(self) - } -} - -impl api::ReceiveStreamAcceptor for connection::ReceiveStreamAcceptor { - type ReceiveStream = stream::ReceiveStream; - type Error = connection::Error; - - fn poll_accept_receive( - &mut self, - cx: &mut Context, - ) -> Poll, Self::Error>> { - Self::poll_accept_receive_stream(self, cx) - } -} - -impl api::BidirectionalStreamAcceptor for connection::BidirectionalStreamAcceptor { - type BidirectionalStream = stream::BidirectionalStream; - type Error = connection::Error; - - fn poll_accept_bidirectional( - &mut self, - cx: &mut Context, - ) -> Poll, Self::Error>> { - Self::poll_accept_bidirectional_stream(self, cx) - } -} - -impl crate::api::Handle for Handle { - type BidirectionalStream = stream::BidirectionalStream; - type SendStream = stream::SendStream; - type Error = connection::Error; - - fn poll_open_send(&mut self, cx: &mut Context) -> Poll> { - Handle::poll_open_send_stream(self, cx) - } - - fn poll_open_bidirectional( - &mut self, - cx: &mut Context, - ) -> Poll> { - Handle::poll_open_bidirectional_stream(self, cx) - } - - fn close(&mut self) { - // TODO - } -} - -macro_rules! send_stream { - ($ty:ident) => { - impl api::SendStream for stream::$ty { - type Error = stream::Error; - - fn poll_send( - &mut self, - chunk: &mut Bytes, - cx: &mut Context, - ) -> Poll> { - stream::$ty::poll_send(self, chunk, cx) - } - - fn poll_finish(&mut self, _cx: &mut Context) -> Poll> { - stream::$ty::finish(self).into() - } - - fn reset(&mut self, code: u64) { - let _ = stream::$ty::reset(self, code.try_into().unwrap()); - } - } - }; -} - -send_stream!(SendStream); -send_stream!(BidirectionalStream); - -macro_rules! receive_stream { - ($ty:ident) => { - impl api::ReceiveStream for stream::$ty { - type Error = stream::Error; - - fn poll_receive( - &mut self, - cx: &mut Context, - ) -> Poll, Self::Error>> { - stream::$ty::poll_receive(self, cx) - } - - fn stop_sending(&mut self, code: u64) { - let _ = stream::$ty::stop_sending(self, code.try_into().unwrap()); - } - } - }; -} - -receive_stream!(ReceiveStream); -receive_stream!(BidirectionalStream); - -impl api::BidirectionalStream for stream::BidirectionalStream { - type SendStream = stream::SendStream; - type ReceiveStream = stream::ReceiveStream; - - fn split(self) -> (Self::ReceiveStream, Self::SendStream) { - stream::BidirectionalStream::split(self) - } -} diff --git a/quic/s2n-quic-integration/src/stream.rs b/quic/s2n-quic-integration/src/stream.rs deleted file mode 100644 index 88f97e6ddd..0000000000 --- a/quic/s2n-quic-integration/src/stream.rs +++ /dev/null @@ -1,9 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 - -mod endpoint; -mod scenario; - -pub use endpoint::*; -pub use s2n_quic_core::stream::testing::*; -pub use scenario::*; diff --git a/quic/s2n-quic-integration/src/stream/endpoint.rs b/quic/s2n-quic-integration/src/stream/endpoint.rs deleted file mode 100644 index 04776e4c6e..0000000000 --- a/quic/s2n-quic-integration/src/stream/endpoint.rs +++ /dev/null @@ -1,380 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 - -use crate::{ - api::{self, BidirectionalStream as _}, - rt::{delay, spawn}, - stream::scenario::{self, Scenario}, -}; -use anyhow::Result; -use bytes::Bytes; -use core::future::Future; -use std::{collections::HashSet, sync::Arc}; - -#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord)] -pub struct Instructions { - pub client: Arc, - pub server: Arc, -} - -impl Instructions { - pub fn server(&self) -> Server { - self.into() - } - - pub fn client(&self) -> Client { - self.into() - } -} - -impl From for Instructions { - fn from(scenario: Scenario) -> Self { - Self { - client: Arc::new(scenario.client), - server: Arc::new(scenario.server), - } - } -} - -impl From<&Scenario> for Instructions { - fn from(scenario: &Scenario) -> Self { - Self { - client: Arc::new(scenario.client.clone()), - server: Arc::new(scenario.server.clone()), - } - } -} - -macro_rules! endpoint { - ($name:ident, $local:ident, $peer:ident) => { - #[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord)] - pub struct $name(Endpoint); - - impl $name { - pub fn run( - &self, - connection: C, - ) -> impl Future> + 'static { - self.0.run(connection) - } - } - - impl From<&Scenario> for $name { - fn from(scenario: &Scenario) -> Self { - let instructions: Instructions = scenario.into(); - instructions.into() - } - } - - impl From for $name { - fn from(scenario: Instructions) -> Self { - Self(Endpoint { - local: scenario.$local, - peer: scenario.$peer, - }) - } - } - - impl From<&Instructions> for $name { - fn from(scenario: &Instructions) -> Self { - Self(Endpoint { - local: scenario.$local.clone(), - peer: scenario.$peer.clone(), - }) - } - } - }; -} - -endpoint!(Server, server, client); -endpoint!(Client, client, server); - -#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord)] -struct Endpoint { - local: Arc, - peer: Arc, -} - -impl Endpoint { - fn run( - &self, - connection: C, - ) -> impl Future> + 'static { - let (handle, acceptor) = connection.split(); - let local = Self::local(&self.local, handle); - let peer = Self::peer(&self.peer, acceptor); - async move { - // TODO check if we are allowed to have an error in this test - let _ = futures::try_join!(local, peer).map(|_| ()); - Ok(()) - } - } - - fn local( - streams: &Arc, - mut handle: H, - ) -> impl Future> + 'static { - let mut handles = vec![]; - - for (id, scenario) in streams.uni_streams.iter() { - let mut handle = handle.clone(); - // Notify the peer of the scenario - let id = Bytes::copy_from_slice(&id.to_be_bytes()); - let scenario = *scenario; - handles.push(spawn(async move { - delay(scenario.delay).await; - let stream = handle.open_send().await?; - Self::sender(stream, id, scenario.local).await?; - >::Ok(()) - })); - } - - for (id, scenario) in streams.bidi_streams.iter() { - let mut handle = handle.clone(); - // Notify the peer of the scenario - let id = Bytes::copy_from_slice(&id.to_be_bytes()); - let scenario = *scenario; - handles.push(spawn(async move { - delay(scenario.delay).await; - let stream = handle.open_bidirectional().await?; - let (receiver, sender) = stream.split(); - - let sender = spawn(Self::sender(sender, id, scenario.local)); - let receiver = spawn(Self::receiver(receiver, Bytes::new(), scenario.peer)); - let (sender, receiver) = futures::try_join!(sender, receiver)?; - sender?; - receiver?; - >::Ok(()) - })); - } - - async move { - let results = futures::future::try_join_all(handles).await?; - for result in results { - result?; - } - handle.close(); - Ok(()) - } - } - - fn peer( - scenarios: &Arc, - acceptor: A, - ) -> impl Future> + 'static { - let (bidi, recv) = acceptor.split(); - - let recv = spawn(Self::peer_receiver(recv, scenarios.clone())); - let bidi = spawn(Self::peer_bidirectional(bidi, scenarios.clone())); - - async { - let (recv, bidi) = futures::try_join!(recv, bidi)?; - recv?; - bidi?; - Ok(()) - } - } - - async fn peer_receiver( - mut recv: A, - scenarios: Arc, - ) -> Result<()> { - let mut handles = vec![]; - - while let Some(mut stream) = recv.accept_receive().await? { - let scenarios = scenarios.clone(); - handles.push(spawn(async move { - let (id, prelude) = Self::read_stream_id(&mut stream).await?; - - let scenario = scenarios - .uni_streams - .get(&id) - .unwrap_or_else(|| panic!("missing receive scenario {}", id)); - - Self::receiver(stream, prelude, scenario.local).await?; - - >::Ok(id) - })); - } - - let mut used: HashSet = HashSet::new(); - - let results = futures::future::try_join_all(handles).await?; - for result in results { - let id = result?; - - assert!(used.insert(id), "scenario {} used multiple times", id); - } - - let complete: HashSet = scenarios.uni_streams.keys().copied().collect(); - - let mut difference: Vec<_> = complete.difference(&used).collect(); - if !difference.is_empty() { - difference.sort(); - panic!( - "the following receive scenarios did not occur: {:?}", - difference - ); - } - - Ok(()) - } - - async fn peer_bidirectional( - mut bidi: A, - scenarios: Arc, - ) -> Result<()> { - let mut handles = vec![]; - - while let Some(stream) = bidi.accept_bidirectional().await? { - let scenarios = scenarios.clone(); - handles.push(spawn(async move { - let (mut receiver, sender) = stream.split(); - - let (id, prelude) = Self::read_stream_id(&mut receiver).await?; - - let scenario = scenarios - .bidi_streams - .get(&id) - .unwrap_or_else(|| panic!("missing bidirectional scenario {}", id)); - - let sender = spawn(Self::sender(sender, Bytes::new(), scenario.peer)); - let receiver = spawn(Self::receiver(receiver, prelude, scenario.local)); - let (sender, receiver) = futures::try_join!(sender, receiver)?; - sender?; - receiver?; - - >::Ok(id) - })); - } - - let mut used: HashSet = HashSet::new(); - - let results = futures::future::try_join_all(handles).await?; - for result in results { - let id = result?; - - assert!(used.insert(id), "scenario {} used multiple times", id); - } - - let complete: HashSet = scenarios.bidi_streams.keys().copied().collect(); - - let mut difference: Vec<_> = complete.difference(&used).collect(); - if !difference.is_empty() { - difference.sort(); - panic!( - "the following bidirectional scenarios did not occur: {:?}", - difference - ); - } - - Ok(()) - } - - async fn sender( - mut stream: S, - prelude: Bytes, - scenario: scenario::Stream, - ) -> Result<()> { - stream.send(prelude).await?; - - let mut sender = scenario.data; - let mut chunks = [bytes::Bytes::new()]; - - let mut send_amount = scenario.send_amount.iter(); - - while sender - .send(send_amount.next().unwrap(), &mut chunks) - .is_some() - { - // TODO implement resets - stream - .send(core::mem::replace(&mut chunks[0], Bytes::new())) - .await?; - } - - stream.finish().await?; - - Ok(()) - } - - async fn receiver( - mut stream: S, - prelude: Bytes, - scenario: scenario::Stream, - ) -> Result<()> { - let mut receiver = scenario.data; - receiver.receive(&[prelude]); - - while let Some(chunk) = stream.receive().await? { - // TODO implement stop_sending - receiver.receive(&[chunk]); - } - - assert!( - receiver.is_finished(), - "did not receive a complete stream of data from peer" - ); - - Ok(()) - } - - async fn read_stream_id(stream: &mut S) -> Result<(u64, Bytes)> { - let mut chunk = Bytes::new(); - let mut offset = 0; - let mut id = [0u8; core::mem::size_of::()]; - - while offset < id.len() { - chunk = stream - .receive() - .await? - .expect("every stream should be prefixed with the scenario ID"); - - let needed_len = id.len() - offset; - let len = chunk.len().min(needed_len); - - id[offset..offset + len].copy_from_slice(&chunk[..len]); - offset += len; - bytes::Buf::advance(&mut chunk, len); - } - - let id = u64::from_be_bytes(id); - - Ok((id, chunk)) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::api::testing::Connection; - - async fn check(scenario: &Scenario) { - let (client, server) = Connection::pair(); - - let scenario: Instructions = scenario.into(); - - let client_task = spawn(scenario.client().run(client.clone())); - let server_task = spawn(scenario.server().run(server.clone())); - - let (client_res, server_res) = futures::try_join!(client_task, server_task).unwrap(); - client_res.unwrap(); - server_res.unwrap(); - } - - #[tokio::test] - async fn basic_test() { - check(&Scenario::default()).await; - } - - #[test] - fn random_scenario_test() { - bolero::check!().with_type().for_each(|scenario| { - tokio::runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap() - .block_on(check(scenario)); - }); - } -} diff --git a/quic/s2n-quic-integration/src/stream/scenario.rs b/quic/s2n-quic-integration/src/stream/scenario.rs deleted file mode 100644 index 2dc546205e..0000000000 --- a/quic/s2n-quic-integration/src/stream/scenario.rs +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 - -use crate::stream::Data; -use bolero_generator::{gen, TypeGenerator, ValueGenerator}; -use core::time::Duration; - -pub use std::collections::BTreeMap as Map; - -#[derive(Clone, Debug, Default, PartialEq, PartialOrd, Eq, Ord, TypeGenerator)] -pub struct Scenario { - /// The streams owned by the client - pub client: Streams, - /// The streams owned by the server - pub server: Streams, -} - -#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord, TypeGenerator)] -pub struct Streams { - /// The locally-owned unidirectional streams - #[generator(gen::>().with().len(0usize..=25))] - pub uni_streams: Map, - /// The locally-owned bidirectional streams - #[generator(gen::>().with().len(0usize..=25))] - pub bidi_streams: Map, -} - -impl Default for Streams { - fn default() -> Self { - Self { - uni_streams: Iterator::map(1..=25, |id| (id, Default::default())).collect(), - bidi_streams: Iterator::map(1..=25, |id| (id, Default::default())).collect(), - } - } -} - -#[derive(Copy, Clone, Debug, Default, PartialEq, PartialOrd, Eq, Ord, TypeGenerator)] -pub struct UniStream { - /// The amount of time the initiator should delay before opening the stream - #[generator((0..=2).map_gen(Duration::from_millis))] - pub delay: Duration, - /// The stream data that should be sent from the local (initiator) towards the peer - pub local: Stream, -} - -#[derive(Copy, Clone, Debug, Default, PartialEq, PartialOrd, Eq, Ord, TypeGenerator)] -pub struct BidiStream { - /// The amount of time the initiator should delay before opening the stream - #[generator((0..=2).map_gen(Duration::from_millis))] - pub delay: Duration, - /// The stream data that should be sent from the local (initiator) towards the peer - pub local: Stream, - /// The stream data that should be sent from the peer (non-initiator) towards the initiator - pub peer: Stream, -} - -#[derive(Copy, Clone, Debug, Default, PartialEq, PartialOrd, Eq, Ord, TypeGenerator)] -pub struct Stream { - /// The data that should be sent over the stream - pub data: Data, - /// A potential error that could happen on the sending side - pub reset: Option, - /// A potential error that could happen on the receiving side - pub stop_sending: Option, - /// The size of the chunks that should be sent on the stream - pub send_amount: SendAmount, -} - -#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, TypeGenerator)] -pub struct Error { - /// The offset at which this error should happen - pub offset: usize, - /// The code of the error - pub code: u64, -} - -#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord)] -pub struct SendAmount { - /// The minimal amount of data that should be sent in a chunk - pub min: usize, - /// The maximum amount of data that should be sent in a chunk - pub max: usize, -} - -impl Default for SendAmount { - fn default() -> Self { - Self { min: 32, max: 256 } - } -} - -impl TypeGenerator for SendAmount { - fn generate(driver: &mut D) -> Option { - let min = bolero_generator::ValueGenerator::generate(&(1..=2048), driver)?; - let variance = bolero_generator::ValueGenerator::generate(&(0..=1024), driver)?; - let max = min + variance; - Some(Self { min, max }) - } -} - -impl SendAmount { - pub fn iter(&self) -> impl Iterator { - let min = self.min.min(self.max); - let max = self.min.max(self.max); - - Iterator::map(min..=max, |amount| { - // ensure we send at least 1 byte otherwise we'll endlessly loop - amount.max(1) - }) - .cycle() - } -}