Skip to content

Commit 4f6b2ae

Browse files
authored
Feature/instant sending (#359)
* Ability to set client in vpn mode * Connection handler for mixnode * Initial vpn mode for mixes * Updated SphinxCodec to contain more metadata * Renaming * Removed handle from mixnet client and introduced forwarder * Mixnode using new forwarder * Mixnode common module containing shared packet processing * ibid. incorporated inside mixnode * New processing for gateway * Type cleanup * Wasm fix * Fixed client config * Fixed mixnode runtime issues * Formatting * Client re-using secret on 'normal' packets * Using the same key for acks * WIP * vpn key manager cleanup * wasm fix * VPN_KEY_REUSE_LIMIT moved to config * Moved AckDelayQueue to separate common crate * Key cache invalidator * Updated dashmap used in gateway * Old typo * Additional comment * Cargo fmt * Fixed tests * Sphinx update * cache ttl as config option * Cargo fmt
1 parent ebea016 commit 4f6b2ae

File tree

78 files changed

+3073
-3924
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

78 files changed

+3073
-3924
lines changed

Cargo.lock

+56-26
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@ members = [
2020
"common/client-libs/validator-client",
2121
"common/config",
2222
"common/crypto",
23+
"common/mixnode-common",
24+
"common/nonexhaustive-delayqueue",
2325
"common/nymsphinx",
2426
"common/nymsphinx/acknowledgements",
2527
"common/nymsphinx/addressing",
2628
"common/nymsphinx/anonymous-replies",
2729
"common/nymsphinx/chunking",
2830
"common/nymsphinx/cover",
31+
"common/nymsphinx/forwarding",
2932
"common/nymsphinx/framing",
3033
"common/nymsphinx/params",
3134
"common/nymsphinx/types",

clients/client-core/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ crypto = { path = "../../common/crypto" }
2222
directory-client = { path = "../../common/client-libs/directory-client" }
2323
gateway-client = { path = "../../common/client-libs/gateway-client" }
2424
gateway-requests = { path = "../../gateway/gateway-requests" }
25+
nonexhaustive-delayqueue = { path = "../../common/nonexhaustive-delayqueue" }
2526
nymsphinx = { path = "../../common/nymsphinx" }
2627
pemstore = { path = "../../common/pemstore" }
2728
topology = { path = "../../common/topology" }

clients/client-core/src/client/cover_traffic_stream.rs

+4-6
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use crate::client::mix_traffic::{MixMessage, MixMessageSender};
15+
use crate::client::mix_traffic::BatchMixMessageSender;
1616
use crate::client::topology_control::TopologyAccessor;
1717
use futures::task::{Context, Poll};
1818
use futures::{Future, Stream, StreamExt};
@@ -50,7 +50,7 @@ where
5050

5151
/// Channel used for sending prepared sphinx packets to `MixTrafficController` that sends them
5252
/// out to the network without any further delays.
53-
mix_tx: MixMessageSender,
53+
mix_tx: BatchMixMessageSender,
5454

5555
/// Represents full address of this client.
5656
our_full_destination: Recipient,
@@ -101,7 +101,7 @@ impl LoopCoverTrafficStream<OsRng> {
101101
average_ack_delay: time::Duration,
102102
average_packet_delay: time::Duration,
103103
average_cover_message_sending_delay: time::Duration,
104-
mix_tx: MixMessageSender,
104+
mix_tx: BatchMixMessageSender,
105105
our_full_destination: Recipient,
106106
topology_access: TopologyAccessor,
107107
) -> Self {
@@ -152,9 +152,7 @@ impl LoopCoverTrafficStream<OsRng> {
152152
// - we run out of memory
153153
// - the receiver channel is closed
154154
// in either case there's no recovery and we can only panic
155-
self.mix_tx
156-
.unbounded_send(MixMessage::new(cover_message.0, cover_message.1))
157-
.unwrap();
155+
self.mix_tx.unbounded_send(vec![cover_message]).unwrap();
158156

159157
// TODO: I'm not entirely sure whether this is really required, because I'm not 100%
160158
// sure how `yield_now()` works - whether it just notifies the scheduler or whether it

clients/client-core/src/client/mix_traffic.rs

+27-24
Original file line numberDiff line numberDiff line change
@@ -16,67 +16,70 @@ use futures::channel::mpsc;
1616
use futures::StreamExt;
1717
use gateway_client::GatewayClient;
1818
use log::*;
19-
use nymsphinx::{addressing::nodes::NymNodeRoutingAddress, SphinxPacket};
19+
use nymsphinx::forwarding::packet::MixPacket;
2020
use tokio::runtime::Handle;
2121
use tokio::task::JoinHandle;
2222

23-
pub struct MixMessage(NymNodeRoutingAddress, SphinxPacket);
24-
pub type MixMessageSender = mpsc::UnboundedSender<MixMessage>;
25-
pub type MixMessageReceiver = mpsc::UnboundedReceiver<MixMessage>;
26-
27-
impl MixMessage {
28-
pub fn new(address: NymNodeRoutingAddress, packet: SphinxPacket) -> Self {
29-
MixMessage(address, packet)
30-
}
31-
}
23+
pub type BatchMixMessageSender = mpsc::UnboundedSender<Vec<MixPacket>>;
24+
pub type BatchMixMessageReceiver = mpsc::UnboundedReceiver<Vec<MixPacket>>;
3225

3326
const MAX_FAILURE_COUNT: usize = 100;
3427

3528
pub struct MixTrafficController {
3629
// TODO: most likely to be replaced by some higher level construct as
3730
// later on gateway_client will need to be accessible by other entities
3831
gateway_client: GatewayClient,
39-
mix_rx: MixMessageReceiver,
32+
mix_rx: BatchMixMessageReceiver,
4033

4134
// TODO: this is temporary work-around.
4235
// in long run `gateway_client` will be moved away from `MixTrafficController` anyway.
4336
consecutive_gateway_failure_count: usize,
4437
}
4538

4639
impl MixTrafficController {
47-
pub fn new(mix_rx: MixMessageReceiver, gateway_client: GatewayClient) -> MixTrafficController {
40+
pub fn new(
41+
mix_rx: BatchMixMessageReceiver,
42+
gateway_client: GatewayClient,
43+
) -> MixTrafficController {
4844
MixTrafficController {
4945
gateway_client,
5046
mix_rx,
5147
consecutive_gateway_failure_count: 0,
5248
}
5349
}
5450

55-
async fn on_message(&mut self, mix_message: MixMessage) {
56-
debug!("Got a mix_message for {:?}", mix_message.0);
57-
match self
58-
.gateway_client
59-
.send_sphinx_packet(mix_message.0, mix_message.1)
60-
.await
61-
{
51+
async fn on_messages(&mut self, mut mix_packets: Vec<MixPacket>) {
52+
debug_assert!(!mix_packets.is_empty());
53+
54+
let success = if mix_packets.len() == 1 {
55+
let mix_packet = mix_packets.pop().unwrap();
56+
self.gateway_client.send_mix_packet(mix_packet).await
57+
} else {
58+
self.gateway_client
59+
.batch_send_mix_packets(mix_packets)
60+
.await
61+
};
62+
63+
match success {
6264
Err(e) => {
63-
error!("Failed to send sphinx packet to the gateway! - {:?}", e);
65+
error!("Failed to send sphinx packet(s) to the gateway! - {:?}", e);
6466
self.consecutive_gateway_failure_count += 1;
6567
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
66-
// todo: in the future this should initiate a 'graceful' shutdown
68+
// todo: in the future this should initiate a 'graceful' shutdown or try
69+
// to reconnect?
6770
panic!("failed to send sphinx packet to the gateway {} times in a row - assuming the gateway is dead. Can't do anything about it yet :(", MAX_FAILURE_COUNT)
6871
}
6972
}
7073
Ok(_) => {
71-
trace!("We *might* have managed to forward sphinx packet to the gateway!");
74+
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
7275
self.consecutive_gateway_failure_count = 0;
7376
}
7477
}
7578
}
7679

7780
pub async fn run(&mut self) {
78-
while let Some(mix_message) = self.mix_rx.next().await {
79-
self.on_message(mix_message).await;
81+
while let Some(mix_packets) = self.mix_rx.next().await {
82+
self.on_messages(mix_packets).await;
8083
}
8184
}
8285

clients/client-core/src/client/real_messages_control/acknowledgement_control/action_controller.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313
// limitations under the License.
1414

1515
use super::PendingAcknowledgement;
16-
use crate::client::real_messages_control::acknowledgement_control::ack_delay_queue::AckDelayQueue;
1716
use crate::client::real_messages_control::acknowledgement_control::RetransmissionRequestSender;
1817
use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
1918
use log::*;
19+
use nonexhaustive_delayqueue::NonExhaustiveDelayQueue;
2020
use nymsphinx::chunking::fragment::FragmentIdentifier;
2121
use nymsphinx::Delay as SphinxDelay;
2222
use std::collections::HashMap;
@@ -105,7 +105,7 @@ pub(super) struct ActionController {
105105
// previous version.
106106
/// DelayQueue with all `PendingAcknowledgement` that are waiting to be either received or
107107
/// retransmitted if their timer fires up.
108-
pending_acks_timers: AckDelayQueue<FragmentIdentifier>,
108+
pending_acks_timers: NonExhaustiveDelayQueue<FragmentIdentifier>,
109109

110110
/// Channel for receiving `Action`s from other modules.
111111
incoming_actions: UnboundedReceiver<Action>,
@@ -124,7 +124,7 @@ impl ActionController {
124124
ActionController {
125125
config,
126126
pending_acks_data: HashMap::new(),
127-
pending_acks_timers: AckDelayQueue::new(),
127+
pending_acks_timers: NonExhaustiveDelayQueue::new(),
128128
incoming_actions: receiver,
129129
retransmission_sender,
130130
},

0 commit comments

Comments
 (0)