From c0c0de737285d847f94eca51d53aac11b953fbcf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Mart=C3=ADn=20Nieto?= Date: Sat, 6 Jul 2024 23:47:16 +0200 Subject: [PATCH] gio: Add a method to get a stream of incoming connections to SocketListener This object provides a `Stream` over which we can iterate in order to accept connections in an async environment. This is analogous to the one provided by async-std's `TcpListener`. --- gio/src/lib.rs | 1 + gio/src/prelude.rs | 3 +- gio/src/socket_listener.rs | 56 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 59 insertions(+), 1 deletion(-) create mode 100644 gio/src/socket_listener.rs diff --git a/gio/src/lib.rs b/gio/src/lib.rs index 3c95c7816754..b559db77329d 100644 --- a/gio/src/lib.rs +++ b/gio/src/lib.rs @@ -82,6 +82,7 @@ mod simple_proxy_resolver; mod socket; pub use socket::{InputMessage, InputVector, OutputMessage, OutputVector, SocketControlMessages}; mod socket_control_message; +mod socket_listener; mod socket_msg_flags; pub use socket_msg_flags::SocketMsgFlags; mod subprocess; diff --git a/gio/src/prelude.rs b/gio/src/prelude.rs index fa19d930ffc1..9c9e1998c57c 100644 --- a/gio/src/prelude.rs +++ b/gio/src/prelude.rs @@ -41,5 +41,6 @@ pub use crate::{ output_stream::OutputStreamExtManual, pollable_input_stream::PollableInputStreamExtManual, pollable_output_stream::PollableOutputStreamExtManual, settings::SettingsExtManual, simple_proxy_resolver::SimpleProxyResolverExtManual, socket::SocketExtManual, - socket_control_message::SocketControlMessageExtManual, tls_connection::TlsConnectionExtManual, + socket_control_message::SocketControlMessageExtManual, + socket_listener::SocketListenerExtManual, tls_connection::TlsConnectionExtManual, }; diff --git a/gio/src/socket_listener.rs b/gio/src/socket_listener.rs new file mode 100644 index 000000000000..023468e852d9 --- /dev/null +++ b/gio/src/socket_listener.rs @@ -0,0 +1,56 @@ +// Take a look at the license at the top of the repository in the LICENSE file. + +use std::{pin::Pin, task::ready}; + +use futures_core::{ + stream::Stream, + task::{Context, Poll}, + Future, +}; + +use crate::{prelude::SocketListenerExt, SocketConnection, SocketListener}; +use glib::{prelude::*, Error, Object}; + +pub struct Incoming { + listener: SocketListener, + fut: Option), Error>>>>>, +} + +impl Stream for Incoming { + type Item = Result<(SocketConnection, Option), Error>; + + fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { + if self.fut.is_none() { + self.fut = Some(self.listener.accept_future()); + } + + let fut = self.fut.as_mut().unwrap(); + let res = ready!(Pin::new(fut).poll(ctx)); + self.fut.take(); + + Poll::Ready(Some(res)) + } +} + +pub trait SocketListenerExtManual: SocketListenerExt { + // rustdoc-stripper-ignore-next + /// Returns a stream of incoming connections + /// + /// Iterating over this stream is equivalent to calling [`SocketListenerExt::accept_future`] in a + /// loop. The stream of connections is infinite, i.e awaiting the next + /// connection will never result in [`None`]. + fn incoming( + &self, + ) -> Pin), Error>>>>; +} + +impl> SocketListenerExtManual for O { + fn incoming( + &self, + ) -> Pin), Error>>>> { + Box::pin(Incoming { + listener: self.as_ref().clone(), + fut: None, + }) + } +}