Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to sfw-provider - client communcation #180

Merged
merged 24 commits into from
Apr 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ae88b90
Moved auth_token to seperate file
jstuczyn Apr 14, 2020
2268c01
Extracted check_id as separate type
jstuczyn Apr 14, 2020
7a9238f
Changes due to move of auth_token and making provider client mutable
jstuczyn Apr 14, 2020
d7d8be2
New way of serialization provider requests/responses
jstuczyn Apr 14, 2020
8dde353
Initial attempt of using new provider client
jstuczyn Apr 14, 2020
18d1307
Moved requests and responses to separate modules
jstuczyn Apr 14, 2020
0e11e2d
Moved serialization to separate files
jstuczyn Apr 14, 2020
f37b636
Extracted readers and writers to io related modules
jstuczyn Apr 14, 2020
6f6eb55
Extra tests + bug fixes
jstuczyn Apr 14, 2020
3fb50a1
Updated tokio dependency to require correct features
jstuczyn Apr 14, 2020
20c4753
typo
jstuczyn Apr 14, 2020
9a20689
Easier conversion of requests/responses into enum variants
jstuczyn Apr 14, 2020
cf3e82f
Renamed 'read_be_u16' to better show its purpose
jstuczyn Apr 15, 2020
dbbdb17
Serialization related tests and fixes
jstuczyn Apr 15, 2020
fcf3481
Tests for async_io + fixes
jstuczyn Apr 15, 2020
85e54c1
Future considerations
jstuczyn Apr 15, 2020
1ac7ecf
Configurable max request size
jstuczyn Apr 15, 2020
b46a73f
Configurable max response size for client
jstuczyn Apr 15, 2020
2f4b6fd
Removed debug drop implementations
jstuczyn Apr 15, 2020
7daa2d9
Removed debug print statement
jstuczyn Apr 15, 2020
b9fed2d
Changes to lock file
jstuczyn Apr 15, 2020
83b321e
Merge branch 'develop' into feature/sfw_provider_improvements
jstuczyn Apr 15, 2020
f110c09
Added license notifications
jstuczyn Apr 15, 2020
030a3e3
Cargo fmt
jstuczyn Apr 15, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

121 changes: 88 additions & 33 deletions common/clients/provider-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@

use futures::io::Error;
use log::*;
use sfw_provider_requests::requests::{ProviderRequest, PullRequest, RegisterRequest};
use sfw_provider_requests::auth_token::AuthToken;
use sfw_provider_requests::requests::{
async_io::TokioAsyncRequestWriter, ProviderRequest, PullRequest, RegisterRequest,
};
use sfw_provider_requests::responses::{
ProviderResponse, ProviderResponseError, PullResponse, RegisterResponse,
async_io::TokioAsyncResponseReader, ProviderResponse, ProviderResponseError,
};
use sfw_provider_requests::AuthToken;
use sphinx::route::DestinationAddressBytes;
use std::net::{Shutdown, SocketAddr};
use std::time::Duration;
use std::net::SocketAddr;
use tokio::prelude::*;

#[derive(Debug)]
Expand Down Expand Up @@ -50,6 +51,12 @@ impl From<ProviderResponseError> for ProviderClientError {
ProviderResponseError::MarshalError => InvalidRequestError,
ProviderResponseError::UnmarshalError => InvalidResponseError,
ProviderResponseError::UnmarshalErrorInvalidLength => InvalidResponseLengthError,
ProviderResponseError::UnmarshalErrorInvalidKind => InvalidResponseLengthError,

ProviderResponseError::TooLongResponseError => InvalidResponseError,
ProviderResponseError::TooShortResponseError => InvalidResponseError,
ProviderResponseError::IOError(_) => NetworkError,
ProviderResponseError::RemoteConnectionClosed => NetworkError,
}
}
}
Expand All @@ -58,72 +65,120 @@ pub struct ProviderClient {
provider_network_address: SocketAddr,
our_address: DestinationAddressBytes,
auth_token: Option<AuthToken>,
connection: Option<tokio::net::TcpStream>,
max_response_size: usize,
}

