diff --git a/examples/chat-tokio.rs b/examples/chat-tokio.rs index 66c25205246..a082f3ef113 100644 --- a/examples/chat-tokio.rs +++ b/examples/chat-tokio.rs @@ -44,7 +44,7 @@ use libp2p::{ mdns::{Mdns, MdnsEvent}, mplex, noise, - swarm::{dial_opts::DialOpts, NetworkBehaviourEventProcess, SwarmBuilder, SwarmEvent}, + swarm::{SwarmBuilder, SwarmEvent}, // `TokioTcpTransport` is available through the `tcp-tokio` feature. tcp::TokioTcpTransport, Multiaddr, @@ -82,47 +82,29 @@ async fn main() -> Result<(), Box> { // Create a Floodsub topic let floodsub_topic = floodsub::Topic::new("chat"); - // We create a custom network behaviour that combines floodsub and mDNS. - // The derive generates a delegating `NetworkBehaviour` impl which in turn - // requires the implementations of `NetworkBehaviourEventProcess` for - // the events of each behaviour. + // We create a custom behaviour that combines floodsub and mDNS. + // The derive generates a delegating `NetworkBehaviour` impl. #[derive(NetworkBehaviour)] - #[behaviour(event_process = true)] + #[behaviour(out_event = "MyBehaviourEvent")] struct MyBehaviour { floodsub: Floodsub, mdns: Mdns, } - impl NetworkBehaviourEventProcess for MyBehaviour { - // Called when `floodsub` produces an event. - fn inject_event(&mut self, message: FloodsubEvent) { - if let FloodsubEvent::Message(message) = message { - println!( - "Received: '{:?}' from {:?}", - String::from_utf8_lossy(&message.data), - message.source - ); - } + enum MyBehaviourEvent { + Floodsub(FloodsubEvent), + Mdns(MdnsEvent), + } + + impl From for MyBehaviourEvent { + fn from(event: FloodsubEvent) -> Self { + MyBehaviourEvent::Floodsub(event) } } - impl NetworkBehaviourEventProcess for MyBehaviour { - // Called when `mdns` produces an event. - fn inject_event(&mut self, event: MdnsEvent) { - match event { - MdnsEvent::Discovered(list) => { - for (peer, _) in list { - self.floodsub.add_node_to_partial_view(peer); - } - } - MdnsEvent::Expired(list) => { - for (peer, _) in list { - if !self.mdns.has_node(&peer) { - self.floodsub.remove_node_from_partial_view(&peer); - } - } - } - } + impl From for MyBehaviourEvent { + fn from(event: MdnsEvent) -> Self { + MyBehaviourEvent::Mdns(event) } } @@ -166,8 +148,36 @@ async fn main() -> Result<(), Box> { swarm.behaviour_mut().floodsub.publish(floodsub_topic.clone(), line.as_bytes()); } event = swarm.select_next_some() => { - if let SwarmEvent::NewListenAddr { address, .. } = event { - println!("Listening on {:?}", address); + match event { + SwarmEvent::NewListenAddr { address, .. } => { + println!("Listening on {:?}", address); + } + SwarmEvent::Behaviour(MyBehaviourEvent::Floodsub(event)) => { + if let FloodsubEvent::Message(message) = event { + println!( + "Received: '{:?}' from {:?}", + String::from_utf8_lossy(&message.data), + message.source + ); + } + } + SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(event)) => { + match event { + MdnsEvent::Discovered(list) => { + for (peer, _) in list { + swarm.behaviour_mut().floodsub.add_node_to_partial_view(peer); + } + } + MdnsEvent::Expired(list) => { + for (peer, _) in list { + if !swarm.behaviour().mdns.has_node(&peer) { + swarm.behaviour_mut().floodsub.remove_node_from_partial_view(&peer); + } + } + } + } + } + _ => {} } } } diff --git a/examples/chat.rs b/examples/chat.rs index d03c0a6f3e5..b9569142a41 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -79,9 +79,7 @@ async fn main() -> Result<(), Box> { let floodsub_topic = floodsub::Topic::new("chat"); // We create a custom network behaviour that combines floodsub and mDNS. - // In the future, we want to improve libp2p to make this easier to do. - // Use the derive to generate delegating NetworkBehaviour impl and require the - // NetworkBehaviourEventProcess implementations below. + // Use the derive to generate delegating NetworkBehaviour impl. #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent")] struct MyBehaviour { diff --git a/examples/distributed-key-value-store.rs b/examples/distributed-key-value-store.rs index 6bf28bf0ff9..7fef717cf78 100644 --- a/examples/distributed-key-value-store.rs +++ b/examples/distributed-key-value-store.rs @@ -50,7 +50,7 @@ use libp2p::kad::{ use libp2p::{ development_transport, identity, mdns::{Mdns, MdnsConfig, MdnsEvent}, - swarm::{NetworkBehaviourEventProcess, SwarmEvent}, + swarm::SwarmEvent, NetworkBehaviour, PeerId, Swarm, }; use std::error::Error; @@ -68,28 +68,60 @@ async fn main() -> Result<(), Box> { // We create a custom network behaviour that combines Kademlia and mDNS. #[derive(NetworkBehaviour)] - #[behaviour(event_process = true)] + #[behaviour(out_event = "MyBehaviourEvent")] struct MyBehaviour { kademlia: Kademlia, mdns: Mdns, } - impl NetworkBehaviourEventProcess for MyBehaviour { - // Called when `mdns` produces an event. - fn inject_event(&mut self, event: MdnsEvent) { - if let MdnsEvent::Discovered(list) = event { - for (peer_id, multiaddr) in list { - self.kademlia.add_address(&peer_id, multiaddr); - } - } + enum MyBehaviourEvent { + Kademlia(KademliaEvent), + Mdns(MdnsEvent), + } + + impl From for MyBehaviourEvent { + fn from(event: KademliaEvent) -> Self { + MyBehaviourEvent::Kademlia(event) + } + } + + impl From for MyBehaviourEvent { + fn from(event: MdnsEvent) -> Self { + MyBehaviourEvent::Mdns(event) } } - impl NetworkBehaviourEventProcess for MyBehaviour { - // Called when `kademlia` produces an event. - fn inject_event(&mut self, message: KademliaEvent) { - match message { - KademliaEvent::OutboundQueryCompleted { result, .. } => match result { + // Create a swarm to manage peers and events. + let mut swarm = { + // Create a Kademlia behaviour. + let store = MemoryStore::new(local_peer_id); + let kademlia = Kademlia::new(local_peer_id, store); + let mdns = task::block_on(Mdns::new(MdnsConfig::default()))?; + let behaviour = MyBehaviour { kademlia, mdns }; + Swarm::new(transport, behaviour, local_peer_id) + }; + + // Read full lines from stdin + let mut stdin = io::BufReader::new(io::stdin()).lines().fuse(); + + // Listen on all interfaces and whatever port the OS assigns. + swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; + + // Kick it off. + loop { + select! { + line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")), + event = swarm.select_next_some() => match event { + SwarmEvent::NewListenAddr { address, .. } => { + println!("Listening in {:?}", address); + }, + SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(MdnsEvent::Discovered(list))) => { + for (peer_id, multiaddr) in list { + swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr); + } + } + SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryCompleted { result, ..})) => { + match result { QueryResult::GetProviders(Ok(ok)) => { for peer in ok.providers { println!( @@ -137,38 +169,10 @@ async fn main() -> Result<(), Box> { eprintln!("Failed to put provider record: {:?}", err); } _ => {} - }, - _ => {} + } } + _ => {} } - } - - // Create a swarm to manage peers and events. - let mut swarm = { - // Create a Kademlia behaviour. - let store = MemoryStore::new(local_peer_id); - let kademlia = Kademlia::new(local_peer_id, store); - let mdns = task::block_on(Mdns::new(MdnsConfig::default()))?; - let behaviour = MyBehaviour { kademlia, mdns }; - Swarm::new(transport, behaviour, local_peer_id) - }; - - // Read full lines from stdin - let mut stdin = io::BufReader::new(io::stdin()).lines().fuse(); - - // Listen on all interfaces and whatever port the OS assigns. - swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; - - // Kick it off. - loop { - select! { - line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")), - event = swarm.select_next_some() => match event { - SwarmEvent::NewListenAddr { address, .. } => { - println!("Listening in {:?}", address); - }, - _ => {} - } } } } diff --git a/examples/ipfs-private.rs b/examples/ipfs-private.rs index 113bdf988f2..00b529bf6f2 100644 --- a/examples/ipfs-private.rs +++ b/examples/ipfs-private.rs @@ -41,9 +41,10 @@ use libp2p::{ identify::{Identify, IdentifyConfig, IdentifyEvent}, identity, multiaddr::Protocol, - noise, ping, + noise, + ping::{self, PingEvent}, pnet::{PnetConfig, PreSharedKey}, - swarm::{NetworkBehaviourEventProcess, SwarmEvent}, + swarm::SwarmEvent, tcp::TcpTransport, yamux::YamuxConfig, Multiaddr, NetworkBehaviour, PeerId, Swarm, Transport, @@ -157,78 +158,34 @@ async fn main() -> Result<(), Box> { // We create a custom network behaviour that combines gossipsub, ping and identify. #[derive(NetworkBehaviour)] - #[behaviour(event_process = true)] + #[behaviour(out_event = "MyBehaviourEvent")] struct MyBehaviour { gossipsub: Gossipsub, identify: Identify, ping: ping::Behaviour, } - impl NetworkBehaviourEventProcess for MyBehaviour { - // Called when `identify` produces an event. - fn inject_event(&mut self, event: IdentifyEvent) { - println!("identify: {:?}", event); + enum MyBehaviourEvent { + Gossipsub(GossipsubEvent), + Identify(IdentifyEvent), + Ping(PingEvent), + } + + impl From for MyBehaviourEvent { + fn from(event: GossipsubEvent) -> Self { + MyBehaviourEvent::Gossipsub(event) } } - impl NetworkBehaviourEventProcess for MyBehaviour { - // Called when `gossipsub` produces an event. - fn inject_event(&mut self, event: GossipsubEvent) { - match event { - GossipsubEvent::Message { - propagation_source: peer_id, - message_id: id, - message, - } => println!( - "Got message: {} with id: {} from peer: {:?}", - String::from_utf8_lossy(&message.data), - id, - peer_id - ), - _ => {} - } + impl From for MyBehaviourEvent { + fn from(event: IdentifyEvent) -> Self { + MyBehaviourEvent::Identify(event) } } - impl NetworkBehaviourEventProcess for MyBehaviour { - // Called when `ping` produces an event. - fn inject_event(&mut self, event: ping::Event) { - match event { - ping::Event { - peer, - result: Result::Ok(ping::Success::Ping { rtt }), - } => { - println!( - "ping: rtt to {} is {} ms", - peer.to_base58(), - rtt.as_millis() - ); - } - ping::Event { - peer, - result: Result::Ok(ping::Success::Pong), - } => { - println!("ping: pong from {}", peer.to_base58()); - } - ping::Event { - peer, - result: Result::Err(ping::Failure::Timeout), - } => { - println!("ping: timeout to {}", peer.to_base58()); - } - ping::Event { - peer, - result: Result::Err(ping::Failure::Unsupported), - } => { - println!("ping: {} does not support ping protocol", peer.to_base58()); - } - ping::Event { - peer, - result: Result::Err(ping::Failure::Other { error }), - } => { - println!("ping: ping::Failure with {}: {}", peer.to_base58(), error); - } - } + impl From for MyBehaviourEvent { + fn from(event: PingEvent) -> Self { + MyBehaviourEvent::Ping(event) } } @@ -282,8 +239,64 @@ async fn main() -> Result<(), Box> { } }, event = swarm.select_next_some() => { - if let SwarmEvent::NewListenAddr { address, .. } = event { - println!("Listening on {:?}", address); + match event { + SwarmEvent::NewListenAddr { address, .. } => { + println!("Listening on {:?}", address); + } + SwarmEvent::Behaviour(MyBehaviourEvent::Identify(event)) => { + println!("identify: {:?}", event); + } + SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(GossipsubEvent::Message { + propagation_source: peer_id, + message_id: id, + message, + })) => { + println!( + "Got message: {} with id: {} from peer: {:?}", + String::from_utf8_lossy(&message.data), + id, + peer_id + ) + } + SwarmEvent::Behaviour(MyBehaviourEvent::Ping(event)) => { + match event { + ping::Event { + peer, + result: Result::Ok(ping::Success::Ping { rtt }), + } => { + println!( + "ping: rtt to {} is {} ms", + peer.to_base58(), + rtt.as_millis() + ); + } + ping::Event { + peer, + result: Result::Ok(ping::Success::Pong), + } => { + println!("ping: pong from {}", peer.to_base58()); + } + ping::Event { + peer, + result: Result::Err(ping::Failure::Timeout), + } => { + println!("ping: timeout to {}", peer.to_base58()); + } + ping::Event { + peer, + result: Result::Err(ping::Failure::Unsupported), + } => { + println!("ping: {} does not support ping protocol", peer.to_base58()); + } + ping::Event { + peer, + result: Result::Err(ping::Failure::Other { error }), + } => { + println!("ping: ping::Failure with {}: {}", peer.to_base58(), error); + } + } + } + _ => {} } } }