Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sending sphinx packet independent of the receiver task #210

Merged
merged 1 commit into from
Apr 30, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 1 addition & 7 deletions clients/desktop/src/client/mix_traffic.rs
Original file line number Diff line number Diff line change
@@ -57,13 +57,7 @@ impl<'a> MixTrafficController<'static> {
.await
{
Err(e) => error!("Failed to send sphinx packet to the gateway! - {:?}", e),
Ok(was_successful) if !was_successful => {
warn!("Sent sphinx packet to the gateway but it failed to get processed!")
}
Ok(was_successful) if was_successful => {
trace!("Successfully forwarded sphinx packet to the gateway!")
}
Ok(_) => unreachable!("to shut up the compiler because all patterns ARE covered"),
Ok(_) => trace!("We *might* have managed to forward sphinx packet to the gateway!"),
}
}

1 change: 1 addition & 0 deletions common/client-libs/gateway-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ edition = "2018"
# TODO: (for this and other crates), similarly to 'tokio', import only required "futures" modules rather than
# the entire crate
futures = "0.3"
log = "0.4"
tokio = { version = "0.2", features = ["macros", "rt-core", "stream", "sync", "time"] }
tokio-tungstenite = "0.10"

206 changes: 125 additions & 81 deletions common/client-libs/gateway-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use futures::stream::{SplitSink, SplitStream};
use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
use gateway_requests::auth_token::{AuthToken, AuthTokenConversionError};
use gateway_requests::{BinaryRequest, ClientControlRequest, ServerResponse};
use log::*;
use nymsphinx::DestinationAddressBytes;
use std::convert::TryFrom;
use std::fmt::{self, Error, Formatter};
@@ -113,18 +115,101 @@ impl fmt::Display for GatewayClientError {
}
}

// We have ownership over sink half of the connection, but the stream is owned
// by some other task, however, we can notify it to get the stream back.
struct PartiallyDelegated<'a> {
sink_half: SplitSink<WsConn, Message>,
delegated_stream: (
BoxFuture<'a, Result<SplitStream<WsConn>, GatewayClientError>>,
Arc<Notify>,
),
}

impl<'a> PartiallyDelegated<'a> {
// TODO: this can be potentially bad as we have no direct restrictions of ensuring it's called
// within tokio runtime. Perhaps we should use the "old" way of passing explicit
// runtime handle to the constructor and using that instead?
fn split_and_listen_for_sphinx_packets(
conn: WsConn,
sphinx_packet_sender: SphinxPacketSender,
) -> Result<Self, GatewayClientError> {
// when called for, it NEEDS TO yield back the stream so that we could merge it and
// read control request responses.
let notify = Arc::new(Notify::new());
let notify_clone = Arc::clone(&notify);

let (sink, mut stream) = conn.split();

let sphinx_receiver_future = async move {
let mut should_return = false;
while !should_return {
tokio::select! {
_ = notify_clone.notified() => {
should_return = true;
}
msg = stream.next() => {
if msg.is_none() {
return Err(GatewayClientError::ConnectionAbruptlyClosed);
}
let msg = match msg.unwrap() {
Ok(msg) => msg,
Err(err) => {
return Err(GatewayClientError::NetworkError(err));
}
};
match msg {
Message::Binary(bin_msg) => {
// TODO: some batching mechanism to allow reading and sending more than
// one packet at the time, because the receiver can easily handle it
sphinx_packet_sender.unbounded_send(vec![bin_msg]).unwrap()
},
// I think that in the future we should perhaps have some sequence number system, i.e.
// so each request/reponse pair can be easily identified, so that if messages are
// not ordered (for some peculiar reason) we wouldn't lose anything.
// This would also require NOT discarding any text responses here.
Message::Text(_) => debug!("received a text message - probably a response to some previous query!"),
_ => (),
};
}
};
}
Ok(stream)
};

let spawned_boxed_task = tokio::spawn(sphinx_receiver_future)
.map(|join_handle| {
join_handle.expect("task must have not failed to finish its execution!")
})
.boxed();

Ok(PartiallyDelegated {
sink_half: sink,
delegated_stream: (spawned_boxed_task, notify),
})
}

// if we want to send a message and don't care about response, we can don't need to reunite the split,
// the sink itself is enough
async fn send_without_response(&mut self, msg: Message) -> Result<(), GatewayClientError> {
Ok(self.sink_half.send(msg).await?)
}

async fn merge(self) -> Result<WsConn, GatewayClientError> {
let (stream_fut, notify) = self.delegated_stream;
notify.notify();
let stream = stream_fut.await?;
// the error is thrown when trying to reunite sink and stream that did not originate
// from the same split which is impossible to happen here
Ok(self.sink_half.reunite(stream).unwrap())
}
}

