Skip to content

Commit

Permalink
fix: Handle bearer I/O errors (#247)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Apr 11, 2023
1 parent 1dc8717 commit 46dd2a6
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 11 deletions.
4 changes: 2 additions & 2 deletions pallas-network/src/facades.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub enum Error {
}

pub struct PeerClient {
plexer_handle: JoinHandle<tokio::io::Result<()>>,
plexer_handle: JoinHandle<Result<(), crate::multiplexer::Error>>,
pub handshake: handshake::Confirmation<handshake::n2n::VersionData>,
chainsync: chainsync::N2NClient,
blockfetch: blockfetch::Client,
Expand Down Expand Up @@ -81,7 +81,7 @@ impl PeerClient {
}

pub struct NodeClient {
plexer_handle: JoinHandle<tokio::io::Result<()>>,
plexer_handle: JoinHandle<Result<(), crate::multiplexer::Error>>,
pub handshake: handshake::Confirmation<handshake::n2c::VersionData>,
chainsync: chainsync::N2CClient,
statequery: localstate::ClientV10,
Expand Down
27 changes: 18 additions & 9 deletions pallas-network/src/multiplexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,10 @@ impl SegmentBuffer {
let mut buf = vec![0u8; remaining];

match self.0.try_read(&mut buf) {
Ok(0) => break Err(Error::EmptyBearer),
Ok(0) => {
error!("empty bearer");
break Err(Error::EmptyBearer);
}
Ok(n) => {
trace!(n, "found data on bearer");
self.1.extend_from_slice(&buf[0..n]);
Expand All @@ -164,8 +167,9 @@ impl SegmentBuffer {
trace!("reading from bearer would block");
continue;
}
Err(e) => {
return Err(Error::BearerIo(e));
Err(err) => {
error!(?err, "beaerer IO error");
break Err(Error::BearerIo(err));
}
}
}
Expand Down Expand Up @@ -296,10 +300,11 @@ impl Plexer {
}
}

async fn mux(&mut self, msg: (Protocol, Payload)) -> tokio::io::Result<()> {
async fn mux(&mut self, msg: (Protocol, Payload)) -> Result<(), Error> {
self.bearer
.write_segment(msg.0, &self.clock, &msg.1)
.await?;
.await
.map_err(|_| Error::PlexerMux)?;

if tracing::event_enabled!(tracing::Level::TRACE) {
trace!(
Expand All @@ -312,12 +317,15 @@ impl Plexer {
Ok(())
}

async fn demux(&mut self, protocol: Protocol, payload: Payload) -> tokio::io::Result<()> {
async fn demux(&mut self, protocol: Protocol, payload: Payload) -> Result<(), Error> {
if tracing::event_enabled!(tracing::Level::TRACE) {
trace!(protocol, data = hex::encode(&payload), "read from bearer");
}

self.egress.0.send((protocol, payload)).unwrap();
self.egress
.0
.send((protocol, payload))
.map_err(|err| Error::PlexerDemux(err.0 .0, err.0 .1))?;

Ok(())
}
Expand All @@ -330,11 +338,12 @@ impl Plexer {
AgentChannel::for_server(protocol, &self.ingress, &self.egress)
}

pub async fn run(&mut self) -> tokio::io::Result<()> {
pub async fn run(&mut self) -> Result<(), Error> {
loop {
trace!("selecting");
select! {
Ok(x) = self.bearer.read_segment() => {
res = self.bearer.read_segment() => {
let x = res?;
trace!("demux selected");
self.demux(x.0, x.1).await?
},
Expand Down

0 comments on commit 46dd2a6

Please sign in to comment.