Skip to content
This repository has been archived by the owner on Aug 30, 2022. It is now read-only.

add metrics service #487

Merged
merged 5 commits into from
Aug 10, 2020
Merged

add metrics service #487

merged 5 commits into from
Aug 10, 2020

Conversation

Robert-Steiner
Copy link
Contributor

First part of the Implement data collection to InfluxDB task.
This PR focus on the definition of the influx data models and the implementation of the metrics service + sender. The integration of the metrics service into the state machine is addressed in a separate PR.

metrics/mod.rs

The mod.rs file contains the MetricsService, the MetricsSender and the functions to generate metrics.
The MetricsSender communicates with the MetricsService via an unbounded channel. The unbounded channel
accepts the type WriteQuery. The WriteQuery is a struct of the influxdb crate.

My actual plan was to build a wrapper around that struct like:

enum Metric {
	RoundTotalNumber(RoundTotalNumber);
	Phase(Phase);
}

receiver: UnboundedReceiver<Metric>,

However, the #[derive(InfluxDbWriteable)] cannot be applied on enums. I would have had to implement the method into_query on each variant. So I decided against it.

I tried to find a solution to encode some properties/information of the metrics into the code. I first started with a couple of structs but that didn't work that well. In the end I used the module system.
Of course, I could have implemented it more generic and save some lines of code. However, I think the abstraction makes it easier to use and less error-prone.

Structure:

    use crate::metrics;
    metrics::round_parameters::sum::update(0.3);
    metrics::round_parameters::update::update(0.3);
    metrics::phase::update(PhaseName);
    metrics::phase::error::emit(StateError);
    metrics::masks::total_number::update(1);
    metrics::round::total_number::update(1);
    metrics::round::successful::increment();
    metrics::message::sum::increment(11, PhaseName);
    metrics::message::update::increment(21, PhaseName);
    metrics::message::sum2::increment(31, PhaseName);
    metrics::message::discarded::increment(41, PhaseName);
    metrics::message::rejected::increment(41, PhaseName);

The models.rs file only contains the influx data models. Through InfluxDbWriteable the structs are automatically implement the into_query methods.

@codecov
Copy link

codecov bot commented Aug 3, 2020

Codecov Report

Merging #487 into master will increase coverage by 0.46%.
The diff coverage is 78.18%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #487      +/-   ##
==========================================
+ Coverage   51.56%   52.02%   +0.46%     
==========================================
  Files          63       65       +2     
  Lines        3105     3160      +55     
==========================================
+ Hits         1601     1644      +43     
- Misses       1504     1516      +12     
Impacted Files Coverage Δ
rust/src/metrics/mod.rs 75.51% <75.51%> (ø)
rust/src/metrics/models.rs 100.00% <100.00%> (ø)

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 9a6bcd1...e8cfb8e. Read the comment docs.

Copy link
Contributor

@finiteprods finiteprods left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good, interesting use of modules! a few comments for clarification:

However, the #[derive(InfluxDbWriteable)] cannot be applied on enums.

So instead, we ended up with lots of little #[derive(InfluxDbWriteable)] structs, one for each metric, I guess?

I first started with a couple of structs but that didn't work that well.

I would be interested to hear why it didn't work out?

@Robert-Steiner
Copy link
Contributor Author

So instead, we ended up with lots of little #[derive(InfluxDbWriteable)] structs, one for each metric, I guess?

The idea of the enum was to have another layer of abstraction.
Currently we use the type WriteQuery which is a type defined in the influxdb crate.
The issue is that we don't have control over this type and that leads to some unnecessary complications.
You can see one example in the tests. Since the fields of WriteQuery are private,
we have to call build to actually get something back that we can compare in the assert macro.

Therefore my idea was to introduce another layer of abstraction via an enum.

enum Metrics {
	RoundParamSum(RoundParamSum),
	...
}

We would still need the structs because they define how the data is transformed into influx data model.

#[derive(InfluxDbWriteable)]
pub struct MessageRejected {
    pub time: DateTime<Utc>, 	// timestamp
    pub message_rejected: u8,   // field
    #[tag] 
    pub round_id: u64,      	  // tag
    #[tag]
    pub phase: u8, 	        // tag
}

The into_query method will transform this data into the influx protocol syntax.

state_machine,round_id=1,phase=0 message_rejected=1 1465839830100400200
  |           ------------------ ------------------ -------------------
  |               |                       |                 |
  |               |                       |                 |
+------------+--------+--------------+-----------------+---------+
|measurement |,tag_set|              |field_set|       |timestamp|
+------------+--------+--------------+-----------------+---------+

But instead of sending a WriteQuery we would send a variant of the Metrics enum

- pub struct MetricsSender(UnboundedSender<WriteQuery>);
+ pub struct MetricsSender(UnboundedSender<Metrics>);

and call the method into_query on the receiver half of the channel (the run_metric_service function).

pub async fn run_metric_service(mut metics_service: MetricsService) {
    loop {
        match metics_service.receiver.recv().await {
-            Some(write_query) => { 
+            Some(metric) => { 			              <---- Type Metrics
                let _ = metics_service
                    .client
-                    .query(&write_query)
+                    .query(&metric.into_query())         <---- call into_query
                    .await
                    .map_err(|e| error!("{}", e));
            }
            None => {
                warn!("All senders have been dropped!");
                return;
            }
        }
    }
}

However for this to work, we need to implement into_query on the enum or we need to use the match pattern in the run_metric_service function.

impl Metrics {
	fn into_query(self) -> WriteQuery {
		match metric {
			RoundParamSum(metric) => metric.into_query()
			...
		}
	}
}

// or

pub async fn run_metric_service(mut metics_service: MetricsService) {
    loop {
        match metics_service.receiver.recv().await {
            Some(metric) => { 			      
            
                   
            	  let query = match metric {
	             RoundParamSum(metric) => metric.into_query(),
		     ...
	          };
					
					
                let _ = metics_service
                    .client
                    .query(&query())        
                    .await
                    .map_err(|e| error!("{}", e));
            }
            None => {
                warn!("All senders have been dropped!");
                return;
            }
        }
    }
}

In both cases we need to alter the match pattern manually once we add/change or remove a metric which is quite annoying I think.
Therefore I used WriteQuery for now.
But my plan is, to create a PR on the influxdb crate which implements #[derive(InfluxDbWriteable)] for enums.

@little-dude little-dude self-requested a review August 7, 2020 07:06
}
}

pub async fn run_metric_service(mut metics_service: MetricsService) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One downside of this implementation is that we process the request one by one. If the queries are slow, the queries will accumulate in the channel. Maybe we could make that channel bounded to apply some backpressure. On the longer run, we could use tower to benefit from timeouts, backpressure and concurrent requests.

@little-dude little-dude self-requested a review August 7, 2020 07:34
Copy link
Contributor

@little-dude little-dude left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my plan is, to create a PR on the influxdb crate which implements #[derive(InfluxDbWriteable)] for enums.

That would be nice, but your workaround is very clean too. This is a good first implementation imo. I'd just convert the unbounded channels to bounded ones to be on the safe side.

@Robert-Steiner
Copy link
Contributor Author

@finiteprods @little-dude thanks for the review👍

@little-dude I agree that is a good idea to use bounded channels.
However, I have one question. When I use a bounded channel I need to use try_send instead of send otherwise the metrics channel can block the execution of the state machine, right?

@little-dude
Copy link
Contributor

Yes exactly

@Robert-Steiner Robert-Steiner merged commit 87405b8 into master Aug 10, 2020
@Robert-Steiner Robert-Steiner deleted the metrics-service branch August 10, 2020 09:12
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants