Skip to content

Commit

Permalink
feat(batch): support spill hash agg for the batch query (#16771)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored May 29, 2024
1 parent 6172211 commit c7ad769
Show file tree
Hide file tree
Showing 14 changed files with 619 additions and 48 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions src/batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ arrow-schema = { workspace = true }
assert_matches = "1"
async-recursion = "1"
async-trait = "0.1"
bytes = "1"
either = "1"
foyer = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
Expand All @@ -30,9 +31,11 @@ hytra = "0.1.2"
icelake = { workspace = true }
itertools = { workspace = true }
memcomparable = "0.2"
opendal = "0.45.1"
parking_lot = { workspace = true }
paste = "1"
prometheus = { version = "0.13", features = ["process"] }
prost = "0.12"
rand = { workspace = true }
risingwave_common = { workspace = true }
risingwave_common_estimate_size = { workspace = true }
Expand Down Expand Up @@ -62,6 +65,8 @@ tokio-stream = "0.1"
tokio-util = { workspace = true }
tonic = { workspace = true }
tracing = "0.1"
twox-hash = "1"
uuid = { version = "1", features = ["v4"] }

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../workspace-hack" }
Expand Down
5 changes: 4 additions & 1 deletion src/batch/benches/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// limitations under the License.
pub mod utils;

use std::sync::Arc;

use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
use itertools::Itertools;
use risingwave_batch::executor::aggregation::build as build_agg;
Expand Down Expand Up @@ -96,14 +98,15 @@ fn create_hash_agg_executor(
let schema = Schema { fields };

Box::new(HashAggExecutor::<hash::Key64>::new(
agg_init_states,
Arc::new(agg_init_states),
group_key_columns,
group_key_types,
schema,
input,
"HashAggExecutor".to_string(),
CHUNK_SIZE,
MemoryContext::none(),
false,
ShutdownToken::empty(),
))
}
Expand Down
7 changes: 7 additions & 0 deletions src/batch/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ pub enum BatchError {

#[error("Not enough memory to run this query, batch memory limit is {0} bytes")]
OutOfMemory(u64),

#[error("Failed to spill out to disk")]
Spill(
#[from]
#[backtrace]
opendal::Error,
),
}

// Serialize/deserialize error.
Expand Down
Loading

0 comments on commit c7ad769

Please sign in to comment.