Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bitswap Integration #518

Merged
merged 53 commits into from
Jul 15, 2020
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
be7b8bb
initial bitswap setup
ec2 Jun 17, 2020
fc79e4d
add bitswap methods to behaviour
ec2 Jun 17, 2020
70a0530
remove todo
ec2 Jun 17, 2020
ab20aeb
request manager
ec2 Jun 19, 2020
1a033aa
Merge branch 'master' into ec2/libp2p/bitswap
ec2 Jun 24, 2020
a37b1ba
clean up warning
ec2 Jun 24, 2020
dfdd76b
Merge branch 'master' into ec2/libp2p/bitswap
ec2 Jun 24, 2020
98bbb86
cargo lock
ec2 Jun 24, 2020
e1338df
lint
ec2 Jun 24, 2020
325c90a
fix tests and lint
ec2 Jun 26, 2020
776e837
Merge branch 'main' into ec2/libp2p/bitswap
ec2 Jun 26, 2020
42cf94f
Merge branch 'main' into ec2/libp2p/bitswap
ec2 Jun 26, 2020
e253155
Merge branch 'main' into ec2/libp2p/bitswap
ec2 Jun 29, 2020
3b33ec8
initial refactor with local version
austinabell Jun 30, 2020
97ca70c
cleanup and switch to using forked repo
austinabell Jun 30, 2020
e14d9f3
cargo update for protobuf gen
austinabell Jun 30, 2020
1dced22
Remove other dead code
austinabell Jun 30, 2020
67bd8d4
Merge branch 'main' into austin/rpcreplace
austinabell Jul 1, 2020
e58c580
Remove RPCEvent
austinabell Jul 1, 2020
7851cef
Merge branch 'austin/rpcreplace' of github.com:ChainSafe/forest into …
austinabell Jul 1, 2020
87f0fc2
Merge branch 'main' into ec2/libp2p/bitswap
ec2 Jul 2, 2020
d774279
suggested changes
ec2 Jul 2, 2020
733c395
cargo update
ec2 Jul 2, 2020
5d353db
Merge branch 'austin/rpcreplace' into ec2/libp2p/bitswap
ec2 Jul 2, 2020
de674d5
Move libp2p back from fork and update references
austinabell Jul 6, 2020
68657f8
bump other versions
austinabell Jul 6, 2020
3d3af37
fmt
austinabell Jul 7, 2020
42bd8e1
Merge branch 'main' into ec2/libp2p/bitswap
ec2 Jul 7, 2020
ae55949
Merge branch 'austin/rpcfork' into ec2/libp2p/bitswap
ec2 Jul 7, 2020
ebdbc95
fix clippys
ec2 Jul 7, 2020
1ebb6ea
Merge branch 'main' into ec2/libp2p/bitswap
ec2 Jul 7, 2020
84912f8
Merge branch 'main' into ec2/libp2p/bitswap
ec2 Jul 9, 2020
b9b85b3
cid conversion
ec2 Jul 10, 2020
7174bbb
suggested change
ec2 Jul 10, 2020
7439faf
Merge branch 'ec2/libp2p/bitswap' of github.com:ChainSafe/ferret into…
ec2 Jul 10, 2020
97668b6
more suggested changes
ec2 Jul 10, 2020
4b928b8
new rpc handling
ec2 Jul 13, 2020
e353f00
remove print
ec2 Jul 13, 2020
d8a2caf
asthetics
ec2 Jul 13, 2020
ac9134d
more cosmetics
ec2 Jul 13, 2020
df49ba6
bump bitswap
ec2 Jul 13, 2020
138e841
remove manual req id handling
ec2 Jul 14, 2020
056bf53
fix tests
ec2 Jul 14, 2020
aaba8ef
fmt
ec2 Jul 14, 2020
667818a
Merge branch 'main' into ec2/libp2p/bitswap
ec2 Jul 15, 2020
75bb5b5
mutex -> refcell
ec2 Jul 15, 2020
59182c0
remove refcell
ec2 Jul 15, 2020
0629051
Merge branch 'main' into ec2/libp2p/bitswap
ec2 Jul 15, 2020
4dea157
fix some suggestions
ec2 Jul 15, 2020
f214b0b
more suggestions
ec2 Jul 15, 2020
c27b2a5
fix some linting thing not caught locally
ec2 Jul 15, 2020
adb96d6
suggestions
ec2 Jul 15, 2020
09e366c
spacing
ec2 Jul 15, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions blockchain/chain_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ num-traits = "0.2"
filecoin-proofs-api = "4.0.1"
fil_types = { path = "../../types" }
commcid = { path = "../../utils/commcid" }
flo_stream = "0.4"

