Improve performance of db-benchmark query 8 #13586

Open
Dandandan opened this issue Nov 27, 2024 · 13 comments

@Dandandan
Contributor

Is your feature request related to a problem or challenge?

Query 8 in db-benchmark is slower than other queries
https://github.com/MrPowers/mrpowers-benchmarks

The query is as follows:

select id6, largest2_v3 from
  (select id6, v3 as largest2_v3, row_number() over (partition by id6 order by v3 desc) as order_v3
  from x
  where v3 is not null) sub_query
where order_v3 <= 2

Describe the solution you'd like

  • Profile / analyze this query
  • Improve performance

The most expensive part might be the window function row_number() over (partition by id6 order by v3 desc), or more precisely the sorting it requires.

Describe alternatives you've considered

No response

Additional context

No response

Dandandan added the enhancement label on Nov 27, 2024
@alan910127
Contributor

take

@Dandandan
Contributor Author

Dandandan commented Nov 29, 2024

I haven't profiled yet, but I found one potentially problematic line:

concat_batches(self.input_schema(), [input_buffer, &record_batch])?

This concatenates [input_buffer, &record_batch].

Changing the input_buffer state to a Vec<RecordBatch> and delaying the concatenation would be better, since concatenating in a loop is O(n^2) and adds far more overhead.
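
To make the O(n^2) point concrete, here is a minimal sketch that counts how many elements get copied under each approach. It uses a plain Vec<i64> as a hypothetical stand-in for RecordBatch and does not call arrow's concat_batches; the function names eager_concat and deferred_concat are illustrative only.

```rust
// Hypothetical stand-in for a RecordBatch: a single column of values.
// Real code would use arrow's RecordBatch and concat_batches instead.
type Batch = Vec<i64>;

/// Eager approach: rebuild the whole buffer for every incoming batch.
/// Returns the final buffer and the total number of elements copied.
fn eager_concat(batches: &[Batch]) -> (Batch, usize) {
    let mut buffer: Batch = Vec::new();
    let mut copied = 0;
    for batch in batches {
        // Mirrors concatenating [input_buffer, &record_batch]:
        // a fresh buffer is allocated and both inputs are copied into it.
        let mut next = Vec::with_capacity(buffer.len() + batch.len());
        next.extend_from_slice(&buffer);
        next.extend_from_slice(batch);
        copied += next.len();
        buffer = next;
    }
    (buffer, copied)
}

/// Deferred approach: keep the batches in a Vec and concatenate once.
fn deferred_concat(batches: &[Batch]) -> (Batch, usize) {
    let pending: Vec<&Batch> = batches.iter().collect();
    let total: usize = pending.iter().map(|b| b.len()).sum();
    let mut buffer = Vec::with_capacity(total);
    for batch in pending {
        buffer.extend_from_slice(batch);
    }
    let copied = buffer.len();
    (buffer, copied)
}
```

With four batches of two elements each, the eager version copies 2 + 4 + 6 + 8 = 20 elements, while the deferred version copies 8. The gap grows quadratically with the number of batches.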

@alan910127
Contributor

alan910127 commented Nov 30, 2024

Hi @Dandandan, I tried profiling the execution of datafusion-cli with the following command:

CARGO_PROFILE_RELEASE_DEBUG=true cargo flamegraph -o q8.svg -- -f q8.sql

Note: I have an x directory generated by falsa with

falsa groupby --path-prefix=./x --size MEDIUM --data-format PARQUET

From the flamegraph, I see that the concat_batches function you mentioned only takes < 5% of the total time. Since I'm not very good at this type of performance optimization, I'm unsure whether this is the primary issue. I may need more time to investigate further.

EDIT:
concat_batches is taking ~56% of the time running update_partition_batch

@Dandandan
Contributor Author

> concat_batches is taking ~56% of the time running update_partition_batch

That is an interesting finding. Any chance you could share the flamegraph?

For concat_batches, I think a solution could be to use Vec<RecordBatch> for the state and delay the concat_batches call.

@alan910127
Contributor

> Any chance you could share the flamegraph?

Certainly! Here it is:

[Flamegraph image: q8]

@Dandandan
Contributor Author

Hm, so most of the time does indeed seem to be spent in sort / merge, so I think that has the highest priority.

@alan910127
Contributor

I’ve generated another flamegraph using --inverted --reverse, but it’s too large to include directly in this comment. Do you have any suggestions for sharing it?

@alan910127
Contributor

alan910127 commented Dec 3, 2024

I found out that <arrow_row::Row as core::cmp::Ord>::cmp is consuming a significant amount of time in is_gt within update_loser_tree. It eventually calls SliceOrd::compare, which compares each element in the slices until it finds a non-equal case or, if all elements are equal, compares the lengths. Do we have any possible heuristics to avoid comparing the slices?

EDIT: I used Compiler Explorer to check what SliceOrd::compare::<u8> (which is what Row::cmp calls) does, and saw that it calls memcmp under the hood. Given that the [libc.so.6] portion appears quite large within Row::cmp in the flamegraph, I suspect that using heuristics to avoid comparing slices could be beneficial.
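
For reference, the comparison described above can be written out by hand. This is only a sketch of lexicographic byte-slice ordering as described in this comment, not the standard library's or arrow_row's actual implementation (which, as noted, ends up in memcmp for u8). The name compare_bytes is hypothetical.

```rust
use std::cmp::Ordering;

/// Hand-written equivalent of lexicographic byte-slice comparison:
/// walk both slices until a pair of bytes differs, and fall back to
/// comparing lengths when one slice is a prefix of the other.
fn compare_bytes(a: &[u8], b: &[u8]) -> Ordering {
    for (x, y) in a.iter().zip(b.iter()) {
        match x.cmp(y) {
            Ordering::Equal => continue,
            non_eq => return non_eq,
        }
    }
    a.len().cmp(&b.len())
}
```

The cost of one comparison is proportional to the length of the shared prefix, so rows that agree on many leading bytes are the expensive case.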

@Dandandan
Contributor Author

Dandandan commented Dec 4, 2024

Since #3386, SortPreservingMergeStream works as follows:

  • Converting rows to row-format (which involves copying / converting the datasets to some byte format)
  • Comparing the sorted partitions/streams against each other (via Compare), choosing the smallest/biggest row
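
The second step is a k-way merge of sorted inputs. The sketch below illustrates the idea on plain i64 values, using std's BinaryHeap rather than the loser tree mentioned earlier in this thread; it is not DataFusion's implementation, and merge_sorted is a hypothetical name.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge already-sorted (ascending) streams into one sorted output by
/// repeatedly picking the smallest current head among all streams.
fn merge_sorted(streams: &[Vec<i64>]) -> Vec<i64> {
    // Heap entries: (current value, stream index, position in that stream).
    // Reverse turns the max-heap into a min-heap.
    let mut heap = BinaryHeap::new();
    for (idx, stream) in streams.iter().enumerate() {
        if let Some(&first) = stream.first() {
            heap.push(Reverse((first, idx, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(streams.iter().map(|s| s.len()).sum());
    while let Some(Reverse((value, idx, pos))) = heap.pop() {
        out.push(value);
        // Advance the stream the value came from, if it has more rows.
        if let Some(&next) = streams[idx].get(pos + 1) {
            heap.push(Reverse((next, idx, pos + 1)));
        }
    }
    out
}
```

Every output row costs a handful of comparisons against other stream heads, which is why the per-comparison cost dominates the merge.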

I think for single-column sorts (like this query), we should be able to skip the conversion to Row and compare the (primitive) values directly. That should speed up single-column sorts, since it avoids the conversion step and replaces byte-slice comparisons with primitive comparisons.
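
A rough sketch of why the conversion is redundant for a single primitive column: both paths below produce the same order, but one of them has to encode every value first. The encoding here is a simplified, hypothetical one for non-null i32 values (sign bit flipped, big-endian); the real arrow row format is more involved, since it also handles nulls and sort options.

```rust
/// Simplified, hypothetical order-preserving encoding of a non-null i32:
/// flip the sign bit and store big-endian, so that byte-wise comparison
/// agrees with numeric comparison.
fn encode_i32(v: i32) -> [u8; 4] {
    ((v as u32) ^ 0x8000_0000).to_be_bytes()
}

/// Row-style path: encode every value, then sort by comparing bytes.
fn sort_via_encoding(values: &[i32]) -> Vec<i32> {
    let mut keyed: Vec<([u8; 4], i32)> =
        values.iter().map(|&v| (encode_i32(v), v)).collect();
    keyed.sort_by(|a, b| a.0.cmp(&b.0));
    keyed.into_iter().map(|(_, v)| v).collect()
}

/// Primitive path: compare the values directly, with no conversion step.
fn sort_primitive(values: &[i32]) -> Vec<i32> {
    let mut sorted = values.to_vec();
    sorted.sort_unstable();
    sorted
}
```

The primitive path skips both the extra allocation for the encoded keys and the byte-by-byte comparison.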

SortExec uses single-column sort already.

@akurmustafa
Contributor

I have generated the plan for the query above, which is as follows:

logical_plan
01)SubqueryAlias: sub_query
02)--Projection: x.id6, x.v3 AS largest2_v3
03)----Filter: row_number() PARTITION BY [x.id6] ORDER BY [x.v3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= UInt64(2)
04)------WindowAggr: windowExpr=[[row_number() PARTITION BY [x.id6] ORDER BY [x.v3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
05)--------Filter: x.v3 IS NOT NULL
06)----------TableScan: x projection=[id6, v3]
physical_plan
01)ProjectionExec: expr=[id6@0 as id6, v3@1 as largest2_v3]
02)--CoalesceBatchesExec: target_batch_size=8192
03)----FilterExec: row_number() PARTITION BY [x.id6] ORDER BY [x.v3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 <= 2, projection=[id6@0, v3@1]
04)------BoundedWindowAggExec: wdw=[row_number() PARTITION BY [x.id6] ORDER BY [x.v3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "row_number() PARTITION BY [x.id6] ORDER BY [x.v3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]
05)--------SortExec: expr=[id6@0 ASC NULLS LAST, v3@1 DESC], preserve_partitioning=[true]
06)----------CoalesceBatchesExec: target_batch_size=8192
07)------------RepartitionExec: partitioning=Hash([id6@0], 4), input_partitions=4
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------------CoalesceBatchesExec: target_batch_size=8192
10)------------------FilterExec: v3@1 IS NOT NULL
11)--------------------MemoryExec: partitions=1, partition_sizes=[1]

This plan contains no SortPreservingMerge. However, the settings may be different in the original experiment. I generated the plan above with default settings, constructing a dummy table x as below:

statement ok
CREATE TABLE x (
    id6 INT,
    v3 INT
);

statement ok
INSERT INTO x (id6, v3) VALUES
(0, 3);

Is there any way to see the plan of the query during a benchmark run?

@Dandandan
Contributor Author

Possibly it's used via SortExec? This AFAIK also merges larger inputs via SortPreservingMergeStream.

@akurmustafa
Contributor

> Possibly it's used via SortExec? This AFAIK also merges larger inputs via SortPreservingMergeStream.

You are right, I missed this. I think ExternalSorter uses merging when it merges chunks written to disk.

@Dandandan
Contributor Author

> > Possibly it's used via SortExec? This AFAIK also merges larger inputs via SortPreservingMergeStream.
>
> You are right, I missed this. I think ExternalSorter uses merging, while merging chunks written to the disc.

I think even when not writing to disk, it will use SPM if the size of the batches is bigger than self.sort_in_place_threshold_bytes. This could be something to tweak as well 🤔
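
As a rough illustration of the decision described here, the sketch below picks a strategy from the buffered size and the threshold. The enum, the function name, and the exact comparison are assumptions made for illustration; only the threshold name sort_in_place_threshold_bytes comes from this thread, and the real logic in ExternalSorter may differ.

```rust
/// Hypothetical strategies for sorting the batches buffered in memory.
#[derive(Debug, PartialEq)]
enum InMemSortStrategy {
    /// Concatenate all batches and sort the result in one pass.
    ConcatAndSortInPlace,
    /// Sort each batch on its own, then merge the sorted runs (SPM-style).
    SortThenMerge,
}

/// Assumed rule: buffers larger than the threshold take the merge path.
fn choose_strategy(
    buffered_bytes: usize,
    sort_in_place_threshold_bytes: usize,
) -> InMemSortStrategy {
    if buffered_bytes > sort_in_place_threshold_bytes {
        InMemSortStrategy::SortThenMerge
    } else {
        InMemSortStrategy::ConcatAndSortInPlace
    }
}
```

Under this assumed rule, raising the threshold would move more inputs onto the in-place path and away from the merge, which is the kind of tweak suggested above.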
