Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/client refactoring #128

Merged
merged 12 commits into from
Mar 6, 2020
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ before_script:
- rustup component add rustfmt
script:
- cargo build
- cargo test
- cargo test -- --test-threads=1
- cargo fmt -- --check
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions common/clients/mix-client/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ impl From<AddressTypeError> for SphinxPacketEncapsulationError {
}
}

#[deprecated(note = "please use loop_cover_message_route instead")]
pub fn loop_cover_message<T: NymTopology>(
our_address: DestinationAddressBytes,
surb_id: SURBIdentifier,
Expand All @@ -47,6 +48,7 @@ pub fn loop_cover_message<T: NymTopology>(
) -> Result<(SocketAddr, SphinxPacket), SphinxPacketEncapsulationError> {
let destination = Destination::new(our_address, surb_id);

#[allow(deprecated)]
encapsulate_message(
destination,
LOOP_COVER_MESSAGE_PAYLOAD.to_vec(),
Expand All @@ -55,6 +57,23 @@ pub fn loop_cover_message<T: NymTopology>(
)
}

pub fn loop_cover_message_route(
our_address: DestinationAddressBytes,
surb_id: SURBIdentifier,
route: Vec<sphinx::route::Node>,
average_delay: time::Duration,
) -> Result<(SocketAddr, SphinxPacket), SphinxPacketEncapsulationError> {
let destination = Destination::new(our_address, surb_id);

encapsulate_message_route(
destination,
LOOP_COVER_MESSAGE_PAYLOAD.to_vec(),
route,
average_delay,
)
}

#[deprecated(note = "please use encapsulate_message_route instead")]
pub fn encapsulate_message<T: NymTopology>(
recipient: Destination,
message: Vec<u8>,
Expand All @@ -81,3 +100,20 @@ pub fn encapsulate_message<T: NymTopology>(

Ok((first_node_address, packet))
}

pub fn encapsulate_message_route(
recipient: Destination,
message: Vec<u8>,
route: Vec<sphinx::route::Node>,
average_delay: time::Duration,
) -> Result<(SocketAddr, SphinxPacket), SphinxPacketEncapsulationError> {
let delays = sphinx::header::delays::generate_from_average_duration(route.len(), average_delay);

// build the packet
let packet = sphinx::SphinxPacket::new(message, &route[..], &recipient, &delays)?;

let first_node_address =
addressing::socket_address_from_encoded_bytes(route.first().unwrap().address.to_bytes())?;

Ok((first_node_address, packet))
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl<'a> ConnectionManager<'a> {
return match conn_writer.write_all(msg).await {
// if we failed to write to connection we should reconnect
// TODO: is this true? can we fail to write to a connection while it still remains open and valid?
Ok(res) => Ok(res),
Ok(_) => Ok(()),
Err(e) => {
trace!("Creating connection reconnector!");
self.state = ConnectionState::Reconnecting(ConnectionReconnector::new(
Expand Down
4 changes: 2 additions & 2 deletions common/crypto/src/identity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ impl MixIdentityPrivateKey {

// TODO: this will be implemented differently by using the proper trait
impl MixIdentityPrivateKey {
pub fn as_scalar(self) -> Scalar {
let encryption_key = self.0;
pub fn as_scalar(&self) -> Scalar {
let encryption_key = &self.0;
encryption_key.0
}
}
Expand Down
4 changes: 3 additions & 1 deletion common/topology/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ mod filter;
pub mod mix;
pub mod provider;

pub trait NymTopology: Sized + std::fmt::Debug + Send + Sync {
// TODO: Figure out why 'Clone' was required to have 'TopologyAccessor<T>' working
// even though it only contains an Arc
pub trait NymTopology: Sized + std::fmt::Debug + Send + Sync + Clone {
fn new(directory_server: String) -> Self;
fn new_from_nodes(
mix_nodes: Vec<mix::Node>,
Expand Down
2 changes: 1 addition & 1 deletion mixnode/src/node/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn process_socket_connection(

// we must be able to handle multiple packets from same connection independently
tokio::spawn(process_received_packet(
buf.clone(),
buf,
// note: processing_data is relatively cheap (and safe) to clone -
// it contains arc to private key and metrics reporter (which is just
// a single mpsc unbounded sender)
Expand Down
1 change: 1 addition & 0 deletions nym-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ crypto = {path = "../common/crypto"}
directory-client = { path = "../common/clients/directory-client" }
healthcheck = { path = "../common/healthcheck" }
mix-client = { path = "../common/clients/mix-client" }
multi-tcp-client = { path = "../common/clients/multi-tcp-client" }
pemstore = {path = "../common/pemstore"}
provider-client = { path = "../common/clients/provider-client" }
sfw-provider-requests = { path = "../sfw-provider/sfw-provider-requests" }
Expand Down
120 changes: 93 additions & 27 deletions nym-client/src/client/cover_traffic_stream.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,120 @@
use crate::client::mix_traffic::MixMessage;
use crate::client::topology_control::TopologyInnerRef;
use futures::channel::mpsc;
use log::{error, info, trace, warn};
use crate::client::mix_traffic::{MixMessage, MixMessageSender};
use crate::client::topology_control::TopologyAccessor;
use futures::task::{Context, Poll};
use futures::{Future, Stream, StreamExt};
use log::*;
use sphinx::route::Destination;
use std::time;
use std::pin::Pin;
use std::time::Duration;
use tokio::runtime::Handle;
use tokio::task::JoinHandle;
use tokio::time;
use topology::NymTopology;

pub(crate) async fn start_loop_cover_traffic_stream<T: NymTopology>(
tx: mpsc::UnboundedSender<MixMessage>,
pub(crate) struct LoopCoverTrafficStream<T: NymTopology> {
average_packet_delay: Duration,
average_cover_message_sending_delay: Duration,
next_delay: time::Delay,
mix_tx: MixMessageSender,
our_info: Destination,
topology_ctrl_ref: TopologyInnerRef<T>,
average_cover_message_delay_duration: time::Duration,
average_packet_delay_duration: time::Duration,
) {
info!("Starting loop cover traffic stream");
loop {
trace!("next cover message!");
let delay_duration = mix_client::poisson::sample(average_cover_message_delay_duration);
tokio::time::delay_for(delay_duration).await;
topology_access: TopologyAccessor<T>,
}

impl<T: NymTopology> Stream for LoopCoverTrafficStream<T> {
// Item is only used to indicate we should create a new message rather than actual cover message
// reason being to not introduce unnecessary complexity by having to keep state of topology
// mutex when trying to acquire it. So right now the Stream trait serves as a glorified timer.
// Perhaps this should be changed in the future.
type Item = ();

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// it is not yet time to return a message
if Pin::new(&mut self.next_delay).poll(cx).is_pending() {
return Poll::Pending;
};

// we know it's time to send a message, so let's prepare delay for the next one
// Get the `now` by looking at the current `delay` deadline
let now = self.next_delay.deadline();
let next_poisson_delay =
mix_client::poisson::sample(self.average_cover_message_sending_delay);

// The next interval value is `next_poisson_delay` after the one that just
// yielded.
let next = now + next_poisson_delay;
self.next_delay.reset(next);

Poll::Ready(Some(()))
}
}

impl<T: 'static + NymTopology> LoopCoverTrafficStream<T> {
pub(crate) fn new(
mix_tx: MixMessageSender,
our_info: Destination,
topology_access: TopologyAccessor<T>,
average_cover_message_sending_delay: time::Duration,
average_packet_delay: time::Duration,
) -> Self {
LoopCoverTrafficStream {
average_packet_delay,
average_cover_message_sending_delay,
next_delay: time::delay_for(Default::default()),
mix_tx,
our_info,
topology_access,
}
}

let read_lock = topology_ctrl_ref.read().await;
let topology = match read_lock.topology.as_ref() {
async fn on_new_message(&mut self) {
trace!("next cover message!");
let route = match self.topology_access.random_route().await {
None => {
warn!("No valid topology detected - won't send any loop cover message this time");
continue;
return;
}
Some(topology) => topology,
Some(route) => route,
};

let cover_message = match mix_client::packet::loop_cover_message(
our_info.address.clone(),
our_info.identifier,
topology,
average_packet_delay_duration,
let cover_message = match mix_client::packet::loop_cover_message_route(
self.our_info.address.clone(),
self.our_info.identifier,
route,
self.average_packet_delay,
) {
Ok(message) => message,
Err(err) => {
error!(
"Somehow we managed to create an invalid cover message - {:?}",
err
);
continue;
return;
}
};

// if this one fails, there's no retrying because it means that either:
// - we run out of memory
// - the receiver channel is closed
// in either case there's no recovery and we can only panic
tx.unbounded_send(MixMessage::new(cover_message.0, cover_message.1))
self.mix_tx
.unbounded_send(MixMessage::new(cover_message.0, cover_message.1))
.unwrap();
}

async fn run(&mut self) {
// we should set initial delay only when we actually start the stream
self.next_delay = time::delay_for(mix_client::poisson::sample(
self.average_cover_message_sending_delay,
));

while let Some(_) = self.next().await {
self.on_new_message().await;
}
}

pub(crate) fn start(mut self, handle: &Handle) -> JoinHandle<()> {
handle.spawn(async move {
self.run().await;
})
}
}
78 changes: 57 additions & 21 deletions nym-client/src/client/mix_traffic.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,73 @@
use futures::channel::mpsc;
use futures::StreamExt;
use log::{debug, error, info, trace};
use log::*;
use sphinx::SphinxPacket;
use std::net::SocketAddr;
use std::time::Duration;
use tokio::runtime::Handle;
use tokio::task::JoinHandle;

pub(crate) struct MixMessage(SocketAddr, SphinxPacket);
pub(crate) type MixMessageSender = mpsc::UnboundedSender<MixMessage>;
pub(crate) type MixMessageReceiver = mpsc::UnboundedReceiver<MixMessage>;

impl MixMessage {
pub(crate) fn new(address: SocketAddr, packet: SphinxPacket) -> Self {
MixMessage(address, packet)
}
}

pub(crate) struct MixTrafficController;

impl MixTrafficController {
pub(crate) async fn run(mut rx: mpsc::UnboundedReceiver<MixMessage>) {
info!("Mix Traffic Controller started!");
let mix_client = mix_client::MixClient::new();
while let Some(mix_message) = rx.next().await {
debug!("Got a mix_message for {:?}", mix_message.0);
let send_res = mix_client.send(mix_message.1, mix_message.0).await;
match send_res {
Ok(_) => {
trace!("sent a mix message");
}
// TODO: should there be some kind of threshold of failed messages
// that if reached, the application blows?
Err(e) => error!(
"We failed to send the message to {} :( - {:?}",
mix_message.0, e
),
};
// TODO: put our TCP client here
pub(crate) struct MixTrafficController<'a> {
tcp_client: multi_tcp_client::Client<'a>,
mix_rx: MixMessageReceiver,
}

impl MixTrafficController<'static> {
pub(crate) async fn new(
initial_endpoints: Vec<SocketAddr>,
initial_reconnection_backoff: Duration,
maximum_reconnection_backoff: Duration,
mix_rx: MixMessageReceiver,
) -> Self {
let tcp_client_config = multi_tcp_client::Config::new(
initial_endpoints,
initial_reconnection_backoff,
maximum_reconnection_backoff,
);

MixTrafficController {
tcp_client: multi_tcp_client::Client::new(tcp_client_config).await,
mix_rx,
}
}

async fn on_message(&mut self, mix_message: MixMessage) {
debug!("Got a mix_message for {:?}", mix_message.0);
match self
.tcp_client
.send(mix_message.0, &mix_message.1.to_bytes())
.await
{
Ok(_) => trace!("sent a mix message"),
// TODO: should there be some kind of threshold of failed messages
// that if reached, the application blows?
Err(e) => error!(
"We failed to send the packet to {} - {:?}",
mix_message.0, e
),
};
}

pub(crate) async fn run(&mut self) {
while let Some(mix_message) = self.mix_rx.next().await {
self.on_message(mix_message).await;
}
}

pub(crate) fn start(mut self, handle: &Handle) -> JoinHandle<()> {
handle.spawn(async move {
self.run().await;
})
}
}
Loading