Skip to content

Commit

Permalink
Added pending as mandatory
Browse files Browse the repository at this point in the history
  • Loading branch information
lemunozm committed May 8, 2021
1 parent ed16cca commit 8a29815
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 84 deletions.
8 changes: 6 additions & 2 deletions src/adapters/framed_tcp.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::network::adapter::{
Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo,
ListeningInfo,
ListeningInfo, PendingStatus,
};
use crate::network::{RemoteAddr};
use crate::network::{RemoteAddr, Readiness};
use crate::util::encoding::{self, Decoder, MAX_ENCODED_SIZE};

use mio::net::{TcpListener, TcpStream};
Expand Down Expand Up @@ -111,6 +111,10 @@ impl Remote for RemoteResource {
}
}
}

fn pending(&self, _readiness: Readiness) -> PendingStatus {
super::tcp::check_stream_ready(&self.stream)
}
}

pub(crate) struct LocalResource {
Expand Down
20 changes: 18 additions & 2 deletions src/adapters/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::network::adapter::{
Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo,
ListeningInfo,
ListeningInfo, PendingStatus,
};
use crate::network::{RemoteAddr};
use crate::network::{RemoteAddr, Readiness};

use mio::net::{TcpListener, TcpStream};
use mio::event::{Source};
Expand Down Expand Up @@ -97,6 +97,22 @@ impl Remote for RemoteResource {
}
}
}

fn pending(&self, _readiness: Readiness) -> PendingStatus {
check_stream_ready(&self.stream)
}
}

pub fn check_stream_ready(stream: &TcpStream) -> PendingStatus {
if let Ok(Some(_)) = stream.take_error() {
return PendingStatus::Disconnected
}
match stream.peer_addr() {
Ok(_) => PendingStatus::Ready,
Err(err) if err.kind() == io::ErrorKind::NotConnected => PendingStatus::Incomplete,
Err(err) if err.kind() == io::ErrorKind::InvalidInput => PendingStatus::Incomplete,
Err(_) => PendingStatus::Disconnected,
}
}

