Skip to content
This repository was archived by the owner on May 10, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ set(LIBPARQUET_SRCS

src/parquet/parquet_constants.cpp
src/parquet/parquet_types.cpp
src/parquet/util/comparison.cc
src/parquet/util/memory.cc
)

Expand Down
12 changes: 7 additions & 5 deletions src/parquet/column_writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,16 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
void ReadAndCompare(Compression::type compression, int64_t num_rows) {
this->SetupValuesOut(num_rows);
this->ReadColumnFully(compression);
Compare<T> compare(this->descr_);
std::shared_ptr<CompareDefault<TestType>> compare;
compare = std::static_pointer_cast<CompareDefault<TestType>>(
Comparator::Make(this->descr_));
for (size_t i = 0; i < this->values_.size(); i++) {
if (compare(this->values_[i], this->values_out_[i]) ||
compare(this->values_out_[i], this->values_[i])) {
if ((*compare)(this->values_[i], this->values_out_[i]) ||
(*compare)(this->values_out_[i], this->values_[i])) {
std::cout << "Failed at " << i << std::endl;
}
ASSERT_FALSE(compare(this->values_[i], this->values_out_[i]));
ASSERT_FALSE(compare(this->values_out_[i], this->values_[i]));
ASSERT_FALSE((*compare)(this->values_[i], this->values_out_[i]));
ASSERT_FALSE((*compare)(this->values_out_[i], this->values_[i]));
}
ASSERT_EQ(this->values_, this->values_out_);
}
Expand Down
8 changes: 6 additions & 2 deletions src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,10 @@ int64_t ColumnWriter::Close() {
FlushBufferedDataPages();

EncodedStatistics chunk_statistics = GetChunkStatistics();
if (chunk_statistics.is_set()) metadata_->SetStatistics(chunk_statistics);
if (chunk_statistics.is_set()) {
metadata_->SetStatistics(SortOrder::SIGNED == descr_->sort_order(),
chunk_statistics);
}
pager_->Close(has_dictionary_, fallback_);
}

Expand Down Expand Up @@ -317,7 +320,8 @@ TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
ParquetException::NYI("Selected encoding is not supported");
}

if (properties->statistics_enabled(descr_->path())) {
if (properties->statistics_enabled(descr_->path()) &&
(SortOrder::UNKNOWN != descr_->sort_order())) {
page_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
chunk_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
}
Expand Down
19 changes: 10 additions & 9 deletions src/parquet/file/file-metadata-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ TEST(Metadata, TestBuildAccess) {
auto col1_builder = rg1_builder->NextColumnChunk();
auto col2_builder = rg1_builder->NextColumnChunk();
// column metadata
col1_builder->SetStatistics(stats_int);
col2_builder->SetStatistics(stats_float);
col1_builder->SetStatistics(true, stats_int);
col2_builder->SetStatistics(true, stats_float);
col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, true, false);
col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, true, false);
rg1_builder->Finish(1024);
Expand All @@ -73,8 +73,8 @@ TEST(Metadata, TestBuildAccess) {
col1_builder = rg2_builder->NextColumnChunk();
col2_builder = rg2_builder->NextColumnChunk();
// column metadata
col1_builder->SetStatistics(stats_int);
col2_builder->SetStatistics(stats_float);
col1_builder->SetStatistics(true, stats_int);
col2_builder->SetStatistics(true, stats_float);
col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, true, false);
col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, true, false);
rg2_builder->Finish(1024);
Expand Down Expand Up @@ -215,11 +215,12 @@ TEST(ApplicationVersion, Basics) {

ASSERT_EQ(true, version.VersionLt(version1));

ASSERT_FALSE(version1.HasCorrectStatistics(Type::INT96));
ASSERT_TRUE(version.HasCorrectStatistics(Type::INT32));
ASSERT_FALSE(version.HasCorrectStatistics(Type::BYTE_ARRAY));
ASSERT_TRUE(version1.HasCorrectStatistics(Type::BYTE_ARRAY));
ASSERT_TRUE(version3.HasCorrectStatistics(Type::FIXED_LEN_BYTE_ARRAY));
ASSERT_FALSE(version1.HasCorrectStatistics(Type::INT96, SortOrder::SIGNED));
ASSERT_TRUE(version.HasCorrectStatistics(Type::INT32, SortOrder::SIGNED));
ASSERT_FALSE(version.HasCorrectStatistics(Type::BYTE_ARRAY, SortOrder::SIGNED));
ASSERT_TRUE(version1.HasCorrectStatistics(Type::BYTE_ARRAY, SortOrder::SIGNED));
ASSERT_TRUE(
version3.HasCorrectStatistics(Type::FIXED_LEN_BYTE_ARRAY, SortOrder::SIGNED));
}

} // namespace metadata
Expand Down
111 changes: 41 additions & 70 deletions src/parquet/file/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,62 +35,20 @@ const ApplicationVersion ApplicationVersion::PARQUET_251_FIXED_VERSION =
ApplicationVersion("parquet-mr version 1.8.0");
const ApplicationVersion ApplicationVersion::PARQUET_816_FIXED_VERSION =
ApplicationVersion("parquet-mr version 1.2.9");

