Implement CardinalityAwareRowConverter while doing streaming merge #7401

Merged · 61 commits · Sep 18, 2023
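The commit history outlines the core idea: when a dictionary-encoded sort column turns out to have many distinct values, keeping the dictionary encoding in the Arrow row format during a streaming merge wastes memory, so the converter checks cardinality and falls back to the plain value type above a threshold (raised to 512 during review). A minimal sketch of that decision follows; the function name, the LOW_CARDINALITY_THRESHOLD constant, and the restriction to Int32 dictionary keys are illustrative assumptions, not the code merged by this PR (the commits indicate the merged version uses as_any_dictionary_opt to handle any key type).

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, DictionaryArray};
use arrow::compute::cast;
use arrow::datatypes::Int32Type;
use arrow::error::ArrowError;
use arrow::row::{RowConverter, Rows, SortField};

// Illustrative constant; the commits raise the real threshold to 512.
const LOW_CARDINALITY_THRESHOLD: usize = 512;

// Convert sort columns to the Arrow row format, unwrapping any
// dictionary-encoded column whose dictionary exceeds the threshold so the
// row format sorts on the plain value type instead. Per-column SortOptions
// and non-Int32 dictionary keys are omitted for brevity.
fn convert_cardinality_aware(columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
    let mut fields = Vec::with_capacity(columns.len());
    let mut converted = Vec::with_capacity(columns.len());

    for col in columns {
        let high_cardinality_dict = col
            .as_any()
            .downcast_ref::<DictionaryArray<Int32Type>>()
            .filter(|dict| dict.values().len() > LOW_CARDINALITY_THRESHOLD);

        match high_cardinality_dict {
            Some(dict) => {
                // High cardinality: cast to the underlying value type
                let value_type = dict.values().data_type().clone();
                converted.push(cast(col.as_ref(), &value_type)?);
                fields.push(SortField::new(value_type));
            }
            None => {
                // Low cardinality (or not a dictionary): keep the column as is
                converted.push(Arc::clone(col));
                fields.push(SortField::new(col.data_type().clone()));
            }
        }
    }

    RowConverter::new(fields)?.convert_columns(&converted)
}

Keeping low-cardinality dictionaries avoids materializing full values for every row, while unwrapping high-cardinality ones avoids building a huge combined dictionary during the merge; the commit titles ("Reconvert the group values", "Convert back to dictionary") also suggest the merged code converts group-by values back to dictionaries afterwards.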
Commits (61)
b80e702
Use CardinalityAwareRow converter
JayjeetAtGithub Aug 24, 2023
d83fae2
Move CardinalityAwareRowConverter in df
JayjeetAtGithub Aug 29, 2023
546ccff
Move CardinalityAwareRowConverter in df
JayjeetAtGithub Aug 29, 2023
e9fb8af
Remove unnecessary clone and make wrapper mod private
JayjeetAtGithub Aug 30, 2023
c5c707a
Use as_any_dictionary_opt
JayjeetAtGithub Aug 30, 2023
22fb159
Remove unnecessary comments
JayjeetAtGithub Aug 30, 2023
65a0209
Remove done
JayjeetAtGithub Aug 30, 2023
bd5faf7
Add test for cardinality aware row converter on high card dict
JayjeetAtGithub Aug 31, 2023
80d3bf2
Add test for cardinality aware row converter on low card dict
JayjeetAtGithub Aug 31, 2023
5eff541
Ignore the test_dict_merge_infinite test
JayjeetAtGithub Aug 31, 2023
2651532
Remove phantom Arc import
JayjeetAtGithub Sep 1, 2023
7da3681
Remove the infinite stream test
JayjeetAtGithub Sep 1, 2023
6cc6468
Update datafusion/core/src/physical_plan/wrapper.rs
JayjeetAtGithub Sep 7, 2023
74aaa59
Update convert_rows signature and add empty_rows
JayjeetAtGithub Sep 8, 2023
0d305ee
Add comments to the test
JayjeetAtGithub Sep 8, 2023
2391f43
Use Some and take() semantics
JayjeetAtGithub Sep 8, 2023
05685a1
Init with a row converter instance instead of none
JayjeetAtGithub Sep 8, 2023
f107f27
Remove unused variable
JayjeetAtGithub Sep 8, 2023
e44e600
Remove unused imports
JayjeetAtGithub Sep 8, 2023
068695e
Remove unused imports
JayjeetAtGithub Sep 8, 2023
c7fa020
Change GroupValues
JayjeetAtGithub Sep 8, 2023
4c054f6
Add comments, run fmt
alamb Sep 8, 2023
c58539b
Init with a empty row converter
JayjeetAtGithub Sep 8, 2023
74533c6
Use the cardinality aware row converter
JayjeetAtGithub Sep 8, 2023
b90f40d
Reconvert the group values
JayjeetAtGithub Sep 9, 2023
4cf0218
Rename wrapper to row_converter
JayjeetAtGithub Sep 9, 2023
eb81191
Recovert the group values
JayjeetAtGithub Sep 9, 2023
d22b645
Convert back to dictionary
JayjeetAtGithub Sep 10, 2023
aa24717
fmt
alamb Sep 11, 2023
dbd66f2
A fmt pass
JayjeetAtGithub Sep 11, 2023
8a42957
fix: fmt
alamb Sep 11, 2023
1a9a58d
Move the reconversion to dict to just consider group by columns
JayjeetAtGithub Sep 11, 2023
acf1cd4
Reconvert only the correct cols
JayjeetAtGithub Sep 11, 2023
6543b9d
Use assert eq
JayjeetAtGithub Sep 11, 2023
c6bf41a
clippy
alamb Sep 11, 2023
08c6f7d
clippy
alamb Sep 11, 2023
befc9b5
Merge branch 'sort-fix' of github.com:JayjeetAtGithub/arrow-datafusio…
alamb Sep 11, 2023
db800c4
Merge remote-tracking branch 'apache/main' into sort-fix
alamb Sep 11, 2023
c965bbe
Add comment about the reconversion to dict
JayjeetAtGithub Sep 11, 2023
8244c83
Fix the merge issues
JayjeetAtGithub Sep 11, 2023
235f3bc
move data type conversion
alamb Sep 12, 2023
f3eb44c
fix
alamb Sep 12, 2023
664e6a0
fix docs
alamb Sep 12, 2023
9a965ca
Merge remote-tracking branch 'apache/main' into sort-fix
alamb Sep 12, 2023
bbc5982
fix bug
alamb Sep 12, 2023
a1f69a7
Improve tests
alamb Sep 12, 2023
76feb4f
simplify
alamb Sep 12, 2023
137d78e
Use cardinality aware row converter in gby order
alamb Sep 12, 2023
65d31cc
clippy
alamb Sep 12, 2023
9b5681d
Adjust memory test
alamb Sep 12, 2023
b2fedac
Merge remote-tracking branch 'apache/main' into sort-fix
alamb Sep 13, 2023
b673c09
Add doc comments about row converter
alamb Sep 13, 2023
25861a7
remove outdated comment
alamb Sep 13, 2023
355ef73
Rework partition size calculation to make test clearer
alamb Sep 13, 2023
f48325a
Increase threshold to 512
JayjeetAtGithub Sep 15, 2023
22a90e7
Update row converter tests according to new threshold
JayjeetAtGithub Sep 15, 2023
819bd09
Merge remote-tracking branch 'apache/main' into sort-fix
alamb Sep 18, 2023
fdb4835
fix clippy
alamb Sep 18, 2023
b49a5bd
Merge remote-tracking branch 'apache/main' into sort-fix
alamb Sep 18, 2023
efca2b2
fix panic
alamb Sep 18, 2023
8498d04
Adjust constant for test
alamb Sep 18, 2023
29 changes: 15 additions & 14 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

122 changes: 96 additions & 26 deletions datafusion/core/tests/memory_limit.rs
@@ -200,7 +200,8 @@ async fn symmetric_hash_join() {

#[tokio::test]
async fn sort_preserving_merge() {
let partition_size = batches_byte_size(&dict_batches());
let scenario = Scenario::new_dictionary_strings(2);
let partition_size = scenario.partition_size();

TestCase::new()
// This query uses the exact same ordering as the input table
@@ -213,7 +214,7 @@ async fn sort_preserving_merge() {
// provide insufficient memory to merge
.with_memory_limit(partition_size / 2)
// two partitions of data, so a merge is required
.with_scenario(Scenario::DictionaryStrings(2))
.with_scenario(scenario)
.with_expected_plan(
// It is important that this plan only has
// SortPreservingMergeExec (not a Sort which would compete
@@ -238,7 +239,10 @@ async fn sort_preserving_merge() {

#[tokio::test]
async fn sort_spill_reservation() {
let partition_size = batches_byte_size(&dict_batches());
let scenario = Scenario::new_dictionary_strings(1)
// make the batches small enough to avoid triggering CardinalityAwareRowConverter
.with_single_row_batches(true);
let partition_size = scenario.partition_size();

let base_config = SessionConfig::new()
// do not allow the sort to use the 'concat in place' path
@@ -248,30 +252,30 @@ async fn sort_spill_reservation() {
// purposely sorting data that requires non trivial memory to
// sort/merge.
let test = TestCase::new()
// This query uses a different order than the input table to
// force a sort. It also needs to have multiple columns to
// force RowFormat / interner that makes merge require
// substantial memory
// This query uses a different order than the input table to
// force a sort. It also needs to have multiple columns to
// force RowFormat / interner that makes merge require
// substantial memory
.with_query("select * from t ORDER BY a , b DESC")
// enough memory to sort if we don't try to merge it all at once
// enough memory to sort if we don't try to merge it all at once
.with_memory_limit(partition_size)
// use a single partition so only a sort is needed
.with_scenario(Scenario::DictionaryStrings(1))
// use a single partition so only a sort is needed
.with_scenario(scenario)
.with_disk_manager_config(DiskManagerConfig::NewOs)
.with_expected_plan(
// It is important that this plan only has a SortExec, not
// also merge, so we can ensure the sort could finish
// given enough merging memory
&[
"+---------------+--------------------------------------------------------------------------------------------------------+",
"| plan_type | plan |",
"+---------------+--------------------------------------------------------------------------------------------------------+",
"| logical_plan | Sort: t.a ASC NULLS LAST, t.b DESC NULLS FIRST |",
"| | TableScan: t projection=[a, b] |",
"| physical_plan | SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC] |",
"| | MemoryExec: partitions=1, partition_sizes=[5], output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |",
"| | |",
"+---------------+--------------------------------------------------------------------------------------------------------+",
"+---------------+----------------------------------------------------------------------------------------------------------+",
"| plan_type | plan |",
"+---------------+----------------------------------------------------------------------------------------------------------+",
"| logical_plan | Sort: t.a ASC NULLS LAST, t.b DESC NULLS FIRST |",
"| | TableScan: t projection=[a, b] |",
"| physical_plan | SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC] |",
"| | MemoryExec: partitions=1, partition_sizes=[245], output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |",
"| | |",
"+---------------+----------------------------------------------------------------------------------------------------------+",
]
);

@@ -471,11 +475,48 @@ enum Scenario {
/// [`StreamingTable`]
AccessLogStreaming,

/// N partitions of sorted, dictionary encoded strings
DictionaryStrings(usize),
/// N partitions of sorted, dictionary encoded strings.
DictionaryStrings {
partitions: usize,
/// If true, splits all input batches into 1 row each
single_row_batches: bool,
},
}

impl Scenario {
/// Create a new DictionaryStrings scenario with the number of partitions
fn new_dictionary_strings(partitions: usize) -> Self {
Self::DictionaryStrings {
partitions,
single_row_batches: false,
}
}

/// Should the input be split into 1 row batches?
fn with_single_row_batches(mut self, val: bool) -> Self {
if let Self::DictionaryStrings {
single_row_batches, ..
} = &mut self
{
*single_row_batches = val;
} else {
panic!("Scenario does not support single row batches");
}
self
}

/// return the size, in bytes, of each partition
fn partition_size(&self) -> usize {
if let Self::DictionaryStrings {
single_row_batches, ..
} = self
{
batches_byte_size(&maybe_split_batches(dict_batches(), *single_row_batches))
} else {
panic!("Scenario does not support partition size");
}
}

/// return a TableProvider with data for the test
fn table(&self) -> Arc<dyn TableProvider> {
match self {
@@ -500,11 +541,17 @@ impl Scenario {
.with_infinite_table(true);
Arc::new(table)
}
Self::DictionaryStrings(num_partitions) => {
Self::DictionaryStrings {
partitions,
single_row_batches,
} => {
use datafusion::physical_expr::expressions::col;
let batches: Vec<Vec<_>> = std::iter::repeat(dict_batches())
.take(*num_partitions)
.collect();
let batches: Vec<Vec<_>> = std::iter::repeat(maybe_split_batches(
dict_batches(),
*single_row_batches,
))
.take(*partitions)
.collect();

let schema = batches[0][0].schema();
let options = SortOptions {
@@ -544,7 +591,7 @@ impl Scenario {
// first
Some(vec![Arc::new(JoinSelection::new())])
}
Self::DictionaryStrings(_) => {
Self::DictionaryStrings { .. } => {
// Use default rules
None
}
@@ -559,6 +606,29 @@ fn access_log_batches() -> Vec<RecordBatch> {
.collect()
}

/// If `one_row_batches` is true, then returns new record batches that
/// are one row in size
fn maybe_split_batches(
batches: Vec<RecordBatch>,
one_row_batches: bool,
) -> Vec<RecordBatch> {
if !one_row_batches {
return batches;
}

batches
.into_iter()
.flat_map(|mut batch| {
let mut batches = vec![];
while batch.num_rows() > 1 {
batches.push(batch.slice(0, 1));
batch = batch.slice(1, batch.num_rows() - 1);
}
batches
})
.collect()
}
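// A hypothetical sanity check of the splitting arithmetic above, not part of
// this diff: `dict_batches()` produces five 50-row batches, and the
// `while batch.num_rows() > 1` loop re-emits each as one-row slices while
// leaving the final row behind, so 5 * 49 = 245 batches result, matching the
// `partition_sizes=[245]` in the expected plan earlier in this file.
#[test]
fn hypothetical_single_row_split_count() {
    let split = maybe_split_batches(dict_batches(), true);
    assert_eq!(split.len(), 245);
    assert!(split.iter().all(|b| b.num_rows() == 1));
}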

static DICT_BATCHES: OnceLock<Vec<RecordBatch>> = OnceLock::new();

/// Returns 5 sorted string dictionary batches each with 50 rows with
1 change: 1 addition & 0 deletions datafusion/physical-plan/Cargo.toml
@@ -60,3 +60,4 @@ tempfile = "3"
#[dev-dependencies]
termtree = "0.4.1"
tokio = { version = "1.28", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
uuid = { version = "^1.2", features = ["v4"] }