
refactor(rust): Add parquet source node to new streaming engine #18152

Merged: 2 commits merged into pola-rs:main on Aug 23, 2024

Conversation

nameexhaustion
Collaborator

@nameexhaustion nameexhaustion commented Aug 13, 2024

Enables scanning parquet files in the new streaming engine. This is done via a new parquet source
node that has been built to run natively on the new async executor for maximum performance.

Benchmarks

Setup

Dataset generation
```python
import os

os.environ["POLARS_VERBOSE"] = "1"
from datetime import date
from pathlib import Path

import polars as pl

prefix = Path(".env/data/")
file_path = prefix / "data.parquet"
partitioned_files_path = prefix / "data"


def write_datasets():
    cols = {}
    for i in range(10):
        cols[f"a{i}"] = range(50_000_000)
        cols[f"b{i}"] = range(-50_000_000, 0)
        cols[f"c{i}"] = "a_string_value"
        cols[f"d{i}"] = "another_string_value"
        cols[f"e{i}"] = date.today()

    df = pl.DataFrame(data=cols).select(
        (1 + pl.int_range(pl.len()) // 1_000_000).alias("partition_id"), pl.all()
    )
    df.write_parquet(file_path)
    df.write_parquet(partitioned_files_path, partition_by="partition_id")


if not prefix.exists():
    print("Creating datasets")
    prefix.mkdir(parents=True)
    partitioned_files_path.mkdir()
    write_datasets()
    exit()
```
`pl.scan_parquet(file_path).collect()` (1x 50M rows, 51 columns)

```
this branch  (new streaming, build-opt) :  8.79s user 3.69s system 433% cpu 2.880 total
polars 1.4.1 (mem-engine)               :  8.75s user 3.80s system 432% cpu 2.904 total
polars 1.4.1 (streaming)                : 10.04s user 5.25s system 268% cpu 5.691 total
```
`pl.scan_parquet(partitioned_files_path).collect()` (50x 1M rows, 50M rows total, 51 columns)

```
this branch  (new streaming, build-opt) : 8.72s user 3.36s system 432% cpu 2.793 total
polars 1.4.1 (mem-engine)               : 8.71s user 3.65s system 436% cpu 2.834 total
polars 1.4.1 (streaming)                : 9.52s user 4.57s system 338% cpu 4.165 total
```

Feature parity

The source node in this PR should fully support all existing functionality of the in-memory engine
(including slices with negative offsets, which aren't supported by the existing streaming engine).

Metadata fetching optimization

The new source node uses a metadata size estimate for async reads, which can potentially
save network requests. Small parquet files are also fully downloaded in a single network request:

```
[ParquetSource]: 5 columns to be projected from 1 files
[ParquetSource]: Fetched all bytes for metadata on first try (initial estimate = 3108, actual size = 1101, excess = 2007)
[ParquetSource]: Parquet file was fully fetched during metadata read (3108 bytes).
[ParquetSource]: 50 columns to be projected from 1 files
[ParquetSource]: Extra 691849 bytes need to be fetched for metadata (initial estimate = 131072, actual size = 822921)
```
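The fetch pattern in these logs (one request when the estimate covers the footer, a second top-up request otherwise) can be sketched in Python. This is an illustrative stand-in: the file built below only mimics parquet's trailing layout (4-byte little-endian metadata length followed by the `PAR1` magic), and `fetch_metadata` is a hypothetical helper, not the PR's code.

```python
import os
import struct
import tempfile

# Build a minimal parquet-shaped file: [magic][data][metadata][4-byte length]["PAR1"].
# Real parquet metadata is Thrift-encoded; only the trailing layout matters here.
path = os.path.join(tempfile.gettempdir(), "meta_demo.parquet")
data, metadata = b"x" * 1000, b"m" * 200
with open(path, "wb") as f:
    f.write(b"PAR1" + data + metadata + struct.pack("<I", len(metadata)) + b"PAR1")


def fetch_metadata(path, initial_estimate=64):
    """Fetch footer metadata in at most two range requests."""
    with open(path, "rb") as f:
        file_size = f.seek(0, 2)
        # First request: grab the trailing `initial_estimate` bytes.
        f.seek(max(0, file_size - initial_estimate))
        tail = f.read()
        assert tail[-4:] == b"PAR1"
        # Last 8 bytes: 4-byte little-endian metadata length + magic.
        meta_len = struct.unpack("<I", tail[-8:-4])[0]
        needed = meta_len + 8
        if needed <= len(tail):
            return tail[-needed:-8], 1  # the estimate covered the metadata
        # Second request for the prefix the estimate missed.
        f.seek(file_size - needed)
        return f.read(meta_len), 2


md, n = fetch_metadata(path, initial_estimate=300)
print(n)  # 1
```

With an estimate of 300 bytes the 208-byte footer is covered by the first request; an estimate of 64 bytes forces a second request for the missing prefix.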

Slice pushdown

```
[ParquetSource]: 50 columns to be projected from 1 files
[ParquetSource]: Slice pushdown: Stopped reading at file at index 0 (remaining 0 files will not be read)
[ParquetSource]: Slice pushdown: Skipped row group 0 in file 0 (263157 rows)
[ParquetSource]: Slice pushdown: Skipped row group 1 in file 0 (263157 rows)
...(repeated lines omitted)
[ParquetSource]: Slice pushdown: Skipped row group 170 in file 0 (263157 rows)
[ParquetSource]: Slice pushdown: Stop at row group 172 in file 0 (remaining 17 row groups will not be read)
```

Slice pushdown (negative offset)

```
[ParquetSource]: 50 columns to be projected from 100 files
[ParquetSource]: Slice pushdown: Negatively-offsetted slice (-45000000, 10) begins at file index 10, translated to 0..10
[ParquetSource]: Slice pushdown: Stopped reading at file at index 10 (remaining 89 files will not be read)
```
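The translation shown in this log can be sketched as follows, assuming per-file row counts are known (hypothetical helper; in reality the row counts must first be read from file metadata before a negative offset can be resolved):

```python
def translate_negative_slice(offset, length, rows_per_file):
    """Translate a negative slice offset into (first_file_index, row_offset_in_file).

    A negative offset counts back from the total row count, so it can only be
    resolved once the row counts of all files are known.
    """
    total = sum(rows_per_file)
    start = max(0, total + offset)  # offset is negative
    seen = 0
    for i, n in enumerate(rows_per_file):
        if seen + n > start:
            return i, start - seen
        seen += n
    return len(rows_per_file), 0  # slice starts past the end


# 100 files of 500_000 rows each (50M total); slice(-45_000_000, 10) starts at file 10.
print(translate_negative_slice(-45_000_000, 10, [500_000] * 100))  # (10, 0)
```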

Predicate pushdown

```
[ParquetSource]: 50 columns to be projected from 1 files
parquet file can be skipped, the statistics were sufficient to apply the predicate.
[ParquetSource]: Predicate pushdown: Skipped row group 0 in file 0 (263157 rows)
parquet file can be skipped, the statistics were sufficient to apply the predicate.
[ParquetSource]: Predicate pushdown: Skipped row group 1 in file 0 (263157 rows)
parquet file can be skipped, the statistics were sufficient to apply the predicate.
...(repeated lines omitted)
parquet file can be skipped, the statistics were sufficient to apply the predicate.
[ParquetSource]: Predicate pushdown: Skipped row group 170 in file 0 (263157 rows)
parquet file must be read, statistics not sufficient for predicate.
...(repeated lines omitted)
```
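The row-group skipping shown above relies on parquet's per-column min/max statistics. A minimal sketch of the decision, with a hypothetical helper and an interval-style predicate:

```python
def can_skip_row_group(col_stats, col, lower, upper):
    """Skip a row group using min/max statistics: if the range of values the
    predicate accepts ([lower, upper]) does not intersect the row group's
    [min, max] for that column, no row can match, so the group is skipped
    without reading any data pages."""
    mn, mx = col_stats[col]
    return mx < lower or mn > upper


# Row group with a0 in [0, 263_156]; predicate a0 > 40_000_000 -> whole group skipped.
print(can_skip_row_group({"a0": (0, 263_156)}, "a0", 40_000_001, float("inf")))  # True
```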

Byte source trait

This PR also introduces a new byte source trait that provides a unified interface to efficiently fetch byte ranges from both local and cloud files.
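As a rough sketch of the idea in Python (names are illustrative; the actual Rust trait lives in `crates/polars-io/src/utils/byte_source.rs`): a single `get_range` interface that a local backend serves with seek/read and a cloud backend would serve with an HTTP range request.

```python
import os
import tempfile
from typing import Protocol


class ByteSource(Protocol):
    """Unified interface for fetching byte ranges from a file-like source."""

    def size(self) -> int: ...
    def get_range(self, start: int, end: int) -> bytes: ...


class LocalByteSource:
    """Local-file implementation; a cloud implementation would issue an
    HTTP range request for the same (start, end) pair."""

    def __init__(self, path: str):
        self.path = path

    def size(self) -> int:
        return os.path.getsize(self.path)

    def get_range(self, start: int, end: int) -> bytes:
        with open(self.path, "rb") as f:
            f.seek(start)
            return f.read(end - start)


path = os.path.join(tempfile.gettempdir(), "byte_source_demo.bin")
with open(path, "wb") as f:
    f.write(b"hello world")

src = LocalByteSource(path)
print(src.get_range(6, 11))  # b'world'
```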

@github-actions github-actions bot added internal An internal refactor or improvement rust Related to Rust Polars labels Aug 13, 2024
@nameexhaustion nameexhaustion force-pushed the parquet-source branch 3 times, most recently from 7d22137 to 2756b43 Compare August 13, 2024 06:41

codecov bot commented Aug 13, 2024

Codecov Report

Attention: Patch coverage is 3.89937% with 1528 lines in your changes missing coverage. Please review.

Project coverage is 79.83%. Comparing base (41d3048) to head (f340688).
Report is 11 commits behind head on main.

| File | Patch % | Lines missing |
|------|---------|---------------|
| crates/polars-stream/src/nodes/parquet_source.rs | 0.00% | 1345 |
| crates/polars-io/src/utils/byte_source.rs | 0.00% | 92 |
| crates/polars-utils/src/mem.rs | 43.47% | 26 |
| crates/polars-stream/src/utils/notify_channel.rs | 0.00% | 21 |
| crates/polars-stream/src/async_executor/task.rs | 0.00% | 14 |
| crates/polars-core/src/utils/mod.rs | 0.00% | 13 |
| crates/polars-stream/src/physical_plan/to_graph.rs | 0.00% | 11 |
| crates/polars-stream/src/utils/task_handles_ext.rs | 0.00% | 6 |
Additional details and impacted files
```
@@            Coverage Diff             @@
##             main   #18152      +/-   ##
==========================================
- Coverage   80.42%   79.83%   -0.60%
==========================================
  Files        1492     1496       +4
  Lines      198675   200238    +1563
  Branches     2841     2841
==========================================
+ Hits       159785   159854      +69
- Misses      38365    39859    +1494
  Partials      525      525
```


Comment on lines 1382 to 1340
```rust
if let Some(predicate) = self.physical_predicate.as_deref() {
    let mask = predicate.evaluate_io(&df)?;
    let mask = mask.bool().unwrap();

    par_filter_df(&mut df, mask, cpu_runtime.as_ref()).await?;
}
```
Collaborator

I very briefly discussed this with Ritchie, but we should maybe plan a quick call for this. I think for streaming it always makes sense to decode the columns that are needed for the predicate first. Then, depending on the selectiveness of the data (which we can estimate with `(mask ^ (mask >> 1)).popcount()`), we can decide to use `par_filter_df` or the direct `filter=Bitmask(mask)` in the parquet reader. This would lead to very large speedups, and I definitely think it is worth it.

This can maybe wait until we have a POC, though.
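The `(mask ^ (mask >> 1)).popcount()` estimate mentioned above counts 0/1 transitions in the filter mask. On a plain Python integer standing in for the bitmask (sketch only; the real mask would be a bitmap over the filter column):

```python
def run_count(mask: int) -> int:
    """Count 0<->1 transitions in the mask by XOR-ing it with itself shifted
    by one bit and popcounting the result. Few transitions mean the selected
    rows form long contiguous runs; many transitions mean scattered rows."""
    return bin(mask ^ (mask >> 1)).count("1")


print(run_count(0b11110000))  # 2: one contiguous run of set bits
print(run_count(0b10101010))  # 8: maximally scattered
```

Few transitions favour gathering the kept rows as slices; many transitions favour a dedicated filter kernel.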

@nameexhaustion nameexhaustion force-pushed the parquet-source branch 7 times, most recently from 6104f7f to b5b3153 Compare August 13, 2024 10:57
@nameexhaustion nameexhaustion marked this pull request as ready for review August 13, 2024 11:03
@nameexhaustion nameexhaustion added the do not merge This pull requests should not be merged right now label Aug 13, 2024
Resolved review comments: crates/polars-stream/src/utils/byte_source.rs, crates/polars-stream/src/async_executor/mod.rs, crates/polars-stream/src/nodes/parquet_source.rs (x2)
@nameexhaustion nameexhaustion marked this pull request as draft August 14, 2024 03:49
@nameexhaustion nameexhaustion removed the do not merge This pull requests should not be merged right now label Aug 14, 2024
@nameexhaustion nameexhaustion force-pushed the parquet-source branch 2 times, most recently from 7e8ac05 to ef3a71b Compare August 14, 2024 06:36
@nameexhaustion nameexhaustion marked this pull request as ready for review August 14, 2024 06:39
@ritchie46
Copy link
Member

Ai.. Can you rebase. :/

Resolved review comments: crates/polars-stream/src/nodes/parquet_source.rs (x4)
```rust
        Err(e) => Err(e),
    }
})
.buffer_unordered(num_pipelines);
```
Collaborator Author

@orlp This changes the decode back to spawning tasks onto the executor to support splitting a single row group into multiple morsels. This lets me distribute the morsels more evenly across the pipelines - with the previous approach, I think if I added the splitting in-place, then even after I split the row group into several morsels they would still end up being sent serially across the same pipeline.

Collaborator

> I think if I added the splitting in-place, then even after I split the row group into several morsels they would still end up being sent serially across the same pipeline.

I don't understand why, as long as the splitting happens before going into the work distributor, everything should be fine. Is the splitting itself also computationally intensive?

Collaborator Author

@nameexhaustion commented Aug 21, 2024

It's because we can't split the raw byte-data of the row group, so instead I'm splitting the row group after it's been decoded into a DataFrame. In the previous version the row group decoding took place after the work distributor was used to distribute the raw row group byte data - at that point I was no longer able to re-distribute the individual splits within a row group - they would have to be serially sent across the pipeline they were in.

I think maybe we can have a compute node that sits in front of the parquet source to ensure that we have good morsel sizes? It would split morsels that are too big and combine morsels that are too small. Then it could also sit in front of other operators that have unpredictable morsel sizes (e.g. other source nodes, or the filter node).
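The morsel-resizing node suggested here could look roughly like this, with plain Python lists standing in for DataFrames (purely hypothetical; no such node exists in the PR):

```python
def resize_morsels(morsels, target):
    """Re-chunk a stream of morsels so each output morsel is close to `target`
    rows: oversized inputs are split, undersized ones are coalesced. A final
    partial morsel is flushed as-is."""
    out, buf = [], []
    for m in morsels:
        buf.extend(m)
        while len(buf) >= target:
            out.append(buf[:target])
            buf = buf[target:]
    if buf:
        out.append(buf)
    return out


# One undersized, one tiny, one oversized input morsel -> three even outputs.
chunks = resize_morsels([[1] * 5, [2] * 1, [3] * 12], target=6)
print([len(c) for c in chunks])  # [6, 6, 6]
```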

Collaborator Author

Hmm.. I thought about it again and I don't like what I'm currently doing either.. it's not receiving backpressure properly from the pipeline.

I think it's better to leave it as 1 morsel per row-group for now?


```rust
if self.use_par_decode && decode_fut_iter.len() > 1 {
    for handle in decode_fut_iter.map(|fut| {
        async_executor::AbortOnDropHandle::new(async_executor::spawn(
```
Collaborator Author

@nameexhaustion commented Aug 21, 2024

@orlp one more place where I spawn: this is for decoding the columns within a row group in parallel. I think in theory this makes sense for very wide tables, but from testing, the performance was identical for 1M-row groups with 50 columns. I've currently tuned it to send a minimum of `const VALUES_PER_THREAD: usize = 8_388_608` values per thread.

Collaborator

Can you maybe add a comment explaining how/why that value was derived?

Collaborator Author

@nameexhaustion commented Aug 21, 2024

Added 👍

*edit: doubled the value to 16M
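For illustration, the decode task-count logic described above amounts to something like the following (hypothetical helper; the 16M constant reflects the doubled value mentioned in the edit):

```python
VALUES_PER_THREAD = 16_777_216  # minimum values (rows * cols) per decode task


def decode_task_count(n_rows, n_cols, max_tasks):
    """Spawn extra decode tasks only once each would get at least
    VALUES_PER_THREAD values: a 1M-row x 50-col group yields just 2 tasks,
    consistent with the observation that such groups saw no speedup from
    wider parallelism."""
    total_values = n_rows * n_cols
    return max(1, min(max_tasks, total_values // VALUES_PER_THREAD))


print(decode_task_count(1_000_000, 50, 8))  # 2
```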

```rust
// Early shutdown - our port state was set to `Done` by the downstream nodes. This
// also means we can discard any potential errors from the `shutdown()`. Note that
// `shutdown()` internally unwraps any critical errors.
let _ = polars_io::pl_async::get_runtime().block_on(self.shutdown());
```
Collaborator

`shutdown`'s docs state that it panics if called more than once, and `update_state` can get called more than once. I think you should check `is_finished` before calling this.

Furthermore, do we actually need to block on this? Can't we just spawn it and it'll clean itself up in the background?

Collaborator Author

> I think you should check `is_finished` before calling this.

I believe it should be checked by the initial `if self.is_finished.load(Ordering::Relaxed) {` 😁

> Furthermore, do we actually need to block on this? Can't we just spawn it and it'll clean itself up in the background?

I tried, but the borrow checker wasn't happy, as `shutdown()` takes `&mut self`. I think the shutdown should be fairly quick, so it should be fine?

Collaborator Author

Update: I added `shutdown_in_background()` and made it work by putting the task data behind an `Arc<Mutex<>>`.

Resolved review comment: crates/polars-stream/src/nodes/parquet_source.rs

```rust
let mut dfs = vec![].into_iter();

'main: loop {
    let Some(mut indexed_wait_group) = wait_groups.next().await else {
```
Collaborator Author

@nameexhaustion commented Aug 22, 2024

I've used a new approach for applying backpressure using the wait groups here; it makes us spawn far fewer tasks than before.

```rust
if cols_per_task <= df.width() {
    df._filter_seq(mask)?
} else {
    let mask = mask.clone();
```
Collaborator Author

Added horizontal parallelism back to predicate filtering here in the row group decoder, but only when we are past `VALUES_PER_THREAD`. I want to do this here so that the predicate is applied before we potentially split the row group into multiple morsels; if we instead applied the predicate afterwards, we could end up with very small morsels.

@nameexhaustion nameexhaustion marked this pull request as ready for review August 22, 2024 11:40
@nameexhaustion nameexhaustion marked this pull request as draft August 22, 2024 11:42
@nameexhaustion nameexhaustion marked this pull request as ready for review August 22, 2024 11:46
@nameexhaustion nameexhaustion marked this pull request as draft August 23, 2024 04:29
```rust
loop {
    use crate::async_primitives::connector::SendError;

    let port_index = indexed_wait_group.index;
```
Collaborator

Can you give this a different name? A port refers to an input or output of a node. One port can consist of a serial sender/receiver, or a series of parallel senders/receivers, but they all belong to the same port.

Collaborator Author

I've renamed it to `channel_index`.

```rust
let mut row_group_data_fetcher = Box::pin(self);
let current_future = Box::pin(
    unsafe {
        std::mem::transmute::<&mut RowGroupDataFetcher, &'static mut RowGroupDataFetcher>(
```
Collaborator

Instead of solving this with lifetime transmutes, which make me rather uncomfortable, can you change it to use `Arc` + `Mutex`es instead? You'll likely need to move the mutable state inside a `Mutex` and change `next()` to take `&self` instead of `&mut self`.

Collaborator Author

I found a way to get rid of the unsafe transmute without introducing any locks 😁

Resolved review comment: crates/polars-stream/src/nodes/parquet_source.rs
@nameexhaustion nameexhaustion marked this pull request as ready for review August 23, 2024 12:54
@orlp
Collaborator

orlp commented Aug 23, 2024

Nice work :) There are still some changes I'd like to make to clean everything up, but we (or just me) can do that later in future PRs.

@ritchie46
Member

Great effort to get this in. Thanks both! We can iron things out in future PRs. First get this huge PR in. :)

@ritchie46 ritchie46 merged commit ffb66aa into pola-rs:main Aug 23, 2024
21 checks passed
@c-peters c-peters added the accepted Ready for implementation label Aug 26, 2024
@nameexhaustion nameexhaustion deleted the parquet-source branch August 29, 2024 09:58
Labels
accepted Ready for implementation internal An internal refactor or improvement rust Related to Rust Polars