Skip to content

Commit

Permalink
Update blocksync message formats (#686)
Browse files Browse the repository at this point in the history
* Update blocksync message format

* Update networking params and logs

* oops

* This actions failure is annoying
  • Loading branch information
austinabell authored Sep 4, 2020
1 parent 0db7ddb commit 0809097
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 69 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci-rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ jobs:
with:
submodules: "recursive"

- name: Update apt repositories
run: sudo apt update

- name: Install OpenCL
run: sudo apt install ocl-icd-opencl-dev

Expand Down
23 changes: 13 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion blockchain/chain_sync/src/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use log::trace;
use std::time::Duration;

/// Timeout for response from an RPC request
const RPC_TIMEOUT: u64 = 10;
const RPC_TIMEOUT: u64 = 20;

/// Context used in chain sync to handle network requests
pub struct SyncNetworkContext {
Expand Down
8 changes: 6 additions & 2 deletions blockchain/chain_sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,12 @@ where
self.state.write().await.set_epoch(curr_epoch);

// store messages
self.chain_store.put_messages(&b.bls_msgs)?;
self.chain_store.put_messages(&b.secp_msgs)?;
if let Some(m) = b.messages {
self.chain_store.put_messages(&m.bls_msgs)?;
self.chain_store.put_messages(&m.secp_msgs)?;
} else {
warn!("Blocksync request for messages returned null messages");
}
}
}
i -= REQUEST_WINDOW;
Expand Down
2 changes: 1 addition & 1 deletion node/forest_libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2018"
[dependencies]
utils = { path = "../utils" }
libp2p = "0.24"
libp2p-request-response = { git = "https://github.com/ChainSafe/rust-libp2p", rev = "7ba76ccc03569a02a924d08ff9b6f62504e8bc51" }
libp2p-request-response = { git = "https://github.com/ChainSafe/rust-libp2p", rev = "73bb35927ae7e0ba91dc6a157c562f2810aa4052" }
futures = "0.3.5"
futures-util = "0.3.5"
futures_codec = "0.4.0"
Expand Down
14 changes: 9 additions & 5 deletions node/forest_libp2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ use libp2p::swarm::{
use libp2p::NetworkBehaviour;
use libp2p_bitswap::{Bitswap, BitswapEvent, Priority};
use libp2p_request_response::{
ProtocolSupport, RequestId, RequestResponse, RequestResponseEvent, RequestResponseMessage,
ResponseChannel,
ProtocolSupport, RequestId, RequestResponse, RequestResponseConfig, RequestResponseEvent,
RequestResponseMessage, ResponseChannel,
};
use log::{debug, trace, warn};
use std::collections::HashSet;
use std::convert::TryFrom;
use std::error::Error;
use std::time::Duration;
use std::{task::Context, task::Poll};
use tiny_cid::Cid as Cid2;

Expand Down Expand Up @@ -277,7 +278,7 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BlockSyncRequest, BlockSy
peer, request_id, error
),
RequestResponseEvent::InboundFailure { peer, error } => {
warn!("BlockSync onbound error (peer: {:?}): {:?}", peer, error)
warn!("BlockSync inbound error (peer: {:?}): {:?}", peer, error)
}
}
}
Expand Down Expand Up @@ -342,6 +343,9 @@ impl ForestBehaviour {
let hp = std::iter::once((HelloProtocolName, ProtocolSupport::Full));
let bp = std::iter::once((BlockSyncProtocolName, ProtocolSupport::Full));

let mut req_res_config = RequestResponseConfig::default();
req_res_config.set_request_timeout(Duration::from_secs(20));

ForestBehaviour {
gossipsub: Gossipsub::new(MessageAuthenticity::Author(local_peer_id), gossipsub_config),
mdns: mdns_opt.into(),
Expand All @@ -354,8 +358,8 @@ impl ForestBehaviour {
),
kademlia: kademlia_opt.into(),
bitswap,
hello: RequestResponse::new(HelloCodec, hp, Default::default()),
blocksync: RequestResponse::new(BlockSyncCodec, bp, Default::default()),
hello: RequestResponse::new(HelloCodec, hp, req_res_config.clone()),
blocksync: RequestResponse::new(BlockSyncCodec, bp, req_res_config),
events: vec![],
peers: Default::default(),
}
Expand Down
50 changes: 24 additions & 26 deletions node/forest_libp2p/src/blocksync/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@ pub struct BlockSyncRequest {
/// The response to a BlockSync request.
#[derive(Clone, Debug, PartialEq, Serialize_tuple, Deserialize_tuple)]
pub struct BlockSyncResponse {
/// The tipsets requested
pub chain: Vec<TipsetBundle>,
/// Error code
pub status: u64,
/// Status message indicating failure reason
// TODO not included in blocksync spec, revisit if it will be removed in future
pub message: String,
/// The tipsets requested
pub chain: Vec<TipsetBundle>,
}

impl BlockSyncResponse {
Expand All @@ -48,12 +47,9 @@ impl BlockSyncResponse {
}
}

/// Contains the blocks and messages in a particular tipset
/// Contains all bls and secp messages and their indexes per block
#[derive(Clone, Debug, PartialEq, Serialize_tuple, Deserialize_tuple)]
pub struct TipsetBundle {
/// The blocks in the tipset
pub blocks: Vec<BlockHeader>,

pub struct CompactedMessages {
/// Signed bls messages
pub bls_msgs: Vec<UnsignedMessage>,
/// Describes which block each message belongs to
Expand All @@ -65,6 +61,16 @@ pub struct TipsetBundle {
pub secp_msg_includes: Vec<Vec<u64>>,
}

/// Contains the blocks and messages in a particular tipset
#[derive(Clone, Debug, PartialEq, Serialize_tuple, Deserialize_tuple)]
pub struct TipsetBundle {
/// The blocks in the tipset
pub blocks: Vec<BlockHeader>,

/// Compressed messages format
pub messages: Option<CompactedMessages>,
}

impl TryFrom<TipsetBundle> for Tipset {
type Error = String;

Expand All @@ -77,37 +83,29 @@ impl TryFrom<TipsetBundle> for FullTipset {
type Error = String;

fn try_from(tsb: TipsetBundle) -> Result<FullTipset, Self::Error> {
fts_from_bundle_parts(
tsb.blocks,
&tsb.bls_msgs,
&tsb.secp_msgs,
&tsb.bls_msg_includes,
&tsb.secp_msg_includes,
)
fts_from_bundle_parts(tsb.blocks, tsb.messages.as_ref())
}
}

impl TryFrom<&TipsetBundle> for FullTipset {
type Error = String;

fn try_from(tsb: &TipsetBundle) -> Result<FullTipset, Self::Error> {
fts_from_bundle_parts(
tsb.blocks.clone(),
&tsb.bls_msgs,
&tsb.secp_msgs,
&tsb.bls_msg_includes,
&tsb.secp_msg_includes,
)
fts_from_bundle_parts(tsb.blocks.clone(), tsb.messages.as_ref())
}
}

fn fts_from_bundle_parts(
headers: Vec<BlockHeader>,
bls_msgs: &[UnsignedMessage],
secp_msgs: &[SignedMessage],
bls_msg_includes: &[Vec<u64>],
secp_msg_includes: &[Vec<u64>],
messages: Option<&CompactedMessages>,
) -> Result<FullTipset, String> {
let CompactedMessages {
bls_msgs,
bls_msg_includes,
secp_msg_includes,
secp_msgs,
} = messages.ok_or("Tipset bundle did not contain message bundle")?;

// TODO: we may already want to check this on construction of the bundle
if headers.len() != bls_msg_includes.len() || headers.len() != secp_msg_includes.len() {
return Err(
Expand Down
13 changes: 8 additions & 5 deletions node/forest_libp2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ where
topics,
message,
} => {
debug!("Got a Gossip Message from {:?}", source);
trace!("Got a Gossip Message from {:?}", source);
self.network_sender_out.send(NetworkEvent::PubsubMessage {
source,
topics,
Expand All @@ -181,7 +181,7 @@ where
}).await;
}
ForestBehaviourEvent::HelloResponse { request_id, response, .. } => {
debug!("Received hello response (id: {:?}): {:?}", request_id, response);
debug!("Received hello response (id: {:?})", request_id);
self.network_sender_out.send(NetworkEvent::HelloResponse {
request_id,
response,
Expand All @@ -196,12 +196,12 @@ where
});
}
ForestBehaviourEvent::BlockSyncResponse { request_id, response, .. } => {
debug!("Received blocksync response (id: {:?}): {:?}", request_id, response);
debug!("Received blocksync response (id: {:?})", request_id);
let tx = self.bs_request_table.remove(&request_id);

if let Some(tx) = tx {
if let Err(e) = tx.send(response) {
debug!("RPCResponse receive failed: {:?}", e)
debug!("RPCResponse receive failed")
}
}
else {
Expand Down Expand Up @@ -283,11 +283,14 @@ pub fn build_transport(local_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox), Er
let dh_keys = noise::Keypair::<noise::X25519Spec>::new()
.into_authentic(&local_key)
.expect("Noise key generation failed");
let mut yamux_config = yamux::Config::default();
yamux_config.set_max_buffer_size(1 << 20);
yamux_config.set_receive_window(1 << 20);
transport
.upgrade(core::upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(dh_keys).into_authenticated())
.multiplex(core::upgrade::SelectUpgrade::new(
yamux::Config::default(),
yamux_config,
mplex::MplexConfig::new(),
))
.map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer)))
Expand Down
37 changes: 23 additions & 14 deletions node/forest_libp2p/tests/decode_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crypto::{Signature, Signer};
use forest_address::Address;
use forest_blocks::{Block, BlockHeader, FullTipset};
use forest_libp2p::blocksync::{BlockSyncResponse, TipsetBundle};
use forest_libp2p::blocksync::{BlockSyncResponse, CompactedMessages, TipsetBundle};
use forest_message::{SignedMessage, UnsignedMessage};
use num_bigint::BigInt;
use std::convert::TryFrom;
Expand Down Expand Up @@ -32,10 +32,12 @@ fn convert_single_tipset_bundle() {
};
let bundle = TipsetBundle {
blocks: vec![block.header.clone()],
bls_msgs: Vec::new(),
bls_msg_includes: vec![Vec::new()],
secp_msgs: Vec::new(),
secp_msg_includes: vec![Vec::new()],
messages: Some(CompactedMessages {
bls_msgs: Vec::new(),
bls_msg_includes: vec![Vec::new()],
secp_msgs: Vec::new(),
secp_msg_includes: vec![Vec::new()],
}),
};

let res = BlockSyncResponse {
Expand Down Expand Up @@ -99,28 +101,35 @@ fn tipset_bundle_to_full_tipset() {

let mut tsb = TipsetBundle {
blocks: vec![h0, h1],
secp_msgs: vec![sa, sb, sc, sd],
secp_msg_includes: vec![vec![0, 1, 3], vec![1, 2, 0]],
bls_msgs: vec![ua, ub, uc, ud],
bls_msg_includes: vec![vec![0, 1], vec![2, 3]],
messages: Some(CompactedMessages {
secp_msgs: vec![sa, sb, sc, sd],
secp_msg_includes: vec![vec![0, 1, 3], vec![1, 2, 0]],
bls_msgs: vec![ua, ub, uc, ud],
bls_msg_includes: vec![vec![0, 1], vec![2, 3]],
}),
};

assert_eq!(
FullTipset::try_from(tsb.clone()).unwrap(),
FullTipset::new(vec![b0, b1]).unwrap()
);

let mut cloned = tsb.clone();
if let Some(m) = cloned.messages.as_mut() {
m.secp_msg_includes = vec![vec![0, 4], vec![0]];
}
// Invalidate tipset bundle by having invalid index
tsb.secp_msg_includes = vec![vec![0, 4], vec![0]];
assert!(
FullTipset::try_from(tsb.clone()).is_err(),
FullTipset::try_from(cloned).is_err(),
"Invalid index should return error"
);

// Invalidate tipset bundle by not having includes same length as number of blocks
tsb.secp_msg_includes = vec![vec![0]];
if let Some(m) = tsb.messages.as_mut() {
// Invalidate tipset bundle by not having includes same length as number of blocks
m.secp_msg_includes = vec![vec![0]];
}
assert!(
FullTipset::try_from(tsb.clone()).is_err(),
FullTipset::try_from(tsb).is_err(),
"Invalid includes index vector should return error"
);
}
12 changes: 7 additions & 5 deletions utils/test_utils/src/chain_structures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use chain::TipsetMetadata;
use cid::{multihash::Blake2b256, Cid};
use crypto::{Signature, Signer, VRFProof};
use encoding::{from_slice, to_vec};
use forest_libp2p::blocksync::{BlockSyncResponse, TipsetBundle};
use forest_libp2p::blocksync::{BlockSyncResponse, CompactedMessages, TipsetBundle};
use message::{SignedMessage, UnsignedMessage};
use num_bigint::BigInt;
use std::error::Error;
Expand Down Expand Up @@ -184,10 +184,12 @@ pub fn construct_tipset_bundle(epoch: i64, weight: u64) -> TipsetBundle {

TipsetBundle {
blocks: headers,
bls_msgs: vec![bls],
secp_msgs: vec![secp],
bls_msg_includes: includes.clone(),
secp_msg_includes: includes,
messages: Some(CompactedMessages {
bls_msgs: vec![bls],
secp_msgs: vec![secp],
bls_msg_includes: includes.clone(),
secp_msg_includes: includes,
}),
}
}

Expand Down

0 comments on commit 0809097

Please sign in to comment.