Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/client refactor #91

Merged
merged 27 commits into from
Jan 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
9f69cc9
Moved MixTrafficController to separate module
jstuczyn Jan 21, 2020
abd88c0
Fixed accidentally broken pathfinder
jstuczyn Jan 21, 2020
6625f6f
Slightly decreases the aggressive logging in mix traffic controller
jstuczyn Jan 21, 2020
e9e5c83
Moved ReceivedMessagesBuffer to seperate module
jstuczyn Jan 21, 2020
af6a8f5
Better logging for ibid.
jstuczyn Jan 21, 2020
e4c7d21
Better handling of when messages failed to be sent to the requester
jstuczyn Jan 21, 2020
ea38387
ReceivedBuffer is now responsible for starting both controllers
jstuczyn Jan 21, 2020
2af18a8
Added "is_registered" method to provider client
jstuczyn Jan 21, 2020
7889ab4
Removed redundant logging statement
jstuczyn Jan 21, 2020
06b96bd
Moved provider-polling related functionalities to new module
jstuczyn Jan 21, 2020
f1debb9
Obtaining initial compatible topology in separate method
jstuczyn Jan 21, 2020
a57708d
Separate module for cover traffic stream
jstuczyn Jan 21, 2020
fe48c11
Moved real traffic controller to separate module
jstuczyn Jan 21, 2020
ee812a8
Moved definition of extended duration to before the loop
jstuczyn Jan 21, 2020
641dc78
Removed too aggressive logging
jstuczyn Jan 21, 2020
c921774
Implemented OutQueueControl as a proper Stream to get rid of the sele…
jstuczyn Jan 21, 2020
73438b6
Change in how OutQueueControl is spawned - not entirely sure at this …
jstuczyn Jan 21, 2020
2e817a9
Comments explaining purposes of different futures
jstuczyn Jan 21, 2020
551d0eb
More comments regarding uses of specific channels
jstuczyn Jan 21, 2020
ac988c6
Removed 'toml' from client-persistence as it will be module-specific
jstuczyn Jan 21, 2020
55e473d
Moved Pemstore as separate crate + defined pathfinder as trait
jstuczyn Jan 21, 2020
da575f3
ibid
jstuczyn Jan 21, 2020
98610e2
Removed unused utils/bytes module
jstuczyn Jan 21, 2020
b4c15ea
Sending real traffic at poisson rate rather than at uniform rate
jstuczyn Jan 21, 2020
2364fbd
Moved sphinx packet encapsulation and poisson delay sampling to mix c…
jstuczyn Jan 22, 2020
7045db7
ibid.
jstuczyn Jan 22, 2020
974ac5e
compiler warnings
jstuczyn Jan 22, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ members = [
"common/addressing",
"common/crypto",
"common/healthcheck",
"common/pemstore",
"common/topology",
"mixnode",
"nym-client",
Expand Down
7 changes: 7 additions & 0 deletions common/clients/mix-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@ edition = "2018"

[dependencies]
log = "0.4.8"
rand = "0.7.2"
rand_distr = "0.2.2"
tokio = { version = "0.2", features = ["full"] }

## internal
addressing = {path = "../../addressing"}
topology = {path = "../../topology"}

## will be moved to proper dependencies once released
sphinx = { git = "https://github.com/nymtech/sphinx", rev="1d8cefcb6a0cb8e87d00d89eb1ccf2839e92aa1f" }

6 changes: 4 additions & 2 deletions common/clients/mix-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ use sphinx::SphinxPacket;
use std::net::SocketAddr;
use tokio::prelude::*;

pub mod packet;
pub mod poisson;

pub struct MixClient {}

impl MixClient {
Expand All @@ -17,8 +20,7 @@ impl MixClient {
mix_addr: SocketAddr,
) -> Result<(), Box<dyn std::error::Error>> {
let bytes = packet.to_bytes();

info!("socket addr: {:?}", mix_addr);
debug!("Sending to the following address: {:?}", mix_addr);

let mut stream = tokio::net::TcpStream::connect(mix_addr).await?;
stream.write_all(&bytes[..]).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::net::SocketAddr;
use topology::NymTopology;

pub const LOOP_COVER_MESSAGE_PAYLOAD: &[u8] = b"The cake is a lie!";
pub const LOOP_COVER_MESSAGE_AVERAGE_DELAY: f64 = 2.0;

pub fn loop_cover_message<T: NymTopology>(
our_address: DestinationAddressBytes,
Expand All @@ -13,21 +14,25 @@ pub fn loop_cover_message<T: NymTopology>(
) -> (SocketAddr, SphinxPacket) {
let destination = Destination::new(our_address, surb_id);

encapsulate_message(destination, LOOP_COVER_MESSAGE_PAYLOAD.to_vec(), topology)
encapsulate_message(
destination,
LOOP_COVER_MESSAGE_PAYLOAD.to_vec(),
topology,
LOOP_COVER_MESSAGE_AVERAGE_DELAY,
)
}

pub fn encapsulate_message<T: NymTopology>(
recipient: Destination,
message: Vec<u8>,
topology: &T,
average_delay: f64,
) -> (SocketAddr, SphinxPacket) {
let mut providers = topology.get_mix_provider_nodes();
let provider = providers.pop().unwrap().into();

let route = topology.route_to(provider).unwrap();

// Set average packet delay to an arbitrary but at least not super-slow value for testing.
let average_delay = 0.1;
let delays = sphinx::header::delays::generate(route.len(), average_delay);

// build the packet
Expand Down
7 changes: 5 additions & 2 deletions common/clients/provider-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl ProviderClient {

pub async fn send_request(&self, bytes: Vec<u8>) -> Result<Vec<u8>, ProviderClientError> {
let mut socket = tokio::net::TcpStream::connect(self.provider_network_address).await?;
info!("keep alive: {:?}", socket.keepalive());

socket.set_keepalive(Some(Duration::from_secs(2))).unwrap();
socket.write_all(&bytes[..]).await?;
if let Err(_e) = socket.shutdown(Shutdown::Write) {
Expand All @@ -92,7 +92,6 @@ impl ProviderClient {
let bytes = pull_request.to_bytes();

let response = self.send_request(bytes).await?;
info!("Received the following response: {:?}", response);

let parsed_response = PullResponse::from_bytes(&response)?;
Ok(parsed_response.messages)
Expand All @@ -111,4 +110,8 @@ impl ProviderClient {

Ok(parsed_response.auth_token)
}

pub fn is_registered(&self) -> bool {
self.auth_token.is_some()
}
}
13 changes: 13 additions & 0 deletions common/pemstore/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "pemstore"
version = "0.1.0"
authors = ["Jedrzej Stuczynski <andrew@nymtech.net>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
pem = "0.7.0"

## internal
crypto = {path = "../crypto"}
File renamed without changes.
15 changes: 15 additions & 0 deletions common/pemstore/src/pathfinder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use std::path::PathBuf;

pub trait PathFinder {
fn config_dir(&self) -> PathBuf;
fn private_identity_key(&self) -> PathBuf;
fn public_identity_key(&self) -> PathBuf;

// Optional:
fn private_encryption_key(&self) -> Option<PathBuf> {
None
}
fn public_encryption_key(&self) -> Option<PathBuf> {
None
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,9 @@
use crate::persistence::pathfinder::Pathfinder;
use crate::pathfinder::PathFinder;
use pem::{encode, parse, Pem};
use std::fs::File;
use std::io::prelude::*;
use std::path::PathBuf;

pub fn read_mix_identity_keypair_from_disk(
id: String,
) -> crypto::identity::DummyMixIdentityKeyPair {
let pathfinder = Pathfinder::new(id);
let pem_store = PemStore::new(pathfinder);
let keypair = pem_store.read_identity();
keypair
}

#[allow(dead_code)]
pub fn read_mix_encryption_keypair_from_disk(_id: String) -> crypto::encryption::x25519::KeyPair {
unimplemented!()
Expand All @@ -25,11 +16,11 @@ pub struct PemStore {
}

impl PemStore {
pub fn new(pathfinder: Pathfinder) -> PemStore {
pub fn new<P: PathFinder>(pathfinder: P) -> PemStore {
PemStore {
config_dir: pathfinder.config_dir,
private_mix_key: pathfinder.private_mix_key,
public_mix_key: pathfinder.public_mix_key,
config_dir: pathfinder.config_dir(),
private_mix_key: pathfinder.private_identity_key(),
public_mix_key: pathfinder.public_identity_key(),
}
}

Expand Down
3 changes: 1 addition & 2 deletions nym-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ futures = "0.3.1"
hex = "0.4.0"
log = "0.4.8"
pem = "0.7.0"
rand = "0.7.2"
rand_distr = "0.2.2"
reqwest = "0.9.22"
serde = { version = "1.0.104", features = ["derive"] }
serde_json = "1.0.44"
Expand All @@ -35,6 +33,7 @@ crypto = {path = "../common/crypto"}
directory-client = { path = "../common/clients/directory-client" }
healthcheck = { path = "../common/healthcheck" }
mix-client = { path = "../common/clients/mix-client" }
pemstore = {path = "../common/pemstore"}
provider-client = { path = "../common/clients/provider-client" }
sfw-provider-requests = { path = "../sfw-provider/sfw-provider-requests" }
topology = {path = "../common/topology" }
Expand Down
35 changes: 35 additions & 0 deletions nym-client/src/client/cover_traffic_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use crate::client::mix_traffic::MixMessage;
use crate::client::LOOP_COVER_AVERAGE_DELAY;
use futures::channel::mpsc;
use log::{info, trace};
use sphinx::route::Destination;
use std::time::Duration;
use topology::NymTopology;

pub(crate) async fn start_loop_cover_traffic_stream<T>(
tx: mpsc::UnboundedSender<MixMessage>,
our_info: Destination,
topology: T,
) where
T: NymTopology,
{
info!("Starting loop cover traffic stream");
loop {
trace!("next cover message!");
let delay = mix_client::poisson::sample(LOOP_COVER_AVERAGE_DELAY);
let delay_duration = Duration::from_secs_f64(delay);
tokio::time::delay_for(delay_duration).await;
let cover_message = mix_client::packet::loop_cover_message(
our_info.address,
our_info.identifier,
&topology,
);

// if this one fails, there's no retrying because it means that either:
// - we run out of memory
// - the receiver channel is closed
// in either case there's no recovery and we can only panic
tx.unbounded_send(MixMessage::new(cover_message.0, cover_message.1))
.unwrap();
}
}
37 changes: 37 additions & 0 deletions nym-client/src/client/mix_traffic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use futures::channel::mpsc;
use futures::StreamExt;
use log::{debug, error, info, trace};
use sphinx::SphinxPacket;
use std::net::SocketAddr;

pub(crate) struct MixMessage(SocketAddr, SphinxPacket);

impl MixMessage {
pub(crate) fn new(address: SocketAddr, packet: SphinxPacket) -> Self {
MixMessage(address, packet)
}
}

pub(crate) struct MixTrafficController;

impl MixTrafficController {
pub(crate) async fn run(mut rx: mpsc::UnboundedReceiver<MixMessage>) {
info!("Mix Traffic Controller started!");
let mix_client = mix_client::MixClient::new();
while let Some(mix_message) = rx.next().await {
debug!("Got a mix_message for {:?}", mix_message.0);
let send_res = mix_client.send(mix_message.1, mix_message.0).await;
match send_res {
Ok(_) => {
trace!("sent a mix message");
}
// TODO: should there be some kind of threshold of failed messages
// that if reached, the application blows?
Err(e) => error!(
"We failed to send the message to {} :( - {:?}",
mix_message.0, e
),
};
}
}
}
Loading