Skip to content
This repository was archived by the owner on May 10, 2024. It is now read-only.

Commit 046f8d3

Browse files
author
Deepak Majeti
committed
Move SortOrder to types.h
add int32 comparison INT96 fix statistics in metadata Use templates
1 parent adb3168 commit 046f8d3

File tree

13 files changed

+330
-123
lines changed

13 files changed

+330
-123
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,7 @@ set(LIBPARQUET_SRCS
646646

647647
src/parquet/parquet_constants.cpp
648648
src/parquet/parquet_types.cpp
649+
src/parquet/util/comparison.cc
649650
src/parquet/util/memory.cc
650651
)
651652

src/parquet/column_writer-test.cc

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -151,14 +151,15 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
151151
void ReadAndCompare(Compression::type compression, int64_t num_rows) {
152152
this->SetupValuesOut(num_rows);
153153
this->ReadColumnFully(compression);
154-
Compare<T> compare(this->descr_);
154+
std::shared_ptr<CompareDefault<TestType> >compare;
155+
compare = std::static_pointer_cast<CompareDefault<TestType> >(Compare::getComparator(this->descr_));
155156
for (size_t i = 0; i < this->values_.size(); i++) {
156-
if (compare(this->values_[i], this->values_out_[i]) ||
157-
compare(this->values_out_[i], this->values_[i])) {
157+
if ((*compare)(this->values_[i], this->values_out_[i]) ||
158+
(*compare)(this->values_out_[i], this->values_[i])) {
158159
std::cout << "Failed at " << i << std::endl;
159160
}
160-
ASSERT_FALSE(compare(this->values_[i], this->values_out_[i]));
161-
ASSERT_FALSE(compare(this->values_out_[i], this->values_[i]));
161+
ASSERT_FALSE((*compare)(this->values_[i], this->values_out_[i]));
162+
ASSERT_FALSE((*compare)(this->values_out_[i], this->values_[i]));
162163
}
163164
ASSERT_EQ(this->values_, this->values_out_);
164165
}

src/parquet/column_writer.cc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,9 @@ int64_t ColumnWriter::Close() {
267267
FlushBufferedDataPages();
268268

269269
EncodedStatistics chunk_statistics = GetChunkStatistics();
270-
if (chunk_statistics.is_set()) metadata_->SetStatistics(chunk_statistics);
270+
if (chunk_statistics.is_set()) metadata_->SetStatistics(
271+
SortOrder::SIGNED == GetSortOrder(descr_->logical_type(),
272+
descr_->physical_type()), chunk_statistics);
271273
pager_->Close(has_dictionary_, fallback_);
272274
}
273275

@@ -317,7 +319,9 @@ TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
317319
ParquetException::NYI("Selected encoding is not supported");
318320
}
319321

320-
if (properties->statistics_enabled(descr_->path())) {
322+
if (properties->statistics_enabled(descr_->path()) &&
323+
(SortOrder::UNKNOWN != GetSortOrder(descr_->logical_type(),
324+
descr_->physical_type())) ) {
321325
page_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
322326
chunk_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
323327
}

src/parquet/file/file-metadata-test.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ TEST(Metadata, TestBuildAccess) {
6363
auto col1_builder = rg1_builder->NextColumnChunk();
6464
auto col2_builder = rg1_builder->NextColumnChunk();
6565
// column metadata
66-
col1_builder->SetStatistics(stats_int);
67-
col2_builder->SetStatistics(stats_float);
66+
col1_builder->SetStatistics(true, stats_int);
67+
col2_builder->SetStatistics(true, stats_float);
6868
col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, true, false);
6969
col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, true, false);
7070
rg1_builder->Finish(1024);
@@ -73,8 +73,8 @@ TEST(Metadata, TestBuildAccess) {
7373
col1_builder = rg2_builder->NextColumnChunk();
7474
col2_builder = rg2_builder->NextColumnChunk();
7575
// column metadata
76-
col1_builder->SetStatistics(stats_int);
77-
col2_builder->SetStatistics(stats_float);
76+
col1_builder->SetStatistics(true, stats_int);
77+
col2_builder->SetStatistics(true, stats_float);
7878
col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, true, false);
7979
col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, true, false);
8080
rg2_builder->Finish(1024);

