
GH-43660: [C++] Add a CastingGenerator to Parquet Reader that applies required casts before slicing #43661

Open
sahil1105 wants to merge 8 commits into main from sahil/pq-read-add-casting-gen

Conversation

Contributor

@sahil1105 sahil1105 commented Aug 12, 2024

Rationale for this change

(See #43660)

What changes are included in this PR?

  • In ParquetFileFormat::ScanBatchesAsync, set the batch_size for the reader to INT64_MAX. Since we already have a SlicingGenerator that is responsible for converting the reader's output into batch_size sized batches, we don't need the reader to batch its output.
  • Add a CastingGenerator that applies any required casts to the reader's output before it is passed to the SlicingGenerator (a rough illustrative sketch follows this list). The cast logic closely follows MakeExecBatch (Result<ExecBatch> MakeExecBatch(const Schema& full_schema, const Datum& partial, ...)). That function already contains a note stating that readers should be the ones performing the cast (// This *should* be handled by readers, and will just be an error in the future.), so I believe this change gets us closer to that goal.
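
A rough, hypothetical sketch of the idea (illustrative only; the names and structure are simplified and this is not the exact code added by the PR): wrap the reader's output and cast each column to the type the dataset_schema expects before the batch is handed to the SlicingGenerator.

// Simplified, hypothetical sketch; not the actual implementation in this PR.
#include "arrow/api.h"
#include "arrow/compute/cast.h"
#include "arrow/util/async_generator.h"

using RecordBatchGenerator =
    arrow::AsyncGenerator<std::shared_ptr<arrow::RecordBatch>>;

RecordBatchGenerator MakeCastingGenerator(RecordBatchGenerator source,
                                          std::shared_ptr<arrow::Schema> final_schema) {
  return [source = std::move(source), final_schema]() {
    return source().Then(
        [final_schema](const std::shared_ptr<arrow::RecordBatch>& batch)
            -> arrow::Result<std::shared_ptr<arrow::RecordBatch>> {
          if (batch == nullptr) return batch;  // end of stream
          // Assumes the batch's columns are already ordered to match final_schema.
          std::vector<std::shared_ptr<arrow::Array>> columns;
          for (int i = 0; i < batch->num_columns(); ++i) {
            const auto& target_type = final_schema->field(i)->type();
            if (batch->column(i)->type()->Equals(*target_type)) {
              // Already the expected type; no cast needed.
              columns.push_back(batch->column(i));
            } else {
              // e.g. string -> large_string when the file schema and the
              // dataset_schema disagree.
              ARROW_ASSIGN_OR_RAISE(
                  auto casted, arrow::compute::Cast(*batch->column(i), target_type));
              columns.push_back(casted);
            }
          }
          return arrow::RecordBatch::Make(final_schema, batch->num_rows(),
                                          std::move(columns));
        });
  };
}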

Are these changes tested?

I would like some feedback on the best way to test this. The changes pass our internal test cases; however, those may not cover all possible use cases.

Are there any user-facing changes?

No.


⚠️ GitHub issue #43660 has been automatically assigned in GitHub to PR creator.

@sahil1105 sahil1105 changed the title from GH-43660 [C++] Add a CastingGenerator to Parquet Reader that applies required casts before slicing to GH-43660: [C++] Add a CastingGenerator to Parquet Reader that applies required casts before slicing on Aug 12, 2024
@sahil1105 sahil1105 force-pushed the sahil/pq-read-add-casting-gen branch from 226d2d1 to f1c0d6e on August 12, 2024 22:59
@@ -555,6 +562,57 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
});
}

struct CastingGenerator {
Member

I'm a bit curious why a casting generator is required, since when reading Parquet into Arrow, the Parquet reader already applies a round of casting.

Contributor Author

You're probably right, and it likely does perform some casts during the read. However, it doesn't seem to do so for certain cases, such as String to LargeString.
Could you point me to where the Parquet Reader should be performing this cast? I'm happy to add it there.
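
For concreteness, a minimal sketch of the cast that currently has to happen outside the reader (this just uses the generic arrow::compute::Cast API; the helper name is made up for illustration):

// Hypothetical helper for illustration: the reader hands back utf8 (String)
// columns even when the dataset schema expects large_utf8 (LargeString), so an
// explicit cast is still required downstream.
#include "arrow/api.h"
#include "arrow/compute/cast.h"

arrow::Result<std::shared_ptr<arrow::Array>> CastToLargeString(
    const std::shared_ptr<arrow::Array>& column_from_reader) {
  // column_from_reader->type() is utf8() here, regardless of the desired type.
  return arrow::compute::Cast(*column_from_reader, arrow::large_utf8());
}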

Member

Contributor Author

Thank you!
Based on what I see, that is only responsible for casting the data to the logical type specified in the parquet metadata and not the Arrow type we want to convert to (the one in the dataset_schema). For strings, that seems to always map to a String type (based on FromByteArray which is called by GetArrowType which is called by GetTypeForNode which is called by NodeToSchemaField which is called in SchemaManifest::Make during the creation of the LeafReader). Am I missing something?

Member

@mapleFU mapleFU Aug 13, 2024

Based on what I see, that is only responsible for casting the data to the logical type specified in the parquet metadata and not the Arrow type we want to convert to (the one in the dataset_schema)

The Parquet logical type doesn't carry an Arrow schema, does it? The binary reader reads into an ::arrow::BinaryBuilder and then casts to the user-specified binary type.

For strings, that seems to always map to a String type (based on FromByteArray which is called by GetArrowType which is called by GetTypeForNode which is called by NodeToSchemaField which is called in SchemaManifest::Make during the creation of the LeafReader).

Yeah, you're right, the read "casts" to the file schema rather than to an expected schema. I think a native cast would be better here, but that doesn't solve your problem; perhaps I can try adding a naive SchemaManifest with a type hint here, but it would take some time.

::arrow::Result<std::shared_ptr<ArrowType>> GetTypeForNode(
    int column_index, const schema::PrimitiveNode& primitive_node,
    SchemaTreeContext* ctx)

Maybe we should rethink the GetTypeForNode handling for string/large_string/string_view, or use some hand-written type hint here. A casting generator also works for me when the reader cannot provide the right cast.

Member

Providing a hint would look like apache/arrow-rs#5939.

Maybe I can add a separate issue for that.

Contributor Author

The Parquet logical type doesn't carry an Arrow schema, does it?

As far as I understand, the parquet metadata may or may not have the arrow schema. I believe it depends on the writer. It looks like it tries to get that using GetOriginSchema in SchemaManifest::Make. However, the schema at write time might not be the same as the schema the reader expects.

The binary reader reads into an ::arrow::BinaryBuilder and then casts to the user-specified binary type.

Sorry, I didn't quite follow. Are you saying that we should use this to do the cast at read time somehow?

I think a native cast would be better here, but that doesn't solve your problem; perhaps I can try adding a naive SchemaManifest with a type hint here, but it would take some time.
Maybe we should rethink the GetTypeForNode handling for string/large_string/string_view, or use some hand-written type hint here.

That makes sense to me.

Maybe I can add a separate issue for that

That would be great, thanks!

Member

I just checked this. We can add a CastingGenerator first, because sometimes we have schema evolution here and need to cast to a "final type". Handling the cast in the Parquet reader itself can be regarded as an optimization.

Contributor Author

Sounds good, thanks!

@github-actions github-actions bot added the awaiting committer review label and removed the awaiting review label on Aug 13, 2024
@sahil1105 sahil1105 force-pushed the sahil/pq-read-add-casting-gen branch from bdac54a to e832a0d on August 13, 2024 13:50
@sahil1105
Contributor Author

I'm not sure why the "Java JNI / AMD64 manylinux2014 Java JNI (pull_request)" and "continuous-integration/appveyor/pr" checks are failing. The latter in particular passed on the previous commit, which is identical to this one (my last commit was an empty commit to re-trigger CI). Any insights would be appreciated.

I would also appreciate feedback on whether, and which, unit tests should be added for this change. The existing unit tests seem to provide good coverage of this code (they helped me find some of the bugs), but I'm happy to add more if that's deemed useful.

@mapleFU
Member

mapleFU commented Aug 15, 2024

The interface generally LGTM. I'm a little busy on working days and will take a careful look over the weekend. You can also mark it as ready for review.

@sahil1105
Contributor Author

The interface generally LGTM. I'm a little busy on working days and will take a careful look over the weekend. You can also mark it as ready for review.

Thanks, I appreciate it!

@sahil1105 sahil1105 marked this pull request as ready for review August 15, 2024 12:20
@mapleFU mapleFU self-requested a review August 15, 2024 12:36
@sahil1105
Contributor Author

@mapleFU If you get a chance to review this PR this weekend, that'd be great. Thanks!

@mapleFU
Member

mapleFU commented Aug 24, 2024

Sorry for the delay, I'll take a pass today.

I'm out today so I didn't finish the review; I'll continue after I wake up.

Member

@mapleFU mapleFU left a comment

The idea looks great to me, but I don't know whether setting the batch size to the maximum matters for memory usage.

cpp/src/arrow/dataset/file_parquet.cc (outdated, resolved)
if (this->cols_to_skip_.count(field->name())) {
// Maintain the original input type.
out_schema_fields.emplace_back(field->WithType(column->type()));
out_cols.emplace_back(std::move(column));
Member

So cols_to_skip_ just means "don't project (cast) this field", rather than "this field is not needed"?

Contributor Author

Yes, to skip the cast and leave them as they are.

@@ -617,6 +684,9 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
[this, options, parquet_fragment, pre_filtered,
row_groups](const std::shared_ptr<parquet::arrow::FileReader>& reader) mutable
-> Result<RecordBatchGenerator> {
// Since we already do the batching through the SlicingGenerator, we don't need the
// reader to batch its output.
reader->set_batch_size(std::numeric_limits<int64_t>::max());
// Ensure that parquet_fragment has FileMetaData
Member

Assuming a large file, would memory usage grow high in this case?

Contributor Author

I think we have that problem regardless of the reader's batch size.
We pass this reader to reader->GetRecordBatchGenerator (a few lines down). This eventually creates a RowGroupGenerator (

RowGroupGenerator(::arrow::internal::checked_pointer_cast<FileReaderImpl>(reader),

). If we look at the implementation of FetchNext for RowGroupGenerator, it essentially calls ReadOneRowGroup, which reads the entire row group into a table and then creates an output stream using TableBatchReader (

table_reader.set_chunksize(batch_size);

). So we're already reading an entire row group into a single table; batch_size just creates a reader on top of it that generates zero-copy slices. This is the same thing the SlicingGenerator does, so it is redundant. In my opinion, it's better to set the batch size to INT64_MAX so that the reader returns one table per row group, and then perform the batching through the SlicingGenerator.
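
To illustrate (simplified, assumed usage of arrow::TableBatchReader, not the generator code itself): the row group is already fully materialized as a Table, and the chunk size only controls how that table is sliced into zero-copy views.

// Sketch of how reader-level batching works once a row group is in memory.
#include "arrow/api.h"

arrow::Status SliceRowGroupTable(const std::shared_ptr<arrow::Table>& row_group_table,
                                 int64_t batch_size) {
  arrow::TableBatchReader table_reader(*row_group_table);
  table_reader.set_chunksize(batch_size);  // controls slice length only
  std::shared_ptr<arrow::RecordBatch> batch;
  while (true) {
    ARROW_RETURN_NOT_OK(table_reader.ReadNext(&batch));
    if (batch == nullptr) break;  // end of table
    // Each batch is a zero-copy view over row_group_table's buffers;
    // peak memory is set by the row group size, not by batch_size.
  }
  return arrow::Status::OK();
}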

Comment on lines +710 to +712
// We need to skip casting the dictionary columns since the dataset_schema doesn't
// have the dictionary-encoding information. Parquet reader will return them with the
// dictionary type, which is what we eventually want.
Member

I've forgotten something here: would Dict(String) and Cast(xxx -> LargeString) matter?

Contributor Author

I wasn't sure, so I left that case untouched. There are more casts done further up the chain (e.g. in MakeExecBatch potentially) that seem to handle those cases. I couldn't figure out how the dictionary case gets handled either, so I left it as is. I'm happy to implement it here if you could point me in the right direction.

@sahil1105 sahil1105 force-pushed the sahil/pq-read-add-casting-gen branch from 912db12 to f1a4955 on August 29, 2024 18:22
@mapleFU
Member

mapleFU commented Sep 2, 2024

Also cc @bkietz as the expert here
