Skip to content

Commit 1a344a4

Browse files
authored
Merge pull request #128 from nymtech/feature/client-refactoring
Feature/client refactoring
2 parents 1d26ec1 + 6c730c2 commit 1a344a4

File tree

23 files changed

+824
-444
lines changed

23 files changed

+824
-444
lines changed

.travis.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ before_script:
1111
- rustup component add rustfmt
1212
script:
1313
- cargo build
14-
- cargo test
14+
- cargo test -- --test-threads=1
1515
- cargo fmt -- --check

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/clients/mix-client/src/packet.rs

+36
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ impl From<AddressTypeError> for SphinxPacketEncapsulationError {
3939
}
4040
}
4141

42+
#[deprecated(note = "please use loop_cover_message_route instead")]
4243
pub fn loop_cover_message<T: NymTopology>(
4344
our_address: DestinationAddressBytes,
4445
surb_id: SURBIdentifier,
@@ -47,6 +48,7 @@ pub fn loop_cover_message<T: NymTopology>(
4748
) -> Result<(SocketAddr, SphinxPacket), SphinxPacketEncapsulationError> {
4849
let destination = Destination::new(our_address, surb_id);
4950

51+
#[allow(deprecated)]
5052
encapsulate_message(
5153
destination,
5254
LOOP_COVER_MESSAGE_PAYLOAD.to_vec(),
@@ -55,6 +57,23 @@ pub fn loop_cover_message<T: NymTopology>(
5557
)
5658
}
5759

60+
pub fn loop_cover_message_route(
61+
our_address: DestinationAddressBytes,
62+
surb_id: SURBIdentifier,
63+
route: Vec<sphinx::route::Node>,
64+
average_delay: time::Duration,
65+
) -> Result<(SocketAddr, SphinxPacket), SphinxPacketEncapsulationError> {
66+
let destination = Destination::new(our_address, surb_id);
67+
68+
encapsulate_message_route(
69+
destination,
70+
LOOP_COVER_MESSAGE_PAYLOAD.to_vec(),
71+
route,
72+
average_delay,
73+
)
74+
}
75+
76+
#[deprecated(note = "please use encapsulate_message_route instead")]
5877
pub fn encapsulate_message<T: NymTopology>(
5978
recipient: Destination,
6079
message: Vec<u8>,
@@ -81,3 +100,20 @@ pub fn encapsulate_message<T: NymTopology>(
81100

82101
Ok((first_node_address, packet))
83102
}
103+
104+
pub fn encapsulate_message_route(
105+
recipient: Destination,
106+
message: Vec<u8>,
107+
route: Vec<sphinx::route::Node>,
108+
average_delay: time::Duration,
109+
) -> Result<(SocketAddr, SphinxPacket), SphinxPacketEncapsulationError> {
110+
let delays = sphinx::header::delays::generate_from_average_duration(route.len(), average_delay);
111+
112+
// build the packet
113+
let packet = sphinx::SphinxPacket::new(message, &route[..], &recipient, &delays)?;
114+
115+
let first_node_address =
116+
addressing::socket_address_from_encoded_bytes(route.first().unwrap().address.to_bytes())?;
117+
118+
Ok((first_node_address, packet))
119+
}

common/clients/multi-tcp-client/src/connection_manager/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ impl<'a> ConnectionManager<'a> {
7777
return match conn_writer.write_all(msg).await {
7878
// if we failed to write to connection we should reconnect
7979
// TODO: is this true? can we fail to write to a connection while it still remains open and valid?
80-
Ok(res) => Ok(res),
80+
Ok(_) => Ok(()),
8181
Err(e) => {
8282
trace!("Creating connection reconnector!");
8383
self.state = ConnectionState::Reconnecting(ConnectionReconnector::new(

common/crypto/src/identity/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ impl MixIdentityPrivateKey {
121121

122122
// TODO: this will be implemented differently by using the proper trait
123123
impl MixIdentityPrivateKey {
124-
pub fn as_scalar(self) -> Scalar {
125-
let encryption_key = self.0;
124+
pub fn as_scalar(&self) -> Scalar {
125+
let encryption_key = &self.0;
126126
encryption_key.0
127127
}
128128
}

common/topology/src/lib.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ mod filter;
1010
pub mod mix;
1111
pub mod provider;
1212

13-
pub trait NymTopology: Sized + std::fmt::Debug + Send + Sync {
13+
// TODO: Figure out why 'Clone' was required to have 'TopologyAccessor<T>' working
14+
// even though it only contains an Arc
15+
pub trait NymTopology: Sized + std::fmt::Debug + Send + Sync + Clone {
1416
fn new(directory_server: String) -> Self;
1517
fn new_from_nodes(
1618
mix_nodes: Vec<mix::Node>,

mixnode/src/node/listener.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ async fn process_socket_connection(
5555

5656
// we must be able to handle multiple packets from same connection independently
5757
tokio::spawn(process_received_packet(
58-
buf.clone(),
58+
buf,
5959
// note: processing_data is relatively cheap (and safe) to clone -
6060
// it contains arc to private key and metrics reporter (which is just
6161
// a single mpsc unbounded sender)

nym-client/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ crypto = {path = "../common/crypto"}
3434
directory-client = { path = "../common/clients/directory-client" }
3535
healthcheck = { path = "../common/healthcheck" }
3636
mix-client = { path = "../common/clients/mix-client" }
37+
multi-tcp-client = { path = "../common/clients/multi-tcp-client" }
3738
pemstore = {path = "../common/pemstore"}
3839
provider-client = { path = "../common/clients/provider-client" }
3940
sfw-provider-requests = { path = "../sfw-provider/sfw-provider-requests" }
+93-27
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,120 @@
1-
use crate::client::mix_traffic::MixMessage;
2-
use crate::client::topology_control::TopologyInnerRef;
3-
use futures::channel::mpsc;
4-
use log::{error, info, trace, warn};
1+
use crate::client::mix_traffic::{MixMessage, MixMessageSender};
2+
use crate::client::topology_control::TopologyAccessor;
3+
use futures::task::{Context, Poll};
4+
use futures::{Future, Stream, StreamExt};
5+
use log::*;
56
use sphinx::route::Destination;
6-
use std::time;
7+
use std::pin::Pin;
8+
use std::time::Duration;
9+
use tokio::runtime::Handle;
10+
use tokio::task::JoinHandle;
11+
use tokio::time;
712
use topology::NymTopology;
813

9-
pub(crate) async fn start_loop_cover_traffic_stream<T: NymTopology>(
10-
tx: mpsc::UnboundedSender<MixMessage>,
14+
pub(crate) struct LoopCoverTrafficStream<T: NymTopology> {
15+
average_packet_delay: Duration,
16+
average_cover_message_sending_delay: Duration,
17+
next_delay: time::Delay,
18+
mix_tx: MixMessageSender,
1119
our_info: Destination,
12-
topology_ctrl_ref: TopologyInnerRef<T>,
13-
average_cover_message_delay_duration: time::Duration,
14-
average_packet_delay_duration: time::Duration,
15-
) {
16-
info!("Starting loop cover traffic stream");
17-
loop {
18-
trace!("next cover message!");
19-
let delay_duration = mix_client::poisson::sample(average_cover_message_delay_duration);
20-
tokio::time::delay_for(delay_duration).await;
20+
topology_access: TopologyAccessor<T>,
21+
}
22+
23+
impl<T: NymTopology> Stream for LoopCoverTrafficStream<T> {
24+
// Item is only used to indicate we should create a new message rather than actual cover message
25+
// reason being to not introduce unnecessary complexity by having to keep state of topology
26+
// mutex when trying to acquire it. So right now the Stream trait serves as a glorified timer.
27+
// Perhaps this should be changed in the future.
28+
type Item = ();
29+
30+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
31+
// it is not yet time to return a message
32+
if Pin::new(&mut self.next_delay).poll(cx).is_pending() {
33+
return Poll::Pending;
34+
};
35+
36+
// we know it's time to send a message, so let's prepare delay for the next one
37+
// Get the `now` by looking at the current `delay` deadline
38+
let now = self.next_delay.deadline();
39+
let next_poisson_delay =
40+
mix_client::poisson::sample(self.average_cover_message_sending_delay);
41+
42+
// The next interval value is `next_poisson_delay` after the one that just
43+
// yielded.
44+
let next = now + next_poisson_delay;
45+
self.next_delay.reset(next);
46+
47+
Poll::Ready(Some(()))
48+
}
49+
}
50+
51+
impl<T: 'static + NymTopology> LoopCoverTrafficStream<T> {
52+
pub(crate) fn new(
53+
mix_tx: MixMessageSender,
54+
our_info: Destination,
55+
topology_access: TopologyAccessor<T>,
56+
average_cover_message_sending_delay: time::Duration,
57+
average_packet_delay: time::Duration,
58+
) -> Self {
59+
LoopCoverTrafficStream {
60+
average_packet_delay,
61+
average_cover_message_sending_delay,
62+
next_delay: time::delay_for(Default::default()),
63+
mix_tx,
64+
our_info,
65+
topology_access,
66+
}
67+
}
2168

22-
let read_lock = topology_ctrl_ref.read().await;
23-
let topology = match read_lock.topology.as_ref() {
69+
async fn on_new_message(&mut self) {
70+
trace!("next cover message!");
71+
let route = match self.topology_access.random_route().await {
2472
None => {
2573
warn!("No valid topology detected - won't send any loop cover message this time");
26-
continue;
74+
return;
2775
}
28-
Some(topology) => topology,
76+
Some(route) => route,
2977
};
3078

31-
let cover_message = match mix_client::packet::loop_cover_message(
32-
our_info.address.clone(),
33-
our_info.identifier,
34-
topology,
35-
average_packet_delay_duration,
79+
let cover_message = match mix_client::packet::loop_cover_message_route(
80+
self.our_info.address.clone(),
81+
self.our_info.identifier,
82+
route,
83+
self.average_packet_delay,
3684
) {
3785
Ok(message) => message,
3886
Err(err) => {
3987
error!(
4088
"Somehow we managed to create an invalid cover message - {:?}",
4189
err
4290
);
43-
continue;
91+
return;
4492
}
4593
};
4694

4795
// if this one fails, there's no retrying because it means that either:
4896
// - we run out of memory
4997
// - the receiver channel is closed
5098
// in either case there's no recovery and we can only panic
51-
tx.unbounded_send(MixMessage::new(cover_message.0, cover_message.1))
99+
self.mix_tx
100+
.unbounded_send(MixMessage::new(cover_message.0, cover_message.1))
52101
.unwrap();
53102
}
103+
104+
async fn run(&mut self) {
105+
// we should set initial delay only when we actually start the stream
106+
self.next_delay = time::delay_for(mix_client::poisson::sample(
107+
self.average_cover_message_sending_delay,
108+
));
109+
110+
while let Some(_) = self.next().await {
111+
self.on_new_message().await;
112+
}
113+
}
114+
115+
pub(crate) fn start(mut self, handle: &Handle) -> JoinHandle<()> {
116+
handle.spawn(async move {
117+
self.run().await;
118+
})
119+
}
54120
}

nym-client/src/client/mix_traffic.rs

+57-21
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,73 @@
11
use futures::channel::mpsc;
22
use futures::StreamExt;
3-
use log::{debug, error, info, trace};
3+
use log::*;
44
use sphinx::SphinxPacket;
55
use std::net::SocketAddr;
6+
use std::time::Duration;
7+
use tokio::runtime::Handle;
8+
use tokio::task::JoinHandle;
69

710
pub(crate) struct MixMessage(SocketAddr, SphinxPacket);
11+
pub(crate) type MixMessageSender = mpsc::UnboundedSender<MixMessage>;
12+
pub(crate) type MixMessageReceiver = mpsc::UnboundedReceiver<MixMessage>;
813

914
impl MixMessage {
1015
pub(crate) fn new(address: SocketAddr, packet: SphinxPacket) -> Self {
1116
MixMessage(address, packet)
1217
}
1318
}
1419

15-
pub(crate) struct MixTrafficController;
16-
17-
impl MixTrafficController {
18-
pub(crate) async fn run(mut rx: mpsc::UnboundedReceiver<MixMessage>) {
19-
info!("Mix Traffic Controller started!");
20-
let mix_client = mix_client::MixClient::new();
21-
while let Some(mix_message) = rx.next().await {
22-
debug!("Got a mix_message for {:?}", mix_message.0);
23-
let send_res = mix_client.send(mix_message.1, mix_message.0).await;
24-
match send_res {
25-
Ok(_) => {
26-
trace!("sent a mix message");
27-
}
28-
// TODO: should there be some kind of threshold of failed messages
29-
// that if reached, the application blows?
30-
Err(e) => error!(
31-
"We failed to send the message to {} :( - {:?}",
32-
mix_message.0, e
33-
),
34-
};
20+
// TODO: put our TCP client here
21+
pub(crate) struct MixTrafficController<'a> {
22+
tcp_client: multi_tcp_client::Client<'a>,
23+
mix_rx: MixMessageReceiver,
24+
}
25+
26+
impl MixTrafficController<'static> {
27+
pub(crate) async fn new(
28+
initial_endpoints: Vec<SocketAddr>,
29+
initial_reconnection_backoff: Duration,
30+
maximum_reconnection_backoff: Duration,
31+
mix_rx: MixMessageReceiver,
32+
) -> Self {
33+
let tcp_client_config = multi_tcp_client::Config::new(
34+
initial_endpoints,
35+
initial_reconnection_backoff,
36+
maximum_reconnection_backoff,
37+
);
38+
39+
MixTrafficController {
40+
tcp_client: multi_tcp_client::Client::new(tcp_client_config).await,
41+
mix_rx,
42+
}
43+
}
44+
45+
async fn on_message(&mut self, mix_message: MixMessage) {
46+
debug!("Got a mix_message for {:?}", mix_message.0);
47+
match self
48+
.tcp_client
49+
.send(mix_message.0, &mix_message.1.to_bytes())
50+
.await
51+
{
52+
Ok(_) => trace!("sent a mix message"),
53+
// TODO: should there be some kind of threshold of failed messages
54+
// that if reached, the application blows?
55+
Err(e) => error!(
56+
"We failed to send the packet to {} - {:?}",
57+
mix_message.0, e
58+
),
59+
};
60+
}
61+
62+
pub(crate) async fn run(&mut self) {
63+
while let Some(mix_message) = self.mix_rx.next().await {
64+
self.on_message(mix_message).await;
3565
}
3666
}
67+
68+
pub(crate) fn start(mut self, handle: &Handle) -> JoinHandle<()> {
69+
handle.spawn(async move {
70+
self.run().await;
71+
})
72+
}
3773
}

0 commit comments

Comments
 (0)