impl ProviderClient {
pub fn new(
provider_network_address: SocketAddr,
our_address: DestinationAddressBytes,
auth_token: Option<AuthToken>,
max_response_size: usize,
) -> Self {
ProviderClient {
provider_network_address,
our_address,
auth_token,
max_response_size,
// establish connection when it's necessary (mainly to not break current code
// as then 'new' would need to be called within async context)
connection: None,
}
}

async fn check_connection(&mut self) -> bool {
if self.connection.is_some() {
true
} else {
// TODO: possibly also introduce timeouts here?
// However, at this point it's slightly less important as we are in full control
// of providers.
self.connection = tokio::net::TcpStream::connect(self.provider_network_address)
.await
.ok();
self.connection.is_some()
}
}

pub fn update_token(&mut self, auth_token: AuthToken) {
self.auth_token = Some(auth_token)
}

pub async fn send_request(&self, bytes: Vec<u8>) -> Result<Vec<u8>, ProviderClientError> {
let mut socket = tokio::net::TcpStream::connect(self.provider_network_address).await?;

socket.set_keepalive(Some(Duration::from_secs(2)))?;
socket.write_all(&bytes[..]).await?;
if let Err(e) = socket.shutdown(Shutdown::Write) {
warn!("failed to close write part of the socket; err = {:?}", e)
pub async fn send_request(
&mut self,
request: ProviderRequest,
) -> Result<ProviderResponse, ProviderClientError> {
if !self.check_connection().await {
return Err(ProviderClientError::NetworkError);
}

let mut response = Vec::new();
socket.read_to_end(&mut response).await?;
if let Err(e) = socket.shutdown(Shutdown::Read) {
debug!("failed to close read part of the socket; err = {:?}. It was probably already closed by the provider", e)
let socket = self.connection.as_mut().unwrap();
let (mut socket_reader, mut socket_writer) = socket.split();

// TODO: benchmark and determine if below should be done:
// let mut socket_writer = tokio::io::BufWriter::new(socket_writer);
// let mut socket_reader = tokio::io::BufReader::new(socket_reader);

let mut request_writer = TokioAsyncRequestWriter::new(&mut socket_writer);
let mut response_reader =
TokioAsyncResponseReader::new(&mut socket_reader, self.max_response_size);

if let Err(e) = request_writer.try_write_request(request).await {
debug!("Failed to write the request - {:?}", e);
return Err(e.into());
}

Ok(response)
Ok(response_reader.try_read_response().await?)
}

pub async fn retrieve_messages(&self) -> Result<Vec<Vec<u8>>, ProviderClientError> {
pub async fn retrieve_messages(&mut self) -> Result<Vec<Vec<u8>>, ProviderClientError> {
let auth_token = match self.auth_token.as_ref() {
Some(token) => token.clone(),
None => {
return Err(ProviderClientError::EmptyAuthTokenError);
}
};

let pull_request = PullRequest::new(self.our_address.clone(), auth_token);
let bytes = pull_request.to_bytes();

let response = self.send_request(bytes).await?;

let parsed_response = PullResponse::from_bytes(&response)?;
Ok(parsed_response.messages)
let pull_request =
ProviderRequest::Pull(PullRequest::new(self.our_address.clone(), auth_token));
match self.send_request(pull_request).await? {
ProviderResponse::Pull(res) => Ok(res.extract_messages()),
ProviderResponse::Failure(res) => {
error!(
"We failed to get our request processed - {:?}",
res.get_message()
);
Err(ProviderClientError::InvalidResponseError)
}
_ => {
error!("Received response of unexpected type!");
Err(ProviderClientError::InvalidResponseError)
}
}
}

pub async fn register(&self) -> Result<AuthToken, ProviderClientError> {
pub async fn register(&mut self) -> Result<AuthToken, ProviderClientError> {
if self.auth_token.is_some() {
return Err(ProviderClientError::ClientAlreadyRegisteredError);
}

let register_request = RegisterRequest::new(self.our_address.clone());
let bytes = register_request.to_bytes();

let response = self.send_request(bytes).await?;
let parsed_response = RegisterResponse::from_bytes(&response)?;

Ok(parsed_response.auth_token)
let register_request =
ProviderRequest::Register(RegisterRequest::new(self.our_address.clone()));
match self.send_request(register_request).await? {
ProviderResponse::Register(res) => Ok(res.get_token()),
ProviderResponse::Failure(res) => {
error!(
"We failed to get our request processed - {:?}",
res.get_message()
);
Err(ProviderClientError::InvalidResponseError)
}
_ => {
error!("Received response of unexpected type!");
Err(ProviderClientError::InvalidResponseError)
}
}
}

pub fn is_registered(&self) -> bool {
Expand Down
3 changes: 3 additions & 0 deletions common/healthcheck/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ use std::fmt::{Error, Formatter};
use std::time::Duration;
use topology::{NymTopology, NymTopologyError};

// basically no limit
pub(crate) const MAX_PROVIDER_RESPONSE_SIZE: usize = 1024 * 1024;

pub mod config;
mod path_check;
mod result;
Expand Down
31 changes: 21 additions & 10 deletions common/healthcheck/src/path_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::MAX_PROVIDER_RESPONSE_SIZE;
use crypto::identity::MixIdentityKeyPair;
use itertools::Itertools;
use log::{debug, error, info, trace, warn};
Expand All @@ -25,6 +26,8 @@ use std::net::SocketAddr;
use std::time::Duration;
use topology::provider;

pub(crate) type CheckId = [u8; 16];

#[derive(Debug, PartialEq, Clone)]
pub enum PathStatus {
Healthy,
Expand All @@ -37,23 +40,30 @@ pub(crate) struct PathChecker {
mixnet_client: multi_tcp_client::Client,
paths_status: HashMap<Vec<u8>, PathStatus>,
our_destination: Destination,
check_id: [u8; 16],
check_id: CheckId,
}

impl PathChecker {
pub(crate) async fn new(
providers: Vec<provider::Node>,
identity_keys: &MixIdentityKeyPair,
connection_timeout: Duration,
check_id: [u8; 16],
check_id: CheckId,
) -> Self {
let mut provider_clients = HashMap::new();

let address = identity_keys.public_key().derive_address();

for provider in providers {
let mut provider_client =
ProviderClient::new(provider.client_listener, address.clone(), None);
let mut provider_client = ProviderClient::new(
provider.client_listener,
address.clone(),
None,
MAX_PROVIDER_RESPONSE_SIZE,
);
// TODO: we might be sending unnecessary register requests since after first healthcheck,
// we are registered for any subsequent ones (since our address did not change)

let insertion_result = match provider_client.register().await {
Ok(token) => {
debug!("[Healthcheck] registered at provider {}", provider.pub_key);
Expand Down Expand Up @@ -96,7 +106,7 @@ impl PathChecker {

// iteration is used to distinguish packets sent through the same path (as the healthcheck
// may try to send say 10 packets through given path)
fn unique_path_key(path: &[SphinxNode], check_id: [u8; 16], iteration: u8) -> Vec<u8> {
fn unique_path_key(path: &[SphinxNode], check_id: CheckId, iteration: u8) -> Vec<u8> {
check_id
.iter()
.cloned()
Expand Down Expand Up @@ -147,8 +157,8 @@ impl PathChecker {

// pull messages from given provider until there are no more 'real' messages
async fn resolve_pending_provider_checks(
&self,
provider_client: &ProviderClient,
provider_client: &mut ProviderClient,
check_id: CheckId,
) -> Vec<Vec<u8>> {
// keep getting messages until we encounter the dummy message
let mut provider_messages = Vec::new();
Expand All @@ -165,7 +175,7 @@ impl PathChecker {
if msg == sfw_provider_requests::DUMMY_MESSAGE_CONTENT {
// finish iterating the loop as the messages might not be ordered
should_stop = true;
} else if msg[..16] != self.check_id {
} else if msg[..16] != check_id {
warn!("received response from previous healthcheck")
} else {
provider_messages.push(msg);
Expand All @@ -183,14 +193,15 @@ impl PathChecker {
pub(crate) async fn resolve_pending_checks(&mut self) {
// not sure how to nicely put it into an iterator due to it being async calls
let mut provider_messages = Vec::new();
for provider_client in self.provider_clients.values() {
for provider_client in self.provider_clients.values_mut() {
// if it was none all associated paths were already marked as unhealthy
let pc = match provider_client {
Some(pc) => pc,
None => continue,
};

provider_messages.extend(self.resolve_pending_provider_checks(pc).await);
provider_messages
.extend(Self::resolve_pending_provider_checks(pc, self.check_id).await);
}

self.update_path_statuses(provider_messages);
Expand Down
3 changes: 2 additions & 1 deletion nym-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use futures::channel::{mpsc, oneshot};
use log::*;
use nymsphinx::chunking::split_and_prepare_payloads;
use pemstore::pemstore::PemStore;
use sfw_provider_requests::AuthToken;
use sfw_provider_requests::auth_token::AuthToken;
use sphinx::route::Destination;
use tokio::runtime::Runtime;
use topology::NymTopology;
Expand Down Expand Up @@ -185,6 +185,7 @@ impl NymClient {
.map(|str_token| AuthToken::try_from_base58_string(str_token).ok())
.unwrap_or(None),
self.config.get_fetch_message_delay(),
self.config.get_max_response_size(),
);

if !provider_poller.is_registered() {
Expand Down
8 changes: 5 additions & 3 deletions nym-client/src/client/provider_poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use futures::channel::mpsc;
use log::*;
use provider_client::ProviderClientError;
use sfw_provider_requests::AuthToken;
use sfw_provider_requests::auth_token::AuthToken;
use sphinx::route::DestinationAddressBytes;
use std::net::SocketAddr;
use std::time;
Expand All @@ -38,12 +38,14 @@ impl ProviderPoller {
client_address: DestinationAddressBytes,
auth_token: Option<AuthToken>,
polling_rate: time::Duration,
max_response_size: usize,
) -> Self {
ProviderPoller {
provider_client: provider_client::ProviderClient::new(
provider_client_listener_address,
client_address,
auth_token,
max_response_size,
),
poller_tx,
polling_rate,
Expand Down Expand Up @@ -74,7 +76,7 @@ impl ProviderPoller {
Ok(())
}

pub(crate) async fn start_provider_polling(self) {
pub(crate) async fn start_provider_polling(&mut self) {
let loop_message = &mix_client::packet::LOOP_COVER_MESSAGE_PAYLOAD.to_vec();
let dummy_message = &sfw_provider_requests::DUMMY_MESSAGE_CONTENT.to_vec();

Expand Down Expand Up @@ -114,7 +116,7 @@ impl ProviderPoller {
}
}

pub(crate) fn start(self, handle: &Handle) -> JoinHandle<()> {
pub(crate) fn start(mut self, handle: &Handle) -> JoinHandle<()> {
handle.spawn(async move { self.start_provider_polling().await })
}
}
Loading