-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
820 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
use crate::{error::Error, ComputeHeavyFutureExecutor}; | ||
|
||
pub(crate) struct BlockInPlaceExecutor {} | ||
|
||
impl ComputeHeavyFutureExecutor for BlockInPlaceExecutor { | ||
async fn execute<F, O>(&self, fut: F) -> Result<O, Error> | ||
where | ||
F: std::future::Future<Output = O> + Send + 'static, | ||
O: Send + 'static, | ||
{ | ||
Ok(tokio::task::block_in_place(move || { | ||
tokio::runtime::Handle::current().block_on(async { fut.await }) | ||
})) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
use crate::{error::Error, ComputeHeavyFutureExecutor}; | ||
|
||
pub(crate) struct CurrentContextExecutor {} | ||
|
||
impl ComputeHeavyFutureExecutor for CurrentContextExecutor { | ||
async fn execute<F, O>(&self, fut: F) -> Result<O, Error> | ||
where | ||
F: std::future::Future<Output = O> + Send + 'static, | ||
O: Send + 'static, | ||
{ | ||
Ok(fut.await) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
use std::{any::Any, future::Future, pin::Pin}; | ||
|
||
use crate::{error::Error, ComputeHeavyFutureExecutor}; | ||
|
||
/// The input future for a custom executor closure. | ||
/// This is the regular future, with its output type erased to Any. | ||
pub type AnyWrappedFuture = Pin<Box<dyn Future<Output = Box<dyn Any + Send + 'static>> + Send + 'static>>; | ||
|
||
/// A closure that accepts a type-erased input future and returns a future that resolves to | ||
/// either Ok(<type erased input future's output>) or Err(<boxed error representing executor errors>) | ||
pub type CustomExecutorClosure = Box< | ||
dyn Fn( | ||
AnyWrappedFuture, | ||
) -> Box< | ||
dyn Future<Output = Result<Box<dyn Any + Send + 'static>, Box<dyn std::error::Error + Send + Sync>>>, | ||
> + Send | ||
+ Sync, | ||
>; | ||
|
||
// used for checking that the closure doesn't mutate the output type while working with Any | ||
struct PrivateType {} | ||
|
||
pub(crate) struct CustomExecutor { | ||
pub(crate) closure: CustomExecutorClosure, | ||
} | ||
|
||
impl CustomExecutor { | ||
pub(crate) async fn new(closure: CustomExecutorClosure) -> Self { | ||
let executor = Self { closure }; | ||
|
||
let test_future = async { PrivateType {} }; | ||
|
||
if let Err(_) = executor.execute(test_future).await { | ||
panic!( | ||
"CustomExecutor strategy initialized with a closure that \ | ||
changes the future's output type" | ||
); | ||
} | ||
|
||
executor | ||
} | ||
} | ||
|
||
impl ComputeHeavyFutureExecutor for CustomExecutor { | ||
async fn execute<F, O>(&self, fut: F) -> Result<O, Error> | ||
where | ||
F: Future<Output = O> + Send + 'static, | ||
O: Send + 'static, | ||
{ | ||
let wrapped_future = Box::pin(async move { | ||
let res = fut.await; | ||
Box::new(res) as Box<dyn Any + Send> | ||
}); | ||
|
||
let executor_result = Box::into_pin((self.closure)(wrapped_future)).await; | ||
|
||
match executor_result { | ||
Ok(future_result) => { | ||
if let Ok(value) = future_result.downcast::<O>() { | ||
Ok(*value) | ||
} else { | ||
// we would love to include the output that we actually received, but we don't want | ||
// to force display/debug bounds onto the future output | ||
Err(Error::CustomExecutorOutputTypeMismatch) | ||
} | ||
}, | ||
Err(err) => Err(Error::BoxError(err)) | ||
|
||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
use core::fmt; | ||
|
||
#[non_exhaustive] | ||
#[derive(Debug)] | ||
pub enum Error { | ||
CustomExecutorOutputTypeMismatch, | ||
#[cfg(compute_heavy_executor_tokio)] | ||
JoinError(String), | ||
BoxError(Box<dyn std::error::Error + Send + Sync>) | ||
} | ||
|
||
impl fmt::Display for Error { | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
match self { | ||
Error::CustomExecutorOutputTypeMismatch => write!(f, "custom executor returned a different output type than the input future's output type"), | ||
#[cfg(compute_heavy_executor_tokio)] | ||
Error::JoinError(err) => write!(f, "error joining tokio handle: {err}"), | ||
Error::BoxError(err) => write!(f, "{err}"), | ||
} | ||
} | ||
} |
Oops, something went wrong.