src/parquet/file/metadata.cc

Lines changed: 14 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -36,58 +36,6 @@ const ApplicationVersion ApplicationVersion::PARQUET_251_FIXED_VERSION =
3636
const ApplicationVersion ApplicationVersion::PARQUET_816_FIXED_VERSION =
3737
ApplicationVersion("parquet-mr version 1.2.9");
3838

39-
// Return the Sort Order of the Parquet Physical Types
40-
SortOrder default_sort_order(Type::type primitive) {
41-
switch (primitive) {
42-
case Type::BOOLEAN:
43-
case Type::INT32:
44-
case Type::INT64:
45-
case Type::FLOAT:
46-
case Type::DOUBLE:
47-
return SortOrder::SIGNED;
48-
case Type::BYTE_ARRAY:
49-
case Type::FIXED_LEN_BYTE_ARRAY:
50-
case Type::INT96: // only used for timestamp, which uses unsigned values
51-
return SortOrder::UNSIGNED;
52-
}
53-
return SortOrder::UNKNOWN;
54-
}
55-
56-
// Return the SortOrder of the Parquet Types using Logical or Physical Types
57-
SortOrder get_sort_order(LogicalType::type converted, Type::type primitive) {
58-
if (converted == LogicalType::NONE) return default_sort_order(primitive);
59-
switch (converted) {
60-
case LogicalType::INT_8:
61-
case LogicalType::INT_16:
62-
case LogicalType::INT_32:
63-
case LogicalType::INT_64:
64-
case LogicalType::DATE:
65-
case LogicalType::TIME_MICROS:
66-
case LogicalType::TIME_MILLIS:
67-
case LogicalType::TIMESTAMP_MICROS:
68-
case LogicalType::TIMESTAMP_MILLIS:
69-
return SortOrder::SIGNED;
70-
case LogicalType::UINT_8:
71-
case LogicalType::UINT_16:
72-
case LogicalType::UINT_32:
73-
case LogicalType::UINT_64:
74-
case LogicalType::ENUM:
75-
case LogicalType::UTF8:
76-
case LogicalType::BSON:
77-
case LogicalType::JSON:
78-
return SortOrder::UNSIGNED;
79-
case LogicalType::NA:
80-
case LogicalType::DECIMAL:
81-
case LogicalType::LIST:
82-
case LogicalType::MAP:
83-
case LogicalType::MAP_KEY_VALUE:
84-
case LogicalType::INTERVAL:
85-
case LogicalType::NONE: // required instead of default
86-
return SortOrder::UNKNOWN;
87-
}
88-
return SortOrder::UNKNOWN;
89-
}
90-
9139
template <typename DType>
9240
static std::shared_ptr<RowGroupStatistics> MakeTypedColumnStats(
9341
const format::ColumnMetaData& metadata, const ColumnDescriptor* descr) {
@@ -161,7 +109,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
161109
return column_->meta_data.__isset.statistics &&
162110
writer_version_->HasCorrectStatistics(type()) &&
163111
SortOrder::SIGNED ==
164-
get_sort_order(descr_->logical_type(), descr_->physical_type());
112+
GetSortOrder(descr_->logical_type(), descr_->physical_type());
165113
}
166114

