Skip to content

Commit

Permalink
gio: Add a method to get a stream of incoming connections to SocketLi…
Browse files Browse the repository at this point in the history
…stener

This object provides a `Stream` over which we can iterate in order to accept
connections in an async environment.

This is analogous to the one provided by async-std's `TcpListener`.
  • Loading branch information
carlosmn committed Jul 8, 2024
1 parent a9c1b6a commit e291438
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 0 deletions.
2 changes: 2 additions & 0 deletions gio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ mod simple_proxy_resolver;
mod socket;
pub use socket::{InputMessage, InputVector, OutputMessage, OutputVector, SocketControlMessages};
mod socket_control_message;
mod socket_listener;
pub use socket_listener::SocketListenerExtManual;
mod socket_msg_flags;
pub use socket_msg_flags::SocketMsgFlags;
mod subprocess;
Expand Down
55 changes: 55 additions & 0 deletions gio/src/socket_listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Take a look at the license at the top of the repository in the LICENSE file.

use std::{pin::Pin, task::ready};

use futures_core::{
stream::Stream,
task::{Context, Poll},
Future,
};

use crate::{prelude::SocketListenerExt, SocketConnection, SocketListener};
use glib::{prelude::*, Error, Object};

pub struct Incoming {
listener: SocketListener,
fut: Option<Pin<Box<dyn Future<Output = Result<(SocketConnection, Option<Object>), Error>>>>>,
}

impl Stream for Incoming {
type Item = Result<(SocketConnection, Option<Object>), Error>;

fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.fut.is_none() {
self.fut = Some(self.listener.accept_future());
}

let fut = self.fut.as_mut().unwrap();
let res = ready!(Pin::new(fut).poll(ctx));
self.fut.take();

Poll::Ready(Some(res))
}
}

pub trait SocketListenerExtManual: SocketListenerExt {
/// Returns a stream of incoming connections
///
/// Iterating over this stream is equivalent to calling [`SocketListenerExt::accept_future`] in a
/// loop. The stream of connections is infinite, i.e awaiting the next
/// connection will never result in [`None`].
fn incoming(
&self,
) -> Pin<Box<dyn Stream<Item = Result<(SocketConnection, Option<Object>), Error>>>>;
}

impl<O: IsA<SocketListener>> SocketListenerExtManual for O {
fn incoming(
&self,
) -> Pin<Box<dyn Stream<Item = Result<(SocketConnection, Option<Object>), Error>>>> {
Box::pin(Incoming {
listener: self.as_ref().clone(),
fut: None,
})
}
}

0 comments on commit e291438

Please sign in to comment.