Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

non-blocking connection #86

Merged
merged 25 commits into from
May 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
93856b5
Architecture changes
lemunozm May 1, 2021
18fd64b
Tcp async connection working.
lemunozm May 3, 2021
09f3198
All integration tests working except WS
lemunozm May 3, 2021
d3013d2
Fixed registry concurrence problem
lemunozm May 3, 2021
72ecbe0
Ws with client handshake schema. New adapter schema
lemunozm May 4, 2021
18c8e13
pending() adapter concept
lemunozm May 4, 2021
7e98a81
added ready_to_write
lemunozm May 4, 2021
771b8e8
Websocket working with async connections
lemunozm May 4, 2021
0cd4e69
Removed write readiness for local resources
lemunozm May 4, 2021
646f5a1
Examples and tests adapted
lemunozm May 5, 2021
07e521f
Added is_ready. Added connect_sync. Prepare version 0.14
lemunozm May 5, 2021
ed16cca
Added ResourceNotAvailable to SendStatus
lemunozm May 5, 2021
8a29815
Added pending as mandatory
lemunozm May 5, 2021
f62f28f
Added process_poll_events_until_timeout. Splitted conenctio tests for…
lemunozm May 9, 2021
3439749
Added Resource::source with Option. Forced because tungstenite takes …
lemunozm May 9, 2021
d0452cc
Fixed windows poll event issue
lemunozm May 9, 2021
8d04a5d
Added connection examples. Updated readme
lemunozm May 9, 2021
0eda95a
Websocket making use of ready_to_write
lemunozm May 9, 2021
288fa8a
Revert "Added Resource::source with Option. Forced because tungstenit…
lemunozm May 10, 2021
d996968
Temporal fix for poll leak in tungstenite
lemunozm May 10, 2021
cf528f2
Fixed ws accepted issue
lemunozm May 10, 2021
1ab895f
Minor changelog change
lemunozm May 10, 2021
a577f38
Added StoredNetEvent::borrow()
lemunozm May 11, 2021
c6bd25e
Added is_local() and is_remote()
lemunozm May 12, 2021
f8be160
Added minor comment
lemunozm May 16, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Changelog

## Release 0.14.0
- Asynchronous connections: `NetworkController::connect()` behaviour modified.
Now it performs a non-blocking connection. Previous behaviour with `connect_sync` version.
- Reduced slightly the websocket latency.
- Adapter API modified to handle easily handshakes.
- Fixed websocket issue that could offer an accepted connection that was yet not valid.
- Added `NetworkController::is_ready()`
- Added `SendStatus::ResourceNotAvailable`
- Added `borrow()` method for `StoredNetEvent` to transform in into `NetEvent`.
- Added `is_local()` and `is_remote()` helpers to `ResourceId`.

## Release 0.13.3
- Fixed a bad internal assert.

Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "message-io"
version = "0.13.3"
version = "0.14.0"
authors = ["lemunozm <lemunozm@gmail.com>"]
edition = "2018"
readme = "README.md"
Expand Down
30 changes: 17 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,24 @@ You could change the transport of your application in literally one line.
Add to your `Cargo.toml` (all transports included by default):
```toml
[dependencies]
message-io = "0.13"
message-io = "0.14"
```
If you **only** want to use a subset of the available transport battery,
you can select them by their associated features `tcp`, `udp`, and `websocket`.
For example, in order to include only *TCP* and *UDP*, add to your `Cargo.toml`:
```toml
[dependencies]
message-io = { version = "0.13", default-features = false, features = ["tcp", "udp"] }
message-io = { version = "0.14", default-features = false, features = ["tcp", "udp"] }
```

