Further refine the Top K sort operator #9417

Open
gruuya opened this issue Mar 1, 2024 · 17 comments
Labels
enhancement New feature or request

Comments

@gruuya
Contributor

gruuya commented Mar 1, 2024

Is your feature request related to a problem or challenge?

The Top-K operator has recently been added for a specialized use case when encountering ORDER BY and LIMIT clauses together (#7250, #7721), as a way to optimize the memory usage of the sorting procedure.

Still, the present implementation keeps in memory every input record batch that contains potential candidate rows for the final K output rows. In the pathological case, this means there can be K batches in memory per TopK operator, and one TopK operator is spawned per input partition.

In particular this leads to the following error for ClickBench query 19:

% datafusion-cli -m 8gb
DataFusion CLI v36.0.0
❯ CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '/path/to/hits.parquet';
0 rows in set. Query took 0.032 seconds.

❯ SELECT "UserID", extract(minute FROM to_timestamp("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;
Resources exhausted: Failed to allocate additional 220827928 bytes for TopK[3] with 883453086 bytes already allocated - maximum available is 150024911

In the above case I see 12 partitions x ~3.5 batches per TopK operator in memory x 223 MB per batch (which is kind of strange for 4 columns) = 9366 MB, thus peaking above the set memory limit of 8GB.
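
As a quick check of that estimate, the arithmetic can be reproduced as follows. This is only a sketch using the approximate figures quoted above; interpreting the `8gb` limit as 8 × 1024 MB is an assumption.

```python
# Back-of-the-envelope estimate using the approximate figures quoted above.
partitions = 12          # one TopK operator per input partition
batches_per_topk = 3.5   # approximate number of batches held per operator
mb_per_batch = 223       # reported size of each batch, in MB

estimated_mb = partitions * batches_per_topk * mb_per_batch

# Assumption: the "8gb" CLI limit is read as 8 * 1024 MB.
limit_mb = 8 * 1024

print(f"estimated peak: {estimated_mb:.0f} MB, limit: {limit_mb} MB")
print("over limit:", estimated_mb > limit_mb)
```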

Describe the solution you'd like

Ideally something that doesn't hurt performance but reduces the memory footprint even more. Failing that, something that perhaps hurts performance only once the memory limit threshold has been surpassed (e.g. by spilling), but without crashing the query.

Describe alternatives you've considered

Option 1

Increasing or not setting a memory limit.

Option 2

Introduce spilling to disk for the TopK operator as a fallback when the memory limit is hit.

Option 3

Potentially something like converting the column arrays of the input record batch to rows, like for the evaluated sort keys
https://github.com/apache/arrow-datafusion/blob/b2ff249bfb918ac6697dbc92b51262a7bdbb5971/datafusion/physical-plan/src/topk/mod.rs#L163
and then making TopKRow track the projected rows, in addition to the sort keys, but compare only against the sort key. This would enable the BinaryHeap to discard the unneeded rows.

Finally, one could use arrow_row::RowConverter::convert_rows to get the columns back when emitting.

However, this is almost guaranteed to lead to worse performance in the general case, due to all of the row conversion taking place.
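
To make the idea behind Option 3 concrete, here is a minimal sketch in Python rather than DataFusion's Rust. It is not the actual TopK code: `top_k_rows` is a hypothetical helper, and plain tuples stand in for the Arrow row format. Each heap entry carries the full row as payload but is ordered only by the sort key, so a row that drops out of the top K is discarded immediately instead of pinning its whole input batch.

```python
import heapq
from typing import Any, Iterable, List, Tuple

Row = Tuple[Any, ...]

def top_k_rows(rows: Iterable[Row], sort_col: int, k: int) -> List[Row]:
    """Return the k rows with the largest value in sort_col (ORDER BY ... DESC LIMIT k)."""
    if k <= 0:
        return []
    # Min-heap: the root is always the weakest of the current candidates.
    heap: List[Tuple[Any, int, Row]] = []
    for seq, row in enumerate(rows):
        # Ordered by (key, arrival order); seq is unique, so the payload
        # row itself is never compared.
        entry = (row[sort_col], seq, row)
        if len(heap) < k:
            heapq.heappush(heap, entry)
        elif entry[0] > heap[0][0]:
            # The evicted entry, payload included, becomes garbage here.
            heapq.heapreplace(heap, entry)
    # Emit in descending key order, earliest arrival first on ties.
    return [row for _, _, row in sorted(heap, key=lambda e: (-e[0], e[1]))]

rows = [("a", 1), ("b", 5), ("c", 3), ("d", 5), ("e", 2)]
result = top_k_rows(rows, sort_col=1, k=2)
print(result)
```

The cost noted above still applies: every incoming row has to be materialized in row form before it can be compared, whether or not it ends up in the result.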

Additional context

Potentially relevant for #7195.

@yjshen
Member

yjshen commented Mar 3, 2024

TL;DR: The issue is caused by "double" memory accounting for sliced batches in AggExec and TopkExec.


The primary cause of resource exhaustion is incorrect memory accounting for record batches stored in TopK's RecordBatchStore, as highlighted in the issue description (approximately 220MB per batch). Upon inspecting the memory size calculation output:

Getting mem size of batch in topk::insert with batch size: 8192
Column 0 mem: 37561184
Column 1 mem: 37561184
Column 2 mem: 78416312
Column 3 mem: 72507488
Inserting batch with mem size: 226046168

It becomes evident that the batch is a zero-copy slice of a larger batch, resulting in a discrepancy between actual and expected memory used by TopK, considering only 8192 rows.
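
A short sanity check on the figures in that log output (plain arithmetic, nothing DataFusion-specific): the per-column sizes add up exactly to the reported batch size, and dividing by the 8192 rows gives a per-row footprint that is implausibly large for four columns.

```python
# Per-column sizes copied from the log output above.
column_bytes = [37561184, 37561184, 78416312, 72507488]
rows_in_slice = 8192

total_bytes = sum(column_bytes)
bytes_per_row = total_bytes / rows_in_slice

print("total bytes:", total_bytes)
print(f"bytes per row if only {rows_in_slice} rows were held: {bytes_per_row:.1f}")
```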

Analyzing the physical plan:

| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
|               |   SortPreservingMergeExec: [COUNT(*)@3 DESC], fetch=10 |
|               |     SortExec: TopK(fetch=10), expr=[COUNT(*)@3 DESC] |
|               |       ProjectionExec: expr=[UserID@0 as UserID, date_part(Utf8("MINUTE"),to_timestamp(hits.EventTime))@1 as m, SearchPhrase@2 as SearchPhrase, COUNT(*)@3 as COUNT(*)] |
|               |         AggregateExec: mode=FinalPartitioned, gby=[UserID@0 as UserID, date_part(Utf8("MINUTE"),to_timestamp(hits.EventTime))@1 as date_part(Utf8("MINUTE"),to_timestamp(hits.EventTime)), SearchPhrase@2 as SearchPhrase], aggr=[COUNT(*)] |
|               |           CoalesceBatchesExec: target_batch_size=8192 |
|               |             RepartitionExec: partitioning=Hash([UserID@0, date_part(Utf8("MINUTE"),to_timestamp(hits.EventTime))@1, SearchPhrase@2], 12), input_partitions=12 |
|               |               AggregateExec: mode=Partial, gby=[UserID@1 as UserID, date_part(MINUTE, to_timestamp(EventTime@0)) as date_part(Utf8("MINUTE"),to_timestamp(hits.EventTime)), SearchPhrase@2 as SearchPhrase], aggr=[COUNT(*)] |
|               |                 ParquetExec: file_groups={12 groups: ....

and AggExec:

https://github.com/apache/arrow-datafusion/blob/d5b635945307d5c7fe6fa10d3f65ee1ba2d58a5a/datafusion/physical-plan/src/aggregates/row_hash.rs#L493-L497

For the current plan, we can see that each batch we insert into TopK is a slice of the Agg output batch, which AggExec should track. We need to avoid double memory accounting for sliced batches to fix this issue.

As for Option 3, TopK already has maybe_compact, which serves a similar purpose but still keeps the relevant records in record batches.

@gruuya
Contributor Author

gruuya commented Mar 3, 2024

The issue is caused by "double" memory accounting for sliced batches in AggExec and TopkExec.

I thought something similar, namely that TopKExec makes some errors in memory accounting, but observing the actual memory usage seems to indicate that the peak is real.

  • just prior to executing the ClickBench query: [image: memory usage screenshot]
  • peak memory, prior to DataFusionError::ResourcesExhausted error getting thrown: [image: memory usage screenshot]

@Dandandan
Contributor

Dandandan commented Mar 3, 2024

Note that in this query, AggregateExec needs a large amount of memory, as we need to keep all the data in memory before sorting/extracting the top k counts, so relatively high memory usage is to be expected (even though we can probably optimize it further).

@gruuya
Contributor Author

gruuya commented Mar 3, 2024

Note that in this query, AggregateExec needs a large amount of memory, as we need to keep all the data in memory before sorting/extracting the top k counts, so relatively high memory usage is to be expected (even though we can probably optimize it further).

Ah good point, the cardinality of the input grouped data is indeed very large (~17M).

Indeed, a quick Google search brought up the following recent paper (citing DataFusion/your blog post) about a new high-cardinality top K aggregation technique: https://www.microsoft.com/en-us/research/publication/cache-efficient-top-k-aggregation-over-high-cardinality-large-datasets/

@yjshen
Member

yjshen commented Mar 6, 2024

From DataFusion's memory management perspective, I found that get_slice_memory_size, introduced in apache/arrow-rs#3501, better serves our requirements.

I suggest we have RecordBatch::get_effective_memory_size() in DF and use get_slice_memory_size to account for memory usage. Thoughts?

@alamb
Contributor

alamb commented Mar 10, 2024

I agree with @Dandandan in #9417 (comment) that the core problem is with accounting

  1. The AggregateExec generates one single (giant) RecordBatch on output.
  2. That batch is then emitted in parts via RecordBatch::slice(), which does not actually allocate any additional memory. Note that this means no memory is freed until the GroupByHash has emitted all of its output.
  3. The TopK operator, however, then treats each incoming RecordBatch as though it were an additional allocation that needs to be tracked.
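
The three steps above can be illustrated with a toy model. This is a sketch only: `Buffer`, `BatchSlice`, and `emit_slices` are hypothetical stand-ins, not Arrow or DataFusion types, and the row count and row width are made-up numbers. A consumer that charges each zero-copy slice for the full backing allocation ends up counting the same memory once per slice.

```python
from dataclasses import dataclass
from typing import Iterator

@dataclass(frozen=True)
class Buffer:
    """Stands in for the single allocation behind one giant output batch."""
    num_rows: int
    bytes_per_row: int

    @property
    def size(self) -> int:
        return self.num_rows * self.bytes_per_row

@dataclass(frozen=True)
class BatchSlice:
    """A zero-copy view: it shares the parent buffer instead of copying rows."""
    buffer: Buffer
    offset: int
    length: int

    def reported_size(self) -> int:
        # The view reports the whole backing allocation, not just its own rows.
        return self.buffer.size

def emit_slices(buffer: Buffer, batch_size: int) -> Iterator[BatchSlice]:
    """Step 2: hand the giant batch downstream in batch_size-row views."""
    for offset in range(0, buffer.num_rows, batch_size):
        yield BatchSlice(buffer, offset, min(batch_size, buffer.num_rows - offset))

giant = Buffer(num_rows=100_000, bytes_per_row=32)  # made-up figures
slices = list(emit_slices(giant, batch_size=8192))

# Step 3: a consumer that treats every slice as a fresh allocation.
naive_total = sum(s.reported_size() for s in slices)

print("slices:", len(slices))
print("actually allocated:", giant.size)
print("naively accounted:", naive_total)
```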

If we had infinite time / engineering hours I think a better approach would actually be to change GroupByHash so it didn't create a single giant contiguous RecordBatch

Instead it would be better if GroupByHash produced a Vec<RecordBatch> and then incrementally fed those batches out

Doing this would allow the GroupByHash to release memory incrementally as it output. This is analogous to how @korowa made join output incremental in #8658
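
A minimal sketch of that incremental pattern, in Python with lists standing in for record batches. `IncrementalOutput` is a hypothetical name and this is not how GroupByHash is implemented. The output is held as independent chunks, and the producer drops its reference to each chunk as soon as it is handed out, so the memory it still holds shrinks with every batch emitted.

```python
from collections import deque
from typing import Deque, List, Optional

class IncrementalOutput:
    """Holds output as separate chunks and releases each one once emitted."""

    def __init__(self, rows: List[int], batch_size: int) -> None:
        # Each chunk is an independent copy, analogous to a Vec<RecordBatch>
        # rather than slices of one contiguous allocation.
        self._pending: Deque[List[int]] = deque(
            rows[i:i + batch_size] for i in range(0, len(rows), batch_size)
        )

    def pending_rows(self) -> int:
        """Rows the producer is still holding on to."""
        return sum(len(chunk) for chunk in self._pending)

    def next_batch(self) -> Optional[List[int]]:
        """Hand out the next chunk and forget it, or return None when done."""
        return self._pending.popleft() if self._pending else None

out = IncrementalOutput(list(range(10)), batch_size=4)
held = [out.pending_rows()]
emitted = []
while (batch := out.next_batch()) is not None:
    emitted.append(batch)
    held.append(out.pending_rows())

print("emitted:", emitted)
print("rows still held after each step:", held)
```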

@alamb
Contributor

alamb commented Mar 10, 2024

If incremental output of Grouping sounds reasonable to people I can file a follow on ticket to track the work.

@yjshen
Member

yjshen commented Mar 10, 2024

I agree that the core problem for the issue is accounting and that the most overreported batch slice would come from AggExec's mono output record batch. But I also believe there's a distinction between optimizing AggExec's output pattern and handling memory accounting.

To improve AggExec's mono output pattern, #7065 might be similar to the idea of incremental output.

Regarding the memory accounting side, I'm curious if you have considered alternatives that allow for more accurate accounting for different batches. The idea of having sliced batches not reporting their memory usage or using get_slice_memory_size for reporting is a good starting point. What do you think about this?

@ozankabak
Contributor

If incremental output of Grouping sounds reasonable to people I can file a follow on ticket to track the work.

Yes, please do

@Dandandan
Contributor

If incremental output of Grouping sounds reasonable to people I can file a follow on ticket to track the work.

Makes sense to me as well, thank you 🙏

@alamb
Contributor

alamb commented Mar 11, 2024

But I also believe there's a distinction between optimizing AggExec's output pattern and handling memory accounting.

I agree

Regarding the memory accounting side, I'm curious if you have considered alternatives that allow for more accurate accounting for different batches. The idea of having sliced batches not reporting their memory usage or using get_slice_memory_size for reporting is a good starting point. What do you think about this?

I think it is tricky business and depends on what we are using the memory accounting for

At the moment I think the memory accounting is mostly there to prevent OOM kills (overcommit of memory). Since memory for a sliced RecordBatch is not returned to the OS, a plan holding a 3-row slice of a 1M-row RecordBatch is still "using" all 1M rows from the OS perspective.

However, ensuring we don't double count is important too (for example, two slices of the same 1M-row RecordBatch will count as a total of 2M rows, even though there is only a single allocation).
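
One way to reconcile the two concerns is to charge the full allocation, but only once per distinct allocation. The sketch below illustrates that policy and is not DataFusion's accounting: the `(allocation_id, allocation_bytes)` pairs, the function names, and the 8-byte row width are all assumptions made for the example.

```python
from typing import Dict, Iterable, Tuple

# Each held slice is described by the allocation it points into:
# (allocation_id, allocation_bytes). Both fields are hypothetical.
SliceRef = Tuple[str, int]

def naive_bytes(slices: Iterable[SliceRef]) -> int:
    """Charge every slice for its full backing allocation (double counts)."""
    return sum(size for _, size in slices)

def deduplicated_bytes(slices: Iterable[SliceRef]) -> int:
    """Charge each distinct allocation once, however many slices share it."""
    seen: Dict[str, int] = {}
    for alloc_id, size in slices:
        seen[alloc_id] = size
    return sum(seen.values())

one_million_rows = 1_000_000 * 8  # assumed 8 bytes per row
held = [("batch-A", one_million_rows), ("batch-A", one_million_rows)]

print("naive:", naive_bytes(held))
print("deduplicated:", deduplicated_bytes(held))
```

Under this policy a 3-row slice still pins, and is charged for, the whole 1M-row allocation, which matches the OS-level view; a second slice of the same batch adds nothing.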

@alamb
Contributor

alamb commented Mar 11, 2024

Filed #9562 to track incremental group by output

@samuelcolvin
Contributor

I think #10511 is related to this, except that it uses ExternalSorterMerge, which, judging by the name, has no "top K" behaviour; hence the very high memory footprint?

@Dandandan
Contributor

Coming back to this, I guess we can implement another option without implementing spilling: force compaction once we hit the limit.
This will probably slow down some queries when the memory usage limit is hit, but those queries would fail otherwise anyway.
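
A rough sketch of what "force compaction once we hit the limit" could look like, written as a toy Python model rather than as DataFusion's TopK. `BatchStore`, `ROW_BYTES`, and the fixed set of live rows per batch are all assumptions; in a real operator the set of live rows changes as better candidates arrive. The store copies out only the rows that are still candidates when an insert would exceed the limit, and fails only if compaction did not free enough.

```python
from typing import List, Set, Tuple

ROW_BYTES = 100  # assumed fixed row width, for illustration only

class BatchStore:
    """Keeps whole batches around, but compacts them when memory runs out."""

    def __init__(self, limit_bytes: int) -> None:
        self.limit_bytes = limit_bytes
        # Each entry: (all rows of the batch, indices of rows still in the top K).
        self.batches: List[Tuple[List[int], Set[int]]] = []

    def used_bytes(self) -> int:
        return sum(len(rows) for rows, _ in self.batches) * ROW_BYTES

    def compact(self) -> None:
        """Copy out the live rows only and drop everything else."""
        self.batches = [
            ([rows[i] for i in sorted(live)], set(range(len(live))))
            for rows, live in self.batches
            if live
        ]

    def insert(self, rows: List[int], live: Set[int]) -> None:
        needed = len(rows) * ROW_BYTES
        if self.used_bytes() + needed > self.limit_bytes:
            self.compact()  # slower path, taken only under memory pressure
        if self.used_bytes() + needed > self.limit_bytes:
            raise MemoryError("resources exhausted even after compaction")
        self.batches.append((list(rows), set(live)))

store = BatchStore(limit_bytes=2500)
store.insert(list(range(0, 10)), live={0, 9})
store.insert(list(range(10, 20)), live={3})
store.insert(list(range(20, 30)), live={5})  # would exceed the limit, so compacts first

print("used bytes:", store.used_bytes())
print("first batch after compaction:", store.batches[0][0])
```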

@alamb
Contributor

alamb commented Oct 23, 2024

Coming back to this, I guess we can implement another option without implementing spilling: force compaction once we hit the limit. This will probably slow down some queries when the memory usage limit is hit, but those queries would fail otherwise anyway.

I think this is a great idea

@alamb
Contributor

alamb commented Oct 23, 2024

Implementing this "reduce memory usage when under pressure" might be a more interesting general approach to improve DataFusion's performance under memory pressure (e.g. maybe we can trigger other operators, like partial aggregates, to clear memory when we hit memory pressure) 🤔

@Dandandan
Contributor

Implementing this "reduce memory usage when under pressure" might be a more interesting general approach to improve DataFusion's performance under memory pressure (e.g. maybe we can trigger other operators, like partial aggregates, to clear memory when we hit memory pressure) 🤔

That's an interesting idea :)

6 participants