Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bitswap Integration #518

Merged
merged 53 commits into from
Jul 15, 2020
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
be7b8bb
initial bitswap setup
ec2 Jun 17, 2020
fc79e4d
add bitswap methods to behaviour
ec2 Jun 17, 2020
70a0530
remove todo
ec2 Jun 17, 2020
ab20aeb
request manager
ec2 Jun 19, 2020
1a033aa
Merge branch 'master' into ec2/libp2p/bitswap
ec2 Jun 24, 2020
a37b1ba
clean up warning
ec2 Jun 24, 2020
dfdd76b
Merge branch 'master' into ec2/libp2p/bitswap
ec2 Jun 24, 2020
98bbb86
cargo lock
ec2 Jun 24, 2020
e1338df
lint
ec2 Jun 24, 2020
325c90a
fix tests and lint
ec2 Jun 26, 2020
776e837
Merge branch 'main' into ec2/libp2p/bitswap
ec2 Jun 26, 2020
42cf94f
Merge branch 'main' into ec2/libp2p/bitswap
ec2 Jun 26, 2020
e253155
Merge branch 'main' into ec2/libp2p/bitswap
ec2 Jun 29, 2020
3b33ec8
initial refactor with local version
austinabell Jun 30, 2020
97ca70c
cleanup and switch to using forked repo
austinabell Jun 30, 2020
e14d9f3
cargo update for protobuf gen
austinabell Jun 30, 2020
1dced22
Remove other dead code
austinabell Jun 30, 2020
67bd8d4
Merge branch 'main' into austin/rpcreplace
austinabell Jul 1, 2020
e58c580
Remove RPCEvent
austinabell Jul 1, 2020
7851cef
Merge branch 'austin/rpcreplace' of github.com:ChainSafe/forest into …
austinabell Jul 1, 2020
87f0fc2
Merge branch 'main' into ec2/libp2p/bitswap
ec2 Jul 2, 2020
d774279
suggested changes
ec2 Jul 2, 2020
733c395
cargo update
ec2 Jul 2, 2020
5d353db
Merge branch 'austin/rpcreplace' into ec2/libp2p/bitswap
ec2 Jul 2, 2020
de674d5
Move libp2p back from fork and update references
austinabell Jul 6, 2020
68657f8
bump other versions
austinabell Jul 6, 2020
3d3af37
fmt
austinabell Jul 7, 2020
42bd8e1
Merge branch 'main' into ec2/libp2p/bitswap
ec2 Jul 7, 2020
ae55949
Merge branch 'austin/rpcfork' into ec2/libp2p/bitswap
ec2 Jul 7, 2020
ebdbc95
fix clippys
ec2 Jul 7, 2020
1ebb6ea
Merge branch 'main' into ec2/libp2p/bitswap
ec2 Jul 7, 2020
84912f8
Merge branch 'main' into ec2/libp2p/bitswap
ec2 Jul 9, 2020
b9b85b3
cid conversion
ec2 Jul 10, 2020
7174bbb
suggested change
ec2 Jul 10, 2020
7439faf
Merge branch 'ec2/libp2p/bitswap' of github.com:ChainSafe/ferret into…
ec2 Jul 10, 2020
97668b6
more suggested changes
ec2 Jul 10, 2020
4b928b8
new rpc handling
ec2 Jul 13, 2020
e353f00
remove print
ec2 Jul 13, 2020
d8a2caf
asthetics
ec2 Jul 13, 2020
ac9134d
more cosmetics
ec2 Jul 13, 2020
df49ba6
bump bitswap
ec2 Jul 13, 2020
138e841
remove manual req id handling
ec2 Jul 14, 2020
056bf53
fix tests
ec2 Jul 14, 2020
aaba8ef
fmt
ec2 Jul 14, 2020
667818a
Merge branch 'main' into ec2/libp2p/bitswap
ec2 Jul 15, 2020
75bb5b5
mutex -> refcell
ec2 Jul 15, 2020
59182c0
remove refcell
ec2 Jul 15, 2020
0629051
Merge branch 'main' into ec2/libp2p/bitswap
ec2 Jul 15, 2020
4dea157
fix some suggestions
ec2 Jul 15, 2020
f214b0b
more suggestions
ec2 Jul 15, 2020
c27b2a5
fix some linting thing not caught locally
ec2 Jul 15, 2020
adb96d6
suggestions
ec2 Jul 15, 2020
09e366c
spacing
ec2 Jul 15, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions blockchain/chain_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ num-traits = "0.2"
filecoin-proofs-api = "4.0.1"
fil_types = { path = "../../types" }
commcid = { path = "../../utils/commcid" }
flo_stream = "0.4"

