
Memory limited hash join #5490

Merged · 1 commit · Mar 9, 2023
Conversation

@korowa (Contributor) commented Mar 6, 2023

Which issue does this PR close?

Part of #5339
Part of #5220

Rationale for this change

Control over memory allocations made by HashJoinExec

What changes are included in this PR?

The major change is that MemoryReservation.try_grow calls were added in the 3 places where allocations for build-side data and related structures are performed:

  1. build-side record batch collection -- similar to CrossJoinExec, memory is reserved for each RecordBatch fetched from the build side. The reservation size is calculated from the already-fetched batch, which could lead to insignificant (no more than a single batch size) overallocation
  2. build-side hashtable -- there is only one allocation, happening during RawTable::with_capacity(), and it could be massive, so it seems reasonable to preemptively estimate the size required for the RawTable and reserve it in the pool -- the size calculations are described in a code comment
  3. visited_left_side bitmap in each stream (if required by the join type) -- as with the hashtable, the required size is estimated and the memory reserved before the ArrayBuilder is created.
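The reserve-before-allocate pattern used in all three places can be sketched as follows. This is a minimal illustration with simplified, hypothetical types -- not DataFusion's actual MemoryReservation / MemoryPool API:

```rust
// Minimal sketch of the reserve-before-allocate pattern (hypothetical types,
// not DataFusion's actual MemoryReservation / MemoryPool API).
struct MemoryReservation {
    used: usize,
    limit: usize,
}

impl MemoryReservation {
    fn new(limit: usize) -> Self {
        Self { used: 0, limit }
    }

    // Fail instead of allocating when the pool limit would be exceeded,
    // mirroring the behavior of `try_grow`.
    fn try_grow(&mut self, additional: usize) -> Result<(), String> {
        if self.used + additional > self.limit {
            Err(format!(
                "cannot reserve {} bytes: {}/{} already used",
                additional, self.used, self.limit
            ))
        } else {
            self.used += additional;
            Ok(())
        }
    }
}

fn main() {
    let mut reservation = MemoryReservation::new(1024);
    // 1. reserve for an already-fetched build-side batch
    assert!(reservation.try_grow(512).is_ok());
    // 2. a reservation that would exceed the limit fails up front,
    //    before any large allocation (e.g. the hashtable) is attempted
    assert!(reservation.try_grow(1024).is_err());
}
```

The key point is that the query fails with a resource error before the allocation happens, rather than the process being killed by the OS after it.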

Another change is stream/operator-level reservation (which also affects CrossJoinExec). Storing a MemoryReservation in ...JoinStream and freeing it when the stream completes (as done in #5339) doesn't seem to be correct for CrossJoinExec, or for HashJoinExec with PartitionMode::CollectLeft, for a couple of reasons:

  1. only one stream out of multiple partitions executes the collect_left_data function and reserves memory -- if partition processing times are skewed, the only stream holding the reservation could be dropped before join execution is completed
  2. LeftData is stored in ...JoinExec rather than in ...ExecStream, so the object could potentially outlive the reservation

These issues are fixed by an OperatorMemoryReservation added to both CrossJoinExec and HashJoinExec -- for hash joins this reservation is used only in PartitionMode::CollectLeft cases; for partitioned joins, stream-level memory reservations are used.
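The operator-level reservation pattern can be sketched like this, with simplified stand-in types (the real SharedMemoryReservation wraps a MemoryReservation, and initialization happens in execute()):

```rust
use std::sync::{Arc, Mutex};

// Sketch of the operator-level reservation pattern: the reservation lives in
// the operator as Arc<Mutex<Option<...>>>, so whichever stream executes first
// initializes it, every stream shares it, and it outlives any single stream.
// Types here are simplified placeholders, not DataFusion's actual API.
type SharedReservation = Arc<Mutex<usize>>;
type OperatorReservation = Arc<Mutex<Option<SharedReservation>>>;

// First caller initializes the shared reservation; later callers get a clone
// of the same Arc.
fn get_or_init(op: &OperatorReservation) -> SharedReservation {
    op.lock()
        .unwrap()
        .get_or_insert_with(|| Arc::new(Mutex::new(0)))
        .clone()
}

fn main() {
    let op: OperatorReservation = Arc::new(Mutex::new(None));
    let a = get_or_init(&op); // e.g. the stream running collect_left_data
    let b = get_or_init(&op); // another partition's stream
    assert!(Arc::ptr_eq(&a, &b)); // both streams share one reservation
}
```

Because the Arc is held by the operator itself, dropping an individual stream no longer releases the memory accounting for the shared build side.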

Are these changes tested?

Test cases for overallocation have been added for all JoinTypes, for both CollectLeft and Partitioned modes.

Are there any user-facing changes?

HashJoinExec should now respect runtime memory limitations.

@github-actions github-actions bot added the core Core DataFusion crate label Mar 6, 2023
@alamb (Contributor) commented Mar 7, 2023

Thank you @korowa -- I plan to review this PR carefully tomorrow morning

@Dandandan (Contributor) commented:
Nice PR!

I think it would be great if we could run some benchmarks to show that we're not regressing too much (e.g. running the TPC-H benchmark queries with joins). Some reasons I defaulted to initializing the hashmap using the size of the left side are as follows:

  • The build side (for the partition) already has to be loaded into memory, and usually takes at least as much, and often more, memory than the hash table
  • For many cases (e.g. unique identifiers) we need this capacity and the estimate is optimal
  • Rebuilding the hash table can be slow (although some improvements were made in this area)

@korowa (Contributor, Author) commented Mar 8, 2023

Thanks @Dandandan!

I think it would be great if we could run some benchmarks to show that we're not regressing too much (e.g. running tpch benchmark queries with joins).

I've done a couple of iterations for

cargo run --release --bin tpch -- benchmark datafusion --iterations 5 --path ./parquet --format parquet

With the results (a bit noisy, but I can't see any noticeable regressions -- of course I may be wrong):

Query               branch         main
----------------------------------------------
Query 1 avg time:   1047.93 ms     1135.36 ms
Query 2 avg time:   280.91 ms      286.69 ms
Query 3 avg time:   323.87 ms      351.31 ms
Query 4 avg time:   146.87 ms      146.58 ms
Query 5 avg time:   482.85 ms      463.07 ms
Query 6 avg time:   274.73 ms      342.29 ms
Query 7 avg time:   750.73 ms      762.43 ms
Query 8 avg time:   443.34 ms      426.89 ms
Query 9 avg time:   821.48 ms      775.03 ms
Query 10 avg time:  585.21 ms      584.16 ms
Query 11 avg time:  247.56 ms      232.90 ms
Query 12 avg time:  258.51 ms      231.19 ms
Query 13 avg time:  899.16 ms      885.56 ms
Query 14 avg time:  300.63 ms      282.56 ms
Query 15 avg time:  346.36 ms      318.97 ms
Query 16 avg time:  198.33 ms      184.26 ms
Query 17 avg time:  4197.54 ms     4101.92 ms
Query 18 avg time:  2726.41 ms     2548.96 ms
Query 19 avg time:  566.67 ms      535.74 ms
Query 20 avg time:  1193.82 ms     1319.49 ms
Query 21 avg time:  1027.00 ms     1050.08 ms
Query 22 avg time:  120.03 ms      111.32 ms

Also

cargo run --release --bin tpch -- benchmark datafusion --query 17 --iterations 5 --path ./parquet --format parquet --debug | grep -e "build_time" -e "iteration"

is not showing any difference in join build / probe times. Full log is in attachment.
mem_mgmt_benches.txt

As for now, I tend to think that these changes do not affect performance significantly (since reservations and memory estimations are executed only once per partition/query), but I'll be happy to perform any additional checks.

Some reasons I defaulted to initializing the hashmap using the size of the left side is as following

All of them look pretty reasonable and I agree with this approach; I was just trying to explain why I decided to duplicate calculations from some functions subsequently called by RawTable::with_capacity.

@Dandandan (Contributor) commented:

Awesome, thanks for checking. It might be worth repeating the benchmark run with -m (loading into an in-memory table) to take the Parquet scan out of the equation.

@alamb (Contributor) left a comment:

I went over this change very carefully @korowa -- really nice work 🏆

I am running the benchmarks but assuming they look good I think this PR can be merged.

cc @liukun4515

/// [`MemoryReservation`] used at query operator level
/// `Option` wrapper allows initializing an empty reservation in the operator constructor,
/// and set it to actual reservation at stream level.
pub(crate) type OperatorMemoryReservation = Arc<Mutex<Option<SharedMemoryReservation>>>;
Contributor:

I wonder if the double layer of Mutex is necessary (a Mutex over an Option of a Mutex)

Update: after reading the code I see why it is necessary.

Perhaps it would make the code clearer (as a follow on PR, perhaps) if we had a proper SharedMemoryReservation that could be cloned and handled mutability internally. 🤔

Contributor Author:

Unfortunately I haven't found a better solution yet, but I'll check if there is a better way to wrap MemoryReservation into something that allows the reservation to be created (or initialized) from multiple streams.

Another option, probably, is to allow creating a MemoryReservation without providing a context (in HashJoinExec::new), and registering it in the MemoryPool later (in HashJoinExec.execute, when we have the context available).

@korowa (Contributor, Author) commented Mar 9, 2023:

So, if there are no objections, I'd prefer to figure out the proper way to handle operator-level reservations in a separate follow-up PR, or while adding memory limitations to one of the remaining joins (at least the NL join will definitely have the same issue).

@@ -1173,6 +1149,18 @@ impl HashJoinStream {
};
build_timer.done();

// Reserving memory for visited_left_side bitmap in case it hasn't been initialized yet
// and join_type requires storing it
if self.visited_left_side.is_none()
Contributor:

You might be able to make this less verbose by moving the reservation into the initialization of visited_left_side:

        let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
            let visited_bitmap_size = bit_util::ceil(left_data.1.num_rows(), 8);
            self.reservation.lock().try_grow(visited_bitmap_size)?;
            self.join_metrics.build_mem_used.add(visited_bitmap_size);
            ... 

Contributor Author:

It was my first intention, but then I realized that Option doesn't have fallible versions of the get_or_insert-like methods (i.e. a `try_get_or_insert_with` would be just right in this case), and the only alternative I saw was storing visited_left_side as a `Result<_, DataFusionError>`, which, I guess, would also not be really clean.
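For illustration, the missing `try_get_or_insert_with` could be written as a free-standing helper like the sketch below. The helper name and the bitmap example are illustrative, not code from the PR:

```rust
// Sketch of the fallible get-or-insert that `Option` lacks. With such a
// helper, the verbose `is_none()` branch could be collapsed.
fn try_get_or_insert_with<T, E>(
    slot: &mut Option<T>,
    init: impl FnOnce() -> Result<T, E>,
) -> Result<&mut T, E> {
    if slot.is_none() {
        *slot = Some(init()?); // run the fallible initializer once
    }
    // the branch above guarantees the Option is populated
    Ok(slot.as_mut().unwrap())
}

fn main() {
    let mut visited: Option<Vec<bool>> = None;
    // first call performs the (fallible) reservation + allocation
    let bitmap = try_get_or_insert_with(&mut visited, || -> Result<_, String> {
        Ok(vec![false; 8])
    })
    .unwrap();
    bitmap[0] = true;
    // later calls reuse the stored value without re-running the initializer
    assert!(visited.unwrap()[0]);
}
```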

@alamb (Contributor) left a comment:

I ran the tpch query benchmark with SF10 (10GB) using the in-memory option, per @Dandandan's suggestion

cargo run --release --bin tpch -- benchmark datafusion -m --query 17 --iterations 5 --path ~/tpch_data/parquet_data_SF10 --format parquet

Here is the result on main (merge-base with this PR, deeaa56)

Query 17 iteration 0 took 86475.5 ms and returned 1 rows
Query 17 iteration 1 took 80791.0 ms and returned 1 rows
Query 17 iteration 2 took 79514.7 ms and returned 1 rows
Query 17 iteration 3 took 81253.2 ms and returned 1 rows
Query 17 iteration 4 took 99265.5 ms and returned 1 rows
Query 17 avg time: 85459.99 ms

Here it is on this branch (if anything it looks faster 🤔 )

Query 17 iteration 0 took 80484.2 ms and returned 1 rows
Query 17 iteration 1 took 87880.9 ms and returned 1 rows
Query 17 iteration 2 took 82665.1 ms and returned 1 rows
Query 17 iteration 3 took 83641.5 ms and returned 1 rows
Query 17 iteration 4 took 83985.6 ms and returned 1 rows
Query 17 avg time: 83731.47 ms

Nice work


reservation.lock().try_grow(estimated_hastable_size)?;
metrics.build_mem_used.add(estimated_hastable_size);

let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
Contributor:

Ah, I didn't realize the allocation is the same -- this only guards against overallocation 👍
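For context, the kind of up-front size estimate being discussed can be sketched as follows. The constants (7/8 load factor, power-of-two bucket counts, one control byte per bucket) reflect hashbrown's general design, but the exact formula used in the PR lives in its code comment -- treat this as a simplified illustration, not the PR's implementation:

```rust
// Rough sketch of estimating a hashbrown RawTable allocation before calling
// with_capacity, so it can be reserved in the memory pool first.
fn estimate_hashtable_size(num_rows: usize, entry_size: usize) -> usize {
    // find the smallest power-of-two bucket count that can hold num_rows
    // entries at a 7/8 maximum load factor
    let mut buckets = 8usize;
    while buckets * 7 / 8 < num_rows {
        buckets *= 2;
    }
    // each bucket stores the entry itself plus a 1-byte control tag
    buckets * (entry_size + 1)
}

fn main() {
    // reserving this estimate before building the table guards against a
    // single massive, unaccounted allocation
    assert_eq!(estimate_hashtable_size(7, 16), 8 * (16 + 1));
    assert_eq!(estimate_hashtable_size(1_000, 16), 2048 * (16 + 1));
}
```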

@korowa (Contributor, Author) commented Mar 8, 2023

I've also benchmarked against main using the -m flag -- and although the results should be cleaner since file scanning is excluded from execution time, there are still fluctuations in both directions

Query               branch         main
----------------------------------------------
Query 1 avg time:   668.26 ms      652.55 ms
Query 2 avg time:   212.92 ms      216.46 ms
Query 3 avg time:   113.40 ms      116.00 ms
Query 4 avg time:   73.98 ms       65.35 ms
Query 5 avg time:   333.21 ms      318.39 ms
Query 6 avg time:   26.49 ms       28.68 ms
Query 7 avg time:   690.75 ms      732.79 ms
Query 8 avg time:   151.35 ms      170.01 ms
Query 9 avg time:   378.67 ms      415.37 ms
Query 10 avg time:  228.83 ms      252.53 ms
Query 11 avg time:  240.78 ms      216.22 ms
Query 12 avg time:  98.05 ms       96.31 ms
Query 13 avg time:  657.82 ms      638.33 ms
Query 14 avg time:  38.87 ms       35.11 ms
Query 15 avg time:  101.51 ms      109.23 ms
Query 16 avg time:  149.93 ms      149.01 ms
Query 17 avg time:  3713.62 ms     3952.60 ms
Query 18 avg time:  2312.22 ms     2463.61 ms
Query 19 avg time:  106.72 ms      107.72 ms
Query 20 avg time:  1047.35 ms     1022.07 ms
Query 21 avg time:  892.56 ms      855.86 ms
Query 22 avg time:  87.27 ms       73.53 ms

@Dandandan (Contributor) commented:
It looks like the changes don't affect performance, which is perfect 👍

@alamb alamb merged commit f5d23ff into apache:main Mar 9, 2023
@alamb (Contributor) commented Mar 9, 2023

Thanks again @korowa !

@ursabot commented Mar 9, 2023

Benchmark runs are scheduled for baseline = ff013e2 and contender = f5d23ff. f5d23ff is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q

@korowa (Contributor, Author) commented Mar 9, 2023

Thank you @alamb @Dandandan !

I've also created #5531 to support passing boolean by values in one of the tests added in this PR 😞

@alamb (Contributor) commented Mar 9, 2023

I've also created #5531 to support passing boolean by values in one of the tests added in this PR 😞

Thank you for your diligence here 🏅

@Dandandan (Contributor) commented:
Nice, thanks @korowa !
