From 46dd2a685f8c69b7b4021f13d538dae00c818ae1 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Tue, 11 Apr 2023 15:08:44 +0200 Subject: [PATCH] fix: Handle bearer I/O errors (#247) --- pallas-network/src/facades.rs | 4 ++-- pallas-network/src/multiplexer.rs | 27 ++++++++++++++++++--------- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/pallas-network/src/facades.rs b/pallas-network/src/facades.rs index c2b19849..4623be79 100644 --- a/pallas-network/src/facades.rs +++ b/pallas-network/src/facades.rs @@ -25,7 +25,7 @@ pub enum Error { } pub struct PeerClient { - plexer_handle: JoinHandle>, + plexer_handle: JoinHandle>, pub handshake: handshake::Confirmation, chainsync: chainsync::N2NClient, blockfetch: blockfetch::Client, @@ -81,7 +81,7 @@ impl PeerClient { } pub struct NodeClient { - plexer_handle: JoinHandle>, + plexer_handle: JoinHandle>, pub handshake: handshake::Confirmation, chainsync: chainsync::N2CClient, statequery: localstate::ClientV10, diff --git a/pallas-network/src/multiplexer.rs b/pallas-network/src/multiplexer.rs index 733d9f26..ab91aa4b 100644 --- a/pallas-network/src/multiplexer.rs +++ b/pallas-network/src/multiplexer.rs @@ -151,7 +151,10 @@ impl SegmentBuffer { let mut buf = vec![0u8; remaining]; match self.0.try_read(&mut buf) { - Ok(0) => break Err(Error::EmptyBearer), + Ok(0) => { + error!("empty bearer"); + break Err(Error::EmptyBearer); + } Ok(n) => { trace!(n, "found data on bearer"); self.1.extend_from_slice(&buf[0..n]); @@ -164,8 +167,9 @@ impl SegmentBuffer { trace!("reading from bearer would block"); continue; } - Err(e) => { - return Err(Error::BearerIo(e)); + Err(err) => { + error!(?err, "beaerer IO error"); + break Err(Error::BearerIo(err)); } } } @@ -296,10 +300,11 @@ impl Plexer { } } - async fn mux(&mut self, msg: (Protocol, Payload)) -> tokio::io::Result<()> { + async fn mux(&mut self, msg: (Protocol, Payload)) -> Result<(), Error> { self.bearer .write_segment(msg.0, &self.clock, &msg.1) - .await?; + .await + .map_err(|_| Error::PlexerMux)?; if tracing::event_enabled!(tracing::Level::TRACE) { trace!( @@ -312,12 +317,15 @@ impl Plexer { Ok(()) } - async fn demux(&mut self, protocol: Protocol, payload: Payload) -> tokio::io::Result<()> { + async fn demux(&mut self, protocol: Protocol, payload: Payload) -> Result<(), Error> { if tracing::event_enabled!(tracing::Level::TRACE) { trace!(protocol, data = hex::encode(&payload), "read from bearer"); } - self.egress.0.send((protocol, payload)).unwrap(); + self.egress + .0 + .send((protocol, payload)) + .map_err(|err| Error::PlexerDemux(err.0 .0, err.0 .1))?; Ok(()) } @@ -330,11 +338,12 @@ impl Plexer { AgentChannel::for_server(protocol, &self.ingress, &self.egress) } - pub async fn run(&mut self) -> tokio::io::Result<()> { + pub async fn run(&mut self) -> Result<(), Error> { loop { trace!("selecting"); select! { - Ok(x) = self.bearer.read_segment() => { + res = self.bearer.read_segment() => { + let x = res?; trace!("demux selected"); self.demux(x.0, x.1).await? },