Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

client/finality-grandpa: Make round_communication use bounded channel #4691

Merged
merged 4 commits into from
Jan 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 48 additions & 41 deletions client/finality-grandpa/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@
//! In the future, there will be a fallback for allowing sending the same message
//! under certain conditions that are used to un-stick the protocol.

use futures::{prelude::*, sync::mpsc};
use futures::prelude::*;
use futures03::{
channel::mpsc as mpsc03,
compat::Compat,
future::{Future as Future03},
stream::StreamExt,
future::{self as future03, Future as Future03},
sink::Sink as Sink03,
stream::{Stream as Stream03, StreamExt},
};
use log::{debug, trace};
use parking_lot::Mutex;
Expand Down Expand Up @@ -271,8 +272,8 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
local_key: Option<AuthorityPair>,
has_voted: HasVoted<B>,
) -> (
impl Stream<Item=SignedMessage<B>,Error=Error>,
impl Sink<SinkItem=Message<B>,SinkError=Error>,
impl Stream03<Item=SignedMessage<B>> + Unpin,
OutgoingMessages<B>,
) {
self.note_round(
round,
Expand All @@ -290,22 +291,20 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
});

let topic = round_topic::<B>(round.0, set_id.0);
let incoming = Compat::new(self.gossip_engine.messages_for(topic)
.map(|item| Ok::<_, ()>(item)))
.filter_map(|notification| {
let incoming = self.gossip_engine.messages_for(topic)
.filter_map(move |notification| {
let decoded = GossipMessage::<B>::decode(&mut &notification.message[..]);
if let Err(ref e) = decoded {
debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e);
}
decoded.ok()
})
.and_then(move |msg| {
match msg {
GossipMessage::Vote(msg) => {

match decoded {
Err(ref e) => {
debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e);
return future03::ready(None);
}
Ok(GossipMessage::Vote(msg)) => {
// check signature.
if !voters.contains_key(&msg.message.id) {
debug!(target: "afg", "Skipping message from unknown voter {}", msg.message.id);
return Ok(None);
return future03::ready(None);
}

match &msg.message.message {
Expand All @@ -332,18 +331,16 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
},
};

Ok(Some(msg.message))
future03::ready(Some(msg.message))
}
_ => {
debug!(target: "afg", "Skipping unknown message type");
return Ok(None);
return future03::ready(None);
}
}
})
.filter_map(|x| x)
.map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream")));
});

let (tx, out_rx) = mpsc::unbounded();
let (tx, out_rx) = mpsc03::channel(0);
let outgoing = OutgoingMessages::<B> {
round: round.0,
set_id: set_id.0,
Expand All @@ -353,14 +350,10 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
has_voted,
};

let out_rx = out_rx.map_err(move |()| Error::Network(
format!("Failed to receive on unbounded receiver for round {}", round.0)
));

// Combine incoming votes from external GRANDPA nodes with outgoing
// votes from our own GRANDPA voter to have a single
// vote-import-pipeline.
let incoming = incoming.select(out_rx);
let incoming = futures03::stream::select(incoming, out_rx);

(incoming, outgoing)
}
Expand Down Expand Up @@ -637,21 +630,29 @@ pub(crate) fn check_message_sig<Block: BlockT>(
/// use the same raw message and key to sign. This is currently true for
/// `ed25519` and `BLS` signatures (which we might use in the future), care must
/// be taken when switching to different key types.
struct OutgoingMessages<Block: BlockT> {
pub(crate) struct OutgoingMessages<Block: BlockT> {
round: RoundNumber,
set_id: SetIdNumber,
locals: Option<(AuthorityPair, AuthorityId)>,
sender: mpsc::UnboundedSender<SignedMessage<Block>>,
sender: mpsc03::Sender<SignedMessage<Block>>,
network: GossipEngine<Block>,
has_voted: HasVoted<Block>,
}

impl<Block: BlockT> Sink for OutgoingMessages<Block>
impl<B: BlockT> Unpin for OutgoingMessages<B> {}

impl<Block: BlockT> Sink03<Message<Block>> for OutgoingMessages<Block>
{
type SinkItem = Message<Block>;
type SinkError = Error;
type Error = Error;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
Sink03::poll_ready(Pin::new(&mut self.sender), cx)
.map(|elem| { elem.map_err(|e| {
Error::Network(format!("Failed to poll_ready channel sender: {:?}", e))
})})
}

fn start_send(&mut self, mut msg: Message<Block>) -> StartSend<Message<Block>, Error> {
fn start_send(mut self: Pin<&mut Self>, mut msg: Message<Block>) -> Result<(), Self::Error> {
// if we've voted on this round previously under the same key, send that vote instead
match &mut msg {
finality_grandpa::Message::PrimaryPropose(ref mut vote) =>
Expand Down Expand Up @@ -707,17 +708,23 @@ impl<Block: BlockT> Sink for OutgoingMessages<Block>
self.network.gossip_message(topic, message.encode(), false);

// forward the message to the inner sender.
let _ = self.sender.unbounded_send(signed);
}
return self.sender.start_send(signed).map_err(|e| {
Error::Network(format!("Failed to start_send on channel sender: {:?}", e))
});
};

Ok(AsyncSink::Ready)
Ok(())
}

fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) }
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
Poll03::Ready(Ok(()))
}

