Skip to content

Commit

Permalink
feat(client): remove Send +Sync bounds requirement of `http2::Conne…
Browse files Browse the repository at this point in the history
…ction` executor (#3682)
  • Loading branch information
mstyura authored Jul 1, 2024
1 parent 9580b35 commit 56c3cd5
Show file tree
Hide file tree
Showing 2 changed files with 218 additions and 2 deletions.
218 changes: 217 additions & 1 deletion src/client/conn/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ where
B::Data: Send,
E: Unpin,
B::Error: Into<Box<dyn Error + Send + Sync>>,
E: Http2ClientConnExec<B, T> + 'static + Send + Sync + Unpin,
E: Http2ClientConnExec<B, T> + Unpin,
{
type Output = crate::Result<()>;

Expand Down Expand Up @@ -457,3 +457,219 @@ where
}
}
}

#[cfg(test)]
mod tests {

#[tokio::test]
#[ignore] // only compilation is checked
async fn send_sync_executor_of_non_send_futures() {
#[derive(Clone)]
struct LocalTokioExecutor;

impl<F> crate::rt::Executor<F> for LocalTokioExecutor
where
F: std::future::Future + 'static, // not requiring `Send`
{
fn execute(&self, fut: F) {
// This will spawn into the currently running `LocalSet`.
tokio::task::spawn_local(fut);
}
}

#[allow(unused)]
async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) {
let (_sender, conn) = crate::client::conn::http2::handshake::<
_,
_,
http_body_util::Empty<bytes::Bytes>,
>(LocalTokioExecutor, io)
.await
.unwrap();

tokio::task::spawn_local(async move {
conn.await.unwrap();
});
}
}

#[tokio::test]
#[ignore] // only compilation is checked
async fn not_send_not_sync_executor_of_not_send_futures() {
#[derive(Clone)]
struct LocalTokioExecutor {
_x: std::marker::PhantomData<std::rc::Rc<()>>,
}

impl<F> crate::rt::Executor<F> for LocalTokioExecutor
where
F: std::future::Future + 'static, // not requiring `Send`
{
fn execute(&self, fut: F) {
// This will spawn into the currently running `LocalSet`.
tokio::task::spawn_local(fut);
}
}

#[allow(unused)]
async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) {
let (_sender, conn) =
crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>(
LocalTokioExecutor {
_x: Default::default(),
},
io,
)
.await
.unwrap();

tokio::task::spawn_local(async move {
conn.await.unwrap();
});
}
}

#[tokio::test]
#[ignore] // only compilation is checked
async fn send_not_sync_executor_of_not_send_futures() {
#[derive(Clone)]
struct LocalTokioExecutor {
_x: std::marker::PhantomData<std::cell::Cell<()>>,
}

impl<F> crate::rt::Executor<F> for LocalTokioExecutor
where
F: std::future::Future + 'static, // not requiring `Send`
{
fn execute(&self, fut: F) {
// This will spawn into the currently running `LocalSet`.
tokio::task::spawn_local(fut);
}
}

#[allow(unused)]
async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) {
let (_sender, conn) =
crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>(
LocalTokioExecutor {
_x: Default::default(),
},
io,
)
.await
.unwrap();

tokio::task::spawn_local(async move {
conn.await.unwrap();
});
}
}

#[tokio::test]
#[ignore] // only compilation is checked
async fn send_sync_executor_of_send_futures() {
#[derive(Clone)]
struct TokioExecutor;

impl<F> crate::rt::Executor<F> for TokioExecutor
where
F: std::future::Future + 'static + Send,
F::Output: Send + 'static,
{
fn execute(&self, fut: F) {
tokio::task::spawn(fut);
}
}

#[allow(unused)]
async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) {
let (_sender, conn) = crate::client::conn::http2::handshake::<
_,
_,
http_body_util::Empty<bytes::Bytes>,
>(TokioExecutor, io)
.await
.unwrap();

tokio::task::spawn(async move {
conn.await.unwrap();
});
}
}

#[tokio::test]
#[ignore] // only compilation is checked
async fn not_send_not_sync_executor_of_send_futures() {
#[derive(Clone)]
struct TokioExecutor {
// !Send, !Sync
_x: std::marker::PhantomData<std::rc::Rc<()>>,
}

impl<F> crate::rt::Executor<F> for TokioExecutor
where
F: std::future::Future + 'static + Send,
F::Output: Send + 'static,
{
fn execute(&self, fut: F) {
tokio::task::spawn(fut);
}
}

#[allow(unused)]
async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) {
let (_sender, conn) =
crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>(
TokioExecutor {
_x: Default::default(),
},
io,
)
.await
.unwrap();

tokio::task::spawn_local(async move {
// can't use spawn here because when executor is !Send
conn.await.unwrap();
});
}
}

#[tokio::test]
#[ignore] // only compilation is checked
async fn send_not_sync_executor_of_send_futures() {
#[derive(Clone)]
struct TokioExecutor {
// !Sync
_x: std::marker::PhantomData<std::cell::Cell<()>>,
}

impl<F> crate::rt::Executor<F> for TokioExecutor
where
F: std::future::Future + 'static + Send,
F::Output: Send + 'static,
{
fn execute(&self, fut: F) {
tokio::task::spawn(fut);
}
}

#[allow(unused)]
async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) {
let (_sender, conn) =
crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>(
TokioExecutor {
_x: Default::default(),
},
io,
)
.await
.unwrap();

tokio::task::spawn_local(async move {
// can't use spawn here because when executor is !Send
conn.await.unwrap();
});
}
}
}
2 changes: 1 addition & 1 deletion src/proto/h2/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ where
B: Body + 'static + Unpin,
B::Data: Send,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
E: Http2ClientConnExec<B, T> + 'static + Send + Sync + Unpin,
E: Http2ClientConnExec<B, T> + Unpin,
T: Read + Write + Unpin,
{
type Output = crate::Result<Dispatched>;
Expand Down

0 comments on commit 56c3cd5

Please sign in to comment.