ec2 marked this conversation as resolved.
Show resolved Hide resolved

[dev-dependencies]
test_utils = { version = "0.1.0", path = "../../utils/test_utils/", features = ["test_constructors"] }
6 changes: 6 additions & 0 deletions blockchain/chain_sync/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ impl From<&str> for Error {
}
}

impl From<String> for Error {
fn from(e: String) -> Error {
Error::Other(e)
}
}

impl From<std::num::TryFromIntError> for Error {
fn from(e: std::num::TryFromIntError) -> Error {
Error::Other(e.to_string())
Expand Down
58 changes: 27 additions & 31 deletions blockchain/chain_sync/src/network_context.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use super::network_handler::RPCReceiver;
use async_std::future;
use async_std::prelude::*;
use async_std::sync::{Receiver, Sender};
use async_std::sync::{Mutex, Sender};
use blocks::{FullTipset, Tipset, TipsetKeys};
use flo_stream::Subscriber;
use forest_libp2p::{
blocksync::{BlockSyncRequest, BlockSyncResponse, BLOCKS, MESSAGES},
hello::HelloRequest,
rpc::{RPCRequest, RPCResponse, RequestId},
NetworkEvent, NetworkMessage,
};

ec2 marked this conversation as resolved.
Show resolved Hide resolved
use futures::channel::oneshot::{
channel as oneshot_channel, Receiver as OneShotReceiver, Sender as OneShotSender,
};
use libp2p::core::PeerId;
use log::trace;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

/// Timeout for response from an RPC request
Expand All @@ -27,23 +32,21 @@ pub struct SyncNetworkContext {
/// Handles sequential request ID enumeration for requests
request_id: RequestId,

/// Receiver channel for BlockSync responses
rpc_receiver: RPCReceiver,

/// Receiver channel for network events
pub receiver: Receiver<NetworkEvent>,
pub receiver: Subscriber<NetworkEvent>,
request_table: Arc<Mutex<HashMap<RequestId, OneShotSender<RPCResponse>>>>,
}

impl SyncNetworkContext {
pub fn new(
network_send: Sender<NetworkMessage>,
rpc_receiver: RPCReceiver,
receiver: Receiver<NetworkEvent>,
receiver: Subscriber<NetworkEvent>,
request_table: Arc<Mutex<HashMap<RequestId, OneShotSender<RPCResponse>>>>,
) -> Self {
Self {
network_send,
rpc_receiver,
receiver,
request_table,
request_id: RequestId(1),
}
}
Expand Down Expand Up @@ -97,16 +100,18 @@ impl SyncNetworkContext {
&mut self,
peer_id: PeerId,
request: BlockSyncRequest,
) -> Result<BlockSyncResponse, &'static str> {
) -> Result<BlockSyncResponse, String> {
trace!("Sending BlockSync Request {:?}", request);
let rpc_res = self
.send_rpc_request(peer_id, RPCRequest::BlockSync(request))
.await?;

if let RPCResponse::BlockSync(bs_res) = rpc_res {
Ok(bs_res)
} else {
Err("Invalid response type")
.await;
match future::timeout(Duration::from_secs(RPC_TIMEOUT), rpc_res).await {
Ok(Ok(RPCResponse::BlockSync(bs_res))) => Ok(bs_res),
Ok(Ok(RPCResponse::Hello(_))) => {
unreachable!();
ec2 marked this conversation as resolved.
Show resolved Hide resolved
}
Ok(Err(e)) => Err(format!("RPC error: {}", e.to_string())),
Err(_) => Err("Connection Timedout".to_string()),
ec2 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -129,28 +134,19 @@ impl SyncNetworkContext {
&mut self,
peer_id: PeerId,
request: RPCRequest,
) -> Result<RPCResponse, &'static str> {
) -> OneShotReceiver<RPCResponse> {
let request_id = self.request_id;
self.request_id.0 += 1;

let (tx, rx) = oneshot_channel();
self.request_table.lock().await.insert(request_id, tx);
self.network_send
ec2 marked this conversation as resolved.
Show resolved Hide resolved
.send(NetworkMessage::RPC {
peer_id,
request,
id: request_id,
})
.await;
loop {
match future::timeout(Duration::from_secs(RPC_TIMEOUT), self.rpc_receiver.next()).await
{
Ok(Some((id, response))) => {
if id == request_id {
return Ok(response);
}
// Ignore any other RPC responses for now
}
Ok(None) => return Err("RPC Stream closed"),
Err(_) => return Err("Connection timeout"),
}
}
rx
}
}
44 changes: 25 additions & 19 deletions blockchain/chain_sync/src/network_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,42 @@

use super::peer_manager::PeerManager;
use async_std::prelude::*;
use async_std::sync::{Receiver, Sender};
use async_std::sync::Mutex;
use async_std::sync::Receiver;
use async_std::task;
use flo_stream::{MessagePublisher, Publisher};
use forest_libp2p::rpc::{RPCResponse, RequestId};
use forest_libp2p::NetworkEvent;
use log::trace;
use futures::channel::oneshot::Sender as OneShotSender;
use log::{debug, trace};
use std::collections::HashMap;
use std::sync::Arc;

pub(crate) type RPCReceiver = Receiver<(RequestId, RPCResponse)>;
pub(crate) type RPCSender = Sender<(RequestId, RPCResponse)>;

/// Handles network events from channel and splits based on request
pub(crate) struct NetworkHandler {
rpc_send: RPCSender,
event_send: Sender<NetworkEvent>,
event_send: Publisher<NetworkEvent>,
receiver: Receiver<NetworkEvent>,
/// keeps track of a mapping from rpc request id to oneshot senders
request_table: Arc<Mutex<HashMap<RequestId, OneShotSender<RPCResponse>>>>,
}

impl NetworkHandler {
pub(crate) fn new(
receiver: Receiver<NetworkEvent>,
rpc_send: RPCSender,
event_send: Sender<NetworkEvent>,
event_send: Publisher<NetworkEvent>,
request_table: Arc<Mutex<HashMap<RequestId, OneShotSender<RPCResponse>>>>,
) -> Self {
Self {
receiver,
rpc_send,
event_send,
request_table,
}
}

pub(crate) fn spawn(&self, peer_manager: Arc<PeerManager>) {
let mut receiver = self.receiver.clone();
let rpc_send = self.rpc_send.clone();
let event_send = self.event_send.clone();
let mut event_send = self.event_send.republish();
let request_table = self.request_table.clone();

task::spawn(async move {
loop {
Expand All @@ -46,9 +48,15 @@ impl NetworkHandler {
request_id,
response,
}) => {
rpc_send
.send((request_id, RPCResponse::BlockSync(response)))
.await
let tx = request_table.lock().await.remove(&request_id);

if let Some(tx) = tx {
if let Err(e) = tx.send(RPCResponse::BlockSync(response)) {
debug!("RPCResponse receive failed: {:?}", e)
}
} else {
debug!("RPCResponse receive failed: channel not found");
};
}
// Pass any non RPC responses through event channel
Some(event) => {
Expand All @@ -57,10 +65,8 @@ impl NetworkHandler {
// TODO should probably add peer with their tipset/ not handled seperately
peer_manager.add_peer(channel.peer.clone(), None).await;
}

// TODO revisit, doing this to avoid blocking this thread but can handle better
if !event_send.is_full() {
event_send.send(event).await
if let NetworkEvent::BitswapBlock { .. } = &event {
event_send.publish(event).await
}
}
None => break,
Expand Down
25 changes: 14 additions & 11 deletions blockchain/chain_sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::peer_manager::PeerManager;
use super::{Error, SyncNetworkContext};
use address::{Address, Protocol};
use amt::Amt;
use async_std::sync::{channel, Receiver, Sender};
use async_std::sync::{Mutex, Receiver, Sender};
use async_std::task;
use beacon::{Beacon, BeaconEntry};
use blocks::{Block, BlockHeader, FullTipset, Tipset, TipsetKeys, TxMeta};
Expand All @@ -23,6 +23,7 @@ use crypto::DomainSeparationTag;
use encoding::{Cbor, Error as EncodingError};
use fil_types::SectorInfo;
use filecoin_proofs_api::{post::verify_winning_post, ProverId, PublicReplicaInfo, SectorId};
use flo_stream::{MessagePublisher, Publisher};
use forest_libp2p::{
hello::HelloRequest, BlockSyncRequest, NetworkEvent, NetworkMessage, MESSAGES,
};
Expand Down Expand Up @@ -121,14 +122,14 @@ where
let state_manager = Arc::new(StateManager::new(chain_store.db.clone()));

// Split incoming channel to handle blocksync requests
let (rpc_send, rpc_rx) = channel(20);
let (event_send, event_rx) = channel(30);

let network = SyncNetworkContext::new(network_send, rpc_rx, event_rx);
let mut event_send = Publisher::new(30);
let req_table = Arc::new(Mutex::new(HashMap::new()));
let network =
SyncNetworkContext::new(network_send, event_send.subscribe(), req_table.clone());

let peer_manager = Arc::new(PeerManager::default());

let net_handler = NetworkHandler::new(network_rx, rpc_send, event_send);
let net_handler = NetworkHandler::new(network_rx, event_send, req_table);

Ok(Self {
state: SyncState::Init,
Expand Down Expand Up @@ -1020,6 +1021,7 @@ fn cids_from_messages<T: Cbor>(messages: &[T]) -> Result<Vec<Cid>, EncodingError
#[cfg(test)]
mod tests {
use super::*;
use async_std::sync::channel;
use async_std::sync::Sender;
use beacon::MockBeacon;
use blocks::BlockHeader;
Expand Down Expand Up @@ -1093,8 +1095,6 @@ mod tests {
let (mut cs, event_sender) = chain_syncer_setup(db);

cs.net_handler.spawn(Arc::clone(&cs.peer_manager));
// send blocksync response to channel
send_blocksync_response(event_sender);

// params for sync_headers_reverse
let source = PeerId::random();
Expand All @@ -1104,9 +1104,12 @@ mod tests {
task::block_on(async move {
cs.peer_manager.add_peer(source.clone(), None).await;
assert_eq!(cs.peer_manager.len().await, 1);

let return_set = cs.sync_headers_reverse(head, &to).await;
assert_eq!(return_set.unwrap().len(), 4);
// make blocksync request
let return_set = task::spawn(async move { cs.sync_headers_reverse(head, &to).await });
task::sleep(Duration::from_secs(2)).await;
ec2 marked this conversation as resolved.
Show resolved Hide resolved
// send blocksync response to channel
send_blocksync_response(event_sender);
assert_eq!(return_set.await.unwrap().len(), 4);
});
}

Expand Down
3 changes: 2 additions & 1 deletion forest/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ pub(super) async fn start(config: Config) {
initialize_genesis(&config.genesis_file, &mut chain_store).unwrap();

// Libp2p service setup
let p2p_service = Libp2pService::new(config.network, net_keypair, &network_name);
let p2p_service =
Libp2pService::new(config.network, Arc::clone(&db), net_keypair, &network_name);
let network_rx = p2p_service.network_receiver();
let network_send = p2p_service.network_sender();

Expand Down
3 changes: 3 additions & 0 deletions node/forest_libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ fnv = "1.0.6"
smallvec = "1.1.0"
clock = { path = "../clock" }
num-bigint = { path = "../../utils/bigint", package = "forest_bigint" }
libp2p-bitswap = { git = "https://github.com/ChainSafe/libp2p-bitswap", rev = "c31e0ae31fa7d0b848bf4fb711106e59aac0fa5e" }
ec2 marked this conversation as resolved.
Show resolved Hide resolved
libipld-core = "0.3.0"
ipld_blockstore = { path = "../../ipld/blockstore" }
async-trait = "0.1"

[dev-dependencies]
Expand Down
Loading