Skip to content

Commit

Permalink
Revert "introduce Transition enum"
Browse files Browse the repository at this point in the history
This reverts commit 6906f87.
  • Loading branch information
teor2345 authored and yaahc committed Feb 24, 2021
1 parent a5e89f4 commit 72e2e83
Show file tree
Hide file tree
Showing 5 changed files with 378 additions and 247 deletions.
1 change: 1 addition & 0 deletions zebra-network/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use client::ClientRequest;
use client::ClientRequestReceiver;
use client::InProgressClientRequest;
use client::MustUseOneshotSender;
use error::ErrorSlot;

pub use client::Client;
pub use connection::Connection;
Expand Down
25 changes: 21 additions & 4 deletions zebra-network/src/peer/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,22 @@ use std::{

use futures::{
channel::{mpsc, oneshot},
ready,
future, ready,
stream::{Stream, StreamExt},
};
use tower::Service;

use crate::protocol::internal::{Request, Response};

use super::{PeerError, SharedPeerError};
use super::{ErrorSlot, PeerError, SharedPeerError};

/// The "client" duplex half of a peer connection.
pub struct Client {
// Used to shut down the corresponding heartbeat.
// This is always Some except when we take it on drop.
pub(super) shutdown_tx: Option<oneshot::Sender<()>>,
pub(super) server_tx: mpsc::Sender<ClientRequest>,
pub(super) error_slot: ErrorSlot,
}

/// A message from the `peer::Client` to the `peer::Server`.
Expand Down Expand Up @@ -97,6 +98,13 @@ impl From<ClientRequest> for InProgressClientRequest {
}
}

impl ClientRequestReceiver {
/// Forwards to `inner.close()`
pub fn close(&mut self) {
self.inner.close()
}
}

impl Stream for ClientRequestReceiver {
type Item = InProgressClientRequest;

Expand Down Expand Up @@ -191,7 +199,10 @@ impl Service<Request> for Client {

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if ready!(self.server_tx.poll_ready(cx)).is_err() {
Poll::Ready(Err(PeerError::ConnectionClosed.into()))
Poll::Ready(Err(self
.error_slot
.try_get_error()
.expect("failed servers must set their error slot")))
} else {
Poll::Ready(Ok(()))
}
Expand All @@ -210,7 +221,13 @@ impl Service<Request> for Client {
match self.server_tx.try_send(ClientRequest { request, span, tx }) {
Err(e) => {
if e.is_disconnected() {
async { Err(PeerError::ConnectionClosed.into()) }.boxed()
let ClientRequest { tx, .. } = e.into_inner();
let _ = tx.send(Err(PeerError::ConnectionClosed.into()));
future::ready(Err(self
.error_slot
.try_get_error()
.expect("failed servers must set their error slot")))
.boxed()
} else {
// sending fails when there's not enough
// channel space, but we called poll_ready
Expand Down
Loading

0 comments on commit 72e2e83

Please sign in to comment.