diff --git a/CHANGELOG.md b/CHANGELOG.md index a9b03c66..cd1b79f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,16 @@ # Changelog +## Release 0.14.0 +- Asynchronous connections: `NetworkController::connect()` behaviour modified. +Now it performs a non-blocking connection. Previous behaviour with `connect_sync` version. +- Reduced slightly the websocket latency. +- Adapter API modified to handle easily handshakes. +- Fixed websocket issue that could offer an accepted connection that was yet not valid. +- Added `NetworkController::is_ready()` +- Added `SendStatus::ResourceNotAvailable` +- Added `borrow()` method for `StoredNetEvent` to transform in into `NetEvent`. +- Added `is_local()` and `is_remote()` helpers to `ResourceId`. + ## Release 0.13.3 - Fixed a bad internal assert. diff --git a/Cargo.lock b/Cargo.lock index cf3ca9a4..aaaf7488 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -482,7 +482,7 @@ dependencies = [ [[package]] name = "message-io" -version = "0.13.3" +version = "0.14.0" dependencies = [ "bincode", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 30c380cc..a8fe5d49 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "message-io" -version = "0.13.3" +version = "0.14.0" authors = ["lemunozm "] edition = "2018" readme = "README.md" diff --git a/README.md b/README.md index 9b6b0875..c807df78 100644 --- a/README.md +++ b/README.md @@ -80,21 +80,24 @@ You could change the transport of your application in literally one line. Add to your `Cargo.toml` (all transports included by default): ```toml [dependencies] -message-io = "0.13" +message-io = "0.14" ``` If you **only** want to use a subset of the available transport battery, you can select them by their associated features `tcp`, `udp`, and `websocket`. For example, in order to include only *TCP* and *UDP*, add to your `Cargo.toml`: ```toml [dependencies] -message-io = { version = "0.13", default-features = false, features = ["tcp", "udp"] } +message-io = { version = "0.14", default-features = false, features = ["tcp", "udp"] } ``` -**Warning**: Version **0.12** comes with important API changes ([changelog](CHANGELOG.md)) -in order to reach [zero-copy write/read](https://github.com/lemunozm/message-io/issues/61) goal. -If you find problems porting your application to this version, -check the examples folder, API docs, and don't hesitate to open an issue. - +_**Read before update to 0.14**: Version **0.14** modifies the [`connect()`](https://docs.rs/message-io/latest/message_io/network/struct.NetworkController.html#method.connect) behaviour to perform a +[**non**-blocking connections](https://github.com/lemunozm/message-io/issues/61) instead. +It is recommended to use this non-blocking mode in order to get the +best scalability and performance in your application. If you need to perform +a similar blocking connection as before (version 0.13), you can call to [`connect_sync()`](https://docs.rs/message-io/latest/message_io/network/struct.NetworkController.html#method.connect_sync). +Note also that the previous `NetEvent::Connect` has been renamed to `NetEvent::Accepted`. +The current `NetEvent::Connect` is a new event to deal with the new non-blocking connections. +See [`NetEvent`](https://docs.rs/message-io/latest/message_io/network/enum.NetEvent.html) docs for more info._ ### All in one: TCP, UDP and WebSocket echo server The following example is the simplest server that reads messages from the clients and responds @@ -118,7 +121,8 @@ fn main() { // Read incoming network events. listener.for_each(move |event| match event.network() { - NetEvent::Connected(_endpoint, _) => println!("Client connected"), // Tcp or Ws + NetEvent::Connected(_, _) => unreachable!(), // Used for explicit connections. + NetEvent::Accepted(_endpoint, _listener) => println!("Client connected"), // Tcp or Ws NetEvent::Message(endpoint, data) => { println!("Received: {}", String::from_utf8_lossy(data)); handler.network().send(endpoint, data); @@ -149,8 +153,6 @@ fn main() { // You can change the transport to Udp or Ws (WebSocket). let (server, _) = handler.network().connect(Transport::FramedTcp, "127.0.0.1:3042").unwrap(); - handler.signals().send(Signal::Greet); // Start sending - listener.for_each(move |event| match event { NodeEvent::Signal(signal) => match signal { Signal::Greet => { // computed every second @@ -159,10 +161,12 @@ fn main() { } } NodeEvent::Network(net_event) => match net_event { + NetEvent::Connected(_endpoint, _ok) => handler.signals().send(Signal::Greet), + NetEvent::Accepted(_, _) => unreachable!(), // Only generated by listening NetEvent::Message(_endpoint, data) => { println!("Received: {}", String::from_utf8_lossy(data)); }, - _ => unreachable!(), // Connected and Disconnected are only generated by listening + NetEvent::Disconnected(_endpoint) => (), } }); } @@ -199,8 +203,8 @@ If a transport protocol can be built in top of [`mio`](https://github.com/tokio- 1. Add your *adapter* file in `src/adapters/.rs` that implements the traits that you find [here](https://docs.rs/message-io/latest/message_io/network/adapter/index.html). - It contains only 7 mandatory functions to implement (see the [template](src/adapters/template.rs)), - and it takes little more than 150 lines to implement an adapter. + It contains only 8 mandatory functions to implement (see the [template](src/adapters/template.rs)), + and it takes arround 150 lines to implement an adapter. 1. Add a new field in the `Transport` enum found in [src/network/transport.rs](src/network/transport.rs) to register your new adapter. diff --git a/docs/performance_benchmarks.md b/docs/performance_benchmarks.md index e827286f..5da83807 100644 --- a/docs/performance_benchmarks.md +++ b/docs/performance_benchmarks.md @@ -40,7 +40,7 @@ The following results are measured for the transmision of 1GB of data by localho | Transport | native | message-io | efficiency | |:----------:|:--------:|:----------:|:----------:| | UDP | 7.1 GB/s | 5.9 GB/s | ~83% | -| TCP | 6.4 GB/s | 5.4 GB/s | ~84% | +| TCP | 6.4 GB/s | 5.2 GB/s | ~81% | | Framed TCP | 5.5 GB/s | 5.0 GB/s | ~91% | | Web Socket | 590 MB/s | 560 MB/s | ~95% | @@ -71,9 +71,9 @@ The following results are measured by transferring 1-byte by localhost: | UDP | 1.2 us | 2.1 us | + ~0.9 us | | TCP | 2.6 us | 3.5 us | + ~0.9 us | | Framed TCP | 5.2 us | 6.6 us | + ~1.4 us | -| Web Socket | 9.1 us | 11.2 us | + ~2.1 us | +| Web Socket | 9.1 us | 10.1 us | + ~1.0 us | -Depending on the transport used, `message-io` adds around `1-2us` of overhead per chunk of data transsmision. +Depending on the transport used, `message-io` adds around `1us` of overhead per chunk of data transsmision. Because it is zero-copy at reading/writing messages, this overhead is constant and independently of the size of that chunk of data. The library only copies the pointer to the data. diff --git a/examples/distributed/discovery_server.rs b/examples/distributed/discovery_server.rs index 243383e9..b9042baa 100644 --- a/examples/distributed/discovery_server.rs +++ b/examples/distributed/discovery_server.rs @@ -5,6 +5,7 @@ use message_io::node::{self, NodeHandler, NodeListener}; use std::net::{SocketAddr}; use std::collections::{HashMap}; +use std::io::{self}; struct ParticipantInfo { addr: SocketAddr, @@ -13,34 +14,31 @@ struct ParticipantInfo { pub struct DiscoveryServer { handler: NodeHandler<()>, - listener: Option>, + node_listener: Option>, participants: HashMap, } impl DiscoveryServer { - pub fn new() -> Option { - let (handler, listener) = node::split::<()>(); + pub fn new() -> io::Result { + let (handler, node_listener) = node::split::<()>(); let listen_addr = "127.0.0.1:5000"; - match handler.network().listen(Transport::FramedTcp, listen_addr) { - Ok(_) => { - println!("Discovery server running at {}", listen_addr); - Some(DiscoveryServer { - handler, - listener: Some(listener), - participants: HashMap::new(), - }) - } - Err(_) => { - println!("Can not listen on {}", listen_addr); - None - } - } + handler.network().listen(Transport::FramedTcp, listen_addr)?; + + println!("Discovery server running at {}", listen_addr); + + Ok(DiscoveryServer { + handler, + node_listener: Some(node_listener), + participants: HashMap::new(), + }) } pub fn run(mut self) { - let listener = self.listener.take().unwrap(); - listener.for_each(move |event| match event.network() { + let node_listener = self.node_listener.take().unwrap(); + node_listener.for_each(move |event| match event.network() { + NetEvent::Connected(_, _) => unreachable!(), // There is no connect() calls. + NetEvent::Accepted(_, _) => (), // All endpoint accepted NetEvent::Message(endpoint, input_data) => { let message: Message = bincode::deserialize(&input_data).unwrap(); match message { @@ -53,19 +51,15 @@ impl DiscoveryServer { _ => unreachable!(), } } - NetEvent::Connected(_, _) => (), NetEvent::Disconnected(endpoint) => { // Participant disconection without explict unregistration. // We must remove from the registry too. - let participant_name = self.participants.iter().find_map(|(name, info)| { - match info.endpoint == endpoint { - true => Some(name.clone()), - false => None, - } - }); + let participant = + self.participants.iter().find(|(_, info)| info.endpoint == endpoint); - if let Some(name) = participant_name { - self.unregister(&name) + if let Some(participant) = participant { + let name = participant.0.to_string(); + self.unregister(&name); } } }); diff --git a/examples/distributed/main.rs b/examples/distributed/main.rs index 2e54b725..74e225a6 100644 --- a/examples/distributed/main.rs +++ b/examples/distributed/main.rs @@ -7,13 +7,13 @@ pub fn main() { match args.get(1).unwrap_or(&String::new()).as_ref() { "discovery-server" => match discovery_server::DiscoveryServer::new() { - Some(discovery_server) => discovery_server.run(), - None => println!("Can not run the discovery server"), + Ok(discovery_server) => discovery_server.run(), + Err(err) => println!("Can not run the discovery server: {}", err), }, "participant" => match args.get(2) { Some(name) => match participant::Participant::new(name) { - Some(participant) => participant.run(), - None => println!("Can not run the participant"), + Ok(participant) => participant.run(), + Err(err) => println!("Can not run the participant: {}", err), }, None => println!("The participant needs a 'name'"), }, diff --git a/examples/distributed/participant.rs b/examples/distributed/participant.rs index 76b445b0..02645e57 100644 --- a/examples/distributed/participant.rs +++ b/examples/distributed/participant.rs @@ -5,67 +5,78 @@ use message_io::node::{self, NodeHandler, NodeListener}; use std::net::{SocketAddr}; use std::collections::{HashMap}; +use std::io::{self}; pub struct Participant { handler: NodeHandler<()>, - listener: Option>, + node_listener: Option>, name: String, discovery_endpoint: Endpoint, public_addr: SocketAddr, known_participants: HashMap, // Used only for free resources later + grettings: HashMap, } impl Participant { - pub fn new(name: &str) -> Option { - let (handler, listener) = node::split(); + pub fn new(name: &str) -> io::Result { + let (handler, node_listener) = node::split(); - // A listener for any other participant that want to establish connection. - // 'addr' contains the port that the OS gives for us when we put a 0. + // A node_listener for any other participant that want to establish connection. + // Returned 'listen_addr' contains the port that the OS gives for us when we put a 0. let listen_addr = "127.0.0.1:0"; - let listen_addr = match handler.network().listen(Transport::Udp, listen_addr) { - Ok((_, addr)) => addr, - Err(_) => { - println!("Can not listen on {}", listen_addr); - return None - } - }; + let (_, listen_addr) = handler.network().listen(Transport::FramedTcp, listen_addr)?; let discovery_addr = "127.0.0.1:5000"; // Connection to the discovery server. - match handler.network().connect(Transport::FramedTcp, discovery_addr) { - Ok((endpoint, _)) => Some(Participant { - handler, - listener: Some(listener), - name: name.to_string(), - discovery_endpoint: endpoint, - public_addr: listen_addr, - known_participants: HashMap::new(), - }), - Err(_) => { - println!("Can not connect to the discovery server at {}", discovery_addr); - return None - } - } + let (endpoint, _) = handler.network().connect(Transport::FramedTcp, discovery_addr)?; + + Ok(Participant { + handler, + node_listener: Some(node_listener), + name: name.to_string(), + discovery_endpoint: endpoint, + public_addr: listen_addr, + known_participants: HashMap::new(), + grettings: HashMap::new(), + }) } pub fn run(mut self) { // Register this participant into the discovery server - let message = Message::RegisterParticipant(self.name.clone(), self.public_addr); - let output_data = bincode::serialize(&message).unwrap(); - self.handler.network().send(self.discovery_endpoint, &output_data); - - let listener = self.listener.take().unwrap(); - listener.for_each(move |event| match event.network() { + let node_listener = self.node_listener.take().unwrap(); + node_listener.for_each(move |event| match event.network() { + NetEvent::Connected(endpoint, established) => { + if endpoint == self.discovery_endpoint { + if established { + let message = + Message::RegisterParticipant(self.name.clone(), self.public_addr); + let output_data = bincode::serialize(&message).unwrap(); + self.handler.network().send(self.discovery_endpoint, &output_data); + } + else { + println!("Can not connect to the discovery server"); + } + } + else { + // Participant endpoint + let (name, message) = self.grettings.remove(&endpoint).unwrap(); + if established { + let gretings = format!("Hi '{}', {}", name, message); + let message = Message::Gretings(self.name.clone(), gretings); + let output_data = bincode::serialize(&message).unwrap(); + self.handler.network().send(endpoint, &output_data); + self.known_participants.insert(name.clone(), endpoint); + } + } + } + NetEvent::Accepted(_, _) => (), NetEvent::Message(_, input_data) => { let message: Message = bincode::deserialize(&input_data).unwrap(); match message { Message::ParticipantList(participants) => { println!("Participant list received ({} participants)", participants.len()); for (name, addr) in participants { - self.discovered_participant( - &name, - addr, - "I see you in the participant list", - ); + let text = "I see you in the participant list"; + self.discovered_participant(&name, addr, text); } } Message::ParticipantNotificationAdded(name, addr) => { @@ -74,12 +85,6 @@ impl Participant { } Message::ParticipantNotificationRemoved(name) => { println!("Removed participant '{}' from the network", name); - - // Free network resource. - // It is only necessary because the connections among participants - // are done by UDP, - // UDP is not connection-oriented protocol, and the - // Connected/Disconnected events are not generated by UDP. if let Some(endpoint) = self.known_participants.remove(&name) { self.handler.network().remove(endpoint.resource_id()); } @@ -90,7 +95,6 @@ impl Participant { _ => unreachable!(), } } - NetEvent::Connected(_, _) => (), NetEvent::Disconnected(endpoint) => { if endpoint == self.discovery_endpoint { println!("Discovery server disconnected, closing"); @@ -100,13 +104,9 @@ impl Participant { }); } - fn discovered_participant(&mut self, name: &str, addr: SocketAddr, message: &str) { - if let Ok((endpoint, _)) = self.handler.network().connect(Transport::Udp, addr) { - let gretings = format!("Hi '{}', {}", name, message); - let message = Message::Gretings(self.name.clone(), gretings); - let output_data = bincode::serialize(&message).unwrap(); - self.handler.network().send(endpoint, &output_data); - self.known_participants.insert(name.to_string(), endpoint); - } + fn discovered_participant(&mut self, name: &str, addr: SocketAddr, text: &str) { + let (endpoint, _) = self.handler.network().connect(Transport::FramedTcp, addr).unwrap(); + // Save the necessary info to send the message when the connection is established. + self.grettings.insert(endpoint, (name.into(), text.into())); } } diff --git a/examples/file-transfer/receiver.rs b/examples/file-transfer/receiver.rs index 2c1506fb..1440e6a7 100644 --- a/examples/file-transfer/receiver.rs +++ b/examples/file-transfer/receiver.rs @@ -18,14 +18,14 @@ pub fn run() { let (handler, listener) = node::split::<()>(); let listen_addr = "127.0.0.1:3005"; - match handler.network().listen(Transport::FramedTcp, listen_addr) { - Ok(_) => println!("Receiver running by TCP at {}", listen_addr), - Err(_) => return println!("Can not listening by TCP at {}", listen_addr), - } + handler.network().listen(Transport::FramedTcp, listen_addr).unwrap(); + println!("Receiver running by TCP at {}", listen_addr); let mut transfers: HashMap = HashMap::new(); listener.for_each(move |event| match event.network() { + NetEvent::Connected(_, _) => unreachable!(), + NetEvent::Accepted(_, _) => (), NetEvent::Message(endpoint, input_data) => { let message: SenderMsg = bincode::deserialize(&input_data).unwrap(); match message { @@ -64,7 +64,6 @@ pub fn run() { } } } - NetEvent::Connected(_, _) => {} NetEvent::Disconnected(endpoint) => { // Unexpected sender disconnection. Cleaninig. if transfers.contains_key(&endpoint) { diff --git a/examples/file-transfer/sender.rs b/examples/file-transfer/sender.rs index ee59fce5..86715c2e 100644 --- a/examples/file-transfer/sender.rs +++ b/examples/file-transfer/sender.rs @@ -18,25 +18,27 @@ pub fn run(file_path: String) { let (handler, listener) = node::split(); let server_addr = "127.0.0.1:3005"; - let (server_id, _) = match handler.network().connect(Transport::FramedTcp, server_addr) { - Ok(server_id) => { - println!("Sender connected by TCP at {}", server_addr); - server_id - } - Err(_) => return println!("Can not connect to the receiver by TCP to {}", server_addr), - }; + let (server_id, _) = handler.network().connect(Transport::FramedTcp, server_addr).unwrap(); let file_size = fs::metadata(&file_path).unwrap().len() as usize; let mut file = File::open(&file_path).unwrap(); let file_name: String = file_path.rsplit('/').into_iter().next().unwrap_or(&file_path).into(); - let request = SenderMsg::FileRequest(file_name.clone(), file_size); - let output_data = bincode::serialize(&request).unwrap(); - handler.network().send(server_id, &output_data); - let mut file_bytes_sent = 0; listener.for_each(move |event| match event { NodeEvent::Network(net_event) => match net_event { + NetEvent::Connected(_, established) => { + if established { + println!("Sender connected by TCP at {}", server_addr); + let request = SenderMsg::FileRequest(file_name.clone(), file_size); + let output_data = bincode::serialize(&request).unwrap(); + handler.network().send(server_id, &output_data); + } + else { + println!("Can not connect to the receiver by TCP to {}", server_addr) + } + } + NetEvent::Accepted(_, _) => unreachable!(), NetEvent::Message(_, input_data) => { let message: ReceiverMsg = bincode::deserialize(&input_data).unwrap(); match message { @@ -49,7 +51,6 @@ pub fn run(file_path: String) { }, } } - NetEvent::Connected(_, _) => unreachable!(), NetEvent::Disconnected(_) => { handler.stop(); println!("\nReceiver disconnected"); diff --git a/examples/multicast/main.rs b/examples/multicast/main.rs index ab4cf6c7..df162808 100644 --- a/examples/multicast/main.rs +++ b/examples/multicast/main.rs @@ -10,24 +10,22 @@ fn main() { let (handler, listener) = node::split::<()>(); - let addr = "239.255.0.1:3010"; - match handler.network().connect(Transport::Udp, addr) { - Ok((endpoint, _)) => { + let multicast_addr = "239.255.0.1:3010"; + let (endpoint, _) = handler.network().connect(Transport::Udp, multicast_addr).unwrap(); + + listener.for_each(move |event| match event.network() { + NetEvent::Connected(_, _always_true_for_udp) => { println!("Notifying on the network"); handler.network().send(endpoint, my_name.as_bytes()); - } - Err(_) => return eprintln!("Could not connect to {}", addr), - } - // Since the addrers belongs to the multicast range (from 224.0.0.0 to 239.255.255.255) - // the internal resource will be configured to receive multicast messages. - handler.network().listen(Transport::Udp, addr).unwrap(); - - listener.for_each(move |event| match event.network() { + // Since the address belongs to the multicast range (from 224.0.0.0 to 239.255.255.255) + // the internal resource will be configured to receive multicast messages. + handler.network().listen(Transport::Udp, multicast_addr).unwrap(); + } + NetEvent::Accepted(_, _) => unreachable!(), // UDP is not connection-oriented NetEvent::Message(_, data) => { println!("{} greets to the network!", String::from_utf8_lossy(&data)); } - NetEvent::Connected(_, _) => (), NetEvent::Disconnected(_) => (), }); } diff --git a/examples/ping-pong/client.rs b/examples/ping-pong/client.rs index b2f9ae8f..0af9abf0 100644 --- a/examples/ping-pong/client.rs +++ b/examples/ping-pong/client.rs @@ -6,37 +6,29 @@ use message_io::node::{self, NodeEvent}; use std::time::{Duration}; enum Signal { - // This is a self event called every second. - Greet, - // Other signals here, + Greet, // This is a self event called every second. + // Other signals here, } pub fn run(transport: Transport, remote_addr: RemoteAddr) { let (handler, listener) = node::split(); - let server_id = match handler.network().connect(transport, remote_addr.clone()) { - Ok((server_id, local_addr)) => { - println!("Connected to server by {} at {}", transport, server_id.addr()); - println!("Client identified by local port: {}", local_addr.port()); - server_id - } - Err(_) => { - return println!("Can not connect to the server by {} to {}", transport, remote_addr) - } - }; - - handler.signals().send(Signal::Greet); + let (server_id, local_addr) = + handler.network().connect(transport, remote_addr.clone()).unwrap(); listener.for_each(move |event| match event { - NodeEvent::Signal(signal) => match signal { - Signal::Greet => { - let message = FromClientMessage::Ping; - let output_data = bincode::serialize(&message).unwrap(); - handler.network().send(server_id, &output_data); - handler.signals().send_with_timer(Signal::Greet, Duration::from_secs(1)); - } - }, NodeEvent::Network(net_event) => match net_event { + NetEvent::Connected(_, established) => { + if established { + println!("Connected to server at {} by {}", server_id.addr(), transport); + println!("Client identified by local port: {}", local_addr.port()); + handler.signals().send(Signal::Greet); + } + else { + println!("Can not connect to server at {} by {}", remote_addr, transport) + } + } + NetEvent::Accepted(_, _) => unreachable!(), // Only generated when a listener accepts NetEvent::Message(_, input_data) => { let message: FromServerMessage = bincode::deserialize(&input_data).unwrap(); match message { @@ -46,11 +38,18 @@ pub fn run(transport: Transport, remote_addr: RemoteAddr) { FromServerMessage::UnknownPong => println!("Pong from server"), } } - NetEvent::Connected(_, _) => unreachable!(), // Only generated when a listener accepts NetEvent::Disconnected(_) => { println!("Server is disconnected"); handler.stop(); } }, + NodeEvent::Signal(signal) => match signal { + Signal::Greet => { + let message = FromClientMessage::Ping; + let output_data = bincode::serialize(&message).unwrap(); + handler.network().send(server_id, &output_data); + handler.signals().send_with_timer(Signal::Greet, Duration::from_secs(1)); + } + }, }); } diff --git a/examples/ping-pong/server.rs b/examples/ping-pong/server.rs index 3537e037..e5f9e015 100644 --- a/examples/ping-pong/server.rs +++ b/examples/ping-pong/server.rs @@ -16,13 +16,17 @@ pub fn run(transport: Transport, addr: SocketAddr) { let mut clients: HashMap = HashMap::new(); match handler.network().listen(transport, addr) { - Ok((_resource_id, real_addr)) => { - println!("Server running at {} by {}", real_addr, transport) - } + Ok((_id, real_addr)) => println!("Server running at {} by {}", real_addr, transport), Err(_) => return println!("Can not listening at {} by {}", addr, transport), } listener.for_each(move |event| match event.network() { + NetEvent::Connected(_, _) => (), // Only generated at connect() calls. + NetEvent::Accepted(endpoint, _listener_id) => { + // Only connection oriented protocols will generate this event + clients.insert(endpoint, ClientInfo { count: 0 }); + println!("Client ({}) connected (total clients: {})", endpoint.addr(), clients.len()); + } NetEvent::Message(endpoint, input_data) => { let message: FromClientMessage = bincode::deserialize(&input_data).unwrap(); match message { @@ -45,11 +49,6 @@ pub fn run(transport: Transport, addr: SocketAddr) { } } } - NetEvent::Connected(endpoint, _) => { - // Only connection oriented protocols will generate this event - clients.insert(endpoint, ClientInfo { count: 0 }); - println!("Client ({}) connected (total clients: {})", endpoint.addr(), clients.len()); - } NetEvent::Disconnected(endpoint) => { // Only connection oriented protocols will generate this event clients.remove(&endpoint).unwrap(); diff --git a/examples/throughput/main.rs b/examples/throughput/main.rs index 971fd82e..5f4048fc 100644 --- a/examples/throughput/main.rs +++ b/examples/throughput/main.rs @@ -56,9 +56,8 @@ fn throughput_message_io(transport: Transport, packet_size: usize) { let handler = handler.clone(); listener.for_each_async(move |event| match event.network() { - NetEvent::Connected(_, _) => { - t_ready.send(()).unwrap(); - } + NetEvent::Connected(_, _) => (), + NetEvent::Accepted(_, _) => t_ready.send(()).unwrap(), NetEvent::Message(_, data) => { received_bytes += data.len(); if received_bytes >= EXPECTED_BYTES { @@ -74,8 +73,8 @@ fn throughput_message_io(transport: Transport, packet_size: usize) { r_ready.recv().unwrap(); } - // To improve accuracy, - // ensure that internal thread is initialized for not oriented connection protocols + // Ensure that the connection is performed, + // the internal thread is initialized for not oriented connection protocols // and we are waiting in the internal poll for data. std::thread::sleep(Duration::from_millis(100)); diff --git a/src/adapters/framed_tcp.rs b/src/adapters/framed_tcp.rs index 9218ea76..ea411c70 100644 --- a/src/adapters/framed_tcp.rs +++ b/src/adapters/framed_tcp.rs @@ -1,14 +1,14 @@ use crate::network::adapter::{ Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo, - ListeningInfo, + ListeningInfo, PendingStatus, }; -use crate::network::{RemoteAddr}; +use crate::network::{RemoteAddr, Readiness}; use crate::util::encoding::{self, Decoder, MAX_ENCODED_SIZE}; use mio::net::{TcpListener, TcpStream}; use mio::event::{Source}; -use std::net::{SocketAddr, TcpStream as StdTcpStream}; +use std::net::{SocketAddr}; use std::io::{self, ErrorKind, Read, Write}; use std::ops::{Deref}; use std::cell::{RefCell}; @@ -28,8 +28,8 @@ pub(crate) struct RemoteResource { } // SAFETY: -// That RefCell can be used with Sync because the decoder is only used in the read_event. -// This way, we save the cost of a Mutex. +// That RefCell can be used with Sync because the decoder is only used in the read_event, +// that will be called always from the same thread. This way, we save the cost of a Mutex. unsafe impl Sync for RemoteResource {} impl From for RemoteResource { @@ -47,10 +47,9 @@ impl Resource for RemoteResource { impl Remote for RemoteResource { fn connect(remote_addr: RemoteAddr) -> io::Result> { let peer_addr = *remote_addr.socket_addr(); - let stream = StdTcpStream::connect(peer_addr)?; + let stream = TcpStream::connect(peer_addr)?; let local_addr = stream.local_addr()?; - stream.set_nonblocking(true)?; - Ok(ConnectionInfo { remote: TcpStream::from_std(stream).into(), local_addr, peer_addr }) + Ok(ConnectionInfo { remote: stream.into(), local_addr, peer_addr }) } fn receive(&self, mut process_data: impl FnMut(&[u8])) -> ReadStatus { @@ -112,6 +111,10 @@ impl Remote for RemoteResource { } } } + + fn pending(&self, _readiness: Readiness) -> PendingStatus { + super::tcp::check_stream_ready(&self.stream) + } } pub(crate) struct LocalResource { diff --git a/src/adapters/tcp.rs b/src/adapters/tcp.rs index 0567dc6c..d77e22e6 100644 --- a/src/adapters/tcp.rs +++ b/src/adapters/tcp.rs @@ -1,13 +1,13 @@ use crate::network::adapter::{ Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo, - ListeningInfo, + ListeningInfo, PendingStatus, }; -use crate::network::{RemoteAddr}; +use crate::network::{RemoteAddr, Readiness}; use mio::net::{TcpListener, TcpStream}; use mio::event::{Source}; -use std::net::{SocketAddr, TcpStream as StdTcpStream}; +use std::net::{SocketAddr}; use std::io::{self, ErrorKind, Read, Write}; use std::ops::{Deref}; use std::mem::{MaybeUninit}; @@ -42,10 +42,9 @@ impl Resource for RemoteResource { impl Remote for RemoteResource { fn connect(remote_addr: RemoteAddr) -> io::Result> { let peer_addr = *remote_addr.socket_addr(); - let stream = StdTcpStream::connect(peer_addr)?; + let stream = TcpStream::connect(peer_addr)?; let local_addr = stream.local_addr()?; - stream.set_nonblocking(true)?; - Ok(ConnectionInfo { remote: TcpStream::from_std(stream).into(), local_addr, peer_addr }) + Ok(ConnectionInfo { remote: stream.into(), local_addr, peer_addr }) } fn receive(&self, mut process_data: impl FnMut(&[u8])) -> ReadStatus { @@ -98,6 +97,25 @@ impl Remote for RemoteResource { } } } + + fn pending(&self, _readiness: Readiness) -> PendingStatus { + check_stream_ready(&self.stream) + } +} + +/// Check if a TcpStream can be considered connected. +pub fn check_stream_ready(stream: &TcpStream) -> PendingStatus { + // A multiplatform non-blocking way to determine if the TCP stream is connected: + // Extracted from: https://github.com/tokio-rs/mio/issues/1486 + if let Ok(Some(_)) = stream.take_error() { + return PendingStatus::Disconnected + } + match stream.peer_addr() { + Ok(_) => PendingStatus::Ready, + Err(err) if err.kind() == io::ErrorKind::NotConnected => PendingStatus::Incomplete, + Err(err) if err.kind() == io::ErrorKind::InvalidInput => PendingStatus::Incomplete, + Err(_) => PendingStatus::Disconnected, + } } pub(crate) struct LocalResource { diff --git a/src/adapters/template.rs b/src/adapters/template.rs index 23696956..46815c4a 100644 --- a/src/adapters/template.rs +++ b/src/adapters/template.rs @@ -2,9 +2,9 @@ use crate::network::adapter::{ Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo, - ListeningInfo, + ListeningInfo, PendingStatus, }; -use crate::network::{RemoteAddr}; +use crate::network::{RemoteAddr, Readiness}; use mio::event::{Source}; @@ -20,28 +20,32 @@ impl Adapter for MyAdapter { pub(crate) struct RemoteResource; impl Resource for RemoteResource { fn source(&mut self) -> &mut dyn Source { - todo!(); + todo!() } } impl Remote for RemoteResource { fn connect(remote_addr: RemoteAddr) -> io::Result> { - todo!(); + todo!() } fn receive(&self, process_data: impl FnMut(&[u8])) -> ReadStatus { - todo!(); + todo!() } fn send(&self, data: &[u8]) -> SendStatus { - todo!(); + todo!() + } + + fn pending(&self, _readiness: Readiness) -> PendingStatus { + todo!() } } pub(crate) struct LocalResource; impl Resource for LocalResource { fn source(&mut self) -> &mut dyn Source { - todo!(); + todo!() } } @@ -49,10 +53,10 @@ impl Local for LocalResource { type Remote = RemoteResource; fn listen(addr: SocketAddr) -> io::Result> { - todo!(); + todo!() } fn accept(&self, accept_remote: impl FnMut(AcceptedType<'_, Self::Remote>)) { - todo!(); + todo!() } } diff --git a/src/adapters/udp.rs b/src/adapters/udp.rs index 6ed06322..2618a61a 100644 --- a/src/adapters/udp.rs +++ b/src/adapters/udp.rs @@ -1,8 +1,8 @@ use crate::network::adapter::{ Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo, - ListeningInfo, + ListeningInfo, PendingStatus, }; -use crate::network::{RemoteAddr}; +use crate::network::{RemoteAddr, Readiness}; use mio::net::{UdpSocket}; use mio::event::{Source}; @@ -74,6 +74,10 @@ impl Remote for RemoteResource { fn send(&self, data: &[u8]) -> SendStatus { send_packet(data, |data| self.socket.send(data)) } + + fn pending(&self, _readiness: Readiness) -> PendingStatus { + PendingStatus::Ready + } } pub(crate) struct LocalResource { diff --git a/src/adapters/ws.rs b/src/adapters/ws.rs index 0506ec71..5296a587 100644 --- a/src/adapters/ws.rs +++ b/src/adapters/ws.rs @@ -1,8 +1,8 @@ use crate::network::adapter::{ Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo, - ListeningInfo, + ListeningInfo, PendingStatus, }; -use crate::network::{RemoteAddr}; +use crate::network::{RemoteAddr, Readiness}; use crate::util::thread::{OTHER_THREAD_ERR}; use mio::event::{Source}; @@ -14,13 +14,14 @@ use tungstenite::client::{client as ws_connect}; use tungstenite::handshake::{ HandshakeError, MidHandshake, server::{ServerHandshake, NoCallback}, + client::{ClientHandshake}, }; use tungstenite::error::{Error}; use url::Url; -use std::sync::{Mutex}; -use std::net::{SocketAddr, TcpStream as StdTcpStream}; +use std::sync::{Mutex, Arc}; +use std::net::{SocketAddr}; use std::io::{self, ErrorKind}; use std::ops::{DerefMut}; @@ -34,14 +35,17 @@ impl Adapter for WsAdapter { type Local = LocalResource; } -struct PendingHandshake { - mid_handshake: MidHandshake>, - pending_messages: Vec>, +enum PendingHandshake { + Connect(Url, ArcTcpStream), + Accept(ArcTcpStream), + Client(MidHandshake>), + Server(MidHandshake>), } enum RemoteState { - WebSocket(WebSocket), + WebSocket(WebSocket), Handshake(Option), + Error(ArcTcpStream), } pub(crate) struct RemoteResource { @@ -51,9 +55,21 @@ pub(crate) struct RemoteResource { impl Resource for RemoteResource { fn source(&mut self) -> &mut dyn Source { match self.state.get_mut().unwrap() { - RemoteState::WebSocket(web_socket) => web_socket.get_mut(), - RemoteState::Handshake(Some(handshake)) => handshake.mid_handshake.get_mut().get_mut(), + RemoteState::WebSocket(web_socket) => { + Arc::get_mut(&mut web_socket.get_mut().0).unwrap() + } + RemoteState::Handshake(Some(handshake)) => match handshake { + PendingHandshake::Connect(_, stream) => Arc::get_mut(&mut stream.0).unwrap(), + PendingHandshake::Accept(stream) => Arc::get_mut(&mut stream.0).unwrap(), + PendingHandshake::Client(handshake) => { + Arc::get_mut(&mut handshake.get_mut().get_mut().0).unwrap() + } + PendingHandshake::Server(handshake) => { + Arc::get_mut(&mut handshake.get_mut().get_mut().0).unwrap() + } + }, RemoteState::Handshake(None) => unreachable!(), + RemoteState::Error(stream) => Arc::get_mut(&mut stream.0).unwrap(), } } } @@ -77,32 +93,19 @@ impl Remote for RemoteResource { } }; - // Synchronous tcp handshake - let stream = StdTcpStream::connect(peer_addr)?; + let stream = TcpStream::connect(peer_addr)?; let local_addr = stream.local_addr()?; - // Make it an asynchronous mio TcpStream - stream.set_nonblocking(true)?; - let stream = TcpStream::from_std(stream); - - // Synchronous waiting for web socket handshake - let mut handshake_result = ws_connect(url, stream); - let remote = loop { - match handshake_result { - Ok((web_socket, _)) => { - break RemoteResource { state: Mutex::new(RemoteState::WebSocket(web_socket)) } - } - Err(HandshakeError::Interrupted(mid_handshake)) => { - handshake_result = mid_handshake.handshake(); - } - Err(HandshakeError::Failure(err)) => { - //CHECK: give to the user an io::Error? - panic!("WS connect handshake error: {}", err) - } - } - }; - - Ok(ConnectionInfo { remote, local_addr, peer_addr }) + Ok(ConnectionInfo { + remote: RemoteResource { + state: Mutex::new(RemoteState::Handshake(Some(PendingHandshake::Connect( + url, + stream.into(), + )))), + }, + local_addr, + peer_addr, + }) } fn receive(&self, mut process_data: impl FnMut(&[u8])) -> ReadStatus { @@ -120,7 +123,7 @@ impl Remote for RemoteResource { // Seems like windows consume the `WouldBlock` notification // at peek() when it happens, and the poll never wakes it again. #[cfg(not(target_os = "windows"))] - let _peek_result = web_socket.get_ref().peek(&mut [0; 0]); + let _peek_result = web_socket.get_ref().0.peek(&mut [0; 0]); // We can not call process_data while the socket is blocked. // The user could lock it again if sends from the callback. @@ -141,64 +144,160 @@ impl Remote for RemoteResource { break ReadStatus::Disconnected // should not happen } }, - RemoteState::Handshake(handshake) => { - let current_handshake = handshake.take().unwrap(); - match current_handshake.mid_handshake.handshake() { - Ok(mut web_socket) => { - for pending_data in current_handshake.pending_messages { - Self::send_by_socket(&mut web_socket, &pending_data); - } + RemoteState::Handshake(_) => unreachable!(), + RemoteState::Error(_) => unreachable!(), + } + } + } + + fn send(&self, data: &[u8]) -> SendStatus { + match self.state.lock().expect(OTHER_THREAD_ERR).deref_mut() { + RemoteState::WebSocket(web_socket) => { + let message = Message::Binary(data.to_vec()); + let mut result = web_socket.write_message(message); + loop { + match result { + Ok(_) => break SendStatus::Sent, + Err(Error::Io(ref err)) if err.kind() == ErrorKind::WouldBlock => { + result = web_socket.write_pending(); + } + Err(Error::Capacity(_)) => { + break SendStatus::MaxPacketSizeExceeded(data.len(), MAX_PAYLOAD_LEN) + } + Err(err) => { + log::error!("WS send error: {}", err); + break SendStatus::ResourceNotFound // should not happen + } + } + } + } + RemoteState::Handshake(_) => unreachable!(), + RemoteState::Error(_) => unreachable!(), + } + } + + fn pending(&self, _readiness: Readiness) -> PendingStatus { + let mut state = self.state.lock().expect(OTHER_THREAD_ERR); + match state.deref_mut() { + RemoteState::WebSocket(_) => PendingStatus::Ready, + RemoteState::Handshake(pending) => match pending.take().unwrap() { + PendingHandshake::Connect(url, stream) => { + let tcp_status = super::tcp::check_stream_ready(&stream.0); + if tcp_status != PendingStatus::Ready { + // TCP handshake not ready yet. + *pending = Some(PendingHandshake::Connect(url, stream)); + return tcp_status + } + let stream_backup = stream.clone(); + match ws_connect(url, stream) { + Ok((web_socket, _)) => { *state = RemoteState::WebSocket(web_socket); + PendingStatus::Ready } Err(HandshakeError::Interrupted(mid_handshake)) => { - *handshake = Some(PendingHandshake { - mid_handshake, - pending_messages: current_handshake.pending_messages, - }); - break ReadStatus::WaitNextEvent + *pending = Some(PendingHandshake::Client(mid_handshake)); + PendingStatus::Incomplete } - Err(HandshakeError::Failure(ref err)) => { + Err(HandshakeError::Failure(Error::Io(_))) => { + *state = RemoteState::Error(stream_backup); + PendingStatus::Disconnected + } + Err(HandshakeError::Failure(err)) => { + *state = RemoteState::Error(stream_backup); + log::error!("WS connect handshake error: {}", err); + PendingStatus::Disconnected // should not happen + } + } + } + PendingHandshake::Accept(stream) => { + let stream_backup = stream.clone(); + match ws_accept(stream) { + Ok(web_socket) => { + *state = RemoteState::WebSocket(web_socket); + PendingStatus::Ready + } + Err(HandshakeError::Interrupted(mid_handshake)) => { + *pending = Some(PendingHandshake::Server(mid_handshake)); + PendingStatus::Incomplete + } + Err(HandshakeError::Failure(Error::Io(_))) => { + *state = RemoteState::Error(stream_backup); + PendingStatus::Disconnected + } + Err(HandshakeError::Failure(err)) => { + *state = RemoteState::Error(stream_backup); log::error!("WS accept handshake error: {}", err); - break ReadStatus::Disconnected // should not happen + PendingStatus::Disconnected } } } - } + PendingHandshake::Client(mid_handshake) => { + let stream_backup = mid_handshake.get_ref().get_ref().clone(); + match mid_handshake.handshake() { + Ok((web_socket, _)) => { + *state = RemoteState::WebSocket(web_socket); + PendingStatus::Ready + } + Err(HandshakeError::Interrupted(mid_handshake)) => { + *pending = Some(PendingHandshake::Client(mid_handshake)); + PendingStatus::Incomplete + } + Err(HandshakeError::Failure(Error::Io(_))) => { + *state = RemoteState::Error(stream_backup); + PendingStatus::Disconnected + } + Err(HandshakeError::Failure(err)) => { + *state = RemoteState::Error(stream_backup); + log::error!("WS client handshake error: {}", err); + PendingStatus::Disconnected // should not happen + } + } + } + PendingHandshake::Server(mid_handshake) => { + let stream_backup = mid_handshake.get_ref().get_ref().clone(); + match mid_handshake.handshake() { + Ok(web_socket) => { + *state = RemoteState::WebSocket(web_socket); + PendingStatus::Ready + } + Err(HandshakeError::Interrupted(mid_handshake)) => { + *pending = Some(PendingHandshake::Server(mid_handshake)); + PendingStatus::Incomplete + } + Err(HandshakeError::Failure(Error::Io(_))) => { + *state = RemoteState::Error(stream_backup); + PendingStatus::Disconnected + } + Err(HandshakeError::Failure(err)) => { + *state = RemoteState::Error(stream_backup); + log::error!("WS server handshake error: {}", err); + PendingStatus::Disconnected // should not happen + } + } + } + }, + RemoteState::Error(_) => unreachable!(), } } - fn send(&self, data: &[u8]) -> SendStatus { + fn ready_to_write(&self) -> bool { + true + /* Is this needed? match self.state.lock().expect(OTHER_THREAD_ERR).deref_mut() { - RemoteState::WebSocket(web_socket) => Self::send_by_socket(web_socket, data), - RemoteState::Handshake(handshake) => { - handshake.as_mut().unwrap().pending_messages.push(data.to_vec()); - SendStatus::Sent //Future versions: SendStatus::Enqueued - } + RemoteState::WebSocket(web_socket) => match web_socket.write_pending() { + Ok(_) => true, + Err(Error::Io(ref err)) if err.kind() == ErrorKind::WouldBlock => true, + Err(_) => false, // Will be disconnected, + }, + // This function is only call on ready resources. + RemoteState::Handshake(_) => unreachable!(), + RemoteState::Error(_) => unreachable!(), } + */ } } impl RemoteResource { - fn send_by_socket(web_socket: &mut WebSocket, data: &[u8]) -> SendStatus { - let message = Message::Binary(data.to_vec()); - let mut result = web_socket.write_message(message); - loop { - match result { - Ok(_) => break SendStatus::Sent, - Err(Error::Io(ref err)) if err.kind() == ErrorKind::WouldBlock => { - result = web_socket.write_pending(); - } - Err(Error::Capacity(_)) => { - break SendStatus::MaxPacketSizeExceeded(data.len(), MAX_PAYLOAD_LEN) - } - Err(err) => { - log::error!("WS send error: {}", err); - break SendStatus::ResourceNotFound // should not happen - } - } - } - } - fn io_error_to_read_status(err: &io::Error) -> ReadStatus { if err.kind() == io::ErrorKind::WouldBlock { ReadStatus::WaitNextEvent @@ -236,24 +335,12 @@ impl Local for LocalResource { loop { match self.listener.accept() { Ok((stream, addr)) => { - let remote_state = match ws_accept(stream) { - Ok(web_socket) => Some(RemoteState::WebSocket(web_socket)), - Err(HandshakeError::Interrupted(mid_handshake)) => { - Some(RemoteState::Handshake(Some(PendingHandshake { - mid_handshake, - pending_messages: Vec::new(), - }))) - } - Err(HandshakeError::Failure(ref err)) => { - log::error!("WS accept handshake error: {}", err); - None - } + let remote = RemoteResource { + state: Mutex::new(RemoteState::Handshake(Some(PendingHandshake::Accept( + stream.into(), + )))), }; - - if let Some(remote_state) = remote_state { - let remote = RemoteResource { state: Mutex::new(remote_state) }; - accept_remote(AcceptedType::Remote(addr, remote)); - } + accept_remote(AcceptedType::Remote(addr, remote)); } Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, Err(ref err) if err.kind() == ErrorKind::Interrupted => continue, @@ -262,3 +349,37 @@ impl Local for LocalResource { } } } + +/// This struct is used to avoid the tungstenite handshake to take the ownership of the stream +/// an drop it without allow to the driver to deregister from the poll. +/// It can be removed when this issue is resolved: +/// https://github.com/snapview/tungstenite-rs/issues/51 +struct ArcTcpStream(Arc); + +impl From for ArcTcpStream { + fn from(stream: TcpStream) -> Self { + Self(Arc::new(stream)) + } +} + +impl io::Read for ArcTcpStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + (&*self.0).read(buf) + } +} + +impl io::Write for ArcTcpStream { + fn write(&mut self, buf: &[u8]) -> io::Result { + (&*self.0).write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + (&*self.0).flush() + } +} + +impl Clone for ArcTcpStream { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} diff --git a/src/network.rs b/src/network.rs index 85651cd7..8315d6cb 100644 --- a/src/network.rs +++ b/src/network.rs @@ -19,6 +19,7 @@ pub use endpoint::{Endpoint}; pub use remote_addr::{RemoteAddr, ToRemoteAddr}; pub use transport::{Transport}; pub use driver::{NetEvent}; +pub use poll::{Readiness}; use loader::{DriverLoader, ActionControllerList, EventProcessorList}; use poll::{Poll, PollEvent}; @@ -26,7 +27,7 @@ use poll::{Poll, PollEvent}; use strum::{IntoEnumIterator}; use std::net::{SocketAddr, ToSocketAddrs}; -use std::time::{Duration}; +use std::time::{Duration, Instant}; use std::io::{self}; /// Create a network instance giving its controller and processor. @@ -52,24 +53,118 @@ impl NetworkController { Self { controllers } } - /// Creates a connection to the specific address. + /// Creates a connection to the specified address. /// The endpoint, an identifier of the new connection, will be returned. - /// If the connection can not be performed (e.g. the address is not reached) - /// the corresponding IO error is returned. - /// This function blocks until the resource has been connected and is ready to use. + /// This function will generate a [`NetEvent::Connected`] event with the result of the connection. + /// This call will **NOT** block to perform the connection. + /// + /// Note that this function can return an error in the case the internal socket + /// could not be binded or open in the OS, but never will return an error an regarding + /// the connection itself. + /// If you want to check if the connection has been established or not you have to read the + /// boolean indicator in the [`NetEvent::Connected`] event. + /// + /// Example + /// ``` + /// use message_io::node::{self, NodeEvent}; + /// use message_io::network::{Transport, NetEvent}; + /// + /// let (handler, listener) = node::split(); + /// handler.signals().send_with_timer((), std::time::Duration::from_secs(1)); + /// + /// let (id, addr) = handler.network().listen(Transport::FramedTcp, "127.0.0.1:0").unwrap(); + /// let (conn_endpoint, _) = handler.network().connect(Transport::FramedTcp, addr).unwrap(); + /// // The socket could not be able to send yet. + /// + /// listener.for_each(move |event| match event { + /// NodeEvent::Network(net_event) => match net_event { + /// NetEvent::Connected(endpoint, established) => { + /// assert_eq!(conn_endpoint, endpoint); + /// if established { + /// println!("Connected!"); + /// handler.network().send(endpoint, &[42]); + /// } + /// else { + /// println!("Could not connect"); + /// } + /// }, + /// NetEvent::Accepted(endpoint, listening_id) => { + /// assert_eq!(id, listening_id); + /// println!("New connected endpoint: {}", endpoint.addr()); + /// }, + /// _ => (), + /// } + /// NodeEvent::Signal(_) => handler.stop(), + /// }); + /// ``` pub fn connect( &self, transport: Transport, addr: impl ToRemoteAddr, ) -> io::Result<(Endpoint, SocketAddr)> { let addr = addr.to_remote_addr().unwrap(); - log::trace!("Connect to {} by adapter: {}", addr, transport.id()); self.controllers[transport.id() as usize].connect(addr).map(|(endpoint, addr)| { - log::trace!("Connected to {}", endpoint); + log::trace!("Connect to {}", endpoint); (endpoint, addr) }) } + /// Creates a connection to the specified address. + /// This function is similar to [`NetworkController::connect()`] but will block + /// to perform the connection, waiting until for the connection is ready. + /// If the connection can not be established, a `ConnectionRefused` error will be returned. + /// + /// Note that the `Connect` event will be also generated. + /// + /// Since this function blocks the current thread, it must not be used inside + /// the network callback. + /// In order to get the best scalability and performance, use the non-blocking + /// [`NetworkController::connect()`] version. + /// + /// Example + /// ``` + /// use message_io::node::{self, NodeEvent}; + /// use message_io::network::{Transport, NetEvent}; + /// + /// let (handler, listener) = node::split(); + /// handler.signals().send_with_timer((), std::time::Duration::from_secs(1)); + /// + /// let (id, addr) = handler.network().listen(Transport::FramedTcp, "127.0.0.1:0").unwrap(); + /// match handler.network().connect_sync(Transport::FramedTcp, addr) { + /// Ok((endpoint, _)) => { + /// println!("Connected!"); + /// handler.network().send(endpoint, &[42]); + /// } + /// Err(err) if err.kind() == std::io::ErrorKind::ConnectionRefused => { + /// println!("Could not connect"); + /// } + /// Err(err) => println!("An OS error creating the socket"), + /// } + /// ``` + pub fn connect_sync( + &self, + transport: Transport, + addr: impl ToRemoteAddr, + ) -> io::Result<(Endpoint, SocketAddr)> { + let (endpoint, addr) = self.connect(transport, addr)?; + loop { + std::thread::sleep(Duration::from_millis(1)); + match self.is_ready(endpoint.resource_id()) { + Some(status) => { + if status { + return Ok((endpoint, addr)) + } + } + None => { + return Err(io::Error::new( + io::ErrorKind::ConnectionRefused, + "Connection refused", + )) + } + } + } + } + /// Listen messages from specified transport. /// The giver address will be used as interface and listening port. /// If the port can be opened, a [ResourceId] identifying the listener is returned @@ -82,13 +177,27 @@ impl NetworkController { addr: impl ToSocketAddrs, ) -> io::Result<(ResourceId, SocketAddr)> { let addr = addr.to_socket_addrs().unwrap().next().unwrap(); - log::trace!("Listen by {} by adapter: {}", addr, transport.id()); self.controllers[transport.id() as usize].listen(addr).map(|(resource_id, addr)| { - log::trace!("Listening by {}", resource_id); + log::trace!("Listening at {} by {}", addr, resource_id); (resource_id, addr) }) } + /// Send the data message thought the connection represented by the given endpoint. + /// This function returns a [`SendStatus`] indicating the status of this send. + /// There is no guarantee that send over a correct connection generates a [`SendStatus::Sent`] + /// because any time a connection can be disconnected (even while you are sending). + /// Except cases where you need to be sure that the message has been sent, + /// you will want to process a [`NetEvent::Disconnected`] to determine if the connection + + /// is *alive* instead of check if `send()` returned [`SendStatus::ResourceNotFound`]. + pub fn send(&self, endpoint: Endpoint, data: &[u8]) -> SendStatus { + log::trace!("Sending {} bytes to {}...", data.len(), endpoint); + let status = + self.controllers[endpoint.resource_id().adapter_id() as usize].send(endpoint, data); + log::trace!("Send status: {:?}", status); + status + } + /// Remove a network resource. /// Returns `false` if the resource id doesn't exists. /// This is used to remove resources as connection or listeners. @@ -108,19 +217,14 @@ impl NetworkController { value } - /// Send the data message thought the connection represented by the given endpoint. - /// This function returns a [`SendStatus`] indicating the status of this send. - /// There is no guarantee that send over a correct connection generates a [`SendStatus::Sent`] - /// because any time a connection can be disconnected (even while you are sending). - /// Except cases where you need to be sure that the message has been sent, - /// you will want to process a [`NetEvent::Disconnected`] to determine if the connection + - /// is *alive* instead of check if `send()` returned [`SendStatus::ResourceNotFound`]. - pub fn send(&self, endpoint: Endpoint, data: &[u8]) -> SendStatus { - log::trace!("Send {} bytes to {}", data.len(), endpoint); - let status = - self.controllers[endpoint.resource_id().adapter_id() as usize].send(endpoint, data); - log::trace!("Send status: {:?}", status); - status + /// Check a resource specified by `resource_id` is ready. + /// If the status is `true` means that the resource is ready to use. + /// In connection oriented transports, it implies the resource is connected. + /// If the status is `false` it means that the resource is not yet ready to use. + /// If the resource has been removed, disconnected, or does not exists in the network, + /// a `None` is returned. + pub fn is_ready(&self, resource_id: ResourceId) -> Option { + self.controllers[resource_id.adapter_id() as usize].is_ready(resource_id) } } @@ -137,7 +241,7 @@ impl NetworkProcessor { } /// Process the next poll event. - /// This functions waits the timeout specified until the poll event is generated. + /// This method waits the timeout specified until the poll event is generated. /// If `None` is passed as timeout, it will wait indefinitely. /// Note that there is no 1-1 relation between an internal poll event and a [`NetEvent`]. /// You need to assume that process an internal poll event could call 0 or N times to @@ -150,27 +254,139 @@ impl NetworkProcessor { let processors = &mut self.processors; self.poll.process_event(timeout, |poll_event| { match poll_event { - PollEvent::Network(resource_id) => { - let adapter_id = resource_id.adapter_id() as usize; - processors[adapter_id].process(resource_id, &mut |net_event| { + PollEvent::Network(resource_id, interest) => { + let processor = &processors[resource_id.adapter_id() as usize]; + processor.process(resource_id, interest, &mut |net_event| { log::trace!("Processed {:?}", net_event); event_callback(net_event); }); } + #[allow(dead_code)] //TODO: remove it with native event support PollEvent::Waker => todo!(), } }); } + + /// Process poll events until there is no more events during a `timeout` duration. + /// This method makes succesive calls to [`NetworkProcessor::process_poll_event()`]. + pub fn process_poll_events_until_timeout( + &mut self, + timeout: Duration, + mut event_callback: impl FnMut(NetEvent<'_>), + ) { + loop { + let now = Instant::now(); + self.process_poll_event(Some(timeout), |e| event_callback(e)); + if now.elapsed() > timeout { + break + } + } + } } #[cfg(test)] mod tests { use super::*; use std::time::{Duration}; + use crate::util::thread::{NamespacedThread}; + + use test_case::test_case; lazy_static::lazy_static! { static ref TIMEOUT: Duration = Duration::from_millis(1000); + static ref LOCALHOST_CONN_TIMEOUT: Duration = Duration::from_millis(5000); + } + + #[cfg_attr(feature = "tcp", test_case(Transport::Tcp))] + #[cfg_attr(feature = "tcp", test_case(Transport::FramedTcp))] + #[cfg_attr(feature = "websocket", test_case(Transport::Ws))] + fn successful_connection(transport: Transport) { + let (controller, mut processor) = self::split(); + let (listener_id, addr) = controller.listen(transport, "127.0.0.1:0").unwrap(); + let (endpoint, _) = controller.connect(transport, addr).unwrap(); + + let mut was_connected = 0; + let mut was_accepted = 0; + processor.process_poll_events_until_timeout(*TIMEOUT, |net_event| match net_event { + NetEvent::Connected(net_endpoint, status) => { + assert!(status); + assert_eq!(endpoint, net_endpoint); + was_connected += 1; + } + NetEvent::Accepted(_, net_listener_id) => { + assert_eq!(listener_id, net_listener_id); + was_accepted += 1; + } + _ => unreachable!(), + }); + assert_eq!(was_accepted, 1); + assert_eq!(was_connected, 1); + } + + #[cfg_attr(feature = "tcp", test_case(Transport::Tcp))] + #[cfg_attr(feature = "tcp", test_case(Transport::FramedTcp))] + #[cfg_attr(feature = "websocket", test_case(Transport::Ws))] + fn successful_connection_sync(transport: Transport) { + let (controller, mut processor) = self::split(); + let (_, addr) = controller.listen(transport, "127.0.0.1:0").unwrap(); + + let mut thread = NamespacedThread::spawn("test", move || { + let (endpoint, _) = controller.connect_sync(transport, addr).unwrap(); + assert!(controller.is_ready(endpoint.resource_id()).unwrap()); + }); + + processor.process_poll_events_until_timeout(*TIMEOUT, |_| ()); + + thread.join(); + } + + #[cfg_attr(feature = "tcp", test_case(Transport::Tcp))] + #[cfg_attr(feature = "tcp", test_case(Transport::FramedTcp))] + #[cfg_attr(feature = "websocket", test_case(Transport::Ws))] + fn unreachable_connection(transport: Transport) { + let (controller, mut processor) = self::split(); + + // Ensure that addr is not using by other process + // because it takes some secs to be reusable. + let (listener_id, addr) = controller.listen(transport, "127.0.0.1:0").unwrap(); + controller.remove(listener_id); + + let (endpoint, _) = controller.connect(transport, addr).unwrap(); + + let mut was_disconnected = false; + processor.process_poll_events_until_timeout(*LOCALHOST_CONN_TIMEOUT, |net_event| { + match net_event { + NetEvent::Connected(net_endpoint, status) => { + assert!(!status); + assert_eq!(endpoint, net_endpoint); + was_disconnected = true; + } + _ => unreachable!(), + } + }); + assert!(was_disconnected); + } + + #[cfg_attr(feature = "tcp", test_case(Transport::Tcp))] + #[cfg_attr(feature = "tcp", test_case(Transport::FramedTcp))] + #[cfg_attr(feature = "websocket", test_case(Transport::Ws))] + fn unreachable_connection_sync(transport: Transport) { + let (controller, mut processor) = self::split(); + + // Ensure that addr is not using by other process + // because it takes some secs to be reusable. + let (listener_id, addr) = controller.listen(transport, "127.0.0.1:0").unwrap(); + controller.remove(listener_id); + + let mut thread = NamespacedThread::spawn("test", move || { + let err = controller.connect_sync(transport, addr).unwrap_err(); + assert!(err.kind() == io::ErrorKind::ConnectionRefused); + }); + + processor.process_poll_events_until_timeout(*LOCALHOST_CONN_TIMEOUT, |_| ()); + + thread.join(); } #[test] @@ -180,9 +396,7 @@ mod tests { assert!(controller.remove(listener_id)); // Do not generate an event assert!(!controller.remove(listener_id)); - let mut was_event = false; - processor.process_poll_event(Some(*TIMEOUT), |_| was_event = true); - assert!(!was_event); + processor.process_poll_events_until_timeout(*TIMEOUT, |_| unreachable!()); } #[test] @@ -191,19 +405,16 @@ mod tests { let (listener_id, addr) = controller.listen(Transport::Tcp, "127.0.0.1:0").unwrap(); controller.connect(Transport::Tcp, addr).unwrap(); - let mut was_event = false; - processor.process_poll_event(Some(*TIMEOUT), |net_event| match net_event { - NetEvent::Connected(_, _) => { + let mut was_accepted = false; + processor.process_poll_events_until_timeout(*TIMEOUT, |net_event| match net_event { + NetEvent::Connected(..) => (), + NetEvent::Accepted(_, _) => { assert!(controller.remove(listener_id)); assert!(!controller.remove(listener_id)); - was_event = true; + was_accepted = true; } _ => unreachable!(), }); - assert!(was_event); - - let mut was_event = false; - processor.process_poll_event(Some(*TIMEOUT), |_| was_event = true); - assert!(!was_event); + assert!(was_accepted); } } diff --git a/src/network/adapter.rs b/src/network/adapter.rs index fb6ad493..6f1df409 100644 --- a/src/network/adapter.rs +++ b/src/network/adapter.rs @@ -1,4 +1,5 @@ -use crate::network::{RemoteAddr}; +use super::remote_addr::{RemoteAddr}; +use super::poll::{Readiness}; use mio::event::{Source}; @@ -23,8 +24,8 @@ pub trait Adapter: Send + Sync { /// asynchronously from events. /// Your [`Remote`] and [`Local`] entities must implement `Resource`. pub trait Resource: Send + Sync { - /// This is the only method required to make your element a resource. - /// Note: Any `mio` network element implements [`Source`], you probably wants to use + /// Returns a mutable reference to the internal `Source`. + /// Note: All `mio` network element implements [`Source`], you probably wants to use /// one of them as a base for your non-blocking transport. /// See [`Source`]. fn source(&mut self) -> &mut dyn Source; @@ -72,9 +73,14 @@ pub enum SendStatus { /// This implies that a [`crate::network::NetEvent::Disconnected`] has happened or that /// the resource never existed. ResourceNotFound, + + /// The resource can not perform the required send operation. + /// Usually this is due because it is performing the handshake. + ResourceNotAvailable, } /// Returned as a result of [`Remote::receive()`] +#[derive(Debug)] pub enum ReadStatus { /// This status must be returned if the resource has been disconnected or there was an error. /// The resource will be removed after this call and @@ -88,6 +94,26 @@ pub enum ReadStatus { WaitNextEvent, } +#[derive(Debug, Clone, PartialEq)] +pub enum PendingStatus { + /// The resource is no longer considered as a pending resource. + /// It it came from a listener, a [`crate::network::NetEvent::Accepted`] event will be generated. + /// It it came from a explicit connection, a [`crate::network::NetEvent::Connected`] + /// with its flag to `true` will be generated. + /// No more calls to [`Remote::pending()`] will be performed. + Ready, + + /// The resource needs more data to be considered as established. + Incomplete, + + /// The resource has not be able to perform the connection. + /// It it came from a listener, no event will be generated. + /// It it came from a explicit connection, a [`crate::network::NetEvent::Connected`] + /// with its flag to `false` will be generated. + /// No more calls to [`Remote::pending()`] will be performed and the resource will be removed. + Disconnected, +} + /// The resource used to represent a remote. /// It usually is a wrapper over a socket/stream. pub trait Remote: Resource + Sized { @@ -98,21 +124,47 @@ pub trait Remote: Resource + Sized { /// It also must return the extracted address as `SocketAddr`. fn connect(remote_addr: RemoteAddr) -> io::Result>; - /// Called when a remote endpoint received an event. - /// It means that the resource has available data to read, + /// Called when a remote resource received an event. + /// The resource must be *ready* to receive this call. + /// It means that it has available data to read, /// or there is some connection related issue, as a disconnection. /// The **implementator** is in charge of processing that action and returns a [`ReadStatus`]. + /// /// The `process_data` function must be called for each data chunk that represents a message. - /// This call will produce a `Message` API event. + /// This call will produce a [`crate::network::NetEvent::Message`] API event. /// Note that `receive()` could imply more than one call to `read`. /// The implementator must be read all data from the resource. /// For most of the cases it means read until the network resource returns `WouldBlock`. fn receive(&self, process_data: impl FnMut(&[u8])) -> ReadStatus; - /// Sends a raw data from a resource. + /// Sends raw data from a resource. + /// The resource must be *ready* to receive this call. /// The **implementator** is in charge to send the entire `data`. /// The [`SendStatus`] will contain the status of this attempt. fn send(&self, data: &[u8]) -> SendStatus; + + /// Called when a `Remote` is created (explicity of by a listener) + /// and it is not consider ready yet. + /// A remote resource **is considered ready** when it is totally connected + /// and can be used for writing data. + /// It implies that the user has received the `Connected` or `Accepted` method for that resource. + /// + /// This method is in charge to determine if a resource is ready or not. + /// No `Connected` or `Accepted` events will be generated until this function return + /// `PendingStatus::Ready`. + /// The method wil be called several times with different `Readiness` until the **implementator** + /// returns a `PendingStatus::Ready` or `PendingStatus::Disconnected`. + fn pending(&self, readiness: Readiness) -> PendingStatus; + + /// The resource is available to write. + /// It must be *ready* to receive this call. + /// Here the **implementator** optionally can try to write any pending data. + /// The return value is an identification of the operation result. + /// If the method returns `true`, the operation was successful, otherwise, the resource will + /// be disconnected and removed. + fn ready_to_write(&self) -> bool { + true + } } /// Used as a parameter callback in [`Local::accept()`] @@ -120,15 +172,15 @@ pub enum AcceptedType<'a, R> { /// The listener has accepted a remote (`R`) with the specified addr. /// The remote will be registered in order to generate read events. (calls to /// [`Remote::receive()`]). + /// A [`crate::network::NetEvent::Accepted`] will be generated once this remote resource + /// is considered *ready*. Remote(SocketAddr, R), /// The listener has accepted data that can be packed into a message from a specified addr. /// Despite of `Remote`, accept as a `Data` will not register any Remote. - /// This will produce a `Message` API event. - /// The endpoint along this event will be unique if base of the specified addr and the listener + /// This will produce a [`crate::network::NetEvent::Message`] event. + /// The endpoint of this event will be unique containing the specified addr and the listener /// whom generates it. - /// This means that the user can treat the [`crate::network::Endpoint`] as if - /// it was an internal resource. Data(SocketAddr, &'a [u8]), } @@ -158,9 +210,10 @@ pub trait Local: Resource + Sized { /// Sends a raw data from a resource. /// Similar to [`Remote::send()`] but the resource that sends the data is a `Local`. + /// This behaviour usually happens when the transport to implement is not connection oriented. + /// /// The **implementator** must **only** implement this function if the local resource can /// also send data. - /// This behaviour usually happens when the transport to implement is not connection oriented. fn send_to(&self, _addr: SocketAddr, _data: &[u8]) -> SendStatus { panic!("Adapter not configured to send messages directly from the local resource") } diff --git a/src/network/driver.rs b/src/network/driver.rs index 9fd250a1..4362108c 100644 --- a/src/network/driver.rs +++ b/src/network/driver.rs @@ -1,25 +1,47 @@ use super::endpoint::{Endpoint}; use super::resource_id::{ResourceId, ResourceType}; -use super::poll::{Poll}; -use super::registry::{ResourceRegistry}; +use super::poll::{Poll, Readiness}; +use super::registry::{ResourceRegistry, Register}; use super::remote_addr::{RemoteAddr}; -use super::adapter::{Adapter, Remote, Local, SendStatus, AcceptedType, ReadStatus}; +use super::adapter::{Adapter, Remote, Local, SendStatus, AcceptedType, ReadStatus, PendingStatus}; use std::net::{SocketAddr}; -use std::sync::{Arc}; +use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering}, +}; use std::io::{self}; #[cfg(doctest)] use super::transport::{Transport}; -/// Enum used to describe and event that an adapter network has produced. +/// Enum used to describe a network event that an internal transport adapter has produced. pub enum NetEvent<'a> { - /// New endpoint has been connected to a listener. - /// This event will be sent only in connection oriented protocols as *TCP*. - /// It also contains the resource id of the listener that accepted this connection. - Connected(Endpoint, ResourceId), + /// Connection result. + /// This event is only generated after a [`crate::network::NetworkController::connect()`] + /// call. + /// The event contains the endpoint of the connection + /// (same endpoint returned by the `connect()` method), + /// and a boolean indicating the *result* of that connection. + /// In *non connection-oriented transports* as *UDP* it simply means that the resource + /// is ready to use, and the boolean will be always `true`. + /// In connection-oriented transports it means that the handshake has been performed, and the + /// connection is established and ready to use. + /// Since this handshake could fail, the boolean could be `false`. + Connected(Endpoint, bool), + + /// New endpoint has been accepted by a listener and considered ready to use. + /// The event contains the resource id of the listener that accepted this connection. + /// + /// Note that this event will only be generated by connection-oriented transports as *TCP*. + Accepted(Endpoint, ResourceId), /// Input message received by the network. + /// In packet-based transports, the data of a message sent corresponds with the data of this + /// event. This one-to-one relation is not conserved in stream-based transports as *TCP*. + /// + /// If you want a packet-based protocol over *TCP* use + /// [`crate::network::Transport::FramedTcp`]. Message(Endpoint, &'a [u8]), /// This event is only dispatched when a connection is lost. @@ -27,15 +49,17 @@ pub enum NetEvent<'a> { /// When this event is received, the resource is considered already removed, /// the user do not need to remove it after this event. /// A [`NetEvent::Message`] event will never be generated after this event from this endpoint. - /// This event will be sent only in connection oriented protocols as *Tcp*. - /// *UDP*, for example, is NOT connection oriented, and the event can no be detected. + + /// Note that this event will only be generated by connection-oriented transports as *TCP*. + /// *UDP*, for example, is NOT connection-oriented, and the event can no be detected. Disconnected(Endpoint), } impl std::fmt::Debug for NetEvent<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let string = match self { - Self::Connected(endpoint, id) => format!("Connected({}, {})", endpoint, id), + Self::Connected(endpoint, status) => format!("Connected({}, {})", endpoint, status), + Self::Accepted(endpoint, id) => format!("Accepted({}, {})", endpoint, id), Self::Message(endpoint, data) => format!("Message({}, {})", endpoint, data.len()), Self::Disconnected(endpoint) => format!("Disconnected({})", endpoint), }; @@ -48,15 +72,38 @@ pub trait ActionController: Send + Sync { fn listen(&self, addr: SocketAddr) -> io::Result<(ResourceId, SocketAddr)>; fn send(&self, endpoint: Endpoint, data: &[u8]) -> SendStatus; fn remove(&self, id: ResourceId) -> bool; + fn is_ready(&self, id: ResourceId) -> Option; } pub trait EventProcessor: Send + Sync { - fn process(&self, resource_id: ResourceId, event_callback: &mut dyn FnMut(NetEvent<'_>)); + fn process(&self, id: ResourceId, readiness: Readiness, callback: &mut dyn FnMut(NetEvent<'_>)); +} + +struct RemoteProperties { + peer_addr: SocketAddr, + local: Option, + ready: AtomicBool, +} + +impl RemoteProperties { + fn new(peer_addr: SocketAddr, local: Option) -> Self { + Self { peer_addr, local, ready: AtomicBool::new(false) } + } + + pub fn is_ready(&self) -> bool { + self.ready.load(Ordering::Relaxed) + } + + pub fn mark_as_ready(&self) { + self.ready.store(true, Ordering::Relaxed); + } } +struct LocalProperties; + pub struct Driver { - remote_registry: Arc>, - local_registry: Arc>, + remote_registry: Arc>, + local_registry: Arc>, } impl Driver { @@ -69,8 +116,12 @@ impl Driver { let local_poll_registry = poll.create_registry(adapter_id, ResourceType::Local); Driver { - remote_registry: Arc::new(ResourceRegistry::::new(remote_poll_registry)), - local_registry: Arc::new(ResourceRegistry::::new(local_poll_registry)), + remote_registry: Arc::new(ResourceRegistry::::new( + remote_poll_registry, + )), + local_registry: Arc::new(ResourceRegistry::::new( + local_poll_registry, + )), } } } @@ -87,25 +138,29 @@ impl Clone for Driver { impl ActionController for Driver { fn connect(&self, addr: RemoteAddr) -> io::Result<(Endpoint, SocketAddr)> { R::connect(addr).map(|info| { - ( - Endpoint::new( - self.remote_registry.add(info.remote, info.peer_addr), - info.peer_addr, - ), - info.local_addr, - ) + let id = self.remote_registry.register( + info.remote, + RemoteProperties::new(info.peer_addr, None), + true, + ); + (Endpoint::new(id, info.peer_addr), info.local_addr) }) } fn listen(&self, addr: SocketAddr) -> io::Result<(ResourceId, SocketAddr)> { - L::listen(addr) - .map(|info| (self.local_registry.add(info.local, info.local_addr), info.local_addr)) + L::listen(addr).map(|info| { + let id = self.local_registry.register(info.local, LocalProperties, false); + (id, info.local_addr) + }) } fn send(&self, endpoint: Endpoint, data: &[u8]) -> SendStatus { match endpoint.resource_id().resource_type() { ResourceType::Remote => match self.remote_registry.get(endpoint.resource_id()) { - Some(remote) => remote.resource.send(data), + Some(remote) => match remote.properties.is_ready() { + true => remote.resource.send(data), + false => SendStatus::ResourceNotAvailable, + }, None => SendStatus::ResourceNotFound, }, ResourceType::Local => match self.local_registry.get(endpoint.resource_id()) { @@ -117,68 +172,141 @@ impl ActionController for Driver { fn remove(&self, id: ResourceId) -> bool { match id.resource_type() { - ResourceType::Remote => self.remote_registry.remove(id), - ResourceType::Local => self.local_registry.remove(id), + ResourceType::Remote => self.remote_registry.deregister(id), + ResourceType::Local => self.local_registry.deregister(id), + } + } + + fn is_ready(&self, id: ResourceId) -> Option { + match id.resource_type() { + ResourceType::Remote => self.remote_registry.get(id).map(|r| r.properties.is_ready()), + ResourceType::Local => self.local_registry.get(id).map(|_| true), } } } impl> EventProcessor for Driver { - fn process(&self, id: ResourceId, event_callback: &mut dyn FnMut(NetEvent<'_>)) { + fn process( + &self, + id: ResourceId, + readiness: Readiness, + event_callback: &mut dyn FnMut(NetEvent<'_>), + ) { match id.resource_type() { - ResourceType::Remote => self.process_remote(id, event_callback), - ResourceType::Local => self.process_local(id, event_callback), + ResourceType::Remote => { + if let Some(remote) = self.remote_registry.get(id) { + let endpoint = Endpoint::new(id, remote.properties.peer_addr); + log::trace!("Processed remote for {}", endpoint); + + if !remote.properties.is_ready() { + self.resolve_pending_remote(&remote, endpoint, readiness, |e| { + event_callback(e) + }); + } + if remote.properties.is_ready() { + match readiness { + Readiness::Write => { + self.write_to_remote(&remote, endpoint, event_callback); + } + Readiness::Read => { + self.read_from_remote(&remote, endpoint, event_callback); + } + } + } + } + } + ResourceType::Local => { + if let Some(local) = self.local_registry.get(id) { + log::trace!("Processed local for {}", id); + match readiness { + Readiness::Write => (), + Readiness::Read => self.read_from_local(&local, id, event_callback), + } + } + } } } } impl> Driver { - fn process_remote(&self, id: ResourceId, mut event_callback: impl FnMut(NetEvent<'_>)) { - if let Some(remote) = self.remote_registry.get(id) { - let endpoint = Endpoint::new(id, remote.addr); - log::trace!("Processed remote for {}", endpoint); - let status = remote.resource.receive(|data| { - event_callback(NetEvent::Message(endpoint, data)); - }); - log::trace!("Processed remote receive status {}", status); - - if let ReadStatus::Disconnected = status { - // Checked becasue, the user in the callback could have removed the same resource. - if self.remote_registry.remove(id) { - event_callback(NetEvent::Disconnected(endpoint)); + fn resolve_pending_remote( + &self, + remote: &Arc>, + endpoint: Endpoint, + readiness: Readiness, + mut event_callback: impl FnMut(NetEvent<'_>), + ) { + let status = remote.resource.pending(readiness); + log::trace!("Resolve pending for {}: {:?}", endpoint, status); + match status { + PendingStatus::Ready => { + remote.properties.mark_as_ready(); + match remote.properties.local { + Some(listener_id) => event_callback(NetEvent::Accepted(endpoint, listener_id)), + None => event_callback(NetEvent::Connected(endpoint, true)), + } + remote.resource.ready_to_write(); + } + PendingStatus::Incomplete => (), + PendingStatus::Disconnected => { + self.remote_registry.deregister(endpoint.resource_id()); + if remote.properties.local.is_none() { + event_callback(NetEvent::Connected(endpoint, false)); } } } } - fn process_local(&self, id: ResourceId, mut event_callback: impl FnMut(NetEvent<'_>)) { - if let Some(local) = self.local_registry.get(id) { - log::trace!("Processed local for {}", id); - local.resource.accept(|accepted| { - log::trace!("Processed local accepted type {}", accepted); - match accepted { - AcceptedType::Remote(addr, remote) => { - let remote_id = self.remote_registry.add(remote, addr); - let endpoint = Endpoint::new(remote_id, addr); - event_callback(NetEvent::Connected(endpoint, id)); - } - AcceptedType::Data(addr, data) => { - let endpoint = Endpoint::new(id, addr); - event_callback(NetEvent::Message(endpoint, data)); - } - } - }); + fn write_to_remote( + &self, + remote: &Arc>, + endpoint: Endpoint, + mut event_callback: impl FnMut(NetEvent<'_>), + ) { + if !remote.resource.ready_to_write() { + event_callback(NetEvent::Disconnected(endpoint)); } } -} -impl std::fmt::Display for ReadStatus { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let string = match self { - ReadStatus::Disconnected => "Disconnected", - ReadStatus::WaitNextEvent => "WaitNextEvent", - }; - write!(f, "ReadStatus::{}", string) + fn read_from_remote( + &self, + remote: &Arc>, + endpoint: Endpoint, + mut event_callback: impl FnMut(NetEvent<'_>), + ) { + let status = + remote.resource.receive(|data| event_callback(NetEvent::Message(endpoint, data))); + log::trace!("Receive status: {:?}", status); + if let ReadStatus::Disconnected = status { + // Checked because, the user in the callback could have removed the same resource. + if self.remote_registry.deregister(endpoint.resource_id()) { + event_callback(NetEvent::Disconnected(endpoint)); + } + } + } + + fn read_from_local( + &self, + local: &Arc>, + id: ResourceId, + mut event_callback: impl FnMut(NetEvent<'_>), + ) { + local.resource.accept(|accepted| { + log::trace!("Accepted type: {}", accepted); + match accepted { + AcceptedType::Remote(addr, remote) => { + self.remote_registry.register( + remote, + RemoteProperties::new(addr, Some(id)), + true, + ); + } + AcceptedType::Data(addr, data) => { + let endpoint = Endpoint::new(id, addr); + event_callback(NetEvent::Message(endpoint, data)); + } + } + }); } } diff --git a/src/network/endpoint.rs b/src/network/endpoint.rs index c87ac8f2..62fe37c1 100644 --- a/src/network/endpoint.rs +++ b/src/network/endpoint.rs @@ -55,7 +55,7 @@ impl Endpoint { // Only local resources allowed assert_eq!(id.resource_type(), super::resource_id::ResourceType::Local); - // Only packet based transport protocols allowed + // Only non connection-oriented transport protocols allowed assert!(!super::transport::Transport::from(id.adapter_id()).is_connection_oriented()); Endpoint::new(id, addr) @@ -90,22 +90,6 @@ mod tests { use crate::network::resource_id::{ResourceType, ResourceIdGenerator}; use crate::network::transport::{Transport}; - #[test] - #[should_panic] - fn from_remote_non_connection_oriented() { - let addr = "0.0.0.0:0".parse().unwrap(); - let generator = ResourceIdGenerator::new(Transport::Udp.id(), ResourceType::Remote); - Endpoint::from_listener(generator.generate(), addr); - } - - #[test] - #[should_panic] - fn from_local_connection_oriented() { - let addr = "0.0.0.0:0".parse().unwrap(); - let generator = ResourceIdGenerator::new(Transport::Tcp.id(), ResourceType::Local); - Endpoint::from_listener(generator.generate(), addr); - } - #[test] fn from_local_non_connection_oriented() { let addr = "0.0.0.0:0".parse().unwrap(); diff --git a/src/network/loader.rs b/src/network/loader.rs index c7d458c7..b775d2ba 100644 --- a/src/network/loader.rs +++ b/src/network/loader.rs @@ -1,6 +1,6 @@ use super::endpoint::{Endpoint}; use super::resource_id::{ResourceId}; -use super::poll::{Poll}; +use super::poll::{Poll, Readiness}; use super::remote_addr::{RemoteAddr}; use super::driver::{NetEvent, Driver, ActionController, EventProcessor}; use super::adapter::{Adapter, SendStatus}; @@ -77,10 +77,14 @@ impl ActionController for UnimplementedDriver { fn remove(&self, _: ResourceId) -> bool { panic!("{}", UNIMPLEMENTED_DRIVER_ERR); } + + fn is_ready(&self, _: ResourceId) -> Option { + panic!("{}", UNIMPLEMENTED_DRIVER_ERR); + } } impl EventProcessor for UnimplementedDriver { - fn process(&self, _: ResourceId, _: &mut dyn FnMut(NetEvent<'_>)) { + fn process(&self, _: ResourceId, _: Readiness, _: &mut dyn FnMut(NetEvent<'_>)) { panic!("{}", UNIMPLEMENTED_DRIVER_ERR); } } diff --git a/src/network/poll.rs b/src/network/poll.rs index 110c7504..7c2b37c1 100644 --- a/src/network/poll.rs +++ b/src/network/poll.rs @@ -7,8 +7,19 @@ use std::time::{Duration}; use std::sync::{Arc}; use std::io::{ErrorKind}; +#[derive(Clone, Copy, Debug, PartialEq)] +/// Used for the adapter implementation. +/// Specify the kind of event that is available for a resource. +pub enum Readiness { + /// The resource is available to write + Write, + + /// The resource is available to read (has any content to read). + Read, +} + pub enum PollEvent { - Network(ResourceId), + Network(ResourceId, Readiness), Waker, } @@ -51,21 +62,23 @@ impl Poll { where C: FnMut(PollEvent) { loop { match self.mio_poll.poll(&mut self.events, timeout) { - Ok(_) => { + Ok(()) => { for mio_event in &self.events { - let poll_event = match mio_event.token() { - Self::WAKER_TOKEN => { - log::trace!("POLL EVENT: waker"); - PollEvent::Waker + if Self::WAKER_TOKEN == mio_event.token() { + log::trace!("POLL WAKER EVENT"); + event_callback(PollEvent::Waker); + } + else { + let id = ResourceId::from(mio_event.token()); + if mio_event.is_readable() { + log::trace!("POLL EVENT (R): {}", id); + event_callback(PollEvent::Network(id, Readiness::Read)); } - token => { - let resource_id = ResourceId::from(token); - log::trace!("POLL EVENT: {}", resource_id); - PollEvent::Network(resource_id) + if mio_event.is_writable() { + log::trace!("POLL EVENT (W): {}", id); + event_callback(PollEvent::Network(id, Readiness::Write)); } - }; - - event_callback(poll_event); + } } break } @@ -98,10 +111,13 @@ impl PollRegistry { } } - pub fn add(&self, source: &mut dyn Source) -> ResourceId { + pub fn add(&self, source: &mut dyn Source, write_readiness: bool) -> ResourceId { let id = self.id_generator.generate(); - self.registry.register(source, id.into(), Interest::READABLE).unwrap(); - log::trace!("Register to poll: {}", id); + let interest = match write_readiness { + true => Interest::READABLE | Interest::WRITABLE, + false => Interest::READABLE, + }; + self.registry.register(source, id.into(), interest).unwrap(); id } @@ -136,3 +152,9 @@ impl PollWaker { log::trace!("Wake poll..."); } } + +impl Clone for PollWaker { + fn clone(&self) -> Self { + Self { waker: self.waker.clone() } + } +} diff --git a/src/network/registry.rs b/src/network/registry.rs index dbd431ec..1e3e03ba 100644 --- a/src/network/registry.rs +++ b/src/network/registry.rs @@ -5,37 +5,33 @@ use super::adapter::{Resource}; use crate::util::thread::{OTHER_THREAD_ERR}; use std::collections::{HashMap}; -use std::net::{SocketAddr}; use std::sync::{Arc, RwLock}; -pub struct Register { +pub struct Register { pub resource: S, - pub addr: SocketAddr, + pub properties: P, + poll_registry: Arc, } -impl Register { - fn new(resource: S, addr: SocketAddr, poll_registry: Arc) -> Self { - Self { resource, addr, poll_registry } +impl Register { + fn new(resource: S, properties: P, poll_registry: Arc) -> Self { + Self { resource, properties, poll_registry } } } -impl Drop for Register { +impl Drop for Register { fn drop(&mut self) { self.poll_registry.remove(self.resource.source()); } } -pub struct ResourceRegistry { - // We store the most significant addr of the resource because if the resource disconnects, - // it can not be retrieved. - // If the resource is a remote resource, the addr will be the peer addr. - // If the resource is a local resource, the addr will be the local addr. - resources: RwLock>>>, +pub struct ResourceRegistry { + resources: RwLock>>>, poll_registry: Arc, } -impl ResourceRegistry { +impl ResourceRegistry { pub fn new(poll_registry: PollRegistry) -> Self { ResourceRegistry { resources: RwLock::new(HashMap::new()), @@ -44,10 +40,13 @@ impl ResourceRegistry { } /// Add a resource into the registry. - pub fn add(&self, mut resource: S, addr: SocketAddr) -> ResourceId { - let id = self.poll_registry.add(resource.source()); - let register = Register::new(resource, addr, self.poll_registry.clone()); - self.resources.write().expect(OTHER_THREAD_ERR).insert(id, Arc::new(register)); + pub fn register(&self, mut resource: S, properties: P, write_readiness: bool) -> ResourceId { + // The registry must be locked for the entire implementation to avoid the poll + // to generate events over not yet registered resources. + let mut registry = self.resources.write().expect(OTHER_THREAD_ERR); + let id = self.poll_registry.add(resource.source(), write_readiness); + let register = Register::new(resource, properties, self.poll_registry.clone()); + registry.insert(id, Arc::new(register)); id } @@ -55,12 +54,12 @@ impl ResourceRegistry { /// This function ensure that the register is removed from the registry, /// but not the destruction of the resource itself. /// Because the resource is shared, the destruction will be delayed until the last reference. - pub fn remove(&self, id: ResourceId) -> bool { + pub fn deregister(&self, id: ResourceId) -> bool { self.resources.write().expect(OTHER_THREAD_ERR).remove(&id).is_some() } /// Returned a shared reference of the register. - pub fn get(&self, id: ResourceId) -> Option>> { + pub fn get(&self, id: ResourceId) -> Option>> { self.resources.read().expect(OTHER_THREAD_ERR).get(&id).cloned() } } diff --git a/src/network/resource_id.rs b/src/network/resource_id.rs index 6e25ba73..c2859b63 100644 --- a/src/network/resource_id.rs +++ b/src/network/resource_id.rs @@ -71,6 +71,16 @@ impl ResourceId { } } + /// Tells if the id preresents a local resource. + pub fn is_local(&self) -> bool { + self.resource_type() == ResourceType::Local + } + + /// Tells if the id preresents a remote resource. + pub fn is_remote(&self) -> bool { + self.resource_type() == ResourceType::Remote + } + /// Returns the associated adapter id. /// Note that this returned value is the same as the value of [`crate::network::Transport::id()`] /// if that transport uses the same adapter. diff --git a/src/node.rs b/src/node.rs index 5b89d073..dc0c277c 100644 --- a/src/node.rs +++ b/src/node.rs @@ -106,11 +106,13 @@ impl From> for StoredNodeEvent { } } -/// Analogous to [`NetEvent`] but without reference the data. -/// This kind of event is dispatched by `NodeListener::to_event_queue()`. +/// Analogous to [`NetEvent`] but with static lifetime (without reference the data). +/// This kind of event is dispatched by `NodeListener::to_event_queue()` +/// and can be easily stored in any container. #[derive(Debug, Clone)] pub enum StoredNetEvent { - Connected(Endpoint, ResourceId), + Connected(Endpoint, bool), + Accepted(Endpoint, ResourceId), Message(Endpoint, Vec), Disconnected(Endpoint), } @@ -118,7 +120,8 @@ pub enum StoredNetEvent { impl From> for StoredNetEvent { fn from(net_event: NetEvent<'_>) -> Self { match net_event { - NetEvent::Connected(endpoint, id) => Self::Connected(endpoint, id), + NetEvent::Connected(endpoint, status) => Self::Connected(endpoint, status), + NetEvent::Accepted(endpoint, id) => Self::Accepted(endpoint, id), NetEvent::Message(endpoint, data) => Self::Message(endpoint, Vec::from(data)), NetEvent::Disconnected(endpoint) => Self::Disconnected(endpoint), } @@ -127,10 +130,11 @@ impl From> for StoredNetEvent { impl StoredNetEvent { /// Use this `StoredNetEvent` as a `NetEvent` referencing its data. - fn borrow(&self) -> NetEvent<'_> { + pub fn borrow(&self) -> NetEvent<'_> { match self { - Self::Connected(endpoint, id) => NetEvent::Connected(*endpoint, *id), - Self::Message(endpoint, data) => NetEvent::Message(*endpoint, &data), + Self::Connected(endpoint, status) => NetEvent::Connected(*endpoint, *status), + Self::Accepted(endpoint, id) => NetEvent::Accepted(*endpoint, *id), + Self::Message(endpoint, data) => NetEvent::Message(*endpoint, data), Self::Disconnected(endpoint) => NetEvent::Disconnected(*endpoint), } } @@ -282,7 +286,7 @@ impl NodeListener { /// /// let (handler, listener) = node::split(); /// handler.signals().send_with_timer((), std::time::Duration::from_secs(1)); - /// handler.network().listen(Transport::FramedTcp, "0.0.0.0:1234"); + /// let (id, addr) = handler.network().listen(Transport::FramedTcp, "127.0.0.1:0").unwrap(); /// /// listener.for_each(move |event| match event { /// NodeEvent::Network(net_event) => { /* Your logic here */ }, @@ -326,6 +330,7 @@ impl NodeListener { scope .builder() + .name(String::from("node-network-thread")) .spawn(move |_| { while handler.is_running() { if let Some(signal) = signal_receiver.receive_timeout(*SAMPLING_TIMEOUT) @@ -369,7 +374,7 @@ impl NodeListener { /// /// let (handler, listener) = node::split(); /// handler.signals().send_with_timer((), std::time::Duration::from_secs(1)); - /// handler.network().listen(Transport::FramedTcp, "0.0.0.0:1234"); + /// let (id, addr) = handler.network().listen(Transport::FramedTcp, "127.0.0.1:0").unwrap(); /// /// let task = listener.for_each(move |event| match event { /// NodeEvent::Network(net_event) => { /* Your logic here */ }, @@ -462,7 +467,7 @@ impl NodeListener { /// /// let (handler, listener) = node::split(); /// handler.signals().send_with_timer((), std::time::Duration::from_secs(1)); - /// handler.network().listen(Transport::FramedTcp, "0.0.0.0:1234"); + /// let (id, addr) = handler.network().listen(Transport::FramedTcp, "127.0.0.1:0").unwrap(); /// /// let (task, mut receiver) = listener.enqueue(); /// @@ -528,8 +533,6 @@ mod tests { let inner_handler = handler.clone(); listener.for_each(move |_| inner_handler.stop()); - // Since here `NodeTask` is already dropped just after listener call, - // the node is considered not running. assert!(!handler.is_running()); } diff --git a/tests/integration.rs b/tests/integration.rs index ecb81f98..f4ec9b12 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -82,6 +82,14 @@ fn start_echo_server( listener.for_each(move |event| match event { NodeEvent::Signal(_) => panic!("{}", TIMEOUT_EVENT_RECV_ERR), NodeEvent::Network(net_event) => match net_event { + NetEvent::Connected(..) => unreachable!(), + NetEvent::Accepted(endpoint, id) => { + assert_eq!(listener_id, id); + match transport.is_connection_oriented() { + true => assert!(clients.insert(endpoint)), + false => unreachable!(), + } + } NetEvent::Message(endpoint, data) => { assert_eq!(MIN_MESSAGE, data); @@ -100,13 +108,6 @@ fn start_echo_server( } } } - NetEvent::Connected(endpoint, id) => { - assert_eq!(listener_id, id); - match transport.is_connection_oriented() { - true => assert!(clients.insert(endpoint)), - false => unreachable!(), - } - } NetEvent::Disconnected(endpoint) => { match transport.is_connection_oriented() { true => { @@ -139,26 +140,32 @@ fn start_echo_client_manager( node.signals().send_with_timer((), *TIMEOUT); let mut clients = HashSet::new(); + let mut received = 0; for _ in 0..clients_number { - let (server, _) = node.network().connect(transport, server_addr).unwrap(); - let status = node.network().send(server, MIN_MESSAGE); - assert_eq!(SendStatus::Sent, status); - assert!(clients.insert(server)); + node.network().connect(transport, server_addr).unwrap(); } listener.for_each(move |event| match event { NodeEvent::Signal(_) => panic!("{}", TIMEOUT_EVENT_RECV_ERR), NodeEvent::Network(net_event) => match net_event { + NetEvent::Connected(server, status) => { + assert!(status); + let status = node.network().send(server, MIN_MESSAGE); + assert_eq!(SendStatus::Sent, status); + assert!(clients.insert(server)); + } NetEvent::Message(endpoint, data) => { assert!(clients.remove(&endpoint)); assert_eq!(MIN_MESSAGE, data); node.network().remove(endpoint.resource_id()); - if clients.len() == 0 { + + received += 1; + if received == clients_number { node.stop(); //Exit from thread. } } - NetEvent::Connected(..) => unreachable!(), + NetEvent::Accepted(..) => unreachable!(), NetEvent::Disconnected(_) => unreachable!(), }, }); @@ -181,6 +188,8 @@ fn start_burst_receiver( listener.for_each(move |event| match event { NodeEvent::Signal(_) => panic!("{}", TIMEOUT_EVENT_RECV_ERR), NodeEvent::Network(net_event) => match net_event { + NetEvent::Connected(..) => unreachable!(), + NetEvent::Accepted(..) => (), NetEvent::Message(_, data) => { let expected_message = format!("{}: {}", SMALL_MESSAGE, count); assert_eq!(expected_message, String::from_utf8_lossy(&data)); @@ -189,7 +198,6 @@ fn start_burst_receiver( node.stop(); } } - NetEvent::Connected(..) => (), NetEvent::Disconnected(_) => (), }, }); @@ -203,19 +211,40 @@ fn start_burst_sender( expected_count: usize, ) -> NamespacedThread<()> { NamespacedThread::spawn("test-sender", move || { - let (node, _) = node::split::<()>(); + let (node, listener) = node::split::<()>(); let (receiver, _) = node.network().connect(transport, receiver_addr).unwrap(); - for count in 0..expected_count { - let message = format!("{}: {}", SMALL_MESSAGE, count); - let status = node.network().send(receiver, message.as_bytes()); - assert_eq!(SendStatus::Sent, status); - if !transport.is_connection_oriented() { - // We need a rate to not lose packet. - std::thread::sleep(Duration::from_micros(20)); + let mut count = 0; + listener.for_each(move |event| match event { + NodeEvent::Signal(_) => { + if count < expected_count { + let message = format!("{}: {}", SMALL_MESSAGE, count); + let status = node.network().send(receiver, message.as_bytes()); + assert_eq!(SendStatus::Sent, status); + + count += 1; + if !transport.is_connection_oriented() { + // We need a rate to not lose packet. + node.signals().send_with_timer((), Duration::from_micros(50)); + } + else { + node.signals().send(()); + } + } + else { + node.stop(); + } } - } + NodeEvent::Network(net_event) => match net_event { + NetEvent::Connected(_, status) => { + assert!(status); + node.signals().send(()); + } + NetEvent::Disconnected(_) => (), + _ => unreachable!(), + }, + }); }) } @@ -252,7 +281,7 @@ fn burst(transport: Transport, messages_count: usize) { #[cfg_attr(feature = "udp", test_case(Transport::Udp, udp::MAX_COMPATIBLE_PAYLOAD_LEN))] #[cfg_attr(feature = "websocket", test_case(Transport::Ws, BIG_MESSAGE_SIZE))] fn message_size(transport: Transport, message_size: usize) { - //util::init_logger(LogThread::Enabled); // Enable it for better debugging + //util::init_logger(LogThread::Disabled); // Enable it for better debugging assert!(message_size <= transport.max_message_size()); @@ -263,21 +292,30 @@ fn message_size(transport: Transport, message_size: usize) { node.signals().send_with_timer((), *TIMEOUT); let (_, receiver_addr) = node.network().listen(transport, LOCAL_ADDR).unwrap(); - let (receiver, _) = node.network().connect(transport, receiver_addr).unwrap(); - if !transport.is_connection_oriented() { - let status = node.network().send(receiver, &sent_message); - assert_eq!(status, SendStatus::Sent); - } - - // Protocols as TCP blocks the sender if the receiver is not reading data and its buffer is fill. let mut _async_sender: Option> = None; - let mut received_message = Vec::new(); + listener.for_each(move |event| match event { NodeEvent::Signal(_) => panic!("{}", TIMEOUT_EVENT_RECV_ERR), NodeEvent::Network(net_event) => match net_event { + NetEvent::Connected(endpoint, status) => { + assert!(status); + assert_eq!(receiver, endpoint); + + let node = node.clone(); + let sent_message = sent_message.clone(); + + // Protocols as TCP blocks the sender if the receiver is not reading data + // and its buffer is fill. + _async_sender = Some(NamespacedThread::spawn("test-sender", move || { + let status = node.network().send(receiver, &sent_message); + assert_eq!(status, SendStatus::Sent); + assert!(node.network().remove(receiver.resource_id())); + })); + } + NetEvent::Accepted(..) => (), NetEvent::Message(_, data) => { if transport.is_packet_based() { received_message = data.to_vec(); @@ -288,20 +326,6 @@ fn message_size(transport: Transport, message_size: usize) { received_message.extend_from_slice(&data); } } - NetEvent::Connected(..) => { - if transport.is_connection_oriented() { - let node = node.clone(); - let sent_message = sent_message.clone(); - _async_sender = Some(NamespacedThread::spawn("test-sender", move || { - let status = node.network().send(receiver, &sent_message); - assert_eq!(status, SendStatus::Sent); - assert!(node.network().remove(receiver.resource_id())); - })); - } - else { - unreachable!(); - } - } NetEvent::Disconnected(_) => { assert_eq!(sent_message.len(), received_message.len()); assert_eq!(sent_message, received_message);