Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor connection.rs to make fail_with errors impossible #1721

Merged
merged 13 commits into from
Feb 19, 2021
1 change: 0 additions & 1 deletion zebra-network/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use client::ClientRequest;
use client::ClientRequestReceiver;
use client::InProgressClientRequest;
use client::MustUseOneshotSender;
use error::ErrorSlot;

pub use client::Client;
pub use connection::Connection;
Expand Down
27 changes: 4 additions & 23 deletions zebra-network/src/peer/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,21 @@ use std::{

use futures::{
channel::{mpsc, oneshot},
future, ready,
ready,
stream::{Stream, StreamExt},
};
use tower::Service;

use crate::protocol::internal::{Request, Response};

use super::{ErrorSlot, PeerError, SharedPeerError};
use super::{PeerError, SharedPeerError};

/// The "client" duplex half of a peer connection.
pub struct Client {
// Used to shut down the corresponding heartbeat.
// This is always Some except when we take it on drop.
pub(super) shutdown_tx: Option<oneshot::Sender<()>>,
pub(super) server_tx: mpsc::Sender<ClientRequest>,
pub(super) error_slot: ErrorSlot,
}

/// A message from the `peer::Client` to the `peer::Server`.
Expand All @@ -47,8 +46,6 @@ pub(super) struct ClientRequestReceiver {

/// A message from the `peer::Client` to the `peer::Server`,
/// after it has been received by the `peer::Server`.
///
///
#[derive(Debug)]
#[must_use = "tx.send() must be called before drop"]
pub(super) struct InProgressClientRequest {
Expand Down Expand Up @@ -98,13 +95,6 @@ impl From<ClientRequest> for InProgressClientRequest {
}
}

impl ClientRequestReceiver {
/// Forwards to `inner.close()`
pub fn close(&mut self) {
self.inner.close()
}
}

impl Stream for ClientRequestReceiver {
type Item = InProgressClientRequest;

Expand Down Expand Up @@ -199,10 +189,7 @@ impl Service<Request> for Client {

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if ready!(self.server_tx.poll_ready(cx)).is_err() {
Poll::Ready(Err(self
.error_slot
.try_get_error()
.expect("failed servers must set their error slot")))
Comment on lines -202 to -205
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change replaces error_slot with PeerError::ConnectionClosed.

What happened to the error value that used to be here?
Are we hiding more specific errors as part of this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

possibly, but I think the errors we used to grab here were misleading and shouldn't be propagated back to this source. The error slot used to store the error that brought down a Connection, but that error is actually associated with one of the client requests created by call. We would propagate the error back to that caller over the channel and also copy it to be shared with all future callers. My instinct is that this isn't particularly useful and that we will correctly report all these errors still, they just wont end up being reported many extra times. Imo its fine for subsequent attempts to use a Client with a dead Connection to just say "sorry this is already closed".

Poll::Ready(Err(PeerError::ConnectionClosed.into()))
} else {
Poll::Ready(Ok(()))
}
Expand All @@ -221,13 +208,7 @@ impl Service<Request> for Client {
match self.server_tx.try_send(ClientRequest { request, span, tx }) {
Err(e) => {
if e.is_disconnected() {
let ClientRequest { tx, .. } = e.into_inner();
let _ = tx.send(Err(PeerError::ConnectionClosed.into()));
future::ready(Err(self
.error_slot
.try_get_error()
.expect("failed servers must set their error slot")))
.boxed()
Comment on lines -224 to -230
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens to the ClientRequest.tx we used to send on here?
What happens to the specific error in the error_slot?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's never sent to the Connection in the background so we never poll the rx so there's no reason to send an error through the tx before dropping it. I think this originally got added when we were figuring out the MustUseSender, and this was added to avoid panicking because the sender hadn't been used, but we don't even convert to a MustUseSender yet here so it should be fine.

There error slot here is gone by the same logic above, the error from a previous request doesn't need to be propagated back to subsequent failed attempts at new requests.

async { Err(PeerError::ConnectionClosed.into()) }.boxed()
} else {
// sending fails when there's not enough
// channel space, but we called poll_ready
Expand Down
Loading