ec2 marked this conversation as resolved.
Show resolved Hide resolved
[dev-dependencies]
test_utils = { version = "0.1.0", path = "../../utils/test_utils/", features = ["test_constructors"] }
6 changes: 6 additions & 0 deletions blockchain/chain_sync/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ impl From<&str> for Error {
}
}

impl From<String> for Error {
fn from(e: String) -> Error {
Error::Other(e)
}
}

impl From<std::num::TryFromIntError> for Error {
fn from(e: std::num::TryFromIntError) -> Error {
Error::Other(e.to_string())
Expand Down
80 changes: 26 additions & 54 deletions blockchain/chain_sync/src/network_context.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use super::network_handler::RPCReceiver;
use async_std::future;
use async_std::prelude::*;
use async_std::sync::{Receiver, Sender};
use async_std::sync::Sender;
use blocks::{FullTipset, Tipset, TipsetKeys};
use flo_stream::Subscriber;
use forest_libp2p::{
blocksync::{BlockSyncRequest, BlockSyncResponse, BLOCKS, MESSAGES},
hello::HelloRequest,
rpc::{RPCRequest, RPCResponse, RequestId},
rpc::RequestId,
NetworkEvent, NetworkMessage,
};
use futures::channel::oneshot::channel as oneshot_channel;
use libp2p::core::PeerId;
use log::trace;
use std::time::Duration;
Expand All @@ -27,22 +27,14 @@ pub struct SyncNetworkContext {
/// Handles sequential request ID enumeration for requests
request_id: RequestId,

/// Receiver channel for BlockSync responses
rpc_receiver: RPCReceiver,

/// Receiver channel for network events
pub receiver: Receiver<NetworkEvent>,
pub receiver: Subscriber<NetworkEvent>,
}

impl SyncNetworkContext {
pub fn new(
network_send: Sender<NetworkMessage>,
rpc_receiver: RPCReceiver,
receiver: Receiver<NetworkEvent>,
) -> Self {
pub fn new(network_send: Sender<NetworkMessage>, receiver: Subscriber<NetworkEvent>) -> Self {
Self {
network_send,
rpc_receiver,
receiver,
request_id: RequestId(1),
}
Expand Down Expand Up @@ -97,16 +89,26 @@ impl SyncNetworkContext {
&mut self,
peer_id: PeerId,
request: BlockSyncRequest,
) -> Result<BlockSyncResponse, &'static str> {
) -> Result<BlockSyncResponse, String> {
trace!("Sending BlockSync Request {:?}", request);
let rpc_res = self
.send_rpc_request(peer_id, RPCRequest::BlockSync(request))
.await?;
let request_id = self.request_id;
self.request_id.0 += 1;

if let RPCResponse::BlockSync(bs_res) = rpc_res {
Ok(bs_res)
} else {
Err("Invalid response type")
let (tx, rx) = oneshot_channel();

self.network_send
.send(NetworkMessage::BlockSyncRequest {
peer_id,
request,
id: request_id,
response_channel: tx,
})
.await;

match future::timeout(Duration::from_secs(RPC_TIMEOUT), rx).await {
Ok(Ok(bs_res)) => Ok(bs_res),
Ok(Err(e)) => Err(format!("RPC error: {}", e.to_string())),
Err(_) => Err("Connection timed out".to_string()),
}
}

Expand All @@ -115,42 +117,12 @@ impl SyncNetworkContext {
trace!("Sending Hello Message {:?}", request);
// TODO update to await response when we want to handle the latency
self.network_send
.send(NetworkMessage::RPC {
.send(NetworkMessage::HelloRequest {
peer_id,
request: RPCRequest::Hello(request),
request,
id: self.request_id,
})
.await;
self.request_id.0 += 1;
}

/// Send any RPC request to the network and await the response
pub async fn send_rpc_request(
&mut self,
peer_id: PeerId,
request: RPCRequest,
) -> Result<RPCResponse, &'static str> {
let request_id = self.request_id;
self.request_id.0 += 1;
self.network_send
.send(NetworkMessage::RPC {
peer_id,
request,
id: request_id,
})
.await;
loop {
match future::timeout(Duration::from_secs(RPC_TIMEOUT), self.rpc_receiver.next()).await
{
Ok(Some((id, response))) => {
if id == request_id {
return Ok(response);
}
// Ignore any other RPC responses for now
}
Ok(None) => return Err("RPC Stream closed"),
Err(_) => return Err("Connection timeout"),
}
}
}
}
51 changes: 13 additions & 38 deletions blockchain/chain_sync/src/network_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,67 +3,42 @@

use super::peer_manager::PeerManager;
use async_std::prelude::*;
use async_std::sync::{Receiver, Sender};
use async_std::sync::Receiver;
use async_std::task;
use forest_libp2p::rpc::{RPCResponse, RequestId};
use flo_stream::{MessagePublisher, Publisher};
use forest_libp2p::NetworkEvent;
use log::trace;
use std::sync::Arc;

pub(crate) type RPCReceiver = Receiver<(RequestId, RPCResponse)>;
pub(crate) type RPCSender = Sender<(RequestId, RPCResponse)>;

/// Handles network events from channel and splits based on request
pub(crate) struct NetworkHandler {
rpc_send: RPCSender,
event_send: Sender<NetworkEvent>,
event_send: Publisher<NetworkEvent>,
receiver: Receiver<NetworkEvent>,
}

impl NetworkHandler {
pub(crate) fn new(
receiver: Receiver<NetworkEvent>,
rpc_send: RPCSender,
event_send: Sender<NetworkEvent>,
event_send: Publisher<NetworkEvent>,
) -> Self {
Self {
receiver,
rpc_send,
event_send,
}
}

pub(crate) fn spawn(&self, peer_manager: Arc<PeerManager>) {
let mut receiver = self.receiver.clone();
let rpc_send = self.rpc_send.clone();
let event_send = self.event_send.clone();

let mut event_send = self.event_send.republish();
task::spawn(async move {
loop {
match receiver.next().await {
// Handle specifically RPC responses and send to that channel
Some(NetworkEvent::BlockSyncResponse {
request_id,
response,
}) => {
rpc_send
.send((request_id, RPCResponse::BlockSync(response)))
.await
}
// Pass any non RPC responses through event channel
Some(event) => {
// Update peer on this thread before sending hello
if let NetworkEvent::HelloRequest { channel, .. } = &event {
// TODO should probably add peer with their tipset/ not handled seperately
peer_manager.add_peer(channel.peer.clone(), None).await;
}

// TODO revisit, doing this to avoid blocking this thread but can handle better
if !event_send.is_full() {
event_send.send(event).await
}
}
None => break,
while let Some(event) = receiver.next().await {
// Update peer on this thread before sending hello
if let NetworkEvent::HelloRequest { channel, .. } = &event {
// TODO should probably add peer with their tipset/ not handled seperately
peer_manager.add_peer(channel.peer.clone(), None).await;
}
if let NetworkEvent::BitswapBlock { .. } = &event {
event_send.publish(event).await
}
}
});
Expand Down
Loading