Implemented runs endpoint
- Added process metrics to the data calculated during model application
ohuu committed Sep 23, 2024
1 parent e2f8be3 commit 220ba5d
Showing 6 changed files with 181 additions and 46 deletions.
7 changes: 7 additions & 0 deletions src/data.rs
@@ -55,11 +55,18 @@ impl Data {
}
}

#[derive(Debug, Serialize, Clone)]
pub struct ProcessMetrics {
pub timestamp: i64,
pub cpu_usage: f64,
}

#[derive(Debug, Serialize)]
pub struct ProcessData {
pub process_id: String,
pub data: Data,
pub pow_perc: f64,
pub iteration_metrics: Vec<Vec<ProcessMetrics>>,
}

#[derive(Debug, Serialize)]
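To make the new shape concrete, here is a minimal sketch of how the `iteration_metrics` field nests per-iteration samples (types as in the diff above; the timestamps and CPU figures are invented for illustration):

// One outer entry per scenario iteration, one inner entry per sample.
let iteration_metrics: Vec<Vec<ProcessMetrics>> = vec![
    vec![
        ProcessMetrics { timestamp: 1_727_000_000, cpu_usage: 0.42 },
        ProcessMetrics { timestamp: 1_727_000_001, cpu_usage: 0.57 },
    ], // iteration 1
    vec![ProcessMetrics { timestamp: 1_727_000_010, cpu_usage: 0.39 }], // iteration 2
];

Via the `Serialize` derive this comes out as a JSON array of arrays, which is presumably what the new runs endpoint returns per process.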
103 changes: 68 additions & 35 deletions src/data/dataset.rs
@@ -8,14 +8,20 @@ use itertools::Itertools;
use sea_orm::DatabaseConnection;
use std::collections::HashMap;

use super::{ProcessData, RunData, ScenarioData};
use super::{ProcessData, ProcessMetrics, RunData, ScenarioData};

pub enum AggregationMethod {
MostRecent,
Average,
Sum,
}

pub enum LiveDataFilter {
IncludeLive,
ExcludeLive,
OnlyLive,
}

/// Associates a single ScenarioIteration with all the metrics captured for it.
#[derive(Debug)]
pub struct IterationMetrics {
@@ -38,9 +44,9 @@ impl IterationMetrics {
pub fn by_process(&self) -> HashMap<String, Vec<&Metrics>> {
let mut metrics_by_process: HashMap<String, Vec<&Metrics>> = HashMap::new();
for metric in self.metrics.iter() {
let proc_id = metric.process_id.clone();
let proc_name = metric.process_name.clone();
metrics_by_process
.entry(proc_id)
.entry(proc_name)
.and_modify(|v| v.push(metric))
.or_insert(vec![metric]); // if entry doesn't exist then create a new vec
}
@@ -93,18 +99,22 @@ impl<'a> Dataset {
self.data.is_empty()
}

pub fn by_scenario(&'a self, is_live: bool) -> Vec<ScenarioDataset<'a>> {
pub fn by_scenario(&'a self, live_data_filter: LiveDataFilter) -> Vec<ScenarioDataset<'a>> {
// get all the scenarios in the dataset
let scenario_names = self
let unique_scenario_names = self
.data
.iter()
.map(|x| &x.iteration.scenario_name)
.unique()
.filter(|name| match is_live {
true => name.starts_with("live_"),
false => true,
})
.collect::<Vec<_>>();
.unique();
let scenario_names = match live_data_filter {
LiveDataFilter::IncludeLive => unique_scenario_names.collect_vec(),
LiveDataFilter::ExcludeLive => unique_scenario_names
.filter(|name| !name.starts_with("live"))
.collect_vec(),
LiveDataFilter::OnlyLive => unique_scenario_names
.filter(|name| name.starts_with("live"))
.collect_vec(),
};

scenario_names
.into_iter()
@@ -172,7 +182,7 @@ impl<'a> ScenarioDataset<'a> {
pub async fn apply_model(
&'a self,
db: &DatabaseConnection,
model: &impl Fn(Vec<&Metrics>, f32) -> Data,
model: &impl Fn(&Vec<&Metrics>, f32) -> Data,
aggregation_method: AggregationMethod,
) -> anyhow::Result<ScenarioData> {
let mut all_run_data = vec![];
@@ -252,57 +262,78 @@ impl<'a> ScenarioRunDataset<'a> {
pub async fn apply_model(
&'a self,
db: &DatabaseConnection,
model: &impl Fn(Vec<&Metrics>, f32) -> Data,
model: &impl Fn(&Vec<&Metrics>, f32) -> Data,
) -> anyhow::Result<RunData> {
let run = dao::run::fetch(self.run_id, &db).await?;
let cpu_avg_pow = run.cpu_avg_power;
let start_time = run.start_time;
let stop_time = run.stop_time;

// build up process map
// proc_id | data for proc per iteration
// proc_id | data & metrics per iteration for proc per iteration
// =======================================
// proc_id -> [<data>, <data>] <- 2 iterations
// proc_id -> [<data>, <data>]
let mut proc_iteration_data_map: HashMap<String, Vec<Data>> = HashMap::new();
// proc_id -> [<(data, metrics)>, <(data, metrics)>] <- 2 iterations
// proc_id -> [<(data, metrics)>, <(data, metrics)>] <- 2 iterations
let mut proc_iteration_data_map: HashMap<String, (Vec<Data>, Vec<Vec<ProcessMetrics>>)> =
HashMap::new();
for scenario_run_iteration_dataset in self.by_iteration() {
for (proc_id, metrics) in scenario_run_iteration_dataset.by_process() {
// run the RAB model to get power and co2 emissions
let cardamon_data = model(metrics, cpu_avg_pow);
let cardamon_data = model(&metrics, cpu_avg_pow);

// convert the metrics database model into metrics data
let proc_metrics = metrics
.iter()
.map(|metrics| ProcessMetrics {
timestamp: metrics.time_stamp,
cpu_usage: metrics.cpu_usage,
})
.collect_vec();

// if key already exists in map then append cardamon_data to the end of the
// iteration data vector for that key, else create a new vector for that key.
let data_vec = match proc_iteration_data_map.get_mut(&proc_id) {
Some(data) => {
let mut it_data = vec![];
it_data.append(data);
it_data.push(cardamon_data);
it_data
Some((proc_data, iteration_metrics)) => {
let mut data = vec![];
data.append(proc_data);
data.push(cardamon_data);

let mut metrics = vec![];
metrics.append(iteration_metrics);
metrics.push(proc_metrics);

(data, metrics)
}

None => vec![cardamon_data],
None => (vec![cardamon_data], vec![proc_metrics]),
};
proc_iteration_data_map.insert(proc_id.to_string(), data_vec);
}
}

// average data for each process across all iterations
let proc_data_map: HashMap<String, Data> = proc_iteration_data_map
.iter()
.map(|(k, v)| (k, v.iter().collect_vec()))
.map(|(k, v)| (k.to_string(), Data::mean(&v)))
.collect();
let proc_data_map: HashMap<String, (Data, Vec<Vec<ProcessMetrics>>)> =
proc_iteration_data_map
.into_iter()
.map(|(k, (data, metrics))| {
(
k.to_string(),
(Data::mean(&data.iter().collect_vec()), metrics),
)
})
.collect();

// calculate total run data (pow + co2)
let total_run_data = Data::sum(&proc_data_map.values().collect_vec());
let total_run_data = Data::sum(&proc_data_map.values().map(|(data, _)| data).collect_vec());

// convert proc_data_map to vector of ProcessData
let process_data = proc_data_map
.into_iter()
.map(|(process_id, data)| ProcessData {
.map(|(process_id, (data, iteration_metrics))| ProcessData {
process_id,
pow_perc: data.pow / total_run_data.pow,
data,
iteration_metrics,
})
.collect_vec();

@@ -323,7 +354,9 @@ mod tests {
use itertools::Itertools;

use crate::{
data::dataset_builder::DatasetBuilder, db_connect, db_migrate, tests::setup_fixtures,
data::{dataset::LiveDataFilter, dataset_builder::DatasetBuilder},
db_connect, db_migrate,
tests::setup_fixtures,
};

#[tokio::test]
@@ -375,7 +408,7 @@
.build(&db)
.await?;

let scenario_datasets = dataset.by_scenario(false);
let scenario_datasets = dataset.by_scenario(LiveDataFilter::ExcludeLive);
assert_eq!(scenario_datasets.len(), 3);

// make sure the scenario names are correct
@@ -459,7 +492,7 @@
.build(&db)
.await?;

for scenario_dataset in dataset.by_scenario(false) {
for scenario_dataset in dataset.by_scenario(LiveDataFilter::ExcludeLive) {
let scenario_run_datasets = scenario_dataset.by_run();

match scenario_dataset.scenario_name {
@@ -520,7 +553,7 @@
.build(&db)
.await?;

for scenario_dataset in dataset.by_scenario(false) {
for scenario_dataset in dataset.by_scenario(LiveDataFilter::ExcludeLive) {
for scenario_run_dataset in scenario_dataset.by_run() {
let scenario_run_iteration_datasets = scenario_run_dataset.by_iteration();

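Stepping back from the diff, a short usage sketch of the reworked API: `by_scenario` now takes the `LiveDataFilter` enum instead of a bare bool, and the model closure is passed by reference so the metrics can be reused to build `ProcessMetrics` afterwards (signatures as in the diff; the `42.0` carbon-intensity argument mirrors the value used in main.rs):

let model = rab_linear_model(42.0);
for scenario_dataset in dataset.by_scenario(LiveDataFilter::OnlyLive) {
    let scenario_data = scenario_dataset
        .apply_model(&db, &model, AggregationMethod::Average)
        .await?;
    // scenario_data now carries per-process iteration metrics alongside power/CO2.
}

As an aside, the match-and-rebuild in `apply_model` could be expressed more directly with the entry API; a sketch of the equivalent accumulation, not what the commit does:

let (data, metrics) = proc_iteration_data_map
    .entry(proc_id.to_string())
    .or_insert_with(|| (vec![], vec![]));
data.push(cardamon_data);
metrics.push(proc_metrics);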
9 changes: 6 additions & 3 deletions src/main.rs
@@ -2,7 +2,7 @@ use anyhow::Context;
use cardamon::{
cleanup_stdout_stderr,
config::{self, Config, ExecutionPlan, ProcessToObserve},
data::{dataset_builder::DatasetBuilder, Data},
data::{dataset::LiveDataFilter, dataset_builder::DatasetBuilder, Data},
db_connect, db_migrate, init_config,
models::rab_linear_model,
run, server,
@@ -170,7 +170,10 @@ async fn main() -> anyhow::Result<()> {
.await?;

println!("\n{}", " Summary ".reversed().green());
for scenario_dataset in observation_dataset.by_scenario(false).iter() {
for scenario_dataset in observation_dataset
.by_scenario(LiveDataFilter::ExcludeLive)
.iter()
{
let run_datasets = scenario_dataset.by_run();

// execute model for current run
Expand Down Expand Up @@ -249,7 +252,7 @@ async fn main() -> anyhow::Result<()> {
}

let f = rab_linear_model(42.0);
for scenario_dataset in dataset.by_scenario(false) {
for scenario_dataset in dataset.by_scenario(LiveDataFilter::ExcludeLive) {
println!(
"{}:",
format!("{}", scenario_dataset.scenario_name()).green()
4 changes: 2 additions & 2 deletions src/models.rs
@@ -6,7 +6,7 @@ use sea_orm::DatabaseConnection;

pub type BoxFuture = Pin<Box<dyn Future<Output = anyhow::Result<Data>> + Send>>;

pub fn rab_linear_model(ci_g_w: f32) -> impl Fn(Vec<&Metrics>, f32) -> Data {
pub fn rab_linear_model(ci_g_w: f32) -> impl Fn(&Vec<&Metrics>, f32) -> Data {
return move |metrics, cpu_avg_pow_w| {
// TODO: THIS MUST BE FETCHED ASYNCHRONOUSLY USING THE run_id!

@@ -36,6 +36,6 @@ pub fn rab_linear_model(ci_g_w: f32) -> impl Fn(&Vec<&Metrics>, f32) -> Data {
pub fn rab_nonlinear_model(
_ci: f32,
_db: &DatabaseConnection,
) -> impl Fn(&[&Metrics], i32) -> Data {
) -> impl Fn(&Vec<&Metrics>, f32) -> Data {
return move |_metrics, _run_id| todo!();
}
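The signature change here is the reason for the `&metrics` call site above: the model now borrows the metrics (`&Vec<&Metrics>` instead of `Vec<&Metrics>`), so `apply_model` can still use them afterwards to build `ProcessMetrics`. A minimal conforming model, assuming `Data` exposes the `pow` and `co2` fields the dataset code reads (the formula is a placeholder, not the RAB model; imports as at the top of src/models.rs):

fn flat_model(watts: f32) -> impl Fn(&Vec<&Metrics>, f32) -> Data {
    move |metrics, _cpu_avg_pow_w| Data {
        // Placeholder: pretend every sample contributed `watts` of power.
        pow: watts as f64 * metrics.len() as f64,
        co2: 0.0,
    }
}

A `&[&Metrics]` slice would be the marginally more idiomatic borrow (and is what `rab_nonlinear_model` advertised before this change), but the commit standardizes both models on `&Vec<&Metrics>`.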
3 changes: 2 additions & 1 deletion src/server.rs
@@ -65,9 +65,10 @@ async fn create_app(db: &DatabaseConnection) -> Router {
.layer(middleware::from_fn_with_state(pool.clone(), api_key_auth));
*/
Router::new()
.route("/api/scenarios", get(routes::get_scenarios))
.route("/", get(index_handler))
.route("/index.html", get(index_handler))
.route("/api/scenarios", get(routes::get_scenarios))
.route("/api/runs/:scenario_name", get(routes::get_runs))
.route("/*file", get(static_handler))
.fallback_service(get(not_found))
.with_state(db.clone())
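The new route uses axum's path-parameter syntax, so the `:scenario_name` segment is captured and handed to the handler. `routes::get_runs` itself is not part of this diff; a hypothetical handler shape, assuming axum's `Path`/`State` extractors and the `DatabaseConnection` state installed by `.with_state(db.clone())`:

use axum::extract::{Path, State};
use sea_orm::DatabaseConnection;

// Hypothetical stand-in: the real routes::get_runs lives elsewhere.
async fn get_runs(
    Path(scenario_name): Path<String>,
    State(_db): State<DatabaseConnection>,
) -> String {
    // Presumably fetches the runs for `scenario_name` and serializes
    // RunData (including the new iteration_metrics) to JSON.
    format!("runs for {scenario_name}")
}

A request like GET /api/runs/checkout would then bind `scenario_name = "checkout"`.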