diff --git a/clients/socks5/src/socks/client.rs b/clients/socks5/src/socks/client.rs index 9e782d30ac8..14812841770 100644 --- a/clients/socks5/src/socks/client.rs +++ b/clients/socks5/src/socks/client.rs @@ -10,15 +10,13 @@ use futures::channel::mpsc; use futures::task::{Context, Poll}; use log::*; use nymsphinx::addressing::clients::Recipient; -use ordered_buffer::{OrderedMessageBuffer, OrderedMessageSender}; use pin_project::pin_project; -use proxy_helpers::available_reader::AvailableReader; use proxy_helpers::connection_controller::{ ConnectionReceiver, ControllerCommand, ControllerSender, }; use proxy_helpers::proxy_runner::ProxyRunner; use rand::RngCore; -use socks5_requests::{ConnectionId, Request}; +use socks5_requests::{ConnectionId, RemoteAddress, Request}; use std::net::{Shutdown, SocketAddr}; use std::pin::Pin; use tokio::prelude::*; @@ -145,8 +143,7 @@ pub(crate) struct SocksClient { impl Drop for SocksClient { fn drop(&mut self) { // TODO: decrease to debug/trace - info!("socksclient is going out of scope - the stream is getting dropped!"); - info!("Connection {} is getting closed", self.connection_id); + debug!("Connection {} is getting closed", self.connection_id); self.controller_sender .unbounded_send(ControllerCommand::Remove(self.connection_id)) .unwrap(); @@ -222,26 +219,38 @@ impl SocksClient { Ok(()) } - async fn send_request_to_mixnet(&mut self, request: Request) { - self.send_to_mixnet(request.into_bytes()).await; + async fn send_connect_to_mixnet(&mut self, remote_address: RemoteAddress) { + let req = Request::new_connect( + self.connection_id, + remote_address.clone(), + self.self_address.clone(), + ); + + let input_message = + InputMessage::new_fresh(self.service_provider.clone(), req.into_bytes(), false); + self.input_sender.unbounded_send(input_message).unwrap(); } - async fn run_proxy( - &mut self, - conn_receiver: ConnectionReceiver, - message_sender: OrderedMessageSender, - ) { + async fn run_proxy(&mut self, conn_receiver: ConnectionReceiver, remote_proxy_target: String) { + self.send_connect_to_mixnet(remote_proxy_target.clone()) + .await; + let stream = self.stream.run_proxy(); + let local_stream_remote = stream + .peer_addr() + .expect("failed to extract peer address") + .to_string(); let connection_id = self.connection_id; let input_sender = self.input_sender.clone(); let recipient = self.service_provider.clone(); let (stream, _) = ProxyRunner::new( stream, + local_stream_remote, + remote_proxy_target, conn_receiver, input_sender, connection_id, - message_sender, ) .run(move |conn_id, read_data, socket_closed| { let provider_request = Request::new_send(conn_id, read_data, socket_closed); @@ -262,14 +271,9 @@ impl SocksClient { // setup for receiving from the mixnet let (mix_sender, mix_receiver) = mpsc::unbounded(); - let ordered_buffer = OrderedMessageBuffer::new(); self.controller_sender - .unbounded_send(ControllerCommand::Insert( - self.connection_id, - mix_sender, - ordered_buffer, - )) + .unbounded_send(ControllerCommand::Insert(self.connection_id, mix_sender)) .unwrap(); match request.command { @@ -278,26 +282,16 @@ impl SocksClient { trace!("Connecting to: {:?}", remote_address.clone()); self.acknowledge_socks5().await; - let mut message_sender = OrderedMessageSender::new(); - // 'connect' needs to be handled manually due to different structure, - // but still needs to have correct sequence number on it! - - // read whatever we can - let available_reader = AvailableReader::new(&mut self.stream); - let (request_data_bytes, _) = available_reader.await?; - let ordered_message = message_sender.wrap_message(request_data_bytes.to_vec()); - - let socks_provider_request = Request::new_connect( - self.connection_id, + info!( + "Starting proxy for {} (id: {})", remote_address.clone(), - ordered_message, - self.self_address.clone(), + self.connection_id + ); + self.run_proxy(mix_receiver, remote_address.clone()).await; + info!( + "Proxy for {} is finished (id: {})", + remote_address, self.connection_id ); - - self.send_request_to_mixnet(socks_provider_request).await; - info!("Starting proxy for {}", remote_address.clone()); - self.run_proxy(mix_receiver, message_sender).await; - info!("Proxy for {} is finished", remote_address); } SocksCommand::Bind => unimplemented!(), // not handled @@ -307,16 +301,6 @@ impl SocksClient { Ok(()) } - /// Send serialized Socks5 request bytes to the mixnet. The request stream - /// will be chunked up into a series of one or more Sphinx packets and - /// reassembled at the destination service provider at the other end, then - /// sent onwards anonymously. - async fn send_to_mixnet(&self, request_bytes: Vec) { - let input_message = - InputMessage::new_fresh(self.service_provider.clone(), request_bytes, false); - self.input_sender.unbounded_send(input_message).unwrap(); - } - /// Writes a Socks5 header back to the requesting client's TCP stream, /// basically saying "I acknowledge your request and am dealing with it". async fn acknowledge_socks5(&mut self) { diff --git a/common/nymsphinx/params/src/packet_sizes.rs b/common/nymsphinx/params/src/packet_sizes.rs index f74a4378b07..9302662f4ee 100644 --- a/common/nymsphinx/params/src/packet_sizes.rs +++ b/common/nymsphinx/params/src/packet_sizes.rs @@ -34,11 +34,14 @@ pub struct InvalidPacketSize; #[repr(u8)] #[derive(Clone, Copy, Debug, PartialEq)] pub enum PacketSize { - RegularPacket = 1, // for example instant messaging use case - ACKPacket = 2, + RegularPacket = 1, + // for sending SURB-ACKs - ExtendedPacket = 3, // for example for streaming fast and furious in uncompressed 10bit 4K HDR quality + ACKPacket = 2, + + // for example for streaming fast and furious in uncompressed 10bit 4K HDR quality + ExtendedPacket = 3, } impl TryFrom for PacketSize { diff --git a/common/socks5/proxy-helpers/src/available_reader.rs b/common/socks5/proxy-helpers/src/available_reader.rs index a08ca01e1d4..2fed57602e1 100644 --- a/common/socks5/proxy-helpers/src/available_reader.rs +++ b/common/socks5/proxy-helpers/src/available_reader.rs @@ -20,13 +20,18 @@ use std::ops::DerefMut; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::AsyncRead; +use tokio::stream::Stream; +use tokio::time::{delay_for, Delay, Duration, Instant}; + +const MAX_READ_AMOUNT: usize = 500 * 1000; // 0.5MB +const GRACE_DURATION: Duration = Duration::from_millis(1); pub struct AvailableReader<'a, R: AsyncRead + Unpin> { - // TODO: come up with a way to avoid using RefCell (not sure if possible though) + // TODO: come up with a way to avoid using RefCell (not sure if possible though due to having to + // mutably borrow both inner reader and buffer at the same time) buf: RefCell, inner: RefCell<&'a mut R>, - // idea for the future: tiny delay that allows to prevent unnecessary extra fragmentation - // grace_period: Option, + grace_period: Option, } impl<'a, R> AvailableReader<'a, R> @@ -39,20 +44,15 @@ where AvailableReader { buf: RefCell::new(BytesMut::with_capacity(Self::BUF_INCREMENT)), inner: RefCell::new(reader), - // grace_period: None, + grace_period: Some(delay_for(GRACE_DURATION)), } } } -// TODO: change this guy to a stream? Seems waaay more appropriate considering -// we're getting new Bytes items regularly rather than calling it once. - -impl<'a, R: AsyncRead + Unpin> Future for AvailableReader<'a, R> { - type Output = io::Result<(Bytes, bool)>; +impl<'a, R: AsyncRead + Unpin> Stream for AvailableReader<'a, R> { + type Item = io::Result; - // this SHOULD stay mutable, because we rely on runtime checks inside the method - #[allow(unused_mut)] - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // if we have no space in buffer left - expand it if !self.buf.borrow().has_remaining_mut() { self.buf.borrow_mut().reserve(Self::BUF_INCREMENT); @@ -68,19 +68,43 @@ impl<'a, R: AsyncRead + Unpin> Future for AvailableReader<'a, R> { if self.buf.borrow().is_empty() { Poll::Pending } else { + // if exists - check grace period + if let Some(grace_period) = self.grace_period.as_mut() { + if Pin::new(grace_period).poll(cx).is_pending() { + return Poll::Pending; + } + } + let buf = self.buf.replace(BytesMut::new()); - Poll::Ready(Ok((buf.freeze(), false))) + Poll::Ready(Some(Ok(buf.freeze()))) } } - Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), Poll::Ready(Ok(n)) => { + // if exists - reset grace period + if let Some(grace_period) = self.grace_period.as_mut() { + let now = Instant::now(); + grace_period.reset(now + GRACE_DURATION); + } + // if we read a non-0 amount, we're not done yet! if n == 0 { let buf = self.buf.replace(BytesMut::new()); - Poll::Ready(Ok((buf.freeze(), true))) + if buf.len() > 0 { + Poll::Ready(Some(Ok(buf.freeze()))) + } else { + Poll::Ready(None) + } } else { // tell the waker we should be polled again! cx.waker().wake_by_ref(); + + // if we reached our maximum amount - return it + let read_bytes_len = self.buf.borrow().len(); + if read_bytes_len >= MAX_READ_AMOUNT { + let buf = self.buf.replace(BytesMut::new()); + return Poll::Ready(Some(Ok(buf.freeze()))); + } Poll::Pending } } @@ -91,19 +115,22 @@ impl<'a, R: AsyncRead + Unpin> Future for AvailableReader<'a, R> { #[cfg(test)] mod tests { use super::*; + use futures::poll; use std::io::Cursor; use std::time::Duration; + use tokio::stream::StreamExt; + use tokio_test::assert_pending; #[tokio::test] async fn available_reader_reads_all_available_data_smaller_than_its_buf() { let data = vec![42u8; 100]; let mut reader = Cursor::new(data.clone()); - let available_reader = AvailableReader::new(&mut reader); - let (read_data, is_finished) = available_reader.await.unwrap(); + let mut available_reader = AvailableReader::new(&mut reader); + let read_data = available_reader.next().await.unwrap().unwrap(); assert_eq!(read_data, data); - assert!(is_finished) + assert!(available_reader.next().await.is_none()) } #[tokio::test] @@ -111,11 +138,11 @@ mod tests { let data = vec![42u8; AvailableReader::>>::BUF_INCREMENT + 100]; let mut reader = Cursor::new(data.clone()); - let available_reader = AvailableReader::new(&mut reader); - let (read_data, is_finished) = available_reader.await.unwrap(); + let mut available_reader = AvailableReader::new(&mut reader); + let read_data = available_reader.next().await.unwrap().unwrap(); assert_eq!(read_data, data); - assert!(is_finished) + assert!(available_reader.next().await.is_none()) } #[tokio::test] @@ -129,11 +156,11 @@ mod tests { .read(&second_data_chunk) .build(); - let available_reader = AvailableReader::new(&mut reader_mock); - let (read_data, is_finished) = available_reader.await.unwrap(); + let mut available_reader = AvailableReader::new(&mut reader_mock); + let read_data = available_reader.next().await.unwrap().unwrap(); assert_eq!(read_data, first_data_chunk); - assert!(!is_finished) + assert_pending!(poll!(available_reader.next())); } #[tokio::test] @@ -145,10 +172,40 @@ mod tests { .read(&data) .build(); - let available_reader = AvailableReader::new(&mut reader_mock); - let (read_data, is_finished) = available_reader.await.unwrap(); + let mut available_reader = AvailableReader::new(&mut reader_mock); + let read_data = available_reader.next().await.unwrap().unwrap(); assert_eq!(read_data, data); - assert!(is_finished) + assert!(available_reader.next().await.is_none()) } + + // perhaps the issue of tokio io builder will be resolved in tokio 0.3? + // #[tokio::test] + // async fn available_reader_will_wait_for_more_data_if_its_within_grace_period() { + // let first_data_chunk = vec![42u8; 100]; + // let second_data_chunk = vec![123u8; 100]; + // + // let combined_chunks: Vec<_> = first_data_chunk + // .iter() + // .cloned() + // .chain(second_data_chunk.iter().cloned()) + // .collect(); + // + // let mut reader_mock = tokio_test::io::Builder::new() + // .read(&first_data_chunk) + // .wait(Duration::from_millis(2)) + // .read(&second_data_chunk) + // .build(); + // + // let mut available_reader = AvailableReader { + // buf: RefCell::new(BytesMut::with_capacity(4096)), + // inner: RefCell::new(&mut reader_mock), + // grace_period: Some(delay_for(Duration::from_millis(5))), + // }; + // + // let read_data = available_reader.next().await.unwrap().unwrap(); + // + // assert_eq!(read_data, combined_chunks); + // assert!(available_reader.next().await.is_none()) + // } } diff --git a/common/socks5/proxy-helpers/src/connection_controller.rs b/common/socks5/proxy-helpers/src/connection_controller.rs index 558197158e2..eaf2cada759 100644 --- a/common/socks5/proxy-helpers/src/connection_controller.rs +++ b/common/socks5/proxy-helpers/src/connection_controller.rs @@ -41,7 +41,7 @@ pub type ControllerSender = mpsc::UnboundedSender; pub type ControllerReceiver = mpsc::UnboundedReceiver; pub enum ControllerCommand { - Insert(ConnectionId, ConnectionSender, OrderedMessageBuffer), + Insert(ConnectionId, ConnectionSender), Remove(ConnectionId), Send(ConnectionId, Vec, bool), } @@ -75,7 +75,16 @@ pub struct Controller { active_connections: HashMap, receiver: ControllerReceiver, + // TODO: this will need to be either completely removed (from code) or periodically cleaned + // to avoid memory issues recently_closed: HashSet, + + // TODO: this can potentially be abused to ddos and kill provider. Not sure at this point + // how to handle it more gracefully + + // buffer for messages received before connection was established due to mixnet being able to + // un-order messages. Note we don't ever expect to have more than 1-2 messages per connection here + pending_messages: HashMap, bool)>>, } impl Controller { @@ -86,27 +95,28 @@ impl Controller { active_connections: HashMap::new(), receiver, recently_closed: HashSet::new(), + pending_messages: HashMap::new(), }, sender, ) } - fn insert_connection( - &mut self, - conn_id: ConnectionId, - connection_sender: ConnectionSender, - ordered_buffer: OrderedMessageBuffer, - ) { + fn insert_connection(&mut self, conn_id: ConnectionId, connection_sender: ConnectionSender) { let active_connection = ActiveConnection { is_closed: false, connection_sender: Some(connection_sender), - ordered_buffer, + ordered_buffer: OrderedMessageBuffer::new(), }; if let Some(_active_conn) = self.active_connections.insert(conn_id, active_connection) { - // we received 'Send' before 'connect' - drain what we currently accumulated into the fresh - // buffer as this new one is going to be used for the connection - // TODO: let's only do this if it's actually EVER fired - error!("Presumably received 'Send' before 'Connect'!") + error!("Received a duplicate 'Connect'!") + } else { + // check if there were any pending messages + if let Some(pending) = self.pending_messages.remove(&conn_id) { + debug!("There were some pending messages for {}", conn_id); + for (payload, is_closed) in pending { + self.send_to_connection(conn_id, payload, is_closed) + } + } } } @@ -129,7 +139,7 @@ impl Controller { active_connection.is_closed |= is_closed; if let Some(payload) = active_connection.read_from_buf() { - active_connection + if let Err(err) = active_connection .connection_sender .as_mut() .unwrap() @@ -137,14 +147,28 @@ impl Controller { payload, socket_closed: active_connection.is_closed, }) - .unwrap() + { + error!("WTF IS THIS: {:?}", err); + } + + // TODO: ABOVE UNWRAP CAUSED A CRASH IN A NORMAL USE!!!! + // TODO: + // TODO: surprisingly it only happened on socks client, never on nSP + // TODO: + // TODO: + // TODO: + // TODO: } } else { - error!("no connection exists with id: {:?}", conn_id); - warn!("'lost' bytes: {}", payload.len()); if !self.recently_closed.contains(&conn_id) { - // TODO: let's only do this if it's actually EVER fired - error!("Presumably received 'Send' before 'Connect'! - First") + warn!("Received a 'Send' before 'Connect' - going to buffer the data"); + let pending = self.pending_messages.entry(conn_id).or_insert(Vec::new()); + pending.push((payload, is_closed)); + } else { + error!( + "Tried to write to closed connection ({} bytes were 'lost)", + payload.len() + ) } } } @@ -155,8 +179,8 @@ impl Controller { ControllerCommand::Send(conn_id, data, is_closed) => { self.send_to_connection(conn_id, data, is_closed) } - ControllerCommand::Insert(conn_id, sender, ordered_buffer) => { - self.insert_connection(conn_id, sender, ordered_buffer) + ControllerCommand::Insert(conn_id, sender) => { + self.insert_connection(conn_id, sender) } ControllerCommand::Remove(conn_id) => self.remove_connection(conn_id), } diff --git a/common/socks5/proxy-helpers/src/lib.rs b/common/socks5/proxy-helpers/src/lib.rs index 2ab74886113..01413220473 100644 --- a/common/socks5/proxy-helpers/src/lib.rs +++ b/common/socks5/proxy-helpers/src/lib.rs @@ -15,4 +15,3 @@ pub mod available_reader; pub mod connection_controller; pub mod proxy_runner; -// pub mod read_delay_loop; diff --git a/common/socks5/proxy-helpers/src/proxy_runner.rs b/common/socks5/proxy-helpers/src/proxy_runner.rs index f4a583cb910..c951eaf2999 100644 --- a/common/socks5/proxy-helpers/src/proxy_runner.rs +++ b/common/socks5/proxy-helpers/src/proxy_runner.rs @@ -18,12 +18,10 @@ use futures::channel::mpsc; use log::*; use ordered_buffer::OrderedMessageSender; use socks5_requests::ConnectionId; -use std::sync::Arc; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::net::TcpStream; use tokio::prelude::*; use tokio::stream::StreamExt; -use tokio::sync::Notify; #[derive(Debug)] pub struct ProxyMessage { @@ -51,10 +49,9 @@ pub struct ProxyRunner { mix_sender: MixProxySender, socket: Option, + local_destination_address: String, + remote_source_address: String, connection_id: ConnectionId, - - // required for in-order delivery - message_sender: Option, } impl ProxyRunner @@ -63,118 +60,107 @@ where { pub fn new( socket: TcpStream, + local_destination_address: String, // addresses are provided for better logging + remote_source_address: String, mix_receiver: ConnectionReceiver, mix_sender: MixProxySender, connection_id: ConnectionId, - message_sender: OrderedMessageSender, ) -> Self { ProxyRunner { mix_receiver: Some(mix_receiver), mix_sender, socket: Some(socket), + local_destination_address, + remote_source_address, connection_id, - message_sender: Some(message_sender), } } async fn run_inbound( mut reader: OwnedReadHalf, - notify_closed: Arc, + local_destination_address: String, // addresses are provided for better logging + remote_source_address: String, connection_id: ConnectionId, mix_sender: MixProxySender, adapter_fn: F, - mut message_sender: OrderedMessageSender, - ) -> (OwnedReadHalf, OrderedMessageSender) + ) -> OwnedReadHalf where F: Fn(ConnectionId, Vec, bool) -> S + Send + 'static, { let mut available_reader = AvailableReader::new(&mut reader); + let mut message_sender = OrderedMessageSender::new(); loop { - tokio::select! { - _ = notify_closed.notified() => { - // the remote socket is closed, so there's no point - // in reading anything more because we won't be able to write to remote anyway! - break - } - // try to read from local socket and push everything to mixnet to the remote - reading_result = &mut available_reader => { - let (read_data, is_finished) = match reading_result { - Ok(data) => data, - Err(err) => { - error!("failed to read request from the socket - {}", err); - break; - } - }; - - info!( - "Going to send {} bytes via mixnet to remote {}. Is local closed: {}", - read_data.len(), - connection_id, - is_finished - ); - - // if we're sending through the mixnet increase the sequence number... - let ordered_msg = message_sender.wrap_message(read_data.to_vec()).into_bytes(); - mix_sender.unbounded_send(adapter_fn(connection_id, ordered_msg, is_finished)).unwrap(); - - if is_finished { - // technically we already informed it when we sent the message to mixnet above - info!("The local socket is closed - won't receive any more data. Informing remote about that..."); - // no point in reading from mixnet if connection is closed! - notify_closed.notify(); + // try to read from local socket and push everything to mixnet to the remote + let (read_data, is_finished) = match available_reader.next().await { + Some(data) => match data { + Ok(data) => (data, false), + Err(err) => { + error!(target: &*format!("({}) socks5 inbound", connection_id),"failed to read request from the socket - {}", err); break; - } else { - // delay_for(Duration::from_millis(2)).await; } - } + }, + None => (Default::default(), true), + }; + + debug!( + target: &*format!("({}) socks5 inbound", connection_id), + "[{} bytes]\t{} → local → mixnet → remote → {}. Local closed: {}", + read_data.len(), + local_destination_address, + remote_source_address, + is_finished + ); + + // if we're sending through the mixnet increase the sequence number... + let ordered_msg = message_sender.wrap_message(read_data.to_vec()).into_bytes(); + mix_sender + .unbounded_send(adapter_fn(connection_id, ordered_msg, is_finished)) + .unwrap(); + + if is_finished { + // technically we already informed it when we sent the message to mixnet above + debug!(target: &*format!("({}) socks5 inbound", connection_id), "The local socket is closed - won't receive any more data. Informing remote about that..."); + break; } } - (reader, message_sender) + reader } async fn run_outbound( mut writer: OwnedWriteHalf, - notify_closed: Arc, + local_destination_address: String, // addresses are provided for better logging + remote_source_address: String, mut mix_receiver: ConnectionReceiver, connection_id: ConnectionId, ) -> (OwnedWriteHalf, ConnectionReceiver) { loop { - tokio::select! { - _ = notify_closed.notified() => { - // no need to read from mixnet as we won't be able to send to socket - // anyway - break - } - mix_data = mix_receiver.next() => { - if mix_data.is_none() { - warn!("mix receiver is none so we already got removed somewhere. This isn't really a warning, but shouldn't happen to begin with, so please say if you see this message"); - // we already got closed - // not sure if we HAVE TO notify the other task, but might as well - notify_closed.notify(); - break - } - let connection_message = mix_data.unwrap(); - - info!( - "Going to write {} bytes received from mixnet to connection {}. Is remote closed: {}", - connection_message.payload.len(), - connection_id, - connection_message.socket_closed - ); - - if let Err(err) = writer.write_all(&connection_message.payload).await { - // the other half is probably going to blow up too (if not, this task also needs to notify the other one!!) - error!("failed to write response back to the socket - {}", err); - break; - } - if connection_message.socket_closed { - info!("Remote socket got closed - closing the local socket too"); - notify_closed.notify(); - break - } - } + let mix_data = mix_receiver.next().await; + if mix_data.is_none() { + warn!("mix receiver is none so we already got removed somewhere. This isn't really a warning, but shouldn't happen to begin with, so please say if you see this message"); + break; + } + let connection_message = mix_data.unwrap(); + + debug!( + target: &*format!("({}) socks5 outbound", connection_id), + "[{} bytes]\t{} → remote → mixnet → local → {} Remote closed: {}", + connection_message.payload.len(), + remote_source_address, + local_destination_address, + connection_message.socket_closed + ); + + if let Err(err) = writer.write_all(&connection_message.payload).await { + // the other half is probably going to blow up too (if not, this task also needs to notify the other one!!) + error!(target: &*format!("({}) socks5 outbound", connection_id), "failed to write response back to the socket - {}", err); + break; + } + if connection_message.socket_closed { + debug!(target: &*format!("({}) socks5 outbound", connection_id), + "Remote socket got closed - closing the local socket too"); + break; } } @@ -187,24 +173,22 @@ where where F: Fn(ConnectionId, Vec, bool) -> S + Send + 'static, { - let notify_closed = Arc::new(Notify::new()); - let notify_clone = Arc::clone(¬ify_closed); - let (read_half, write_half) = self.socket.take().unwrap().into_split(); // should run until either inbound closes or is notified from outbound let inbound_future = Self::run_inbound( read_half, - notify_closed, + self.local_destination_address.clone(), + self.remote_source_address.clone(), self.connection_id, self.mix_sender.clone(), adapter_fn, - self.message_sender.take().unwrap(), ); let outbound_future = Self::run_outbound( write_half, - notify_clone, + self.local_destination_address.clone(), + self.remote_source_address.clone(), self.mix_receiver.take().unwrap(), self.connection_id, ); @@ -222,12 +206,11 @@ where panic!("TODO: some future error?") } - let (read_half, message_sender) = inbound_result.unwrap(); + let read_half = inbound_result.unwrap(); let (write_half, mix_receiver) = outbound_result.unwrap(); self.socket = Some(write_half.reunite(read_half).unwrap()); self.mix_receiver = Some(mix_receiver); - self.message_sender = Some(message_sender); self } diff --git a/common/socks5/proxy-helpers/src/read_delay_loop.rs b/common/socks5/proxy-helpers/src/read_delay_loop.rs deleted file mode 100644 index 647cf2a2a83..00000000000 --- a/common/socks5/proxy-helpers/src/read_delay_loop.rs +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright 2020 Nym Technologies SA -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// The only reason this exists is to remove duplicate code from -// nym\service-providers\simple-socks5\src\connection.rs::try_read_response_data -// and -// nym\clients\socks5\src\socks\request.rs::try_read_request_data - -// once those use sequence numbers, this code should be removed!! - -use crate::available_reader::AvailableReader; -use std::io; -use tokio::io::AsyncRead; -use tokio::time::Duration; - -// It returns data alognside information whether it timed out while reading from the socket -pub async fn try_read_data( - timeout: Duration, - mut reader: R, - address: &str, -) -> io::Result<(Vec, bool)> -where - R: AsyncRead + Unpin, -{ - let mut data = Vec::new(); - let mut delay = tokio::time::delay_for(timeout); - - let mut available_reader = AvailableReader::new(&mut reader); - - loop { - tokio::select! { - _ = &mut delay => { - if data.len() > 0 { - println!("Timed out. returning {} bytes received from {}", data.len(), address); - } - return Ok((data, true)) // we return all response data on timeout - } - read_data = &mut available_reader => { - match read_data { - Err(err) => { - return Err(err); - } - Ok(bytes) => { - if bytes.len() == 0 { - println!("Connection is closed! Returning {} bytes received from {}", data.len(), address); - // we return all we managed to read because - // we know no more stuff is coming - return Ok((data, false)) - } - let now = tokio::time::Instant::now(); - let next = now + timeout; - delay.reset(next); - println!("Received {} bytes from {}. Waiting for more...", bytes.len(), address); - - // temporarily this is fine... (this loop will go away anyway) - data.extend_from_slice(&bytes) - } - } - } - } - } -} diff --git a/common/socks5/requests/src/request.rs b/common/socks5/requests/src/request.rs index c4fe1979cc5..74b93139a37 100644 --- a/common/socks5/requests/src/request.rs +++ b/common/socks5/requests/src/request.rs @@ -1,5 +1,4 @@ use nymsphinx_addressing::clients::{Recipient, RecipientFormattingError}; -use ordered_buffer::OrderedMessage; use std::convert::TryFrom; use std::fmt::{self}; @@ -28,7 +27,7 @@ impl fmt::Display for RequestError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result { match self { RequestError::AddressLengthTooShort => { - write!(f, "not enough bytes to recover the lenght of the address") + write!(f, "not enough bytes to recover the length of the address") } RequestError::AddressTooShort => write!(f, "not enough bytes to recover the address"), RequestError::ConnectionIdTooShort => { @@ -77,7 +76,6 @@ pub enum Request { Connect { conn_id: ConnectionId, remote_addr: RemoteAddress, - message: OrderedMessage, return_address: Recipient, }, @@ -90,13 +88,11 @@ impl Request { pub fn new_connect( conn_id: ConnectionId, remote_addr: RemoteAddress, - message: OrderedMessage, return_address: Recipient, ) -> Request { Request::Connect { conn_id, remote_addr, - message, return_address, } } @@ -153,7 +149,8 @@ impl Request { // just a temporary reference to mid-slice for ease of use let recipient_data_bytes = &connect_request_bytes[address_end..]; - if recipient_data_bytes.len() < Recipient::LEN { + + if recipient_data_bytes.len() != Recipient::LEN { return Err(RequestError::ReturnAddressTooShort); } @@ -162,14 +159,9 @@ impl Request { let return_address = Recipient::try_from_bytes(return_bytes) .map_err(|err| RequestError::MalformedReturnAddress(err))?; - let message = - OrderedMessage::try_from_bytes(recipient_data_bytes[Recipient::LEN..].to_vec()) - .unwrap(); - Ok(Request::Connect { conn_id: connection_id, remote_addr: remote_address, - message, return_address, }) } @@ -187,11 +179,10 @@ impl Request { /// service provider which will make the request. pub fn into_bytes(self) -> Vec { match self { - // connect is: CONN_FLAG || CONN_ID || REMOTE_LEN || REMOTE || RETURN || DATA + // connect is: CONN_FLAG || CONN_ID || REMOTE_LEN || REMOTE || RETURN Request::Connect { conn_id, remote_addr, - message, return_address, } => { let remote_address_bytes = remote_addr.into_bytes(); @@ -202,7 +193,6 @@ impl Request { .chain(remote_address_bytes_len.to_be_bytes().iter().cloned()) .chain(remote_address_bytes.into_iter()) .chain(return_address.to_bytes().iter().cloned()) - .chain(message.into_bytes()) .collect() } Request::Send(conn_id, data, local_closed) => std::iter::once(RequestFlag::Send as u8) @@ -382,7 +372,6 @@ mod request_deserialization_tests { let request_bytes: Vec<_> = request_bytes .into_iter() .chain(recipient_bytes.iter().cloned()) - .chain(vec![0, 0, 0, 0, 0, 0, 0, 1]) // message index 1 .collect(); let request = Request::try_from_bytes(&request_bytes).unwrap(); @@ -390,7 +379,6 @@ mod request_deserialization_tests { Request::Connect { conn_id, remote_addr, - message, return_address, } => { assert_eq!("foo.com".to_string(), remote_addr); @@ -399,7 +387,6 @@ mod request_deserialization_tests { return_address.to_bytes().to_vec(), recipient.to_bytes().to_vec() ); - assert_eq!(Vec::::new(), message.data); } _ => unreachable!(), } @@ -436,8 +423,6 @@ mod request_deserialization_tests { let request_bytes: Vec<_> = request_bytes .into_iter() .chain(recipient_bytes.iter().cloned()) - .chain(vec![0, 0, 0, 0, 0, 0, 0, 1]) // ordered message sequence number 1 - .chain(vec![255, 255, 255].into_iter()) .collect(); let request = Request::try_from_bytes(&request_bytes).unwrap(); @@ -445,7 +430,6 @@ mod request_deserialization_tests { Request::Connect { conn_id, remote_addr, - message, return_address, } => { assert_eq!("foo.com".to_string(), remote_addr); @@ -454,8 +438,6 @@ mod request_deserialization_tests { return_address.to_bytes().to_vec(), recipient.to_bytes().to_vec() ); - assert_eq!(1, message.index); - assert_eq!(vec![255, 255, 255], message.data); } _ => unreachable!(), } diff --git a/service-providers/network-requester/src/connection.rs b/service-providers/network-requester/src/connection.rs index 36b7ba40e96..ef442f4157e 100644 --- a/service-providers/network-requester/src/connection.rs +++ b/service-providers/network-requester/src/connection.rs @@ -1,7 +1,5 @@ use futures::channel::mpsc; -use log::*; use nymsphinx::addressing::clients::Recipient; -use ordered_buffer::OrderedMessageSender; use proxy_helpers::connection_controller::ConnectionReceiver; use proxy_helpers::proxy_runner::ProxyRunner; use socks5_requests::{ConnectionId, RemoteAddress, Response}; @@ -23,18 +21,9 @@ impl Connection { pub(crate) async fn new( id: ConnectionId, address: RemoteAddress, - initial_data: &[u8], return_address: Recipient, ) -> io::Result { - let mut conn = TcpStream::connect(&address).await?; - - // write the initial data to the connection before continuing - info!( - "Sending initial {} bytes to {}", - initial_data.len(), - address - ); - conn.write_all(initial_data).await?; + let conn = TcpStream::connect(&address).await?; Ok(Connection { id, @@ -50,15 +39,16 @@ impl Connection { mix_sender: mpsc::UnboundedSender<(Response, Recipient)>, ) { let stream = self.conn.take().unwrap(); - let message_sender = OrderedMessageSender::new(); + let remote_source_address = "???".to_string(); // we don't know ip address of requester let connection_id = self.id; let recipient = self.return_address; let (stream, _) = ProxyRunner::new( stream, + self.address.clone(), + remote_source_address, mix_receiver, mix_sender, connection_id, - message_sender, ) .run(move |conn_id, read_data, socket_closed| { (Response::new(conn_id, read_data, socket_closed), recipient) diff --git a/service-providers/network-requester/src/core.rs b/service-providers/network-requester/src/core.rs index 9239764b18c..5484f8dc295 100644 --- a/service-providers/network-requester/src/core.rs +++ b/service-providers/network-requester/src/core.rs @@ -2,20 +2,24 @@ use crate::allowed_hosts::{HostsStore, OutboundRequestFilter}; use crate::connection::Connection; use crate::websocket; use futures::channel::mpsc; -use futures::stream::SplitSink; +use futures::stream::{SplitSink, SplitStream}; use futures::{SinkExt, StreamExt}; use log::*; use nymsphinx::addressing::clients::Recipient; -use ordered_buffer::OrderedMessageBuffer; -use proxy_helpers::connection_controller::{Controller, ControllerCommand}; -use socks5_requests::{Request, Response}; +use nymsphinx::receiver::ReconstructedMessage; +use proxy_helpers::connection_controller::{Controller, ControllerCommand, ControllerSender}; +use socks5_requests::{ConnectionId, Request, Response}; use std::path::PathBuf; +use std::sync::atomic::{AtomicUsize, Ordering}; use tokio::net::TcpStream; use tokio_tungstenite::tungstenite::protocol::Message; use tokio_tungstenite::WebSocketStream; use websocket::WebsocketConnectionError; use websocket_requests::{requests::ClientRequest, responses::ServerResponse}; +// Since it's an atomic, it's safe to be kept static and shared across threads +static ACTIVE_PROXIES: AtomicUsize = AtomicUsize::new(0); + pub struct ServiceProvider { listening_address: String, outbound_request_filter: OutboundRequestFilter, @@ -61,6 +65,160 @@ impl ServiceProvider { } } + async fn read_websocket_message( + websocket_reader: &mut SplitStream>, + ) -> Option { + while let Some(msg) = websocket_reader.next().await { + let data = msg + .expect("we failed to read from the websocket!") + .into_data(); + + // try to recover the actual message from the mix network... + let deserialized_message = match ServerResponse::deserialize(&data) { + Ok(deserialized) => deserialized, + Err(err) => { + error!( + "Failed to deserialize received websocket message! - {}", + err + ); + continue; + } + }; + + let received = match deserialized_message { + ServerResponse::Received(received) => received, + ServerResponse::Error(err) => { + panic!("received error from native client! - {}", err) + } + _ => unimplemented!("probably should never be reached?"), + }; + return Some(received); + } + None + } + + async fn start_proxy( + conn_id: ConnectionId, + remote_addr: String, + return_address: Recipient, + controller_sender: ControllerSender, + mix_input_sender: mpsc::UnboundedSender<(Response, Recipient)>, + ) { + let mut conn = match Connection::new(conn_id, remote_addr.clone(), return_address).await { + Ok(conn) => conn, + Err(err) => { + error!( + "error while connecting to {:?} ! - {:?}", + remote_addr.clone(), + err + ); + return; + } + }; + + // Connect implies it's a fresh connection - register it with our controller + let (mix_sender, mix_receiver) = mpsc::unbounded(); + controller_sender + .unbounded_send(ControllerCommand::Insert(conn_id, mix_sender)) + .unwrap(); + + let old_count = ACTIVE_PROXIES.fetch_add(1, Ordering::SeqCst); + info!( + "Starting proxy for {} (currently there are {} proxies being handled)", + remote_addr, + old_count + 1 + ); + + // run the proxy on the connection + conn.run_proxy(mix_receiver, mix_input_sender).await; + + // proxy is done - remove the access channel from the controller + controller_sender + .unbounded_send(ControllerCommand::Remove(conn_id)) + .unwrap(); + + let old_count = ACTIVE_PROXIES.fetch_sub(1, Ordering::SeqCst); + info!( + "Proxy for {} is finished (currently there are {} proxies being handled)", + remote_addr, + old_count - 1 + ); + } + + fn handle_proxy_connect( + &mut self, + controller_sender: &mut ControllerSender, + mix_input_sender: &mpsc::UnboundedSender<(Response, Recipient)>, + conn_id: ConnectionId, + remote_addr: String, + return_address: Recipient, + ) { + if !self.open_proxy && !self.outbound_request_filter.check(&remote_addr) { + log::info!("Domain {:?} failed filter check", remote_addr); + return; + } + + let controller_sender_clone = controller_sender.clone(); + let mix_input_sender_clone = mix_input_sender.clone(); + + // and start the proxy for this connection + tokio::spawn(async move { + Self::start_proxy( + conn_id, + remote_addr, + return_address, + controller_sender_clone, + mix_input_sender_clone, + ) + .await + }); + } + + fn handle_proxy_send( + &self, + controller_sender: &mut ControllerSender, + conn_id: ConnectionId, + data: Vec, + closed: bool, + ) { + controller_sender + .unbounded_send(ControllerCommand::Send(conn_id, data, closed)) + .unwrap() + } + + fn handle_proxy_request( + &mut self, + raw_request: &[u8], + controller_sender: &mut ControllerSender, + mix_input_sender: &mpsc::UnboundedSender<(Response, Recipient)>, + ) { + // try to treat each received mix message as a service provider request + let deserialized_request = match Request::try_from_bytes(&raw_request) { + Ok(request) => request, + Err(err) => { + error!("Failed to deserialized received request! - {}", err); + return; + } + }; + + match deserialized_request { + Request::Connect { + conn_id, + remote_addr, + return_address, + } => self.handle_proxy_connect( + controller_sender, + mix_input_sender, + conn_id, + remote_addr, + return_address, + ), + Request::Send(conn_id, data, closed) => { + self.handle_proxy_send(controller_sender, conn_id, data, closed) + } + } + } + /// Start all subsystems pub async fn run(&mut self) { let websocket_stream = self.connect_websocket(&self.listening_address).await; @@ -73,7 +231,7 @@ impl ServiceProvider { let (mix_input_sender, mix_input_receiver) = mpsc::unbounded::<(Response, Recipient)>(); // controller for managing all active connections - let (mut active_connections_controller, controller_sender) = Controller::new(); + let (mut active_connections_controller, mut controller_sender) = Controller::new(); tokio::spawn(async move { active_connections_controller.run().await; }); @@ -86,108 +244,19 @@ impl ServiceProvider { println!("\nAll systems go. Press CTRL-C to stop the server."); // for each incoming message from the websocket... (which in 99.99% cases is going to be a mix message) - while let Some(msg) = websocket_reader.next().await { - let data = msg - .expect("we failed to read from the websocket!") - .into_data(); - - // try to recover the actual message from the mix network... - let deserialized_message = match ServerResponse::deserialize(&data) { - Ok(deserialized) => deserialized, - Err(err) => { - error!( - "Failed to deserialize received websocket message! - {}", - err - ); - continue; - } - }; - - let received = match deserialized_message { - ServerResponse::Received(received) => received, - ServerResponse::Error(err) => { - panic!("received error from native client! - {}", err) + loop { + let received = match Self::read_websocket_message(&mut websocket_reader).await { + Some(msg) => msg, + None => { + error!("The websocket stream has finished!"); + return; } - _ => unimplemented!("probably should never be reached?"), }; let raw_message = received.message; // TODO: here be potential SURB (i.e. received.reply_SURB) - // try to treat each received mix message as a service provider request - let deserialized_request = match Request::try_from_bytes(&raw_message) { - Ok(request) => request, - Err(err) => { - error!("Failed to deserialized received request! - {}", err); - continue; - } - }; - - match deserialized_request { - Request::Connect { - conn_id, - remote_addr, - message, - return_address, - } => { - if !self.open_proxy && !self.outbound_request_filter.check(&remote_addr) { - log::info!("Domain {:?} failed filter check", remote_addr); - continue; - } - - let controller_sender_clone = controller_sender.clone(); - let mut ordered_buffer = OrderedMessageBuffer::new(); - ordered_buffer.write(message); - let init_data = ordered_buffer - .read() - .expect("we received connect request but it wasn't sequence 0!"); - - // and start the proxy for this connection - let mix_input_sender_clone = mix_input_sender.clone(); - tokio::spawn(async move { - let mut conn = match Connection::new( - conn_id, - remote_addr.clone(), - &init_data, - return_address, - ) - .await - { - Ok(conn) => conn, - Err(err) => { - error!( - "error while connecting to {:?} ! - {:?}", - remote_addr.clone(), - err - ); - return; - } - }; - - // Connect implies it's a fresh connection - register it with our controller - let (mix_sender, mix_receiver) = mpsc::unbounded(); - controller_sender_clone - .unbounded_send(ControllerCommand::Insert( - conn_id, - mix_sender, - ordered_buffer, - )) - .unwrap(); - - info!("Starting proxy for {}", remote_addr.clone()); - conn.run_proxy(mix_receiver, mix_input_sender_clone).await; - // proxy is done - remove the access channel from the controller - controller_sender_clone - .unbounded_send(ControllerCommand::Remove(conn_id)) - .unwrap(); - info!("Proxy for {} is finished", remote_addr); - }); - } - // on send just tell the controller to send that data to the correct connection - Request::Send(conn_id, data, closed) => controller_sender - .unbounded_send(ControllerCommand::Send(conn_id, data, closed)) - .unwrap(), - } + self.handle_proxy_request(&raw_message, &mut controller_sender, &mix_input_sender) } }