**Warning**: Version **0.12** comes with important API changes ([changelog](CHANGELOG.md))
in order to reach [zero-copy write/read](https://github.com/lemunozm/message-io/issues/61) goal.
If you find problems porting your application to this version,
check the examples folder, API docs, and don't hesitate to open an issue.

_**Read before update to 0.14**: Version **0.14** modifies the [`connect()`](https://docs.rs/message-io/latest/message_io/network/struct.NetworkController.html#method.connect) behaviour to perform a
[**non**-blocking connections](https://github.com/lemunozm/message-io/issues/61) instead.
It is recommended to use this non-blocking mode in order to get the
best scalability and performance in your application. If you need to perform
a similar blocking connection as before (version 0.13), you can call to [`connect_sync()`](https://docs.rs/message-io/latest/message_io/network/struct.NetworkController.html#method.connect_sync).
Note also that the previous `NetEvent::Connect` has been renamed to `NetEvent::Accepted`.
The current `NetEvent::Connect` is a new event to deal with the new non-blocking connections.
See [`NetEvent`](https://docs.rs/message-io/latest/message_io/network/enum.NetEvent.html) docs for more info._

### All in one: TCP, UDP and WebSocket echo server
The following example is the simplest server that reads messages from the clients and responds
Expand All @@ -118,7 +121,8 @@ fn main() {

// Read incoming network events.
listener.for_each(move |event| match event.network() {
NetEvent::Connected(_endpoint, _) => println!("Client connected"), // Tcp or Ws
NetEvent::Connected(_, _) => unreachable!(), // Used for explicit connections.
NetEvent::Accepted(_endpoint, _listener) => println!("Client connected"), // Tcp or Ws
NetEvent::Message(endpoint, data) => {
println!("Received: {}", String::from_utf8_lossy(data));
handler.network().send(endpoint, data);
Expand Down Expand Up @@ -149,8 +153,6 @@ fn main() {
// You can change the transport to Udp or Ws (WebSocket).
let (server, _) = handler.network().connect(Transport::FramedTcp, "127.0.0.1:3042").unwrap();

handler.signals().send(Signal::Greet); // Start sending

listener.for_each(move |event| match event {
NodeEvent::Signal(signal) => match signal {
Signal::Greet => { // computed every second
Expand All @@ -159,10 +161,12 @@ fn main() {
}
}
NodeEvent::Network(net_event) => match net_event {
NetEvent::Connected(_endpoint, _ok) => handler.signals().send(Signal::Greet),
NetEvent::Accepted(_, _) => unreachable!(), // Only generated by listening
NetEvent::Message(_endpoint, data) => {
println!("Received: {}", String::from_utf8_lossy(data));
},
_ => unreachable!(), // Connected and Disconnected are only generated by listening
NetEvent::Disconnected(_endpoint) => (),
}
});
}
Expand Down Expand Up @@ -199,8 +203,8 @@ If a transport protocol can be built in top of [`mio`](https://github.com/tokio-

1. Add your *adapter* file in `src/adapters/<my-transport-protocol>.rs` that implements the
traits that you find [here](https://docs.rs/message-io/latest/message_io/network/adapter/index.html).
It contains only 7 mandatory functions to implement (see the [template](src/adapters/template.rs)),
and it takes little more than 150 lines to implement an adapter.
It contains only 8 mandatory functions to implement (see the [template](src/adapters/template.rs)),
and it takes arround 150 lines to implement an adapter.

1. Add a new field in the `Transport` enum found in
[src/network/transport.rs](src/network/transport.rs) to register your new adapter.
Expand Down
6 changes: 3 additions & 3 deletions docs/performance_benchmarks.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ The following results are measured for the transmision of 1GB of data by localho
| Transport | native | message-io | efficiency |
|:----------:|:--------:|:----------:|:----------:|
| UDP | 7.1 GB/s | 5.9 GB/s | ~83% |
| TCP | 6.4 GB/s | 5.4 GB/s | ~84% |
| TCP | 6.4 GB/s | 5.2 GB/s | ~81% |
| Framed TCP | 5.5 GB/s | 5.0 GB/s | ~91% |
| Web Socket | 590 MB/s | 560 MB/s | ~95% |

Expand Down Expand Up @@ -71,9 +71,9 @@ The following results are measured by transferring 1-byte by localhost:
| UDP | 1.2 us | 2.1 us | + ~0.9 us |
| TCP | 2.6 us | 3.5 us | + ~0.9 us |
| Framed TCP | 5.2 us | 6.6 us | + ~1.4 us |
| Web Socket | 9.1 us | 11.2 us | + ~2.1 us |
| Web Socket | 9.1 us | 10.1 us | + ~1.0 us |

Depending on the transport used, `message-io` adds around `1-2us` of overhead per chunk of data transsmision.
Depending on the transport used, `message-io` adds around `1us` of overhead per chunk of data transsmision.
Because it is zero-copy at reading/writing messages,
this overhead is constant and independently of the size of that chunk of data.
The library only copies the pointer to the data.
Expand Down
50 changes: 22 additions & 28 deletions examples/distributed/discovery_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use message_io::node::{self, NodeHandler, NodeListener};

use std::net::{SocketAddr};
use std::collections::{HashMap};
use std::io::{self};

struct ParticipantInfo {
addr: SocketAddr,
Expand All @@ -13,34 +14,31 @@ struct ParticipantInfo {

pub struct DiscoveryServer {
handler: NodeHandler<()>,
listener: Option<NodeListener<()>>,
node_listener: Option<NodeListener<()>>,
participants: HashMap<String, ParticipantInfo>,
}

impl DiscoveryServer {
pub fn new() -> Option<DiscoveryServer> {
let (handler, listener) = node::split::<()>();
pub fn new() -> io::Result<DiscoveryServer> {
let (handler, node_listener) = node::split::<()>();

let listen_addr = "127.0.0.1:5000";
match handler.network().listen(Transport::FramedTcp, listen_addr) {
Ok(_) => {
println!("Discovery server running at {}", listen_addr);
Some(DiscoveryServer {
handler,
listener: Some(listener),
participants: HashMap::new(),
})
}
Err(_) => {
println!("Can not listen on {}", listen_addr);
None
}
}
handler.network().listen(Transport::FramedTcp, listen_addr)?;

println!("Discovery server running at {}", listen_addr);

Ok(DiscoveryServer {
handler,
node_listener: Some(node_listener),
participants: HashMap::new(),
})
}

pub fn run(mut self) {
let listener = self.listener.take().unwrap();
listener.for_each(move |event| match event.network() {
let node_listener = self.node_listener.take().unwrap();
node_listener.for_each(move |event| match event.network() {
NetEvent::Connected(_, _) => unreachable!(), // There is no connect() calls.
NetEvent::Accepted(_, _) => (), // All endpoint accepted
NetEvent::Message(endpoint, input_data) => {
let message: Message = bincode::deserialize(&input_data).unwrap();
match message {
Expand All @@ -53,19 +51,15 @@ impl DiscoveryServer {
_ => unreachable!(),
}
}
NetEvent::Connected(_, _) => (),
NetEvent::Disconnected(endpoint) => {
// Participant disconection without explict unregistration.
// We must remove from the registry too.
let participant_name = self.participants.iter().find_map(|(name, info)| {
match info.endpoint == endpoint {
true => Some(name.clone()),
false => None,
}
});
let participant =
self.participants.iter().find(|(_, info)| info.endpoint == endpoint);

if let Some(name) = participant_name {
self.unregister(&name)
if let Some(participant) = participant {
let name = participant.0.to_string();
self.unregister(&name);
}
}
});
Expand Down
8 changes: 4 additions & 4 deletions examples/distributed/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ pub fn main() {

match args.get(1).unwrap_or(&String::new()).as_ref() {
"discovery-server" => match discovery_server::DiscoveryServer::new() {
Some(discovery_server) => discovery_server.run(),
None => println!("Can not run the discovery server"),
Ok(discovery_server) => discovery_server.run(),
Err(err) => println!("Can not run the discovery server: {}", err),
},
"participant" => match args.get(2) {
Some(name) => match participant::Participant::new(name) {
Some(participant) => participant.run(),
None => println!("Can not run the participant"),
Ok(participant) => participant.run(),
Err(err) => println!("Can not run the participant: {}", err),
},
None => println!("The participant needs a 'name'"),
},
Expand Down
104 changes: 52 additions & 52 deletions examples/distributed/participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,67 +5,78 @@ use message_io::node::{self, NodeHandler, NodeListener};

use std::net::{SocketAddr};
use std::collections::{HashMap};
use std::io::{self};

pub struct Participant {
handler: NodeHandler<()>,
listener: Option<NodeListener<()>>,
node_listener: Option<NodeListener<()>>,
name: String,
discovery_endpoint: Endpoint,
public_addr: SocketAddr,
known_participants: HashMap<String, Endpoint>, // Used only for free resources later
grettings: HashMap<Endpoint, (String, String)>,
}

impl Participant {
pub fn new(name: &str) -> Option<Participant> {
let (handler, listener) = node::split();
pub fn new(name: &str) -> io::Result<Participant> {
let (handler, node_listener) = node::split();

// A listener for any other participant that want to establish connection.
// 'addr' contains the port that the OS gives for us when we put a 0.
// A node_listener for any other participant that want to establish connection.
// Returned 'listen_addr' contains the port that the OS gives for us when we put a 0.
let listen_addr = "127.0.0.1:0";
let listen_addr = match handler.network().listen(Transport::Udp, listen_addr) {
Ok((_, addr)) => addr,
Err(_) => {
println!("Can not listen on {}", listen_addr);
return None
}
};
let (_, listen_addr) = handler.network().listen(Transport::FramedTcp, listen_addr)?;

let discovery_addr = "127.0.0.1:5000"; // Connection to the discovery server.
match handler.network().connect(Transport::FramedTcp, discovery_addr) {
Ok((endpoint, _)) => Some(Participant {
handler,
listener: Some(listener),
name: name.to_string(),
discovery_endpoint: endpoint,
public_addr: listen_addr,
known_participants: HashMap::new(),
}),
Err(_) => {
println!("Can not connect to the discovery server at {}", discovery_addr);
return None
}
}
let (endpoint, _) = handler.network().connect(Transport::FramedTcp, discovery_addr)?;

Ok(Participant {
handler,
node_listener: Some(node_listener),
name: name.to_string(),
discovery_endpoint: endpoint,
public_addr: listen_addr,
known_participants: HashMap::new(),
grettings: HashMap::new(),
})
}

pub fn run(mut self) {
// Register this participant into the discovery server
let message = Message::RegisterParticipant(self.name.clone(), self.public_addr);
let output_data = bincode::serialize(&message).unwrap();
self.handler.network().send(self.discovery_endpoint, &output_data);

let listener = self.listener.take().unwrap();
listener.for_each(move |event| match event.network() {
let node_listener = self.node_listener.take().unwrap();
node_listener.for_each(move |event| match event.network() {
NetEvent::Connected(endpoint, established) => {
if endpoint == self.discovery_endpoint {
if established {
let message =
Message::RegisterParticipant(self.name.clone(), self.public_addr);
let output_data = bincode::serialize(&message).unwrap();
self.handler.network().send(self.discovery_endpoint, &output_data);
}
else {
println!("Can not connect to the discovery server");
}
}
else {
// Participant endpoint
let (name, message) = self.grettings.remove(&endpoint).unwrap();
if established {
let gretings = format!("Hi '{}', {}", name, message);
let message = Message::Gretings(self.name.clone(), gretings);
let output_data = bincode::serialize(&message).unwrap();
self.handler.network().send(endpoint, &output_data);
self.known_participants.insert(name.clone(), endpoint);
}
}
}
NetEvent::Accepted(_, _) => (),
NetEvent::Message(_, input_data) => {
let message: Message = bincode::deserialize(&input_data).unwrap();
match message {
Message::ParticipantList(participants) => {
println!("Participant list received ({} participants)", participants.len());
for (name, addr) in participants {
self.discovered_participant(
&name,
addr,
"I see you in the participant list",
);
let text = "I see you in the participant list";
self.discovered_participant(&name, addr, text);
}
}
Message::ParticipantNotificationAdded(name, addr) => {
Expand All @@ -74,12 +85,6 @@ impl Participant {
}
Message::ParticipantNotificationRemoved(name) => {
println!("Removed participant '{}' from the network", name);

// Free network resource.
// It is only necessary because the connections among participants
// are done by UDP,
// UDP is not connection-oriented protocol, and the
// Connected/Disconnected events are not generated by UDP.
if let Some(endpoint) = self.known_participants.remove(&name) {
self.handler.network().remove(endpoint.resource_id());
}
Expand All @@ -90,7 +95,6 @@ impl Participant {
_ => unreachable!(),
}
}
NetEvent::Connected(_, _) => (),
NetEvent::Disconnected(endpoint) => {
if endpoint == self.discovery_endpoint {
println!("Discovery server disconnected, closing");
Expand All @@ -100,13 +104,9 @@ impl Participant {
});
}

fn discovered_participant(&mut self, name: &str, addr: SocketAddr, message: &str) {
if let Ok((endpoint, _)) = self.handler.network().connect(Transport::Udp, addr) {
let gretings = format!("Hi '{}', {}", name, message);
let message = Message::Gretings(self.name.clone(), gretings);
let output_data = bincode::serialize(&message).unwrap();
self.handler.network().send(endpoint, &output_data);
self.known_participants.insert(name.to_string(), endpoint);
}
fn discovered_participant(&mut self, name: &str, addr: SocketAddr, text: &str) {
let (endpoint, _) = self.handler.network().connect(Transport::FramedTcp, addr).unwrap();
// Save the necessary info to send the message when the connection is established.
self.grettings.insert(endpoint, (name.into(), text.into()));
}
}
Loading