GH-44345: [C++][Parquet] Fully support arrow decimal32/64 in Parquet #45351
Conversation
mapleFU left a comment:
Thanks! I think the legacy Decimal128/Decimal256 write data behavior should not be changed.
if (maybe_type.ok()) {
  arrow_type = maybe_type.ValueOrDie();
  ...
if (precision <= Decimal32Type::kMaxPrecision) {
Can we add a comment that the literal can be cast to the corresponding type if the real reader type is a wider decimal type?
cpp/src/parquet/column_writer.cc (outdated):
WRITE_SERIALIZE_CASE(DECIMAL128, Decimal128Type, Int32Type)
WRITE_SERIALIZE_CASE(DECIMAL256, Decimal256Type, Int32Type)
why not
WRITE_SERIALIZE_CASE(DECIMAL32, Decimal32Type, Int32Type)
WRITE_SERIALIZE_CASE(DECIMAL64, Decimal64Type, Int32Type)
WRITE_SERIALIZE_CASE(DECIMAL128, Decimal128Type, Int32Type)
WRITE_SERIALIZE_CASE(DECIMAL256, Decimal256Type, Int32Type)
Perhaps we don't need WRITE_SERIALIZE_CASE(DECIMAL64, Decimal64Type, Int32Type)?
Perhaps we don't need
I think we need it; decimal64 is just a decimal type and does not limit the precision?
I'm open to this but suspicious of the value of it.
cpp/src/parquet/column_writer.cc (outdated):
WRITE_ZERO_COPY_CASE(DURATION, DurationType, Int64Type)
WRITE_SERIALIZE_CASE(DECIMAL128, Decimal128Type, Int64Type)
WRITE_SERIALIZE_CASE(DECIMAL256, Decimal256Type, Int64Type)
WRITE_SERIALIZE_CASE(DECIMAL64, Decimal64Type, Int64Type)
ditto
scratch_i32 = reinterpret_cast<int32_t*>(scratch_buffer->mutable_data());
scratch_i64 = reinterpret_cast<int64_t*>(scratch_buffer->mutable_data());
why split this into two parts?
I'm not sure how to handle this code in a clean way. The main reason I had to do this is the int64_t* scratch pointer we're using. IIUC, the current code constructs the big-endian decimal representation in this scratch space, and the scratch pointer advances through that memory as it goes.
If you look at the current logic, it uses byte_width to determine how many words of the input are needed to construct each decimal. The int64_t* scratch pointer works for decimal64, decimal128, and decimal256. However, it doesn't work for decimal32, which uses 4-byte values, so I had to create another pointer of type int32_t*.
I may misunderstand how this code works, so feel free to correct me here.
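For illustration, a minimal self-contained sketch (not the actual column_writer.cc code; the buffer size and values are made up) of the cursor-width issue described above: the scratch buffer is walked with a typed cursor whose step matches the decimal's storage width, so 4-byte decimal32 values cannot share the int64_t* cursor used for decimal64/128/256.

#include <cstdint>
#include <vector>

int main() {
  std::vector<uint8_t> scratch(64);  // stand-in for the scratch buffer

  // decimal64/128/256 path: an int64_t* cursor advances in 8-byte words.
  auto* scratch_i64 = reinterpret_cast<int64_t*>(scratch.data());
  for (int64_t value : {int64_t{1}, int64_t{2}, int64_t{3}}) {
    *scratch_i64++ = value;
  }

  // decimal32 path: the buffer must be walked in 4-byte steps instead,
  // which is why this change adds a separate int32_t* cursor.
  auto* scratch_i32 = reinterpret_cast<int32_t*>(scratch.data());
  for (int32_t value : {1, 2, 3}) {
    *scratch_i32++ = value;
  }
  return 0;
}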
Perhaps we can remove scratch_i32 and scratch_i64 and delay the casting until we use the scratch buffer?
I tried to delay the casting in commit f279349 and got some test failures. After some debugging, I realized that we have to initialize the scratch pointer values here. If you look at the current Serialize and FixDecimalEndianness functions, we first allocate the scratch buffer and capture the scratch pointer. Then we move along that scratch pointer until we finish the output. Therefore, we need to do the initialization and casting in this method.
Can't we perform the reinterpret_cast right before calling ::arrow::bit_util::ToBigEndian to set it? At that time, you already know the byte_width.
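A rough sketch of that suggestion, assuming byte_width is already known at the write site (WriteBigEndian is a hypothetical helper; ::arrow::bit_util::ToBigEndian is the existing Arrow endian utility, and the int32_t/int64_t overloads are assumed):

#include <cstdint>
#include <cstring>
#include "arrow/util/endian.h"

// Keep only the raw scratch pointer and cast at the point of use, once the
// byte width is known, instead of maintaining scratch_i32/scratch_i64 upfront.
uint8_t* WriteBigEndian(uint8_t* scratch, int64_t value, int byte_width) {
  if (byte_width == static_cast<int>(sizeof(int32_t))) {
    const int32_t big_endian = ::arrow::bit_util::ToBigEndian(static_cast<int32_t>(value));
    std::memcpy(scratch, &big_endian, sizeof(big_endian));
  } else {
    const int64_t big_endian = ::arrow::bit_util::ToBigEndian(value);
    std::memcpy(scratch, &big_endian, sizeof(big_endian));
  }
  return scratch + byte_width;  // advance the raw cursor by the decimal's width
}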
def test_store_decimal_as_integer(tempdir):
    arr_decimal_1_9 = pa.array(list(map(Decimal, range(100))),
                               type=pa.decimal128(5, 2))
                               type=pa.decimal32(5, 2))
This should not be changed; instead, we should add new tests here.
It's a little tricky to keep this test the same if we don't cast the type to a wider decimal. On the writer side, we can keep the same behavior from arrow to Parquet with additional support for decimal32/64.
However, on the reader side from Parquet to arrow, the Parquet decimal format only contains precision and scale without any knowledge of the different arrow types (which is the correct behavior here). Therefore, in order to do the conversion, we look at the precision and convert to decimal32/64/128/256 correspondingly.
For this test, which does a round trip of writing to Parquet and reading back, the correct end result should be decimal32 when we read the data. I can modify this test case to cast the returned decimal to a wider decimal if that's what you meant.
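For reference, a hedged sketch of the precision-based mapping described above (SmallestDecimal is a hypothetical name, not necessarily the function in this PR; the kMaxPrecision limits of 9, 18, and 38 digits come from the existing arrow decimal types):

#include <cstdint>
#include <memory>
#include "arrow/type.h"

// Pick the narrowest arrow decimal type whose maximum precision covers the
// precision stored in the Parquet decimal logical type.
std::shared_ptr<arrow::DataType> SmallestDecimal(int32_t precision, int32_t scale) {
  if (precision <= arrow::Decimal32Type::kMaxPrecision) {          // up to 9 digits
    return arrow::decimal32(precision, scale);
  } else if (precision <= arrow::Decimal64Type::kMaxPrecision) {   // up to 18 digits
    return arrow::decimal64(precision, scale);
  } else if (precision <= arrow::Decimal128Type::kMaxPrecision) {  // up to 38 digits
    return arrow::decimal128(precision, scale);
  }
  return arrow::decimal256(precision, scale);
}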
So can we provide a type function, and test all possible types here?
case Type::DATE32:
case Type::TIME32:
case Type::INTERVAL_MONTHS:
case Type::DECIMAL32:
Are the changes to the compute kernels required to support Parquet? I can't see why, but I might be missing something. Otherwise, we should move adding support for decimal32 and decimal64 to those compute kernels to a different PR and leave this one with only the required parquet changes.
OK, I see now; the description says this is required for some tests:
Allow decimal32/64 in Arrow compute vector hash which is needed for some of the existing Parquet tests
I'm down to split this change into another PR that can cover this support with more tests on the arrow compute side. But yes, there are a few tests in Parquet that hit the arrow vector kernel code path.
Quick update:
@wgtmac @mapleFU I'm facing an implementation blocker on keeping the same writing/reading behavior for decimal128/256 between Arrow and Parquet because I can't find a clean way to do it. As I mentioned in this previous comment #45351 (comment) on my original implementation, the current Parquet reading logic looks at the decimal precision to determine how to convert the Parquet decimal logical type to an Arrow decimal type. Since we introduced decimal32/64 in arrow, I had to change this logic to include these types based on the precision. Therefore, whenever we want to cast a decimal32 to decimal128, we need to force the schema to convert to a bigger decimal. I found the arrow Field.MergeWith method that could do the job (which I used in one of the schema tests here). However, when I moved on to the reader/writer tests, I found that these schema fields can only be accessed from the manifest within FileReader, and it's read-only. Therefore, if I want to force schema conversion, I'll have to either:
I can't really come up with other options here. Both of these options require changes in some important classes, so I want to get some alignment before I proceed. Option 2 probably makes the most sense here. I also need to propagate this new property to pyarrow if we want the same behavior in Python. What are your thoughts on this problem? Any other alternatives that I should try?
This is really a problem. Currently arrow has an option; what about having newly written decimal32/decimal64 be read back as decimal32/decimal64, while legacy code goes the legacy way?
@mapleFU Thanks for the pointer on
I'm quite confused about this comment. Could you elaborate? Did you mean that if
If
This makes sense to me. I'll proceed with this implementation then. Thanks for the discussion.
@mapleFU @wgtmac I see that we already have a kinda similar flag called arrow_extensions_enabled from the JSON support. Should we just use this flag instead? It makes sense to me to reuse it instead of introducing a new flag just for the decimal type, though I don't mind introducing a new flag like
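For context, a sketch of how a reader-side property like the one being discussed might be used (the setter name and its placement on ArrowReaderProperties are assumptions based on the smallest_decimal_enabled flag this PR introduces; it is not part of released Arrow, so treat this as illustrative only):

#include "parquet/properties.h"

// Build reader properties that opt in to mapping Parquet decimals to the
// narrowest arrow decimal type (decimal32/64 when the precision allows),
// while the default keeps the legacy decimal128/256 behavior.
parquet::ArrowReaderProperties MakeReaderProperties() {
  parquet::ArrowReaderProperties properties;
  properties.set_smallest_decimal_enabled(true);  // proposed flag in this PR
  return properties;
}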
Status Init() {
Status Init(const ArrowReaderProperties& schema_arrow_reader_properties) {
  return SchemaManifest::Make(writer_->schema(), /*schema_metadata=*/nullptr,
                              default_arrow_reader_properties(), &schema_manifest_);
                              schema_arrow_reader_properties, &schema_manifest_);
}
This is a bug in the current code where the Parquet writer always uses the default arrow reader properties for its schema manifest. Instead, it should allow callers to pass in custom arrow reader properties if needed.
I think you have linked the wrong file. The Init() function from parquet/arrow/reader.cc does not have this issue, right?
@wgtmac sorry for the late response! I think you're right. I probably misread the logic somehow. Reverting this change now.
We cannot reuse
Result<std::shared_ptr<::arrow::DataType>> FromInt64(const LogicalType& logical_type);
Result<std::shared_ptr<::arrow::DataType>> FromByteArray(
    const LogicalType& logical_type, bool arrow_extensions_enabled = false,
    bool smallest_decimal_enabled = false);
Can we avoid these default parameters?
I can remove them for FromByteArray and FromFLBA and use ArrowReaderProperties instead. However, FromInt32 and FromInt64 are also used in FromInt32Statistics and FromInt64Statistics from StatisticsAsScalars. It doesn't look like I can pass the ArrowReaderProperties through there, so I have to use default parameters.
Result<std::shared_ptr<ArrowType>> FromByteArray(
    const LogicalType& logical_type, const ArrowReaderProperties& reader_properties) {
Result<std::shared_ptr<ArrowType>> FromByteArray(const LogicalType& logical_type,
I'm just wondering why we don't simply pass const ArrowReaderProperties& reader_properties to all these functions? If we will add a third parameter which also comes from the properties, then we should directly use it.
cpp/src/parquet/arrow/reader.h (outdated):
virtual void set_batch_size(int64_t batch_size) = 0;
...
/// Set whether to enable smallest decimal arrow type
virtual void set_smallest_decimal_enabled(bool smallest_decimal_enabled) = 0;
This looks weird. We cannot change it after the reader has been created. Isn't it accessible via the ArrowReaderProperties?
cpp/src/parquet/arrow/reader.h (outdated):
PARQUET_EXPORT
::arrow::Result<std::unique_ptr<FileReader>> OpenFile(
    std::shared_ptr<::arrow::io::RandomAccessFile>, ::arrow::MemoryPool* allocator);
    std::shared_ptr<::arrow::io::RandomAccessFile>, ::arrow::MemoryPool* pool,
Please also revert this change.
static constexpr ::arrow::Type::type type_id = ::arrow::Decimal32Type::type_id;
static constexpr int32_t precision = PRECISION;
static constexpr int32_t scale = PRECISION - 1;
static constexpr bool smallest_decimal_enabled = SMALLEST_DECIMAL_ENABLED;
Why do we need to add smallest_decimal_enabled to it?
This is mainly used by the tests in arrow_reader_writer_test.cc for TestParquetIO.TestTypes (lines 984 to 997), where I use this variable to control whether the test type requires the reader to use the smallest decimal or not.
This reverts commit f279349.
@wgtmac Wanna give another round of review? It looks like the CI error isn't related to this change.
static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
                                     const std::shared_ptr<Field>& field, Datum* out) {
  // Decimal128 and Decimal256 are only Arrow constructs. Parquet does not
  // Decimal32 and Decimal64 are only Arrow constructs. Parquet does not
This comment doesn't seem correct?
#include "arrow/result.h"
#include "arrow/type_fwd.h"
#include "parquet/properties.h"
Please use forward declaration instead of adding a new include.
Gentle ping @curioustien, do you have time to rebase to resolve the conflicts and address the comments?
@wgtmac thanks for the reminder. I've been quite busy at work lately. I'll try to pick it up this weekend when I have some free time. Otherwise, please feel free to take over if you have time.
I would like to take over the read/write path in C++ code.
@HuaHuaY Feel free to take it over 👍
I have opened a new PR #47427 and only modified the C++ part.
Let me close this first. Thanks @curioustien!
Rationale for this change
As described in #44345, the support for arrow decimal32/64 in Parquet is not there yet. This change fully supports arrow decimal32/64 in Parquet by doing the correct conversion between arrow decimal32/64 and Parquet decimal.
What changes are included in this PR?
A few changes in this PR:
Are these changes tested?
Yes
Are there any user-facing changes?
Yes, after this change, decimals read from Parquet will be converted to the corresponding arrow decimal type based on their precision.