fn close(&mut self) -> Poll<(), Error> {
// ignore errors since we allow this inner sender to be closed already.
self.sender.close().or_else(|_| Ok(Async::Ready(())))
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
Sink03::poll_close(Pin::new(&mut self.sender), cx)
.map(|elem| { elem.map_err(|e| {
Error::Network(format!("Failed to poll_close channel sender: {:?}", e))
})})
}
}

Expand Down
9 changes: 8 additions & 1 deletion client/finality-grandpa/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ use std::time::Duration;
use log::{debug, warn, info};
use parity_scale_codec::{Decode, Encode};
use futures::prelude::*;
use futures03::future::{FutureExt as _, TryFutureExt as _};
use futures03::{
compat::{Compat, CompatSink},
future::{FutureExt as _, TryFutureExt as _},
stream::StreamExt as _,
};
use futures_timer::Delay;
use parking_lot::RwLock;
use sp_blockchain::{HeaderBackend, Error as ClientError};
Expand Down Expand Up @@ -608,6 +612,9 @@ where
has_voted,
);

let incoming = Compat::new(incoming.map(|item| Ok::<_, Error>(item)));
let outgoing = CompatSink::new(outgoing);

// schedule incoming messages from the network to be held until
// corresponding blocks are imported.
let incoming = Box::new(UntilVoteTargetImported::new(
Expand Down
25 changes: 20 additions & 5 deletions client/finality-grandpa/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,17 @@ use sp_consensus::{
BlockOrigin, ForkChoiceStrategy, ImportedAux, BlockImportParams, ImportResult, BlockImport,
import_queue::{BoxJustificationImport, BoxFinalityProofImport},
};
use std::collections::{HashMap, HashSet};
use std::result;
use std::{
collections::{HashMap, HashSet},
result,
pin::Pin, task,
};
use parity_scale_codec::Decode;
use sp_runtime::traits::{Header as HeaderT, HasherFor};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, HasherFor};
use sp_runtime::generic::{BlockId, DigestItem};
use sp_core::{H256, NativeOrEncoded, ExecutionContext, crypto::Public};
use sp_finality_grandpa::{GRANDPA_ENGINE_ID, AuthorityList, GrandpaApi};
use sp_state_machine::{InMemoryBackend, prove_read, read_proof_check};
use std::{pin::Pin, task};

use authorities::AuthoritySet;
use finality_proof::{
Expand Down Expand Up @@ -1280,6 +1282,9 @@ fn voter_persists_its_votes() {
HasVoted::No,
);

let round_rx = futures03::compat::Compat::new(round_rx.map(|item| Ok::<_, Error>(item)));
let round_tx = futures03::compat::CompatSink::new(round_tx);

let round_tx = Arc::new(Mutex::new(round_tx));
let exit_tx = Arc::new(Mutex::new(Some(exit_tx)));

Expand Down Expand Up @@ -1330,7 +1335,17 @@ fn voter_persists_its_votes() {
target_hash: block_30_hash,
};

round_tx.lock().start_send(finality_grandpa::Message::Prevote(prevote)).unwrap();
// One should either be calling `Sink::send` or `Sink::start_send` followed
// by `Sink::poll_complete` to make sure items are being flushed. Given that
// we send in a loop including a delay until items are received, this can be
// ignored for the sake of reduced complexity.
if !round_tx.lock()
.start_send(finality_grandpa::Message::Prevote(prevote))
.unwrap()
.is_ready() {
panic!("expected sink to be ready to write to.");
}

Ok(())
}).map_err(|_| panic!()))

Expand Down