// Maps a Parquet physical type to the sort order associated with it.
// Any value not listed in the switch yields SortOrder::UNKNOWN.
SortOrder default_sort_order(Type::type primitive) {
  SortOrder order = SortOrder::UNKNOWN;
  switch (primitive) {
    case Type::BYTE_ARRAY:
    case Type::FIXED_LEN_BYTE_ARRAY:
    case Type::INT96:  // only used for timestamp, which uses unsigned values
      order = SortOrder::UNSIGNED;
      break;
    case Type::BOOLEAN:
    case Type::INT32:
    case Type::INT64:
    case Type::FLOAT:
    case Type::DOUBLE:
      order = SortOrder::SIGNED;
      break;
  }
  return order;
}

// Resolves the sort order for a column from its logical (converted) type.
// When the logical type is NONE, the decision is delegated to
// default_sort_order() on the physical type.
SortOrder get_sort_order(LogicalType::type converted, Type::type primitive) {
  switch (converted) {
    case LogicalType::NONE:
      // No logical annotation: the physical type alone decides.
      return default_sort_order(primitive);

    case LogicalType::UINT_8:
    case LogicalType::UINT_16:
    case LogicalType::UINT_32:
    case LogicalType::UINT_64:
    case LogicalType::ENUM:
    case LogicalType::UTF8:
    case LogicalType::BSON:
    case LogicalType::JSON:
      return SortOrder::UNSIGNED;

    case LogicalType::INT_8:
    case LogicalType::INT_16:
    case LogicalType::INT_32:
    case LogicalType::INT_64:
    case LogicalType::DATE:
    case LogicalType::TIME_MICROS:
    case LogicalType::TIME_MILLIS:
    case LogicalType::TIMESTAMP_MICROS:
    case LogicalType::TIMESTAMP_MILLIS:
      return SortOrder::SIGNED;

    // Listed explicitly (rather than via `default`) so every enumerator
    // stays named in this switch.
    case LogicalType::NA:
    case LogicalType::DECIMAL:
    case LogicalType::LIST:
    case LogicalType::MAP:
    case LogicalType::MAP_KEY_VALUE:
    case LogicalType::INTERVAL:
      break;
  }
  return SortOrder::UNKNOWN;
}
const ApplicationVersion ApplicationVersion::PARQUET_CPP_FIXED_STATS_VERSION =
ApplicationVersion("parquet-cpp version 1.3.0");

