diff --git a/CMakeLists.txt b/CMakeLists.txt index b7a41d87..1ecf8773 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -646,6 +646,7 @@ set(LIBPARQUET_SRCS src/parquet/parquet_constants.cpp src/parquet/parquet_types.cpp + src/parquet/util/comparison.cc src/parquet/util/memory.cc ) diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc index 3ec36635..48f243ea 100644 --- a/src/parquet/column_writer-test.cc +++ b/src/parquet/column_writer-test.cc @@ -151,14 +151,16 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { void ReadAndCompare(Compression::type compression, int64_t num_rows) { this->SetupValuesOut(num_rows); this->ReadColumnFully(compression); - Compare compare(this->descr_); + std::shared_ptr> compare; + compare = std::static_pointer_cast>( + Comparator::Make(this->descr_)); for (size_t i = 0; i < this->values_.size(); i++) { - if (compare(this->values_[i], this->values_out_[i]) || - compare(this->values_out_[i], this->values_[i])) { + if ((*compare)(this->values_[i], this->values_out_[i]) || + (*compare)(this->values_out_[i], this->values_[i])) { std::cout << "Failed at " << i << std::endl; } - ASSERT_FALSE(compare(this->values_[i], this->values_out_[i])); - ASSERT_FALSE(compare(this->values_out_[i], this->values_[i])); + ASSERT_FALSE((*compare)(this->values_[i], this->values_out_[i])); + ASSERT_FALSE((*compare)(this->values_out_[i], this->values_[i])); } ASSERT_EQ(this->values_, this->values_out_); } diff --git a/src/parquet/column_writer.cc b/src/parquet/column_writer.cc index b36f3958..ac7e2ba1 100644 --- a/src/parquet/column_writer.cc +++ b/src/parquet/column_writer.cc @@ -267,7 +267,10 @@ int64_t ColumnWriter::Close() { FlushBufferedDataPages(); EncodedStatistics chunk_statistics = GetChunkStatistics(); - if (chunk_statistics.is_set()) metadata_->SetStatistics(chunk_statistics); + if (chunk_statistics.is_set()) { + metadata_->SetStatistics(SortOrder::SIGNED == descr_->sort_order(), + chunk_statistics); + } pager_->Close(has_dictionary_, fallback_); } @@ -317,7 +320,8 @@ TypedColumnWriter::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata, ParquetException::NYI("Selected encoding is not supported"); } - if (properties->statistics_enabled(descr_->path())) { + if (properties->statistics_enabled(descr_->path()) && + (SortOrder::UNKNOWN != descr_->sort_order())) { page_statistics_ = std::unique_ptr(new TypedStats(descr_, allocator_)); chunk_statistics_ = std::unique_ptr(new TypedStats(descr_, allocator_)); } diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/file/file-metadata-test.cc index a7c438c8..5bd84197 100644 --- a/src/parquet/file/file-metadata-test.cc +++ b/src/parquet/file/file-metadata-test.cc @@ -63,8 +63,8 @@ TEST(Metadata, TestBuildAccess) { auto col1_builder = rg1_builder->NextColumnChunk(); auto col2_builder = rg1_builder->NextColumnChunk(); // column metadata - col1_builder->SetStatistics(stats_int); - col2_builder->SetStatistics(stats_float); + col1_builder->SetStatistics(true, stats_int); + col2_builder->SetStatistics(true, stats_float); col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, true, false); col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, true, false); rg1_builder->Finish(1024); @@ -73,8 +73,8 @@ TEST(Metadata, TestBuildAccess) { col1_builder = rg2_builder->NextColumnChunk(); col2_builder = rg2_builder->NextColumnChunk(); // column metadata - col1_builder->SetStatistics(stats_int); - col2_builder->SetStatistics(stats_float); + col1_builder->SetStatistics(true, stats_int); + col2_builder->SetStatistics(true, stats_float); col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, true, false); col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, true, false); rg2_builder->Finish(1024); @@ -215,11 +215,12 @@ TEST(ApplicationVersion, Basics) { ASSERT_EQ(true, version.VersionLt(version1)); - ASSERT_FALSE(version1.HasCorrectStatistics(Type::INT96)); - ASSERT_TRUE(version.HasCorrectStatistics(Type::INT32)); - ASSERT_FALSE(version.HasCorrectStatistics(Type::BYTE_ARRAY)); - ASSERT_TRUE(version1.HasCorrectStatistics(Type::BYTE_ARRAY)); - ASSERT_TRUE(version3.HasCorrectStatistics(Type::FIXED_LEN_BYTE_ARRAY)); + ASSERT_FALSE(version1.HasCorrectStatistics(Type::INT96, SortOrder::SIGNED)); + ASSERT_TRUE(version.HasCorrectStatistics(Type::INT32, SortOrder::SIGNED)); + ASSERT_FALSE(version.HasCorrectStatistics(Type::BYTE_ARRAY, SortOrder::SIGNED)); + ASSERT_TRUE(version1.HasCorrectStatistics(Type::BYTE_ARRAY, SortOrder::SIGNED)); + ASSERT_TRUE( + version3.HasCorrectStatistics(Type::FIXED_LEN_BYTE_ARRAY, SortOrder::SIGNED)); } } // namespace metadata diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc index d5a96f31..6e7fc3bc 100644 --- a/src/parquet/file/metadata.cc +++ b/src/parquet/file/metadata.cc @@ -35,62 +35,20 @@ const ApplicationVersion ApplicationVersion::PARQUET_251_FIXED_VERSION = ApplicationVersion("parquet-mr version 1.8.0"); const ApplicationVersion ApplicationVersion::PARQUET_816_FIXED_VERSION = ApplicationVersion("parquet-mr version 1.2.9"); - -// Return the Sort Order of the Parquet Physical Types -SortOrder default_sort_order(Type::type primitive) { - switch (primitive) { - case Type::BOOLEAN: - case Type::INT32: - case Type::INT64: - case Type::FLOAT: - case Type::DOUBLE: - return SortOrder::SIGNED; - case Type::BYTE_ARRAY: - case Type::FIXED_LEN_BYTE_ARRAY: - case Type::INT96: // only used for timestamp, which uses unsigned values - return SortOrder::UNSIGNED; - } - return SortOrder::UNKNOWN; -} - -// Return the SortOrder of the Parquet Types using Logical or Physical Types -SortOrder get_sort_order(LogicalType::type converted, Type::type primitive) { - if (converted == LogicalType::NONE) return default_sort_order(primitive); - switch (converted) { - case LogicalType::INT_8: - case LogicalType::INT_16: - case LogicalType::INT_32: - case LogicalType::INT_64: - case LogicalType::DATE: - case LogicalType::TIME_MICROS: - case LogicalType::TIME_MILLIS: - case LogicalType::TIMESTAMP_MICROS: - case LogicalType::TIMESTAMP_MILLIS: - return SortOrder::SIGNED; - case LogicalType::UINT_8: - case LogicalType::UINT_16: - case LogicalType::UINT_32: - case LogicalType::UINT_64: - case LogicalType::ENUM: - case LogicalType::UTF8: - case LogicalType::BSON: - case LogicalType::JSON: - return SortOrder::UNSIGNED; - case LogicalType::NA: - case LogicalType::DECIMAL: - case LogicalType::LIST: - case LogicalType::MAP: - case LogicalType::MAP_KEY_VALUE: - case LogicalType::INTERVAL: - case LogicalType::NONE: // required instead of default - return SortOrder::UNKNOWN; - } - return SortOrder::UNKNOWN; -} +const ApplicationVersion ApplicationVersion::PARQUET_CPP_FIXED_STATS_VERSION = + ApplicationVersion("parquet-cpp version 1.3.0"); template static std::shared_ptr MakeTypedColumnStats( const format::ColumnMetaData& metadata, const ColumnDescriptor* descr) { + // If new fields max_value/min_value are set, then return them. + if (metadata.statistics.__isset.max_value || metadata.statistics.__isset.min_value) { + return std::make_shared>( + descr, metadata.statistics.min_value, metadata.statistics.max_value, + metadata.num_values - metadata.statistics.null_count, + metadata.statistics.null_count, metadata.statistics.distinct_count, true); + } + // Default behavior return std::make_shared>( descr, metadata.statistics.min, metadata.statistics.max, metadata.num_values - metadata.statistics.null_count, @@ -159,9 +117,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl { inline bool is_stats_set() const { DCHECK(writer_version_ != nullptr); return column_->meta_data.__isset.statistics && - writer_version_->HasCorrectStatistics(type()) && - SortOrder::SIGNED == - get_sort_order(descr_->logical_type(), descr_->physical_type()); + writer_version_->HasCorrectStatistics(type(), descr_->sort_order()); } inline std::shared_ptr statistics() const { @@ -534,15 +490,21 @@ bool ApplicationVersion::VersionEq(const ApplicationVersion& other_version) cons // Reference: // parquet-mr/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java // PARQUET-686 has more disussion on statistics -bool ApplicationVersion::HasCorrectStatistics(Type::type col_type) const { - // None of the current tools write INT96 Statistics correctly - if (col_type == Type::INT96) return false; - - // Statistics of other types are OK - if (col_type != Type::FIXED_LEN_BYTE_ARRAY && col_type != Type::BYTE_ARRAY) { - return true; +bool ApplicationVersion::HasCorrectStatistics(Type::type col_type, + SortOrder::type sort_order) const { + // Parquet cpp version 1.3.0 onwards stats are computed correctly for all types + if ((application_ != "parquet-cpp") || (VersionLt(PARQUET_CPP_FIXED_STATS_VERSION))) { + // Only SIGNED are valid + if (SortOrder::SIGNED != sort_order) return false; + + // None of the current tools write INT96 Statistics correctly + if (col_type == Type::INT96) return false; + + // Statistics of other types are OK + if (col_type != Type::FIXED_LEN_BYTE_ARRAY && col_type != Type::BYTE_ARRAY) { + return true; + } } - // created_by is not populated, which could have been caused by // parquet-mr during the same time as PARQUET-251, see PARQUET-297 if (application_ == "unknown") { @@ -577,16 +539,24 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { void set_file_path(const std::string& val) { column_chunk_->__set_file_path(val); } // column metadata - void SetStatistics(const EncodedStatistics& val) { + void SetStatistics(bool is_signed, const EncodedStatistics& val) { format::Statistics stats; stats.null_count = val.null_count; stats.distinct_count = val.distinct_count; - stats.max = val.max(); - stats.min = val.min(); - stats.__isset.min = val.has_min; - stats.__isset.max = val.has_max; + stats.max_value = val.max(); + stats.min_value = val.min(); + stats.__isset.min_value = val.has_min; + stats.__isset.max_value = val.has_max; stats.__isset.null_count = val.has_null_count; stats.__isset.distinct_count = val.has_distinct_count; + // If the order is SIGNED, then the old min/max values must be set too. + // This for backward compatibility + if (is_signed) { + stats.max = val.max(); + stats.min = val.min(); + stats.__isset.min = val.has_min; + stats.__isset.max = val.has_max; + } column_chunk_->meta_data.__set_statistics(stats); } @@ -674,8 +644,9 @@ const ColumnDescriptor* ColumnChunkMetaDataBuilder::descr() const { return impl_->descr(); } -void ColumnChunkMetaDataBuilder::SetStatistics(const EncodedStatistics& result) { - impl_->SetStatistics(result); +void ColumnChunkMetaDataBuilder::SetStatistics(bool is_signed, + const EncodedStatistics& result) { + impl_->SetStatistics(is_signed, result); } class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl { diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h index 4250f6b5..0d8e10ef 100644 --- a/src/parquet/file/metadata.h +++ b/src/parquet/file/metadata.h @@ -35,22 +35,12 @@ namespace parquet { using KeyValueMetadata = ::arrow::KeyValueMetadata; -// Reference: -// parquet-mr/parquet-hadoop/src/main/java/org/apache/parquet/ -// format/converter/ParquetMetadataConverter.java -// Sort order for page and column statistics. Types are associated with sort -// orders (e.g., UTF8 columns should use UNSIGNED) and column stats are -// aggregated using a sort order. As of parquet-format version 2.3.1, the -// order used to aggregate stats is always SIGNED and is not stored in the -// Parquet file. These stats are discarded for types that need unsigned. -// See PARQUET-686. -enum SortOrder { SIGNED, UNSIGNED, UNKNOWN }; - class ApplicationVersion { public: // Known Versions with Issues static const ApplicationVersion PARQUET_251_FIXED_VERSION; static const ApplicationVersion PARQUET_816_FIXED_VERSION; + static const ApplicationVersion PARQUET_CPP_FIXED_STATS_VERSION; // Regular expression for the version format // major . minor . patch unknown - prerelease.x + build info // Eg: 1.5.0ab-cdh5.5.0+cd @@ -92,7 +82,8 @@ class ApplicationVersion { bool VersionEq(const ApplicationVersion& other_version) const; // Checks if the Version has the correct statistics for a given column - bool HasCorrectStatistics(Type::type primitive) const; + bool HasCorrectStatistics(Type::type primitive, + SortOrder::type sort_order = SortOrder::SIGNED) const; }; class PARQUET_EXPORT ColumnChunkMetaData { @@ -209,7 +200,7 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder { // Used when a dataset is spread across multiple files void set_file_path(const std::string& path); // column metadata - void SetStatistics(const EncodedStatistics& stats); + void SetStatistics(bool is_signed, const EncodedStatistics& stats); // get the column descriptor const ColumnDescriptor* descr() const; // commit the metadata diff --git a/src/parquet/parquet_version.h.in b/src/parquet/parquet_version.h.in index 7036d2fa..db8f396a 100644 --- a/src/parquet/parquet_version.h.in +++ b/src/parquet/parquet_version.h.in @@ -21,4 +21,4 @@ // define the parquet created by version #define CREATED_BY_VERSION "parquet-cpp version @PARQUET_VERSION@" -#endif // PARQUET_VERSION_H +#endif // PARQUET_VERSION_H diff --git a/src/parquet/schema.h b/src/parquet/schema.h index e240b827..c6b7fbec 100644 --- a/src/parquet/schema.h +++ b/src/parquet/schema.h @@ -332,6 +332,10 @@ class PARQUET_EXPORT ColumnDescriptor { LogicalType::type logical_type() const { return primitive_node_->logical_type(); } + SortOrder::type sort_order() const { + return GetSortOrder(logical_type(), physical_type()); + } + const std::string& name() const { return primitive_node_->name(); } const std::shared_ptr path() const; diff --git a/src/parquet/statistics-test.cc b/src/parquet/statistics-test.cc index 26352c1d..d6c52057 100644 --- a/src/parquet/statistics-test.cc +++ b/src/parquet/statistics-test.cc @@ -194,9 +194,8 @@ bool* TestRowGroupStatistics::GetValuesPointer(std::vector& v } template -typename std::vector -TestRowGroupStatistics::GetDeepCopy( - const std::vector& values) { +typename std::vector TestRowGroupStatistics< + TestType>::GetDeepCopy(const std::vector& values) { return values; } @@ -311,6 +310,7 @@ TYPED_TEST(TestNumericRowGroupStatistics, Merge) { this->TestMerge(); } +// Statistics are restricted for few types in older parquet version TEST(CorruptStatistics, Basics) { ApplicationVersion version("parquet-mr version 1.8.0"); SchemaDescriptor schema; @@ -356,5 +356,332 @@ TEST(CorruptStatistics, Basics) { ASSERT_FALSE(column_chunk6->is_stats_set()); } +// Statistics for all types have no restrictions in newer parquet version +TEST(CorrectStatistics, Basics) { + ApplicationVersion version("parquet-cpp version 1.3.0"); + SchemaDescriptor schema; + schema::NodePtr node; + std::vector fields; + // Test Physical Types + fields.push_back(schema::PrimitiveNode::Make("col1", Repetition::OPTIONAL, Type::INT32, + LogicalType::NONE)); + fields.push_back(schema::PrimitiveNode::Make("col2", Repetition::OPTIONAL, + Type::BYTE_ARRAY, LogicalType::NONE)); + // Test Logical Types + fields.push_back(schema::PrimitiveNode::Make("col3", Repetition::OPTIONAL, Type::INT32, + LogicalType::DATE)); + fields.push_back(schema::PrimitiveNode::Make("col4", Repetition::OPTIONAL, Type::INT32, + LogicalType::UINT_32)); + fields.push_back(schema::PrimitiveNode::Make("col5", Repetition::OPTIONAL, + Type::FIXED_LEN_BYTE_ARRAY, + LogicalType::INTERVAL, 12)); + fields.push_back(schema::PrimitiveNode::Make("col6", Repetition::OPTIONAL, + Type::BYTE_ARRAY, LogicalType::UTF8)); + node = schema::GroupNode::Make("schema", Repetition::REQUIRED, fields); + schema.Init(node); + + format::ColumnChunk col_chunk; + col_chunk.meta_data.__isset.statistics = true; + auto column_chunk1 = ColumnChunkMetaData::Make( + reinterpret_cast(&col_chunk), schema.Column(0), &version); + ASSERT_TRUE(column_chunk1->is_stats_set()); + auto column_chunk2 = ColumnChunkMetaData::Make( + reinterpret_cast(&col_chunk), schema.Column(1), &version); + ASSERT_TRUE(column_chunk2->is_stats_set()); + auto column_chunk3 = ColumnChunkMetaData::Make( + reinterpret_cast(&col_chunk), schema.Column(2), &version); + ASSERT_TRUE(column_chunk3->is_stats_set()); + auto column_chunk4 = ColumnChunkMetaData::Make( + reinterpret_cast(&col_chunk), schema.Column(3), &version); + ASSERT_TRUE(column_chunk4->is_stats_set()); + auto column_chunk5 = ColumnChunkMetaData::Make( + reinterpret_cast(&col_chunk), schema.Column(4), &version); + ASSERT_TRUE(column_chunk5->is_stats_set()); + auto column_chunk6 = ColumnChunkMetaData::Make( + reinterpret_cast(&col_chunk), schema.Column(5), &version); + ASSERT_TRUE(column_chunk6->is_stats_set()); +} + +// Test SortOrder class +static const int NUM_VALUES = 10; + +template +class TestStatistics : public ::testing::Test { + public: + typedef typename TestType::c_type T; + + void AddNodes(std::string name) { + fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, + TestType::type_num, LogicalType::NONE)); + } + + void SetUpSchema() { + stats_.resize(fields_.size()); + values_.resize(NUM_VALUES); + schema_ = std::static_pointer_cast( + GroupNode::Make("Schema", Repetition::REQUIRED, fields_)); + + parquet_sink_ = std::make_shared(); + } + + void SetValues(); + + void WriteParquet() { + // Add writer properties + parquet::WriterProperties::Builder builder; + builder.compression(parquet::Compression::SNAPPY); + builder.created_by("parquet-cpp version 1.3.0"); + std::shared_ptr props = builder.build(); + + // Create a ParquetFileWriter instance + auto file_writer = parquet::ParquetFileWriter::Open(parquet_sink_, schema_, props); + + // Append a RowGroup with a specific number of rows. + auto rg_writer = file_writer->AppendRowGroup(NUM_VALUES); + + this->SetValues(); + + // Insert Values + for (int i = 0; i < static_cast(fields_.size()); i++) { + auto column_writer = + static_cast*>(rg_writer->NextColumn()); + column_writer->WriteBatch(NUM_VALUES, nullptr, nullptr, values_.data()); + } + } + + void VerifyParquetStats() { + auto pbuffer = parquet_sink_->GetBuffer(); + + // Create a ParquetReader instance + std::unique_ptr parquet_reader = + parquet::ParquetFileReader::Open( + std::make_shared(pbuffer)); + + // Get the File MetaData + std::shared_ptr file_metadata = parquet_reader->metadata(); + std::shared_ptr rg_metadata = file_metadata->RowGroup(0); + for (int i = 0; i < static_cast(fields_.size()); i++) { + std::shared_ptr cc_metadata = + rg_metadata->ColumnChunk(i); + ASSERT_EQ(stats_[i].min(), cc_metadata->statistics()->EncodeMin()); + ASSERT_EQ(stats_[i].max(), cc_metadata->statistics()->EncodeMax()); + } + } + + protected: + std::vector values_; + std::vector values_buf_; + std::vector fields_; + std::shared_ptr schema_; + std::shared_ptr parquet_sink_; + std::vector stats_; +}; + +using CompareTestTypes = ::testing::Types; + +// TYPE::INT32 +template <> +void TestStatistics::AddNodes(std::string name) { + // UINT_32 logical type to set Unsigned Statistics + fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, Type::INT32, + LogicalType::UINT_32)); + // INT_32 logical type to set Signed Statistics + fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, Type::INT32, + LogicalType::INT_32)); +} + +template <> +void TestStatistics::SetValues() { + for (int i = 0; i < NUM_VALUES; i++) { + values_[i] = i - 5; // {-5, -4, -3, -2, -1, 0, 1, 2, 3, 4}; + } + + // Write UINT32 min/max values + stats_[0] + .set_min(std::string(reinterpret_cast(&values_[5]), sizeof(T))) + .set_max(std::string(reinterpret_cast(&values_[4]), sizeof(T))); + + // Write INT32 min/max values + stats_[1] + .set_min(std::string(reinterpret_cast(&values_[0]), sizeof(T))) + .set_max(std::string(reinterpret_cast(&values_[9]), sizeof(T))); +} + +// TYPE::INT64 +template <> +void TestStatistics::AddNodes(std::string name) { + // UINT_64 logical type to set Unsigned Statistics + fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, Type::INT64, + LogicalType::UINT_64)); + // INT_64 logical type to set Signed Statistics + fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, Type::INT64, + LogicalType::INT_64)); +} + +template <> +void TestStatistics::SetValues() { + for (int i = 0; i < NUM_VALUES; i++) { + values_[i] = i - 5; // {-5, -4, -3, -2, -1, 0, 1, 2, 3, 4}; + } + + // Write UINT64 min/max values + stats_[0] + .set_min(std::string(reinterpret_cast(&values_[5]), sizeof(T))) + .set_max(std::string(reinterpret_cast(&values_[4]), sizeof(T))); + + // Write INT64 min/max values + stats_[1] + .set_min(std::string(reinterpret_cast(&values_[0]), sizeof(T))) + .set_max(std::string(reinterpret_cast(&values_[9]), sizeof(T))); +} + +// TYPE::INT96 +template <> +void TestStatistics::AddNodes(std::string name) { + // INT96 physical type has only Unsigned Statistics + fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, Type::INT96, + LogicalType::NONE)); +} + +template <> +void TestStatistics::SetValues() { + for (int i = 0; i < NUM_VALUES; i++) { + values_[i].value[0] = i - 5; // {-5, -4, -3, -2, -1, 0, 1, 2, 3, 4}; + values_[i].value[1] = i - 5; // {-5, -4, -3, -2, -1, 0, 1, 2, 3, 4}; + values_[i].value[2] = i - 5; // {-5, -4, -3, -2, -1, 0, 1, 2, 3, 4}; + } + + // Write Int96 min/max values + stats_[0] + .set_min(std::string(reinterpret_cast(&values_[5]), sizeof(T))) + .set_max(std::string(reinterpret_cast(&values_[4]), sizeof(T))); +} + +// TYPE::FLOAT +template <> +void TestStatistics::SetValues() { + for (int i = 0; i < NUM_VALUES; i++) { + values_[i] = + (i * 1.0f) - 5; // {-5.0, -4.0, -3.0, -2.0, -1.0, 0.0, 1.0, 2.0, 3.0, 4.0}; + } + + // Write Float min/max values + stats_[0] + .set_min(std::string(reinterpret_cast(&values_[0]), sizeof(T))) + .set_max(std::string(reinterpret_cast(&values_[9]), sizeof(T))); +} + +// TYPE::DOUBLE +template <> +void TestStatistics::SetValues() { + for (int i = 0; i < NUM_VALUES; i++) { + values_[i] = + (i * 1.0f) - 5; // {-5.0, -4.0, -3.0, -2.0, -1.0, 0.0, 1.0, 2.0, 3.0, 4.0}; + } + + // Write Double min/max values + stats_[0] + .set_min(std::string(reinterpret_cast(&values_[0]), sizeof(T))) + .set_max(std::string(reinterpret_cast(&values_[9]), sizeof(T))); +} + +// TYPE::ByteArray +template <> +void TestStatistics::AddNodes(std::string name) { + // UTF8 logical type to set Unsigned Statistics + fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, + Type::BYTE_ARRAY, LogicalType::UTF8)); +} + +template <> +void TestStatistics::SetValues() { + int max_byte_array_len = 10; + size_t nbytes = NUM_VALUES * max_byte_array_len; + values_buf_.resize(nbytes); + std::vector vals = {u8"c123", u8"b123", u8"a123", u8"d123", u8"e123", + u8"f123", u8"g123", u8"h123", u8"i123", u8"ü123"}; + + uint8_t* base = &values_buf_.data()[0]; + for (int i = 0; i < NUM_VALUES; i++) { + memcpy(base, vals[i].c_str(), vals[i].length()); + values_[i].ptr = base; + values_[i].len = static_cast(vals[i].length()); + base += vals[i].length(); + } + + // Write String min/max values + stats_[0] + .set_min( + std::string(reinterpret_cast(vals[2].c_str()), vals[2].length())) + .set_max( + std::string(reinterpret_cast(vals[9].c_str()), vals[9].length())); +} + +// TYPE::FLBAArray +template <> +void TestStatistics::AddNodes(std::string name) { + // FLBA has only Unsigned Statistics + fields_.push_back(schema::PrimitiveNode::Make(name, Repetition::REQUIRED, + Type::FIXED_LEN_BYTE_ARRAY, + LogicalType::NONE, FLBA_LENGTH)); +} + +template <> +void TestStatistics::SetValues() { + size_t nbytes = NUM_VALUES * FLBA_LENGTH; + values_buf_.resize(nbytes); + char vals[NUM_VALUES][FLBA_LENGTH] = {"b12345", "a12345", "c12345", "d12345", "e12345", + "f12345", "g12345", "h12345", "z12345", "a12345"}; + + uint8_t* base = &values_buf_.data()[0]; + for (int i = 0; i < NUM_VALUES; i++) { + memcpy(base, &vals[i][0], FLBA_LENGTH); + values_[i].ptr = base; + base += FLBA_LENGTH; + } + + // Write FLBA min,max values + stats_[0] + .set_min(std::string(reinterpret_cast(&vals[1][0]), FLBA_LENGTH)) + .set_max(std::string(reinterpret_cast(&vals[8][0]), FLBA_LENGTH)); +} + +TYPED_TEST_CASE(TestStatistics, CompareTestTypes); + +TYPED_TEST(TestStatistics, MinMax) { + this->AddNodes("Column "); + this->SetUpSchema(); + this->WriteParquet(); + this->VerifyParquetStats(); +} + +// Ensure UNKNOWN sort order is handled properly +using TestStatisticsFLBA = TestStatistics; + +TEST_F(TestStatisticsFLBA, UnknownSortOrder) { + this->fields_.push_back(schema::PrimitiveNode::Make( + "Column 0", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY, LogicalType::INTERVAL, + FLBA_LENGTH)); + this->SetUpSchema(); + this->WriteParquet(); + + auto pbuffer = parquet_sink_->GetBuffer(); + // Create a ParquetReader instance + std::unique_ptr parquet_reader = + parquet::ParquetFileReader::Open( + std::make_shared(pbuffer)); + // Get the File MetaData + std::shared_ptr file_metadata = parquet_reader->metadata(); + std::shared_ptr rg_metadata = file_metadata->RowGroup(0); + std::shared_ptr cc_metadata = rg_metadata->ColumnChunk(0); + + // stats should not be set for UNKNOWN sort order + ASSERT_FALSE(cc_metadata->is_stats_set()); +} + + + + } // namespace test } // namespace parquet diff --git a/src/parquet/statistics.cc b/src/parquet/statistics.cc index 12d1f5bb..dad1a9bf 100644 --- a/src/parquet/statistics.cc +++ b/src/parquet/statistics.cc @@ -21,7 +21,6 @@ #include "parquet/encoding-internal.h" #include "parquet/exception.h" #include "parquet/statistics.h" -#include "parquet/util/comparison.h" #include "parquet/util/memory.h" using arrow::default_memory_pool; @@ -85,6 +84,12 @@ bool TypedRowGroupStatistics::HasMinMax() const { return has_min_max_; } +template +void TypedRowGroupStatistics::SetComparator() { + comparator_ = + std::static_pointer_cast >(Comparator::Make(descr_)); +} + template void TypedRowGroupStatistics::Reset() { ResetCounts(); @@ -102,15 +107,17 @@ void TypedRowGroupStatistics::Update(const T* values, int64_t num_not_nul // TODO: support distinct count? if (num_not_null == 0) return; - Compare compare(descr_); - auto batch_minmax = std::minmax_element(values, values + num_not_null, compare); + auto batch_minmax = + std::minmax_element(values, values + num_not_null, std::ref(*(this->comparator_))); if (!has_min_max_) { has_min_max_ = true; Copy(*batch_minmax.first, &min_, min_buffer_.get()); Copy(*batch_minmax.second, &max_, max_buffer_.get()); } else { - Copy(std::min(min_, *batch_minmax.first, compare), &min_, min_buffer_.get()); - Copy(std::max(max_, *batch_minmax.second, compare), &max_, max_buffer_.get()); + Copy(std::min(min_, *batch_minmax.first, std::ref(*(this->comparator_))), &min_, + min_buffer_.get()); + Copy(std::max(max_, *batch_minmax.second, std::ref(*(this->comparator_))), &max_, + max_buffer_.get()); } } @@ -128,7 +135,6 @@ void TypedRowGroupStatistics::UpdateSpaced(const T* values, // TODO: support distinct count? if (num_not_null == 0) return; - Compare compare(descr_); INIT_BITSET(valid_bits, static_cast(valid_bits_offset)); // Find first valid entry and use that for min/max // As (num_not_null != 0) there must be one @@ -144,9 +150,9 @@ void TypedRowGroupStatistics::UpdateSpaced(const T* values, T max = values[i]; for (; i < length; i++) { if (bitset_valid_bits & (1 << bit_offset_valid_bits)) { - if (compare(values[i], min)) { + if ((std::ref(*(this->comparator_)))(values[i], min)) { min = values[i]; - } else if (compare(max, values[i])) { + } else if ((std::ref(*(this->comparator_)))(max, values[i])) { max = values[i]; } } @@ -157,8 +163,8 @@ void TypedRowGroupStatistics::UpdateSpaced(const T* values, Copy(min, &min_, min_buffer_.get()); Copy(max, &max_, max_buffer_.get()); } else { - Copy(std::min(min_, min, compare), &min_, min_buffer_.get()); - Copy(std::max(max_, max, compare), &max_, max_buffer_.get()); + Copy(std::min(min_, min, std::ref(*(this->comparator_))), &min_, min_buffer_.get()); + Copy(std::max(max_, max, std::ref(*(this->comparator_))), &max_, max_buffer_.get()); } } @@ -185,9 +191,10 @@ void TypedRowGroupStatistics::Merge(const TypedRowGroupStatistics& return; } - Compare compare(descr_); - Copy(std::min(this->min_, other.min_, compare), &this->min_, min_buffer_.get()); - Copy(std::max(this->max_, other.max_, compare), &this->max_, max_buffer_.get()); + Copy(std::min(this->min_, other.min_, std::ref(*(this->comparator_))), &this->min_, + min_buffer_.get()); + Copy(std::max(this->max_, other.max_, std::ref(*(this->comparator_))), &this->max_, + max_buffer_.get()); } template diff --git a/src/parquet/statistics.h b/src/parquet/statistics.h index 12d05557..b5466c08 100644 --- a/src/parquet/statistics.h +++ b/src/parquet/statistics.h @@ -24,6 +24,7 @@ #include "parquet/schema.h" #include "parquet/types.h" +#include "parquet/util/comparison.h" #include "parquet/util/memory.h" #include "parquet/util/visibility.h" @@ -97,13 +98,19 @@ class PARQUET_EXPORT RowGroupStatistics virtual EncodedStatistics Encode() = 0; + // Set the Corresponding Comparator + virtual void SetComparator() = 0; + virtual ~RowGroupStatistics() {} Type::type physical_type() const { return descr_->physical_type(); } protected: const ColumnDescriptor* descr() const { return descr_; } - void SetDescr(const ColumnDescriptor* schema) { descr_ = schema; } + void SetDescr(const ColumnDescriptor* schema) { + descr_ = schema; + SetComparator(); + } void IncrementNullCount(int64_t n) { statistics_.null_count += n; } @@ -146,6 +153,7 @@ class TypedRowGroupStatistics : public RowGroupStatistics { bool HasMinMax() const override; void Reset() override; + void SetComparator() override; void Merge(const TypedRowGroupStatistics& other); void Update(const T* values, int64_t num_not_null, int64_t num_null); @@ -164,6 +172,7 @@ class TypedRowGroupStatistics : public RowGroupStatistics { T min_; T max_; ::arrow::MemoryPool* pool_; + std::shared_ptr > comparator_; void PlainEncode(const T& src, std::string* dst); void PlainDecode(const std::string& src, T* dst); diff --git a/src/parquet/test-util.h b/src/parquet/test-util.h index 356486b2..3fd72f2a 100644 --- a/src/parquet/test-util.h +++ b/src/parquet/test-util.h @@ -42,7 +42,7 @@ using std::shared_ptr; namespace parquet { -static int FLBA_LENGTH = 12; +static constexpr int FLBA_LENGTH = 12; bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) { return 0 == memcmp(a.ptr, b.ptr, FLBA_LENGTH); diff --git a/src/parquet/types.cc b/src/parquet/types.cc index 97c769be..0652c6a8 100644 --- a/src/parquet/types.cc +++ b/src/parquet/types.cc @@ -221,4 +221,56 @@ int GetTypeByteSize(Type::type parquet_type) { return 0; } +// Return the Sort Order of the Parquet Physical Types +SortOrder::type DefaultSortOrder(Type::type primitive) { + switch (primitive) { + case Type::BOOLEAN: + case Type::INT32: + case Type::INT64: + case Type::FLOAT: + case Type::DOUBLE: + return SortOrder::SIGNED; + case Type::BYTE_ARRAY: + case Type::FIXED_LEN_BYTE_ARRAY: + case Type::INT96: // only used for timestamp, which uses unsigned values + return SortOrder::UNSIGNED; + } + return SortOrder::UNKNOWN; +} + +// Return the SortOrder of the Parquet Types using Logical or Physical Types +SortOrder::type GetSortOrder(LogicalType::type converted, Type::type primitive) { + if (converted == LogicalType::NONE) return DefaultSortOrder(primitive); + switch (converted) { + case LogicalType::INT_8: + case LogicalType::INT_16: + case LogicalType::INT_32: + case LogicalType::INT_64: + case LogicalType::DATE: + case LogicalType::TIME_MICROS: + case LogicalType::TIME_MILLIS: + case LogicalType::TIMESTAMP_MICROS: + case LogicalType::TIMESTAMP_MILLIS: + return SortOrder::SIGNED; + case LogicalType::UINT_8: + case LogicalType::UINT_16: + case LogicalType::UINT_32: + case LogicalType::UINT_64: + case LogicalType::ENUM: + case LogicalType::UTF8: + case LogicalType::BSON: + case LogicalType::JSON: + return SortOrder::UNSIGNED; + case LogicalType::DECIMAL: + case LogicalType::LIST: + case LogicalType::MAP: + case LogicalType::MAP_KEY_VALUE: + case LogicalType::INTERVAL: + case LogicalType::NONE: // required instead of default + case LogicalType::NA: // required instead of default + return SortOrder::UNKNOWN; + } + return SortOrder::UNKNOWN; +} + } // namespace parquet diff --git a/src/parquet/types.h b/src/parquet/types.h index 38015c4d..af3a58f5 100644 --- a/src/parquet/types.h +++ b/src/parquet/types.h @@ -116,6 +116,19 @@ struct PageType { enum type { DATA_PAGE, INDEX_PAGE, DICTIONARY_PAGE, DATA_PAGE_V2 }; }; +// Reference: +// parquet-mr/parquet-hadoop/src/main/java/org/apache/parquet/ +// format/converter/ParquetMetadataConverter.java +// Sort order for page and column statistics. Types are associated with sort +// orders (e.g., UTF8 columns should use UNSIGNED) and column stats are +// aggregated using a sort order. As of parquet-format version 2.3.1, the +// order used to aggregate stats is always SIGNED and is not stored in the +// Parquet file. These stats are discarded for types that need unsigned. +// See PARQUET-686. +struct SortOrder { + enum type { SIGNED, UNSIGNED, UNKNOWN }; +}; + // ---------------------------------------------------------------------- struct ByteArray { @@ -283,6 +296,11 @@ PARQUET_EXPORT std::string FormatStatValue(Type::type parquet_type, const char* PARQUET_EXPORT int GetTypeByteSize(Type::type t); +PARQUET_EXPORT SortOrder::type DefaultSortOrder(Type::type primitive); + +PARQUET_EXPORT SortOrder::type GetSortOrder(LogicalType::type converted, + Type::type primitive); + } // namespace parquet #endif // PARQUET_TYPES_H diff --git a/src/parquet/util/comparison-test.cc b/src/parquet/util/comparison-test.cc index 84019837..dc915bfe 100644 --- a/src/parquet/util/comparison-test.cc +++ b/src/parquet/util/comparison-test.cc @@ -42,48 +42,178 @@ static FLBA FLBAFromString(const std::string& s) { return FLBA(ptr); } -TEST(Comparison, ByteArray) { - NodePtr node = PrimitiveNode::Make("bytearray", Repetition::REQUIRED, Type::BYTE_ARRAY); +TEST(Comparison, signedByteArray) { + NodePtr node = + PrimitiveNode::Make("SignedByteArray", Repetition::REQUIRED, Type::BYTE_ARRAY); ColumnDescriptor descr(node, 0, 0); - Compare less(&descr); - - std::string a = "arrange"; - std::string b = "arrangement"; - auto arr1 = ByteArrayFromString(a); - auto arr2 = ByteArrayFromString(b); - ASSERT_TRUE(less(arr1, arr2)); - - a = u8"braten"; - b = u8"bügeln"; - auto arr3 = ByteArrayFromString(a); - auto arr4 = ByteArrayFromString(b); - // see PARQUET-686 discussion about binary comparison - ASSERT_TRUE(!less(arr3, arr4)); + + CompareDefaultByteArray less; + + std::string s1 = "12345"; + std::string s2 = "12345678"; + ByteArray s1ba = ByteArrayFromString(s1); + ByteArray s2ba = ByteArrayFromString(s2); + ASSERT_TRUE(less(s1ba, s2ba)); + + // This is case where signed comparision UTF-8 (PARQUET-686) is incorrect + // This example is to only check signed comparison and not UTF-8. + s1 = u8"bügeln"; + s2 = u8"braten"; + s1ba = ByteArrayFromString(s1); + s2ba = ByteArrayFromString(s2); + ASSERT_TRUE(less(s1ba, s2ba)); } -TEST(Comparison, FLBA) { - std::string a = "Antidisestablishmentarianism"; - std::string b = "Bundesgesundheitsministerium"; - auto arr1 = FLBAFromString(a); - auto arr2 = FLBAFromString(b); +TEST(Comparison, UnsignedByteArray) { + NodePtr node = PrimitiveNode::Make("UnsignedByteArray", Repetition::REQUIRED, + Type::BYTE_ARRAY, LogicalType::UTF8); + ColumnDescriptor descr(node, 0, 0); - NodePtr node = - PrimitiveNode::Make("FLBA", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY, - LogicalType::NONE, static_cast(a.size())); + // Check if UTF-8 is compared using unsigned correctly + CompareUnsignedByteArray uless; + + std::string s1 = "arrange"; + std::string s2 = "arrangement"; + ByteArray s1ba = ByteArrayFromString(s1); + ByteArray s2ba = ByteArrayFromString(s2); + ASSERT_TRUE(uless(s1ba, s2ba)); + + // Multi-byte UTF-8 characters + s1 = u8"braten"; + s2 = u8"bügeln"; + s1ba = ByteArrayFromString(s1); + s2ba = ByteArrayFromString(s2); + ASSERT_TRUE(uless(s1ba, s2ba)); + + s1 = u8"ünk123456"; // ü = 252 + s2 = u8"ănk123456"; // ă = 259 + s1ba = ByteArrayFromString(s1); + s2ba = ByteArrayFromString(s2); + ASSERT_TRUE(uless(s1ba, s2ba)); +} + +TEST(Comparison, SignedFLBA) { + int size = 10; + NodePtr node = PrimitiveNode::Make("SignedFLBA", Repetition::REQUIRED, + Type::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE, size); ColumnDescriptor descr(node, 0, 0); - Compare less(&descr); - ASSERT_TRUE(less(arr1, arr2)); + + CompareDefaultFLBA less(descr.type_length()); + + std::string s1 = "Anti123456"; + std::string s2 = "Bunkd123456"; + FLBA s1flba = FLBAFromString(s1); + FLBA s2flba = FLBAFromString(s2); + ASSERT_TRUE(less(s1flba, s2flba)); + + s1 = "Bünk123456"; + s2 = "Bunk123456"; + s1flba = FLBAFromString(s1); + s2flba = FLBAFromString(s2); + ASSERT_TRUE(less(s1flba, s2flba)); } -TEST(Comparison, Int96) { +TEST(Comparison, UnsignedFLBA) { + int size = 10; + NodePtr node = PrimitiveNode::Make("UnsignedFLBA", Repetition::REQUIRED, + Type::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE, size); + ColumnDescriptor descr(node, 0, 0); + + CompareUnsignedFLBA uless(descr.type_length()); + + std::string s1 = "Anti123456"; + std::string s2 = "Bunkd123456"; + FLBA s1flba = FLBAFromString(s1); + FLBA s2flba = FLBAFromString(s2); + ASSERT_TRUE(uless(s1flba, s2flba)); + + s1 = "Bunk123456"; + s2 = "Bünk123456"; + s1flba = FLBAFromString(s1); + s2flba = FLBAFromString(s2); + ASSERT_TRUE(uless(s1flba, s2flba)); +} + +TEST(Comparison, SignedInt96) { parquet::Int96 a{{1, 41, 14}}, b{{1, 41, 42}}; + parquet::Int96 aa{{1, 41, 14}}, bb{{1, 41, 14}}; + parquet::Int96 aaa{{static_cast(-1), 41, 14}}, bbb{{1, 41, 42}}; - NodePtr node = PrimitiveNode::Make("int96", Repetition::REQUIRED, Type::INT96); + NodePtr node = PrimitiveNode::Make("SignedInt96", Repetition::REQUIRED, Type::INT96); ColumnDescriptor descr(node, 0, 0); - Compare less(&descr); + + CompareDefaultInt96 less; + ASSERT_TRUE(less(a, b)); - b.value[2] = 14; - ASSERT_TRUE(!less(a, b) && !less(b, a)); + ASSERT_TRUE(!less(aa, bb) && !less(bb, aa)); + ASSERT_TRUE(less(aaa, bbb)); +} + +TEST(Comparison, UnsignedInt96) { + parquet::Int96 a{{1, 41, 14}}, b{{1, static_cast(-41), 42}}; + parquet::Int96 aa{{1, 41, 14}}, bb{{static_cast(-1), 41, 14}}; + + NodePtr node = PrimitiveNode::Make("UnsignedInt96", Repetition::REQUIRED, Type::INT96); + ColumnDescriptor descr(node, 0, 0); + + CompareUnsignedInt96 uless; + + ASSERT_TRUE(uless(a, b)); + ASSERT_TRUE(uless(aa, bb)); +} + +TEST(Comparison, SignedInt64) { + int64_t a = 1, b = 4; + int64_t aa = 1, bb = 1; + int64_t aaa = -1, bbb = 1; + + NodePtr node = PrimitiveNode::Make("SignedInt64", Repetition::REQUIRED, Type::INT64); + ColumnDescriptor descr(node, 0, 0); + + CompareDefaultInt64 less; + + ASSERT_TRUE(less(a, b)); + ASSERT_TRUE(!less(aa, bb) && !less(bb, aa)); + ASSERT_TRUE(less(aaa, bbb)); +} + +TEST(Comparison, UnsignedInt64) { + uint64_t a = 1, b = 4; + uint64_t aa = 1, bb = 1; + uint64_t aaa = 1, bbb = -1; + + NodePtr node = PrimitiveNode::Make("UnsignedInt64", Repetition::REQUIRED, Type::INT64); + ColumnDescriptor descr(node, 0, 0); + + CompareUnsignedInt64 less; + + ASSERT_TRUE(less(a, b)); + ASSERT_TRUE(!less(aa, bb) && !less(bb, aa)); + ASSERT_TRUE(less(aaa, bbb)); +} + +TEST(Comparison, UnsignedInt32) { + uint32_t a = 1, b = 4; + uint32_t aa = 1, bb = 1; + uint32_t aaa = 1, bbb = -1; + + NodePtr node = PrimitiveNode::Make("UnsignedInt32", Repetition::REQUIRED, Type::INT32); + ColumnDescriptor descr(node, 0, 0); + + CompareUnsignedInt32 less; + + ASSERT_TRUE(less(a, b)); + ASSERT_TRUE(!less(aa, bb) && !less(bb, aa)); + ASSERT_TRUE(less(aaa, bbb)); +} + +TEST(Comparison, UnknownSortOrder) { + NodePtr node = + PrimitiveNode::Make("Unknown", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY, + LogicalType::INTERVAL, 12); + ColumnDescriptor descr(node, 0, 0); + + ASSERT_THROW(Comparator::Make(&descr), ParquetException); } } // namespace test diff --git a/src/parquet/util/comparison.cc b/src/parquet/util/comparison.cc new file mode 100644 index 00000000..1d7bb9dc --- /dev/null +++ b/src/parquet/util/comparison.cc @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "parquet/exception.h" +#include "parquet/schema.h" +#include "parquet/types.h" +#include "parquet/util/comparison.h" + +namespace parquet { + +std::shared_ptr Comparator::Make(const ColumnDescriptor* descr) { + if (SortOrder::SIGNED == descr->sort_order()) { + switch (descr->physical_type()) { + case Type::BOOLEAN: + return std::make_shared(); + case Type::INT32: + return std::make_shared(); + case Type::INT64: + return std::make_shared(); + case Type::INT96: + return std::make_shared(); + case Type::FLOAT: + return std::make_shared(); + case Type::DOUBLE: + return std::make_shared(); + case Type::BYTE_ARRAY: + return std::make_shared(); + case Type::FIXED_LEN_BYTE_ARRAY: + return std::make_shared(descr->type_length()); + default: + ParquetException::NYI("Signed Compare not implemented"); + } + } else if (SortOrder::UNSIGNED == descr->sort_order()) { + switch (descr->physical_type()) { + case Type::INT32: + return std::make_shared(); + case Type::INT64: + return std::make_shared(); + case Type::INT96: + return std::make_shared(); + case Type::BYTE_ARRAY: + return std::make_shared(); + case Type::FIXED_LEN_BYTE_ARRAY: + return std::make_shared(descr->type_length()); + default: + ParquetException::NYI("Unsigned Compare not implemented"); + } + } else { + throw ParquetException("UNKNOWN Sort Order"); + } + return nullptr; +} + +template class PARQUET_TEMPLATE_EXPORT CompareDefault; +template class PARQUET_TEMPLATE_EXPORT CompareDefault; +template class PARQUET_TEMPLATE_EXPORT CompareDefault; +template class PARQUET_TEMPLATE_EXPORT CompareDefault; +template class PARQUET_TEMPLATE_EXPORT CompareDefault; +template class PARQUET_TEMPLATE_EXPORT CompareDefault; +template class PARQUET_TEMPLATE_EXPORT CompareDefault; +template class PARQUET_TEMPLATE_EXPORT CompareDefault; + +} // namespace parquet diff --git a/src/parquet/util/comparison.h b/src/parquet/util/comparison.h index edd3df13..803f0da6 100644 --- a/src/parquet/util/comparison.h +++ b/src/parquet/util/comparison.h @@ -20,40 +20,145 @@ #include +#include "parquet/exception.h" #include "parquet/schema.h" #include "parquet/types.h" namespace parquet { -template -struct Compare { - explicit Compare(const ColumnDescriptor* descr) : type_length_(descr->type_length()) {} - - inline bool operator()(const T& a, const T& b) { return a < b; } +class PARQUET_EXPORT Comparator { + public: + virtual ~Comparator() {} + static std::shared_ptr Make(const ColumnDescriptor* descr); +}; - private: - int32_t type_length_; +// The default comparison is SIGNED +template +class PARQUET_EXPORT CompareDefault : public Comparator { + public: + typedef typename DType::c_type T; + CompareDefault() {} + virtual ~CompareDefault() {} + virtual bool operator()(const T& a, const T& b) { return a < b; } }; template <> -inline bool Compare::operator()(const Int96& a, const Int96& b) { - return std::lexicographical_compare(a.value, a.value + 3, b.value, b.value + 3); -} +class PARQUET_EXPORT CompareDefault : public Comparator { + public: + CompareDefault() {} + virtual ~CompareDefault() {} + virtual bool operator()(const Int96& a, const Int96& b) { + const int32_t* aptr = reinterpret_cast(&a.value[0]); + const int32_t* bptr = reinterpret_cast(&b.value[0]); + return std::lexicographical_compare(aptr, aptr + 3, bptr, bptr + 3); + } +}; template <> -inline bool Compare::operator()(const ByteArray& a, const ByteArray& b) { - auto aptr = reinterpret_cast(a.ptr); - auto bptr = reinterpret_cast(b.ptr); - return std::lexicographical_compare(aptr, aptr + a.len, bptr, bptr + b.len); -} +class PARQUET_EXPORT CompareDefault : public Comparator { + public: + CompareDefault() {} + virtual ~CompareDefault() {} + virtual bool operator()(const ByteArray& a, const ByteArray& b) { + const int8_t* aptr = reinterpret_cast(a.ptr); + const int8_t* bptr = reinterpret_cast(b.ptr); + return std::lexicographical_compare(aptr, aptr + a.len, bptr, bptr + b.len); + } +}; template <> -inline bool Compare::operator()(const FLBA& a, const FLBA& b) { - auto aptr = reinterpret_cast(a.ptr); - auto bptr = reinterpret_cast(b.ptr); - return std::lexicographical_compare(aptr, aptr + type_length_, bptr, - bptr + type_length_); -} +class PARQUET_EXPORT CompareDefault : public Comparator { + public: + explicit CompareDefault(int length) : type_length_(length) {} + virtual ~CompareDefault() {} + virtual bool operator()(const FLBA& a, const FLBA& b) { + const int8_t* aptr = reinterpret_cast(a.ptr); + const int8_t* bptr = reinterpret_cast(b.ptr); + return std::lexicographical_compare(aptr, aptr + type_length_, bptr, + bptr + type_length_); + } + int32_t type_length_; +}; + +typedef CompareDefault CompareDefaultBoolean; +typedef CompareDefault CompareDefaultInt32; +typedef CompareDefault CompareDefaultInt64; +typedef CompareDefault CompareDefaultInt96; +typedef CompareDefault CompareDefaultFloat; +typedef CompareDefault CompareDefaultDouble; +typedef CompareDefault CompareDefaultByteArray; +typedef CompareDefault CompareDefaultFLBA; + +#if defined(__GNUC__) && !defined(__clang__) +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wattributes" +#endif + +PARQUET_EXTERN_TEMPLATE CompareDefault; +PARQUET_EXTERN_TEMPLATE CompareDefault; +PARQUET_EXTERN_TEMPLATE CompareDefault; +PARQUET_EXTERN_TEMPLATE CompareDefault; +PARQUET_EXTERN_TEMPLATE CompareDefault; +PARQUET_EXTERN_TEMPLATE CompareDefault; +PARQUET_EXTERN_TEMPLATE CompareDefault; +PARQUET_EXTERN_TEMPLATE CompareDefault; + +#if defined(__GNUC__) && !defined(__clang__) +#pragma GCC diagnostic pop +#endif + +// Define Unsigned Comparators +class PARQUET_EXPORT CompareUnsignedInt32 : public CompareDefaultInt32 { + public: + virtual ~CompareUnsignedInt32() {} + bool operator()(const int32_t& a, const int32_t& b) override { + const uint32_t ua = a; + const uint32_t ub = b; + return (ua < ub); + } +}; + +class PARQUET_EXPORT CompareUnsignedInt64 : public CompareDefaultInt64 { + public: + virtual ~CompareUnsignedInt64() {} + bool operator()(const int64_t& a, const int64_t& b) override { + const uint64_t ua = a; + const uint64_t ub = b; + return (ua < ub); + } +}; + +class PARQUET_EXPORT CompareUnsignedInt96 : public CompareDefaultInt96 { + public: + virtual ~CompareUnsignedInt96() {} + bool operator()(const Int96& a, const Int96& b) override { + const uint32_t* aptr = reinterpret_cast(&a.value[0]); + const uint32_t* bptr = reinterpret_cast(&b.value[0]); + return std::lexicographical_compare(aptr, aptr + 3, bptr, bptr + 3); + } +}; + +class PARQUET_EXPORT CompareUnsignedByteArray : public CompareDefaultByteArray { + public: + virtual ~CompareUnsignedByteArray() {} + bool operator()(const ByteArray& a, const ByteArray& b) override { + const uint8_t* aptr = reinterpret_cast(a.ptr); + const uint8_t* bptr = reinterpret_cast(b.ptr); + return std::lexicographical_compare(aptr, aptr + a.len, bptr, bptr + b.len); + } +}; + +class PARQUET_EXPORT CompareUnsignedFLBA : public CompareDefaultFLBA { + public: + explicit CompareUnsignedFLBA(int length) : CompareDefaultFLBA(length) {} + virtual ~CompareUnsignedFLBA() {} + bool operator()(const FLBA& a, const FLBA& b) override { + const uint8_t* aptr = reinterpret_cast(a.ptr); + const uint8_t* bptr = reinterpret_cast(b.ptr); + return std::lexicographical_compare(aptr, aptr + type_length_, bptr, + bptr + type_length_); + } +}; } // namespace parquet