Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid panics when there are multiple failures on the same connection #1600

Merged
merged 4 commits into from
Jan 18, 2021
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 60 additions & 11 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,11 @@ where
State::AwaitingResponse {
ref mut handler, ..
} => span.in_scope(|| handler.process_message(peer_msg)),
_ => unreachable!(),
_ => unreachable!("unexpected state after AwaitingResponse: {:?}, peer_msg: {:?}, client_receiver: {:?}",
self.state,
peer_msg,
self.client_rx,
),
};
// If the message was not consumed, check whether it
// should be handled as a request.
Expand All @@ -444,7 +448,10 @@ where
State::AwaitingRequest
}
pending @ State::AwaitingResponse { .. } => pending,
_ => unreachable!(),
_ => unreachable!(
"unexpected failed connection state while AwaitingResponse: client_receiver: {:?}",
self.client_rx
),
};
}
}
Expand All @@ -465,7 +472,10 @@ where
let _ = tx.send(Err(e.into()));
State::AwaitingRequest
}
_ => unreachable!(),
_ => unreachable!(
"unexpected failed connection state while AwaitingResponse: client_receiver: {:?}",
self.client_rx
),
};
}
Either::Right((Either::Right(_), _peer_fut)) => {
Expand All @@ -481,7 +491,7 @@ where
Some(InProgressClientRequest { tx, span, .. }) => {
trace!(
parent: &span,
"erroring pending request to failed connection"
"sending an error response to a pending request on a failed connection"
);
let e = self
.error_slot
Expand All @@ -504,15 +514,42 @@ where
E: Into<SharedPeerError>,
{
let e = e.into();
debug!(%e, "failing peer service with error");
debug!(%e,
connection_state = ?self.state,
client_receiver = ?self.client_rx,
"failing peer service with error");
// Update the shared error slot
let mut guard = self
.error_slot
.0
.lock()
.expect("mutex should be unpoisoned");
if guard.is_some() {
panic!("called fail_with on already-failed connection state");
if let Some(original_error) = guard.clone() {
// A failed connection might experience further errors if we:
// 1. concurrently process two different messages
// 2. check for a failed state for the second message
// 3. fail the connection due to the first message
// 4. fail the connection due to the second message
//
// It's not clear:
// * if this is actually a bug,
// * how we can modify Zebra to avoid it.
//
// This warning can also happen due to these bugs:
// * we mark a connection as failed without using fail_with
// * we call fail_with without checking for a failed connection
// state
//
// See the original bug #1510, the initial fix #1531, and the later
// bug #1599.
warn!(?original_error,
new_error = ?e,
connection_state = ?self.state,
client_receiver = ?self.client_rx,
"calling fail_with on already-failed connection state: ignoring new error");
// we don't need to clean up the connection, the original call to
// fail_with does that
return;
} else {
*guard = Some(e);
}
Expand Down Expand Up @@ -551,9 +588,17 @@ where

// These matches return a Result with (new_state, Option<Sender>) or an (error, Sender)
let new_state_result = match (&self.state, request) {
(Failed, _) => panic!("failed connection cannot handle requests"),
(AwaitingResponse { .. }, _) => panic!("tried to update pending request"),

(Failed, request) => panic!(
"failed connection cannot handle new request: {:?}, client_receiver: {:?}",
request,
self.client_rx
),
(pending @ AwaitingResponse { .. }, request) => panic!(
"tried to process new request: {:?} while awaiting a response: {:?}, client_receiver: {:?}",
request,
pending,
self.client_rx
),
(AwaitingRequest, Peers) => match self.peer_tx.send(Message::GetAddr).await {
Ok(()) => Ok((
AwaitingResponse {
Expand Down Expand Up @@ -840,6 +885,7 @@ where
if self.svc.ready_and().await.is_err() {
// Treat all service readiness errors as Overloaded
self.fail_with(PeerError::Overloaded);
return;
}

let rsp = match self.svc.call(req).await {
Expand All @@ -850,7 +896,10 @@ where
self.fail_with(PeerError::Overloaded);
} else {
// We could send a reject to the remote peer.
error!(%e);
error!(%e,
connection_state = ?self.state,
client_receiver = ?self.client_rx,
"error processing peer request");
}
return;
}
Expand Down