Skip to content

Commit

Permalink
Replace the implementation of oneshot_to_stream in the async runtime …
Browse files Browse the repository at this point in the history
…using the flatten_stream library function
  • Loading branch information
twright committed Nov 8, 2024
1 parent c969555 commit d9bd916
Showing 1 changed file with 10 additions and 28 deletions.
38 changes: 10 additions & 28 deletions src/async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::Arc;
use futures::future::join_all;
use futures::stream;
use futures::stream::BoxStream;
use futures::FutureExt;
use futures::StreamExt;
use std::fmt::Debug;
use tokio::select;
Expand All @@ -24,40 +25,21 @@ use crate::core::MonitoringSemantics;
use crate::core::Specification;
use crate::core::{OutputStream, StreamContext, StreamData, VarName};

/* A struct containing a stream of type S which is lazily provided via a
* oneshot channel. This is used to allow a stream to be provided lazily to a
* consumer.
*
* This is either in the state Arrived(S) if the stream has already arrived, or
* Waiting(oneshot::Receiver<S>) when we are still waiting to receive the
* stream on the channel.
*/
enum WaitingStream<S> {
Arrived(S),
Waiting(oneshot::Receiver<S>),
}

/* This function takes a receiver of a stream and returns a new stream which
* first waits for the stream to be received on the channel before the first
* element is supplied. This is useful for providing code which depend on a
* stream with this stream before it has been defined, and make it possible to
* define multiple mutually recursive streams.
*/
fn oneshot_to_stream<T: 'static>(receiver: oneshot::Receiver<OutputStream<T>>) -> OutputStream<T> {
Box::pin(stream::unfold(
WaitingStream::Waiting(receiver),
|waiting_stream| async move {
match waiting_stream {
WaitingStream::Arrived(mut stream) => {
Some((stream.next().await?, WaitingStream::Arrived(stream)))
}
WaitingStream::Waiting(receiver) => match receiver.await {
Ok(mut stream) => Some((stream.next().await?, WaitingStream::Arrived(stream))),
Err(_) => None,
},
}
},
))
fn oneshot_to_stream<T: StreamData>(
receiver: oneshot::Receiver<OutputStream<T>>,
) -> OutputStream<T> {
let empty_stream = Box::pin(stream::empty());
Box::pin(
receiver
.map(|res| res.unwrap_or(empty_stream))
.flatten_stream()
)
}

/* Wrap a stream in a drop guard to ensure that the associated cancellation
Expand Down

0 comments on commit d9bd916

Please sign in to comment.