Row groups are read out of order or with completely different values #10572
Thank you for the report and the reproducer ❤️
This is not my expectation. DataFusion reads row groups in parallel, potentially out of order, with multiple threads as an optimization. To preserve the order of the data you can either set the configuration
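This toy example uses only the Rust standard library to show why a parallel scan can return row groups out of order. Each hypothetical `RowGroup` is handed to its own thread, and results are collected in whatever order the threads finish. This is not DataFusion's scheduler. It only shows that completion order need not match file order, so an explicit sort or a known ordering has to restore file order afterwards.

```rust
use std::sync::mpsc;
use std::thread;

/// A decoded row group tagged with its position in the file.
/// (Hypothetical stand-in for a Parquet row group; not a DataFusion type.)
#[derive(Debug, Clone, PartialEq)]
pub struct RowGroup {
    pub index: usize,
    pub first_ts: i64,
}

/// Hand every row group to its own thread and collect the results in
/// completion order, which the OS scheduler decides, not file order.
pub fn read_parallel(groups: Vec<RowGroup>) -> Vec<RowGroup> {
    let (tx, rx) = mpsc::channel();
    let handles: Vec<_> = groups
        .into_iter()
        .map(|rg| {
            let tx = tx.clone();
            thread::spawn(move || {
                // Pretend to decode; a later group may finish first.
                tx.send(rg).expect("receiver is alive");
            })
        })
        .collect();
    drop(tx); // the channel closes once every worker's clone is dropped
    for h in handles {
        h.join().expect("worker panicked");
    }
    rx.into_iter().collect()
}

/// Restore file order after a parallel read by sorting on the row-group index.
pub fn restore_file_order(mut groups: Vec<RowGroup>) -> Vec<RowGroup> {
    groups.sort_by_key(|rg| rg.index);
    groups
}
```

The index is what makes recovery cheap here. Without one, the consumer would have to sort on the data itself (for example `ts_init`).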
Yes, I agree these are also my expectations. Maybe you can try setting
Setting this:
use datafusion::prelude::*;

let session_cfg =
    SessionConfig::new().set_str("datafusion.optimizer.repartition_file_scans", "false");
let session_ctx = SessionContext::new_with_config(session_cfg);
However, it's unclear how it interacts with other options and how it affects memory and performance. So here's what I have. It is a given that the data will be sorted by timestamp, so I declare the sort order like this:
use datafusion::logical_expr::expr::Sort;

let parquet_options = ParquetReadOptions::<'_> {
    skip_metadata: Some(false),
    file_sort_order: vec![vec![Expr::Sort(Sort {
        expr: Box::new(col("ts_init")),
        asc: true,
        nulls_first: true,
    })]],
    ..Default::default()
};
Then there are two approaches to get row groups/data in order:
It's not clear to me how each option affects performance and memory usage. Do you have any guidance on it?
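`file_sort_order` is a declaration about the files, not something computed from them. It may be worth checking it against the Parquet row-group statistics before relying on it. The sketch below assumes each row group's min/max of `ts_init` has already been read from the footer into a hypothetical `RowGroupStats` struct. Reading the footer itself is omitted.

```rust
/// Min/max of the sort column (e.g. `ts_init`) for one row group, as the
/// column statistics in a Parquet footer would report them.
/// (Hypothetical struct; real code would fill it from the file metadata.)
#[derive(Debug, Clone, Copy)]
pub struct RowGroupStats {
    pub min_ts: i64,
    pub max_ts: i64,
}

/// Returns the index of the first row group that breaks ascending order, or
/// `None` if each group starts at or after the previous group's max.
/// This only inspects statistics: it does not prove that the rows inside
/// each group are sorted.
pub fn first_out_of_order(stats: &[RowGroupStats]) -> Option<usize> {
    for (i, s) in stats.iter().enumerate() {
        // Malformed stats for this group.
        if s.min_ts > s.max_ts {
            return Some(i);
        }
        // Overlaps the previous group, so the groups cannot simply be concatenated.
        if i > 0 && s.min_ts < stats[i - 1].max_ts {
            return Some(i);
        }
    }
    None
}
```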
That is great news.
This is the approach I would recommend (we do it in InfluxDB 3.0). You can verify that there are no additional sorts, etc. in the plan by using
If you use this approach, you should also be able to avoid setting
Thanks for sharing and helping resolve this issue so quickly 😁
So I tried out some experiments with different combinations of using
The script reads a file and prints the number of rows in the first row group. Ideally it should only load the first row group. However, as you can see, even after specifying the sort order of the file, the
But the best result comes from removing the sort and turning off repartitioning. I believe it then loads only the one row group required. It would be very helpful to document this interaction.
I'll share more results for reading the whole file.
Here's the result for the script (also included in the repo) that reads the whole file and counts the total number of rows.
It seems like order=false and repartition=false is the holy grail of performant, low-memory-footprint streaming in sorted order. What do you think?
Hi @twitu, thanks for this. Some comments: I took a quick peek at https://github.com/nautechsystems/nautilus_experiments/tree/efficient-query and it looks like it reads only a single batch: https://github.com/nautechsystems/nautilus_experiments/blob/a4ceb950de3b4bbc43ec82b64ee1495d077f5116/src/bin/single_row_group.rs#L45
This means you are running the equivalent of
I would expect those settings to give the lowest latency (time to first batch).
I would expect that it loads the first row group of each file and begins merging them together (e.g. if you did an
@suremarc, @matthewmturner and others have been working on optimizing a similar case; see #7490 for example. We have other items tracked in #10313 (notably, we have a way to avoid opening all the files if we know the data is already sorted and doesn't overlap: #10316). cc @NGA-TRAN
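Merging the first row group of each file is, in spirit, a k-way merge. Below is the textbook heap-based version over any number of already-sorted inputs (think one per file). DataFusion's own merge operator works on Arrow record batches and is more involved, so treat this only as an illustration of the idea.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge several individually sorted inputs into one sorted output.
/// The heap holds at most one pending value per input.
pub fn k_way_merge<I>(mut inputs: Vec<I>) -> Vec<i64>
where
    I: Iterator<Item = i64>,
{
    // Entries are (value, input index); Reverse turns the max-heap into a min-heap.
    let mut heap = BinaryHeap::new();
    for (i, it) in inputs.iter_mut().enumerate() {
        if let Some(v) = it.next() {
            heap.push(Reverse((v, i)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, i))) = heap.pop() {
        out.push(v);
        // Refill only from the input that just produced the smallest value.
        if let Some(next) = inputs[i].next() {
            heap.push(Reverse((next, i)));
        }
    }
    out
}
```

The merge pulls only one value from each input before emitting anything. That matches the expectation that only the first row group of every file has to be loaded before output can start.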
I also added a query explanation, and here are the results.
You'll see that the queries with
You'll see in the experiments repo that I've implemented binaries for both reading a single row group and reading the whole file. The queries are the same; only the way the resulting stream is consumed differs. I don't think this is equivalent to adding a
Surprisingly, these settings also give the lowest latency and memory footprint when reading the full file, as shown in the table above. If you need an additional contributor on any of the issues mentioned above, I'm happy to help 😄
Hi @twitu -- I am very sorry for the delay in responding -- I have been traveling for sever
One thing that might be going on is that the NULLS FIRST doesn't seem to match. In your plan the sort is putting nulls last,
but in your code you specify NULLS FIRST:
file_sort_order: vec![vec![Expr::Sort(Sort {
    expr: Box::new(col("ts_init")),
    asc: true,
    nulls_first: true,
})]],
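The NULLS FIRST/NULLS LAST mismatch matters because the two orderings put nulls at opposite ends. Data declared with one cannot stand in for a sort requested with the other. Below is a small std-only sketch. `SortSpec` and `satisfies` are hypothetical names, and the strict equality check is a simplification of how an optimizer might compare orderings.

```rust
use std::cmp::Ordering;

/// Ascending, NULLS FIRST: Rust's `Option` already orders `None` before any
/// `Some`, so the derived ordering matches.
pub fn sort_asc_nulls_first(v: &mut [Option<i64>]) {
    v.sort();
}

/// Ascending, NULLS LAST: move `None` to the end explicitly.
pub fn sort_asc_nulls_last(v: &mut [Option<i64>]) {
    v.sort_by(|a, b| match (a, b) {
        (None, None) => Ordering::Equal,
        (None, Some(_)) => Ordering::Greater,
        (Some(_), None) => Ordering::Less,
        (Some(x), Some(y)) => x.cmp(y),
    });
}

/// Direction plus null placement for one sort column (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SortSpec {
    pub asc: bool,
    pub nulls_first: bool,
}

/// In this toy model a declared ordering satisfies a required one only if
/// both the direction and the null placement match.
pub fn satisfies(declared: SortSpec, required: SortSpec) -> bool {
    declared == required
}
```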
DataFusion is a streaming engine, so if you open a Parquet file, read one batch and stop, the entire file will not be read (the batches are basically created on demand). There are certain "pipeline breaking" operators that do require reading the entire input, such as
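The next toy model shows the on-demand behaviour. A lazy iterator "decodes" a hypothetical row group only when the next batch is requested, while a full sort, as a pipeline breaker, has to pull everything first. `DecodeCounter`, `scan` and `sort_all` are illustrative names, not DataFusion APIs.

```rust
use std::cell::Cell;

/// Counts how many (hypothetical) row groups have actually been decoded.
pub struct DecodeCounter {
    pub decoded: Cell<usize>,
}

/// A lazy scan: a row group is "decoded" only when the consumer asks for the
/// next batch, mirroring batches being produced on demand.
pub fn scan<'a>(
    row_groups: &'a [Vec<i64>],
    counter: &'a DecodeCounter,
) -> impl Iterator<Item = Vec<i64>> + 'a {
    row_groups.iter().map(move |rg| {
        counter.decoded.set(counter.decoded.get() + 1);
        rg.clone()
    })
}

/// A pipeline-breaking operator: a full sort cannot emit its first row until
/// it has pulled every batch from its input.
pub fn sort_all(input: impl Iterator<Item = Vec<i64>>) -> Vec<i64> {
    let mut rows: Vec<i64> = input.flatten().collect();
    rows.sort();
    rows
}
```

Taking one batch from `scan` decodes one row group, while feeding the same scan into `sort_all` decodes all of them.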
We are always looking for contributors; anything you can do to help others would be most appreciated. For example, perhaps you can add an example to
I would expect that, to get this to work correctly, you need to use an
If you don't include an
With an
You can see that it does so by running
You may also have to set
I added the
| order | filter | read sorted order |
It seems like the best combination for efficient, low-memory streaming queries that also support filtering is achievable by
However, one point to note is that there does not seem to be a way to verify if
The optimized plan for a query with
The optimized plan for a query WITHOUT
Shouldn't the optimized plan for both be the same, since I have set
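Sort removal can be pictured as a plan rewrite. It drops a Sort whose input already has the requested ordering, relying on the fact that a filter keeps its surviving rows in their original order. The sketch below is a hypothetical toy plan in plain Rust, not DataFusion's optimizer. Which planning stage removes the sort in DataFusion is not established here, so the physical EXPLAIN output is the thing to check.

```rust
/// Sort key: column plus the options that must all match (hypothetical type).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortKey {
    pub column: String,
    pub asc: bool,
    pub nulls_first: bool,
}

/// A toy plan (hypothetical; not DataFusion's ExecutionPlan).
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    /// Scan of files whose order was declared up front (cf. file_sort_order).
    Scan { ordering: Option<SortKey> },
    /// Filtering drops rows but never reorders the ones it keeps.
    Filter(Box<Plan>),
    Sort { key: SortKey, input: Box<Plan> },
}

/// The ordering a node's output is guaranteed to have, if any.
pub fn output_ordering(plan: &Plan) -> Option<SortKey> {
    match plan {
        Plan::Scan { ordering } => ordering.clone(),
        Plan::Filter(input) => output_ordering(input),
        Plan::Sort { key, .. } => Some(key.clone()),
    }
}

/// Drop every Sort whose input already produces exactly the requested ordering.
pub fn remove_redundant_sorts(plan: Plan) -> Plan {
    match plan {
        Plan::Sort { key, input } => {
            let input = remove_redundant_sorts(*input);
            if output_ordering(&input).as_ref() == Some(&key) {
                input
            } else {
                Plan::Sort { key, input: Box::new(input) }
            }
        }
        Plan::Filter(input) => Plan::Filter(Box::new(remove_redundant_sorts(*input))),
        scan @ Plan::Scan { .. } => scan,
    }
}
```

With a declared NULLS FIRST ordering, a NULLS FIRST sort over a filtered scan disappears. A NULLS LAST sort over the same scan is kept.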
- Add test for ordered filter query
- Configure `datafusion` to prefer existing sort

This ensures queries with filter clauses return data in order. apache/datafusion#10572 (comment)
@alamb Is there any way to verify that setting
As I've shared above, the optimized query plan does not reflect this. It still has the Sort expression. This makes it hard to verify the behaviour of the query without actually running it.
The only way I know is to use the Explain plan. The physical plan reflects the actual plan that will be run, so if it has a SortExec then there will be some sort of sort during execution. Many of the operators have specializations, however, like TopK for sort with limit, so interpreting exactly what will happen from a plan is somewhat of an art form. Here is some documentation about how to read explain plans (thanks @devanbenz!): https://datafusion.apache.org/user-guide/explain-usage.html
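In general terms, the TopK specialization mentioned above is the bounded-heap technique for `ORDER BY ... LIMIT k`: keep only the best k rows seen so far instead of sorting everything. The following is a generic std-only sketch of that technique, not DataFusion's implementation, which works on record batches.

```rust
use std::collections::BinaryHeap;

/// ORDER BY x ASC LIMIT k without a full sort: keep a max-heap of the k
/// smallest values seen so far, so memory stays O(k) for any input length.
pub fn top_k_smallest(input: impl IntoIterator<Item = i64>, k: usize) -> Vec<i64> {
    if k == 0 {
        return Vec::new();
    }
    let mut heap: BinaryHeap<i64> = BinaryHeap::with_capacity(k);
    for v in input {
        if heap.len() < k {
            heap.push(v);
        } else if let Some(&largest) = heap.peek() {
            // Replace the current worst candidate only if v beats it.
            if v < largest {
                heap.pop();
                heap.push(v);
            }
        }
    }
    heap.into_sorted_vec() // ascending order
}
```

It still reads the whole input, but memory stays proportional to k. This is one reason a plan node for a sort with a limit can behave very differently from a full sort.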
Describe the bug
DataFusion is reading row groups out of order and sometimes with completely different values for the row groups. The data is verified by reading the same files using the Python `pyarrow.parquet` library. The `pyarrow` and `datafusion` readers read the same values when the file has 126 row groups, but give completely different values when the file has 127 row groups.
To Reproduce
https://github.com/nautechsystems/nautilus_experiments/tree/datafusion-bug
The steps, data and results are documented in this repo and branch. The README is shared here again.
Use the python script to extract row group information from the parquet files using pyarrow.
Run the rust executable to extract row group information from the parquet files using datafusion.
Ideally there should be no difference between the CSV files for the row groups. However, while 126 works properly, 127 gives different results for Python and Rust.
This shows that indeed there's no difference with 126 groups.
We can also confirm that these come from the same data source, with just one extra row group, using this command, which shows that for 127 groups the Python output has only one extra entry at the end.
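The reproducer's comparison step can be sketched as follows. The sketch assumes each script writes one line per row group and compares lines as opaque text, since the actual CSV columns are whatever the scripts emit. `diff_row_group_summaries` is a hypothetical helper that reports the row groups whose lines disagree and how many extra lines the longer file has. The sample lines in any usage are made up, not the real data.

```rust
/// Compare two per-row-group summaries (one line per row group) and return
/// the indices where they disagree plus the difference in line counts.
pub fn diff_row_group_summaries(a: &str, b: &str) -> (Vec<usize>, usize) {
    let la: Vec<&str> = a.lines().collect();
    let lb: Vec<&str> = b.lines().collect();
    // Compare only the overlapping prefix line by line.
    let mismatches: Vec<usize> = la
        .iter()
        .zip(lb.iter())
        .enumerate()
        .filter(|(_, (x, y))| x != y)
        .map(|(i, _)| i)
        .collect();
    (mismatches, la.len().abs_diff(lb.len()))
}
```

For the 126-group files this should report no mismatches. Comparing the Python outputs for 126 and 127 groups should report one extra line and no mismatches, consistent with the observation above.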
Expected behavior
The DataFusion reader should read the same values as the `pyarrow` parquet reader.
Additional context
No response