Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using metrics interval received from server #403

Merged
merged 2 commits into from
Oct 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions common/client-libs/metrics-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
pub mod models;
pub mod requests;

use crate::models::metrics::{MixMetric, PersistedMixMetric};
use crate::models::metrics::{MixMetric, MixMetricInterval, PersistedMixMetric};
use crate::requests::metrics_mixes_get::Request as MetricsMixRequest;
use crate::requests::metrics_mixes_post::Request as MetricsMixPost;

Expand Down Expand Up @@ -43,12 +43,14 @@ impl Client {
}
}

pub async fn post_mix_metrics(&self, metrics: MixMetric) -> reqwest::Result<reqwest::Response> {
pub async fn post_mix_metrics(&self, metrics: MixMetric) -> reqwest::Result<MixMetricInterval> {
let req = MetricsMixPost::new(&self.base_url, metrics);
self.reqwest_client
.post(&req.url())
.json(req.json_payload())
.send()
.await?
.json()
.await
}

Expand Down
6 changes: 6 additions & 0 deletions common/client-libs/metrics-client/src/models/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,9 @@ pub struct MixMetric {
pub received: u64,
pub sent: HashMap<String, u64>,
}

#[derive(Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct MixMetricInterval {
pub next_report_in: u64,
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ mod metrics_post_request {
let _m = mock("POST", PATH).with_status(400).create();
let client = client_test_fixture(&mockito::server_url());
let result = client.post_mix_metrics(fixtures::new_metric()).await;
assert_eq!(400, result.unwrap().status());
assert!(result.is_err());
_m.assert();
}
}
Expand All @@ -65,7 +65,7 @@ mod metrics_post_request {
use super::*;
#[tokio::test]
async fn it_returns_a_response_with_200() {
let json = fixtures::mix_metrics_response_json();
let json = fixtures::metrics_interval_json();
let _m = mock("POST", "/api/metrics/mixes")
.with_status(201)
.with_body(json)
Expand All @@ -90,17 +90,11 @@ mod metrics_post_request {
}

#[cfg(test)]
pub fn mix_metrics_response_json() -> String {
pub fn metrics_interval_json() -> String {
r#"
{
"pubKey": "OwOqwWjh_IlnaWS2PxO6odnhNahOYpRCkju50beQCTA=",
"sent": {
"35.178.213.77:1789": 1,
"52.56.99.196:1789": 2
},
"received": 10,
"timestamp": 1576061080635800000
}
{
"nextReportIn": 5
}
"#
.to_string()
}
Expand Down
13 changes: 0 additions & 13 deletions mixnode/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ pub(crate) const DEFAULT_VALIDATOR_REST_ENDPOINT: &str = "https://directory.nymt
pub(crate) const DEFAULT_METRICS_SERVER: &str = "https://metrics.nymtech.net";

// 'DEBUG'
const DEFAULT_METRICS_SENDING_DELAY: Duration = Duration::from_millis(5_000);
const DEFAULT_METRICS_RUNNING_STATS_LOGGING_DELAY: Duration = Duration::from_millis(60_000);
const DEFAULT_PACKET_FORWARDING_INITIAL_BACKOFF: Duration = Duration::from_millis(10_000);
const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_millis(300_000);
Expand Down Expand Up @@ -303,10 +302,6 @@ impl Config {
self.mixnode.metrics_server_url.clone()
}

pub fn get_metrics_sending_delay(&self) -> Duration {
self.debug.metrics_sending_delay
}

pub fn get_metrics_running_stats_logging_delay(&self) -> Duration {
self.debug.metrics_running_stats_logging_delay
}
Expand Down Expand Up @@ -465,13 +460,6 @@ impl Default for Logging {
#[derive(Debug, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct Debug {
/// Delay between each subsequent metrics data being sent.
#[serde(
deserialize_with = "deserialize_duration",
serialize_with = "humantime_serde::serialize"
)]
metrics_sending_delay: Duration,

/// Delay between each subsequent running metrics statistics being logged.
#[serde(
deserialize_with = "deserialize_duration",
Expand Down Expand Up @@ -513,7 +501,6 @@ pub struct Debug {
impl Default for Debug {
fn default() -> Self {
Debug {
metrics_sending_delay: DEFAULT_METRICS_SENDING_DELAY,
metrics_running_stats_logging_delay: DEFAULT_METRICS_RUNNING_STATS_LOGGING_DELAY,
packet_forwarding_initial_backoff: DEFAULT_PACKET_FORWARDING_INITIAL_BACKOFF,
packet_forwarding_maximum_backoff: DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF,
Expand Down
26 changes: 15 additions & 11 deletions mixnode/src/node/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::task::JoinHandle;

const METRICS_FAILURE_BACKOFF: Duration = Duration::from_secs(30);

type SentMetricsMap = HashMap<String, u64>;

pub(crate) enum MetricEvent {
Expand Down Expand Up @@ -104,7 +106,6 @@ struct MetricsSender {
metrics: MixMetrics,
metrics_client: metrics_client::Client,
pub_key_str: String,
sending_delay: Duration,
metrics_informer: MetricsInformer,
}

Expand All @@ -113,7 +114,6 @@ impl MetricsSender {
metrics: MixMetrics,
metrics_server: String,
pub_key_str: String,
sending_delay: Duration,
running_logging_delay: Duration,
) -> Self {
MetricsSender {
Expand All @@ -122,23 +122,22 @@ impl MetricsSender {
metrics_server,
)),
pub_key_str,
sending_delay,
metrics_informer: MetricsInformer::new(running_logging_delay),
}
}

fn start(mut self) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
// set the deadline in the future
let sending_delay = tokio::time::delay_for(self.sending_delay);
// // set the deadline in the future
// let sending_delay = tokio::time::delay_for(self.sending_delay);
let (received, sent) = self.metrics.acquire_and_reset_metrics().await;

self.metrics_informer.update_running_stats(received, &sent);
self.metrics_informer.log_report_stats(received, &sent);
self.metrics_informer.try_log_running_stats();

match self
let sending_delay = match self
.metrics_client
.post_mix_metrics(MixMetric {
pub_key: self.pub_key_str.clone(),
Expand All @@ -147,9 +146,16 @@ impl MetricsSender {
})
.await
{
Err(err) => error!("failed to send metrics - {:?}", err),
Ok(_) => debug!("sent metrics information"),
}
Err(err) => {
error!("failed to send metrics - {:?}", err);
tokio::time::delay_for(METRICS_FAILURE_BACKOFF)
}
Ok(new_interval) => {
debug!("sent metrics information");
println!("received delay: {:?}", new_interval.next_report_in);
tokio::time::delay_for(Duration::from_secs(new_interval.next_report_in))
}
};

// wait for however much is left
sending_delay.await;
Expand Down Expand Up @@ -261,7 +267,6 @@ impl MetricsController {
pub(crate) fn new(
directory_server: String,
pub_key_str: String,
sending_delay: Duration,
running_stats_logging_delay: Duration,
) -> Self {
let (metrics_tx, metrics_rx) = mpsc::unbounded();
Expand All @@ -272,7 +277,6 @@ impl MetricsController {
shared_metrics.clone(),
directory_server,
pub_key_str,
sending_delay,
running_stats_logging_delay,
),
receiver: MetricsReceiver::new(shared_metrics, metrics_rx),
Expand Down
1 change: 0 additions & 1 deletion mixnode/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ impl MixNode {
metrics::MetricsController::new(
self.config.get_metrics_server(),
self.sphinx_keypair.public_key().to_base58_string(),
self.config.get_metrics_sending_delay(),
self.config.get_metrics_running_stats_logging_delay(),
)
.start()
Expand Down