Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream support for Command #2150

Merged
merged 2 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion futures/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Run commands and keep track of subscriptions.
use crate::core::event::{self, Event};
use crate::subscription;
use crate::{BoxFuture, Executor, MaybeSend};
use crate::{BoxFuture, BoxStream, Executor, MaybeSend};

use futures::{channel::mpsc, Sink};
use std::marker::PhantomData;
Expand Down Expand Up @@ -69,6 +69,29 @@ where
self.executor.spawn(future);
}

/// Runs a [`Stream`] in the [`Runtime`] until completion.
///
/// The resulting `Message`s will be forwarded to the `Sender` of the
/// [`Runtime`].
///
/// [`Stream`]: BoxStream
pub fn run(&mut self, stream: BoxStream<Message>) {
use futures::{FutureExt, StreamExt};

let sender = self.sender.clone();
let future =
stream.map(Ok).forward(sender).map(|result| match result {
Ok(()) => (),
Err(error) => {
log::warn!(
"Stream could not run until completion: {error}"
);
}
});

self.executor.spawn(future);
}

/// Tracks a [`Subscription`] in the [`Runtime`].
///
/// It will spawn new streams or close old ones as necessary! See
Expand Down
35 changes: 34 additions & 1 deletion runtime/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ mod action;
pub use action::Action;

use crate::core::widget;
use crate::futures::futures;
use crate::futures::MaybeSend;

use futures::channel::mpsc;
use futures::Stream;
use std::fmt;
use std::future::Future;

Expand Down Expand Up @@ -43,11 +46,21 @@ impl<T> Command<T> {
future: impl Future<Output = A> + 'static + MaybeSend,
f: impl FnOnce(A) -> T + 'static + MaybeSend,
) -> Command<T> {
use iced_futures::futures::FutureExt;
use futures::FutureExt;

Command::single(Action::Future(Box::pin(future.map(f))))
}

/// Creates a [`Command`] that runs the given stream to completion.
pub fn run<A>(
stream: impl Stream<Item = A> + 'static + MaybeSend,
f: impl Fn(A) -> T + 'static + MaybeSend,
) -> Command<T> {
use futures::StreamExt;

Command::single(Action::Stream(Box::pin(stream.map(f))))
}

/// Creates a [`Command`] that performs the actions of all the given
/// commands.
///
Expand Down Expand Up @@ -106,3 +119,23 @@ impl<T> fmt::Debug for Command<T> {
command.fmt(f)
}
}

/// Creates a [`Command`] that produces the `Message`s published from a [`Future`]
/// to an [`mpsc::Sender`] with the given bounds.
pub fn channel<Fut, Message>(
size: usize,
f: impl FnOnce(mpsc::Sender<Message>) -> Fut + MaybeSend + 'static,
) -> Command<Message>
where
Fut: Future<Output = ()> + MaybeSend + 'static,
Message: 'static + MaybeSend,
{
use futures::future;
use futures::stream::{self, StreamExt};

let (sender, receiver) = mpsc::channel(size);

let runner = stream::once(f(sender)).filter_map(|_| future::ready(None));

Command::single(Action::Stream(Box::pin(stream::select(receiver, runner))))
}
9 changes: 8 additions & 1 deletion runtime/src/command/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ pub enum Action<T> {
/// [`Future`]: iced_futures::BoxFuture
Future(iced_futures::BoxFuture<T>),

/// Run a [`Stream`] to completion.
///
/// [`Stream`]: iced_futures::BoxStream
Stream(iced_futures::BoxStream<T>),

/// Run a clipboard action.
Clipboard(clipboard::Action<T>),

Expand Down Expand Up @@ -52,10 +57,11 @@ impl<T> Action<T> {
A: 'static,
T: 'static,
{
use iced_futures::futures::FutureExt;
use iced_futures::futures::{FutureExt, StreamExt};

match self {
Self::Future(future) => Action::Future(Box::pin(future.map(f))),
Self::Stream(stream) => Action::Stream(Box::pin(stream.map(f))),
Self::Clipboard(action) => Action::Clipboard(action.map(f)),
Self::Window(window) => Action::Window(window.map(f)),
Self::System(system) => Action::System(system.map(f)),
Expand All @@ -74,6 +80,7 @@ impl<T> fmt::Debug for Action<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Future(_) => write!(f, "Action::Future"),
Self::Stream(_) => write!(f, "Action::Stream"),
Self::Clipboard(action) => {
write!(f, "Action::Clipboard({action:?})")
}
Expand Down
7 changes: 6 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ pub use crate::core::{
color, Alignment, Background, BorderRadius, Color, ContentFit, Degrees,
Gradient, Length, Padding, Pixels, Point, Radians, Rectangle, Size, Vector,
};
pub use crate::runtime::Command;

pub mod clipboard {
//! Access the clipboard.
Expand Down Expand Up @@ -239,6 +238,11 @@ pub mod mouse {
};
}

pub mod command {
//! Run asynchronous actions.
pub use crate::runtime::command::{channel, Command};
}

pub mod subscription {
//! Listen to external events in your application.
pub use iced_futures::subscription::{
Expand Down Expand Up @@ -287,6 +291,7 @@ pub mod widget {
}

pub use application::Application;
pub use command::Command;
pub use error::Error;
pub use event::Event;
pub use executor::Executor;
Expand Down
3 changes: 3 additions & 0 deletions winit/src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,9 @@ pub fn run_command<A, C, E>(
command::Action::Future(future) => {
runtime.spawn(future);
}
command::Action::Stream(stream) => {
runtime.run(stream);
}
command::Action::Clipboard(action) => match action {
clipboard::Action::Read(tag) => {
let message = tag(clipboard.read());
Expand Down
Loading