template <typename DType>
static std::shared_ptr<RowGroupStatistics> MakeTypedColumnStats(
const format::ColumnMetaData& metadata, const ColumnDescriptor* descr) {
// If new fields max_value/min_value are set, then return them.
if (metadata.statistics.__isset.max_value || metadata.statistics.__isset.min_value) {
return std::make_shared<TypedRowGroupStatistics<DType>>(
descr, metadata.statistics.min_value, metadata.statistics.max_value,
metadata.num_values - metadata.statistics.null_count,
metadata.statistics.null_count, metadata.statistics.distinct_count, true);
}
// Default behavior
return std::make_shared<TypedRowGroupStatistics<DType>>(
descr, metadata.statistics.min, metadata.statistics.max,
metadata.num_values - metadata.statistics.null_count,
Expand Down Expand Up @@ -159,9 +117,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
inline bool is_stats_set() const {
DCHECK(writer_version_ != nullptr);
return column_->meta_data.__isset.statistics &&
writer_version_->HasCorrectStatistics(type()) &&
SortOrder::SIGNED ==
get_sort_order(descr_->logical_type(), descr_->physical_type());
writer_version_->HasCorrectStatistics(type(), descr_->sort_order());
}

inline std::shared_ptr<RowGroupStatistics> statistics() const {
Expand Down Expand Up @@ -534,15 +490,21 @@ bool ApplicationVersion::VersionEq(const ApplicationVersion& other_version) cons
// Reference:
// parquet-mr/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java
// PARQUET-686 has more discussion on statistics
bool ApplicationVersion::HasCorrectStatistics(Type::type col_type) const {
// None of the current tools write INT96 Statistics correctly
if (col_type == Type::INT96) return false;

// Statistics of other types are OK
if (col_type != Type::FIXED_LEN_BYTE_ARRAY && col_type != Type::BYTE_ARRAY) {
return true;
bool ApplicationVersion::HasCorrectStatistics(Type::type col_type,
SortOrder::type sort_order) const {
// From parquet-cpp version 1.3.0 onwards, stats are computed correctly for all types
if ((application_ != "parquet-cpp") || (VersionLt(PARQUET_CPP_FIXED_STATS_VERSION))) {
// Only SIGNED are valid
if (SortOrder::SIGNED != sort_order) return false;

// None of the current tools write INT96 Statistics correctly
if (col_type == Type::INT96) return false;

// Statistics of other types are OK
if (col_type != Type::FIXED_LEN_BYTE_ARRAY && col_type != Type::BYTE_ARRAY) {
return true;
}
}

// created_by is not populated, which could have been caused by
// parquet-mr during the same time as PARQUET-251, see PARQUET-297
if (application_ == "unknown") {
Expand Down Expand Up @@ -577,16 +539,24 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
void set_file_path(const std::string& val) { column_chunk_->__set_file_path(val); }

// column metadata
void SetStatistics(const EncodedStatistics& val) {
void SetStatistics(bool is_signed, const EncodedStatistics& val) {
format::Statistics stats;
stats.null_count = val.null_count;
stats.distinct_count = val.distinct_count;
stats.max = val.max();
stats.min = val.min();
stats.__isset.min = val.has_min;
stats.__isset.max = val.has_max;
stats.max_value = val.max();
stats.min_value = val.min();
stats.__isset.min_value = val.has_min;
stats.__isset.max_value = val.has_max;
stats.__isset.null_count = val.has_null_count;
stats.__isset.distinct_count = val.has_distinct_count;
// If the order is SIGNED, then the old min/max values must be set too.
// This is for backward compatibility
if (is_signed) {
stats.max = val.max();
stats.min = val.min();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do the new statistics fields cause any problems for older Parquet readers?

Copy link
Author

@majetideepak majetideepak Aug 23, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked and it didn't.

stats.__isset.min = val.has_min;
stats.__isset.max = val.has_max;
}

column_chunk_->meta_data.__set_statistics(stats);
}
Expand Down Expand Up @@ -674,8 +644,9 @@ const ColumnDescriptor* ColumnChunkMetaDataBuilder::descr() const {
return impl_->descr();
}

void ColumnChunkMetaDataBuilder::SetStatistics(const EncodedStatistics& result) {
impl_->SetStatistics(result);
void ColumnChunkMetaDataBuilder::SetStatistics(bool is_signed,
const EncodedStatistics& result) {
impl_->SetStatistics(is_signed, result);
}

class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
Expand Down
17 changes: 4 additions & 13 deletions src/parquet/file/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,12 @@ namespace parquet {

using KeyValueMetadata = ::arrow::KeyValueMetadata;

// Reference:
// parquet-mr/parquet-hadoop/src/main/java/org/apache/parquet/
// format/converter/ParquetMetadataConverter.java
// Sort order for page and column statistics. Types are associated with sort
// orders (e.g., UTF8 columns should use UNSIGNED) and column stats are
// aggregated using a sort order. As of parquet-format version 2.3.1, the
// order used to aggregate stats is always SIGNED and is not stored in the
// Parquet file. These stats are discarded for types that need unsigned.
// See PARQUET-686.
enum SortOrder { SIGNED, UNSIGNED, UNKNOWN };

class ApplicationVersion {
public:
// Known Versions with Issues
static const ApplicationVersion PARQUET_251_FIXED_VERSION;
static const ApplicationVersion PARQUET_816_FIXED_VERSION;
static const ApplicationVersion PARQUET_CPP_FIXED_STATS_VERSION;
// Regular expression for the version format
// major . minor . patch unknown - prerelease.x + build info
// Eg: 1.5.0ab-cdh5.5.0+cd
Expand Down Expand Up @@ -92,7 +82,8 @@ class ApplicationVersion {
bool VersionEq(const ApplicationVersion& other_version) const;

// Checks if the Version has the correct statistics for a given column
bool HasCorrectStatistics(Type::type primitive) const;
bool HasCorrectStatistics(Type::type primitive,
SortOrder::type sort_order = SortOrder::SIGNED) const;
};

class PARQUET_EXPORT ColumnChunkMetaData {
Expand Down Expand Up @@ -209,7 +200,7 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
// Used when a dataset is spread across multiple files
void set_file_path(const std::string& path);
// column metadata
void SetStatistics(const EncodedStatistics& stats);
void SetStatistics(bool is_signed, const EncodedStatistics& stats);
// get the column descriptor
const ColumnDescriptor* descr() const;
// commit the metadata
Expand Down
2 changes: 1 addition & 1 deletion src/parquet/parquet_version.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@
// define the parquet created by version
#define CREATED_BY_VERSION "parquet-cpp version @PARQUET_VERSION@"

#endif // PARQUET_VERSION_H
#endif // PARQUET_VERSION_H
4 changes: 4 additions & 0 deletions src/parquet/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,10 @@ class PARQUET_EXPORT ColumnDescriptor {

LogicalType::type logical_type() const { return primitive_node_->logical_type(); }

// Sort order for this column, obtained by passing its logical and physical
// types to GetSortOrder().
SortOrder::type sort_order() const {
  const auto logical = logical_type();
  const auto physical = physical_type();
  return GetSortOrder(logical, physical);
}

const std::string& name() const { return primitive_node_->name(); }

const std::shared_ptr<schema::ColumnPath> path() const;
Expand Down
Loading