Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/socks improvements #423

Merged
merged 18 commits into from
Nov 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 31 additions & 47 deletions clients/socks5/src/socks/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@ use futures::channel::mpsc;
use futures::task::{Context, Poll};
use log::*;
use nymsphinx::addressing::clients::Recipient;
use ordered_buffer::{OrderedMessageBuffer, OrderedMessageSender};
use pin_project::pin_project;
use proxy_helpers::available_reader::AvailableReader;
use proxy_helpers::connection_controller::{
ConnectionReceiver, ControllerCommand, ControllerSender,
};
use proxy_helpers::proxy_runner::ProxyRunner;
use rand::RngCore;
use socks5_requests::{ConnectionId, Request};
use socks5_requests::{ConnectionId, RemoteAddress, Request};
use std::net::{Shutdown, SocketAddr};
use std::pin::Pin;
use tokio::prelude::*;
Expand Down Expand Up @@ -145,8 +143,7 @@ pub(crate) struct SocksClient {
impl Drop for SocksClient {
fn drop(&mut self) {
// TODO: decrease to debug/trace
info!("socksclient is going out of scope - the stream is getting dropped!");
info!("Connection {} is getting closed", self.connection_id);
debug!("Connection {} is getting closed", self.connection_id);
self.controller_sender
.unbounded_send(ControllerCommand::Remove(self.connection_id))
.unwrap();
Expand Down Expand Up @@ -222,26 +219,38 @@ impl SocksClient {
Ok(())
}

async fn send_request_to_mixnet(&mut self, request: Request) {
self.send_to_mixnet(request.into_bytes()).await;
async fn send_connect_to_mixnet(&mut self, remote_address: RemoteAddress) {
let req = Request::new_connect(
self.connection_id,
remote_address.clone(),
self.self_address.clone(),
);

let input_message =
InputMessage::new_fresh(self.service_provider.clone(), req.into_bytes(), false);
self.input_sender.unbounded_send(input_message).unwrap();
}

async fn run_proxy(
&mut self,
conn_receiver: ConnectionReceiver,
message_sender: OrderedMessageSender,
) {
async fn run_proxy(&mut self, conn_receiver: ConnectionReceiver, remote_proxy_target: String) {
self.send_connect_to_mixnet(remote_proxy_target.clone())
.await;

let stream = self.stream.run_proxy();
let local_stream_remote = stream
.peer_addr()
.expect("failed to extract peer address")
.to_string();
let connection_id = self.connection_id;
let input_sender = self.input_sender.clone();

let recipient = self.service_provider.clone();
let (stream, _) = ProxyRunner::new(
stream,
local_stream_remote,
remote_proxy_target,
conn_receiver,
input_sender,
connection_id,
message_sender,
)
.run(move |conn_id, read_data, socket_closed| {
let provider_request = Request::new_send(conn_id, read_data, socket_closed);
Expand All @@ -262,14 +271,9 @@ impl SocksClient {

// setup for receiving from the mixnet
let (mix_sender, mix_receiver) = mpsc::unbounded();
let ordered_buffer = OrderedMessageBuffer::new();

self.controller_sender
.unbounded_send(ControllerCommand::Insert(
self.connection_id,
mix_sender,
ordered_buffer,
))
.unbounded_send(ControllerCommand::Insert(self.connection_id, mix_sender))
.unwrap();

match request.command {
Expand All @@ -278,26 +282,16 @@ impl SocksClient {
trace!("Connecting to: {:?}", remote_address.clone());
self.acknowledge_socks5().await;

let mut message_sender = OrderedMessageSender::new();
// 'connect' needs to be handled manually due to different structure,
// but still needs to have correct sequence number on it!

// read whatever we can
let available_reader = AvailableReader::new(&mut self.stream);
let (request_data_bytes, _) = available_reader.await?;
let ordered_message = message_sender.wrap_message(request_data_bytes.to_vec());

let socks_provider_request = Request::new_connect(
self.connection_id,
info!(
"Starting proxy for {} (id: {})",
remote_address.clone(),
ordered_message,
self.self_address.clone(),
self.connection_id
);
self.run_proxy(mix_receiver, remote_address.clone()).await;
info!(
"Proxy for {} is finished (id: {})",
remote_address, self.connection_id
);

self.send_request_to_mixnet(socks_provider_request).await;
info!("Starting proxy for {}", remote_address.clone());
self.run_proxy(mix_receiver, message_sender).await;
info!("Proxy for {} is finished", remote_address);
}

SocksCommand::Bind => unimplemented!(), // not handled
Expand All @@ -307,16 +301,6 @@ impl SocksClient {
Ok(())
}

/// Send serialized Socks5 request bytes to the mixnet. The request stream
/// will be chunked up into a series of one or more Sphinx packets and
/// reassembled at the destination service provider at the other end, then
/// sent onwards anonymously.
async fn send_to_mixnet(&self, request_bytes: Vec<u8>) {
let input_message =
InputMessage::new_fresh(self.service_provider.clone(), request_bytes, false);
self.input_sender.unbounded_send(input_message).unwrap();
}

/// Writes a Socks5 header back to the requesting client's TCP stream,
/// basically saying "I acknowledge your request and am dealing with it".
async fn acknowledge_socks5(&mut self) {
Expand Down
9 changes: 6 additions & 3 deletions common/nymsphinx/params/src/packet_sizes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@ pub struct InvalidPacketSize;
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PacketSize {
RegularPacket = 1,
// for example instant messaging use case
ACKPacket = 2,
RegularPacket = 1,

// for sending SURB-ACKs
ExtendedPacket = 3, // for example for streaming fast and furious in uncompressed 10bit 4K HDR quality
ACKPacket = 2,

// for example for streaming fast and furious in uncompressed 10bit 4K HDR quality
ExtendedPacket = 3,
}

impl TryFrom<u8> for PacketSize {
Expand Down
111 changes: 84 additions & 27 deletions common/socks5/proxy-helpers/src/available_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@ use std::ops::DerefMut;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncRead;
use tokio::stream::Stream;
use tokio::time::{delay_for, Delay, Duration, Instant};

const MAX_READ_AMOUNT: usize = 500 * 1000; // 0.5MB
const GRACE_DURATION: Duration = Duration::from_millis(1);

pub struct AvailableReader<'a, R: AsyncRead + Unpin> {
// TODO: come up with a way to avoid using RefCell (not sure if possible though)
// TODO: come up with a way to avoid using RefCell (not sure if possible though due to having to
// mutably borrow both inner reader and buffer at the same time)
buf: RefCell<BytesMut>,
inner: RefCell<&'a mut R>,
// idea for the future: tiny delay that allows to prevent unnecessary extra fragmentation
// grace_period: Option<Delay>,
grace_period: Option<Delay>,
}

impl<'a, R> AvailableReader<'a, R>
Expand All @@ -39,20 +44,15 @@ where
AvailableReader {
buf: RefCell::new(BytesMut::with_capacity(Self::BUF_INCREMENT)),
inner: RefCell::new(reader),
// grace_period: None,
grace_period: Some(delay_for(GRACE_DURATION)),
}
}
}

// TODO: change this guy to a stream? Seems waaay more appropriate considering
// we're getting new Bytes items regularly rather than calling it once.

impl<'a, R: AsyncRead + Unpin> Future for AvailableReader<'a, R> {
type Output = io::Result<(Bytes, bool)>;
impl<'a, R: AsyncRead + Unpin> Stream for AvailableReader<'a, R> {
type Item = io::Result<Bytes>;

// this SHOULD stay mutable, because we rely on runtime checks inside the method
#[allow(unused_mut)]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// if we have no space in buffer left - expand it
if !self.buf.borrow().has_remaining_mut() {
self.buf.borrow_mut().reserve(Self::BUF_INCREMENT);
Expand All @@ -68,19 +68,43 @@ impl<'a, R: AsyncRead + Unpin> Future for AvailableReader<'a, R> {
if self.buf.borrow().is_empty() {
Poll::Pending
} else {
// if exists - check grace period
if let Some(grace_period) = self.grace_period.as_mut() {
if Pin::new(grace_period).poll(cx).is_pending() {
return Poll::Pending;
}
}

let buf = self.buf.replace(BytesMut::new());
Poll::Ready(Ok((buf.freeze(), false)))
Poll::Ready(Some(Ok(buf.freeze())))
}
}
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
Poll::Ready(Ok(n)) => {
// if exists - reset grace period
if let Some(grace_period) = self.grace_period.as_mut() {
let now = Instant::now();
grace_period.reset(now + GRACE_DURATION);
}

// if we read a non-0 amount, we're not done yet!
if n == 0 {
let buf = self.buf.replace(BytesMut::new());
Poll::Ready(Ok((buf.freeze(), true)))
if buf.len() > 0 {
Poll::Ready(Some(Ok(buf.freeze())))
} else {
Poll::Ready(None)
}
} else {
// tell the waker we should be polled again!
cx.waker().wake_by_ref();

// if we reached our maximum amount - return it
let read_bytes_len = self.buf.borrow().len();
if read_bytes_len >= MAX_READ_AMOUNT {
let buf = self.buf.replace(BytesMut::new());
return Poll::Ready(Some(Ok(buf.freeze())));
}
Poll::Pending
}
}
Expand All @@ -91,31 +115,34 @@ impl<'a, R: AsyncRead + Unpin> Future for AvailableReader<'a, R> {
#[cfg(test)]
mod tests {
use super::*;
use futures::poll;
use std::io::Cursor;
use std::time::Duration;
use tokio::stream::StreamExt;
use tokio_test::assert_pending;

#[tokio::test]
async fn available_reader_reads_all_available_data_smaller_than_its_buf() {
let data = vec![42u8; 100];
let mut reader = Cursor::new(data.clone());

let available_reader = AvailableReader::new(&mut reader);
let (read_data, is_finished) = available_reader.await.unwrap();
let mut available_reader = AvailableReader::new(&mut reader);
let read_data = available_reader.next().await.unwrap().unwrap();

assert_eq!(read_data, data);
assert!(is_finished)
assert!(available_reader.next().await.is_none())
}

#[tokio::test]
async fn available_reader_reads_all_available_data_bigger_than_its_buf() {
let data = vec![42u8; AvailableReader::<Cursor<Vec<u8>>>::BUF_INCREMENT + 100];
let mut reader = Cursor::new(data.clone());

let available_reader = AvailableReader::new(&mut reader);
let (read_data, is_finished) = available_reader.await.unwrap();
let mut available_reader = AvailableReader::new(&mut reader);
let read_data = available_reader.next().await.unwrap().unwrap();

assert_eq!(read_data, data);
assert!(is_finished)
assert!(available_reader.next().await.is_none())
}

#[tokio::test]
Expand All @@ -129,11 +156,11 @@ mod tests {
.read(&second_data_chunk)
.build();

let available_reader = AvailableReader::new(&mut reader_mock);
let (read_data, is_finished) = available_reader.await.unwrap();
let mut available_reader = AvailableReader::new(&mut reader_mock);
let read_data = available_reader.next().await.unwrap().unwrap();

assert_eq!(read_data, first_data_chunk);
assert!(!is_finished)
assert_pending!(poll!(available_reader.next()));
}

#[tokio::test]
Expand All @@ -145,10 +172,40 @@ mod tests {
.read(&data)
.build();

let available_reader = AvailableReader::new(&mut reader_mock);
let (read_data, is_finished) = available_reader.await.unwrap();
let mut available_reader = AvailableReader::new(&mut reader_mock);
let read_data = available_reader.next().await.unwrap().unwrap();

assert_eq!(read_data, data);
assert!(is_finished)
assert!(available_reader.next().await.is_none())
}

// perhaps the issue of tokio io builder will be resolved in tokio 0.3?
// #[tokio::test]
// async fn available_reader_will_wait_for_more_data_if_its_within_grace_period() {
// let first_data_chunk = vec![42u8; 100];
// let second_data_chunk = vec![123u8; 100];
//
// let combined_chunks: Vec<_> = first_data_chunk
// .iter()
// .cloned()
// .chain(second_data_chunk.iter().cloned())
// .collect();
//
// let mut reader_mock = tokio_test::io::Builder::new()
// .read(&first_data_chunk)
// .wait(Duration::from_millis(2))
// .read(&second_data_chunk)
// .build();
//
// let mut available_reader = AvailableReader {
// buf: RefCell::new(BytesMut::with_capacity(4096)),
// inner: RefCell::new(&mut reader_mock),
// grace_period: Some(delay_for(Duration::from_millis(5))),
// };
//
// let read_data = available_reader.next().await.unwrap().unwrap();
//
// assert_eq!(read_data, combined_chunks);
// assert!(available_reader.next().await.is_none())
// }
}
Loading