Skip to content

Commit e93e3e0

Browse files
authored
Merge pull request #91 from nymtech/feature/client-refactor
Feature/client refactor
2 parents 1fbbd7c + 974ac5e commit e93e3e0

32 files changed

+747
-515
lines changed

Cargo.lock

+13-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ members = [
1111
"common/addressing",
1212
"common/crypto",
1313
"common/healthcheck",
14+
"common/pemstore",
1415
"common/topology",
1516
"mixnode",
1617
"nym-client",

common/clients/mix-client/Cargo.toml

+7
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,14 @@ edition = "2018"
88

99
[dependencies]
1010
log = "0.4.8"
11+
rand = "0.7.2"
12+
rand_distr = "0.2.2"
1113
tokio = { version = "0.2", features = ["full"] }
1214

15+
## internal
16+
addressing = {path = "../../addressing"}
17+
topology = {path = "../../topology"}
18+
1319
## will be moved to proper dependencies once released
1420
sphinx = { git = "https://github.com/nymtech/sphinx", rev="1d8cefcb6a0cb8e87d00d89eb1ccf2839e92aa1f" }
21+

common/clients/mix-client/src/lib.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ use sphinx::SphinxPacket;
33
use std::net::SocketAddr;
44
use tokio::prelude::*;
55

6+
pub mod packet;
7+
pub mod poisson;
8+
69
pub struct MixClient {}
710

811
impl MixClient {
@@ -17,8 +20,7 @@ impl MixClient {
1720
mix_addr: SocketAddr,
1821
) -> Result<(), Box<dyn std::error::Error>> {
1922
let bytes = packet.to_bytes();
20-
21-
info!("socket addr: {:?}", mix_addr);
23+
debug!("Sending to the following address: {:?}", mix_addr);
2224

2325
let mut stream = tokio::net::TcpStream::connect(mix_addr).await?;
2426
stream.write_all(&bytes[..]).await?;

nym-client/src/utils/sphinx.rs common/clients/mix-client/src/packet.rs

+8-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::net::SocketAddr;
55
use topology::NymTopology;
66

77
pub const LOOP_COVER_MESSAGE_PAYLOAD: &[u8] = b"The cake is a lie!";
8+
pub const LOOP_COVER_MESSAGE_AVERAGE_DELAY: f64 = 2.0;
89

910
pub fn loop_cover_message<T: NymTopology>(
1011
our_address: DestinationAddressBytes,
@@ -13,21 +14,25 @@ pub fn loop_cover_message<T: NymTopology>(
1314
) -> (SocketAddr, SphinxPacket) {
1415
let destination = Destination::new(our_address, surb_id);
1516

16-
encapsulate_message(destination, LOOP_COVER_MESSAGE_PAYLOAD.to_vec(), topology)
17+
encapsulate_message(
18+
destination,
19+
LOOP_COVER_MESSAGE_PAYLOAD.to_vec(),
20+
topology,
21+
LOOP_COVER_MESSAGE_AVERAGE_DELAY,
22+
)
1723
}
1824

1925
pub fn encapsulate_message<T: NymTopology>(
2026
recipient: Destination,
2127
message: Vec<u8>,
2228
topology: &T,
29+
average_delay: f64,
2330
) -> (SocketAddr, SphinxPacket) {
2431
let mut providers = topology.get_mix_provider_nodes();
2532
let provider = providers.pop().unwrap().into();
2633

2734
let route = topology.route_to(provider).unwrap();
2835

29-
// Set average packet delay to an arbitrary but at least not super-slow value for testing.
30-
let average_delay = 0.1;
3136
let delays = sphinx::header::delays::generate(route.len(), average_delay);
3237

3338
// build the packet
File renamed without changes.

common/clients/provider-client/src/lib.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ impl ProviderClient {
6565

6666
pub async fn send_request(&self, bytes: Vec<u8>) -> Result<Vec<u8>, ProviderClientError> {
6767
let mut socket = tokio::net::TcpStream::connect(self.provider_network_address).await?;
68-
info!("keep alive: {:?}", socket.keepalive());
68+
6969
socket.set_keepalive(Some(Duration::from_secs(2))).unwrap();
7070
socket.write_all(&bytes[..]).await?;
7171
if let Err(_e) = socket.shutdown(Shutdown::Write) {
@@ -92,7 +92,6 @@ impl ProviderClient {
9292
let bytes = pull_request.to_bytes();
9393

9494
let response = self.send_request(bytes).await?;
95-
info!("Received the following response: {:?}", response);
9695

9796
let parsed_response = PullResponse::from_bytes(&response)?;
9897
Ok(parsed_response.messages)
@@ -111,4 +110,8 @@ impl ProviderClient {
111110

112111
Ok(parsed_response.auth_token)
113112
}
113+
114+
pub fn is_registered(&self) -> bool {
115+
self.auth_token.is_some()
116+
}
114117
}

common/pemstore/Cargo.toml

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[package]
2+
name = "pemstore"
3+
version = "0.1.0"
4+
authors = ["Jedrzej Stuczynski <andrew@nymtech.net>"]
5+
edition = "2018"
6+
7+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
8+
9+
[dependencies]
10+
pem = "0.7.0"
11+
12+
## internal
13+
crypto = {path = "../crypto"}
File renamed without changes.

common/pemstore/src/pathfinder.rs

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
use std::path::PathBuf;
2+
3+
pub trait PathFinder {
4+
fn config_dir(&self) -> PathBuf;
5+
fn private_identity_key(&self) -> PathBuf;
6+
fn public_identity_key(&self) -> PathBuf;
7+
8+
// Optional:
9+
fn private_encryption_key(&self) -> Option<PathBuf> {
10+
None
11+
}
12+
fn public_encryption_key(&self) -> Option<PathBuf> {
13+
None
14+
}
15+
}

nym-client/src/persistence/pemstore.rs common/pemstore/src/pemstore.rs

+5-14
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,9 @@
1-
use crate::persistence::pathfinder::Pathfinder;
1+
use crate::pathfinder::PathFinder;
22
use pem::{encode, parse, Pem};
33
use std::fs::File;
44
use std::io::prelude::*;
55
use std::path::PathBuf;
66

7-
pub fn read_mix_identity_keypair_from_disk(
8-
id: String,
9-
) -> crypto::identity::DummyMixIdentityKeyPair {
10-
let pathfinder = Pathfinder::new(id);
11-
let pem_store = PemStore::new(pathfinder);
12-
let keypair = pem_store.read_identity();
13-
keypair
14-
}
15-
167
#[allow(dead_code)]
178
pub fn read_mix_encryption_keypair_from_disk(_id: String) -> crypto::encryption::x25519::KeyPair {
189
unimplemented!()
@@ -25,11 +16,11 @@ pub struct PemStore {
2516
}
2617

2718
impl PemStore {
28-
pub fn new(pathfinder: Pathfinder) -> PemStore {
19+
pub fn new<P: PathFinder>(pathfinder: P) -> PemStore {
2920
PemStore {
30-
config_dir: pathfinder.config_dir,
31-
private_mix_key: pathfinder.private_mix_key,
32-
public_mix_key: pathfinder.public_mix_key,
21+
config_dir: pathfinder.config_dir(),
22+
private_mix_key: pathfinder.private_identity_key(),
23+
public_mix_key: pathfinder.public_identity_key(),
3324
}
3425
}
3526

nym-client/Cargo.toml

+1-2
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ futures = "0.3.1"
2121
hex = "0.4.0"
2222
log = "0.4.8"
2323
pem = "0.7.0"
24-
rand = "0.7.2"
25-
rand_distr = "0.2.2"
2624
reqwest = "0.9.22"
2725
serde = { version = "1.0.104", features = ["derive"] }
2826
serde_json = "1.0.44"
@@ -35,6 +33,7 @@ crypto = {path = "../common/crypto"}
3533
directory-client = { path = "../common/clients/directory-client" }
3634
healthcheck = { path = "../common/healthcheck" }
3735
mix-client = { path = "../common/clients/mix-client" }
36+
pemstore = {path = "../common/pemstore"}
3837
provider-client = { path = "../common/clients/provider-client" }
3938
sfw-provider-requests = { path = "../sfw-provider/sfw-provider-requests" }
4039
topology = {path = "../common/topology" }
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
use crate::client::mix_traffic::MixMessage;
2+
use crate::client::LOOP_COVER_AVERAGE_DELAY;
3+
use futures::channel::mpsc;
4+
use log::{info, trace};
5+
use sphinx::route::Destination;
6+
use std::time::Duration;
7+
use topology::NymTopology;
8+
9+
pub(crate) async fn start_loop_cover_traffic_stream<T>(
10+
tx: mpsc::UnboundedSender<MixMessage>,
11+
our_info: Destination,
12+
topology: T,
13+
) where
14+
T: NymTopology,
15+
{
16+
info!("Starting loop cover traffic stream");
17+
loop {
18+
trace!("next cover message!");
19+
let delay = mix_client::poisson::sample(LOOP_COVER_AVERAGE_DELAY);
20+
let delay_duration = Duration::from_secs_f64(delay);
21+
tokio::time::delay_for(delay_duration).await;
22+
let cover_message = mix_client::packet::loop_cover_message(
23+
our_info.address,
24+
our_info.identifier,
25+
&topology,
26+
);
27+
28+
// if this one fails, there's no retrying because it means that either:
29+
// - we run out of memory
30+
// - the receiver channel is closed
31+
// in either case there's no recovery and we can only panic
32+
tx.unbounded_send(MixMessage::new(cover_message.0, cover_message.1))
33+
.unwrap();
34+
}
35+
}

nym-client/src/client/mix_traffic.rs

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
use futures::channel::mpsc;
2+
use futures::StreamExt;
3+
use log::{debug, error, info, trace};
4+
use sphinx::SphinxPacket;
5+
use std::net::SocketAddr;
6+
7+
pub(crate) struct MixMessage(SocketAddr, SphinxPacket);
8+
9+
impl MixMessage {
10+
pub(crate) fn new(address: SocketAddr, packet: SphinxPacket) -> Self {
11+
MixMessage(address, packet)
12+
}
13+
}
14+
15+
pub(crate) struct MixTrafficController;
16+
17+
impl MixTrafficController {
18+
pub(crate) async fn run(mut rx: mpsc::UnboundedReceiver<MixMessage>) {
19+
info!("Mix Traffic Controller started!");
20+
let mix_client = mix_client::MixClient::new();
21+
while let Some(mix_message) = rx.next().await {
22+
debug!("Got a mix_message for {:?}", mix_message.0);
23+
let send_res = mix_client.send(mix_message.1, mix_message.0).await;
24+
match send_res {
25+
Ok(_) => {
26+
trace!("sent a mix message");
27+
}
28+
// TODO: should there be some kind of threshold of failed messages
29+
// that if reached, the application blows?
30+
Err(e) => error!(
31+
"We failed to send the message to {} :( - {:?}",
32+
mix_message.0, e
33+
),
34+
};
35+
}
36+
}
37+
}

0 commit comments

Comments
 (0)