Skip to content

Commit

Permalink
Query bandwidth
Browse files Browse the repository at this point in the history
  • Loading branch information
neacsu committed Sep 20, 2024
1 parent 197452b commit 5aefb9c
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 95 deletions.
1 change: 0 additions & 1 deletion common/authenticator-requests/src/v2/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ pub struct RegistredData {
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RemainingBandwidthData {
pub available_bandwidth: u64,
pub suspended: bool,
}

/// Client that wants to register sends its PublicKey bytes mac digest encrypted with a DH shared secret.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use nym_credentials::ecash::utils::ecash_today;
use nym_credentials_interface::Bandwidth;
use nym_credentials_interface::{AvailableBandwidth, Bandwidth};
use nym_gateway_requests::ServerResponse;
use nym_gateway_storage::Storage;
use si_scale::helpers::bibytes2;
Expand Down Expand Up @@ -41,6 +41,10 @@ impl<S: Storage + Clone + 'static> BandwidthStorageManager<S> {
}
}

pub fn available_bandwidth(&self) -> AvailableBandwidth {
self.client_bandwidth.bandwidth
}

async fn sync_expiration(&mut self) -> Result<()> {
self.storage
.set_expiration(self.client_id, self.client_bandwidth.bandwidth.expiration)
Expand Down
3 changes: 0 additions & 3 deletions common/wireguard-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,4 @@ pub use config::Config;
pub use error::Error;
pub use public_key::PeerPublicKey;

// To avoid any problems, keep this stale check time bigger (>2x) then the bandwidth cap
// reset time (currently that one is 24h, at UTC midnight)
pub const DEFAULT_PEER_TIMEOUT: Duration = Duration::from_secs(60 * 60 * 24 * 3); // 3 days
pub const DEFAULT_PEER_TIMEOUT_CHECK: Duration = Duration::from_secs(5); // 5 seconds
4 changes: 2 additions & 2 deletions common/wireguard/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ pub enum Error {
#[error("{0}")]
Defguard(#[from] defguard_wireguard_rs::error::WireguardInterfaceError),

#[error("internal error {0}")]
InternalError(String),
#[error("internal {0}")]
Internal(String),

#[error("storage should have the requested bandwidht entry")]
MissingClientBandwidthEntry,
Expand Down
20 changes: 10 additions & 10 deletions common/wireguard/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use nym_crypto::asymmetric::encryption::KeyPair;
use nym_wireguard_types::Config;
use peer_controller::PeerControlRequest;
use std::sync::Arc;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::RwLock;

const WG_TUN_NAME: &str = "nymwg";

Expand Down Expand Up @@ -44,15 +45,12 @@ impl Drop for WgApiWrapper {
pub struct WireguardGatewayData {
config: Config,
keypair: Arc<KeyPair>,
peer_tx: UnboundedSender<PeerControlRequest>,
peer_tx: Sender<PeerControlRequest>,
}

impl WireguardGatewayData {
pub fn new(
config: Config,
keypair: Arc<KeyPair>,
) -> (Self, UnboundedReceiver<PeerControlRequest>) {
let (peer_tx, peer_rx) = mpsc::unbounded_channel();
pub fn new(config: Config, keypair: Arc<KeyPair>) -> (Self, Receiver<PeerControlRequest>) {
let (peer_tx, peer_rx) = mpsc::channel(1);
(
WireguardGatewayData {
config,
Expand All @@ -71,14 +69,14 @@ impl WireguardGatewayData {
&self.keypair
}

pub fn peer_tx(&self) -> &UnboundedSender<PeerControlRequest> {
pub fn peer_tx(&self) -> &Sender<PeerControlRequest> {
&self.peer_tx
}
}

pub struct WireguardData {
pub inner: WireguardGatewayData,
pub peer_rx: UnboundedReceiver<PeerControlRequest>,
pub peer_rx: Receiver<PeerControlRequest>,
}

/// Start wireguard device
Expand All @@ -105,7 +103,9 @@ pub async fn start_wireguard<St: nym_gateway_storage::Storage + Clone + 'static>
.collect::<Result<Vec<_>, _>>()?;
for peer in peers.iter() {
let bandwidth_manager =
PeerController::generate_bandwidth_manager(storage.clone(), &peer.public_key).await?;
PeerController::generate_bandwidth_manager(storage.clone(), &peer.public_key)
.await?
.map(|bw_m| Arc::new(RwLock::new(bw_m)));
peer_bandwidth_managers.insert(peer.public_key.clone(), bandwidth_manager);
}
wg_api.create_interface()?;
Expand Down
115 changes: 75 additions & 40 deletions common/wireguard/src/peer_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use defguard_wireguard_rs::{
WireguardInterfaceApi,
};
use futures::channel::oneshot;
use nym_authenticator_requests::v2::registration::{RemainingBandwidthData, BANDWIDTH_CAP_PER_DAY};
use nym_authenticator_requests::{
v1::registration::BANDWIDTH_CAP_PER_DAY, v2::registration::RemainingBandwidthData,
};
use nym_credential_verification::{
bandwidth_storage_manager::BandwidthStorageManager, BandwidthFlushingBehaviourConfig,
ClientBandwidth,
Expand All @@ -18,9 +20,9 @@ use std::{collections::HashMap, sync::Arc};
use tokio::sync::{mpsc, RwLock};
use tokio_stream::{wrappers::IntervalStream, StreamExt};

use crate::error::Error;
use crate::peer_handle::PeerHandle;
use crate::WgApiWrapper;
use crate::{error::Error, peer_handle::SharedBandwidthStorageManager};

pub enum PeerControlRequest {
AddPeer {
Expand Down Expand Up @@ -57,16 +59,18 @@ pub struct QueryPeerControlResponse {
}

pub struct QueryBandwidthControlResponse {
pub success: bool,
pub bandwidth_data: Option<RemainingBandwidthData>,
}

pub struct PeerController<St: Storage + Clone + 'static> {
storage: St,
// used to receive commands from individual handles too
request_tx: mpsc::UnboundedSender<PeerControlRequest>,
request_rx: mpsc::UnboundedReceiver<PeerControlRequest>,
request_tx: mpsc::Sender<PeerControlRequest>,
request_rx: mpsc::Receiver<PeerControlRequest>,
wg_api: Arc<WgApiWrapper>,
host_information: Arc<RwLock<Host>>,
bw_storage_managers: HashMap<Key, Option<SharedBandwidthStorageManager<St>>>,
timeout_check_interval: IntervalStream,
task_client: nym_task::TaskClient,
}
Expand All @@ -76,21 +80,21 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
storage: St,
wg_api: Arc<WgApiWrapper>,
initial_host_information: Host,
bw_storage_managers: HashMap<Key, Option<BandwidthStorageManager<St>>>,
request_tx: mpsc::UnboundedSender<PeerControlRequest>,
request_rx: mpsc::UnboundedReceiver<PeerControlRequest>,
bw_storage_managers: HashMap<Key, Option<SharedBandwidthStorageManager<St>>>,
request_tx: mpsc::Sender<PeerControlRequest>,
request_rx: mpsc::Receiver<PeerControlRequest>,
task_client: nym_task::TaskClient,
) -> Self {
let timeout_check_interval = tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval(DEFAULT_PEER_TIMEOUT_CHECK),
);
let host_information = Arc::new(RwLock::new(initial_host_information));
for (public_key, bandwidth_storage_manager) in bw_storage_managers {
for (public_key, bandwidth_storage_manager) in bw_storage_managers.iter() {
let mut handle = PeerHandle::new(
storage.clone(),
public_key,
public_key.clone(),
host_information.clone(),
bandwidth_storage_manager,
bandwidth_storage_manager.clone(),
request_tx.clone(),
&task_client,
);
Expand All @@ -105,6 +109,7 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
storage,
wg_api,
host_information,
bw_storage_managers,
request_tx,
request_rx,
timeout_check_interval,
Expand Down Expand Up @@ -135,8 +140,9 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
}

// Function that should be used for peer removal, to handle both storage and kernel interaction
pub async fn remove_peer(&self, key: &Key) -> Result<(), Error> {
pub async fn remove_peer(&mut self, key: &Key) -> Result<(), Error> {
self.storage.remove_wireguard_peer(&key.to_string()).await?;
self.bw_storage_managers.remove(key);
let ret = self.wg_api.inner.remove_peer(key);
if ret.is_err() {
log::error!("Wireguard peer could not be removed from wireguard kernel module. Process should be restarted so that the interface is reset.");
Expand Down Expand Up @@ -171,21 +177,25 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
}

async fn handle_add_request(
&self,
&mut self,
peer: &Peer,
with_client_id: bool,
) -> Result<Option<i64>, Error> {
let client_id = self.add_peer(peer, with_client_id).await?;
let bandwidth_storage_manager =
Self::generate_bandwidth_manager(self.storage.clone(), &peer.public_key).await?;
Self::generate_bandwidth_manager(self.storage.clone(), &peer.public_key)
.await?
.map(|bw_m| Arc::new(RwLock::new(bw_m)));
let mut handle = PeerHandle::new(
self.storage.clone(),
peer.public_key.clone(),
self.host_information.clone(),
bandwidth_storage_manager,
bandwidth_storage_manager.clone(),
self.request_tx.clone(),
&self.task_client,
);
self.bw_storage_managers
.insert(peer.public_key.clone(), bandwidth_storage_manager);
tokio::spawn(async move {
if let Err(e) = handle.run().await {
log::error!("Peer handle shut down ungracefully - {e}");
Expand All @@ -194,21 +204,44 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
Ok(client_id)
}

async fn handle_query_peer(&self, key: &Key) -> (bool, Option<Peer>) {
match self.storage.get_wireguard_peer(&key.to_string()).await {
Err(e) => {
log::error!("Could not query peer storage {e}");
(false, None)
}
Ok(None) => (true, None),
Ok(Some(storage_peer)) => match Peer::try_from(storage_peer) {
Ok(peer) => (true, Some(peer)),
Err(e) => {
log::error!("Could not parse storage peer {e}");
(false, None)
}
},
}
async fn handle_query_peer(&self, key: &Key) -> Result<Option<Peer>, Error> {
Ok(self
.storage
.get_wireguard_peer(&key.to_string())
.await?
.map(Peer::try_from)
.transpose()?)
}

async fn handle_query_bandwidth(
&self,
key: &Key,
) -> Result<Option<RemainingBandwidthData>, Error> {
let Some(bandwidth_storage_manager) = self.bw_storage_managers.get(key) else {
return Ok(None);
};
let available_bandwidth = if let Some(bandwidth_storage_manager) = bandwidth_storage_manager
{
bandwidth_storage_manager
.read()
.await
.available_bandwidth()
.bytes as u64
} else {
let peer = self
.host_information
.read()
.await
.peers
.get(key)
.ok_or(Error::PeerMismatch)?
.clone();
BANDWIDTH_CAP_PER_DAY.saturating_sub(peer.rx_bytes + peer.tx_bytes)
};

Ok(Some(RemainingBandwidthData {
available_bandwidth,
}))
}

pub async fn run(&mut self) {
Expand Down Expand Up @@ -240,18 +273,20 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
response_tx.send(RemovePeerControlResponse { success }).ok();
}
Some(PeerControlRequest::QueryPeer { key, response_tx }) => {
let (success, peer) = self.handle_query_peer(&key).await;
response_tx.send(QueryPeerControlResponse { success, peer }).ok();
let ret = self.handle_query_peer(&key).await;
if let Ok(peer) = ret {
response_tx.send(QueryPeerControlResponse { success: true, peer }).ok();
} else {
response_tx.send(QueryPeerControlResponse { success: false, peer: None }).ok();
}
}
Some(PeerControlRequest::QueryBandwidth{key, response_tx}) => {
// let msg = if self.suspended_peers.contains_key(&key) {
// PeerControlResponse::QueryBandwidth { bandwidth_data: Some(RemainingBandwidthData{ available_bandwidth: 0, suspended: true }) }
// } else if let Some(&consumed_bandwidth) = self.last_seen_bandwidth.get(&key) {
// PeerControlResponse::QueryBandwidth { bandwidth_data: Some(RemainingBandwidthData{ available_bandwidth: BANDWIDTH_CAP_PER_DAY - consumed_bandwidth, suspended: false })}
// } else {
// PeerControlResponse::QueryBandwidth { bandwidth_data: None }
// };
// response_tx.send(msg).ok();
Some(PeerControlRequest::QueryBandwidth { key, response_tx }) => {
let ret = self.handle_query_bandwidth(&key).await;
if let Ok(bandwidth_data) = ret {
response_tx.send(QueryBandwidthControlResponse { success: true, bandwidth_data }).ok();
} else {
response_tx.send(QueryBandwidthControlResponse { success: false, bandwidth_data: None }).ok();
}
}
None => {
log::trace!("PeerController [main loop]: stopping since channel closed");
Expand Down
Loading

0 comments on commit 5aefb9c

Please sign in to comment.