// we can either have the stream itself or an option to re-obtain it
// by notifying the future owning it to finish the execution and awaiting the result
// which should be almost immediate (or an invalid state which should never, ever happen)
// TODO: perhaps restore the previous idea of Split(Stream, Sink) state to allow for
// sending messages without waiting for any responses and having no effect on rate of
// messages being pushed to us
enum SocketState<'a> {
Available(WsConn),
Delegated(
BoxFuture<'a, Result<WsConn, GatewayClientError>>,
Arc<Notify>,
),
PartiallyDelegated(PartiallyDelegated<'a>),
NotConnected,
Invalid,
}
@@ -137,16 +222,16 @@ impl<'a> SocketState<'a> {
}
}

fn is_delegated(&self) -> bool {
fn is_partially_delegated(&self) -> bool {
match self {
SocketState::Delegated(_, _) => true,
SocketState::PartiallyDelegated(_) => true,
_ => false,
}
}

fn is_established(&self) -> bool {
match self {
SocketState::Available(_) | SocketState::Delegated(_, _) => true,
SocketState::Available(_) | SocketState::PartiallyDelegated(_) => true,
_ => false,
}
}
@@ -249,25 +334,22 @@ where
res.expect("response value should have been written in one of the branches!. If you see this error, please report a bug!")
}

// If we want to send a message, we need to have a full control over the socket,
// If we want to send a message (with response), we need to have a full control over the socket,
// as we need to be able to write the request and read the subsequent response
async fn send_websocket_message(
&mut self,
msg: Message,
) -> Result<ServerResponse, GatewayClientError> {
let mut should_restart_sphinx_listener = false;
if self.connection.is_delegated() {
if self.connection.is_partially_delegated() {
self.recover_socket_connection().await?;
should_restart_sphinx_listener = true;
}

let conn = match self.connection {
SocketState::Available(ref mut conn) => conn,
SocketState::Delegated(_, _) => {
return Err(GatewayClientError::ConnectionInInvalidState)
}
SocketState::Invalid => return Err(GatewayClientError::ConnectionInInvalidState),
SocketState::NotConnected => return Err(GatewayClientError::ConnectionNotEstablished),
_ => return Err(GatewayClientError::ConnectionInInvalidState),
};
conn.send(msg).await?;
let response = self.read_control_response().await;
@@ -278,13 +360,18 @@ where
response
}

// next on TODO list:
// so that presumably we could increase our sending/receiving rate
async fn send_websocket_message_without_response(
&mut self,
msg: Message,
) -> Result<(), GatewayClientError> {
unimplemented!()
match self.connection {
SocketState::Available(ref mut conn) => Ok(conn.send(msg).await?),
SocketState::PartiallyDelegated(ref mut partially_delegated) => {
partially_delegated.send_without_response(msg).await
}
SocketState::NotConnected => Err(GatewayClientError::ConnectionNotEstablished),
_ => Err(GatewayClientError::ConnectionInInvalidState),
}
}

pub async fn register(&mut self) -> Result<AuthToken, GatewayClientError> {
@@ -348,102 +435,59 @@ where
}
}

