
Parquet parallel scan #5057

Merged: 8 commits into apache:master, Jan 30, 2023

Conversation

@korowa (Contributor) commented Jan 25, 2023

Which issue does this PR close?

Closes #137.

Rationale for this change

Improves performance when reading a single parquet file, or a number of parquet files smaller than the number of cores, in a multicore runtime.

What changes are included in this PR?

  1. repartition_file_scans & repartition_file_min_size optimizer settings -- repartitioning of file scans is disabled by default, and is performed only if the total size of the files to scan is greater than 10MB (to avoid splitting a small number of small files)
  2. get_repartitioned in ParquetExec -- returns a cloned object whose base_config contains file groups redistributed as ranged PartitionedFiles
  3. repartition.rs -- now calls get_repartitioned for ParquetExec when repartitioning is allowed (the upstream operator benefits from it / no data ordering violations / etc.)

As with any other repartitioning operation, parallelization is applied only when ParquetExec has fewer partitions than the target -- a scan of two files will be distributed over 4 target partitions, but no redistribution will be performed in the "2 files - 2 target partitions" case.
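The redistribution described above can be sketched as follows. This is a hypothetical simplification, not DataFusion's actual `get_repartitioned` code: `FileRange` and `repartition` are made-up names. The total byte count is ceiling-divided by the number of target partitions, and each file's byte range is dealt out chunk by chunk, so one file may span several partitions and one partition may hold ranges from several files.

```rust
// Hypothetical sketch of range-based file repartitioning (made-up names,
// not DataFusion's actual API).
#[derive(Debug, Clone, PartialEq)]
struct FileRange {
    file: usize, // index of the source file
    start: u64,  // inclusive start byte offset
    end: u64,    // exclusive end byte offset
}

fn repartition(file_sizes: &[u64], target_partitions: usize) -> Vec<Vec<FileRange>> {
    let total: u64 = file_sizes.iter().sum();
    // Ceiling division so every byte is assigned to some partition.
    let chunk = (total + target_partitions as u64 - 1) / target_partitions as u64;
    let mut partitions: Vec<Vec<FileRange>> = vec![Vec::new(); target_partitions];
    let (mut part, mut used) = (0usize, 0u64); // current partition, bytes placed in it
    for (file, &size) in file_sizes.iter().enumerate() {
        let mut offset = 0;
        while offset < size {
            let take = (size - offset).min(chunk - used);
            partitions[part].push(FileRange { file, start: offset, end: offset + take });
            offset += take;
            used += take;
            // Move on once the current partition is full.
            if used == chunk && part + 1 < target_partitions {
                part += 1;
                used = 0;
            }
        }
    }
    partitions
}

fn main() {
    // Two files of 100 and 200 bytes spread over 4 target partitions:
    // [[x:0..75], [x:75..100, y:0..50], [y:50..125], [y:125..200]]
    let parts = repartition(&[100, 200], 4);
    assert_eq!(parts[0], vec![FileRange { file: 0, start: 0, end: 75 }]);
    assert_eq!(parts[3], vec![FileRange { file: 1, start: 125, end: 200 }]);
    println!("{parts:?}");
}
```

The example input reproduces the partition layout that appears later in the thread's test plan output (`{4 groups: [[x:0..75], [x:75..100, y:0..50], [y:50..125], [y:125..200]]}`).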

Are these changes tested?

Tests for ParquetExec::get_repartitioned were added, along with more tests for the repartition rule -- mostly copies of existing tests covering cases where parallelization should be skipped, to ensure it won't break physical plans.

Are there any user-facing changes?

New configuration settings repartition_file_scans & repartition_file_min_size

@github-actions github-actions bot added core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Jan 25, 2023
@tustvold (Contributor)

Only had time to take a brief look at this PR, and so I'm likely missing something but please bear with me 😄

This PR modifies ListingTable to pair together PartitionedFile with Vec<Option<FileRange>>. This makes the approach specific to ListingTable, and it also adds parallelism control to a part of the system that doesn't really have context on how much parallelism is needed, nor on what invariants, such as sort orders, may need to be upheld.

I have two suggestions that may be stupid:

  • Make this a physical optimizer rule that looks at operators containing FileScanConfig and adds more partitions based on the target_partitions property
  • Rather than adding a new FileRanges property, instead using the existing range: Option<FileRange> already present on PartitionedFile, the same file with disjoint ranges can then appear in multiple partitions

};

if collect_file_ranges {
let file_ranges = parquet_metadata
@tustvold (Contributor) commented Jan 25, 2023

FWIW, the way these ranges are applied in parquet is based on whether the row group's midpoint lies within the given range; as a result, there is no requirement that these ranges exactly delimit row group boundaries.

For example, you could take a 2GB parquet file and blindly chop it into 4x 512MB slices. This assumes that there are at least 4 row groups and that the row groups are similarly sized; in practice this is probably fine. This is what Spark does, and it avoids needing the file's metadata to do the optimisation.
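A minimal sketch of that blind chopping, under the assumption stated above (`chop` is a hypothetical helper, not Spark's or DataFusion's actual code): the file size is ceiling-divided into `n` equal byte slices, with no parquet metadata involved.

```rust
// Blindly chop a file of `size` bytes into `n` byte slices without reading
// any parquet metadata (hypothetical helper, not an actual API).
fn chop(size: u64, n: u64) -> Vec<(u64, u64)> {
    let slice = (size + n - 1) / n; // ceiling division
    (0..n)
        .map(|i| (i * slice, ((i + 1) * slice).min(size)))
        .collect()
}

fn main() {
    const MB: u64 = 1024 * 1024;
    // A 2GB file chopped into 4 x 512MB slices, as in the example above.
    assert_eq!(
        chop(2048 * MB, 4),
        vec![
            (0, 512 * MB),
            (512 * MB, 1024 * MB),
            (1024 * MB, 1536 * MB),
            (1536 * MB, 2048 * MB),
        ]
    );
    // Slice boundaries need not align with row groups; the midpoint rule
    // described above decides which row groups each slice actually reads.
    println!("{:?}", chop(10, 3)); // uneven sizes round up: [(0, 4), (4, 8), (8, 10)]
}
```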

@korowa (Author)

True - cutting the file into N even parts will allow reading only the row groups whose start offsets fall inside the corresponding ranges, without any duplicate reads or skipped row groups - so splitting could be much simpler without using metadata (except for ObjectMetadata, for the size).

@korowa (Author) commented Jan 25, 2023

@tustvold , thank you for the comments!

Initially my intention was to handle scan planning as early as possible, so ListingTable looked like the proper place for this - it actually holds parallelism settings in its options attribute and fetches the required metadata.

But, yeah, now I see that the physical optimizer, and especially its repartitioning rule, is much better suited for repartitioning ParquetExec 🤔.

I guess I'll convert this PR to a draft and come back a bit later with an updated version of this optimizer rule.

@korowa korowa marked this pull request as draft January 25, 2023 18:54
@korowa korowa marked this pull request as ready for review January 27, 2023 10:01
@korowa (Author) commented Jan 27, 2023

I've reworked this PR to use the repartition rule, and updated the description -- it looks much simpler now and, more importantly, it takes into account any plan-related restrictions on ParquetExec.

@tustvold, thank you for the tip about using physical optimizer!

@tustvold (Contributor) left a comment

This looks really cool, left some minor comments

@@ -846,6 +871,182 @@ mod tests {
Ok(())
}

#[test]

Love the test coverage

@korowa korowa force-pushed the parquet_parallel_scan branch 2 times, most recently from 53e8fc5 to d6e95f7, on January 27, 2023 17:52
@alamb (Contributor) commented Jan 27, 2023

I plan to review this carefully either later today or tomorrow. Very exciting @korowa -- thank you

@alamb (Contributor) left a comment

I went through the code carefully and I really like it. Thank you @korowa -- do you have any performance benchmarks you can share? I think this will mostly help when scanning single large parquet files.

I would like to explore turning this feature on by default (perhaps we can have a separate ticket to track that)

I already feel bad that we don't have other parquet options enabled by default.

@alamb (Contributor) commented Jan 28, 2023

My measurements suggest this setting can improve the performance with single large parquet files significantly (over 2x in my measurement). 👨‍🍳 👌 -- very nice

I tested this out by making a 9G parquet file from https://github.com/tustvold/access-log-gen/

Then using datafusion-cli:

select avg(request_bytes), avg(response_bytes), avg(response_status), host from '/Users/alamb/Software/access-log-gen/logs.9G.parquet' group by host;
...
927 rows in set. Query took 2.313 seconds.

And then I enabled this setting:

set datafusion.optimizer.repartition_file_scans = true;
0 rows in set. Query took 0.000 seconds.
❯ select avg(request_bytes), avg(response_bytes), avg(response_status), host from '/Users/alamb/Software/access-log-gen/logs.9G.parquet' group by host;

927 rows in set. Query took 0.962 seconds.

😮

@korowa (Author) commented Jan 28, 2023

I have only these in my notes

// Single partition
./target/release/examples/clickbench  4.10s user 0.59s system 113% cpu 4.142 total
./target/release/examples/clickbench  4.14s user 0.56s system 136% cpu 3.451 total
./target/release/examples/clickbench  4.13s user 0.55s system 135% cpu 3.446 total
./target/release/examples/clickbench  4.29s user 0.55s system 134% cpu 3.603 total

// Multiple partition
./target/release/examples/clickbench  5.32s user 0.96s system 288% cpu 2.174 total
./target/release/examples/clickbench  5.14s user 0.89s system 392% cpu 1.534 total
./target/release/examples/clickbench  5.24s user 0.92s system 405% cpu 1.517 total
./target/release/examples/clickbench  5.14s user 0.91s system 399% cpu 1.516 total
./target/release/examples/clickbench  5.23s user 0.92s system 405% cpu 1.518 total

which doesn't look like an "official" benchmark at all 🙃. These are results for this query from ClickBench over a 14GB parquet file on a 2.6 GHz 6-Core Intel Core i7

korowa and others added 2 commits January 28, 2023 16:46
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
@korowa (Author) commented Jan 28, 2023

I think this will mostly help when scanning single large parquet files.

As it works for now - yes, the use case is mostly "relatively large files, fewer of them than target_partitions" -- I guess it could be improved/reworked later into something like "perform repartitioning even when the file count reaches target_partitions, if there is significant skew in the current partitioning"

I would like to explore turning this feature on by default (perhaps we can have a separate ticket to track that)

I don't mind enabling parallelism by default, and it seems to be the fastest way to deliver this feature, but (I'm not sure, just a suggestion) maybe a better time for that would be 1 (or 2) releases after the setting itself is released?

@alamb (Contributor) commented Jan 28, 2023

As it works for now - yes, the use case is mostly "relatively large files, fewer of them than target_partitions" -- I guess it could be improved/reworked later into something like "perform repartitioning even when the file count reaches target_partitions, if there is significant skew in the current partitioning"

I think to make this effective we will need to have more runtime dynamics (aka using a morsel driven scheduler)

I don't mind enabling parallelism by default, and it seems to be the fastest way to deliver this feature, but (I'm not sure, just a suggestion) maybe a better time for that would be 1 (or 2) releases after the setting itself is released?

I agree -- let's get this PR merged in (default to off) and then plan to enable it by default in a few weeks (we just need to remember to do so!)

"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
// Multiple source files split across partitions
"ParquetExec: limit=None, partitions={4 groups: [[x:0..75], [x:75..100, y:0..50], [y:50..125], [y:125..200]]}, projection=[c1]",

that is quite clever that the partitions have different parts of the same file 👍

@alamb (Contributor) commented Jan 28, 2023

I plan to leave this open for the rest of the weekend so others have a chance to comment if they want, and then merge on Monday

@alamb alamb merged commit 67b1da8 into apache:master Jan 30, 2023
@alamb (Contributor) commented Jan 30, 2023

Thanks again @korowa ❤️

@ursabot commented Jan 30, 2023

Benchmark runs are scheduled for baseline = 74b05fa and contender = 67b1da8. 67b1da8 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@Ted-Jiang (Member)
Thanks @korowa, this is really cool! 👍 Nice work!
I have a question about how multiple row groups from the same file may be read concurrently.

From the code:

    let target_partition_size =
        (total_size as usize + (target_partitions) - 1) / (target_partitions);

How could we make sure the file is divided at row group boundaries? 🤔

@alamb (Contributor) commented Jan 31, 2023

I filed #5125 to track turning this on by default

@tustvold (Contributor)

How could we make sure the file is divided at row group boundaries?

Without the parquet file metadata we can't reliably do so, but it isn't important for correctness that we do. Not needing the metadata significantly simplifies planning and avoids potentially costly round trips to the object store while planning. FWIW, I believe this is a similar approach to the one taken by Spark.

@alamb (Contributor) commented Jan 31, 2023

I believe what happens is that the file is divided into byte ranges, and then the row groups whose data falls within those ranges are scanned. It would be good to double-check this understanding though.

https://github.com/apache/arrow-datafusion/blob/master/datafusion/core/src/datasource/listing/mod.rs#L52-L63

@tustvold (Contributor)

whose data falls within those ranges are scanned

More specifically, it is the row groups whose midpoint falls into the range; this means that as long as the ranges are disjoint, there is no risk of reading the same row group twice.
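That midpoint rule can be sketched as follows (hypothetical types, not the actual parquet crate API): a row group is assigned to a byte range iff its midpoint falls inside the range, so disjoint ranges covering the whole file select every row group exactly once, even when range boundaries cut through row groups.

```rust
// Sketch of the midpoint rule (hypothetical types, not the parquet crate API).
struct RowGroup {
    offset: u64, // byte offset of the row group within the file
    len: u64,    // compressed length in bytes
}

fn in_range(rg: &RowGroup, start: u64, end: u64) -> bool {
    let mid = rg.offset + rg.len / 2;
    start <= mid && mid < end
}

fn main() {
    // Three row groups in a 300-byte file, scanned with two ranges whose
    // boundaries do NOT line up with row group boundaries.
    let row_groups = [
        RowGroup { offset: 0, len: 120 },   // midpoint 60
        RowGroup { offset: 120, len: 100 }, // midpoint 170
        RowGroup { offset: 220, len: 80 },  // midpoint 260
    ];
    for (start, end) in [(0u64, 150u64), (150, 300)] {
        let picked: Vec<usize> = row_groups
            .iter()
            .enumerate()
            .filter(|(_, rg)| in_range(rg, start, end))
            .map(|(i, _)| i)
            .collect();
        println!("{start}..{end} -> {picked:?}");
    }
    // Range 0..150 selects row group 0 only; range 150..300 selects 1 and 2:
    // no row group is read twice and none is skipped.
}
```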

@korowa (Author) commented Jan 31, 2023

@Ted-Jiang here is the exact place where DF decides whether or not to read a RowGroup depending on the range. So it actually isn't required to split files into ranges whose boundaries match RowGroup boundaries.

@Ted-Jiang (Member) commented Feb 1, 2023

Thanks for all the kind replies! ❤️

here is the exact place where DF decides whether or not to read a RowGroup depending on the range. So it actually isn't required to split files into ranges whose boundaries match RowGroup boundaries.

So the PartitionedFile range is not used to fetch bytes from the object store; in the end we use the row group start and offset to fetch bytes? Am I right?

I think I missed this part, which caused the misunderstanding 😂

@korowa (Author) commented Feb 1, 2023

So the PartitionedFile range is not used to fetch bytes from the object store; in the end we use the row group start and offset to fetch bytes? Am I right?

Yeah, I can't find any place where range is used, except for row group pruning.

@Ted-Jiang (Member)

So the PartitionedFile range is not used to fetch bytes from the object store; in the end we use the row group start and offset to fetch bytes? Am I right?

Yeah, I can't find any place where range is used, except for row group pruning.

Thanks again, everyone ✌️

Labels
core Core DataFusion crate sqllogictest SQL Logic Tests (.slt)
Development

Successfully merging this pull request may close these issues.

Allow ParquetExec to parallelize work based on row groups
5 participants