Skip to content

Commit

Permalink
Stream closing and finishing APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
MOZGIII committed Dec 3, 2024
1 parent e68a452 commit 3392a11
Show file tree
Hide file tree
Showing 34 changed files with 1,221 additions and 97 deletions.
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 53 additions & 2 deletions crates/xwt-anchor/src/impls/xwt_core/streams.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,76 @@
//! Implementations related to streams.
use core::num::NonZeroUsize;

use crate::types::*;

impl<T> xwt_core::stream::Write for SendStream<T>
where
T: xwt_core::stream::Write,
{
type ErrorCode = T::ErrorCode;
type Error = T::Error;

async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
async fn write(&mut self, buf: &[u8]) -> Result<NonZeroUsize, Self::Error> {
T::write(&mut self.0, buf).await
}
}

impl<T> xwt_core::stream::WriteAbort for SendStream<T>
where
T: xwt_core::stream::WriteAbort,
{
type ErrorCode = T::ErrorCode;
type Error = T::Error;

async fn abort(self, error_code: Self::ErrorCode) -> Result<(), Self::Error> {
T::abort(self.0, error_code).await
}
}

impl<T> xwt_core::stream::WriteAborted for SendStream<T>
where
T: xwt_core::stream::WriteAborted,
{
type ErrorCode = T::ErrorCode;
type Error = T::Error;

async fn aborted(self) -> Result<Self::ErrorCode, Self::Error> {
T::aborted(self.0).await
}
}

impl<T> xwt_core::stream::Finish for SendStream<T>
where
T: xwt_core::stream::Finish,
{
type Error = T::Error;

async fn finish(self) -> Result<(), Self::Error> {
T::finish(self.0).await
}
}

