From af043c27e9a583280f591f543d9aad97c21a8275 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sun, 8 Aug 2021 12:26:18 +0200 Subject: [PATCH 01/15] examples/: Add file sharing example Basic file sharing application with peers either providing or locating and getting files by name. While obviously showcasing how to build a basic file sharing application, the actual goal of this example is **to show how to integrate rust-libp2p into a larger application**. Architectural properties - Clean clonable async/await interface ([`Client`]) to interact with the network layer. - Single task driving the network layer, no locks required. --- Cargo.toml | 2 + examples/file-sharing.rs | 639 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 641 insertions(+) create mode 100644 examples/file-sharing.rs diff --git a/Cargo.toml b/Cargo.toml index ba964a0149b..70109918838 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,7 +96,9 @@ libp2p-websocket = { version = "0.31.0", path = "transports/websocket", optional [dev-dependencies] async-std = { version = "1.6.2", features = ["attributes"] } +async-trait = "0.1" env_logger = "0.9.0" +structopt = "0.3.21" tokio = { version = "1.0.1", features = ["io-util", "io-std", "macros", "rt", "rt-multi-thread"] } [workspace] diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs new file mode 100644 index 00000000000..8f7d04d1e3b --- /dev/null +++ b/examples/file-sharing.rs @@ -0,0 +1,639 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! # File sharing example +//! +//! Basic file sharing application with peers either providing or locating and +//! getting files by name. +//! +//! While obviously showcasing how to build a basic file sharing application, +//! the actual goal of this example is **to show how to integrate rust-libp2p +//! into a larger application**. +//! +//! ## Sample plot +//! +//! Assuming there are 3 nodes, provider A and B and client C. +//! +//! Provider nodes A and B each provide a file, file FA and FB respectively. +//! They do so by advertising themselves as a provider for their file on a DHT +//! via [`libp2p-kad`]. The two, among other nodes of the network, are +//! interconnected via the DHT. +//! +//! Client node C can locate the providers for file FA or FB on the DHT via +//! [`libp2p-kad`] without being connected to the specific node providing the +//! file, but any node of the DHT. Client C then connects to the corresponding +//! node and requests the file content of the file via +//! [`libp2p-request-response`]. +//! +//! ## Architectural properties +//! +//! - Clean clonable async/await interface ([`Client`]) to interact with the +//! network layer. +//! +//! - Single task driving the network layer, no locks required. +//! +//! ## Usage +//! +//! A two node setup with one node providing the file and one node requesting the file. +//! +//! 1. Run command below in one terminal. +//! +//! ``` +//! cargo run --example file-sharing -- \ +//! --listen-address /ip4/127.0.0.1/tcp/40837 \ +//! --secret-key-seed 1 \ +//! provide \ +//! --path \ +//! --name +//! ``` +//! +//! 2. Run command below in another terminal. +//! +//! ``` +//! cargo run --example file-sharing -- \ +//! --peer /ip4/127.0.0.1/tcp/40837/p2p/12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X \ +//! get \ +//! --name +//! ``` +//! +//! Note: The client does not need to be directly connected to the providing +//! peer, as long as both are connected to some node on the same DHT. + +use async_std::io; +use async_std::task::spawn; +use futures::prelude::*; +use libp2p::core::{Multiaddr, PeerId}; +use libp2p::multiaddr::Protocol; +use std::error::Error; +use std::path::PathBuf; +use structopt::StructOpt; + +#[async_std::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + + let opt = Opt::from_args(); + + // Creates the network components, namely: + // + // - The network client to interact with the network layer from anywhere + // within your application. + // + // - The network event stream, e.g. for incoming requests. + // + // - The network task driving the network itself. + let (mut network_client, mut network_events, network_task) = + network::new(opt.listen_address, opt.secret_key_seed)?; + + // Spawn the network task for it to run in the background. + spawn(network_task); + + // In case the user provided an address of a peer on the CLI, dial it. + if let Some(addr) = opt.peer { + let peer_id = match addr.iter().last() { + Some(Protocol::P2p(hash)) => PeerId::from_multihash(hash).expect("Valid hash."), + _ => return Err("Expect peer multiaddr to contain peer ID.".into()), + }; + network_client.dial(peer_id, addr).await; + } + + match opt.command { + // Providing a file. + Command::Provide { path, name } => { + // Advertise oneself as a provider of the file on the DHT. + network_client.start_providing(name.clone()).await; + + loop { + match network_events.next().await { + // Reply with the content of the file on incoming requests. + Some(network::Event::InboundRequest { request, channel }) => { + if request == name { + let file_content = std::fs::read_to_string(&path)?; + network_client.respond_file(file_content, channel).await; + } + } + _ => todo!(), + } + } + } + // Locating and getting a file. + Command::Get { name } => { + // Locate all nodes providing the file. + let providers = network_client.get_providers(name.clone()).await; + if providers.is_empty() { + return Err(format!("Could not find provider for file {}.", name).into()); + } + + // Request the content of the file from each node. + let requests = providers.into_iter().map(|p| { + let mut network_client = network_client.clone(); + let name = name.clone(); + async move { network_client.request_file(p, name).await.ok_or(()) }.boxed() + }); + + // Await the requests, ignore the remaining once a single one succeeds. + let file = futures::future::select_ok(requests) + .await + .map_err(|_| Into::>::into("None of the providers returned file."))? + .0; + + println!("Content of file {}: {}", name, file); + } + } + + Ok(()) +} + +#[derive(Debug, StructOpt)] +#[structopt(name = "libp2p relay")] +struct Opt { + /// Fixed value to generate deterministic peer ID. + #[structopt(long)] + secret_key_seed: Option, + + #[structopt(long)] + peer: Option, + + #[structopt(long)] + listen_address: Option, + + #[structopt(subcommand)] + command: Command, +} + +#[derive(Debug, StructOpt)] +enum Command { + Provide { + #[structopt(long)] + path: PathBuf, + #[structopt(long)] + name: String, + }, + Get { + #[structopt(long)] + name: String, + }, +} + +/// The network module, encapsulating all network related logic. +mod network { + use super::*; + use async_trait::async_trait; + use futures::channel::{mpsc, oneshot}; + use futures::future::BoxFuture; + use futures::stream::BoxStream; + use libp2p::core::upgrade::{read_length_prefixed, write_length_prefixed, ProtocolName}; + use libp2p::identity; + use libp2p::identity::ed25519; + use libp2p::kad::record::store::MemoryStore; + use libp2p::kad::{GetProvidersOk, Kademlia, KademliaEvent, QueryResult}; + use libp2p::multiaddr::Protocol; + use libp2p::request_response::{ + ProtocolSupport, RequestResponse, RequestResponseCodec, RequestResponseEvent, + RequestResponseMessage, ResponseChannel, + }; + use libp2p::swarm::{SwarmBuilder, SwarmEvent}; + use libp2p::{NetworkBehaviour, Swarm}; + use std::collections::{HashMap, HashSet}; + use std::iter; + + /// Creates the network components, namely: + /// + /// - The network client to interact with the network layer from anywhere + /// within your application. + /// + /// - The network event stream, e.g. for incoming requests. + /// + /// - The network task driving the network itself. + pub fn new( + listen_address: Option, + secret_key_seed: Option, + ) -> Result<(Client, BoxStream<'static, Event>, BoxFuture<'static, ()>), Box> { + // Create a public/private key pair, either random or based on a seed. + let id_keys = match secret_key_seed { + Some(seed) => { + let mut bytes = [0u8; 32]; + bytes[0] = seed; + let secret_key = ed25519::SecretKey::from_bytes(&mut bytes).expect( + "this returns `Err` only if the length is wrong; the length is correct; qed", + ); + identity::Keypair::Ed25519(secret_key.into()) + } + None => identity::Keypair::generate_ed25519(), + }; + let peer_id = id_keys.public().to_peer_id(); + + // Build the Swarm, connecting the lower layer transport logic with the + // higher layer network behaviour logic. + let mut swarm = SwarmBuilder::new( + futures::executor::block_on(libp2p::development_transport(id_keys))?, + ComposedBehaviour { + kademlia: Kademlia::new(peer_id, MemoryStore::new(peer_id)), + request_response: RequestResponse::new( + FileExchangeCodec(), + iter::once((FileExchangeProtocol(), ProtocolSupport::Full)), + Default::default(), + ), + }, + peer_id, + ) + .build(); + + // In case a listen address was provided use it, otherwise listen on any + // address. + match listen_address { + Some(addr) => swarm.listen_on(addr)?, + None => swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?, + }; + + let (command_sender, command_receiver) = mpsc::channel(0); + let (event_sender, event_receiver) = mpsc::channel(0); + + Ok(( + Client { + sender: command_sender, + }, + event_receiver.boxed(), + event_loop(swarm, command_receiver, event_sender).boxed(), + )) + } + + #[derive(Clone)] + pub struct Client { + sender: mpsc::Sender, + } + + impl Client { + /// Dial the given peer at the given address. + pub async fn dial(&mut self, peer_id: PeerId, peer_addr: Multiaddr) { + let (sender, receiver) = oneshot::channel(); + self.sender + .send(Command::Dial { + peer_id, + peer_addr, + sender, + }) + .await + .unwrap(); + receiver.await.unwrap(); + } + + /// Advertise the local node as the provider of the given file on the DHT. + pub async fn start_providing(&mut self, file_name: String) { + let (sender, receiver) = oneshot::channel(); + self.sender + .send(Command::StartProviding { file_name, sender }) + .await + .unwrap(); + receiver.await.unwrap(); + } + + /// Find the providers for the given file on the DHT. + pub async fn get_providers(&mut self, file_name: String) -> HashSet { + let (sender, receiver) = oneshot::channel(); + self.sender + .send(Command::GetProviders { file_name, sender }) + .await + .unwrap(); + receiver.await.unwrap() + } + + /// Request the content of the given file from the given peer. + pub async fn request_file(&mut self, peer: PeerId, file_name: String) -> Option { + let (sender, receiver) = oneshot::channel(); + self.sender + .send(Command::RequestFile { + file_name, + peer, + sender, + }) + .await + .unwrap(); + receiver.await.ok() + } + + /// Respond with the provided file content to the given request. + pub async fn respond_file(&mut self, file: String, channel: ResponseChannel) { + self.sender + .send(Command::RespondFile { file, channel }) + .await + .unwrap(); + } + } + + async fn event_loop( + mut swarm: Swarm, + mut command_receiver: mpsc::Receiver, + mut event_sender: mpsc::Sender, + ) { + let mut outstanding_dial: HashMap<_, oneshot::Sender<()>> = HashMap::new(); + let mut outstanding_start_providing = HashMap::new(); + let mut outstanding_get_providers: HashMap<_, oneshot::Sender>> = + HashMap::new(); + let mut outstanding_request_file: HashMap<_, oneshot::Sender> = HashMap::new(); + + loop { + futures::select! { + event = swarm.next() => match event.expect("Swarm stream to be infinite.") { + SwarmEvent::Behaviour(ComposedEvent::Kademlia( + KademliaEvent::OutboundQueryCompleted { + id, + result: QueryResult::StartProviding(_), + .. + }, + )) => { + let sender: oneshot::Sender<()> = outstanding_start_providing + .remove(&id) + .expect("Completed query to be previously outstanding."); + sender.send(()).unwrap(); + } + SwarmEvent::Behaviour(ComposedEvent::Kademlia( + KademliaEvent::OutboundQueryCompleted { + id, + result: + QueryResult::GetProviders(Ok(GetProvidersOk { + providers, + closest_peers, + .. + })), + .. + }, + )) => { + if let Some(sender) = outstanding_get_providers.remove(&id) { + sender.send(providers).unwrap(); + } + } + SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {} + SwarmEvent::Behaviour(ComposedEvent::RequestResponse( + RequestResponseEvent::Message { peer, message }, + )) => match message { + RequestResponseMessage::Request { + request, channel, .. + } => { + event_sender + .send(Event::InboundRequest { + request: request.0, + channel, + }) + .await + .unwrap(); + } + RequestResponseMessage::Response { + request_id, + response, + } => { + outstanding_request_file + .remove(&request_id) + .expect("Request to still be oustanding.") + .send(response.0) + .unwrap(); + } + }, + SwarmEvent::Behaviour(ComposedEvent::RequestResponse( + RequestResponseEvent::ResponseSent { .. }, + )) => {} + SwarmEvent::NewListenAddr { address, .. } => { + let local_peer_id = *swarm.local_peer_id(); + println!( + "Local node is listening on {:?}", + address.with(Protocol::P2p(local_peer_id.into())) + ); + } + SwarmEvent::IncomingConnection { .. } => {} + SwarmEvent::ConnectionEstablished { + peer_id, endpoint, .. + } => { + if endpoint.is_dialer() { + if let Some(sender) = outstanding_dial.remove(&peer_id) { + sender.send(()); + } + } + } + SwarmEvent::ConnectionClosed { .. } => {} + SwarmEvent::UnreachableAddr { .. } => {} + e => panic!("{:?}", e), + }, + command = command_receiver.next() => match command { + Some(Command::Dial { + peer_id, + peer_addr, + sender, + }) => { + if outstanding_dial.contains_key(&peer_id) { + todo!("Already dialing peer."); + } else { + swarm + .behaviour_mut() + .kademlia + .add_address(&peer_id, peer_addr.clone()); + swarm + .dial_addr(peer_addr.with(Protocol::P2p(peer_id.into()))) + .unwrap(); + outstanding_dial.insert(peer_id, sender); + } + } + Some(Command::StartProviding { file_name, sender }) => { + let query_id = swarm + .behaviour_mut() + .kademlia + .start_providing(file_name.into_bytes().into()) + .unwrap(); + outstanding_start_providing.insert(query_id, sender); + } + Some(Command::GetProviders { file_name, sender }) => { + let query_id = swarm + .behaviour_mut() + .kademlia + .get_providers(file_name.into_bytes().into()); + outstanding_get_providers.insert(query_id, sender); + } + Some(Command::RequestFile { + file_name, + peer, + sender, + }) => { + let request_id = swarm + .behaviour_mut() + .request_response + .send_request(&peer, FileRequest(file_name)); + outstanding_request_file.insert(request_id, sender); + } + Some(Command::RespondFile { file, channel }) => { + swarm + .behaviour_mut() + .request_response + .send_response(channel, FileResponse(file)) + .unwrap(); + } + None => { + // Command channel closed, thus shutting down the network. + return (); + } + }, + } + } + } + + #[derive(NetworkBehaviour)] + #[behaviour(event_process = false, out_event = "ComposedEvent")] + struct ComposedBehaviour { + request_response: RequestResponse, + kademlia: Kademlia, + } + + #[derive(Debug)] + enum ComposedEvent { + RequestResponse(RequestResponseEvent), + Kademlia(KademliaEvent), + } + + impl From> for ComposedEvent { + fn from(event: RequestResponseEvent) -> Self { + ComposedEvent::RequestResponse(event) + } + } + + impl From for ComposedEvent { + fn from(event: KademliaEvent) -> Self { + ComposedEvent::Kademlia(event) + } + } + + #[derive(Debug)] + enum Command { + Dial { + peer_id: PeerId, + peer_addr: Multiaddr, + sender: oneshot::Sender<()>, + }, + StartProviding { + file_name: String, + sender: oneshot::Sender<()>, + }, + GetProviders { + file_name: String, + sender: oneshot::Sender>, + }, + RequestFile { + file_name: String, + peer: PeerId, + sender: oneshot::Sender, + }, + RespondFile { + file: String, + channel: ResponseChannel, + }, + } + + pub enum Event { + InboundRequest { + request: String, + channel: ResponseChannel, + }, + } + + // Simple file exchange protocol + + #[derive(Debug, Clone)] + struct FileExchangeProtocol(); + #[derive(Clone)] + struct FileExchangeCodec(); + #[derive(Debug, Clone, PartialEq, Eq)] + struct FileRequest(String); + #[derive(Debug, Clone, PartialEq, Eq)] + pub struct FileResponse(String); + + impl ProtocolName for FileExchangeProtocol { + fn protocol_name(&self) -> &[u8] { + "/file-exchange/1".as_bytes() + } + } + + #[async_trait] + impl RequestResponseCodec for FileExchangeCodec { + type Protocol = FileExchangeProtocol; + type Request = FileRequest; + type Response = FileResponse; + + async fn read_request( + &mut self, + _: &FileExchangeProtocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let vec = read_length_prefixed(io, 1_000_000).await?; + + if vec.is_empty() { + return Err(io::ErrorKind::UnexpectedEof.into()); + } + + Ok(FileRequest(String::from_utf8(vec).unwrap())) + } + + async fn read_response( + &mut self, + _: &FileExchangeProtocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let vec = read_length_prefixed(io, 1_000_000).await?; + + if vec.is_empty() { + return Err(io::ErrorKind::UnexpectedEof.into()); + } + + Ok(FileResponse(String::from_utf8(vec).unwrap())) + } + + async fn write_request( + &mut self, + _: &FileExchangeProtocol, + io: &mut T, + FileRequest(data): FileRequest, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + write_length_prefixed(io, data).await?; + io.close().await?; + + Ok(()) + } + + async fn write_response( + &mut self, + _: &FileExchangeProtocol, + io: &mut T, + FileResponse(data): FileResponse, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + write_length_prefixed(io, data).await?; + io.close().await?; + + Ok(()) + } + } +} From f875bfdebf0bb9eb33892172ee65ac661609b6da Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 Aug 2021 12:23:40 +0200 Subject: [PATCH 02/15] examples/file-sharing: Rename Command to CliArgument --- examples/file-sharing.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index 8f7d04d1e3b..c31839b7560 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -114,9 +114,9 @@ async fn main() -> Result<(), Box> { network_client.dial(peer_id, addr).await; } - match opt.command { + match opt.argument { // Providing a file. - Command::Provide { path, name } => { + CliArgument::Provide { path, name } => { // Advertise oneself as a provider of the file on the DHT. network_client.start_providing(name.clone()).await; @@ -134,7 +134,7 @@ async fn main() -> Result<(), Box> { } } // Locating and getting a file. - Command::Get { name } => { + CliArgument::Get { name } => { // Locate all nodes providing the file. let providers = network_client.get_providers(name.clone()).await; if providers.is_empty() { @@ -175,11 +175,11 @@ struct Opt { listen_address: Option, #[structopt(subcommand)] - command: Command, + argument: CliArgument, } #[derive(Debug, StructOpt)] -enum Command { +enum CliArgument { Provide { #[structopt(long)] path: PathBuf, @@ -370,7 +370,6 @@ mod network { result: QueryResult::GetProviders(Ok(GetProvidersOk { providers, - closest_peers, .. })), .. @@ -382,7 +381,7 @@ mod network { } SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {} SwarmEvent::Behaviour(ComposedEvent::RequestResponse( - RequestResponseEvent::Message { peer, message }, + RequestResponseEvent::Message { message, .. }, )) => match message { RequestResponseMessage::Request { request, channel, .. @@ -422,7 +421,7 @@ mod network { } => { if endpoint.is_dialer() { if let Some(sender) = outstanding_dial.remove(&peer_id) { - sender.send(()); + sender.send(()).unwrap(); } } } From 09f28ce4fd35d4908e492d97bdbebec1579044dd Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 Aug 2021 12:27:37 +0200 Subject: [PATCH 03/15] examples/file-sharing: Don't block_on dns transport but await instead --- examples/file-sharing.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index c31839b7560..9cae51096a8 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -100,7 +100,7 @@ async fn main() -> Result<(), Box> { // // - The network task driving the network itself. let (mut network_client, mut network_events, network_task) = - network::new(opt.listen_address, opt.secret_key_seed)?; + network::new(opt.listen_address, opt.secret_key_seed).await?; // Spawn the network task for it to run in the background. spawn(network_task); @@ -222,7 +222,7 @@ mod network { /// - The network event stream, e.g. for incoming requests. /// /// - The network task driving the network itself. - pub fn new( + pub async fn new( listen_address: Option, secret_key_seed: Option, ) -> Result<(Client, BoxStream<'static, Event>, BoxFuture<'static, ()>), Box> { @@ -243,7 +243,7 @@ mod network { // Build the Swarm, connecting the lower layer transport logic with the // higher layer network behaviour logic. let mut swarm = SwarmBuilder::new( - futures::executor::block_on(libp2p::development_transport(id_keys))?, + libp2p::development_transport(id_keys).await?, ComposedBehaviour { kademlia: Kademlia::new(peer_id, MemoryStore::new(peer_id)), request_response: RequestResponse::new( From 6e9c9f2c0a0428355e4abbdbe775be61a4fbea2c Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 Aug 2021 13:10:39 +0200 Subject: [PATCH 04/15] examples/file-sharing: Propagate dial failure --- examples/file-sharing.rs | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index 9cae51096a8..d1027ca9064 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -111,7 +111,10 @@ async fn main() -> Result<(), Box> { Some(Protocol::P2p(hash)) => PeerId::from_multihash(hash).expect("Valid hash."), _ => return Err("Expect peer multiaddr to contain peer ID.".into()), }; - network_client.dial(peer_id, addr).await; + network_client + .dial(peer_id, addr) + .await + .expect("Dial to succeed"); } match opt.argument { @@ -282,7 +285,7 @@ mod network { impl Client { /// Dial the given peer at the given address. - pub async fn dial(&mut self, peer_id: PeerId, peer_addr: Multiaddr) { + pub async fn dial(&mut self, peer_id: PeerId, peer_addr: Multiaddr) -> Result<(), ()> { let (sender, receiver) = oneshot::channel(); self.sender .send(Command::Dial { @@ -292,7 +295,7 @@ mod network { }) .await .unwrap(); - receiver.await.unwrap(); + receiver.await.unwrap() } /// Advertise the local node as the provider of the given file on the DHT. @@ -343,7 +346,7 @@ mod network { mut command_receiver: mpsc::Receiver, mut event_sender: mpsc::Sender, ) { - let mut outstanding_dial: HashMap<_, oneshot::Sender<()>> = HashMap::new(); + let mut outstanding_dial: HashMap<_, oneshot::Sender>> = HashMap::new(); let mut outstanding_start_providing = HashMap::new(); let mut outstanding_get_providers: HashMap<_, oneshot::Sender>> = HashMap::new(); @@ -421,12 +424,18 @@ mod network { } => { if endpoint.is_dialer() { if let Some(sender) = outstanding_dial.remove(&peer_id) { - sender.send(()).unwrap(); + sender.send(Ok(())).unwrap(); } } } SwarmEvent::ConnectionClosed { .. } => {} - SwarmEvent::UnreachableAddr { .. } => {} + SwarmEvent::UnreachableAddr { peer_id, attempts_remaining, .. } => { + if attempts_remaining == 0 { + if let Some(sender) = outstanding_dial.remove(&peer_id) { + sender.send(Ok(())).unwrap(); + } + } + } e => panic!("{:?}", e), }, command = command_receiver.next() => match command { @@ -520,7 +529,7 @@ mod network { Dial { peer_id: PeerId, peer_addr: Multiaddr, - sender: oneshot::Sender<()>, + sender: oneshot::Sender>, }, StartProviding { file_name: String, From 99c28f3dd7afaefbe70e2655f1b3dd6280c56c78 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 Aug 2021 13:24:49 +0200 Subject: [PATCH 05/15] examples/file-sharing: Expect pending outbound provider query --- examples/file-sharing.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index d1027ca9064..459d43bdc6c 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -378,9 +378,9 @@ mod network { .. }, )) => { - if let Some(sender) = outstanding_get_providers.remove(&id) { - sender.send(providers).unwrap(); - } + outstanding_get_providers.remove(&id) + .expect("Completed query to be previously pending.") + .send(providers).unwrap(); } SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {} SwarmEvent::Behaviour(ComposedEvent::RequestResponse( From 7521919b43cb1c5fb2f8b94ebbcf338ec17bd315 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 Aug 2021 15:49:36 +0200 Subject: [PATCH 06/15] examples/file-sharing: Use str as error --- examples/file-sharing.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index 459d43bdc6c..6a9dfe4c349 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -154,7 +154,7 @@ async fn main() -> Result<(), Box> { // Await the requests, ignore the remaining once a single one succeeds. let file = futures::future::select_ok(requests) .await - .map_err(|_| Into::>::into("None of the providers returned file."))? + .map_err(|_| "None of the providers returned file.")? .0; println!("Content of file {}: {}", name, file); From aa483c84e76ce17385ac238168848d9d05dba406 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 Aug 2021 15:50:38 +0200 Subject: [PATCH 07/15] examples/file-sharing: Fix structopt title --- examples/file-sharing.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index 6a9dfe4c349..b23359570c5 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -165,7 +165,7 @@ async fn main() -> Result<(), Box> { } #[derive(Debug, StructOpt)] -#[structopt(name = "libp2p relay")] +#[structopt(name = "libp2p file sharing example")] struct Opt { /// Fixed value to generate deterministic peer ID. #[structopt(long)] From 3fba99a12e1f08cb785c021b7b7d86c1597bae93 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 Aug 2021 15:52:47 +0200 Subject: [PATCH 08/15] examples/file-sharing: Use impl Future and impl Stream --- examples/file-sharing.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index b23359570c5..36fd7398a36 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -200,8 +200,6 @@ mod network { use super::*; use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; - use futures::future::BoxFuture; - use futures::stream::BoxStream; use libp2p::core::upgrade::{read_length_prefixed, write_length_prefixed, ProtocolName}; use libp2p::identity; use libp2p::identity::ed25519; @@ -228,7 +226,7 @@ mod network { pub async fn new( listen_address: Option, secret_key_seed: Option, - ) -> Result<(Client, BoxStream<'static, Event>, BoxFuture<'static, ()>), Box> { + ) -> Result<(Client, impl Stream, impl Future), Box> { // Create a public/private key pair, either random or based on a seed. let id_keys = match secret_key_seed { Some(seed) => { @@ -273,8 +271,8 @@ mod network { Client { sender: command_sender, }, - event_receiver.boxed(), - event_loop(swarm, command_receiver, event_sender).boxed(), + event_receiver, + event_loop(swarm, command_receiver, event_sender), )) } From eaf81f51c5c084042f1af117bfc0bf6547a1920d Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 Aug 2021 15:54:07 +0200 Subject: [PATCH 09/15] examples/file-sharing: Rename outstanding to pending --- examples/file-sharing.rs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index 36fd7398a36..619c82f6696 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -344,11 +344,11 @@ mod network { mut command_receiver: mpsc::Receiver, mut event_sender: mpsc::Sender, ) { - let mut outstanding_dial: HashMap<_, oneshot::Sender>> = HashMap::new(); - let mut outstanding_start_providing = HashMap::new(); - let mut outstanding_get_providers: HashMap<_, oneshot::Sender>> = + let mut pending_dial: HashMap<_, oneshot::Sender>> = HashMap::new(); + let mut pending_start_providing = HashMap::new(); + let mut pending_get_providers: HashMap<_, oneshot::Sender>> = HashMap::new(); - let mut outstanding_request_file: HashMap<_, oneshot::Sender> = HashMap::new(); + let mut pending_request_file: HashMap<_, oneshot::Sender> = HashMap::new(); loop { futures::select! { @@ -360,9 +360,9 @@ mod network { .. }, )) => { - let sender: oneshot::Sender<()> = outstanding_start_providing + let sender: oneshot::Sender<()> = pending_start_providing .remove(&id) - .expect("Completed query to be previously outstanding."); + .expect("Completed query to be previously pending."); sender.send(()).unwrap(); } SwarmEvent::Behaviour(ComposedEvent::Kademlia( @@ -376,7 +376,7 @@ mod network { .. }, )) => { - outstanding_get_providers.remove(&id) + pending_get_providers.remove(&id) .expect("Completed query to be previously pending.") .send(providers).unwrap(); } @@ -399,7 +399,7 @@ mod network { request_id, response, } => { - outstanding_request_file + pending_request_file .remove(&request_id) .expect("Request to still be oustanding.") .send(response.0) @@ -421,7 +421,7 @@ mod network { peer_id, endpoint, .. } => { if endpoint.is_dialer() { - if let Some(sender) = outstanding_dial.remove(&peer_id) { + if let Some(sender) = pending_dial.remove(&peer_id) { sender.send(Ok(())).unwrap(); } } @@ -429,7 +429,7 @@ mod network { SwarmEvent::ConnectionClosed { .. } => {} SwarmEvent::UnreachableAddr { peer_id, attempts_remaining, .. } => { if attempts_remaining == 0 { - if let Some(sender) = outstanding_dial.remove(&peer_id) { + if let Some(sender) = pending_dial.remove(&peer_id) { sender.send(Ok(())).unwrap(); } } @@ -442,7 +442,7 @@ mod network { peer_addr, sender, }) => { - if outstanding_dial.contains_key(&peer_id) { + if pending_dial.contains_key(&peer_id) { todo!("Already dialing peer."); } else { swarm @@ -452,7 +452,7 @@ mod network { swarm .dial_addr(peer_addr.with(Protocol::P2p(peer_id.into()))) .unwrap(); - outstanding_dial.insert(peer_id, sender); + pending_dial.insert(peer_id, sender); } } Some(Command::StartProviding { file_name, sender }) => { @@ -461,14 +461,14 @@ mod network { .kademlia .start_providing(file_name.into_bytes().into()) .unwrap(); - outstanding_start_providing.insert(query_id, sender); + pending_start_providing.insert(query_id, sender); } Some(Command::GetProviders { file_name, sender }) => { let query_id = swarm .behaviour_mut() .kademlia .get_providers(file_name.into_bytes().into()); - outstanding_get_providers.insert(query_id, sender); + pending_get_providers.insert(query_id, sender); } Some(Command::RequestFile { file_name, @@ -479,7 +479,7 @@ mod network { .behaviour_mut() .request_response .send_request(&peer, FileRequest(file_name)); - outstanding_request_file.insert(request_id, sender); + pending_request_file.insert(request_id, sender); } Some(Command::RespondFile { file, channel }) => { swarm From 13a7d02dda9e9c65aafc5910011b6ed72064a1cd Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 Aug 2021 16:25:02 +0200 Subject: [PATCH 10/15] examples/file-sharing: Showcase fallible Client methods --- examples/file-sharing.rs | 44 +++++++++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index 619c82f6696..5ce6ebcb504 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -148,7 +148,7 @@ async fn main() -> Result<(), Box> { let requests = providers.into_iter().map(|p| { let mut network_client = network_client.clone(); let name = name.clone(); - async move { network_client.request_file(p, name).await.ok_or(()) }.boxed() + async move { network_client.request_file(p, name).await }.boxed() }); // Await the requests, ignore the remaining once a single one succeeds. @@ -283,7 +283,11 @@ mod network { impl Client { /// Dial the given peer at the given address. - pub async fn dial(&mut self, peer_id: PeerId, peer_addr: Multiaddr) -> Result<(), ()> { + pub async fn dial( + &mut self, + peer_id: PeerId, + peer_addr: Multiaddr, + ) -> Result<(), Box> { let (sender, receiver) = oneshot::channel(); self.sender .send(Command::Dial { @@ -317,7 +321,11 @@ mod network { } /// Request the content of the given file from the given peer. - pub async fn request_file(&mut self, peer: PeerId, file_name: String) -> Option { + pub async fn request_file( + &mut self, + peer: PeerId, + file_name: String, + ) -> Result> { let (sender, receiver) = oneshot::channel(); self.sender .send(Command::RequestFile { @@ -327,7 +335,7 @@ mod network { }) .await .unwrap(); - receiver.await.ok() + receiver.await.expect("Sender not be dropped.") } /// Respond with the provided file content to the given request. @@ -344,11 +352,15 @@ mod network { mut command_receiver: mpsc::Receiver, mut event_sender: mpsc::Sender, ) { - let mut pending_dial: HashMap<_, oneshot::Sender>> = HashMap::new(); + let mut pending_dial: HashMap<_, oneshot::Sender>>> = + HashMap::new(); let mut pending_start_providing = HashMap::new(); let mut pending_get_providers: HashMap<_, oneshot::Sender>> = HashMap::new(); - let mut pending_request_file: HashMap<_, oneshot::Sender> = HashMap::new(); + let mut pending_request_file: HashMap< + _, + oneshot::Sender>>, + > = HashMap::new(); loop { futures::select! { @@ -401,11 +413,19 @@ mod network { } => { pending_request_file .remove(&request_id) - .expect("Request to still be oustanding.") - .send(response.0) + .expect("Request to still be pending.") + .send(Ok(response.0)) .unwrap(); } }, + SwarmEvent::Behaviour(ComposedEvent::RequestResponse( + RequestResponseEvent::OutboundFailure { request_id, error, ..} + )) => { + pending_request_file.remove(&request_id) + .expect("Request to still be pending.") + .send(Err(Box::new(error))) + .unwrap(); + } SwarmEvent::Behaviour(ComposedEvent::RequestResponse( RequestResponseEvent::ResponseSent { .. }, )) => {} @@ -427,10 +447,10 @@ mod network { } } SwarmEvent::ConnectionClosed { .. } => {} - SwarmEvent::UnreachableAddr { peer_id, attempts_remaining, .. } => { + SwarmEvent::UnreachableAddr { peer_id, attempts_remaining, error, .. } => { if attempts_remaining == 0 { if let Some(sender) = pending_dial.remove(&peer_id) { - sender.send(Ok(())).unwrap(); + sender.send(Err(Box::new(error))).unwrap(); } } } @@ -527,7 +547,7 @@ mod network { Dial { peer_id: PeerId, peer_addr: Multiaddr, - sender: oneshot::Sender>, + sender: oneshot::Sender>>, }, StartProviding { file_name: String, @@ -540,7 +560,7 @@ mod network { RequestFile { file_name: String, peer: PeerId, - sender: oneshot::Sender, + sender: oneshot::Sender>>, }, RespondFile { file: String, From 7d7696164c15ad4a12a76676031bacf4e21129cb Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 Aug 2021 16:29:29 +0200 Subject: [PATCH 11/15] examples/file-sharing: Do not overload term Client --- examples/file-sharing.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index 5ce6ebcb504..0f21763b5c3 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -29,16 +29,17 @@ //! //! ## Sample plot //! -//! Assuming there are 3 nodes, provider A and B and client C. +//! Assuming there are 3 nodes, A, B and C. A and B each provide a file while C +//! retrieves a file. //! //! Provider nodes A and B each provide a file, file FA and FB respectively. //! They do so by advertising themselves as a provider for their file on a DHT //! via [`libp2p-kad`]. The two, among other nodes of the network, are //! interconnected via the DHT. //! -//! Client node C can locate the providers for file FA or FB on the DHT via +//! Node C can locate the providers for file FA or FB on the DHT via //! [`libp2p-kad`] without being connected to the specific node providing the -//! file, but any node of the DHT. Client C then connects to the corresponding +//! file, but any node of the DHT. Node C then connects to the corresponding //! node and requests the file content of the file via //! [`libp2p-request-response`]. //! From 8ca3c2a60137771a97847048a4490165b319d6ab Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 Aug 2021 16:50:18 +0200 Subject: [PATCH 12/15] examples/file-sharing: Trigger listening via client --- examples/file-sharing.rs | 46 +++++++++++++++++++++++++++++++--------- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index 0f21763b5c3..9d5d5faaea1 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -101,11 +101,22 @@ async fn main() -> Result<(), Box> { // // - The network task driving the network itself. let (mut network_client, mut network_events, network_task) = - network::new(opt.listen_address, opt.secret_key_seed).await?; + network::new(opt.secret_key_seed).await?; // Spawn the network task for it to run in the background. spawn(network_task); + // In case a listen address was provided use it, otherwise listen on any + // address. + match opt.listen_address { + Some(addr) => network_client.start_listening(addr).await.unwrap(), + None => { + network_client + .start_listening("/ip4/0.0.0.0/tcp/0".parse()?) + .await.unwrap() + } + }; + // In case the user provided an address of a peer on the CLI, dial it. if let Some(addr) = opt.peer { let peer_id = match addr.iter().last() { @@ -225,7 +236,6 @@ mod network { /// /// - The network task driving the network itself. pub async fn new( - listen_address: Option, secret_key_seed: Option, ) -> Result<(Client, impl Stream, impl Future), Box> { // Create a public/private key pair, either random or based on a seed. @@ -244,7 +254,7 @@ mod network { // Build the Swarm, connecting the lower layer transport logic with the // higher layer network behaviour logic. - let mut swarm = SwarmBuilder::new( + let swarm = SwarmBuilder::new( libp2p::development_transport(id_keys).await?, ComposedBehaviour { kademlia: Kademlia::new(peer_id, MemoryStore::new(peer_id)), @@ -258,13 +268,6 @@ mod network { ) .build(); - // In case a listen address was provided use it, otherwise listen on any - // address. - match listen_address { - Some(addr) => swarm.listen_on(addr)?, - None => swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?, - }; - let (command_sender, command_receiver) = mpsc::channel(0); let (event_sender, event_receiver) = mpsc::channel(0); @@ -283,6 +286,19 @@ mod network { } impl Client { + /// Listen for incoming connections on the given address. + pub async fn start_listening( + &mut self, + addr: Multiaddr, + ) -> Result<(), Box> { + let (sender, receiver) = oneshot::channel(); + self.sender + .send(Command::StartListening { addr, sender }) + .await + .unwrap(); + receiver.await.expect("Sender not to be dropped.") + } + /// Dial the given peer at the given address. pub async fn dial( &mut self, @@ -458,6 +474,12 @@ mod network { e => panic!("{:?}", e), }, command = command_receiver.next() => match command { + Some(Command::StartListening { addr, sender }) => { + match swarm.listen_on(addr) { + Ok(_) => sender.send(Ok(())).unwrap(), + Err(e) => sender.send(Err(Box::new(e))).unwrap(), + } + } Some(Command::Dial { peer_id, peer_addr, @@ -545,6 +567,10 @@ mod network { #[derive(Debug)] enum Command { + StartListening { + addr: Multiaddr, + sender: oneshot::Sender>>, + }, Dial { peer_id: PeerId, peer_addr: Multiaddr, From fe5cba4fa1304962d0cb88005baea2082770728c Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 Aug 2021 16:51:50 +0200 Subject: [PATCH 13/15] examples/file-sharing: Remove duplicate comment --- examples/file-sharing.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index 9d5d5faaea1..ee844312594 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -92,14 +92,6 @@ async fn main() -> Result<(), Box> { let opt = Opt::from_args(); - // Creates the network components, namely: - // - // - The network client to interact with the network layer from anywhere - // within your application. - // - // - The network event stream, e.g. for incoming requests. - // - // - The network task driving the network itself. let (mut network_client, mut network_events, network_task) = network::new(opt.secret_key_seed).await?; From 64b8a68367b041b1f10fc887e2159bfb4537e46a Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 Aug 2021 17:05:31 +0200 Subject: [PATCH 14/15] examples/file-sharing: Replace most unwrap calls --- examples/file-sharing.rs | 74 ++++++++++++++++++++-------------------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index ee844312594..0256fdbbcd7 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -101,12 +101,14 @@ async fn main() -> Result<(), Box> { // In case a listen address was provided use it, otherwise listen on any // address. match opt.listen_address { - Some(addr) => network_client.start_listening(addr).await.unwrap(), - None => { - network_client - .start_listening("/ip4/0.0.0.0/tcp/0".parse()?) - .await.unwrap() - } + Some(addr) => network_client + .start_listening(addr) + .await + .expect("Listening not to fail."), + None => network_client + .start_listening("/ip4/0.0.0.0/tcp/0".parse()?) + .await + .expect("Listening not to fail."), }; // In case the user provided an address of a peer on the CLI, dial it. @@ -287,7 +289,7 @@ mod network { self.sender .send(Command::StartListening { addr, sender }) .await - .unwrap(); + .expect("Command receiver not to be dropped."); receiver.await.expect("Sender not to be dropped.") } @@ -305,8 +307,8 @@ mod network { sender, }) .await - .unwrap(); - receiver.await.unwrap() + .expect("Command receiver not to be dropped."); + receiver.await.expect("Sender not to be dropped.") } /// Advertise the local node as the provider of the given file on the DHT. @@ -315,8 +317,8 @@ mod network { self.sender .send(Command::StartProviding { file_name, sender }) .await - .unwrap(); - receiver.await.unwrap(); + .expect("Command receiver not to be dropped."); + receiver.await.expect("Sender not to be dropped."); } /// Find the providers for the given file on the DHT. @@ -325,8 +327,8 @@ mod network { self.sender .send(Command::GetProviders { file_name, sender }) .await - .unwrap(); - receiver.await.unwrap() + .expect("Command receiver not to be dropped."); + receiver.await.expect("Sender not to be dropped.") } /// Request the content of the given file from the given peer. @@ -343,7 +345,7 @@ mod network { sender, }) .await - .unwrap(); + .expect("Command receiver not to be dropped."); receiver.await.expect("Sender not be dropped.") } @@ -352,7 +354,7 @@ mod network { self.sender .send(Command::RespondFile { file, channel }) .await - .unwrap(); + .expect("Command receiver not to be dropped."); } } @@ -384,7 +386,7 @@ mod network { let sender: oneshot::Sender<()> = pending_start_providing .remove(&id) .expect("Completed query to be previously pending."); - sender.send(()).unwrap(); + let _ = sender.send(()); } SwarmEvent::Behaviour(ComposedEvent::Kademlia( KademliaEvent::OutboundQueryCompleted { @@ -397,9 +399,9 @@ mod network { .. }, )) => { - pending_get_providers.remove(&id) + let _ = pending_get_providers.remove(&id) .expect("Completed query to be previously pending.") - .send(providers).unwrap(); + .send(providers); } SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {} SwarmEvent::Behaviour(ComposedEvent::RequestResponse( @@ -414,26 +416,24 @@ mod network { channel, }) .await - .unwrap(); + .expect("Event receiver not to be dropped."); } RequestResponseMessage::Response { request_id, response, } => { - pending_request_file + let _ = pending_request_file .remove(&request_id) .expect("Request to still be pending.") - .send(Ok(response.0)) - .unwrap(); + .send(Ok(response.0)); } }, SwarmEvent::Behaviour(ComposedEvent::RequestResponse( RequestResponseEvent::OutboundFailure { request_id, error, ..} )) => { - pending_request_file.remove(&request_id) + let _ = pending_request_file.remove(&request_id) .expect("Request to still be pending.") - .send(Err(Box::new(error))) - .unwrap(); + .send(Err(Box::new(error))); } SwarmEvent::Behaviour(ComposedEvent::RequestResponse( RequestResponseEvent::ResponseSent { .. }, @@ -451,7 +451,7 @@ mod network { } => { if endpoint.is_dialer() { if let Some(sender) = pending_dial.remove(&peer_id) { - sender.send(Ok(())).unwrap(); + let _ = sender.send(Ok(())); } } } @@ -459,7 +459,7 @@ mod network { SwarmEvent::UnreachableAddr { peer_id, attempts_remaining, error, .. } => { if attempts_remaining == 0 { if let Some(sender) = pending_dial.remove(&peer_id) { - sender.send(Err(Box::new(error))).unwrap(); + let _ = sender.send(Err(Box::new(error))); } } } @@ -467,10 +467,10 @@ mod network { }, command = command_receiver.next() => match command { Some(Command::StartListening { addr, sender }) => { - match swarm.listen_on(addr) { - Ok(_) => sender.send(Ok(())).unwrap(), - Err(e) => sender.send(Err(Box::new(e))).unwrap(), - } + let _ = match swarm.listen_on(addr) { + Ok(_) => sender.send(Ok(())), + Err(e) => sender.send(Err(Box::new(e))), + }; } Some(Command::Dial { peer_id, @@ -484,10 +484,10 @@ mod network { .behaviour_mut() .kademlia .add_address(&peer_id, peer_addr.clone()); - swarm - .dial_addr(peer_addr.with(Protocol::P2p(peer_id.into()))) - .unwrap(); - pending_dial.insert(peer_id, sender); + match swarm.dial_addr(peer_addr.with(Protocol::P2p(peer_id.into()))) { + Ok(()) => { pending_dial.insert(peer_id, sender); }, + Err(e) => { let _ = sender.send(Err(Box::new(e))); }, + } } } Some(Command::StartProviding { file_name, sender }) => { @@ -495,7 +495,7 @@ mod network { .behaviour_mut() .kademlia .start_providing(file_name.into_bytes().into()) - .unwrap(); + .expect("No store error."); pending_start_providing.insert(query_id, sender); } Some(Command::GetProviders { file_name, sender }) => { @@ -521,7 +521,7 @@ mod network { .behaviour_mut() .request_response .send_response(channel, FileResponse(file)) - .unwrap(); + .expect("Connection to peer to be still open."); } None => { // Command channel closed, thus shutting down the network. From e05117d092a949bb5fa0e77e26a3c5994c91939f Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 16 Aug 2021 16:45:16 +0200 Subject: [PATCH 15/15] examples/file-sharing: Wrap state in EventLoop struct --- examples/file-sharing.rs | 381 ++++++++++++++++++++++----------------- 1 file changed, 215 insertions(+), 166 deletions(-) diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index 0256fdbbcd7..2bcdbe8719f 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -92,11 +92,11 @@ async fn main() -> Result<(), Box> { let opt = Opt::from_args(); - let (mut network_client, mut network_events, network_task) = + let (mut network_client, mut network_events, network_event_loop) = network::new(opt.secret_key_seed).await?; // Spawn the network task for it to run in the background. - spawn(network_task); + spawn(network_event_loop.run()); // In case a listen address was provided use it, otherwise listen on any // address. @@ -206,17 +206,18 @@ mod network { use super::*; use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; + use libp2p::core::either::EitherError; use libp2p::core::upgrade::{read_length_prefixed, write_length_prefixed, ProtocolName}; use libp2p::identity; use libp2p::identity::ed25519; use libp2p::kad::record::store::MemoryStore; - use libp2p::kad::{GetProvidersOk, Kademlia, KademliaEvent, QueryResult}; + use libp2p::kad::{GetProvidersOk, Kademlia, KademliaEvent, QueryId, QueryResult}; use libp2p::multiaddr::Protocol; use libp2p::request_response::{ - ProtocolSupport, RequestResponse, RequestResponseCodec, RequestResponseEvent, + ProtocolSupport, RequestId, RequestResponse, RequestResponseCodec, RequestResponseEvent, RequestResponseMessage, ResponseChannel, }; - use libp2p::swarm::{SwarmBuilder, SwarmEvent}; + use libp2p::swarm::{ProtocolsHandlerUpgrErr, SwarmBuilder, SwarmEvent}; use libp2p::{NetworkBehaviour, Swarm}; use std::collections::{HashMap, HashSet}; use std::iter; @@ -231,7 +232,7 @@ mod network { /// - The network task driving the network itself. pub async fn new( secret_key_seed: Option, - ) -> Result<(Client, impl Stream, impl Future), Box> { + ) -> Result<(Client, impl Stream, EventLoop), Box> { // Create a public/private key pair, either random or based on a seed. let id_keys = match secret_key_seed { Some(seed) => { @@ -270,7 +271,7 @@ mod network { sender: command_sender, }, event_receiver, - event_loop(swarm, command_receiver, event_sender), + EventLoop::new(swarm, command_receiver, event_sender), )) } @@ -358,176 +359,224 @@ mod network { } } - async fn event_loop( - mut swarm: Swarm, - mut command_receiver: mpsc::Receiver, - mut event_sender: mpsc::Sender, - ) { - let mut pending_dial: HashMap<_, oneshot::Sender>>> = - HashMap::new(); - let mut pending_start_providing = HashMap::new(); - let mut pending_get_providers: HashMap<_, oneshot::Sender>> = - HashMap::new(); - let mut pending_request_file: HashMap< - _, - oneshot::Sender>>, - > = HashMap::new(); - - loop { - futures::select! { - event = swarm.next() => match event.expect("Swarm stream to be infinite.") { - SwarmEvent::Behaviour(ComposedEvent::Kademlia( - KademliaEvent::OutboundQueryCompleted { - id, - result: QueryResult::StartProviding(_), - .. - }, - )) => { - let sender: oneshot::Sender<()> = pending_start_providing - .remove(&id) - .expect("Completed query to be previously pending."); - let _ = sender.send(()); - } - SwarmEvent::Behaviour(ComposedEvent::Kademlia( - KademliaEvent::OutboundQueryCompleted { - id, - result: - QueryResult::GetProviders(Ok(GetProvidersOk { - providers, - .. - })), - .. - }, - )) => { - let _ = pending_get_providers.remove(&id) - .expect("Completed query to be previously pending.") - .send(providers); - } - SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {} - SwarmEvent::Behaviour(ComposedEvent::RequestResponse( - RequestResponseEvent::Message { message, .. }, - )) => match message { - RequestResponseMessage::Request { - request, channel, .. - } => { - event_sender - .send(Event::InboundRequest { - request: request.0, - channel, - }) - .await - .expect("Event receiver not to be dropped."); - } - RequestResponseMessage::Response { - request_id, - response, - } => { - let _ = pending_request_file - .remove(&request_id) - .expect("Request to still be pending.") - .send(Ok(response.0)); - } + pub struct EventLoop { + swarm: Swarm, + command_receiver: mpsc::Receiver, + event_sender: mpsc::Sender, + pending_dial: HashMap>>>, + pending_start_providing: HashMap>, + pending_get_providers: HashMap>>, + pending_request_file: + HashMap>>>, + } + + impl EventLoop { + fn new( + swarm: Swarm, + command_receiver: mpsc::Receiver, + event_sender: mpsc::Sender, + ) -> Self { + Self { + swarm, + command_receiver, + event_sender, + pending_dial: Default::default(), + pending_start_providing: Default::default(), + pending_get_providers: Default::default(), + pending_request_file: Default::default(), + } + } + + pub async fn run(mut self) { + loop { + futures::select! { + event = self.swarm.next() => self.handle_event(event.expect("Swarm stream to be infinite.")).await , + command = self.command_receiver.next() => match command { + Some(c) => self.handle_command(c).await, + // Command channel closed, thus shutting down the network event loop. + None=> return, }, - SwarmEvent::Behaviour(ComposedEvent::RequestResponse( - RequestResponseEvent::OutboundFailure { request_id, error, ..} - )) => { - let _ = pending_request_file.remove(&request_id) - .expect("Request to still be pending.") - .send(Err(Box::new(error))); - } - SwarmEvent::Behaviour(ComposedEvent::RequestResponse( - RequestResponseEvent::ResponseSent { .. }, - )) => {} - SwarmEvent::NewListenAddr { address, .. } => { - let local_peer_id = *swarm.local_peer_id(); - println!( - "Local node is listening on {:?}", - address.with(Protocol::P2p(local_peer_id.into())) - ); - } - SwarmEvent::IncomingConnection { .. } => {} - SwarmEvent::ConnectionEstablished { - peer_id, endpoint, .. + } + } + } + + async fn handle_event( + &mut self, + event: SwarmEvent< + ComposedEvent, + EitherError, io::Error>, + >, + ) { + match event { + SwarmEvent::Behaviour(ComposedEvent::Kademlia( + KademliaEvent::OutboundQueryCompleted { + id, + result: QueryResult::StartProviding(_), + .. + }, + )) => { + let sender: oneshot::Sender<()> = self + .pending_start_providing + .remove(&id) + .expect("Completed query to be previously pending."); + let _ = sender.send(()); + } + SwarmEvent::Behaviour(ComposedEvent::Kademlia( + KademliaEvent::OutboundQueryCompleted { + id, + result: QueryResult::GetProviders(Ok(GetProvidersOk { providers, .. })), + .. + }, + )) => { + let _ = self + .pending_get_providers + .remove(&id) + .expect("Completed query to be previously pending.") + .send(providers); + } + SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {} + SwarmEvent::Behaviour(ComposedEvent::RequestResponse( + RequestResponseEvent::Message { message, .. }, + )) => match message { + RequestResponseMessage::Request { + request, channel, .. } => { - if endpoint.is_dialer() { - if let Some(sender) = pending_dial.remove(&peer_id) { - let _ = sender.send(Ok(())); - } - } + self.event_sender + .send(Event::InboundRequest { + request: request.0, + channel, + }) + .await + .expect("Event receiver not to be dropped."); } - SwarmEvent::ConnectionClosed { .. } => {} - SwarmEvent::UnreachableAddr { peer_id, attempts_remaining, error, .. } => { - if attempts_remaining == 0 { - if let Some(sender) = pending_dial.remove(&peer_id) { - let _ = sender.send(Err(Box::new(error))); - } - } + RequestResponseMessage::Response { + request_id, + response, + } => { + let _ = self + .pending_request_file + .remove(&request_id) + .expect("Request to still be pending.") + .send(Ok(response.0)); } - e => panic!("{:?}", e), }, - command = command_receiver.next() => match command { - Some(Command::StartListening { addr, sender }) => { - let _ = match swarm.listen_on(addr) { - Ok(_) => sender.send(Ok(())), - Err(e) => sender.send(Err(Box::new(e))), - }; - } - Some(Command::Dial { - peer_id, - peer_addr, - sender, - }) => { - if pending_dial.contains_key(&peer_id) { - todo!("Already dialing peer."); - } else { - swarm - .behaviour_mut() - .kademlia - .add_address(&peer_id, peer_addr.clone()); - match swarm.dial_addr(peer_addr.with(Protocol::P2p(peer_id.into()))) { - Ok(()) => { pending_dial.insert(peer_id, sender); }, - Err(e) => { let _ = sender.send(Err(Box::new(e))); }, - } + SwarmEvent::Behaviour(ComposedEvent::RequestResponse( + RequestResponseEvent::OutboundFailure { + request_id, error, .. + }, + )) => { + let _ = self + .pending_request_file + .remove(&request_id) + .expect("Request to still be pending.") + .send(Err(Box::new(error))); + } + SwarmEvent::Behaviour(ComposedEvent::RequestResponse( + RequestResponseEvent::ResponseSent { .. }, + )) => {} + SwarmEvent::NewListenAddr { address, .. } => { + let local_peer_id = *self.swarm.local_peer_id(); + println!( + "Local node is listening on {:?}", + address.with(Protocol::P2p(local_peer_id.into())) + ); + } + SwarmEvent::IncomingConnection { .. } => {} + SwarmEvent::ConnectionEstablished { + peer_id, endpoint, .. + } => { + if endpoint.is_dialer() { + if let Some(sender) = self.pending_dial.remove(&peer_id) { + let _ = sender.send(Ok(())); } } - Some(Command::StartProviding { file_name, sender }) => { - let query_id = swarm - .behaviour_mut() - .kademlia - .start_providing(file_name.into_bytes().into()) - .expect("No store error."); - pending_start_providing.insert(query_id, sender); + } + SwarmEvent::ConnectionClosed { .. } => {} + SwarmEvent::UnreachableAddr { + peer_id, + attempts_remaining, + error, + .. + } => { + if attempts_remaining == 0 { + if let Some(sender) = self.pending_dial.remove(&peer_id) { + let _ = sender.send(Err(Box::new(error))); + } } - Some(Command::GetProviders { file_name, sender }) => { - let query_id = swarm + } + e => panic!("{:?}", e), + } + } + + async fn handle_command(&mut self, command: Command) { + match command { + Command::StartListening { addr, sender } => { + let _ = match self.swarm.listen_on(addr) { + Ok(_) => sender.send(Ok(())), + Err(e) => sender.send(Err(Box::new(e))), + }; + } + Command::Dial { + peer_id, + peer_addr, + sender, + } => { + if self.pending_dial.contains_key(&peer_id) { + todo!("Already dialing peer."); + } else { + self.swarm .behaviour_mut() .kademlia - .get_providers(file_name.into_bytes().into()); - pending_get_providers.insert(query_id, sender); - } - Some(Command::RequestFile { - file_name, - peer, - sender, - }) => { - let request_id = swarm - .behaviour_mut() - .request_response - .send_request(&peer, FileRequest(file_name)); - pending_request_file.insert(request_id, sender); - } - Some(Command::RespondFile { file, channel }) => { - swarm - .behaviour_mut() - .request_response - .send_response(channel, FileResponse(file)) - .expect("Connection to peer to be still open."); - } - None => { - // Command channel closed, thus shutting down the network. - return (); + .add_address(&peer_id, peer_addr.clone()); + match self + .swarm + .dial_addr(peer_addr.with(Protocol::P2p(peer_id.into()))) + { + Ok(()) => { + self.pending_dial.insert(peer_id, sender); + } + Err(e) => { + let _ = sender.send(Err(Box::new(e))); + } + } } - }, + } + Command::StartProviding { file_name, sender } => { + let query_id = self + .swarm + .behaviour_mut() + .kademlia + .start_providing(file_name.into_bytes().into()) + .expect("No store error."); + self.pending_start_providing.insert(query_id, sender); + } + Command::GetProviders { file_name, sender } => { + let query_id = self + .swarm + .behaviour_mut() + .kademlia + .get_providers(file_name.into_bytes().into()); + self.pending_get_providers.insert(query_id, sender); + } + Command::RequestFile { + file_name, + peer, + sender, + } => { + let request_id = self + .swarm + .behaviour_mut() + .request_response + .send_request(&peer, FileRequest(file_name)); + self.pending_request_file.insert(request_id, sender); + } + Command::RespondFile { file, channel } => { + self.swarm + .behaviour_mut() + .request_response + .send_response(channel, FileResponse(file)) + .expect("Connection to peer to be still open."); + } } } }