From d9bd916ad3b8849863a08e96c994d4bbaf81b1ba Mon Sep 17 00:00:00 2001 From: Thomas Wright Date: Fri, 8 Nov 2024 19:02:39 +0100 Subject: [PATCH] Replace the implementation of oneshot_to_stream in the async runtime using the flatten_stream library function --- src/async_runtime.rs | 38 ++++++++++---------------------------- 1 file changed, 10 insertions(+), 28 deletions(-) diff --git a/src/async_runtime.rs b/src/async_runtime.rs index 2c388a1..ff36582 100644 --- a/src/async_runtime.rs +++ b/src/async_runtime.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use futures::future::join_all; use futures::stream; use futures::stream::BoxStream; +use futures::FutureExt; use futures::StreamExt; use std::fmt::Debug; use tokio::select; @@ -24,40 +25,21 @@ use crate::core::MonitoringSemantics; use crate::core::Specification; use crate::core::{OutputStream, StreamContext, StreamData, VarName}; -/* A struct containing a stream of type S which is lazily provided via a - * oneshot channel. This is used to allow a stream to be provided lazily to a - * consumer. - * - * This is either in the state Arrived(S) if the stream has already arrived, or - * Waiting(oneshot::Receiver) when we are still waiting to receive the - * stream on the channel. - */ -enum WaitingStream { - Arrived(S), - Waiting(oneshot::Receiver), -} - /* This function takes a receiver of a stream and returns a new stream which * first waits for the stream to be received on the channel before the first * element is supplied. This is useful for providing code which depend on a * stream with this stream before it has been defined, and make it possible to * define multiple mutually recursive streams. */ -fn oneshot_to_stream(receiver: oneshot::Receiver>) -> OutputStream { - Box::pin(stream::unfold( - WaitingStream::Waiting(receiver), - |waiting_stream| async move { - match waiting_stream { - WaitingStream::Arrived(mut stream) => { - Some((stream.next().await?, WaitingStream::Arrived(stream))) - } - WaitingStream::Waiting(receiver) => match receiver.await { - Ok(mut stream) => Some((stream.next().await?, WaitingStream::Arrived(stream))), - Err(_) => None, - }, - } - }, - )) +fn oneshot_to_stream( + receiver: oneshot::Receiver>, +) -> OutputStream { + let empty_stream = Box::pin(stream::empty()); + Box::pin( + receiver + .map(|res| res.unwrap_or(empty_stream)) + .flatten_stream() + ) } /* Wrap a stream in a drop guard to ensure that the associated cancellation