Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/panic improvements #101

Merged
merged 8 commits into from
Jan 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions common/addressing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ pub fn encoded_bytes_from_socket_address(address: SocketAddr) -> [u8; 32] {
address_bytes
}

pub fn socket_address_from_encoded_bytes(b: [u8; 32]) -> SocketAddr {
let address_type: AddressType = b[0].try_into().unwrap();
pub fn socket_address_from_encoded_bytes(b: [u8; 32]) -> Result<SocketAddr, AddressTypeError> {
let address_type: AddressType = b[0].try_into()?;

let port: u16 = u16::from_be_bytes([b[1], b[2]]);

Expand All @@ -76,5 +76,5 @@ pub fn socket_address_from_encoded_bytes(b: [u8; 32]) -> SocketAddr {
}
};

SocketAddr::new(ip, port)
Ok(SocketAddr::new(ip, port))
}
35 changes: 20 additions & 15 deletions common/clients/directory-client/src/presence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::net::ToSocketAddrs;
use topology::coco;
use topology::mix;
use topology::provider;
use topology::provider::Node;
use topology::NymTopology;

// special version of 'PartialEq' that does not care about last_seen field (where applicable)
Expand Down Expand Up @@ -62,16 +63,18 @@ impl TryInto<topology::mix::Node> for MixNodePresence {
type Error = io::Error;

fn try_into(self) -> Result<topology::mix::Node, Self::Error> {
let resolved_hostname = self.host.to_socket_addrs()?.next();
if resolved_hostname.is_none() {
return Err(io::Error::new(
io::ErrorKind::Other,
"no valid socket address",
));
}
let resolved_hostname = match self.host.to_socket_addrs()?.next() {
None => {
return Err(io::Error::new(
io::ErrorKind::Other,
"no valid socket address",
));
}
Some(host) => host,
};

Ok(topology::mix::Node {
host: resolved_hostname.unwrap(),
host: resolved_hostname,
pub_key: self.pub_key,
layer: self.layer,
last_seen: self.last_seen,
Expand Down Expand Up @@ -129,11 +132,13 @@ impl PresenceEq for MixProviderPresence {
}
}

impl Into<topology::provider::Node> for MixProviderPresence {
fn into(self) -> topology::provider::Node {
topology::provider::Node {
client_listener: self.client_listener.parse().unwrap(),
mixnet_listener: self.mixnet_listener.parse().unwrap(),
impl TryInto<topology::provider::Node> for MixProviderPresence {
type Error = std::net::AddrParseError;

fn try_into(self) -> Result<Node, Self::Error> {
Ok(topology::provider::Node {
client_listener: self.client_listener.parse()?,
mixnet_listener: self.mixnet_listener.parse()?,
pub_key: self.pub_key,
registered_clients: self
.registered_clients
Expand All @@ -142,7 +147,7 @@ impl Into<topology::provider::Node> for MixProviderPresence {
.collect(),
last_seen: self.last_seen,
version: self.version,
}
})
}
}

Expand Down Expand Up @@ -345,7 +350,7 @@ impl NymTopology for Topology {
fn providers(&self) -> Vec<provider::Node> {
self.mix_provider_nodes
.iter()
.map(|x| x.clone().into())
.filter_map(|x| x.clone().try_into().ok())
.collect()
}

Expand Down
51 changes: 44 additions & 7 deletions common/clients/mix-client/src/packet.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,49 @@
use addressing;
use addressing::AddressTypeError;
use sphinx::route::{Destination, DestinationAddressBytes, SURBIdentifier};
use sphinx::SphinxPacket;
use std::net::SocketAddr;
use topology::NymTopology;
use topology::{NymTopology, NymTopologyError};

pub const LOOP_COVER_MESSAGE_PAYLOAD: &[u8] = b"The cake is a lie!";
pub const LOOP_COVER_MESSAGE_AVERAGE_DELAY: f64 = 2.0;

#[derive(Debug)]
pub enum SphinxPacketEncapsulationError {
NoValidProvidersError,
InvalidTopologyError,
SphinxEncapsulationError(sphinx::header::SphinxUnwrapError),
InvalidFirstMixAddress,
}

impl From<topology::NymTopologyError> for SphinxPacketEncapsulationError {
fn from(_: NymTopologyError) -> Self {
use SphinxPacketEncapsulationError::*;
InvalidTopologyError
}
}

// it is correct error we're converting from, it just has an unfortunate name
// related issue: https://github.com/nymtech/sphinx/issues/40
impl From<sphinx::header::SphinxUnwrapError> for SphinxPacketEncapsulationError {
fn from(err: sphinx::header::SphinxUnwrapError) -> Self {
use SphinxPacketEncapsulationError::*;
SphinxEncapsulationError(err)
}
}

impl From<AddressTypeError> for SphinxPacketEncapsulationError {
fn from(_: AddressTypeError) -> Self {
use SphinxPacketEncapsulationError::*;
InvalidFirstMixAddress
}
}

pub fn loop_cover_message<T: NymTopology>(
our_address: DestinationAddressBytes,
surb_id: SURBIdentifier,
topology: &T,
) -> (SocketAddr, SphinxPacket) {
) -> Result<(SocketAddr, SphinxPacket), SphinxPacketEncapsulationError> {
let destination = Destination::new(our_address, surb_id);

encapsulate_message(
Expand All @@ -27,19 +59,24 @@ pub fn encapsulate_message<T: NymTopology>(
message: Vec<u8>,
topology: &T,
average_delay: f64,
) -> (SocketAddr, SphinxPacket) {
) -> Result<(SocketAddr, SphinxPacket), SphinxPacketEncapsulationError> {
let mut providers = topology.providers();
if providers.len() == 0 {
return Err(SphinxPacketEncapsulationError::NoValidProvidersError);
}
// unwrap is fine here as we asserted there is at least single provider
let provider = providers.pop().unwrap().into();

let route = topology.route_to(provider).unwrap();
let route = topology.route_to(provider)?;

let delays = sphinx::header::delays::generate(route.len(), average_delay);

// build the packet
let packet = sphinx::SphinxPacket::new(message, &route[..], &recipient, &delays).unwrap();
let packet = sphinx::SphinxPacket::new(message, &route[..], &recipient, &delays)?;

// we know the mix route must be valid otherwise we would have already returned an error
let first_node_address =
addressing::socket_address_from_encoded_bytes(route.first().unwrap().address.to_bytes());
addressing::socket_address_from_encoded_bytes(route.first().unwrap().address.to_bytes())?;

(first_node_address, packet)
Ok((first_node_address, packet))
}
3 changes: 3 additions & 0 deletions common/clients/mix-client/src/poisson.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use rand_distr::{Distribution, Exp};

pub fn sample(average_delay: f64) -> f64 {
// this is our internal code used by our traffic streams
// the error is only thrown if average delay is less than 0, which will never happen
// so call to unwrap is perfectly safe here
let exp = Exp::new(1.0 / average_delay).unwrap();
exp.sample(&mut rand::thread_rng())
}
15 changes: 9 additions & 6 deletions common/clients/provider-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl ProviderClient {
pub async fn send_request(&self, bytes: Vec<u8>) -> Result<Vec<u8>, ProviderClientError> {
let mut socket = tokio::net::TcpStream::connect(self.provider_network_address).await?;

socket.set_keepalive(Some(Duration::from_secs(2))).unwrap();
socket.set_keepalive(Some(Duration::from_secs(2)))?;
socket.write_all(&bytes[..]).await?;
if let Err(e) = socket.shutdown(Shutdown::Write) {
warn!("failed to close write part of the socket; err = {:?}", e)
Expand All @@ -82,11 +82,14 @@ impl ProviderClient {
}

pub async fn retrieve_messages(&self) -> Result<Vec<Vec<u8>>, ProviderClientError> {
if self.auth_token.is_none() {
return Err(ProviderClientError::EmptyAuthTokenError);
}

let pull_request = PullRequest::new(self.our_address, self.auth_token.unwrap());
let auth_token = match self.auth_token {
Some(token) => token,
None => {
return Err(ProviderClientError::EmptyAuthTokenError);
}
};

let pull_request = PullRequest::new(self.our_address, auth_token);
let bytes = pull_request.to_bytes();

let response = self.send_request(bytes).await?;
Expand Down
1 change: 1 addition & 0 deletions common/crypto/src/encryption/x25519.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ impl MixnetEncryptionPrivateKey for PrivateKey {
fn from_bytes(b: &[u8]) -> Self {
let mut bytes = [0; 32];
bytes.copy_from_slice(&b[..]);
// due to trait restriction we have no choice but to panic if this fails
let key = Scalar::from_canonical_bytes(bytes).unwrap();
Self(key)
}
Expand Down
34 changes: 27 additions & 7 deletions common/healthcheck/src/path_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,16 +153,23 @@ impl PathChecker {
let mut provider_messages = Vec::new();
for provider_client in self.provider_clients.values() {
// if it was none all associated paths were already marked as unhealthy
if provider_client.is_some() {
let pc = provider_client.as_ref().unwrap();
provider_messages.extend(self.resolve_pending_provider_checks(pc).await);
}
let pc = match provider_client {
Some(pc) => pc,
None => continue,
};

provider_messages.extend(self.resolve_pending_provider_checks(pc).await);
}

self.update_path_statuses(provider_messages);
}

pub(crate) async fn send_test_packet(&mut self, path: &Vec<SphinxNode>, iteration: u8) {
if path.len() == 0 {
warn!("trying to send test packet through an empty path!");
return;
}

debug!("Checking path: {:?} ({})", path, iteration);
let path_identifier = PathChecker::unique_path_key(path, iteration);

Expand All @@ -171,7 +178,13 @@ impl PathChecker {
// does provider exist?
let provider_client = self
.provider_clients
.get(&path.last().unwrap().pub_key.to_bytes())
.get(
&path
.last()
.expect("We checked the path to contain at least one entry")
.pub_key
.to_bytes(),
)
.unwrap();

if provider_client.is_none() {
Expand All @@ -186,10 +199,15 @@ impl PathChecker {
return;
}

let layer_one_mix = path.first().unwrap();
let layer_one_mix = path
.first()
.expect("We checked the path to contain at least one entry");
let first_node_key = layer_one_mix.pub_key.to_bytes();

// we generated the bytes data so unwrap is fine
let first_node_address =
addressing::socket_address_from_encoded_bytes(layer_one_mix.address.to_bytes());
addressing::socket_address_from_encoded_bytes(layer_one_mix.address.to_bytes())
.unwrap();

let first_node_client = self
.layer_one_clients
Expand All @@ -208,10 +226,12 @@ impl PathChecker {
return;
}

// we already checked for 'None' case
let first_node_client = first_node_client.as_ref().unwrap();

let delays: Vec<_> = path.iter().map(|_| Delay::new(0)).collect();

// all of the data used to create the packet was created by us
let packet = sphinx::SphinxPacket::new(
path_identifier.clone(),
&path[..],
Expand Down
6 changes: 3 additions & 3 deletions common/healthcheck/src/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ pub struct HealthCheckResult(Vec<NodeScore>);
impl std::fmt::Display for HealthCheckResult {
fn fmt(&self, f: &mut Formatter) -> Result<(), Error> {
write!(f, "NETWORK HEALTH\n==============\n")?;
self.0
.iter()
.for_each(|score| write!(f, "{}\n", score).unwrap());
for score in self.0.iter() {
write!(f, "{}\n", score)?
}
Ok(())
}
}
Expand Down
Loading