Feature/instant sending #359

Merged: 32 commits merged into develop from feature/instant_sending on Sep 30, 2020

Commits (32)

80259ad
Ability to set client in vpn mode
jstuczyn Sep 17, 2020
b040ab8
Connection handler for mixnode
jstuczyn Sep 17, 2020
b64f004
Initial vpn mode for mixes
jstuczyn Sep 21, 2020
c62ae61
Updated SphinxCodec to contain more metadata
jstuczyn Sep 22, 2020
2138da3
Renaming
jstuczyn Sep 22, 2020
0bccb05
Removed handle from mixnet client and introduced forwarder
jstuczyn Sep 22, 2020
9d4fb7c
Mixnode using new forwarder
jstuczyn Sep 22, 2020
e812767
Mixnode common module containing shared packet processing
jstuczyn Sep 23, 2020
b460a00
ibid. incorporated inside mixnode
jstuczyn Sep 23, 2020
c470ee0
New processing for gateway
jstuczyn Sep 23, 2020
1cc652f
Type cleanup
jstuczyn Sep 24, 2020
5a75122
Wasm fix
jstuczyn Sep 24, 2020
372ab21
Fixed client config
jstuczyn Sep 24, 2020
6e742d0
Fixed mixnode runtime issues
jstuczyn Sep 24, 2020
baa503a
Formatting
jstuczyn Sep 24, 2020
e4f1f87
Client re-using secret on 'normal' packets
jstuczyn Sep 24, 2020
4eb0f7d
Using the same key for acks
jstuczyn Sep 24, 2020
cf51b66
WIP
jstuczyn Sep 28, 2020
e481d65
vpn key manager cleanup
jstuczyn Sep 29, 2020
e37b95c
wasm fix
jstuczyn Sep 29, 2020
115ebdd
VPN_KEY_REUSE_LIMIT moved to config
jstuczyn Sep 29, 2020
7984295
Moved AckDelayQueue to separate common crate
jstuczyn Sep 29, 2020
bad5888
Key cache invalidator
jstuczyn Sep 30, 2020
a666887
Updated dashmap used in gateway
jstuczyn Sep 30, 2020
1746950
Old typo
jstuczyn Sep 30, 2020
68c0843
Additional comment
jstuczyn Sep 30, 2020
3a3d213
Cargo fmt
jstuczyn Sep 30, 2020
cbad460
Fixed tests
jstuczyn Sep 30, 2020
2bb3a5c
Merge branch 'develop' into feature/instant_sending
jstuczyn Sep 30, 2020
4fe2124
Sphinx update
jstuczyn Sep 30, 2020
af0286e
cache ttl as config option
jstuczyn Sep 30, 2020
553b102
Cargo fmt
jstuczyn Sep 30, 2020
82 changes: 56 additions & 26 deletions Cargo.lock

(Cargo.lock is a generated file; its diff is not rendered by default.)

3 changes: 3 additions & 0 deletions Cargo.toml
@@ -20,12 +20,15 @@ members = [
"common/client-libs/validator-client",
"common/config",
"common/crypto",
"common/mixnode-common",
"common/nonexhaustive-delayqueue",
"common/nymsphinx",
"common/nymsphinx/acknowledgements",
"common/nymsphinx/addressing",
"common/nymsphinx/anonymous-replies",
"common/nymsphinx/chunking",
"common/nymsphinx/cover",
"common/nymsphinx/forwarding",
"common/nymsphinx/framing",
"common/nymsphinx/params",
"common/nymsphinx/types",
1 change: 1 addition & 0 deletions clients/client-core/Cargo.toml
@@ -22,6 +22,7 @@ crypto = { path = "../../common/crypto" }
directory-client = { path = "../../common/client-libs/directory-client" }
gateway-client = { path = "../../common/client-libs/gateway-client" }
gateway-requests = { path = "../../gateway/gateway-requests" }
nonexhaustive-delayqueue = { path = "../../common/nonexhaustive-delayqueue" }
nymsphinx = { path = "../../common/nymsphinx" }
pemstore = { path = "../../common/pemstore" }
topology = { path = "../../common/topology" }
10 changes: 4 additions & 6 deletions clients/client-core/src/client/cover_traffic_stream.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::client::mix_traffic::{MixMessage, MixMessageSender};
use crate::client::mix_traffic::BatchMixMessageSender;
use crate::client::topology_control::TopologyAccessor;
use futures::task::{Context, Poll};
use futures::{Future, Stream, StreamExt};
@@ -50,7 +50,7 @@ where

/// Channel used for sending prepared sphinx packets to `MixTrafficController` that sends them
/// out to the network without any further delays.
mix_tx: MixMessageSender,
mix_tx: BatchMixMessageSender,

/// Represents full address of this client.
our_full_destination: Recipient,
@@ -101,7 +101,7 @@ impl LoopCoverTrafficStream<OsRng> {
average_ack_delay: time::Duration,
average_packet_delay: time::Duration,
average_cover_message_sending_delay: time::Duration,
mix_tx: MixMessageSender,
mix_tx: BatchMixMessageSender,
our_full_destination: Recipient,
topology_access: TopologyAccessor,
) -> Self {
@@ -152,9 +152,7 @@ impl LoopCoverTrafficStream<OsRng> {
// - we run out of memory
// - the receiver channel is closed
// in either case there's no recovery and we can only panic
self.mix_tx
.unbounded_send(MixMessage::new(cover_message.0, cover_message.1))
.unwrap();
self.mix_tx.unbounded_send(vec![cover_message]).unwrap();

// TODO: I'm not entirely sure whether this is really required, because I'm not 100%
// sure how `yield_now()` works - whether it just notifies the scheduler or whether it
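
The change above replaces the per-packet `MixMessage` channel with a batch channel: a loop-cover packet is now pushed as a one-element `Vec` and the receiver decides how to forward it. A minimal sketch of that pattern, using a hypothetical `MixPacket` placeholder rather than the real `nymsphinx::forwarding::packet::MixPacket`:

```rust
use futures::channel::mpsc;

// Hypothetical stand-in for nymsphinx::forwarding::packet::MixPacket (illustration only).
struct MixPacket {
    payload: Vec<u8>,
}

// Mirrors the `BatchMixMessageSender` alias introduced in this PR.
type BatchMixMessageSender = mpsc::UnboundedSender<Vec<MixPacket>>;

// A single loop-cover packet is wrapped in a one-element batch; the receiving
// traffic controller decides whether to forward it individually or batched.
fn send_cover_packet(mix_tx: &BatchMixMessageSender, cover_packet: MixPacket) {
    mix_tx
        .unbounded_send(vec![cover_packet])
        .expect("the mix traffic receiver was dropped");
}

fn main() {
    let (tx, _rx) = mpsc::unbounded::<Vec<MixPacket>>();
    send_cover_packet(&tx, MixPacket { payload: vec![0u8; 32] });
}
```
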
51 changes: 27 additions & 24 deletions clients/client-core/src/client/mix_traffic.rs
@@ -16,67 +16,70 @@ use futures::channel::mpsc;
use futures::StreamExt;
use gateway_client::GatewayClient;
use log::*;
use nymsphinx::{addressing::nodes::NymNodeRoutingAddress, SphinxPacket};
use nymsphinx::forwarding::packet::MixPacket;
use tokio::runtime::Handle;
use tokio::task::JoinHandle;

pub struct MixMessage(NymNodeRoutingAddress, SphinxPacket);
pub type MixMessageSender = mpsc::UnboundedSender<MixMessage>;
pub type MixMessageReceiver = mpsc::UnboundedReceiver<MixMessage>;

impl MixMessage {
pub fn new(address: NymNodeRoutingAddress, packet: SphinxPacket) -> Self {
MixMessage(address, packet)
}
}
pub type BatchMixMessageSender = mpsc::UnboundedSender<Vec<MixPacket>>;
pub type BatchMixMessageReceiver = mpsc::UnboundedReceiver<Vec<MixPacket>>;

const MAX_FAILURE_COUNT: usize = 100;

pub struct MixTrafficController {
// TODO: most likely to be replaced by some higher level construct as
// later on gateway_client will need to be accessible by other entities
gateway_client: GatewayClient,
mix_rx: MixMessageReceiver,
mix_rx: BatchMixMessageReceiver,

// TODO: this is temporary work-around.
// in long run `gateway_client` will be moved away from `MixTrafficController` anyway.
consecutive_gateway_failure_count: usize,
}

impl MixTrafficController {
pub fn new(mix_rx: MixMessageReceiver, gateway_client: GatewayClient) -> MixTrafficController {
pub fn new(
mix_rx: BatchMixMessageReceiver,
gateway_client: GatewayClient,
) -> MixTrafficController {
MixTrafficController {
gateway_client,
mix_rx,
consecutive_gateway_failure_count: 0,
}
}

async fn on_message(&mut self, mix_message: MixMessage) {
debug!("Got a mix_message for {:?}", mix_message.0);
match self
.gateway_client
.send_sphinx_packet(mix_message.0, mix_message.1)
.await
{
async fn on_messages(&mut self, mut mix_packets: Vec<MixPacket>) {
debug_assert!(!mix_packets.is_empty());

let success = if mix_packets.len() == 1 {
let mix_packet = mix_packets.pop().unwrap();
self.gateway_client.send_mix_packet(mix_packet).await
} else {
self.gateway_client
.batch_send_mix_packets(mix_packets)
.await
};

match success {
Err(e) => {
error!("Failed to send sphinx packet to the gateway! - {:?}", e);
error!("Failed to send sphinx packet(s) to the gateway! - {:?}", e);
self.consecutive_gateway_failure_count += 1;
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
// todo: in the future this should initiate a 'graceful' shutdown
// todo: in the future this should initiate a 'graceful' shutdown or try
// to reconnect?
panic!("failed to send sphinx packet to the gateway {} times in a row - assuming the gateway is dead. Can't do anything about it yet :(", MAX_FAILURE_COUNT)
}
}
Ok(_) => {
trace!("We *might* have managed to forward sphinx packet to the gateway!");
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
self.consecutive_gateway_failure_count = 0;
}
}
}

pub async fn run(&mut self) {
while let Some(mix_message) = self.mix_rx.next().await {
self.on_message(mix_message).await;
while let Some(mix_packets) = self.mix_rx.next().await {
self.on_messages(mix_packets).await;
}
}

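
The controller now drains whole batches from the channel and only takes the single-packet path when a batch contains exactly one element, resetting its failure counter on success. A self-contained sketch of that dispatch logic, with hypothetical `MixPacket` and `Gateway` stand-ins for the real `nymsphinx` and `gateway_client` types (not the actual implementation):

```rust
use futures::executor::block_on;

// Hypothetical stand-ins for the real MixPacket and GatewayClient types.
struct MixPacket;
struct Gateway;

impl Gateway {
    async fn send_mix_packet(&mut self, _packet: MixPacket) -> Result<(), &'static str> {
        Ok(())
    }

    async fn batch_send_mix_packets(&mut self, _packets: Vec<MixPacket>) -> Result<(), &'static str> {
        Ok(())
    }
}

const MAX_FAILURE_COUNT: usize = 100;

struct Controller {
    gateway: Gateway,
    consecutive_gateway_failure_count: usize,
}

impl Controller {
    async fn on_messages(&mut self, mut mix_packets: Vec<MixPacket>) {
        debug_assert!(!mix_packets.is_empty());

        // A one-element batch takes the single-packet path; anything larger is
        // forwarded to the gateway in one batched call.
        let result = if mix_packets.len() == 1 {
            self.gateway.send_mix_packet(mix_packets.pop().unwrap()).await
        } else {
            self.gateway.batch_send_mix_packets(mix_packets).await
        };

        match result {
            Ok(_) => self.consecutive_gateway_failure_count = 0,
            Err(e) => {
                self.consecutive_gateway_failure_count += 1;
                if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
                    panic!("gateway send failed {} times in a row: {}", MAX_FAILURE_COUNT, e);
                }
            }
        }
    }
}

fn main() {
    let mut controller = Controller {
        gateway: Gateway,
        consecutive_gateway_failure_count: 0,
    };
    block_on(controller.on_messages(vec![MixPacket, MixPacket]));
}
```
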
@@ -13,10 +13,10 @@
// limitations under the License.

use super::PendingAcknowledgement;
use crate::client::real_messages_control::acknowledgement_control::ack_delay_queue::AckDelayQueue;
use crate::client::real_messages_control::acknowledgement_control::RetransmissionRequestSender;
use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
use log::*;
use nonexhaustive_delayqueue::NonExhaustiveDelayQueue;
use nymsphinx::chunking::fragment::FragmentIdentifier;
use nymsphinx::Delay as SphinxDelay;
use std::collections::HashMap;
@@ -105,7 +105,7 @@ pub(super) struct ActionController {
// previous version.
/// DelayQueue with all `PendingAcknowledgement` that are waiting to be either received or
/// retransmitted if their timer fires up.
pending_acks_timers: AckDelayQueue<FragmentIdentifier>,
pending_acks_timers: NonExhaustiveDelayQueue<FragmentIdentifier>,

/// Channel for receiving `Action`s from other modules.
incoming_actions: UnboundedReceiver<Action>,
@@ -124,7 +124,7 @@ impl ActionController {
ActionController {
config,
pending_acks_data: HashMap::new(),
pending_acks_timers: AckDelayQueue::new(),
pending_acks_timers: NonExhaustiveDelayQueue::new(),
incoming_actions: receiver,
retransmission_sender,
},