Avoid buffering the entire row group in Async parquet reader #6946

Open
alamb opened this issue Jan 6, 2025 · 6 comments
Labels
enhancement Any new improvement worthy of a entry in the changelog

Comments

@alamb
Contributor

alamb commented Jan 6, 2025

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

The async_reader ParquetRecordBatchStreamBuilder is awesome and fast

However, it is currently quite aggressive when reading and will buffer an entire row group in memory during the read. For data reorganization operations that read all columns, such as merging multiple files together, we see significant memory used to buffer this entire input.

For example, we have some files at InfluxData that contain 1M rows each with 10 columns. Using the default WriterProperties with row group size 1M and 20k rows per page results in a parquet file with

  1. a single 1M row group
  2. 10 column chunks each with 50 pages.
  3. Each file is 60MB

Thus, merging 10 such files requires 600MB of memory just to buffer the parquet data. Merging 100 such files requires a 6GB buffer, etc.

We can reduce the memory required by reducing the number of files merged concurrently (the fan-in) as well as reducing the number of rows in each row group. However, I also think there is potential improvement in the parquet reader itself to not buffer the entire file while reading
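The arithmetic behind those two knobs can be sketched as follows. This is only an estimate: the function names are hypothetical, and the second function assumes encoded size scales linearly with row count, which real files will only approximate.

```rust
/// Bytes buffered when `fan_in` files are merged concurrently and the reader
/// holds one whole row group per open file.
fn merge_buffer_bytes(fan_in: u64, row_group_bytes: u64) -> u64 {
    fan_in * row_group_bytes
}

/// Estimated size of one row group after capping it at `rows_per_group` rows.
/// ASSUMPTION: encoded size scales linearly with row count.
fn scaled_row_group_bytes(file_bytes: u64, file_rows: u64, rows_per_group: u64) -> u64 {
    file_bytes * rows_per_group.min(file_rows) / file_rows
}
```

With the figures from the example, `merge_buffer_bytes(10, 60_000_000)` gives 600MB, and cutting row groups to 100k rows would bring each buffered row group down to roughly 6MB, so the same 10-way merge would buffer roughly 60MB.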

The root cause of this memory use is that all the pages of the current RowGroup are read in memory via the InMemoryRowGroup

In pictures, if we have a parquet file on disk/object store:

┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─          ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
┃    ┌──────────────┐  │     ┌──────────────┐  │            ┌──────────────┐  │ ┃
┃ │  │    Page 1    │     │  │    Page 1    │            │  │    Page 1    │    ┃
┃    └──────────────┘  │     └──────────────┘  │            └──────────────┘  │ ┃
┃ │  ┌──────────────┐     │  ┌──────────────┐            │  ┌──────────────┐    ┃
┃    │    Page 2    │  │     │    Page 2    │  │            │    Page 2    │  │ ┃
┃ │  └──────────────┘     │  └──────────────┘      ...   │  └──────────────┘    ┃
┃          ...         │           ...         │                  ...         │ ┃
┃ │                       │                              │                      ┃
┃    ┌──────────────┐  │     ┌──────────────┐  │            ┌──────────────┐  │ ┃
┃ │  │    Page N    │     │  │    Page N    │            │  │    Page N    │    ┃
┃    └──────────────┘  │     └──────────────┘  │            └──────────────┘  │ ┃
┃ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─          └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
┃      ColumnChunk             ColumnChunk                    ColumnChunk       ┃
┃                                                                               ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━Row Group 1━━┛

Reading it requires loading all pages of every column being read into the InMemoryRowGroup:

┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ┓ 
                                                                                    ┃ 
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃ 
┃ ┃ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─          ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃   
┃ ┃    ┌──────────────┐  │     ┌──────────────┐  │            ┌──────────────┐  │ ┃ ┃ 
  ┃ │  │    Page 1    │     │  │    Page 1    │            │  │    Page 1    │    ┃ ┃ 
┃ ┃    └──────────────┘  │     └──────────────┘  │            └──────────────┘  │ ┃ ┃ 
┃ ┃ │  ┌──────────────┐     │  ┌──────────────┐            │  ┌──────────────┐    ┃   
┃ ┃    │    Page 2    │  │     │    Page 2    │  │            │    Page 2    │  │ ┃ ┃ 
  ┃ │  └──────────────┘     │  └──────────────┘      ...   │  └──────────────┘    ┃ ┃ 
┃ ┃          ...         │           ...         │                  ...         │ ┃ ┃ 
┃ ┃ │                       │                              │                      ┃   
┃ ┃    ┌──────────────┐  │     ┌──────────────┐  │            ┌──────────────┐  │ ┃ ┃ 
  ┃ │  │    Page N    │     │  │    Page N    │            │  │    Page N    │    ┃ ┃ 
┃ ┃    └──────────────┘  │     └──────────────┘  │            └──────────────┘  │ ┃ ┃ 
┃ ┃ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─          └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃   
┃ ┃      ColumnChunk             ColumnChunk                    ColumnChunk       ┃ ┃ 
  ┃                                                                               ┃ ┃ 
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━Row Group 1━━┛ ┃ 
┃                                                                                     
┗ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━┛ 
                                                                                      
                                                                  InMemoryRowGroup    
                                                                                      

Describe the solution you'd like

TL;DR: only read a subset of the pages into memory at any time, and do additional I/O to fetch the others.

┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ┓  
┃                                                                                   ┃  
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃  
  ┃ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─          ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃    
┃ ┃    ┌──────────────┐  │     ┌──────────────┐  │            ┌──────────────┐  │ ┃ ┃  
┃ ┃ │  │    Page 1    │     │  │    Page 1    │            │  │    Page 1    │    ┃ ┃  
┃ ┃    └──────────────┘  │     └──────────────┘  │            └──────────────┘  │ ┃ ┃  
  ┃ │  ┌──────────────┐     │  ┌──────────────┐            │  ┌──────────────┐    ┃    
┃ ┃    │    Page 2    │  │     │    Page 2    │  │            │    Page 2    │  │ ┃ ┃  
┃ ┃ │  └──────────────┘     │  └──────────────┘      ...   │  └──────────────┘    ┃ ┃  
┃ ┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘          ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ┃ ┃  
  ┃      ColumnChunk             ColumnChunk                    ColumnChunk       ┃    
┃ ┃                                                                               ┃ ┃  
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━Row Group 1━━┛ ┃  
┃                                                                                   ┃  
 ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━   
                                                                                       
                                                                   InMemoryRowGroup    
                                                                                       

Describe alternatives you've considered

Of course, there is a tradeoff between buffer usage and the number of I/O requests, so there is unlikely to be one setting that works for all use cases. In our example, the two extremes and one intermediate point are:

| Target | I/Os required | Buffer required |
|---|---|---|
| Minimize I/O requests (behavior today) | 1 request (ObjectStore::get_ranges call) | 60MB buffer |
| Minimize buffering | 500 requests (one for each page) | ~1MB buffer (60MB/50 pages) |
| Intermediate buffering | 10 requests (get 5 pages per column per request) | 10MB buffer |
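A rough way to reproduce these figures is sketched below. It assumes all pages are the same size (60MB / 500 pages, about 120KB) and that every column advances in lockstep. The names `FetchPlan` and `plan` are hypothetical. It reports both the number of fetch rounds and the number of individual byte ranges, since the figures above appear to count one or the other depending on the row.

```rust
#[derive(Debug, PartialEq)]
struct FetchPlan {
    /// Number of fetch rounds (e.g. one multi-range call each).
    rounds: u64,
    /// Number of individual byte ranges requested across all rounds.
    range_requests: u64,
    /// Peak bytes buffered at any one time.
    buffer_bytes: u64,
}

/// ASSUMPTION: uniform page sizes, and `pages_per_fetch` pages are fetched
/// from every column in each round.
fn plan(columns: u64, pages_per_column: u64, row_group_bytes: u64, pages_per_fetch: u64) -> FetchPlan {
    let k = pages_per_fetch.clamp(1, pages_per_column);
    let page_bytes = row_group_bytes / (columns * pages_per_column);
    let rounds = (pages_per_column + k - 1) / k; // ceiling division
    FetchPlan {
        rounds,
        range_requests: rounds * columns,
        buffer_bytes: k * columns * page_bytes,
    }
}
```

Under these assumptions, fetching everything at once is 1 round with a 60MB buffer, and one page per column at a time is 500 ranges with a 1.2MB buffer. Five pages per column is 10 rounds with a buffer of about 6MB, somewhat below the round 10MB figure quoted above.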

So I would propose implementing some new optional setting like row_group_page_buffer_size that the reader would respect and try and limit its buffer usage to row_group_page_buffer_size, ideally fetching multiple pages at a time.

Note that large pages (e.g. large string data) are likely to make it impossible to always keep to a particular buffer size.
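To make the large-page caveat concrete, here is a minimal sketch of how a reader might group one column's pages into fetches under a byte budget. `batch_pages` is a hypothetical helper, not an existing parquet crate API, and the budget plays the role of the proposed row_group_page_buffer_size.

```rust
use std::ops::Range;

/// Groups consecutive pages (given by their byte sizes) into batches whose
/// total size stays within `budget`. A single page larger than the budget
/// still gets a batch of its own, so the budget is a target, not a hard cap.
fn batch_pages(page_sizes: &[usize], budget: usize) -> Vec<Range<usize>> {
    let mut batches = Vec::new();
    let mut start = 0; // index of the first page in the current batch
    let mut used = 0; // bytes accumulated in the current batch
    for (i, &size) in page_sizes.iter().enumerate() {
        // Close the current batch if adding this page would overflow it,
        // but never emit an empty batch.
        if i > start && used + size > budget {
            batches.push(start..i);
            start = i;
            used = 0;
        }
        used += size;
    }
    if start < page_sizes.len() {
        batches.push(start..page_sizes.len());
    }
    batches
}
```

For example, with a budget of 250 bytes, pages of sizes 100, 900 and 100 end up in three separate batches, and the middle batch exceeds the budget on its own.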

Additional context

@alamb alamb added the enhancement Any new improvement worthy of a entry in the changelog label Jan 6, 2025
@tustvold
Contributor

tustvold commented Jan 7, 2025

I think this boils down to what the characteristics of the backing store are:

Memory

If the data is already buffered in memory, it doesn't matter what approach we use. Tbh people should be using the sync reader for this.

Object Storage

Object stores do not support true vectorized I/O; that is, you can't fetch disjoint ranges in a single request. This means that fetching 5 pages from each column, as in the intermediate buffering approach, will actually perform one request per column. If the column chunk is small, the reader may instead just fetch the entire column chunk and then throw the extra pages away, which is even worse. These requests are made in parallel, but this does have potentially significant cost implications.
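The effect described here can be illustrated with a simple range-coalescing sketch. This is not the object_store crate's implementation. `coalesce` and the `max_gap` threshold are hypothetical, and the input is assumed to be sorted by start offset.

```rust
use std::ops::Range;

/// Merges byte ranges whose gap is at most `max_gap` bytes, so each output
/// range corresponds to one request. Bytes inside a merged gap are fetched
/// and then discarded.
/// ASSUMPTION: `ranges` is sorted by start offset.
fn coalesce(ranges: &[Range<u64>], max_gap: u64) -> Vec<Range<u64>> {
    let mut out: Vec<Range<u64>> = Vec::new();
    for r in ranges {
        if let Some(last) = out.last_mut() {
            if r.start <= last.end.saturating_add(max_gap) {
                // Close enough to the previous range: extend it.
                last.end = last.end.max(r.end);
                continue;
            }
        }
        out.push(r.clone());
    }
    out
}
```

With 6MB column chunks and a 1MB gap threshold, the first 600KB of three different columns stay as three separate requests. Ranges from small, nearby column chunks merge into one request that also pulls in the unwanted bytes between them.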

I'd posit that the current approach is the right one for object stores.

Local SSD

Local SSD is the option where this gets potentially interesting, as something closer to the minimize buffering approach becomes viable. IMO this is where something like #5522 becomes more relevant, especially if wanting to use something like io_uring. The current abstraction just starts to fall apart at this point, and I'm not sure it is sensible to try to contort it to make it work.

Alternatives

A simpler option might just be to write files with smaller row groups, this effectively "materializes" the intermediate buffering approach into the file, but with the added benefit that it doesn't break IO coalescing. This is effectively the observation made by file formats like Lance when they got rid of the column chunk.

There is a slight but real difference between one row group with 50 pages and 5 row groups with 10 pages each, concerning dictionary pages. However, based on what I guess is motivating this request, just writing smaller row groups may be a better option.

@alamb
Contributor Author

alamb commented Jan 7, 2025

> Local SSD
>
> Local SSD is the option where this gets potentially interesting, as something closer to the minimize buffering approach becomes viable. IMO this is where something like #5522 becomes more relevant, especially if wanting to use something like io_uring. The current abstraction just starts to fall apart at this point, and I'm not sure it is sensible to try to contort it to make it work.

This is a very good observation -- the use case we have is exactly this (data is local on SSD, not on remote object storage).

I also think there might be value in a hybrid approach to reduce RAM requirements: fetching data from an object store to a local SSD and then reading it more incrementally.

So if we can use the same abstraction, that would be very nice.

> A simpler option might just be to write files with smaller row groups, this effectively "materializes" the intermediate buffering approach into the file, but with the added benefit that it doesn't break IO coalescing. This is effectively the observation made by file formats like Lance when they got rid of the column chunk.

Indeed, this is exactly the workaround we are trying internally and I or @hiltontj will report back here on how well it worked.

@tustvold
Contributor

tustvold commented Jan 7, 2025

Tbh if using local SSD I'd be tempted to just use the sync ParquetRecordBatchReader, the runtime needs to handle blocking anyway due to the CPU bound work, and the IO latencies are likely small enough to be irrelevant in comparison.

@alamb
Contributor Author

alamb commented Jan 7, 2025

> Tbh if using local SSD I'd be tempted to just use the sync ParquetRecordBatchReader, the runtime needs to handle blocking anyway due to the CPU bound work, and the IO latencies are likely small enough to be irrelevant in comparison.

In the context of DataFusion all IO is done in terms of the ObjectStore trait -- so I think the trick is in figuring out how to wire in / call the right version.

If we decide reducing buffering is an important enough use case for us at Influx, we'll spend some more time trying out various options and report back.

@XiangpengHao
Contributor

I believe we need something like an AsyncChunkReader (https://github.com/apache/arrow-rs/blob/main/parquet/src/file/reader.rs#L63) which allows us to read the pages on demand asynchronously, and thus avoids buffering the entire row group in memory before creating the record batch stream.
But that means async would have to spread through almost the entire crate.

@tustvold
Contributor

tustvold commented Jan 7, 2025

I strongly disagree that intertwining IO in that way is desirable, let alone practical. I think it is far better to expose lower-level push-based APIs for decoding, which allow users to orchestrate IO as they see fit. This is the approach we take for JSON and CSV. Parquet poses some additional challenges, but none that I think are insurmountable.
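As a toy illustration of what "push-based" means here, the sketch below decodes newline-delimited records from whatever bytes the caller hands it. `LineDecoder` and its methods are hypothetical and far simpler than any real Parquet decoder. The only point is that the decoder never performs IO itself, so the caller decides how much to fetch and buffer.

```rust
/// Toy push-based decoder: the caller feeds bytes from any source
/// (file, object store, socket) and collects complete records.
#[derive(Default)]
struct LineDecoder {
    /// Bytes of a record whose terminating newline has not arrived yet.
    partial: Vec<u8>,
    /// Fully decoded records waiting to be flushed.
    ready: Vec<String>,
}

impl LineDecoder {
    /// Consumes all of `input`, buffering any trailing partial record.
    /// Returns the number of complete records currently waiting.
    fn decode(&mut self, input: &[u8]) -> usize {
        for &b in input {
            if b == b'\n' {
                let line = String::from_utf8_lossy(&self.partial).into_owned();
                self.ready.push(line);
                self.partial.clear();
            } else {
                self.partial.push(b);
            }
        }
        self.ready.len()
    }

    /// Hands back the decoded records; partial input stays buffered.
    fn flush(&mut self) -> Vec<String> {
        std::mem::take(&mut self.ready)
    }
}
```

A caller would loop over its own reads, sync or async, call `decode` on each chunk, and call `flush` whenever enough records have accumulated. Peak memory is then bounded by the chunk size the caller chooses rather than by the decoder.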

3 participants