Skip to content

Commit

Permalink
Fix unnecessarily strict check in parquet chunked reader for choosing…
Browse files Browse the repository at this point in the history
… split locations. (rapidsai#16099)

This is a fix that somehow didn't make it into the initial wave of bug fixes for the parquet chunked reader earlier this year. 

The code that determines where to do splits needs to be sure it always chooses a location such that the pages that are selected always enclose at least one full row for a list column.  This means that you need to see at least 1 full row (2 row boundaries) in the group of pages.  The weaklogic was only checking if you had 1 full row within the very last page in the selection, which is unnecessarily strict.  We actually ran into some data out in the wild where this was hit.  

This PR changes the logic to include all pages within the chunk when doing the check instead of just the last one.

Authors:
  - https://github.com/nvdbaranec
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: rapidsai#16099
  • Loading branch information
nvdbaranec authored Jun 27, 2024
1 parent fa8284d commit 5d49fe6
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
13 changes: 8 additions & 5 deletions cpp/src/io/parquet/reader_impl_chunking.cu
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,8 @@ int64_t find_next_split(int64_t cur_pos,
size_t cur_row_index,
size_t cur_cumulative_size,
cudf::host_span<cumulative_page_info const> sizes,
size_t size_limit)
size_t size_limit,
size_t min_row_count)
{
auto const start = thrust::make_transform_iterator(
sizes.begin(),
Expand All @@ -357,7 +358,7 @@ int64_t find_next_split(int64_t cur_pos,
// this guarantees that even if we cannot fit the set of rows represented by our where our cur_pos
// is, we will still move forward instead of failing.
while (split_pos < (static_cast<int64_t>(sizes.size()) - 1) &&
(sizes[split_pos].end_row_index == cur_row_index)) {
(sizes[split_pos].end_row_index - cur_row_index < min_row_count)) {
split_pos++;
}

Expand Down Expand Up @@ -657,8 +658,10 @@ std::tuple<rmm::device_uvector<page_span>, size_t, size_t> compute_next_subpass(
auto const start_index = find_start_index(h_aggregated_info, start_row);
auto const cumulative_size =
start_row == 0 || start_index == 0 ? 0 : h_aggregated_info[start_index - 1].size_bytes;
// when choosing subpasses, we need to guarantee at least 2 rows in the included pages so that all
// list columns have a clear start and end.
auto const end_index =
find_next_split(start_index, start_row, cumulative_size, h_aggregated_info, size_limit);
find_next_split(start_index, start_row, cumulative_size, h_aggregated_info, size_limit, 2);
auto const end_row = h_aggregated_info[end_index].end_row_index;

// for each column, collect the set of pages that spans start_row / end_row
Expand Down Expand Up @@ -703,8 +706,8 @@ std::vector<row_range> compute_page_splits_by_row(device_span<cumulative_page_in
size_t cur_cumulative_size = 0;
auto const max_row = min(skip_rows + num_rows, h_aggregated_info.back().end_row_index);
while (cur_row_index < max_row) {
auto const split_pos =
find_next_split(cur_pos, cur_row_index, cur_cumulative_size, h_aggregated_info, size_limit);
auto const split_pos = find_next_split(
cur_pos, cur_row_index, cur_cumulative_size, h_aggregated_info, size_limit, 1);

auto const start_row = cur_row_index;
cur_row_index = min(max_row, h_aggregated_info[split_pos].end_row_index);
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1436,7 +1436,8 @@ void reader::impl::preprocess_subpass_pages(read_mode mode, size_t chunk_read_li
// subpass since we know that will safely completed.
bool const is_list = chunk.max_level[level_type::REPETITION] > 0;
if (is_list && max_col_row < last_pass_row) {
size_t const min_col_row = static_cast<size_t>(chunk.start_row + last_page.chunk_row);
auto const& first_page = subpass.pages[page_index];
size_t const min_col_row = static_cast<size_t>(chunk.start_row + first_page.chunk_row);
CUDF_EXPECTS((max_col_row - min_col_row) > 1, "Unexpected short subpass");
max_col_row--;
}
Expand Down

0 comments on commit 5d49fe6

Please sign in to comment.