167115
inline std::shared_ptr<RowGroupStatistics> statistics() const {
@@ -577,16 +525,22 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
577525
void set_file_path(const std::string& val) { column_chunk_->__set_file_path(val); }
578526

579527
// column metadata
580-
void SetStatistics(const EncodedStatistics& val) {
528+
void SetStatistics(bool is_signed, const EncodedStatistics& val) {
581529
format::Statistics stats;
582530
stats.null_count = val.null_count;
583531
stats.distinct_count = val.distinct_count;
584-
stats.max = val.max();
585-
stats.min = val.min();
586-
stats.__isset.min = val.has_min;
587-
stats.__isset.max = val.has_max;
532+
stats.max_value = val.max();
533+
stats.min_value = val.min();
534+
stats.__isset.min_value = val.has_min;
535+
stats.__isset.max_value = val.has_max;
588536
stats.__isset.null_count = val.has_null_count;
589537
stats.__isset.distinct_count = val.has_distinct_count;
538+
if (is_signed) {
539+
stats.max = val.max();
540+
stats.min = val.min();
541+
stats.__isset.min = val.has_min;
542+
stats.__isset.max = val.has_max;
543+
}
590544

591545
column_chunk_->meta_data.__set_statistics(stats);
592546
}
@@ -674,8 +628,8 @@ const ColumnDescriptor* ColumnChunkMetaDataBuilder::descr() const {
674628
return impl_->descr();
675629
}
676630

677-
void ColumnChunkMetaDataBuilder::SetStatistics(const EncodedStatistics& result) {
678-
impl_->SetStatistics(result);
631+
void ColumnChunkMetaDataBuilder::SetStatistics(bool is_signed, const EncodedStatistics& result) {
632+
impl_->SetStatistics(is_signed, result);
679633
}
680634

681635
class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {

src/parquet/file/metadata.h

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,6 @@ namespace parquet {
3535

3636
using KeyValueMetadata = ::arrow::KeyValueMetadata;
3737

38-
// Reference:
39-
// parquet-mr/parquet-hadoop/src/main/java/org/apache/parquet/
40-
// format/converter/ParquetMetadataConverter.java
41-
// Sort order for page and column statistics. Types are associated with sort
42-
// orders (e.g., UTF8 columns should use UNSIGNED) and column stats are
43-
// aggregated using a sort order. As of parquet-format version 2.3.1, the
44-
// order used to aggregate stats is always SIGNED and is not stored in the
45-
// Parquet file. These stats are discarded for types that need unsigned.
46-
// See PARQUET-686.
47-
enum SortOrder { SIGNED, UNSIGNED, UNKNOWN };
48-
4938
class ApplicationVersion {
5039
public:
5140
// Known Versions with Issues
@@ -209,7 +198,7 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
209198
// Used when a dataset is spread across multiple files
210199
void set_file_path(const std::string& path);
211200
// column metadata
212-
void SetStatistics(const EncodedStatistics& stats);
201+
void SetStatistics(bool is_signed, const EncodedStatistics& stats);
213202
// get the column descriptor
214203
const ColumnDescriptor* descr() const;
215204
// commit the metadata

src/parquet/statistics.cc

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
#include "parquet/encoding-internal.h"
2222
#include "parquet/exception.h"
2323
#include "parquet/statistics.h"
24-
#include "parquet/util/comparison.h"
2524
#include "parquet/util/memory.h"
2625

2726
using arrow::default_memory_pool;
@@ -35,6 +34,7 @@ TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const ColumnDescriptor*
3534
: pool_(pool),
3635
min_buffer_(AllocateBuffer(pool_, 0)),
3736
max_buffer_(AllocateBuffer(pool_, 0)) {
37+
comparator_ = std::static_pointer_cast<CompareDefault<DType> >(Compare::getComparator(schema));
3838
SetDescr(schema);
3939
Reset();
4040
}
@@ -69,6 +69,7 @@ TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(
6969
IncrementNullCount(null_count);
7070
IncrementDistinctCount(distinct_count);
7171

72+
comparator_ = std::static_pointer_cast<CompareDefault<DType> >(Compare::getComparator(schema));
7273
SetDescr(schema);
7374

7475
if (!encoded_min.empty()) {
@@ -102,15 +103,14 @@ void TypedRowGroupStatistics<DType>::Update(const T* values, int64_t num_not_nul
102103
// TODO: support distinct count?
103104
if (num_not_null == 0) return;
104105

105-
Compare<T> compare(descr_);
106-
auto batch_minmax = std::minmax_element(values, values + num_not_null, compare);
106+
auto batch_minmax = std::minmax_element(values, values + num_not_null, std::ref(*(this->comparator_)));
107107
if (!has_min_max_) {
108108
has_min_max_ = true;
109109
Copy(*batch_minmax.first, &min_, min_buffer_.get());
110110
Copy(*batch_minmax.second, &max_, max_buffer_.get());
111111
} else {
112-
Copy(std::min(min_, *batch_minmax.first, compare), &min_, min_buffer_.get());
113-
Copy(std::max(max_, *batch_minmax.second, compare), &max_, max_buffer_.get());
112+
Copy(std::min(min_, *batch_minmax.first, std::ref(*(this->comparator_))), &min_, min_buffer_.get());
113+
Copy(std::max(max_, *batch_minmax.second, std::ref(*(this->comparator_))), &max_, max_buffer_.get());
114114
}
115115
}
116116

@@ -128,7 +128,6 @@ void TypedRowGroupStatistics<DType>::UpdateSpaced(const T* values,
128128
// TODO: support distinct count?
129129
if (num_not_null == 0) return;
130130

131-
Compare<T> compare(descr_);
132131
INIT_BITSET(valid_bits, static_cast<int>(valid_bits_offset));
133132
// Find first valid entry and use that for min/max
134133
// As (num_not_null != 0) there must be one
@@ -144,9 +143,9 @@ void TypedRowGroupStatistics<DType>::UpdateSpaced(const T* values,
144143
T max = values[i];
145144
for (; i < length; i++) {
146145
if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
147-
if (compare(values[i], min)) {
146+
if ((std::ref(*(this->comparator_)))(values[i], min)) {
148147
min = values[i];
149-
} else if (compare(max, values[i])) {
148+
} else if ((std::ref(*(this->comparator_)))(max, values[i])) {
150149
max = values[i];
151150
}
152151
}
@@ -157,8 +156,8 @@ void TypedRowGroupStatistics<DType>::UpdateSpaced(const T* values,
157156
Copy(min, &min_, min_buffer_.get());
158157
Copy(max, &max_, max_buffer_.get());
159158
} else {
160-
Copy(std::min(min_, min, compare), &min_, min_buffer_.get());
161-
Copy(std::max(max_, max, compare), &max_, max_buffer_.get());
159+
Copy(std::min(min_, min, std::ref(*(this->comparator_))), &min_, min_buffer_.get());
160+
Copy(std::max(max_, max, std::ref(*(this->comparator_))), &max_, max_buffer_.get());
162161
}
163162
}
164163

@@ -185,9 +184,8 @@ void TypedRowGroupStatistics<DType>::Merge(const TypedRowGroupStatistics<DType>&
185184
return;
186185
}
187186

188-
Compare<T> compare(descr_);
189-
Copy(std::min(this->min_, other.min_, compare), &this->min_, min_buffer_.get());
190-
Copy(std::max(this->max_, other.max_, compare), &this->max_, max_buffer_.get());
187+
Copy(std::min(this->min_, other.min_, std::ref(*(this->comparator_))), &this->min_, min_buffer_.get());
188+
Copy(std::max(this->max_, other.max_, std::ref(*(this->comparator_))), &this->max_, max_buffer_.get());
191189
}
192190

193191
template <typename DType>

src/parquet/statistics.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
#include "parquet/schema.h"
2626
#include "parquet/types.h"
27+
#include "parquet/util/comparison.h"
2728
#include "parquet/util/memory.h"
2829
#include "parquet/util/visibility.h"
2930

@@ -164,6 +165,7 @@ class TypedRowGroupStatistics : public RowGroupStatistics {
164165
T min_;
165166
T max_;
166167
::arrow::MemoryPool* pool_;
168+
std::shared_ptr<CompareDefault<DType> > comparator_;
167169

168170
void PlainEncode(const T& src, std::string* dst);
169171
void PlainDecode(const std::string& src, T* dst);

src/parquet/types.cc

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,4 +221,56 @@ int GetTypeByteSize(Type::type parquet_type) {
221221
return 0;
222222
}
223223

224+
// Return the Sort Order of the Parquet Physical Types
225+
SortOrder::type DefaultSortOrder(Type::type primitive) {
226+
switch (primitive) {
227+
case Type::BOOLEAN:
228+
case Type::INT32:
229+
case Type::INT64:
230+
case Type::FLOAT:
231+
case Type::DOUBLE:
232+
return SortOrder::SIGNED;
233+
case Type::BYTE_ARRAY:
234+
case Type::FIXED_LEN_BYTE_ARRAY:
235+
case Type::INT96: // only used for timestamp, which uses unsigned values
236+
return SortOrder::UNSIGNED;
237+
}
238+
return SortOrder::UNKNOWN;
239+
}
240+
241+
// Return the SortOrder of the Parquet Types using Logical or Physical Types
242+
SortOrder::type GetSortOrder(LogicalType::type converted, Type::type primitive) {
243+
if (converted == LogicalType::NONE) return DefaultSortOrder(primitive);
244+
switch (converted) {
245+
case LogicalType::INT_8:
246+
case LogicalType::INT_16:
247+
case LogicalType::INT_32:
248+
case LogicalType::INT_64:
249+
case LogicalType::DATE:
250+
case LogicalType::TIME_MICROS:
251+
case LogicalType::TIME_MILLIS:
252+
case LogicalType::TIMESTAMP_MICROS:
253+
case LogicalType::TIMESTAMP_MILLIS:
254+
return SortOrder::SIGNED;
255+
case LogicalType::UINT_8:
256+
case LogicalType::UINT_16:
257+
case LogicalType::UINT_32:
258+
case LogicalType::UINT_64:
259+
case LogicalType::ENUM:
260+
case LogicalType::UTF8:
261+
case LogicalType::BSON:
262+
case LogicalType::JSON:
263+
return SortOrder::UNSIGNED;
264+
case LogicalType::DECIMAL:
265+
case LogicalType::LIST:
266+
case LogicalType::MAP:
267+
case LogicalType::MAP_KEY_VALUE:
268+
case LogicalType::INTERVAL:
269+
case LogicalType::NONE: // required instead of default
270+
case LogicalType::NA: // required instead of default
271+
return SortOrder::UNKNOWN;
272+
}
273+
return SortOrder::UNKNOWN;
274+
}
275+
224276
} // namespace parquet

src/parquet/types.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,19 @@ struct PageType {
116116
enum type { DATA_PAGE, INDEX_PAGE, DICTIONARY_PAGE, DATA_PAGE_V2 };
117117
};
118118

119+
// Reference:
120+
// parquet-mr/parquet-hadoop/src/main/java/org/apache/parquet/
121+
// format/converter/ParquetMetadataConverter.java
122+
// Sort order for page and column statistics. Types are associated with sort
123+
// orders (e.g., UTF8 columns should use UNSIGNED) and column stats are
124+
// aggregated using a sort order. As of parquet-format version 2.3.1, the
125+
// order used to aggregate stats is always SIGNED and is not stored in the
126+
// Parquet file. These stats are discarded for types that need unsigned.
127+
// See PARQUET-686.
128+
struct SortOrder {
129+
enum type { SIGNED, UNSIGNED, UNKNOWN };
130+
};
131+
119132
// ----------------------------------------------------------------------
120133

121134
struct ByteArray {
@@ -283,6 +296,10 @@ PARQUET_EXPORT std::string FormatStatValue(Type::type parquet_type, const char*
283296

284297
PARQUET_EXPORT int GetTypeByteSize(Type::type t);
285298

299+
SortOrder::type PARQUET_EXPORT DefaultSortOrder(Type::type primitive);
300+
301+
SortOrder::type PARQUET_EXPORT GetSortOrder(LogicalType::type converted, Type::type primitive);
302+
286303
} // namespace parquet
287304

288305
#endif // PARQUET_TYPES_H

0 commit comments

Comments
 (0)