impl<T> xwt_core::stream::Read for RecvStream<T>
where
T: xwt_core::stream::Read,
{
type ErrorCode = T::ErrorCode;
type Error = T::Error;

async fn read(&mut self, buf: &mut [u8]) -> Result<Option<usize>, Self::Error> {
async fn read(&mut self, buf: &mut [u8]) -> Result<NonZeroUsize, Self::Error> {
T::read(&mut self.0, buf).await
}
}

impl<T> xwt_core::stream::ReadAbort for RecvStream<T>
where
T: xwt_core::stream::ReadAbort,
{
type ErrorCode = T::ErrorCode;
type Error = T::Error;

async fn abort(self, error_code: Self::ErrorCode) -> Result<(), Self::Error> {
T::abort(self.0, error_code).await
}
}
5 changes: 4 additions & 1 deletion crates/xwt-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ pub mod prelude {
pub use crate::session::stream::{
AcceptBi as _, AcceptUni as _, OpenBi as _, OpenUni as _, OpeningBi as _, OpeningUni as _,
};
pub use crate::stream::{Read as _, ReadChunk as _, Write as _, WriteChunk as _};
pub use crate::stream::{
AsErrorCode as _, Finish as _, Read as _, ReadAbort as _, ReadChunk as _, Write as _,
WriteAbort as _, WriteAborted as _, WriteChunk as _,
};

pub use crate::endpoint::accept_utils::*;
pub use crate::endpoint::connect_utils::*;
Expand Down
7 changes: 5 additions & 2 deletions crates/xwt-core/src/session/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ use core::future::Future;
use crate::utils::{maybe, Error};

pub trait SendSpec: maybe::Send {
type SendStream: crate::stream::Write;
type SendStream: crate::stream::Write
+ crate::stream::WriteAbort
+ crate::stream::WriteAborted
+ crate::stream::Finish;
}

pub trait RecvSpec: maybe::Send {
type RecvStream: crate::stream::Read;
type RecvStream: crate::stream::Read + crate::stream::ReadAbort;
}

pub trait PairSpec: maybe::Send + SendSpec + RecvSpec {}
Expand Down
117 changes: 113 additions & 4 deletions crates/xwt-core/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,61 @@
//! Operations on the WebTransport streams.
use core::future::Future;
use core::num::NonZeroUsize;

use crate::utils::{maybe, Error};

/// A way to represent a stream operation error as an error code.
pub trait AsErrorCode<ErrorCode>: maybe::Send
where
ErrorCode: TryInto<u32> + maybe::Send + maybe::Sync,
{
/// Represent the error as an error code.
fn as_error_code(&self) -> Option<ErrorCode>;
}

/// Read the data from the stream.
pub trait Read: maybe::Send {
/// An error code the stream may be aborted with.
type ErrorCode: TryInto<u32> + maybe::Send + maybe::Sync;

/// An error that can occur while reading the stream.
type Error: Error + maybe::Send + maybe::Sync + 'static;
type Error: Error + AsErrorCode<Self::ErrorCode> + maybe::Send + maybe::Sync + 'static;

/// Read the data from the stream into a given buffer and return the amount
/// of bytes filled in the buffer or `None` if the stream is closed and does
/// not have any pending unread data.
///
/// A would-be zero-length read should be reported as a error
/// with a zero error code.
/// There is a handy way to check that condition:
///
/// ```ignore
/// let result = stream.read(buf).await;
/// let data = match result {
/// Ok(read) => buf[..read.get()],
/// Err(err) if err.is_closed() => {
/// // Stream ended.
/// return;
/// }
/// Err(err) => {
/// panic!("something went wrong");
/// }
/// };
/// ```
fn read(
&mut self,
buf: &mut [u8],
) -> impl Future<Output = Result<Option<usize>, Self::Error>> + maybe::Send;
) -> impl Future<Output = Result<NonZeroUsize, Self::Error>> + maybe::Send;
}

/// Write the data to a stream.
pub trait Write: maybe::Send {
/// An error code the stream may be aborted with.
type ErrorCode: TryInto<u32> + maybe::Send + maybe::Sync;

/// An error that can occur while writing to the stream.
type Error: Error + maybe::Send + maybe::Sync + 'static;
type Error: Error + AsErrorCode<Self::ErrorCode> + maybe::Send + maybe::Sync + 'static;

/// Write the data from the given buffer into the stream, returning
/// the amount of of bytes that were successfully written into the stream.
Expand All @@ -30,7 +64,82 @@ pub trait Write: maybe::Send {
fn write(
&mut self,
buf: &[u8],
) -> impl Future<Output = Result<usize, Self::Error>> + maybe::Send;
) -> impl Future<Output = Result<NonZeroUsize, Self::Error>> + maybe::Send;
}

/// Abort the read stream.
///
/// Sends a signal to the peer that the read side of the stream has been
/// aborted.
/// Discards the receive buffer; the peer is typically expected to abort
/// the corresponding send side in response.
///
/// An unsigned 8-bit error code can be supplied as a part of the signal
/// to the peer.
pub trait ReadAbort: maybe::Send {
/// An error code to abort the stream with.
///
/// Pass `0` for default.
type ErrorCode: From<u32> + maybe::Send + maybe::Sync;

/// An error that can occur while stopping the stream.
type Error: Error + maybe::Send + maybe::Sync + 'static;

/// Abort the stream.
fn abort(
self,
error_code: Self::ErrorCode,
) -> impl Future<Output = Result<(), Self::Error>> + maybe::Send;
}

/// Abort the write stream.
///
/// Sends a signal to the peer that the write side of the stream has been
/// aborted.
/// Discards the send buffer; if possible, no currently outstanding data
/// is transmitted or retransmitted.
///
/// An unsigned 8-bit error code can be supplied as a part of the signal to
/// the peer; if omitted, the error code is presumed to be 0.
pub trait WriteAbort: maybe::Send {
/// An error code to abort the stream with.
///
/// Pass `0` for default.
type ErrorCode: From<u32> + maybe::Send + maybe::Sync;

/// An error that can occur while stopping the stream.
type Error: Error + maybe::Send + maybe::Sync + 'static;

/// Abort the stream.
fn abort(
self,
error_code: Self::ErrorCode,
) -> impl Future<Output = Result<(), Self::Error>> + maybe::Send;
}

/// Wait for the write stream to abort.
///
/// This can happen when the "read" part aborts the stream.
pub trait WriteAborted: maybe::Send {
/// An error code the stream is aborted with.
type ErrorCode: TryInto<u32> + maybe::Send + maybe::Sync;

/// An error that can occur while waiting for a stream to be aborted.
type Error: Error + maybe::Send + maybe::Sync + 'static;

/// Wait for a stream to abort.
fn aborted(self) -> impl Future<Output = Result<Self::ErrorCode, Self::Error>> + maybe::Send;
}

/// Finish the write stream.
///
/// Call when all data has been submitted and no further data will be written.
pub trait Finish: maybe::Send {
/// An error that can occur while finishing the stream.
type Error: Error + maybe::Send + maybe::Sync + 'static;

/// Finish the stream.
fn finish(self) -> impl Future<Output = Result<(), Self::Error>> + maybe::Send;
}

/// An chunk of data with an explicit offset in the stream.
Expand Down
62 changes: 61 additions & 1 deletion crates/xwt-core/src/stream_utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Utilitites for the WebTransport streams.
use crate::stream;
use crate::{stream, utils::maybe};

/// A shortcut for the error type for a given [`stream::Read`] type.
pub type ReadErrorFor<T> = <T as stream::Read>::Error;
Expand All @@ -13,3 +13,63 @@ pub type ReadChunkErrorFor<T, Data> = <T as stream::ReadChunk<Data>>::Error;

/// A shortcut for the error type for a given [`stream::WriteChunk] type.
pub type WriteChunkErrorFor<T, Data> = <T as stream::WriteChunk<Data>>::Error;

/// A shortcut for the error type for a given [`stream::ReadAbort`] type.
pub type ReadAbortErrorFor<T> = <T as stream::ReadAbort>::Error;

/// A shortcut for the error code type for a given [`stream::ReadAbort`] type.
pub type ReadAbortErrorCodeFor<T> = <T as stream::ReadAbort>::ErrorCode;

/// A shortcut for the error type for a given [`stream::WriteAbort`] type.
pub type WriteAbortErrorFor<T> = <T as stream::WriteAbort>::Error;

/// A shortcut for the error code type for a given [`stream::WriteAbort`] type.
pub type WriteAbortErrorCodeFor<T> = <T as stream::WriteAbort>::ErrorCode;

/// A shortcut for the error type for a given [`stream::WriteAborted`] type.
pub type WriteAbortedErrorFor<T> = <T as stream::WriteAborted>::Error;

/// A shortcut for the error code type for a given [`stream::WriteAborted`] type.
pub type WriteAbortedErrorCodeFor<T> = <T as stream::WriteAborted>::ErrorCode;

/// A shortcut for the error type for a given [`stream::Finish`] type.
pub type FinishErrorFor<T> = <T as stream::Finish>::Error;

/// Extensions to the .
pub trait AsErrorCodeExt<ErrorCode>: stream::AsErrorCode<ErrorCode>
where
ErrorCode: TryInto<u32> + maybe::Send + maybe::Sync,
{
/// Get the error code value.
fn as_error_code_value(&self) -> Option<u32>;

/// Check of the error code matches the given value.
fn is_error_code(&self, expected_code: u32) -> bool;

/// Checks that the error code exists and is zero.
fn is_closed(&self) -> bool;
}

impl<T, ErrorCode> AsErrorCodeExt<ErrorCode> for T
where
T: stream::AsErrorCode<ErrorCode>,
ErrorCode: TryInto<u32> + maybe::Send + maybe::Sync,
{
fn as_error_code_value(&self) -> Option<u32> {
let error_code = self.as_error_code()?;
let error_code = error_code.try_into().ok()?;
Some(error_code)
}

fn is_error_code(&self, expected_error_code: u32) -> bool {
let Some(error_code) = self.as_error_code_value() else {
return false;
};

error_code == expected_error_code
}

fn is_closed(&self) -> bool {
self.is_error_code(0)
}
}
1 change: 1 addition & 0 deletions crates/xwt-core/src/utils/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub use std::error::Error;
pub use core::error::Error;

#[cfg(all(not(feature = "std"), not(feature = "error-in-core")))]
/// An xwt error.
pub trait Error: core::fmt::Debug + core::fmt::Display {}

#[cfg(all(not(feature = "std"), not(feature = "error-in-core")))]
Expand Down
Loading

0 comments on commit 3392a11

Please sign in to comment.