Skip to content

Commit

Permalink
add program to monitor mem use
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Dec 13, 2024
1 parent aab72b4 commit e943207
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 14 deletions.
39 changes: 39 additions & 0 deletions src/stream/benches/stream_hash_join.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#!/usr/bin/env python3

"""Run the stream_hash_join memory benchmark across a matrix of parameters.

For every (amplification size, workload, join type) combination this invokes
`cargo bench --features dhat-heap --bench stream_hash_join_mem` with the
combination encoded in the ARGS environment variable, scrapes the dhat
peak-heap statistics (max_blocks / max_bytes) from the benchmark output, and
prints one CSV row per combination to stdout (note: CSV, not JSON).
Failures of individual combinations are reported on stderr and do not stop
the remaining runs.
"""

import os
import re
import subprocess
import sys

# Benchmark parameter matrix.
AMPLIFICATIONS = [10_000, 20_000, 30_000, 40_000, 100_000, 200_000, 400_000]
WORKLOADS = ["NotInCache", "InCache"]
JOIN_TYPES = ["Inner", "LeftOuter"]

# The cargo invocation; parameters are passed via the ARGS env var, matching
# what stream_hash_join_mem.rs reads.
BENCH_CMD = [
    "cargo", "bench",
    "--features", "dhat-heap",
    "--bench", "stream_hash_join_mem",
]


def parse_metrics(output):
    """Extract ("max_blocks", "max_bytes") digit strings from benchmark output.

    Returns a (blocks, bytes) tuple of strings, or None when either metric
    is missing from the output.
    """
    blocks_match = re.search(r"max_blocks:\s*(\d+)", output)
    bytes_match = re.search(r"max_bytes:\s*(\d+)", output)
    if blocks_match and bytes_match:
        return blocks_match.group(1), bytes_match.group(1)
    return None


def main():
    # Print header
    print("Amp,Workload,JoinType,Total Blocks,Total Bytes")

    # Run benchmarks and capture results
    for amp in AMPLIFICATIONS:
        for workload in WORKLOADS:
            for join_type in JOIN_TYPES:
                # Pass parameters through the environment instead of building a
                # `VAR=... cmd` shell string: portable and not subject to shell parsing.
                env = dict(os.environ, ARGS=f"{amp},{workload},{join_type}")
                try:
                    output = subprocess.check_output(
                        BENCH_CMD,
                        env=env,
                        stderr=subprocess.STDOUT,
                        universal_newlines=True,
                    )
                except subprocess.CalledProcessError as e:
                    # Report and continue with the next combination.
                    print(f"Error running benchmark for Amp={amp}, Workload={workload}, JoinType={join_type}", file=sys.stderr)
                    print(f"Error output: {e.output}", file=sys.stderr)
                    continue

                metrics = parse_metrics(output)
                if metrics is not None:
                    # Print results immediately so partial progress is visible.
                    print(f"{amp},{workload},{join_type},{metrics[0]},{metrics[1]}")
                else:
                    print(f"No total_blocks or total_bytes found for: Amp={amp}, Workload={workload}, JoinType={join_type}", file=sys.stderr)


if __name__ == "__main__":
    main()
36 changes: 22 additions & 14 deletions src/stream/benches/stream_hash_join_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,9 @@

#![feature(let_chains)]

//! To run this benchmark you can use the following command:
//! Specify the amplification_size, workload, and join_type via the ARGS
//! environment variable, e.g.:
//! ```sh
//! ARGS=40000,NotInCache,Inner cargo bench --features dhat-heap --bench stream_hash_join_mem
//! ```
use std::env;
use std::env;
Expand All @@ -37,22 +32,35 @@ static ALLOC: dhat::Alloc = dhat::Alloc;

#[tokio::main]
async fn main() {
let args: Vec<_> = env::args().collect();
let amp = if let Some(raw_arg) = args.get(1)
&& let Ok(arg) = raw_arg.parse()
let arg = env::var("ARGS");
let (amp, workload, join_type) = if let Ok(raw_arg) = arg
{
arg
let parts = raw_arg.split(',').collect::<Vec<_>>();
let amp = parts[0].parse::<usize>().expect(format!("invalid amplification_size: {}", parts[0]).as_str());
let workload = match parts[1] {
"NotInCache" => HashJoinWorkload::NotInCache,
"InCache" => HashJoinWorkload::InCache,
_ => panic!("Invalid workload: {}", parts[1]),
};
let join_type = match parts[2] {
"Inner" => JoinType::Inner,
"LeftOuter" => JoinType::LeftOuter,
_ => panic!("Invalid join type: {}", parts[2]),
};
(amp, workload, join_type)
} else {
100_000
panic!("invalid ARGS: {:?}", arg);
};
let workload = HashJoinWorkload::NotInCache;
let join_type = JoinType::Inner;

let (tx_l, tx_r, out) = setup_bench_stream_hash_join(amp, workload, join_type).await;
{
// Start the profiler later, after we have ingested the data for hash join build-side.
#[cfg(feature = "dhat-heap")]
let _profiler = dhat::Profiler::new_heap();

handle_streams(workload, join_type, amp, tx_l, tx_r, out).await;
let stats= dhat::HeapStats::get();
println!("max_blocks: {}", stats.max_blocks);
println!("max_bytes: {}", stats.max_bytes);
}
}

0 comments on commit e943207

Please sign in to comment.