
Fixed parquet path partitioning when only selecting partitioned columns #2000

Merged (14 commits) on Apr 4, 2022

Conversation

@pjmore (Contributor) commented Mar 12, 2022

Which issue does this PR close?

Partially closes #1999.

Rationale for this change

Fixes the behaviour of the parquet file format when a query selects only partitioning columns.

What changes are included in this PR?

Use row-group-level metadata to produce the correct number of rows for the partition columns.

Are there any user-facing changes?

No
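
For context on the approach: a parquet file's footer metadata records the exact row count, both at file level and per row group, so the number of partition-value rows to emit can be determined without reading any column data. A minimal standalone sketch of reading these counts with the parquet crate (path handling is hypothetical; this is not the PR's code):

use parquet::file::reader::{FileReader, SerializedFileReader};
use std::fs::File;

// Sketch: footer metadata exposes row counts per file and per row group;
// summing the per-row-group counts matches the file-level count.
fn row_count(path: &str) -> Result<i64, Box<dyn std::error::Error>> {
    let reader = SerializedFileReader::new(File::open(path)?)?;
    let meta = reader.metadata();
    let per_row_group: i64 = meta.row_groups().iter().map(|rg| rg.num_rows()).sum();
    assert_eq!(per_row_group, meta.file_metadata().num_rows());
    Ok(per_row_group)
}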

github-actions bot added the datafusion label (Changes in the datafusion crate) on Mar 12, 2022
@xudong963 added the bug label (Something isn't working) on Mar 17, 2022
@yjshen (Member) left a comment:


LGTM.

@alamb (Contributor) commented Mar 20, 2022

cc @rdettai

@alamb (Contributor) commented Mar 23, 2022

I think this PR is waiting on responses to @rdettai's comments.

Review thread on datafusion/src/physical_plan/file_format/parquet.rs, lines 460 to 499 (outdated):
for partitioned_file in partition {
    let object_reader =
        object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
    let file_reader = SerializedFileReader::new(ChunkObjectReader(object_reader))?;
    // The footer metadata gives the file's exact row count without
    // reading any column data.
    let mut file_rows: usize = file_reader
        .metadata()
        .file_metadata()
        .num_rows()
        .try_into()
        .expect("Row count should always be greater than or equal to 0");
    let remaining_rows = limit.unwrap_or(usize::MAX);
    if file_rows >= remaining_rows {
        file_rows = remaining_rows;
        limit = Some(0);
    } else if let Some(remaining_limit) = &mut limit {
        *remaining_limit -= file_rows;
    }

    // Emit full batches of partition values first...
    while file_rows > batch_size {
        send_result(
            &response_tx,
            partition_column_projector
                .project_empty(batch_size, &partitioned_file.partition_values),
        )?;
        file_rows -= batch_size;
    }
    // ...then the residual batch, sized `file_rows` rather than
    // `batch_size`, which would emit too many rows.
    if file_rows != 0 {
        send_result(
            &response_tx,
            partition_column_projector
                .project_empty(file_rows, &partitioned_file.partition_values),
        )?;
    }

    if limit == Some(0) {
        break;
    }
}
Ok(())
}
A reviewer (Contributor) left a comment:

I still feel this could be simplified and made more readable by using more iterators:

  • iterate over the files
  • map each file to its size (row count)
  • map each size to an iterator that repeats batch_size, file_rows/batch_size times, plus the residual
  • flat_map the whole thing
  • apply the limit with take(limit)
  • for_each(send)
@pjmore (Contributor, Author) replied:

I couldn't find a good way to implement what you suggested. The main issue I ran into was error handling when opening the files: I couldn't figure out a way to short-circuit both when the limit was met and when any error occurred. If you're okay with scanning all of the partition files even on an error, I'm okay with it too; I just figured that for remote object stores that might be a bad idea.

let mut res: Result<()> = Ok(());
let mut batch_size_partition_iter = partition
    .iter()
    .map(|partitioned_file| {
        let mut num_rows: usize = match object_store
            .file_reader(partitioned_file.file_meta.sized_file.clone())
        {
            Ok(object_reader) => {
                match SerializedFileReader::new(ChunkObjectReader(object_reader)) {
                    Ok(file_reader) => file_reader
                        .metadata()
                        .file_metadata()
                        .num_rows()
                        .try_into()
                        .expect("Row count should always be greater than or equal to 0 and less than usize::MAX"),
                    // Record the error and yield 0 rows so take_while stops the scan.
                    Err(e) => {
                        res = Err(e.into());
                        0
                    }
                }
            }
            Err(e) => {
                res = Err(e);
                0
            }
        };
        num_rows = limit.min(num_rows);
        limit -= num_rows;
        (num_rows, partitioned_file.partition_values.as_slice())
    })
    .take_while(|(num_rows, _)| *num_rows != 0)
    .flat_map(|(num_rows, partition_values)| {
        BatchSizeIter::new(num_rows, batch_size).zip(std::iter::repeat(partition_values))
    });
Iterator::try_for_each(&mut batch_size_partition_iter, |(batch_size, partition_values)| {
    send_result(
        &response_tx,
        partition_column_projector.project_empty(batch_size, partition_values),
    )
})?;
res?;
Ok(())
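
The BatchSizeIter helper referenced above is not shown in the thread. A minimal sketch of the behaviour the snippet assumes, namely yielding batch_size repeatedly and then the residual once (hypothetical, not the PR's actual helper):

/// Yields `batch_size` until fewer than `batch_size` rows remain,
/// then yields the residual count once.
struct BatchSizeIter {
    remaining: usize,
    batch_size: usize,
}

impl BatchSizeIter {
    fn new(num_rows: usize, batch_size: usize) -> Self {
        Self {
            remaining: num_rows,
            batch_size,
        }
    }
}

impl Iterator for BatchSizeIter {
    type Item = usize;

    fn next(&mut self) -> Option<usize> {
        if self.remaining == 0 {
            return None;
        }
        let n = self.remaining.min(self.batch_size);
        self.remaining -= n;
        Some(n)
    }
}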

The reviewer (Contributor) replied:

Right, error management in iterators can quickly become annoying! Then I think the version with the loop is fine for now.
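
For what it's worth, one idiomatic middle ground is to do the fallible work inside try_for_each, whose ? propagates the first error and ends the scan. A sketch with hypothetical stand-ins (open_row_count, emit) for the object-store reader and send_result calls above:

use std::io;

// Hypothetical stand-ins; only the control flow is the point here.
fn open_row_count(_path: &str) -> io::Result<usize> {
    Ok(10)
}
fn emit(_num_rows: usize) -> io::Result<()> {
    Ok(())
}

// `?` inside try_for_each short-circuits on the first error, and the
// early return avoids opening further files once the limit is exhausted.
fn scan(files: &[&str], batch_size: usize, mut limit: usize) -> io::Result<()> {
    files.iter().copied().try_for_each(|path| {
        if limit == 0 {
            return Ok(());
        }
        let mut rows = open_row_count(path)?.min(limit);
        limit -= rows;
        while rows > 0 {
            let n = rows.min(batch_size);
            emit(n)?;
            rows -= n;
        }
        Ok(())
    })
}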

Review thread on datafusion/src/physical_plan/file_format/mod.rs (outdated, resolved).
@alamb (Contributor) left a comment:

@rdettai / @tustvold would you like to review this PR again?

@rdettai (Contributor) left a comment:

I think the amount of repetition in read_partition_no_file_columns has now reached a very satisfying level. @alamb do you agree?

@alamb (Contributor) left a comment:

Looks good to me. Thank you @pjmore and @rdettai

Review thread on datafusion/tests/path_partition.rs (outdated, resolved).
@alamb (Contributor) commented Apr 3, 2022

Looks like it just needs some updating to resolve conflicts. @pjmore, I am happy to do so; let me know if you would like me to.

@pjmore (Contributor, Author) commented Apr 3, 2022

@alamb I had some extra test cases to add for the limit logic, so I fixed the conflicts while I was at it. Should be good to go now!

@alamb (Contributor) commented Apr 4, 2022

Thanks @pjmore -- epic work 👍

@tustvold (Contributor) commented:
I've created apache/arrow-rs#1537 to track pushing this functionality upstream, as I think it is generally useful. I will try to bash it out if I have some spare cycles.
