Skip to content

Commit

Permalink
deps: h3-0.0.6 (w/ tracing feature as h3-quinn) (#2263)
Browse files Browse the repository at this point in the history
  • Loading branch information
junkurihara authored Jul 12, 2024
1 parent ecd0992 commit 71f8d9f
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 18 deletions.
6 changes: 5 additions & 1 deletion quic/s2n-quic-h3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ publish = false
[dependencies]
bytes = { version = "1", default-features = false }
futures = { version = "0.3", default-features = false }
h3 = "0.0.5"
h3 = "0.0.6"
s2n-quic = { path = "../s2n-quic" }
s2n-quic-core = { path = "../s2n-quic-core" }
tracing = { version = "0.1", optional = true }

[features]
tracing = ["dep:tracing"]
59 changes: 42 additions & 17 deletions quic/s2n-quic-h3/src/s2n_quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ use std::{
task::{self, Poll},
};

#[cfg(feature = "tracing")]
use tracing::instrument;

pub struct Connection {
conn: s2n_quic::connection::Handle,
bidi_acceptor: s2n_quic::connection::BidirectionalStreamAcceptor,
Expand Down Expand Up @@ -66,27 +69,27 @@ impl<B> quic::Connection<B> for Connection
where
B: Buf,
{
type BidiStream = BidiStream<B>;
type SendStream = SendStream<B>;
type RecvStream = RecvStream;
type OpenStreams = OpenStreams;
type Error = ConnectionError;
type AcceptError = ConnectionError;

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_accept_recv(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Option<Self::RecvStream>, Self::Error>> {
) -> Poll<Result<Option<Self::RecvStream>, Self::AcceptError>> {
let recv = match ready!(self.recv_acceptor.poll_accept_receive_stream(cx))? {
Some(x) => x,
None => return Poll::Ready(Ok(None)),
};
Poll::Ready(Ok(Some(Self::RecvStream::new(recv))))
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_accept_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Option<Self::BidiStream>, Self::Error>> {
) -> Poll<Result<Option<Self::BidiStream>, Self::AcceptError>> {
let (recv, send) = match ready!(self.bidi_acceptor.poll_accept_bidirectional_stream(cx))? {
Some(x) => x.split(),
None => return Poll::Ready(Ok(None)),
Expand All @@ -97,28 +100,40 @@ where
})))
}

fn opener(&self) -> Self::OpenStreams {
OpenStreams {
conn: self.conn.clone(),
}
}
}

impl<B> quic::OpenStreams<B> for Connection
where
B: Buf,
{
type BidiStream = BidiStream<B>;
type SendStream = SendStream<B>;
type OpenError = ConnectionError;

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_open_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::BidiStream, Self::Error>> {
) -> Poll<Result<Self::BidiStream, Self::OpenError>> {
let stream = ready!(self.conn.poll_open_bidirectional_stream(cx))?;
Ok(stream.into()).into()
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_open_send(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::SendStream, Self::Error>> {
) -> Poll<Result<Self::SendStream, Self::OpenError>> {
let stream = ready!(self.conn.poll_open_send_stream(cx))?;
Ok(stream.into()).into()
}

fn opener(&self) -> Self::OpenStreams {
OpenStreams {
conn: self.conn.clone(),
}
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn close(&mut self, code: h3::error::Code, _reason: &[u8]) {
self.conn.close(
code.value()
Expand All @@ -138,25 +153,27 @@ where
{
type BidiStream = BidiStream<B>;
type SendStream = SendStream<B>;
type RecvStream = RecvStream;
type Error = ConnectionError;
type OpenError = ConnectionError;

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_open_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::BidiStream, Self::Error>> {
) -> Poll<Result<Self::BidiStream, Self::OpenError>> {
let stream = ready!(self.conn.poll_open_bidirectional_stream(cx))?;
Ok(stream.into()).into()
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_open_send(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::SendStream, Self::Error>> {
) -> Poll<Result<Self::SendStream, Self::OpenError>> {
let stream = ready!(self.conn.poll_open_send_stream(cx))?;
Ok(stream.into()).into()
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn close(&mut self, code: h3::error::Code, _reason: &[u8]) {
self.conn.close(
code.value()
Expand Down Expand Up @@ -271,6 +288,7 @@ impl quic::RecvStream for RecvStream {
type Buf = Bytes;
type Error = ReadError;

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_data(
&mut self,
cx: &mut task::Context<'_>,
Expand All @@ -279,13 +297,15 @@ impl quic::RecvStream for RecvStream {
Ok(buf).into()
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn stop_sending(&mut self, error_code: u64) {
let _ = self.stream.stop_sending(
s2n_quic::application::Error::new(error_code)
.expect("s2n-quic supports error codes up to 2^62-1"),
);
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn recv_id(&self) -> StreamId {
self.stream.id().try_into().expect("invalid stream id")
}
Expand Down Expand Up @@ -369,6 +389,7 @@ where
{
type Error = SendStreamError;

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
// try to flush the current chunk if we have one
Expand Down Expand Up @@ -409,6 +430,7 @@ where
// Poll::Ready(Ok(()))
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn send_data<D: Into<WriteBuf<B>>>(&mut self, data: D) -> Result<(), Self::Error> {
if self.buf.is_some() {
return Err(Self::Error::NotReady);
Expand All @@ -427,19 +449,22 @@ where
// Ok(())
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
// ensure all chunks are flushed to the QUIC stream before finishing
ready!(self.poll_ready(cx))?;
self.stream.finish()?;
Ok(()).into()
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn reset(&mut self, reset_code: u64) {
let _ = self
.stream
.reset(reset_code.try_into().unwrap_or_else(|_| VarInt::MAX.into()));
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn send_id(&self) -> StreamId {
self.stream.id().try_into().expect("invalid stream id")
}
Expand Down

0 comments on commit 71f8d9f

Please sign in to comment.