Skip to content

Commit

Permalink
feat: Allow to create tarpc on existing TCP listener
Browse files Browse the repository at this point in the history
  • Loading branch information
SabatierBoris authored and tikue committed Jan 2, 2024
1 parent b92dd15 commit d62706e
Showing 1 changed file with 33 additions and 1 deletion.
34 changes: 33 additions & 1 deletion tarpc/src/serde_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,19 @@ pub mod tcp {
Codec: Serializer<SinkItem> + Deserializer<Item>,
CodecFn: Fn() -> Codec,
{
let listener = TcpListener::bind(addr).await?;
listen_on(TcpListener::bind(addr).await?, codec_fn).await
}

/// Wrap accepted connections from `listener` in TCP transports.
pub async fn listen_on<Item, SinkItem, Codec, CodecFn>(
listener: TcpListener,
codec_fn: CodecFn,
) -> io::Result<Incoming<Item, SinkItem, Codec, CodecFn>>
where
Item: for<'de> Deserialize<'de>,
Codec: Serializer<SinkItem> + Deserializer<Item>,
CodecFn: Fn() -> Codec,
{
let local_addr = listener.local_addr()?;
Ok(Incoming {
listener,
Expand Down Expand Up @@ -662,6 +674,26 @@ mod tests {
Ok(())
}

#[cfg(tcp)]
#[tokio::test]
async fn tcp_on_existing_transport() -> io::Result<()> {
use super::tcp;

let transport = TcpListener::bind("0.0.0.0:0").await?;
let mut listener = tcp::listen_on(transport, SymmetricalJson::<String>::default).await?;
let addr = listener.local_addr();
tokio::spawn(async move {
let mut transport = listener.next().await.unwrap().unwrap();
let message = transport.next().await.unwrap().unwrap();
transport.send(message).await.unwrap();
});
let mut transport = tcp::connect(addr, SymmetricalJson::<String>::default).await?;
transport.send(String::from("test")).await?;
assert_matches!(transport.next().await, Some(Ok(s)) if s == "test");
assert_matches!(transport.next().await, None);
Ok(())
}

#[cfg(all(unix, feature = "unix"))]
#[tokio::test]
async fn uds() -> io::Result<()> {
Expand Down

0 comments on commit d62706e

Please sign in to comment.