Skip to content

Commit

Permalink
Update to latest git aws-throwaway
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Oct 18, 2023
1 parent 7f1f6a2 commit 8b1aada
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 24 deletions.
15 changes: 7 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ rand_distr = "0.4.1"
clap = { version = "4.0.4", features = ["cargo", "derive"] }
async-trait = "0.1.30"
typetag = "0.2.5"
aws-throwaway = "0.2.0"
#aws-throwaway = "0.2.0"
aws-throwaway = { git = "https://github.com/shotover/aws-throwaway" }
tokio-bin-process = "0.4.0"
ordered-float = { version = "4.0.0", features = ["serde"] }
15 changes: 12 additions & 3 deletions ec2-cargo/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ else
fi
"#).await;
while let Some(line) = receiver.recv().await {
println!("{}", line)
match line {
Ok(line) => println!("{line}"),
Err(err) => panic!("Failed to setup instance: {err:?}"),
}
}

println!("Finished creating instance.");
Expand Down Expand Up @@ -151,7 +154,10 @@ cargo nextest run {} 2>&1
))
.await;
while let Some(line) = receiver.recv().await {
println!("{}", line)
match line {
Ok(line) => println!("{line}"),
Err(err) => println!("{err:?}"),
}
}

Ok(())
Expand All @@ -176,7 +182,10 @@ cargo windsock {} 2>&1
))
.await;
while let Some(line) = receiver.recv().await {
println!("{}", line)
match line {
Ok(line) => println!("{line}"),
Err(err) => println!("{err:?}"),
}
}

rsync_fetch_windsock_results(state).await;
Expand Down
14 changes: 10 additions & 4 deletions shotover-proxy/benches/windsock/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,14 @@ sudo docker system prune -af"#,
loop {
match tokio::time::timeout(Duration::from_secs(120), receiver.recv()).await {
Ok(Some(line)) => {
writeln!(logs, "{}", line).unwrap();
if regex.is_match(&line) {
return;
match line {
Ok(line) => {
writeln!(logs, "{line}").unwrap();
if regex.is_match(&line) {
return;
}
}
Err(err) => panic!("docker logs failed: {err:?}"),
}
}
Ok(None) => panic!(
Expand Down Expand Up @@ -287,7 +292,7 @@ RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topolo
tokio::select! {
line = receiver.recv() => {
match line {
Some(line) => {
Some(Ok(line)) => {
let event = Event::from_json_str(&line).unwrap();
if let Level::Warn = event.level {
tracing::error!("shotover warn:\n {event}");
Expand All @@ -299,6 +304,7 @@ RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topolo
return
}
}
Some(Err(err)) => panic!("shotover-bin failed: {err:?}"),
None => return,
}
},
Expand Down
5 changes: 3 additions & 2 deletions shotover-proxy/benches/windsock/profilers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use self::samply::Samply;
use crate::common::Shotover;
use anyhow::Result;
use aws_throwaway::Ec2Instance;
use perf_flamegraph::Perf;
use std::{collections::HashMap, path::PathBuf};
Expand All @@ -19,7 +20,7 @@ pub struct ProfilerRunner {
results_path: PathBuf,
perf: Option<Perf>,
samply: Option<Samply>,
sys_monitor: Option<UnboundedReceiver<String>>,
sys_monitor: Option<UnboundedReceiver<Result<String>>>,
}

impl ProfilerRunner {
Expand Down Expand Up @@ -98,7 +99,7 @@ impl Drop for ProfilerRunner {

pub struct CloudProfilerRunner {
bench_name: String,
monitor_instances: HashMap<String, UnboundedReceiver<String>>,
monitor_instances: HashMap<String, UnboundedReceiver<Result<String>>>,
}

impl CloudProfilerRunner {
Expand Down
14 changes: 8 additions & 6 deletions shotover-proxy/benches/windsock/profilers/sar.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! This module provides abstractions for getting system usage from the unix command `sar`, on ubuntu it is contained within the package `sysstat`.

use anyhow::Result;
use aws_throwaway::Ec2Instance;
use std::{collections::HashMap, process::Stdio};
use time::OffsetDateTime;
Expand Down Expand Up @@ -110,7 +111,7 @@ fn metric_with_formatter<F: Fn(&str) -> String>(
/// 12:19:52 kbmemfree kbavail kbmemused %memused kbbuffers kbcached kbcommit %commit kbactive kbinact kbdirty
/// 12:19:53 10827924 17655248 14426872 43.97 482592 6566224 20441924 62.30 13649508 7304056 148
/// ```
pub fn parse_sar(rx: &mut UnboundedReceiver<String>) -> ParsedSar {
pub fn parse_sar(rx: &mut UnboundedReceiver<Result<String>>) -> ParsedSar {
let mut named_values = HashMap::new();

// read date command
Expand All @@ -121,7 +122,7 @@ pub fn parse_sar(rx: &mut UnboundedReceiver<String>) -> ParsedSar {
};
};
let started_at =
OffsetDateTime::from_unix_timestamp_nanos(started_at.parse().unwrap()).unwrap();
OffsetDateTime::from_unix_timestamp_nanos(started_at.unwrap().parse().unwrap()).unwrap();

// skip header
if rx.try_recv().is_err() {
Expand Down Expand Up @@ -152,8 +153,9 @@ pub fn parse_sar(rx: &mut UnboundedReceiver<String>) -> ParsedSar {
};
};
for (head, data) in header
.unwrap()
.split_whitespace()
.zip(data.split_whitespace())
.zip(data.unwrap().split_whitespace())
.skip(1)
{
named_values
Expand All @@ -176,7 +178,7 @@ const SAR_COMMAND: &str = "date +%s%N; sar -r -u 1";

/// Run the sar command on the local machine.
/// Each line of output is returned via the `UnboundedReceiver`
pub fn run_sar_local() -> UnboundedReceiver<String> {
pub fn run_sar_local() -> UnboundedReceiver<Result<String>> {
let (tx, rx) = unbounded_channel();
tokio::spawn(async move {
let mut child = Command::new("bash")
Expand All @@ -187,7 +189,7 @@ pub fn run_sar_local() -> UnboundedReceiver<String> {
.unwrap();
let mut reader = BufReader::new(child.stdout.take().unwrap()).lines();
while let Some(line) = reader.next_line().await.unwrap() {
if tx.send(line).is_err() {
if tx.send(Ok(line)).is_err() {
child.kill().await.unwrap();
return;
}
Expand All @@ -199,6 +201,6 @@ pub fn run_sar_local() -> UnboundedReceiver<String> {

/// Run the sar command over ssh on the passed instance.
/// Each line of output is returned via the `UnboundedReceiver`
pub async fn run_sar_remote(instance: &Ec2Instance) -> UnboundedReceiver<String> {
pub async fn run_sar_remote(instance: &Ec2Instance) -> UnboundedReceiver<Result<String>> {
instance.ssh().shell_stdout_lines(SAR_COMMAND).await
}

0 comments on commit 8b1aada

Please sign in to comment.