Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A0-1472 Validator Network log addresses #701

Merged
merged 8 commits into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion finality-aleph/src/tcp_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,36 @@ use tokio::net::{

use crate::{
network::{Multiaddress, NetworkIdentity, PeerId},
validator_network::{Dialer, Listener, Splittable},
validator_network::{ConnectionInfo, Dialer, Listener, Splittable},
};

impl ConnectionInfo for TcpStream {
fn peer_address_info(&self) -> String {
match self.peer_addr() {
Ok(addr) => addr.to_string(),
Err(e) => format!("unknown address: {}", e),
}
}
}

impl ConnectionInfo for OwnedWriteHalf {
fn peer_address_info(&self) -> String {
match self.peer_addr() {
Ok(addr) => addr.to_string(),
Err(e) => e.to_string(),
}
}
}

impl ConnectionInfo for OwnedReadHalf {
fn peer_address_info(&self) -> String {
match self.peer_addr() {
Ok(addr) => addr.to_string(),
Err(e) => e.to_string(),
}
}
}

impl Splittable for TcpStream {
type Sender = OwnedWriteHalf;
type Receiver = OwnedReadHalf;
Expand Down
67 changes: 51 additions & 16 deletions finality-aleph/src/testing/mocks/validator_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use crate::{
crypto::AuthorityPen,
network::{mock::Channel, Data, Multiaddress, NetworkIdentity},
validator_network::{
mock::random_keys, Dialer as DialerT, Listener as ListenerT, Network, Service, Splittable,
mock::random_keys, ConnectionInfo, Dialer as DialerT, Listener as ListenerT, Network,
PeerAddressInfo, Service, Splittable,
},
};

Expand Down Expand Up @@ -119,6 +120,7 @@ impl<D: Data> MockNetwork<D> {
pub struct UnreliableDuplexStream {
stream: DuplexStream,
counter: Option<usize>,
peer_address: Address,
}

impl AsyncWrite for UnreliableDuplexStream {
Expand Down Expand Up @@ -160,33 +162,45 @@ impl AsyncRead for UnreliableDuplexStream {
pub struct UnreliableSplittable {
incoming_data: UnreliableDuplexStream,
outgoing_data: UnreliableDuplexStream,
peer_address: Address,
}

impl UnreliableSplittable {
/// Create a pair of mock splittables connected to each other.
pub fn new(max_buf_size: usize, ends_after: Option<usize>) -> (Self, Self) {
let (in_a, out_b) = duplex(max_buf_size);
let (in_b, out_a) = duplex(max_buf_size);
pub fn new(
max_buf_size: usize,
ends_after: Option<usize>,
l_address: Address,
r_address: Address,
) -> (Self, Self) {
let (l_in, r_out) = duplex(max_buf_size);
let (r_in, l_out) = duplex(max_buf_size);
(
UnreliableSplittable {
incoming_data: UnreliableDuplexStream {
stream: in_a,
stream: l_in,
counter: ends_after,
peer_address: r_address,
},
outgoing_data: UnreliableDuplexStream {
stream: out_a,
stream: l_out,
counter: ends_after,
peer_address: r_address,
},
peer_address: r_address,
},
UnreliableSplittable {
incoming_data: UnreliableDuplexStream {
stream: in_b,
stream: r_in,
counter: ends_after,
peer_address: l_address,
},
outgoing_data: UnreliableDuplexStream {
stream: out_b,
stream: r_out,
counter: ends_after,
peer_address: l_address,
},
peer_address: l_address,
},
)
}
Expand Down Expand Up @@ -216,6 +230,18 @@ impl AsyncWrite for UnreliableSplittable {
}
}

impl ConnectionInfo for UnreliableSplittable {
fn peer_address_info(&self) -> PeerAddressInfo {
self.peer_address.to_string()
}
}

impl ConnectionInfo for UnreliableDuplexStream {
fn peer_address_info(&self) -> PeerAddressInfo {
self.peer_address.to_string()
}
}

impl Splittable for UnreliableSplittable {
type Sender = UnreliableDuplexStream;
type Receiver = UnreliableDuplexStream;
Expand All @@ -234,7 +260,9 @@ const TWICE_MAX_DATA_SIZE: usize = 32 * 1024 * 1024;

#[derive(Clone)]
pub struct MockDialer {
channel_connect: mpsc::UnboundedSender<(Address, oneshot::Sender<Connection>)>,
// used for logging
own_address: Address,
channel_connect: mpsc::UnboundedSender<(Address, Address, oneshot::Sender<Connection>)>,
}

#[async_trait::async_trait]
Expand All @@ -245,7 +273,7 @@ impl DialerT<Address> for MockDialer {
async fn connect(&mut self, addresses: Vec<Address>) -> Result<Self::Connection, Self::Error> {
let (tx, rx) = oneshot::channel();
self.channel_connect
.unbounded_send((addresses[0], tx))
.unbounded_send((self.own_address, addresses[0], tx))
.expect("should send");
Ok(rx.await.expect("should receive"))
}
Expand All @@ -266,7 +294,7 @@ impl ListenerT for MockListener {
}

pub struct UnreliableConnectionMaker {
dialers: mpsc::UnboundedReceiver<(Address, oneshot::Sender<Connection>)>,
dialers: mpsc::UnboundedReceiver<(Address, Address, oneshot::Sender<Connection>)>,
listeners: Vec<mpsc::UnboundedSender<Connection>>,
}

Expand All @@ -288,6 +316,7 @@ impl UnreliableConnectionMaker {
for id in ids.into_iter() {
let (tx_listener, rx_listener) = mpsc::unbounded();
let dialer = MockDialer {
own_address: addr.get(&id).expect("should be there")[0],
channel_connect: tx_dialer.clone(),
};
let listener = MockListener {
Expand All @@ -306,13 +335,19 @@ impl UnreliableConnectionMaker {
pub async fn run(&mut self, connections_end_after: Option<usize>) {
loop {
info!(target: "validator-network", "UnreliableConnectionMaker: waiting for new request...");
let (addr, c) = self.dialers.next().await.expect("should receive");
let (dialer_address, listener_address, c) =
self.dialers.next().await.expect("should receive");
info!(target: "validator-network", "UnreliableConnectionMaker: received request");
let (l_stream, r_stream) = Connection::new(4096, connections_end_after);
let (dialer_stream, listener_stream) = Connection::new(
4096,
connections_end_after,
dialer_address,
listener_address,
);
info!(target: "validator-network", "UnreliableConnectionMaker: sending stream");
c.send(l_stream).expect("should send");
self.listeners[addr as usize]
.unbounded_send(r_stream)
c.send(dialer_stream).expect("should send");
self.listeners[listener_address as usize]
.unbounded_send(listener_stream)
.expect("should send");
}
}
Expand Down
3 changes: 2 additions & 1 deletion finality-aleph/src/validator_network/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ pub async fn incoming<D: Data, S: Splittable>(
result_for_parent: mpsc::UnboundedSender<(AuthorityId, oneshot::Sender<()>)>,
data_for_user: mpsc::UnboundedSender<D>,
) {
let addr = stream.peer_address_info();
if let Err(e) = manage_incoming(authority_pen, stream, result_for_parent, data_for_user).await {
info!(target: "validator-network", "Incoming connection failed: {}", e);
info!(target: "validator-network", "Incoming connection from {} failed: {}.", addr, e);
}
}
17 changes: 16 additions & 1 deletion finality-aleph/src/validator_network/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use aleph_primitives::{AuthorityId, KEY_TYPE};
use sp_keystore::{testing::KeyStore, CryptoStore};
use tokio::io::{duplex, AsyncRead, AsyncWrite, DuplexStream, ReadBuf};

use crate::{crypto::AuthorityPen, validator_network::Splittable};
use crate::{
crypto::AuthorityPen,
validator_network::{ConnectionInfo, PeerAddressInfo, Splittable},
};

/// Create a random authority id and pen pair.
pub async fn key() -> (AuthorityId, AuthorityPen) {
Expand Down Expand Up @@ -86,6 +89,18 @@ impl AsyncWrite for MockSplittable {
}
}

impl ConnectionInfo for MockSplittable {
fn peer_address_info(&self) -> PeerAddressInfo {
String::from("MOCK_ADDRESS")
}
}

impl ConnectionInfo for DuplexStream {
fn peer_address_info(&self) -> PeerAddressInfo {
String::from("MOCK_ADDRESS")
}
}

impl Splittable for MockSplittable {
type Sender = DuplexStream;
type Receiver = DuplexStream;
Expand Down
14 changes: 11 additions & 3 deletions finality-aleph/src/validator_network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,18 @@ pub trait Network<A: Data, D: Data>: Send + 'static {
async fn next(&mut self) -> Option<D>;
}

pub type PeerAddressInfo = String;

/// Reports address of the peer that we are connected to.
pub trait ConnectionInfo {
/// Return the address of the peer that we are connected to.
fn peer_address_info(&self) -> PeerAddressInfo;
}

/// A stream that can be split into a sending and receiving part.
pub trait Splittable: AsyncWrite + AsyncRead + Unpin + Send {
type Sender: AsyncWrite + Unpin + Send;
type Receiver: AsyncRead + Unpin + Send;
pub trait Splittable: AsyncWrite + AsyncRead + ConnectionInfo + Unpin + Send {
type Sender: AsyncWrite + ConnectionInfo + Unpin + Send;
type Receiver: AsyncRead + ConnectionInfo + Unpin + Send;

/// Split into the sending and receiving part.
fn split(self) -> (Self::Sender, Self::Receiver);
Expand Down
48 changes: 24 additions & 24 deletions finality-aleph/src/validator_network/outgoing.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt::{Display, Error as FmtError, Formatter};
use std::fmt::{Debug, Display, Error as FmtError, Formatter};

use aleph_primitives::AuthorityId;
use futures::channel::mpsc;
Expand All @@ -10,39 +10,35 @@ use crate::{
validator_network::{
protocol_negotiation::{protocol, ProtocolNegotiationError},
protocols::ProtocolError,
Data, Dialer,
ConnectionInfo, Data, Dialer, PeerAddressInfo,
},
};

enum OutgoingError<A: Data, ND: Dialer<A>> {
Dial(ND::Error),
ProtocolNegotiation(ProtocolNegotiationError),
Protocol(ProtocolError),
ProtocolNegotiation(PeerAddressInfo, ProtocolNegotiationError),
Protocol(PeerAddressInfo, ProtocolError),
}

impl<A: Data, ND: Dialer<A>> Display for OutgoingError<A, ND> {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
use OutgoingError::*;
match self {
Dial(e) => write!(f, "dial error: {}", e),
ProtocolNegotiation(e) => write!(f, "protocol negotiation error: {}", e),
Protocol(e) => write!(f, "protocol error: {}", e),
ProtocolNegotiation(addr, e) => write!(
f,
"communication with {} failed, protocol negotiation error: {}",
addr, e
),
Protocol(addr, e) => write!(
f,
"communication with {} failed, protocol error: {}",
addr, e
),
}
}
}

impl<A: Data, ND: Dialer<A>> From<ProtocolNegotiationError> for OutgoingError<A, ND> {
fn from(e: ProtocolNegotiationError) -> Self {
OutgoingError::ProtocolNegotiation(e)
}
}

impl<A: Data, ND: Dialer<A>> From<ProtocolError> for OutgoingError<A, ND> {
fn from(e: ProtocolError) -> Self {
OutgoingError::Protocol(e)
}
}

async fn manage_outgoing<D: Data, A: Data, ND: Dialer<A>>(
authority_pen: AuthorityPen,
peer_id: AuthorityId,
Expand All @@ -55,20 +51,24 @@ async fn manage_outgoing<D: Data, A: Data, ND: Dialer<A>>(
.connect(addresses)
.await
.map_err(OutgoingError::Dial)?;
let peer_address_info = stream.peer_address_info();
debug!(target: "validator-network", "Performing outgoing protocol negotiation.");
let (stream, protocol) = protocol(stream).await?;
let (stream, protocol) = protocol(stream)
.await
.map_err(|e| OutgoingError::ProtocolNegotiation(peer_address_info.clone(), e))?;
debug!(target: "validator-network", "Negotiated protocol, running.");
Ok(protocol
protocol
.manage_outgoing(stream, authority_pen, peer_id, result_for_parent)
.await?)
.await
.map_err(|e| OutgoingError::Protocol(peer_address_info.clone(), e))
}

const RETRY_DELAY: Duration = Duration::from_secs(10);

/// Establish an outgoing connection to the provided peer using the dialer and then manage it.
/// While this works it will send any data from the user to the peer. Any failures will be reported
/// to the parent, so that connections can be reestablished if necessary.
pub async fn outgoing<D: Data, A: Data, ND: Dialer<A>>(
pub async fn outgoing<D: Data, A: Data + Debug, ND: Dialer<A>>(
authority_pen: AuthorityPen,
peer_id: AuthorityId,
dialer: ND,
Expand All @@ -79,12 +79,12 @@ pub async fn outgoing<D: Data, A: Data, ND: Dialer<A>>(
authority_pen,
peer_id.clone(),
dialer,
addresses,
addresses.clone(),
result_for_parent.clone(),
)
.await
{
info!(target: "validator-network", "Outgoing connection to {} failed: {}, will retry after {}s.", peer_id, e, RETRY_DELAY.as_secs());
info!(target: "validator-network", "Outgoing connection to {} {:?} failed: {}, will retry after {}s.", peer_id, addresses, e, RETRY_DELAY.as_secs());
sleep(RETRY_DELAY).await;
if result_for_parent.unbounded_send((peer_id, None)).is_err() {
debug!(target: "validator-network", "Could not send the closing message, we've probably been terminated by the parent service.");
Expand Down
6 changes: 4 additions & 2 deletions finality-aleph/src/validator_network/service.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::fmt::Debug;

use aleph_primitives::AuthorityId;
use futures::{
channel::{mpsc, oneshot},
Expand Down Expand Up @@ -71,7 +73,7 @@ impl<D: Data, A: Data> Network<A, D> for ServiceInterface<D, A> {
}

/// A service that has to be run for the validator network to work.
pub struct Service<D: Data, A: Data, ND: Dialer<A>, NL: Listener> {
pub struct Service<D: Data, A: Data + Debug, ND: Dialer<A>, NL: Listener> {
commands_from_interface: mpsc::UnboundedReceiver<ServiceCommand<D, A>>,
next_to_interface: mpsc::UnboundedSender<D>,
manager: Manager<A, D>,
Expand All @@ -81,7 +83,7 @@ pub struct Service<D: Data, A: Data, ND: Dialer<A>, NL: Listener> {
authority_pen: AuthorityPen,
}

impl<D: Data, A: Data, ND: Dialer<A>, NL: Listener> Service<D, A, ND, NL> {
impl<D: Data, A: Data + Debug, ND: Dialer<A>, NL: Listener> Service<D, A, ND, NL> {
/// Create a new validator network service plus an interface for interacting with it.
pub fn new(
dialer: ND,
Expand Down