diff --git a/CMakeLists.txt b/CMakeLists.txt index 5aa8b930..c524ceb5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -460,6 +460,11 @@ if ("${COMPILER_FAMILY}" STREQUAL "clang") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CMAKE_CLANG_OPTIONS}") endif() +if ("${COMPILER_FAMILY}" STREQUAL "msvc") + # MSVC version of -Wno-deprecated + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4996") +endif() + ############################################################ # "make lint" target ############################################################ diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake index a470fc18..fe1d4999 100644 --- a/cmake_modules/ThirdpartyToolchain.cmake +++ b/cmake_modules/ThirdpartyToolchain.cmake @@ -366,7 +366,7 @@ if (NOT ARROW_FOUND) -DARROW_BUILD_TESTS=OFF) if ("$ENV{PARQUET_ARROW_VERSION}" STREQUAL "") - set(ARROW_VERSION "0e21f84c2fc26dba949a03ee7d7ebfade0a65b81") # Arrow 0.7.1 + set(ARROW_VERSION "f2806fa518583907a129b2ecb0b7ec8758b69e17") else() set(ARROW_VERSION "$ENV{PARQUET_ARROW_VERSION}") endif() diff --git a/data/fixed_length_decimal.parquet b/data/fixed_length_decimal.parquet new file mode 100644 index 00000000..69fce531 Binary files /dev/null and b/data/fixed_length_decimal.parquet differ diff --git a/data/fixed_length_decimal_legacy.parquet b/data/fixed_length_decimal_legacy.parquet new file mode 100644 index 00000000..b0df62a2 Binary files /dev/null and b/data/fixed_length_decimal_legacy.parquet differ diff --git a/data/int32_decimal.parquet b/data/int32_decimal.parquet new file mode 100644 index 00000000..5bf2d4ea Binary files /dev/null and b/data/int32_decimal.parquet differ diff --git a/data/int64_decimal.parquet b/data/int64_decimal.parquet new file mode 100644 index 00000000..5043bcac Binary files /dev/null and b/data/int64_decimal.parquet differ diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index a18c5650..0e0831ec 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -24,6 +24,7 @@ #include "gtest/gtest.h" #include +#include #include "parquet/api/reader.h" #include "parquet/api/writer.h" @@ -37,13 +38,13 @@ #include "arrow/api.h" #include "arrow/test-util.h" +#include "arrow/util/decimal.h" using arrow::Array; using arrow::ArrayVisitor; using arrow::Buffer; using arrow::ChunkedArray; using arrow::Column; -using arrow::EncodeArrayToDictionary; using arrow::ListArray; using arrow::PoolBuffer; using arrow::PrimitiveArray; @@ -51,6 +52,9 @@ using arrow::Status; using arrow::Table; using arrow::TimeUnit; using arrow::default_memory_pool; +using arrow::compute::DictionaryEncode; +using arrow::compute::FunctionContext; +using arrow::compute::Datum; using arrow::io::BufferReader; using arrow::test::randint; @@ -68,10 +72,10 @@ using ColumnVector = std::vector>; namespace parquet { namespace arrow { -const int SMALL_SIZE = 100; -const int LARGE_SIZE = 10000; +static constexpr int SMALL_SIZE = 100; +static constexpr int LARGE_SIZE = 10000; -constexpr uint32_t kDefaultSeed = 0; +static constexpr uint32_t kDefaultSeed = 0; LogicalType::type get_logical_type(const ::arrow::DataType& type) { switch (type.id()) { @@ -118,6 +122,8 @@ LogicalType::type get_logical_type(const ::arrow::DataType& type) { static_cast(type); return get_logical_type(*dict_type.dictionary()->type()); } + case ArrowId::DECIMAL: + return LogicalType::DECIMAL; default: break; } @@ -147,6 +153,7 @@ ParquetType::type 
get_physical_type(const ::arrow::DataType& type) { case ArrowId::STRING: return ParquetType::BYTE_ARRAY; case ArrowId::FIXED_SIZE_BINARY: + case ArrowId::DECIMAL: return ParquetType::FIXED_LEN_BYTE_ARRAY; case ArrowId::DATE32: return ParquetType::INT32; @@ -299,6 +306,7 @@ struct test_traits<::arrow::FixedSizeBinaryType> { const std::string test_traits<::arrow::StringType>::value("Test"); // NOLINT const std::string test_traits<::arrow::BinaryType>::value("\x00\x01\x02\x03"); // NOLINT const std::string test_traits<::arrow::FixedSizeBinaryType>::value("Fixed"); // NOLINT + template using ParquetDataType = DataType::parquet_enum>; @@ -342,28 +350,44 @@ void DoSimpleRoundtrip(const std::shared_ptr& table, int num_threads, static std::shared_ptr MakeSimpleSchema(const ::arrow::DataType& type, Repetition::type repetition) { - int byte_width; - // Decimal is not implemented yet. + int32_t byte_width = -1; + int32_t precision = -1; + int32_t scale = -1; + switch (type.id()) { case ::arrow::Type::DICTIONARY: { - const ::arrow::DictionaryType& dict_type = - static_cast(type); + const auto& dict_type = static_cast(type); const ::arrow::DataType& values_type = *dict_type.dictionary()->type(); - if (values_type.id() == ::arrow::Type::FIXED_SIZE_BINARY) { - byte_width = - static_cast(values_type).byte_width(); - } else { - byte_width = -1; + switch (values_type.id()) { + case ::arrow::Type::FIXED_SIZE_BINARY: + byte_width = + static_cast(values_type).byte_width(); + break; + case ::arrow::Type::DECIMAL: { + const auto& decimal_type = + static_cast(values_type); + precision = decimal_type.precision(); + scale = decimal_type.scale(); + byte_width = DecimalSize(precision); + } break; + default: + break; } } break; case ::arrow::Type::FIXED_SIZE_BINARY: byte_width = static_cast(type).byte_width(); break; + case ::arrow::Type::DECIMAL: { + const auto& decimal_type = static_cast(type); + precision = decimal_type.precision(); + scale = decimal_type.scale(); + byte_width = DecimalSize(precision); + } break; default: - byte_width = -1; + break; } auto pnode = PrimitiveNode::Make("column1", repetition, get_physical_type(type), - get_logical_type(type), byte_width); + get_logical_type(type), byte_width, precision, scale); NodePtr node_ = GroupNode::Make("schema", Repetition::REQUIRED, std::vector({pnode})); return std::static_pointer_cast(node_); @@ -371,7 +395,7 @@ static std::shared_ptr MakeSimpleSchema(const ::arrow::DataType& type namespace internal { -void AssertArraysEqual(const Array &expected, const Array &actual) { +void AssertArraysEqual(const Array& expected, const Array& actual) { if (!actual.Equals(expected)) { std::stringstream pp_result; std::stringstream pp_expected; @@ -526,11 +550,19 @@ class TestParquetIO : public ::testing::Test { // There we write an UInt32 Array but receive an Int64 Array as result for // Parquet version 1.0. 
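// ---------------------------------------------------------------------------
// Illustrative aside (not part of the patch): the shape of the Parquet schema
// node that MakeSimpleSchema above produces for a decimal column. Decimals are
// carried as FIXED_LEN_BYTE_ARRAY with the DECIMAL logical type plus
// precision/scale, and the byte width is derived from DecimalSize(precision).
// The helper name MakeDecimalColumnSchema and the OPTIONAL repetition are
// illustrative choices, not code from this change.

#include <cstdint>
#include <memory>
#include <vector>

#include "parquet/schema.h"

static std::shared_ptr<parquet::schema::GroupNode> MakeDecimalColumnSchema(
    int32_t precision, int32_t scale, int32_t byte_width) {
  using parquet::LogicalType;
  using parquet::Repetition;
  using parquet::Type;
  using parquet::schema::GroupNode;
  using parquet::schema::NodePtr;
  using parquet::schema::PrimitiveNode;

  NodePtr column =
      PrimitiveNode::Make("column1", Repetition::OPTIONAL, Type::FIXED_LEN_BYTE_ARRAY,
                          LogicalType::DECIMAL, byte_width, precision, scale);
  NodePtr root = GroupNode::Make("schema", Repetition::REQUIRED,
                                 std::vector<NodePtr>({column}));
  return std::static_pointer_cast<GroupNode>(root);
}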
-typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, - ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, - ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::Date32Type, - ::arrow::FloatType, ::arrow::DoubleType, ::arrow::StringType, - ::arrow::BinaryType, ::arrow::FixedSizeBinaryType> +typedef ::testing::Types< + ::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type, + ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type, + ::arrow::Date32Type, ::arrow::FloatType, ::arrow::DoubleType, ::arrow::StringType, + ::arrow::BinaryType, ::arrow::FixedSizeBinaryType, DecimalWithPrecisionAndScale<1>, + DecimalWithPrecisionAndScale<3>, DecimalWithPrecisionAndScale<5>, + DecimalWithPrecisionAndScale<7>, DecimalWithPrecisionAndScale<10>, + DecimalWithPrecisionAndScale<12>, DecimalWithPrecisionAndScale<15>, + DecimalWithPrecisionAndScale<17>, DecimalWithPrecisionAndScale<19>, + DecimalWithPrecisionAndScale<22>, DecimalWithPrecisionAndScale<23>, + DecimalWithPrecisionAndScale<24>, DecimalWithPrecisionAndScale<27>, + DecimalWithPrecisionAndScale<29>, DecimalWithPrecisionAndScale<32>, + DecimalWithPrecisionAndScale<34>, DecimalWithPrecisionAndScale<38>> TestTypes; TYPED_TEST_CASE(TestParquetIO, TestTypes); @@ -590,8 +622,10 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalDictionaryWrite) { ASSERT_OK(NullableArray(SMALL_SIZE, 10, kDefaultSeed, &values)); - std::shared_ptr dict_values; - ASSERT_OK(EncodeArrayToDictionary(*values, default_memory_pool(), &dict_values)); + Datum out; + FunctionContext ctx(default_memory_pool()); + ASSERT_OK(DictionaryEncode(&ctx, Datum(values), &out)); + std::shared_ptr dict_values = MakeArray(out.array()); std::shared_ptr schema = MakeSimpleSchema(*dict_values->type(), Repetition::OPTIONAL); this->WriteColumn(schema, dict_values); @@ -856,25 +890,43 @@ TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) { ASSERT_OK_NO_THROW( WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, 512, properties)); - std::shared_ptr expected_values; std::shared_ptr int64_data = std::make_shared(::arrow::default_memory_pool()); { ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length())); - int64_t* int64_data_ptr = reinterpret_cast(int64_data->mutable_data()); - const uint32_t* uint32_data_ptr = - reinterpret_cast(values->values()->data()); - // std::copy might be faster but this is explicit on the casts) - for (int64_t i = 0; i < values->length(); i++) { - int64_data_ptr[i] = static_cast(uint32_data_ptr[i]); - } + auto int64_data_ptr = reinterpret_cast(int64_data->mutable_data()); + auto uint32_data_ptr = reinterpret_cast(values->values()->data()); + const auto cast_uint32_to_int64 = [](uint32_t value) { + return static_cast(value); + }; + std::transform(uint32_data_ptr, uint32_data_ptr + values->length(), int64_data_ptr, + cast_uint32_to_int64); } std::vector> buffers{values->null_bitmap(), int64_data}; auto arr_data = std::make_shared<::arrow::ArrayData>(::arrow::int64(), values->length(), buffers, values->null_count()); - ASSERT_OK(MakeArray(arr_data, &expected_values)); - this->ReadAndCheckSingleColumnTable(expected_values); + std::shared_ptr expected_values = MakeArray(arr_data); + ASSERT_NE(expected_values, NULLPTR); + + const auto& expected = static_cast(*expected_values); + ASSERT_GT(values->length(), 0); + ASSERT_EQ(values->length(), expected.length()); + + // TODO(phillipc): Is there a better way to compare these two arrays? 
+ // AssertArraysEqual requires the same type, but we only care about values in this case + for (int i = 0; i < expected.length(); ++i) { + const bool value_is_valid = values->IsValid(i); + const bool expected_value_is_valid = expected.IsValid(i); + + ASSERT_EQ(expected_value_is_valid, value_is_valid); + + if (value_is_valid) { + uint32_t value = values->Value(i); + int64_t expected_value = expected.Value(i); + ASSERT_EQ(expected_value, static_cast(value)); + } + } } using TestStringParquetIO = TestParquetIO<::arrow::StringType>; @@ -1432,7 +1484,7 @@ void MakeListTable(int num_rows, std::shared_ptr
* out) { offset_values.push_back(total_elements); std::vector value_draws; - randint(total_elements, 0, 100, &value_draws); + randint(total_elements, 0, 100, &value_draws); std::vector is_valid; random_is_valid(total_elements, 0.1, &is_valid); @@ -1889,6 +1941,61 @@ TEST(TestArrowReaderAdHoc, Int96BadMemoryAccess) { ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table)); } +class TestArrowReaderAdHocSpark + : public ::testing::TestWithParam< + std::tuple>> {}; + +TEST_P(TestArrowReaderAdHocSpark, ReadDecimals) { + std::string path(std::getenv("PARQUET_TEST_DATA")); + + std::string filename; + std::shared_ptr<::arrow::DataType> decimal_type; + std::tie(filename, decimal_type) = GetParam(); + + path += "/" + filename; + ASSERT_GT(path.size(), 0); + + auto pool = ::arrow::default_memory_pool(); + + std::unique_ptr arrow_reader; + ASSERT_NO_THROW( + arrow_reader.reset(new FileReader(pool, ParquetFileReader::OpenFile(path, false)))); + std::shared_ptr<::arrow::Table> table; + ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table)); + + ASSERT_EQ(1, table->num_columns()); + + constexpr int32_t expected_length = 24; + + auto value_column = table->column(0); + ASSERT_EQ(expected_length, value_column->length()); + + auto raw_array = value_column->data(); + ASSERT_EQ(1, raw_array->num_chunks()); + + auto chunk = raw_array->chunk(0); + + std::shared_ptr expected_array; + + ::arrow::Decimal128Builder builder(decimal_type, pool); + + for (int32_t i = 0; i < expected_length; ++i) { + ::arrow::Decimal128 value((i + 1) * 100); + ASSERT_OK(builder.Append(value)); + } + ASSERT_OK(builder.Finish(&expected_array)); + + internal::AssertArraysEqual(*expected_array, *chunk); +} + +INSTANTIATE_TEST_CASE_P( + ReadDecimals, TestArrowReaderAdHocSpark, + ::testing::Values( + std::make_tuple("int32_decimal.parquet", ::arrow::decimal(4, 2)), + std::make_tuple("int64_decimal.parquet", ::arrow::decimal(10, 2)), + std::make_tuple("fixed_length_decimal.parquet", ::arrow::decimal(25, 2)), + std::make_tuple("fixed_length_decimal_legacy.parquet", ::arrow::decimal(13, 2)))); + } // namespace arrow } // namespace parquet diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc index a7a62c57..7ed9ad83 100644 --- a/src/parquet/arrow/arrow-schema-test.cc +++ b/src/parquet/arrow/arrow-schema-test.cc @@ -51,7 +51,7 @@ const auto TIMESTAMP_MS = ::arrow::timestamp(TimeUnit::MILLI); const auto TIMESTAMP_US = ::arrow::timestamp(TimeUnit::MICRO); const auto TIMESTAMP_NS = ::arrow::timestamp(TimeUnit::NANO); const auto BINARY = ::arrow::binary(); -const auto DECIMAL_8_4 = std::make_shared<::arrow::DecimalType>(8, 4); +const auto DECIMAL_8_4 = std::make_shared<::arrow::Decimal128Type>(8, 4); class TestConvertParquetSchema : public ::testing::Test { public: @@ -62,8 +62,8 @@ class TestConvertParquetSchema : public ::testing::Test { for (int i = 0; i < expected_schema->num_fields(); ++i) { auto lhs = result_schema_->field(i); auto rhs = expected_schema->field(i); - EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString() - << " != " << rhs->ToString(); + EXPECT_TRUE(lhs->Equals(rhs)) + << i << " " << lhs->ToString() << " != " << rhs->ToString(); } } diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index 5edc8379..3ca49cb4 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -29,6 +29,7 @@ #include "arrow/api.h" #include "arrow/util/bit-util.h" +#include "arrow/util/decimal.h" #include "arrow/util/logging.h" #include "arrow/util/parallel.h" @@ -716,7 +717,8 @@ 
struct supports_fast_path_impl { template using supports_fast_path = - typename std::enable_if::value>::type; + typename std::enable_if::value, + ParquetType>::type; template struct TransferFunctor { @@ -868,12 +870,248 @@ struct TransferFunctor< // Convert from BINARY type to STRING auto new_data = (*out)->data()->ShallowCopy(); new_data->type = type; - RETURN_NOT_OK(::arrow::MakeArray(new_data, out)); + *out = ::arrow::MakeArray(new_data); } return Status::OK(); } }; +static uint64_t BytesToInteger(const uint8_t* bytes, int32_t start, int32_t stop) { + using ::arrow::BitUtil::FromBigEndian; + + const int32_t length = stop - start; + + DCHECK_GE(length, 0); + DCHECK_LE(length, 8); + + switch (length) { + case 0: + return 0; + case 1: + return bytes[start]; + case 2: + return FromBigEndian(*reinterpret_cast(bytes + start)); + case 3: { + const uint64_t first_two_bytes = + FromBigEndian(*reinterpret_cast(bytes + start)); + const uint64_t last_byte = bytes[stop - 1]; + return first_two_bytes << 8 | last_byte; + } + case 4: + return FromBigEndian(*reinterpret_cast(bytes + start)); + case 5: { + const uint64_t first_four_bytes = + FromBigEndian(*reinterpret_cast(bytes + start)); + const uint64_t last_byte = bytes[stop - 1]; + return first_four_bytes << 8 | last_byte; + } + case 6: { + const uint64_t first_four_bytes = + FromBigEndian(*reinterpret_cast(bytes + start)); + const uint64_t last_two_bytes = + FromBigEndian(*reinterpret_cast(bytes + start + 4)); + return first_four_bytes << 16 | last_two_bytes; + } + case 7: { + const uint64_t first_four_bytes = + FromBigEndian(*reinterpret_cast(bytes + start)); + const uint64_t second_two_bytes = + FromBigEndian(*reinterpret_cast(bytes + start + 4)); + const uint64_t last_byte = bytes[stop - 1]; + return first_four_bytes << 24 | second_two_bytes << 8 | last_byte; + } + case 8: + return FromBigEndian(*reinterpret_cast(bytes + start)); + default: { + DCHECK(false); + return UINT64_MAX; + } + } +} + +static constexpr int32_t kMinDecimalBytes = 1; +static constexpr int32_t kMaxDecimalBytes = 16; + +/// \brief Convert a sequence of big-endian bytes to one int64_t (high bits) and one +/// uint64_t (low bits). +static void BytesToIntegerPair(const uint8_t* bytes, + const int32_t total_number_of_bytes_used, int64_t* high, + uint64_t* low) { + DCHECK_GE(total_number_of_bytes_used, kMinDecimalBytes); + DCHECK_LE(total_number_of_bytes_used, kMaxDecimalBytes); + + /// Bytes are coming in big-endian, so the first byte is the MSB and therefore holds the + /// sign bit. 
+ const bool is_negative = static_cast(bytes[0]) < 0; + + /// Sign extend the low bits if necessary + *low = UINT64_MAX * (is_negative && total_number_of_bytes_used < 8); + *high = -1 * (is_negative && total_number_of_bytes_used < kMaxDecimalBytes); + + /// Stop byte of the high bytes + const int32_t high_bits_offset = std::max(0, total_number_of_bytes_used - 8); + + /// Shift left enough bits to make room for the incoming int64_t + *high <<= high_bits_offset * CHAR_BIT; + + /// Preserve the upper bits by inplace OR-ing the int64_t + *high |= BytesToInteger(bytes, 0, high_bits_offset); + + /// Stop byte of the low bytes + const int32_t low_bits_offset = std::min(total_number_of_bytes_used, 8); + + /// Shift left enough bits to make room for the incoming uint64_t + *low <<= low_bits_offset * CHAR_BIT; + + /// Preserve the upper bits by inplace OR-ing the uint64_t + *low |= BytesToInteger(bytes, high_bits_offset, total_number_of_bytes_used); +} + +static inline void RawBytesToDecimalBytes(const uint8_t* value, int32_t byte_width, + uint8_t* out_buf) { + // view the first 8 bytes as an unsigned 64-bit integer + auto low = reinterpret_cast(out_buf); + + // view the second 8 bytes as a signed 64-bit integer + auto high = reinterpret_cast(out_buf + sizeof(uint64_t)); + + // Convert the fixed size binary array bytes into a Decimal128 compatible layout + BytesToIntegerPair(value, byte_width, high, low); +} + +/// \brief Convert an array of FixedLenByteArrays to an arrow::Decimal128Array +/// We do this by: +/// 1. Creating a arrow::FixedSizeBinaryArray from the RecordReader's builder +/// 2. Allocating a buffer for the arrow::Decimal128Array +/// 3. Converting the big-endian bytes in the FixedSizeBinaryArray to two integers +/// representing the high and low bits of each decimal value. +template <> +struct TransferFunctor<::arrow::Decimal128Type, FLBAType> { + Status operator()(RecordReader* reader, MemoryPool* pool, + const std::shared_ptr<::arrow::DataType>& type, + std::shared_ptr* out) { + DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL); + + // Finish the built data into a temporary array + std::shared_ptr array; + RETURN_NOT_OK(reader->builder()->Finish(&array)); + const auto& fixed_size_binary_array = + static_cast(*array); + + // Get the byte width of the values in the FixedSizeBinaryArray. 
Most of the time + // this will be different from the decimal array width because we write the minimum + // number of bytes necessary to represent a given precision + const int32_t byte_width = + static_cast(*fixed_size_binary_array.type()) + .byte_width(); + + // The byte width of each decimal value + const int32_t type_length = + static_cast(*type).byte_width(); + + // number of elements in the entire array + const int64_t length = fixed_size_binary_array.length(); + + // allocate memory for the decimal array + std::shared_ptr data; + RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data)); + + // raw bytes that we can write to + uint8_t* out_ptr = data->mutable_data(); + + // convert each FixedSizeBinary value to valid decimal bytes + const int64_t null_count = fixed_size_binary_array.null_count(); + if (null_count > 0) { + for (int64_t i = 0; i < length; ++i, out_ptr += type_length) { + if (!fixed_size_binary_array.IsNull(i)) { + RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, + out_ptr); + } + } + } else { + for (int64_t i = 0; i < length; ++i, out_ptr += type_length) { + RawBytesToDecimalBytes(fixed_size_binary_array.GetValue(i), byte_width, out_ptr); + } + } + + *out = std::make_shared<::arrow::Decimal128Array>( + type, length, data, fixed_size_binary_array.null_bitmap(), null_count); + return Status::OK(); + } +}; + +/// \brief Convert an Int32 or Int64 array into a Decimal128Array +/// The parquet spec allows systems to write decimals in int32, int64 if the values are +/// small enough to fit in less 4 bytes or less than 8 bytes, respectively. +/// This function implements the conversion from int32 and int64 arrays to decimal arrays. +template ::value || + std::is_same::value>::type> +static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool, + const std::shared_ptr<::arrow::DataType>& type, + std::shared_ptr* out) { + DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL); + + const int64_t length = reader->values_written(); + + using ElementType = typename ParquetIntegerType::c_type; + static_assert(std::is_same::value || + std::is_same::value, + "ElementType must be int32_t or int64_t"); + + const auto values = reinterpret_cast(reader->values()); + + const auto& decimal_type = static_cast(*type); + const int64_t type_length = decimal_type.byte_width(); + + std::shared_ptr data; + RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data)); + uint8_t* out_ptr = data->mutable_data(); + + using ::arrow::BitUtil::FromLittleEndian; + + for (int64_t i = 0; i < length; ++i, out_ptr += type_length) { + // sign/zero extend int32_t values, otherwise a no-op + const auto value = static_cast(values[i]); + + auto out_ptr_view = reinterpret_cast(out_ptr); + + // No-op on little endian machines, byteswap on big endian + out_ptr_view[0] = FromLittleEndian(static_cast(value)); + + // no need to byteswap here because we're sign/zero extending exactly 8 bytes + out_ptr_view[1] = static_cast(value < 0 ? 
-1 : 0); + } + + if (reader->nullable_values()) { + std::shared_ptr is_valid = reader->ReleaseIsValid(); + *out = std::make_shared<::arrow::Decimal128Array>(type, length, data, is_valid, + reader->null_count()); + } else { + *out = std::make_shared<::arrow::Decimal128Array>(type, length, data); + } + return Status::OK(); +} + +template <> +struct TransferFunctor<::arrow::Decimal128Type, Int32Type> { + Status operator()(RecordReader* reader, MemoryPool* pool, + const std::shared_ptr<::arrow::DataType>& type, + std::shared_ptr* out) { + return DecimalIntegerTransfer(reader, pool, type, out); + } +}; + +template <> +struct TransferFunctor<::arrow::Decimal128Type, Int64Type> { + Status operator()(RecordReader* reader, MemoryPool* pool, + const std::shared_ptr<::arrow::DataType>& type, + std::shared_ptr* out) { + return DecimalIntegerTransfer(reader, pool, type, out); + } +}; + #define TRANSFER_DATA(ArrowType, ParquetType) \ TransferFunctor func; \ RETURN_NOT_OK(func(record_reader_.get(), pool_, field_->type(), out)); \ @@ -932,6 +1170,22 @@ Status PrimitiveImpl::NextBatch(int64_t records_to_read, std::shared_ptr* TRANSFER_CASE(DATE32, ::arrow::Date32Type, Int32Type) TRANSFER_CASE(DATE64, ::arrow::Date64Type, Int32Type) TRANSFER_CASE(FIXED_SIZE_BINARY, ::arrow::FixedSizeBinaryType, FLBAType) + case ::arrow::Type::DECIMAL: { + switch (descr_->physical_type()) { + case ::parquet::Type::INT32: { + TRANSFER_DATA(::arrow::Decimal128Type, Int32Type); + } break; + case ::parquet::Type::INT64: { + TRANSFER_DATA(::arrow::Decimal128Type, Int64Type); + } break; + case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: { + TRANSFER_DATA(::arrow::Decimal128Type, FLBAType); + } break; + default: + return Status::Invalid( + "Physical type for decimal must be int32, int64, or fixed length binary"); + } + } break; case ::arrow::Type::TIMESTAMP: { ::arrow::TimestampType* timestamp_type = static_cast<::arrow::TimestampType*>(field_->type().get()); @@ -946,8 +1200,7 @@ Status PrimitiveImpl::NextBatch(int64_t records_to_read, std::shared_ptr* default: return Status::NotImplemented("TimeUnit not supported"); } - break; - } + } break; TRANSFER_CASE(TIME32, ::arrow::Time32Type, Int32Type) TRANSFER_CASE(TIME64, ::arrow::Time64Type, Int64Type) default: diff --git a/src/parquet/arrow/record_reader.cc b/src/parquet/arrow/record_reader.cc index 7275d2f8..6405ee77 100644 --- a/src/parquet/arrow/record_reader.cc +++ b/src/parquet/arrow/record_reader.cc @@ -83,7 +83,7 @@ class RecordReader::RecordReaderImpl { Reset(); } - virtual ~RecordReaderImpl() {} + virtual ~RecordReaderImpl() = default; virtual int64_t ReadRecords(int64_t num_records) = 0; diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc index e16a1afb..41da9fb3 100644 --- a/src/parquet/arrow/schema.cc +++ b/src/parquet/arrow/schema.cc @@ -25,6 +25,7 @@ #include "parquet/util/schema-util.h" #include "arrow/api.h" +#include "arrow/util/logging.h" using arrow::Field; using arrow::Status; @@ -49,10 +50,9 @@ const auto TIMESTAMP_MS = ::arrow::timestamp(::arrow::TimeUnit::MILLI); const auto TIMESTAMP_US = ::arrow::timestamp(::arrow::TimeUnit::MICRO); const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO); -TypePtr MakeDecimalType(const PrimitiveNode& node) { - int precision = node.decimal_metadata().precision; - int scale = node.decimal_metadata().scale; - return std::make_shared<::arrow::DecimalType>(precision, scale); +TypePtr MakeDecimal128Type(const PrimitiveNode& node) { + const auto& metadata = node.decimal_metadata(); + return 
::arrow::decimal(metadata.precision, metadata.scale); } static Status FromByteArray(const PrimitiveNode& node, TypePtr* out) { @@ -61,7 +61,7 @@ static Status FromByteArray(const PrimitiveNode& node, TypePtr* out) { *out = ::arrow::utf8(); break; case LogicalType::DECIMAL: - *out = MakeDecimalType(node); + *out = MakeDecimal128Type(node); break; default: // BINARY @@ -77,7 +77,7 @@ static Status FromFLBA(const PrimitiveNode& node, TypePtr* out) { *out = ::arrow::fixed_size_binary(node.type_length()); break; case LogicalType::DECIMAL: - *out = MakeDecimalType(node); + *out = MakeDecimal128Type(node); break; default: std::stringstream ss; @@ -120,7 +120,7 @@ static Status FromInt32(const PrimitiveNode& node, TypePtr* out) { *out = ::arrow::time32(::arrow::TimeUnit::MILLI); break; case LogicalType::DECIMAL: - *out = MakeDecimalType(node); + *out = MakeDecimal128Type(node); break; default: std::stringstream ss; @@ -144,7 +144,7 @@ static Status FromInt64(const PrimitiveNode& node, TypePtr* out) { *out = ::arrow::uint64(); break; case LogicalType::DECIMAL: - *out = MakeDecimalType(node); + *out = MakeDecimal128Type(node); break; case LogicalType::TIMESTAMP_MILLIS: *out = TIMESTAMP_MS; @@ -473,7 +473,10 @@ Status FieldToNode(const std::shared_ptr& field, ParquetType::type type; Repetition::type repetition = field->nullable() ? Repetition::OPTIONAL : Repetition::REQUIRED; + int length = -1; + int precision = -1; + int scale = -1; switch (field->type()->id()) { case ArrowType::NA: @@ -532,9 +535,18 @@ Status FieldToNode(const std::shared_ptr& field, break; case ArrowType::FIXED_SIZE_BINARY: { type = ParquetType::FIXED_LEN_BYTE_ARRAY; - auto fixed_size_binary_type = - static_cast<::arrow::FixedSizeBinaryType*>(field->type().get()); - length = fixed_size_binary_type->byte_width(); + const auto& fixed_size_binary_type = + static_cast(*field->type()); + length = fixed_size_binary_type.byte_width(); + } break; + case ArrowType::DECIMAL: { + type = ParquetType::FIXED_LEN_BYTE_ARRAY; + logical_type = LogicalType::DECIMAL; + const auto& decimal_type = + static_cast(*field->type()); + precision = decimal_type.precision(); + scale = decimal_type.scale(); + length = DecimalSize(precision); } break; case ArrowType::DATE32: type = ParquetType::INT32; @@ -565,12 +577,12 @@ Status FieldToNode(const std::shared_ptr& field, auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type()); return StructToNode(struct_type, field->name(), field->nullable(), properties, arrow_properties, out); - } break; + } case ArrowType::LIST: { auto list_type = std::static_pointer_cast<::arrow::ListType>(field->type()); return ListToNode(list_type, field->name(), field->nullable(), properties, arrow_properties, out); - } break; + } case ArrowType::DICTIONARY: { // Parquet has no Dictionary type, dictionary-encoded is handled on // the encoding, not the schema level. 
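// ---------------------------------------------------------------------------
// Illustrative aside (not part of the patch): with the DECIMAL branch added to
// FieldToNode, an Arrow decimal field converts to a FIXED_LEN_BYTE_ARRAY /
// DECIMAL Parquet column whose type_length is DecimalSize(precision). This
// sketch assumes ToParquetSchema's output parameter is a
// parquet::SchemaDescriptor; the field name "price" and decimal(25, 2) are
// example values only.

#include <memory>

#include "arrow/api.h"
#include "parquet/arrow/schema.h"
#include "parquet/properties.h"
#include "parquet/schema.h"

static ::arrow::Status ConvertDecimalSchema(
    std::shared_ptr<parquet::SchemaDescriptor>* out) {
  auto arrow_schema =
      ::arrow::schema({::arrow::field("price", ::arrow::decimal(25, 2))});
  return parquet::arrow::ToParquetSchema(
      arrow_schema.get(), *parquet::default_writer_properties(), out);
}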
@@ -582,14 +594,15 @@ Status FieldToNode(const std::shared_ptr& field, return FieldToNode(unpacked_field, properties, arrow_properties, out); } default: { - // TODO: DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR + // TODO: DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL_TEXT, VARCHAR std::stringstream ss; ss << "Unhandled type for Arrow to Parquet schema conversion: "; ss << field->type()->ToString(); return Status::NotImplemented(ss.str()); } } - *out = PrimitiveNode::Make(field->name(), repetition, type, logical_type, length); + *out = PrimitiveNode::Make(field->name(), repetition, type, logical_type, length, + precision, scale); return Status::OK(); } @@ -617,5 +630,77 @@ Status ToParquetSchema(const ::arrow::Schema* arrow_schema, out); } +/// \brief Compute the number of bytes required to represent a decimal of a +/// given precision. Taken from the Apache Impala codebase. The comments next +/// to the return values are the maximum value that can be represented in 2's +/// complement with the returned number of bytes. +int32_t DecimalSize(int32_t precision) { + DCHECK_GE(precision, 1) << "decimal precision must be greater than or equal to 1, got " + << precision; + DCHECK_LE(precision, 38) << "decimal precision must be less than or equal to 38, got " + << precision; + + switch (precision) { + case 1: + case 2: + return 1; // 127 + case 3: + case 4: + return 2; // 32,767 + case 5: + case 6: + return 3; // 8,388,607 + case 7: + case 8: + case 9: + return 4; // 2,147,483,427 + case 10: + case 11: + return 5; // 549,755,813,887 + case 12: + case 13: + case 14: + return 6; // 140,737,488,355,327 + case 15: + case 16: + return 7; // 36,028,797,018,963,967 + case 17: + case 18: + return 8; // 9,223,372,036,854,775,807 + case 19: + case 20: + case 21: + return 9; // 2,361,183,241,434,822,606,847 + case 22: + case 23: + return 10; // 604,462,909,807,314,587,353,087 + case 24: + case 25: + case 26: + return 11; // 154,742,504,910,672,534,362,390,527 + case 27: + case 28: + return 12; // 39,614,081,257,132,168,796,771,975,167 + case 29: + case 30: + case 31: + return 13; // 10,141,204,801,825,835,211,973,625,643,007 + case 32: + case 33: + return 14; // 2,596,148,429,267,413,814,265,248,164,610,047 + case 34: + case 35: + return 15; // 664,613,997,892,457,936,451,903,530,140,172,287 + case 36: + case 37: + case 38: + return 16; // 170,141,183,460,469,231,731,687,303,715,884,105,727 + default: + DCHECK(false); + break; + } + return -1; +} + } // namespace arrow } // namespace parquet diff --git a/src/parquet/arrow/schema.h b/src/parquet/arrow/schema.h index de153eb7..3b212da7 100644 --- a/src/parquet/arrow/schema.h +++ b/src/parquet/arrow/schema.h @@ -85,6 +85,8 @@ ::arrow::Status PARQUET_EXPORT ToParquetSchema(const ::arrow::Schema* arrow_sche const WriterProperties& properties, std::shared_ptr* out); +int32_t DecimalSize(int32_t precision); + } // namespace arrow } // namespace parquet diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h index 954a84f3..8611a303 100644 --- a/src/parquet/arrow/test-util.h +++ b/src/parquet/arrow/test-util.h @@ -15,12 +15,14 @@ // specific language governing permissions and limitations // under the License. 
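// ---------------------------------------------------------------------------
// Illustrative aside (not part of the patch): the DecimalSize lookup table
// above agrees with the closed form "smallest n such that an n-byte two's
// complement integer can hold 10^precision - 1". MinimumBytes is a
// hypothetical helper shown only to make that relationship explicit.

#include <cmath>
#include <cstdint>

static int32_t MinimumBytes(int32_t precision) {
  // 8 * n - 1 value bits (one bit reserved for the sign) must cover
  // log2(10^precision) bits, so n = ceil((precision * log2(10) + 1) / 8).
  return static_cast<int32_t>(std::ceil((precision * std::log2(10.0) + 1.0) / 8.0));
}

// For example: MinimumBytes(1) == 1, MinimumBytes(9) == 4, MinimumBytes(18) == 8,
// MinimumBytes(19) == 9, and MinimumBytes(38) == 16, matching DecimalSize.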
+#include #include #include #include "arrow/api.h" #include "arrow/test-util.h" #include "arrow/type_traits.h" +#include "arrow/util/decimal.h" namespace parquet { namespace arrow { @@ -28,6 +30,16 @@ namespace arrow { using ::arrow::Array; using ::arrow::Status; +template +struct DecimalWithPrecisionAndScale { + static_assert(PRECISION >= 1 && PRECISION <= 38, "Invalid precision value"); + + using type = ::arrow::Decimal128Type; + static constexpr ::arrow::Type::type type_id = ::arrow::Decimal128Type::type_id; + static constexpr int32_t precision = PRECISION; + static constexpr int32_t scale = PRECISION - 1; +}; + template using is_arrow_float = std::is_floating_point; @@ -52,8 +64,10 @@ using is_arrow_bool = std::is_same; template typename std::enable_if::value, Status>::type NonNullArray( size_t size, std::shared_ptr* out) { - std::vector values; - ::arrow::test::random_real(size, 0, 0, 1, &values); + using c_type = typename ArrowType::c_type; + std::vector values; + ::arrow::test::random_real(size, 0, static_cast(0), static_cast(1), + &values); ::arrow::NumericBuilder builder; RETURN_NOT_OK(builder.Append(values.data(), values.size())); return builder.Finish(out); @@ -64,7 +78,7 @@ typename std::enable_if< is_arrow_int::value && !is_arrow_date::value, Status>::type NonNullArray(size_t size, std::shared_ptr* out) { std::vector values; - ::arrow::test::randint(size, 0, 64, &values); + ::arrow::test::randint(size, 0, 64, &values); // Passing data type so this will work with TimestampType too ::arrow::NumericBuilder builder(std::make_shared(), @@ -77,7 +91,7 @@ template typename std::enable_if::value, Status>::type NonNullArray( size_t size, std::shared_ptr* out) { std::vector values; - ::arrow::test::randint(size, 0, 64, &values); + ::arrow::test::randint(size, 0, 64, &values); for (size_t i = 0; i < size; i++) { values[i] *= 86400000; } @@ -114,11 +128,54 @@ NonNullArray(size_t size, std::shared_ptr* out) { return builder.Finish(out); } +static inline void random_decimals(int64_t n, uint32_t seed, int32_t precision, + uint8_t* out) { + std::mt19937 gen(seed); + std::uniform_int_distribution d(0, std::numeric_limits::max()); + const int32_t required_bytes = DecimalSize(precision); + constexpr int32_t byte_width = 16; + std::fill(out, out + byte_width * n, '\0'); + + for (int64_t i = 0; i < n; ++i, out += byte_width) { + std::generate(out, out + required_bytes, + [&d, &gen] { return static_cast(d(gen)); }); + + // sign extend if the sign bit is set for the last byte generated + // 0b10000000 == 0x80 == 128 + if ((out[required_bytes - 1] & '\x80') != 0) { + std::fill(out + required_bytes, out + byte_width, '\xFF'); + } + } +} + +template +typename std::enable_if< + std::is_same>::value, Status>::type +NonNullArray(size_t size, std::shared_ptr* out) { + constexpr int32_t kDecimalPrecision = precision; + constexpr int32_t kDecimalScale = DecimalWithPrecisionAndScale::scale; + + const auto type = ::arrow::decimal(kDecimalPrecision, kDecimalScale); + ::arrow::Decimal128Builder builder(type); + const int32_t byte_width = + static_cast(*type).byte_width(); + + constexpr int32_t seed = 0; + + std::shared_ptr out_buf; + RETURN_NOT_OK(::arrow::AllocateBuffer(::arrow::default_memory_pool(), size * byte_width, + &out_buf)); + random_decimals(size, seed, kDecimalPrecision, out_buf->mutable_data()); + + RETURN_NOT_OK(builder.Append(out_buf->data(), size)); + return builder.Finish(out); +} + template typename std::enable_if::value, Status>::type NonNullArray( size_t size, std::shared_ptr* out) { 
std::vector values; - ::arrow::test::randint(size, 0, 1, &values); + ::arrow::test::randint(size, 0, 1, &values); ::arrow::BooleanBuilder builder; RETURN_NOT_OK(builder.Append(values.data(), values.size())); return builder.Finish(out); @@ -128,9 +185,10 @@ typename std::enable_if::value, Status>::type NonNullAr template typename std::enable_if::value, Status>::type NullableArray( size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr* out) { - std::vector values; - ::arrow::test::random_real(size, seed, -1e10, 1e10, - &values); + using c_type = typename ArrowType::c_type; + std::vector values; + ::arrow::test::random_real(size, seed, static_cast(-1e10), + static_cast(1e10), &values); std::vector valid_bytes(size, 1); for (size_t i = 0; i < num_nulls; i++) { @@ -151,7 +209,7 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr(size, 0, 64, &values); + ::arrow::test::randint(size, 0, 64, &values); std::vector valid_bytes(size, 1); for (size_t i = 0; i < num_nulls; i++) { @@ -172,7 +230,7 @@ typename std::enable_if::value, Status>::type NullableA // Seed is random in Arrow right now (void)seed; - ::arrow::test::randint(size, 0, 64, &values); + ::arrow::test::randint(size, 0, 64, &values); for (size_t i = 0; i < size; i++) { values[i] *= 86400000; } @@ -246,6 +304,34 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed, return builder.Finish(out); } +template +typename std::enable_if< + std::is_same>::value, Status>::type +NullableArray(size_t size, size_t num_nulls, uint32_t seed, + std::shared_ptr<::arrow::Array>* out) { + std::vector valid_bytes(size, '\1'); + + for (size_t i = 0; i < num_nulls; ++i) { + valid_bytes[i * 2] = '\0'; + } + + constexpr int32_t kDecimalPrecision = precision; + constexpr int32_t kDecimalScale = DecimalWithPrecisionAndScale::scale; + const auto type = ::arrow::decimal(kDecimalPrecision, kDecimalScale); + const int32_t byte_width = + static_cast(*type).byte_width(); + + std::shared_ptr<::arrow::Buffer> out_buf; + RETURN_NOT_OK(::arrow::AllocateBuffer(::arrow::default_memory_pool(), size * byte_width, + &out_buf)); + + random_decimals(size, seed, precision, out_buf->mutable_data()); + + ::arrow::Decimal128Builder builder(type); + RETURN_NOT_OK(builder.Append(out_buf->data(), size, valid_bytes.data())); + return builder.Finish(out); +} + // This helper function only supports (size/2) nulls yet. 
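// ---------------------------------------------------------------------------
// Illustrative aside (not part of the patch): the decimal test helpers above
// fill a Decimal128Builder from raw 16-byte values; the same builder can also
// be fed ::arrow::Decimal128 objects directly, which is how the Spark-file
// reader test constructs its expected data. decimal(10, 2) and the appended
// values are example choices only.

#include <cstdint>
#include <memory>

#include "arrow/api.h"
#include "arrow/util/decimal.h"

static ::arrow::Status MakeExampleDecimalArray(std::shared_ptr<::arrow::Array>* out) {
  auto type = ::arrow::decimal(10, 2);
  ::arrow::Decimal128Builder builder(type, ::arrow::default_memory_pool());

  // 1.00, 2.00, 3.00 stored as unscaled integers (scale == 2), plus one null.
  for (int64_t unscaled : {100, 200, 300}) {
    RETURN_NOT_OK(builder.Append(::arrow::Decimal128(unscaled)));
  }
  RETURN_NOT_OK(builder.AppendNull());
  return builder.Finish(out);
}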
template typename std::enable_if::value, Status>::type NullableArray( @@ -255,7 +341,7 @@ typename std::enable_if::value, Status>::type NullableA // Seed is random in Arrow right now (void)seed; - ::arrow::test::randint(size, 0, 1, &values); + ::arrow::test::randint(size, 0, 1, &values); std::vector valid_bytes(size, 1); for (size_t i = 0; i < num_nulls; i++) { diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc index b53c1cac..1f3fc7e6 100644 --- a/src/parquet/arrow/writer.cc +++ b/src/parquet/arrow/writer.cc @@ -32,6 +32,7 @@ using arrow::Array; using arrow::BinaryArray; using arrow::FixedSizeBinaryArray; +using arrow::Decimal128Array; using arrow::BooleanArray; using arrow::Int16Array; using arrow::Int16Builder; @@ -104,7 +105,6 @@ class LevelBuilder { NOT_IMPLEMENTED_VISIT(Struct) NOT_IMPLEMENTED_VISIT(Union) - NOT_IMPLEMENTED_VISIT(Decimal) NOT_IMPLEMENTED_VISIT(Dictionary) NOT_IMPLEMENTED_VISIT(Interval) @@ -743,8 +743,6 @@ Status FileWriter::Impl::TypedWriteBatch( buffer_ptr[i] = ByteArray(value_offset[i + 1] - value_offset[i], data_ptr + value_offset[i]); } - PARQUET_CATCH_NOT_OK( - writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); } else { int buffer_idx = 0; for (int64_t i = 0; i < data->length(); i++) { @@ -753,9 +751,9 @@ Status FileWriter::Impl::TypedWriteBatch( ByteArray(value_offset[i + 1] - value_offset[i], data_ptr + value_offset[i]); } } - PARQUET_CATCH_NOT_OK( - writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); } + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); PARQUET_CATCH_NOT_OK(writer->Close()); return Status::OK(); } @@ -765,29 +763,82 @@ Status FileWriter::Impl::TypedWriteBatch ColumnWriter* column_writer, const std::shared_ptr& array, int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels) { RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(FLBA), false)); - auto data = static_cast(array.get()); + const auto& data = static_cast(*array); + const int64_t length = data.length(); + auto buffer_ptr = reinterpret_cast(data_buffer_.mutable_data()); auto writer = reinterpret_cast*>(column_writer); - if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) { + if (writer->descr()->schema_node()->is_required() || data.null_count() == 0) { // no nulls, just dump the data // todo(advancedxy): use a writeBatch to avoid this step - for (int64_t i = 0; i < data->length(); i++) { - buffer_ptr[i] = FixedLenByteArray(data->GetValue(i)); + for (int64_t i = 0; i < length; i++) { + buffer_ptr[i] = FixedLenByteArray(data.GetValue(i)); } - PARQUET_CATCH_NOT_OK( - writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); } else { int buffer_idx = 0; - for (int64_t i = 0; i < data->length(); i++) { - if (!data->IsNull(i)) { - buffer_ptr[buffer_idx++] = FixedLenByteArray(data->GetValue(i)); + for (int64_t i = 0; i < length; i++) { + if (!data.IsNull(i)) { + buffer_ptr[buffer_idx++] = FixedLenByteArray(data.GetValue(i)); } } - PARQUET_CATCH_NOT_OK( - writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); } + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); + PARQUET_CATCH_NOT_OK(writer->Close()); + return Status::OK(); +} + +template <> +Status FileWriter::Impl::TypedWriteBatch( + ColumnWriter* column_writer, const std::shared_ptr& array, int64_t num_levels, + const int16_t* def_levels, const int16_t* rep_levels) { + const auto& data = static_cast(*array); + const int64_t 
length = data.length(); + + // TODO(phillipc): This is potentially very wasteful if we have a lot of nulls + std::vector big_endian_values(static_cast(length) * 2); + + RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(FLBA), false)); + auto buffer_ptr = reinterpret_cast(data_buffer_.mutable_data()); + + auto writer = reinterpret_cast*>(column_writer); + + const auto& decimal_type = static_cast(*data.type()); + const int32_t offset = + decimal_type.byte_width() - DecimalSize(decimal_type.precision()); + + const bool does_not_have_nulls = + writer->descr()->schema_node()->is_required() || data.null_count() == 0; + + // TODO(phillipc): Look into whether our compilers will perform loop unswitching so we + // don't have to keep writing two loops to handle the case where we know there are no + // nulls + if (does_not_have_nulls) { + // no nulls, just dump the data + // todo(advancedxy): use a writeBatch to avoid this step + for (int64_t i = 0, j = 0; i < length; ++i, j += 2) { + auto unsigned_64_bit = reinterpret_cast(data.GetValue(i)); + big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]); + big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]); + buffer_ptr[i] = FixedLenByteArray( + reinterpret_cast(&big_endian_values[j]) + offset); + } + } else { + for (int64_t i = 0, buffer_idx = 0, j = 0; i < length; ++i) { + if (!data.IsNull(i)) { + auto unsigned_64_bit = reinterpret_cast(data.GetValue(i)); + big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]); + big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]); + buffer_ptr[buffer_idx++] = FixedLenByteArray( + reinterpret_cast(&big_endian_values[j]) + offset); + j += 2; + } + } + } + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); PARQUET_CATCH_NOT_OK(writer->Close()); return Status::OK(); } @@ -896,6 +947,7 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) { WRITE_BATCH_CASE(BINARY, BinaryType, ByteArrayType) WRITE_BATCH_CASE(STRING, BinaryType, ByteArrayType) WRITE_BATCH_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType) + WRITE_BATCH_CASE(DECIMAL, Decimal128Type, FLBAType) WRITE_BATCH_CASE(DATE32, Date32Type, Int32Type) WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type) WRITE_BATCH_CASE(TIME32, Time32Type, Int32Type) diff --git a/src/parquet/encoding-internal.h b/src/parquet/encoding-internal.h index be387522..3284aca6 100644 --- a/src/parquet/encoding-internal.h +++ b/src/parquet/encoding-internal.h @@ -420,11 +420,9 @@ inline void DictionaryDecoder::SetDict(Decoder* dictionary) PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, false)); uint8_t* bytes_data = byte_array_data_->mutable_data(); - int offset = 0; - for (int i = 0; i < num_dictionary_values; ++i) { + for (int32_t i = 0, offset = 0; i < num_dictionary_values; ++i, offset += fixed_len) { memcpy(bytes_data + offset, dictionary_[i].ptr, fixed_len); dictionary_[i].ptr = bytes_data + offset; - offset += fixed_len; } } @@ -597,7 +595,7 @@ inline int DictEncoder::Hash(const typename DType::c_type& value) const { template <> inline int DictEncoder::Hash(const ByteArray& value) const { if (value.len > 0) { - DCHECK(nullptr != value.ptr) << "Value ptr cannot be NULL"; + DCHECK_NE(nullptr, value.ptr) << "Value ptr cannot be NULL"; } return HashUtil::Hash(value.ptr, value.len, 0); } @@ -605,7 +603,7 @@ inline int DictEncoder::Hash(const ByteArray& value) const { template <> inline int DictEncoder::Hash(const FixedLenByteArray& value) const { 
if (type_length_ > 0) { - DCHECK(nullptr != value.ptr) << "Value ptr cannot be NULL"; + DCHECK_NE(nullptr, value.ptr) << "Value ptr cannot be NULL"; } return HashUtil::Hash(value.ptr, type_length_, 0); } @@ -923,7 +921,8 @@ class DeltaByteArrayDecoder : public Decoder { ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) : Decoder(descr, Encoding::DELTA_BYTE_ARRAY), prefix_len_decoder_(nullptr, pool), - suffix_decoder_(nullptr, pool) {} + suffix_decoder_(nullptr, pool), + last_value_(0, nullptr) {} virtual void SetData(int num_values, const uint8_t* data, int len) { num_values_ = num_values; diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc index 9b9bde9f..4ec48a45 100644 --- a/src/parquet/file/reader.cc +++ b/src/parquet/file/reader.cc @@ -45,9 +45,9 @@ RowGroupReader::RowGroupReader(std::unique_ptr contents) : contents_(std::move(contents)) {} std::shared_ptr RowGroupReader::Column(int i) { - DCHECK(i < metadata()->num_columns()) << "The RowGroup only has " - << metadata()->num_columns() - << "columns, requested column: " << i; + DCHECK(i < metadata()->num_columns()) + << "The RowGroup only has " << metadata()->num_columns() + << "columns, requested column: " << i; const ColumnDescriptor* descr = metadata()->schema()->Column(i); std::unique_ptr page_reader = contents_->GetColumnPageReader(i); @@ -57,9 +57,9 @@ std::shared_ptr RowGroupReader::Column(int i) { } std::unique_ptr RowGroupReader::GetColumnPageReader(int i) { - DCHECK(i < metadata()->num_columns()) << "The RowGroup only has " - << metadata()->num_columns() - << "columns, requested column: " << i; + DCHECK(i < metadata()->num_columns()) + << "The RowGroup only has " << metadata()->num_columns() + << "columns, requested column: " << i; return contents_->GetColumnPageReader(i); } @@ -127,9 +127,9 @@ std::shared_ptr ParquetFileReader::metadata() const { } std::shared_ptr ParquetFileReader::RowGroup(int i) { - DCHECK(i < metadata()->num_row_groups()) << "The file only has " - << metadata()->num_row_groups() - << "row groups, requested reader for: " << i; + DCHECK(i < metadata()->num_row_groups()) + << "The file only has " << metadata()->num_row_groups() + << "row groups, requested reader for: " << i; return contents_->GetRowGroup(i); } diff --git a/src/parquet/types.h b/src/parquet/types.h index af3a58f5..53b33d56 100644 --- a/src/parquet/types.h +++ b/src/parquet/types.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -136,15 +137,15 @@ struct ByteArray { ByteArray(uint32_t len, const uint8_t* ptr) : len(len), ptr(ptr) {} uint32_t len; const uint8_t* ptr; +}; - bool operator==(const ByteArray& other) const { - return this->len == other.len && 0 == memcmp(this->ptr, other.ptr, this->len); - } +inline bool operator==(const ByteArray& left, const ByteArray& right) { + return left.len == right.len && std::equal(left.ptr, left.ptr + left.len, right.ptr); +} - bool operator!=(const ByteArray& other) const { - return this->len != other.len || 0 != memcmp(this->ptr, other.ptr, this->len); - } -}; +inline bool operator!=(const ByteArray& left, const ByteArray& right) { + return !(left == right); +} struct FixedLenByteArray { FixedLenByteArray() : ptr(nullptr) {} @@ -152,63 +153,47 @@ struct FixedLenByteArray { const uint8_t* ptr; }; -typedef FixedLenByteArray FLBA; +using FLBA = FixedLenByteArray; -MANUALLY_ALIGNED_STRUCT(1) Int96 { - uint32_t value[3]; +MANUALLY_ALIGNED_STRUCT(1) Int96 { uint32_t value[3]; }; +STRUCT_END(Int96, 12); - bool operator==(const Int96& other) const { - 
return 0 == memcmp(this->value, other.value, 3 * sizeof(uint32_t)); - } +inline bool operator==(const Int96& left, const Int96& right) { + return std::equal(left.value, left.value + 3, right.value); +} - bool operator!=(const Int96& other) const { return !(*this == other); } -}; -STRUCT_END(Int96, 12); +inline bool operator!=(const Int96& left, const Int96& right) { return !(left == right); } static inline std::string ByteArrayToString(const ByteArray& a) { return std::string(reinterpret_cast(a.ptr), a.len); } static inline std::string Int96ToString(const Int96& a) { - std::stringstream result; - for (int i = 0; i < 3; i++) { - result << a.value[i] << " "; - } + std::ostringstream result; + std::copy(a.value, a.value + 3, std::ostream_iterator(result, " ")); return result.str(); } static inline std::string FixedLenByteArrayToString(const FixedLenByteArray& a, int len) { - const uint8_t* bytes = reinterpret_cast(a.ptr); - std::stringstream result; - for (int i = 0; i < len; i++) { - result << (uint32_t)bytes[i] << " "; - } + std::ostringstream result; + std::copy(a.ptr, a.ptr + len, std::ostream_iterator(result, " ")); return result.str(); } -static inline int ByteCompare(const ByteArray& x1, const ByteArray& x2) { - uint32_t len = std::min(x1.len, x2.len); - int cmp = memcmp(x1.ptr, x2.ptr, len); - if (cmp != 0) return cmp; - if (len < x1.len) return 1; - if (len < x2.len) return -1; - return 0; -} - -template +template struct type_traits {}; template <> struct type_traits { - typedef bool value_type; - static constexpr int value_byte_size = 1; + using value_type = bool; + static constexpr int value_byte_size = 1; static constexpr const char* printf_code = "d"; }; template <> struct type_traits { - typedef int32_t value_type; + using value_type = int32_t; static constexpr int value_byte_size = 4; static constexpr const char* printf_code = "d"; @@ -216,7 +201,7 @@ struct type_traits { template <> struct type_traits { - typedef int64_t value_type; + using value_type = int64_t; static constexpr int value_byte_size = 8; static constexpr const char* printf_code = "ld"; @@ -224,7 +209,7 @@ struct type_traits { template <> struct type_traits { - typedef Int96 value_type; + using value_type = Int96; static constexpr int value_byte_size = 12; static constexpr const char* printf_code = "s"; @@ -232,7 +217,7 @@ struct type_traits { template <> struct type_traits { - typedef float value_type; + using value_type = float; static constexpr int value_byte_size = 4; static constexpr const char* printf_code = "f"; @@ -240,7 +225,7 @@ struct type_traits { template <> struct type_traits { - typedef double value_type; + using value_type = double; static constexpr int value_byte_size = 8; static constexpr const char* printf_code = "lf"; @@ -248,7 +233,7 @@ struct type_traits { template <> struct type_traits { - typedef ByteArray value_type; + using value_type = ByteArray; static constexpr int value_byte_size = sizeof(ByteArray); static constexpr const char* printf_code = "s"; @@ -256,7 +241,7 @@ struct type_traits { template <> struct type_traits { - typedef FixedLenByteArray value_type; + using value_type = FixedLenByteArray; static constexpr int value_byte_size = sizeof(FixedLenByteArray); static constexpr const char* printf_code = "s"; @@ -264,18 +249,18 @@ struct type_traits { template struct DataType { + using c_type = typename type_traits::value_type; static constexpr Type::type type_num = TYPE; - typedef typename type_traits::value_type c_type; }; -typedef DataType BooleanType; -typedef DataType 
Int32Type; -typedef DataType Int64Type; -typedef DataType Int96Type; -typedef DataType FloatType; -typedef DataType DoubleType; -typedef DataType ByteArrayType; -typedef DataType FLBAType; +using BooleanType = DataType; +using Int32Type = DataType; +using Int64Type = DataType; +using Int96Type = DataType; +using FloatType = DataType; +using DoubleType = DataType; +using ByteArrayType = DataType; +using FLBAType = DataType; template inline std::string format_fwf(int width) {
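// ---------------------------------------------------------------------------
// Illustrative aside (not part of the patch): the byte manipulation performed
// by the Decimal128 write path in writer.cc, isolated into a standalone
// sketch. A little-endian Decimal128 (low word first) is byteswapped to
// big-endian and then truncated to the minimal FIXED_LEN_BYTE_ARRAY width for
// its precision. DecimalToBigEndian is a hypothetical name for illustration.

#include <array>
#include <cstdint>

#include "arrow/util/bit-util.h"

// `words` holds the Decimal128 in little-endian word order (words[0] = low 64
// bits, words[1] = high 64 bits); `byte_width` is DecimalSize(precision).
// Returns a pointer into `scratch` at the first significant big-endian byte,
// i.e. the address the writer wraps in a FixedLenByteArray.
static const uint8_t* DecimalToBigEndian(const uint64_t words[2], int32_t byte_width,
                                         std::array<uint64_t, 2>* scratch) {
  (*scratch)[0] = ::arrow::BitUtil::ToBigEndian(words[1]);  // high word first
  (*scratch)[1] = ::arrow::BitUtil::ToBigEndian(words[0]);  // then the low word
  const int32_t offset = 16 - byte_width;  // skip leading sign-extension bytes
  return reinterpret_cast<const uint8_t*>(scratch->data()) + offset;
}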