pub(crate) struct LocalResource {
Expand Down
22 changes: 13 additions & 9 deletions src/adapters/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

use crate::network::adapter::{
Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo,
ListeningInfo,
ListeningInfo, PendingStatus,
};
use crate::network::{RemoteAddr};
use crate::network::{RemoteAddr, Readiness};

use mio::event::{Source};

Expand All @@ -20,39 +20,43 @@ impl Adapter for MyAdapter {
pub(crate) struct RemoteResource;
impl Resource for RemoteResource {
fn source(&mut self) -> &mut dyn Source {
todo!();
todo!()
}
}

impl Remote for RemoteResource {
fn connect(remote_addr: RemoteAddr) -> io::Result<ConnectionInfo<Self>> {
todo!();
todo!()
}

fn receive(&self, process_data: impl FnMut(&[u8])) -> ReadStatus {
todo!();
todo!()
}

fn send(&self, data: &[u8]) -> SendStatus {
todo!();
todo!()
}

fn pending(&self, _readiness: Readiness) -> PendingStatus {
todo!()
}
}

pub(crate) struct LocalResource;
impl Resource for LocalResource {
fn source(&mut self) -> &mut dyn Source {
todo!();
todo!()
}
}

impl Local for LocalResource {
type Remote = RemoteResource;

fn listen(addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
todo!();
todo!()
}

fn accept(&self, accept_remote: impl FnMut(AcceptedType<'_, Self::Remote>)) {
todo!();
todo!()
}
}
8 changes: 6 additions & 2 deletions src/adapters/udp.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::network::adapter::{
Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo,
ListeningInfo,
ListeningInfo, PendingStatus,
};
use crate::network::{RemoteAddr};
use crate::network::{RemoteAddr, Readiness};

use mio::net::{UdpSocket};
use mio::event::{Source};
Expand Down Expand Up @@ -74,6 +74,10 @@ impl Remote for RemoteResource {
fn send(&self, data: &[u8]) -> SendStatus {
send_packet(data, |data| self.socket.send(data))
}

fn pending(&self, _readiness: Readiness) -> PendingStatus {
PendingStatus::Ready
}
}

pub(crate) struct LocalResource {
Expand Down
96 changes: 58 additions & 38 deletions src/adapters/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ impl Adapter for WsAdapter {
}

enum PendingHandshake {
Connect(Url, TcpStream),
Client(MidHandshake<ClientHandshake<TcpStream>>),
Server(MidHandshake<ServerHandshake<TcpStream, NoCallback>>),
}
Expand All @@ -54,6 +55,7 @@ impl Resource for RemoteResource {
match self.state.get_mut().unwrap() {
RemoteState::WebSocket(web_socket) => web_socket.get_mut(),
RemoteState::Handshake(Some(handshake)) => match handshake {
PendingHandshake::Connect(_, stream) => stream,
PendingHandshake::Client(mid_handshake) => mid_handshake.get_mut().get_mut(),
PendingHandshake::Server(mid_handshake) => mid_handshake.get_mut().get_mut(),
},
Expand Down Expand Up @@ -84,19 +86,15 @@ impl Remote for RemoteResource {
let stream = TcpStream::connect(peer_addr)?;
let local_addr = stream.local_addr()?;

let remote = match ws_connect(url, stream) {
Ok((web_socket, _)) => RemoteState::WebSocket(web_socket),
Err(HandshakeError::Interrupted(mid_handshake)) => {
RemoteState::Handshake(Some(PendingHandshake::Client(mid_handshake)))
}
Err(HandshakeError::Failure(Error::Io(err))) => return Err(err),
Err(HandshakeError::Failure(err)) => {
panic!("WS connect handshake error: {}", err)
}
};
/*
*/

Ok(ConnectionInfo {
remote: RemoteResource { state: Mutex::new(remote) },
remote: RemoteResource {
state: Mutex::new(RemoteState::Handshake(Some(PendingHandshake::Connect(
url, stream,
)))),
},
local_addr,
peer_addr,
})
Expand Down Expand Up @@ -143,11 +141,58 @@ impl Remote for RemoteResource {
}
}

fn send(&self, data: &[u8]) -> SendStatus {
match self.state.lock().expect(OTHER_THREAD_ERR).deref_mut() {
RemoteState::WebSocket(web_socket) => {
let message = Message::Binary(data.to_vec());
let mut result = web_socket.write_message(message);
loop {
match result {
Ok(_) => break SendStatus::Sent,
Err(Error::Io(ref err)) if err.kind() == ErrorKind::WouldBlock => {
result = web_socket.write_pending();
}
Err(Error::Capacity(_)) => {
break SendStatus::MaxPacketSizeExceeded(data.len(), MAX_PAYLOAD_LEN)
}
Err(err) => {
log::error!("WS send error: {}", err);
break SendStatus::ResourceNotFound // should not happen
}
}
}
}
RemoteState::Handshake(_) => unreachable!(),
}
}

fn pending(&self, _readiness: Readiness) -> PendingStatus {
let mut state = self.state.lock().expect(OTHER_THREAD_ERR);
match state.deref_mut() {
RemoteState::WebSocket(_) => PendingStatus::Ready,
RemoteState::Handshake(pending) => match pending.take().unwrap() {
PendingHandshake::Connect(url, stream) => {
match super::tcp::check_stream_ready(&stream) {
PendingStatus::Ready => match ws_connect(url, stream) {
Ok((web_socket, _)) => {
*state = RemoteState::WebSocket(web_socket);
PendingStatus::Ready
}
Err(HandshakeError::Interrupted(mid_handshake)) => {
*pending = Some(PendingHandshake::Client(mid_handshake));
PendingStatus::Incomplete
}
Err(HandshakeError::Failure(Error::Io(_))) => {
PendingStatus::Disconnected
}
Err(HandshakeError::Failure(err)) => {
log::error!("WS connect handshake error: {}", err);
PendingStatus::Disconnected // should not happen
}
},
other => other,
}
}
PendingHandshake::Client(mid_handshake) => match mid_handshake.handshake() {
Ok((web_socket, _)) => {
*state = RemoteState::WebSocket(web_socket);
Expand All @@ -159,7 +204,7 @@ impl Remote for RemoteResource {
}
Err(HandshakeError::Failure(Error::Io(_))) => PendingStatus::Disconnected,
Err(HandshakeError::Failure(err)) => {
log::error!("WS connect handshake error: {}", err);
log::error!("WS client handshake error: {}", err);
PendingStatus::Disconnected // should not happen
}
},
Expand All @@ -174,38 +219,13 @@ impl Remote for RemoteResource {
}
Err(HandshakeError::Failure(Error::Io(_))) => PendingStatus::Disconnected,
Err(HandshakeError::Failure(err)) => {
log::error!("WS accept handshake error: {}", err);
log::error!("WS server handshake error: {}", err);
PendingStatus::Disconnected // should not happen
}
},
},
}
}

fn send(&self, data: &[u8]) -> SendStatus {
match self.state.lock().expect(OTHER_THREAD_ERR).deref_mut() {
RemoteState::WebSocket(web_socket) => {
let message = Message::Binary(data.to_vec());
let mut result = web_socket.write_message(message);
loop {
match result {
Ok(_) => break SendStatus::Sent,
Err(Error::Io(ref err)) if err.kind() == ErrorKind::WouldBlock => {
result = web_socket.write_pending();
}
Err(Error::Capacity(_)) => {
break SendStatus::MaxPacketSizeExceeded(data.len(), MAX_PAYLOAD_LEN)
}
Err(err) => {
log::error!("WS send error: {}", err);
break SendStatus::ResourceNotFound // should not happen
}
}
}
}
RemoteState::Handshake(_) => unreachable!(),
}
}
}

impl RemoteResource {
Expand Down
51 changes: 32 additions & 19 deletions src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ mod tests {

lazy_static::lazy_static! {
static ref TIMEOUT: Duration = Duration::from_millis(1000);
static ref LOCALHOST_CONN_TIMEOUT: Duration = Duration::from_millis(5000);
}

fn no_more_events(mut processor: NetworkProcessor) {
Expand All @@ -238,19 +239,22 @@ mod tests {

let mut was_connected = 0;
let mut was_accepted = 0;
for _ in 0..2 {
processor.process_poll_event(Some(*TIMEOUT), |net_event| match net_event {
NetEvent::Connected(net_endpoint, status) => {
assert!(status);
assert_eq!(endpoint, net_endpoint);
was_connected += 1;
}
NetEvent::Accepted(_, net_listener_id) => {
assert_eq!(listener_id, net_listener_id);
was_accepted += 1;
}
_ => unreachable!(),
});
for _ in 0..5 {
processor.process_poll_event(
Some(*TIMEOUT),
|net_event| match net_event {
NetEvent::Connected(net_endpoint, status) => {
assert!(status);
assert_eq!(endpoint, net_endpoint);
was_connected += 1;
}
NetEvent::Accepted(_, net_listener_id) => {
assert_eq!(listener_id, net_listener_id);
was_accepted += 1;
}
_ => unreachable!(),
},
);
}
assert_eq!(was_connected, 1);
assert_eq!(was_accepted, 1);
Expand All @@ -268,8 +272,8 @@ mod tests {
assert!(controller.is_ready(endpoint.resource_id()).unwrap());
});

for _ in 0..2 {
processor.process_poll_event(Some(*TIMEOUT), |_| ());
for _ in 0..5 {
processor.process_poll_event(Some(*LOCALHOST_CONN_TIMEOUT), |_| ());
}

thread.join();
Expand All @@ -278,10 +282,15 @@ mod tests {
#[test]
fn unreachable_connection() {
let (controller, mut processor) = self::split();
let (endpoint, _) = controller.connect(Transport::Tcp, "127.0.0.1:5555").unwrap();

// Ensure that addr is not using by other process because it takes some secs to be reusable.
let (listener_id, addr) = controller.listen(Transport::Tcp, "127.0.0.1:0").unwrap();
controller.remove(listener_id);

let (endpoint, _) = controller.connect(Transport::Tcp, addr).unwrap();

let mut was_disconnected = false;
processor.process_poll_event(Some(*TIMEOUT), |net_event| match net_event {
processor.process_poll_event(Some(*LOCALHOST_CONN_TIMEOUT), |net_event| match net_event {
NetEvent::Connected(net_endpoint, status) => {
assert!(!status);
assert_eq!(endpoint, net_endpoint);
Expand All @@ -298,12 +307,16 @@ mod tests {
fn unreachable_connection_sync() {
let (controller, mut processor) = self::split();

// Ensure that addr is not using by other process because it takes some secs to be reusable.
let (listener_id, addr) = controller.listen(Transport::Tcp, "127.0.0.1:0").unwrap();
controller.remove(listener_id);

let mut thread = NamespacedThread::spawn("test", move || {
let err = controller.connect_sync(Transport::Tcp, "127.0.0.1:5555").unwrap_err();
let err = controller.connect_sync(Transport::Tcp, addr).unwrap_err();
assert!(err.kind() == io::ErrorKind::ConnectionRefused);
});

processor.process_poll_event(Some(*TIMEOUT), |_| ());
processor.process_poll_event(Some(*LOCALHOST_CONN_TIMEOUT), |_| ());

thread.join();
}
Expand Down
Loading

0 comments on commit 8a29815

Please sign in to comment.