// TODO: make it optionally use `send_websocket_message_without_response`
// TODO: possibly make responses optional
pub async fn send_sphinx_packet(
&mut self,
address: SocketAddr,
packet: Vec<u8>,
) -> Result<bool, GatewayClientError> {
) -> Result<(), GatewayClientError> {
if !self.authenticated {
return Err(GatewayClientError::NotAuthenticated);
}
if !self.connection.is_established() {
return Err(GatewayClientError::ConnectionNotEstablished);
}
let msg = BinaryRequest::new_forward_request(address, packet).into();
match self.send_websocket_message(msg).await? {
ServerResponse::Send { status } => Ok(status),
ServerResponse::Error { message } => Err(GatewayClientError::GatewayError(message)),
_ => unreachable!(),
}
self.send_websocket_message_without_response(msg).await
}

async fn recover_socket_connection(&mut self) -> Result<(), GatewayClientError> {
if self.connection.is_available() {
return Ok(());
}
if !self.connection.is_delegated() {
if !self.connection.is_partially_delegated() {
return Err(GatewayClientError::ConnectionInInvalidState);
}

let (conn_fut, notify) = match std::mem::replace(&mut self.connection, SocketState::Invalid)
{
SocketState::Delegated(conn_fut, notify) => (conn_fut, notify),
let conn = match std::mem::replace(&mut self.connection, SocketState::Invalid) {
SocketState::PartiallyDelegated(delegated_conn) => delegated_conn.merge().await?,
_ => unreachable!(),
};

// tell the future to wrap up whatever it's doing now
notify.notify();
self.connection = SocketState::Available(conn_fut.await?);
self.connection = SocketState::Available(conn);
Ok(())
}

// TODO: this can be potentially bad as we have no direct restrictions of ensuring it's called
// within tokio runtime. Perhaps we should use the "old" way of passing explicit
// runtime handle to the constructor and using that instead?
fn start_listening_for_sphinx_packets(&mut self) -> Result<(), GatewayClientError> {
if self.connection.is_partially_delegated() {
return Ok(());
}
if !self.connection.is_available() {
return Err(GatewayClientError::ConnectionInInvalidState);
}

// when called for, it NEEDS TO yield back the stream so that we could merge it and
// read control request responses.
let notify = Arc::new(Notify::new());
let notify_clone = Arc::clone(&notify);

let mut extracted_connection =
let partially_delegated =
match std::mem::replace(&mut self.connection, SocketState::Invalid) {
SocketState::Available(conn) => conn,
_ => unreachable!(), // impossible due to initial check
SocketState::Available(conn) => {
PartiallyDelegated::split_and_listen_for_sphinx_packets(
conn,
self.sphinx_packet_sender.clone(),
)?
}
_ => unreachable!(),
};

let sphinx_packet_sender = self.sphinx_packet_sender.clone();
let sphinx_receiver_future = async move {
let mut should_return = false;
while !should_return {
tokio::select! {
_ = notify_clone.notified() => {
should_return = true;
}
msg = extracted_connection.next() => {
if msg.is_none() {
return Err(GatewayClientError::ConnectionAbruptlyClosed);
}
let msg = match msg.unwrap() {
Ok(msg) => msg,
Err(err) => {
return Err(GatewayClientError::NetworkError(err));
}
};
match msg {
Message::Binary(bin_msg) => {
sphinx_packet_sender.unbounded_send(vec![bin_msg]).unwrap()
}
_ => (),
};
}
};
}
Ok(extracted_connection)
};

let spawned_boxed_task = tokio::spawn(sphinx_receiver_future)
.map(|join_handle| {
join_handle.expect("task must have not failed to finish its execution!")
})
.boxed();

self.connection = SocketState::Delegated(spawned_boxed_task, notify);
self.connection = SocketState::PartiallyDelegated(partially_delegated);
Ok(())
}
}