From 97e27895cb5a99794a6c3b40e10241d0f07c08ba Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 13 Dec 2024 00:07:17 +0800 Subject: [PATCH 01/11] add bench with join type and cache workload --- src/stream/benches/stream_hash_join_mem.rs | 7 +- src/stream/benches/stream_hash_join_rt.rs | 21 ++- src/stream/src/executor/test_utils.rs | 193 ++++++++++++++------- 3 files changed, 143 insertions(+), 78 deletions(-) diff --git a/src/stream/benches/stream_hash_join_mem.rs b/src/stream/benches/stream_hash_join_mem.rs index 7f4275162ad9..c0b319f94321 100644 --- a/src/stream/benches/stream_hash_join_mem.rs +++ b/src/stream/benches/stream_hash_join_mem.rs @@ -16,15 +16,16 @@ //! To run this benchmark you can use the following command: //! ```sh -//! cargo bench --features dhat-heap --bench stream_hash_join +//! cargo bench --features dhat-heap --bench stream_hash_join_mem //! ``` //! //! You may also specify the amplification size, e.g. 40000 //! ```sh -//! cargo bench --features dhat-heap --bench stream_hash_join -- 40000 +//! cargo bench --features dhat-heap --bench stream_hash_join_mem -- 40000 //! ``` use std::env; +use risingwave_stream::executor::JoinType; use risingwave_stream::executor::test_utils::hash_join_executor::*; @@ -44,7 +45,7 @@ async fn main() { } else { 100_000 }; - let (tx_l, tx_r, out) = setup_bench_stream_hash_join(amp).await; + let (tx_l, tx_r, out) = setup_bench_stream_hash_join(amp, HashJoinWorkload::NotInCache, JoinType::Inner).await; { // Start the profiler later, after we have ingested the data for hash join build-side. #[cfg(feature = "dhat-heap")] diff --git a/src/stream/benches/stream_hash_join_rt.rs b/src/stream/benches/stream_hash_join_rt.rs index 5beed531575e..0e3a6bea6258 100644 --- a/src/stream/benches/stream_hash_join_rt.rs +++ b/src/stream/benches/stream_hash_join_rt.rs @@ -23,6 +23,7 @@ use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use futures::executor::block_on; use risingwave_stream::executor::test_utils::hash_join_executor::*; use tokio::runtime::Runtime; +use risingwave_stream::executor::JoinType; risingwave_expr_impl::enable!(); @@ -32,14 +33,18 @@ fn bench_hash_join(c: &mut Criterion) { let rt = Runtime::new().unwrap(); for amp in [10_000, 20_000, 30_000, 40_000, 100_000, 200_000, 400_000] { - let name = format!("hash_join_rt_{}", amp); - group.bench_function(&name, |b| { - b.to_async(&rt).iter_batched( - || block_on(setup_bench_stream_hash_join(amp)), - |(tx_l, tx_r, out)| handle_streams(amp, tx_l, tx_r, out), - BatchSize::SmallInput, - ) - }); + for workload in [HashJoinWorkload::NotInCache, HashJoinWorkload::InCache] { + for join_type in [JoinType::Inner, JoinType::LeftOuter] { + let name = format!("hash_join_rt_{}_{}_{}", amp, workload, join_type); + group.bench_function(&name, |b| { + b.to_async(&rt).iter_batched( + || block_on(setup_bench_stream_hash_join(amp, workload, join_type)), + |(tx_l, tx_r, out)| handle_streams(workload, join_type, amp, tx_l, tx_r, out), + BatchSize::SmallInput, + ) + }); + } + } } } diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs index 2b02c5882eac..5b2c75266b53 100644 --- a/src/stream/src/executor/test_utils.rs +++ b/src/stream/src/executor/test_utils.rs @@ -630,7 +630,8 @@ pub mod hash_join_executor { use std::sync::Arc; use itertools::Itertools; - use risingwave_common::array::{I64Array, Op, StreamChunkTestExt}; + use strum_macros::Display; + use risingwave_common::array::{I64Array, Op}; use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId}; use risingwave_common::hash::Key128; use risingwave_common::util::sort_util::OrderType; @@ -642,6 +643,13 @@ pub mod hash_join_executor { use crate::executor::prelude::StateTable; use crate::executor::test_utils::{MessageSender, MockSource}; use crate::executor::{ActorContext, HashJoinExecutor, JoinParams, JoinType}; + use crate::executor::join::JoinTypePrimitive; + + #[derive(Clone, Copy, Debug, Display)] + pub enum HashJoinWorkload { + InCache, + NotInCache, + } pub async fn create_in_memory_state_table( mem_state: MemoryStateStore, @@ -701,6 +709,8 @@ pub mod hash_join_executor { /// 4. Check memory utilization. pub async fn setup_bench_stream_hash_join( amp: usize, + workload: HashJoinWorkload, + join_type_primitive: JoinTypePrimitive, ) -> (MessageSender, MessageSender, BoxedMessageStream) { let fields = vec![DataType::Int64, DataType::Int64, DataType::Int64]; let orders = vec![OrderType::ascending(), OrderType::ascending()]; @@ -715,27 +725,8 @@ pub mod hash_join_executor { create_in_memory_state_table(state_store.clone(), &fields, &orders, &[0, 1], 2).await; // Insert 100K records into the build side. - { - // Create column [0]: join key. Each record has the same value, to trigger join amplification. - let mut int64_jk_builder = DataType::Int64.create_array_builder(amp); - int64_jk_builder - .append_array(&I64Array::from_iter(vec![Some(200_000); amp].into_iter()).into()); - let jk = int64_jk_builder.finish(); - - // Create column [1]: pk. The original pk will be here, it will be unique. - let mut int64_pk_data_chunk_builder = DataType::Int64.create_array_builder(amp); - let seq = I64Array::from_iter((0..amp as i64).map(Some)); - int64_pk_data_chunk_builder.append_array(&I64Array::from(seq).into()); - let pk = int64_pk_data_chunk_builder.finish(); - - // Create column [2]: value. This can be an arbitrary value, so just clone the pk column. - let values = pk.clone(); - - // Build the stream chunk. - let columns = vec![jk.into(), pk.into(), values.into()]; - let ops = vec![Op::Insert; amp]; - let stream_chunk = StreamChunk::new(ops, columns); - + if matches!(workload, HashJoinWorkload::NotInCache) { + let stream_chunk = build_chunk(amp, 200_000); // Write to state table. rhs_state_table.write_chunk(stream_chunk); } @@ -765,31 +756,85 @@ pub mod hash_join_executor { let params_l = JoinParams::new(vec![0], vec![1]); let params_r = JoinParams::new(vec![0], vec![1]); - let executor = HashJoinExecutor::::new( - ActorContext::for_test(123), - info, - source_l, - source_r, - params_l, - params_r, - vec![false], // null-safe - (0..schema_len).collect_vec(), - None, // condition, it is an eq join, we have no condition - vec![], // ineq pairs - lhs_state_table, - lhs_degree_state_table, - rhs_state_table, - rhs_degree_state_table, - Arc::new(AtomicU64::new(0)), // watermark epoch - false, // is_append_only - Arc::new(StreamingMetrics::unused()), - 1024, // chunk_size - 2048, // high_join_amplification_threshold - ); - (tx_l, tx_r, executor.boxed().execute()) + match join_type_primitive { + JoinType::Inner => { + let executor = HashJoinExecutor::::new( + ActorContext::for_test(123), + info, + source_l, + source_r, + params_l, + params_r, + vec![false], // null-safe + (0..schema_len).collect_vec(), + None, // condition, it is an eq join, we have no condition + vec![], // ineq pairs + lhs_state_table, + lhs_degree_state_table, + rhs_state_table, + rhs_degree_state_table, + Arc::new(AtomicU64::new(0)), // watermark epoch + false, // is_append_only + Arc::new(StreamingMetrics::unused()), + 1024, // chunk_size + 2048, // high_join_amplification_threshold + ); + (tx_l, tx_r, executor.boxed().execute()) + } + JoinType::LeftOuter => { + let executor = HashJoinExecutor::::new( + ActorContext::for_test(123), + info, + source_l, + source_r, + params_l, + params_r, + vec![false], // null-safe + (0..schema_len).collect_vec(), + None, // condition, it is an eq join, we have no condition + vec![], // ineq pairs + lhs_state_table, + lhs_degree_state_table, + rhs_state_table, + rhs_degree_state_table, + Arc::new(AtomicU64::new(0)), // watermark epoch + false, // is_append_only + Arc::new(StreamingMetrics::unused()), + 1024, // chunk_size + 2048, // high_join_amplification_threshold + ); + (tx_l, tx_r, executor.boxed().execute()) + } + _ => panic!("Unsupported join type"), + } + + } + + fn build_chunk(size: usize, join_key_value: i64) -> StreamChunk { + // Create column [0]: join key. Each record has the same value, to trigger join amplification. + let mut int64_jk_builder = DataType::Int64.create_array_builder(size); + int64_jk_builder + .append_array(&I64Array::from_iter(vec![Some(join_key_value); size].into_iter()).into()); + let jk = int64_jk_builder.finish(); + + // Create column [1]: pk. The original pk will be here, it will be unique. + let mut int64_pk_data_chunk_builder = DataType::Int64.create_array_builder(size); + let seq = I64Array::from_iter((0..size as i64).map(Some)); + int64_pk_data_chunk_builder.append_array(&I64Array::from(seq).into()); + let pk = int64_pk_data_chunk_builder.finish(); + + // Create column [2]: value. This can be an arbitrary value, so just clone the pk column. + let values = pk.clone(); + + // Build the stream chunk. + let columns = vec![jk.into(), pk.into(), values.into()]; + let ops = vec![Op::Insert; size]; + StreamChunk::new(ops, columns) } pub async fn handle_streams( + hash_join_workload: HashJoinWorkload, + join_type_primitive: JoinTypePrimitive, amp: usize, mut tx_l: MessageSender, mut tx_r: MessageSender, @@ -798,11 +843,22 @@ pub mod hash_join_executor { // Init executors tx_l.push_barrier(test_epoch(1), false); tx_r.push_barrier(test_epoch(1), false); - // Push a single record into tx_l, matches 100K records in the build side. - let chunk = StreamChunk::from_pretty( - " I I I - + 200000 0 1", - ); + + if matches!(hash_join_workload, HashJoinWorkload::InCache) { + // Push a single record into tx_r, so 100K records to be matched are cached. + let chunk = build_chunk(amp, 200_000); + tx_r.push_chunk(chunk); + } + + // Push a chunk of records into tx_l, matches 100K records in the build side. + let chunk_size = 1024; + let chunk = match join_type_primitive { + // Make sure all match + JoinType::Inner => build_chunk(chunk_size, 200_000), + // Make sure no match is found. + JoinType::LeftOuter => build_chunk(chunk_size, 300_000), + _ => panic!("Unsupported join type"), + }; tx_l.push_chunk(chunk); match stream.next().await { @@ -814,27 +870,30 @@ pub mod hash_join_executor { } } - let chunks = amp / 1024; - let remainder = amp % 1024; - - for _ in 0..chunks { - match stream.next().await { - Some(Ok(Message::Chunk(c))) => { - assert_eq!(c.cardinality(), 1024); - } - other => { - panic!("Expected a barrier, got {:?}", other); + match join_type_primitive { + JoinType::LeftOuter => { + match stream.next().await { + Some(Ok(Message::Chunk(c))) => { + assert_eq!(c.cardinality(), 1024); + } + other => { + panic!("Expected a barrier, got {:?}", other); + } } } - } - - match stream.next().await { - Some(Ok(Message::Chunk(c))) => { - assert_eq!(c.cardinality(), remainder); - } - other => { - panic!("Expected a barrier, got {:?}", other); + JoinType::Inner => { + for _ in 0..amp { + match stream.next().await { + Some(Ok(Message::Chunk(c))) => { + assert_eq!(c.cardinality(), chunk_size); + } + other => { + panic!("Expected a barrier, got {:?}", other); + } + } + } } + _ => panic!("Unsupported join type"), } } } From c7a660219d9fa52411d504ae1fb2cc240ab05224 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 13 Dec 2024 00:21:02 +0800 Subject: [PATCH 02/11] loosen constrains --- src/stream/src/executor/test_utils.rs | 33 ++++++++++----------------- 1 file changed, 12 insertions(+), 21 deletions(-) diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs index 5b2c75266b53..9ec6deed66c3 100644 --- a/src/stream/src/executor/test_utils.rs +++ b/src/stream/src/executor/test_utils.rs @@ -870,30 +870,21 @@ pub mod hash_join_executor { } } - match join_type_primitive { - JoinType::LeftOuter => { - match stream.next().await { - Some(Ok(Message::Chunk(c))) => { - assert_eq!(c.cardinality(), 1024); - } - other => { - panic!("Expected a barrier, got {:?}", other); - } + let expected_count = match join_type_primitive { + JoinType::LeftOuter => amp * chunk_size, + JoinType::Inner => chunk_size, + _ => panic!("Unsupported join type"), + }; + let mut current_count = 0; + while current_count < expected_count { + match stream.next().await { + Some(Ok(Message::Chunk(c))) => { + current_count += c.cardinality(); } - } - JoinType::Inner => { - for _ in 0..amp { - match stream.next().await { - Some(Ok(Message::Chunk(c))) => { - assert_eq!(c.cardinality(), chunk_size); - } - other => { - panic!("Expected a barrier, got {:?}", other); - } - } + other => { + panic!("Expected a barrier, got {:?}", other); } } - _ => panic!("Unsupported join type"), } } } From ea40461dfd81d70bacf68c79566c4e114b764d05 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 13 Dec 2024 00:28:19 +0800 Subject: [PATCH 03/11] fix --- src/stream/src/executor/test_utils.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs index 9ec6deed66c3..3951f2a2fc2a 100644 --- a/src/stream/src/executor/test_utils.rs +++ b/src/stream/src/executor/test_utils.rs @@ -871,8 +871,8 @@ pub mod hash_join_executor { } let expected_count = match join_type_primitive { - JoinType::LeftOuter => amp * chunk_size, - JoinType::Inner => chunk_size, + JoinType::LeftOuter => chunk_size, + JoinType::Inner => amp * chunk_size, _ => panic!("Unsupported join type"), }; let mut current_count = 0; From bbc17db2a4c57a6c80d5549bd0bc62bc097bf01c Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 13 Dec 2024 09:14:03 +0800 Subject: [PATCH 04/11] use 64 chunk_size --- src/stream/src/executor/test_utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs index 3951f2a2fc2a..b03b3bc7456b 100644 --- a/src/stream/src/executor/test_utils.rs +++ b/src/stream/src/executor/test_utils.rs @@ -851,7 +851,7 @@ pub mod hash_join_executor { } // Push a chunk of records into tx_l, matches 100K records in the build side. - let chunk_size = 1024; + let chunk_size = 64; let chunk = match join_type_primitive { // Make sure all match JoinType::Inner => build_chunk(chunk_size, 200_000), From 12c91cf90f424af660b29d30c0e4149772dd185e Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 13 Dec 2024 09:18:33 +0800 Subject: [PATCH 05/11] control chunk size based on workload --- src/stream/src/executor/test_utils.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs index b03b3bc7456b..25299dc4dca9 100644 --- a/src/stream/src/executor/test_utils.rs +++ b/src/stream/src/executor/test_utils.rs @@ -851,7 +851,10 @@ pub mod hash_join_executor { } // Push a chunk of records into tx_l, matches 100K records in the build side. - let chunk_size = 64; + let chunk_size = match hash_join_workload { + HashJoinWorkload::InCache => 64, + HashJoinWorkload::NotInCache => 1, + }; let chunk = match join_type_primitive { // Make sure all match JoinType::Inner => build_chunk(chunk_size, 200_000), From 7f6534e8046d55de7ae63d07707ebe2fe9949aa6 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 13 Dec 2024 09:23:55 +0800 Subject: [PATCH 06/11] use pb join type instead to get name instead of number --- src/stream/benches/stream_hash_join_mem.rs | 2 +- src/stream/benches/stream_hash_join_rt.rs | 2 +- src/stream/src/executor/test_utils.rs | 18 +++++++++--------- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/stream/benches/stream_hash_join_mem.rs b/src/stream/benches/stream_hash_join_mem.rs index c0b319f94321..ddbecfd50335 100644 --- a/src/stream/benches/stream_hash_join_mem.rs +++ b/src/stream/benches/stream_hash_join_mem.rs @@ -25,7 +25,7 @@ //! ``` use std::env; -use risingwave_stream::executor::JoinType; +use risingwave_pb::plan_common::JoinType; use risingwave_stream::executor::test_utils::hash_join_executor::*; diff --git a/src/stream/benches/stream_hash_join_rt.rs b/src/stream/benches/stream_hash_join_rt.rs index 0e3a6bea6258..58d61b82b550 100644 --- a/src/stream/benches/stream_hash_join_rt.rs +++ b/src/stream/benches/stream_hash_join_rt.rs @@ -23,7 +23,7 @@ use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use futures::executor::block_on; use risingwave_stream::executor::test_utils::hash_join_executor::*; use tokio::runtime::Runtime; -use risingwave_stream::executor::JoinType; +use risingwave_pb::plan_common::JoinType; risingwave_expr_impl::enable!(); diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs index 25299dc4dca9..ddd9a45a5ebe 100644 --- a/src/stream/src/executor/test_utils.rs +++ b/src/stream/src/executor/test_utils.rs @@ -635,6 +635,7 @@ pub mod hash_join_executor { use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId}; use risingwave_common::hash::Key128; use risingwave_common::util::sort_util::OrderType; + use risingwave_pb::plan_common::JoinType; use risingwave_storage::memory::MemoryStateStore; use super::*; @@ -642,8 +643,7 @@ pub mod hash_join_executor { use crate::executor::monitor::StreamingMetrics; use crate::executor::prelude::StateTable; use crate::executor::test_utils::{MessageSender, MockSource}; - use crate::executor::{ActorContext, HashJoinExecutor, JoinParams, JoinType}; - use crate::executor::join::JoinTypePrimitive; + use crate::executor::{ActorContext, HashJoinExecutor, JoinParams, JoinType as ConstJoinType}; #[derive(Clone, Copy, Debug, Display)] pub enum HashJoinWorkload { @@ -710,7 +710,7 @@ pub mod hash_join_executor { pub async fn setup_bench_stream_hash_join( amp: usize, workload: HashJoinWorkload, - join_type_primitive: JoinTypePrimitive, + join_type: JoinType, ) -> (MessageSender, MessageSender, BoxedMessageStream) { let fields = vec![DataType::Int64, DataType::Int64, DataType::Int64]; let orders = vec![OrderType::ascending(), OrderType::ascending()]; @@ -756,9 +756,9 @@ pub mod hash_join_executor { let params_l = JoinParams::new(vec![0], vec![1]); let params_r = JoinParams::new(vec![0], vec![1]); - match join_type_primitive { + match join_type { JoinType::Inner => { - let executor = HashJoinExecutor::::new( + let executor = HashJoinExecutor::::new( ActorContext::for_test(123), info, source_l, @@ -782,7 +782,7 @@ pub mod hash_join_executor { (tx_l, tx_r, executor.boxed().execute()) } JoinType::LeftOuter => { - let executor = HashJoinExecutor::::new( + let executor = HashJoinExecutor::::new( ActorContext::for_test(123), info, source_l, @@ -834,7 +834,7 @@ pub mod hash_join_executor { pub async fn handle_streams( hash_join_workload: HashJoinWorkload, - join_type_primitive: JoinTypePrimitive, + join_type: JoinType, amp: usize, mut tx_l: MessageSender, mut tx_r: MessageSender, @@ -855,7 +855,7 @@ pub mod hash_join_executor { HashJoinWorkload::InCache => 64, HashJoinWorkload::NotInCache => 1, }; - let chunk = match join_type_primitive { + let chunk = match join_type { // Make sure all match JoinType::Inner => build_chunk(chunk_size, 200_000), // Make sure no match is found. @@ -873,7 +873,7 @@ pub mod hash_join_executor { } } - let expected_count = match join_type_primitive { + let expected_count = match join_type { JoinType::LeftOuter => chunk_size, JoinType::Inner => amp * chunk_size, _ => panic!("Unsupported join type"), From aab72b441a5aeea6057c7e12445a42d3a8e2ae6f Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 13 Dec 2024 09:27:49 +0800 Subject: [PATCH 07/11] fix format string --- src/stream/benches/stream_hash_join_mem.rs | 6 ++++-- src/stream/benches/stream_hash_join_rt.rs | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/stream/benches/stream_hash_join_mem.rs b/src/stream/benches/stream_hash_join_mem.rs index ddbecfd50335..5a47703c3be4 100644 --- a/src/stream/benches/stream_hash_join_mem.rs +++ b/src/stream/benches/stream_hash_join_mem.rs @@ -45,12 +45,14 @@ async fn main() { } else { 100_000 }; - let (tx_l, tx_r, out) = setup_bench_stream_hash_join(amp, HashJoinWorkload::NotInCache, JoinType::Inner).await; + let workload = HashJoinWorkload::NotInCache; + let join_type = JoinType::Inner; + let (tx_l, tx_r, out) = setup_bench_stream_hash_join(amp, workload, join_type).await; { // Start the profiler later, after we have ingested the data for hash join build-side. #[cfg(feature = "dhat-heap")] let _profiler = dhat::Profiler::new_heap(); - handle_streams(amp, tx_l, tx_r, out).await; + handle_streams(workload, join_type, amp, tx_l, tx_r, out).await; } } diff --git a/src/stream/benches/stream_hash_join_rt.rs b/src/stream/benches/stream_hash_join_rt.rs index 58d61b82b550..0eb458c98053 100644 --- a/src/stream/benches/stream_hash_join_rt.rs +++ b/src/stream/benches/stream_hash_join_rt.rs @@ -35,7 +35,7 @@ fn bench_hash_join(c: &mut Criterion) { for amp in [10_000, 20_000, 30_000, 40_000, 100_000, 200_000, 400_000] { for workload in [HashJoinWorkload::NotInCache, HashJoinWorkload::InCache] { for join_type in [JoinType::Inner, JoinType::LeftOuter] { - let name = format!("hash_join_rt_{}_{}_{}", amp, workload, join_type); + let name = format!("hash_join_rt_{}_{}_{:#?}", amp, workload, join_type); group.bench_function(&name, |b| { b.to_async(&rt).iter_batched( || block_on(setup_bench_stream_hash_join(amp, workload, join_type)), From e94320780a0f34876b4f6abe78c2cf3988c8a9c5 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 13 Dec 2024 10:49:04 +0800 Subject: [PATCH 08/11] add program to monitor mem use --- src/stream/benches/stream_hash_join.py | 39 ++++++++++++++++++++++ src/stream/benches/stream_hash_join_mem.rs | 36 ++++++++++++-------- 2 files changed, 61 insertions(+), 14 deletions(-) create mode 100755 src/stream/benches/stream_hash_join.py diff --git a/src/stream/benches/stream_hash_join.py b/src/stream/benches/stream_hash_join.py new file mode 100755 index 000000000000..d5ca9994051f --- /dev/null +++ b/src/stream/benches/stream_hash_join.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 + +# Executes full benchmark for stream_hash_join runtime and memory consumption +# Outputs a json file with the results. + +import subprocess +import re +import sys + +# Print header +print("Amp,Workload,JoinType,Total Blocks,Total Bytes") + +# Run benchmarks and capture results +for amp in [10_000, 20_000, 30_000, 40_000, 100_000, 200_000, 400_000]: + for workload in ["NotInCache", "InCache"]: + for join_type in ["Inner", "LeftOuter"]: + # Construct the command + cmd = f'ARGS={amp},{workload},{join_type} cargo bench --features dhat-heap --bench stream_hash_join_mem' + + # Run the command and capture output + try: + output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT, universal_newlines=True) + + # Extract total blocks and bytes + total_blocks_match = re.search(r'max_blocks:\s*(\d+)', output) + total_bytes_match = re.search(r'max_bytes:\s*(\d+)', output) + + if total_blocks_match and total_bytes_match: + total_blocks = total_blocks_match.group(1) + total_bytes = total_bytes_match.group(1) + + # Print results immediately + print(f"{amp},{workload},{join_type},{total_blocks},{total_bytes}") + else: + print(f"No total_blocks or total_bytes found for: Amp={amp}, Workload={workload}, JoinType={join_type}", file=sys.stderr) + + except subprocess.CalledProcessError as e: + print(f"Error running benchmark for Amp={amp}, Workload={workload}, JoinType={join_type}", file=sys.stderr) + print(f"Error output: {e.output}", file=sys.stderr) \ No newline at end of file diff --git a/src/stream/benches/stream_hash_join_mem.rs b/src/stream/benches/stream_hash_join_mem.rs index 5a47703c3be4..50df173bfe7d 100644 --- a/src/stream/benches/stream_hash_join_mem.rs +++ b/src/stream/benches/stream_hash_join_mem.rs @@ -14,14 +14,9 @@ #![feature(let_chains)] -//! To run this benchmark you can use the following command: +//! Specify the amplification_size,workload,join_type e.g. 40000 //! ```sh -//! cargo bench --features dhat-heap --bench stream_hash_join_mem -//! ``` -//! -//! You may also specify the amplification size, e.g. 40000 -//! ```sh -//! cargo bench --features dhat-heap --bench stream_hash_join_mem -- 40000 +//! ARGS=40000,NotInCache,Inner cargo bench --features dhat-heap --bench stream_hash_join_mem //! ``` use std::env; @@ -37,16 +32,26 @@ static ALLOC: dhat::Alloc = dhat::Alloc; #[tokio::main] async fn main() { - let args: Vec<_> = env::args().collect(); - let amp = if let Some(raw_arg) = args.get(1) - && let Ok(arg) = raw_arg.parse() + let arg = env::var("ARGS"); + let (amp, workload, join_type) = if let Ok(raw_arg) = arg { - arg + let parts = raw_arg.split(',').collect::>(); + let amp = parts[0].parse::().expect(format!("invalid amplification_size: {}", parts[0]).as_str()); + let workload = match parts[1] { + "NotInCache" => HashJoinWorkload::NotInCache, + "InCache" => HashJoinWorkload::InCache, + _ => panic!("Invalid workload: {}", parts[1]), + }; + let join_type = match parts[2] { + "Inner" => JoinType::Inner, + "LeftOuter" => JoinType::LeftOuter, + _ => panic!("Invalid join type: {}", parts[2]), + }; + (amp, workload, join_type) } else { - 100_000 + panic!("invalid ARGS: {:?}", arg); }; - let workload = HashJoinWorkload::NotInCache; - let join_type = JoinType::Inner; + let (tx_l, tx_r, out) = setup_bench_stream_hash_join(amp, workload, join_type).await; { // Start the profiler later, after we have ingested the data for hash join build-side. @@ -54,5 +59,8 @@ async fn main() { let _profiler = dhat::Profiler::new_heap(); handle_streams(workload, join_type, amp, tx_l, tx_r, out).await; + let stats= dhat::HeapStats::get(); + println!("max_blocks: {}", stats.max_blocks); + println!("max_bytes: {}", stats.max_bytes); } } From e7ac4acceac521c7f5a21ad421e60daffdf09a60 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 13 Dec 2024 12:04:28 +0800 Subject: [PATCH 09/11] update script to add rt --- src/stream/benches/stream_hash_join.py | 38 ++++++++++++++++++++------ 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/src/stream/benches/stream_hash_join.py b/src/stream/benches/stream_hash_join.py index d5ca9994051f..07deebb63121 100755 --- a/src/stream/benches/stream_hash_join.py +++ b/src/stream/benches/stream_hash_join.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 - +import json # Executes full benchmark for stream_hash_join runtime and memory consumption # Outputs a json file with the results. @@ -8,18 +8,21 @@ import sys # Print header -print("Amp,Workload,JoinType,Total Blocks,Total Bytes") +results = ["Amp,Workload,JoinType,Total Blocks,Total Bytes,Runtime (ns)"] # Run benchmarks and capture results -for amp in [10_000, 20_000, 30_000, 40_000, 100_000, 200_000, 400_000]: +for amp in [20_000, 40_000, 200_000, 400_000]: for workload in ["NotInCache", "InCache"]: for join_type in ["Inner", "LeftOuter"]: # Construct the command - cmd = f'ARGS={amp},{workload},{join_type} cargo bench --features dhat-heap --bench stream_hash_join_mem' + cmd_mem = f'ARGS={amp},{workload},{join_type} cargo bench --features dhat-heap --bench stream_hash_join_mem' + cmd_rt = f'cargo criterion --message-format json --bench stream_hash_join_rt -- hash_join_rt_{amp}_{workload}_{join_type}' + + s = "" - # Run the command and capture output try: - output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT, universal_newlines=True) + # Run cmd_mem and capture output + output = subprocess.check_output(cmd_mem, shell=True, stderr=subprocess.STDOUT, universal_newlines=True) # Extract total blocks and bytes total_blocks_match = re.search(r'max_blocks:\s*(\d+)', output) @@ -29,11 +32,28 @@ total_blocks = total_blocks_match.group(1) total_bytes = total_bytes_match.group(1) - # Print results immediately - print(f"{amp},{workload},{join_type},{total_blocks},{total_bytes}") + s+=f"{amp},{workload},{join_type},{total_blocks},{total_bytes}" else: print(f"No total_blocks or total_bytes found for: Amp={amp}, Workload={workload}, JoinType={join_type}", file=sys.stderr) + # Run cmd_rt and capture output + json_output = subprocess.check_output(cmd_rt, shell=True, universal_newlines=True) + json_output = json_output.split('\n') + try: + time_ns = json.loads(json_output[0])["typical"]["estimate"] + except Exception as e: + print(f"could not parse {json_output[0]} due to {e}") + exit(1) + if time_ns: + s+=f",{time_ns}" + else: + print(f"No runtime found for: Amp={amp}, Workload={workload}, JoinType={join_type}", file=sys.stderr) + + results.append(s) + except subprocess.CalledProcessError as e: print(f"Error running benchmark for Amp={amp}, Workload={workload}, JoinType={join_type}", file=sys.stderr) - print(f"Error output: {e.output}", file=sys.stderr) \ No newline at end of file + print(f"Error output: {e.output}", file=sys.stderr) + +for result in results: + print(result) \ No newline at end of file From 8bc1e2aa4deeb38b01c899cd2cf4370ab04de0cb Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 13 Dec 2024 14:42:44 +0800 Subject: [PATCH 10/11] docs --- src/stream/benches/stream_hash_join_rt.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/stream/benches/stream_hash_join_rt.rs b/src/stream/benches/stream_hash_join_rt.rs index 0eb458c98053..418ecf17761f 100644 --- a/src/stream/benches/stream_hash_join_rt.rs +++ b/src/stream/benches/stream_hash_join_rt.rs @@ -18,6 +18,11 @@ //! ```sh //! cargo bench --bench stream_hash_join_rt //! ``` +//! +//! Generate flamegraph: +//! ```sh +//! sudo cargo flamegraph --bench stream_hash_join_rt -- hash_join_rt_40000_InCache_Inner +//! ``` use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use futures::executor::block_on; From 3676a59743db0c16c8ffd45ff621a27e10bab971 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Sat, 14 Dec 2024 20:09:48 +0800 Subject: [PATCH 11/11] add barrier to synchronize --- src/stream/src/executor/test_utils.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs index ddd9a45a5ebe..d4cd228c0ca6 100644 --- a/src/stream/src/executor/test_utils.rs +++ b/src/stream/src/executor/test_utils.rs @@ -850,6 +850,9 @@ pub mod hash_join_executor { tx_r.push_chunk(chunk); } + tx_l.push_barrier(test_epoch(2), false); + tx_r.push_barrier(test_epoch(2), false); + // Push a chunk of records into tx_l, matches 100K records in the build side. let chunk_size = match hash_join_workload { HashJoinWorkload::InCache => 64, @@ -873,6 +876,15 @@ pub mod hash_join_executor { } } + match stream.next().await { + Some(Ok(Message::Barrier(b))) => { + assert_eq!(b.epoch.curr, test_epoch(2)); + } + other => { + panic!("Expected a barrier, got {:?}", other); + } + } + let expected_count = match join_type { JoinType::LeftOuter => chunk_size, JoinType::Inner => amp * chunk_size,