Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid panics when there are multiple failures on the same connection #1600

Merged
merged 4 commits into from
Jan 18, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 49 additions & 11 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,11 @@ where
State::AwaitingResponse {
ref mut handler, ..
} => span.in_scope(|| handler.process_message(peer_msg)),
_ => unreachable!(),
_ => unreachable!("unexpected state after AwaitingResponse: {:?}, peer_msg: {:?}, client_receiver: {:?}",
self.state,
peer_msg,
self.client_rx,
),
};
// If the message was not consumed, check whether it
// should be handled as a request.
Expand All @@ -444,7 +448,10 @@ where
State::AwaitingRequest
}
pending @ State::AwaitingResponse { .. } => pending,
_ => unreachable!(),
_ => unreachable!(
"unexpected failed connection state while AwaitingResponse: client_receiver: {:?}",
self.client_rx
),
};
}
}
Expand All @@ -465,7 +472,10 @@ where
let _ = tx.send(Err(e.into()));
State::AwaitingRequest
}
_ => unreachable!(),
_ => unreachable!(
"unexpected failed connection state while AwaitingResponse: client_receiver: {:?}",
self.client_rx
),
};
}
Either::Right((Either::Right(_), _peer_fut)) => {
Expand All @@ -481,7 +491,7 @@ where
Some(InProgressClientRequest { tx, span, .. }) => {
trace!(
parent: &span,
"erroring pending request to failed connection"
"sending an error response to a pending request on a failed connection"
);
let e = self
.error_slot
Expand All @@ -504,15 +514,31 @@ where
E: Into<SharedPeerError>,
{
let e = e.into();
debug!(%e, "failing peer service with error");
debug!(%e,
connection_state = ?self.state,
client_receiver = ?self.client_rx,
"failing peer service with error");
// Update the shared error slot
let mut guard = self
.error_slot
.0
.lock()
.expect("mutex should be unpoisoned");
if guard.is_some() {
panic!("called fail_with on already-failed connection state");
if let Some(original_error) = guard.clone() {
// This panic typically happens due to these bugs:
// * we mark a connection as failed without using fail_with
// * we call fail_with without checking for a failed connection
// state
//
// See the original bug #1510 and PR #1531, and the later bug #1599
// and PR #1600.
panic!(
"calling fail_with on already-failed connection state: failed connections must stop processing pending requests and responses, then close the connection. state: {:?} original error: {:?} new error: {:?} client receiver: {:?}",
self.state,
original_error,
e,
self.client_rx
);
} else {
*guard = Some(e);
}
Expand Down Expand Up @@ -551,9 +577,17 @@ where

// These matches return a Result with (new_state, Option<Sender>) or an (error, Sender)
let new_state_result = match (&self.state, request) {
(Failed, _) => panic!("failed connection cannot handle requests"),
(AwaitingResponse { .. }, _) => panic!("tried to update pending request"),

(Failed, request) => panic!(
"failed connection cannot handle new request: {:?}, client_receiver: {:?}",
request,
self.client_rx
),
(pending @ AwaitingResponse { .. }, request) => panic!(
"tried to process new request: {:?} while awaiting a response: {:?}, client_receiver: {:?}",
request,
pending,
self.client_rx
),
(AwaitingRequest, Peers) => match self.peer_tx.send(Message::GetAddr).await {
Ok(()) => Ok((
AwaitingResponse {
Expand Down Expand Up @@ -840,6 +874,7 @@ where
if self.svc.ready_and().await.is_err() {
// Treat all service readiness errors as Overloaded
self.fail_with(PeerError::Overloaded);
return;
}

let rsp = match self.svc.call(req).await {
Expand All @@ -850,7 +885,10 @@ where
self.fail_with(PeerError::Overloaded);
} else {
// We could send a reject to the remote peer.
error!(%e);
error!(%e,
connection_state = ?self.state,
client_receiver = ?self.client_rx,
"error processing peer request");
}
return;
}
Expand Down