From 7c33cc9d73710f955c4cf9624eb1c24c56e98083 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Wed, 27 Jan 2016 21:13:15 -0800 Subject: [PATCH] PARQUET-451: Add RowGroupReader helper class and refactor parquet_reader.cc into DebugPrint This also addresses PARQUET-433 and PARQUET-453. Author: Wes McKinney Closes #23 from wesm/PARQUET-451 and squashes the following commits: 748ee0c [Wes McKinney] Turn MakeColumnReader into static ColumnReader::Make 6528497 [Wes McKinney] Incorporate code review comments 4b5575d [Wes McKinney] [PARQUET-451/453]: Implement RowGroupReader class and refactor parquet_reader.cc into ParquetFileReader::DebugPrint 2985e2e [Wes McKinney] [PARQUET-433]: Templatize decoders and column readers and remove most switch-on-type statements. Add parquet::SchemaElement* member to Decoder, for FLBA metadata. Change-Id: I3c7668b53f4167fdaf3e371955a4833e052589e2 --- cpp/src/parquet/CMakeLists.txt | 2 + cpp/src/parquet/column_reader.cc | 194 ++++++++++++ cpp/src/parquet/column_reader.h | 183 +++++++++++ cpp/src/parquet/compression/codec.h | 6 +- cpp/src/parquet/encodings/CMakeLists.txt | 1 - cpp/src/parquet/encodings/bool-encoding.h | 48 --- .../encodings/delta-bit-pack-encoding.h | 20 +- .../encodings/delta-byte-array-encoding.h | 20 +- .../delta-length-byte-array-encoding.h | 16 +- .../parquet/encodings/dictionary-encoding.h | 131 +++----- cpp/src/parquet/encodings/encodings.h | 34 +-- cpp/src/parquet/encodings/plain-encoding.h | 82 +++-- cpp/src/parquet/parquet.h | 197 +----------- cpp/src/parquet/reader-test.cc | 14 +- cpp/src/parquet/reader.cc | 283 +++++++++++++++++- cpp/src/parquet/reader.h | 65 +++- cpp/src/parquet/types.h | 112 +++++++ cpp/src/parquet/util/CMakeLists.txt | 5 + cpp/src/parquet/util/input_stream.cc | 63 ++++ cpp/src/parquet/util/input_stream.h | 80 +++++ 20 files changed, 1133 insertions(+), 423 deletions(-) create mode 100644 cpp/src/parquet/column_reader.cc create mode 100644 cpp/src/parquet/column_reader.h delete mode 100644 cpp/src/parquet/encodings/bool-encoding.h create mode 100644 cpp/src/parquet/types.h create mode 100644 cpp/src/parquet/util/input_stream.cc create mode 100644 cpp/src/parquet/util/input_stream.h diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index f08901ec2016a..1809ea194f2fb 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -18,8 +18,10 @@ # Headers: top level install(FILES parquet.h + column_reader.h reader.h exception.h + types.h DESTINATION include/parquet) ADD_PARQUET_TEST(reader-test) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc new file mode 100644 index 0000000000000..b7ececb98d648 --- /dev/null +++ b/cpp/src/parquet/column_reader.cc @@ -0,0 +1,194 @@ +// Copyright 2012 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "parquet/column_reader.h" + +#include +#include +#include + +#include "parquet/encodings/encodings.h" +#include "parquet/compression/codec.h" +#include "parquet/thrift/util.h" +#include "parquet/util/input_stream.h" + +const int DATA_PAGE_SIZE = 64 * 1024; + +namespace parquet_cpp { + +using parquet::CompressionCodec; +using parquet::Encoding; +using parquet::FieldRepetitionType; +using parquet::PageType; +using parquet::Type; + + +ColumnReader::~ColumnReader() { + delete stream_; +} + +ColumnReader::ColumnReader(const parquet::ColumnMetaData* metadata, + const parquet::SchemaElement* schema, InputStream* stream) + : metadata_(metadata), + schema_(schema), + stream_(stream), + num_buffered_values_(0), + num_decoded_values_(0), + buffered_values_offset_(0) { + + switch (metadata->codec) { + case CompressionCodec::UNCOMPRESSED: + break; + case CompressionCodec::SNAPPY: + decompressor_.reset(new SnappyCodec()); + break; + default: + ParquetException::NYI("Reading compressed data"); + } + + config_ = Config::DefaultConfig(); +} + + +// PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index +// encoding. +static bool IsDictionaryIndexEncoding(const Encoding::type& e) { + return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY; +} + +template +bool TypedColumnReader::ReadNewPage() { + // Loop until we find the next data page. + + + while (true) { + int bytes_read = 0; + const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read); + if (bytes_read == 0) return false; + uint32_t header_size = bytes_read; + DeserializeThriftMsg(buffer, &header_size, ¤t_page_header_); + stream_->Read(header_size, &bytes_read); + + int compressed_len = current_page_header_.compressed_page_size; + int uncompressed_len = current_page_header_.uncompressed_page_size; + + // Read the compressed data page. + buffer = stream_->Read(compressed_len, &bytes_read); + if (bytes_read != compressed_len) ParquetException::EofException(); + + // Uncompress it if we need to + if (decompressor_ != NULL) { + // Grow the uncompressed buffer if we need to. + if (uncompressed_len > decompression_buffer_.size()) { + decompression_buffer_.resize(uncompressed_len); + } + decompressor_->Decompress(compressed_len, buffer, uncompressed_len, + &decompression_buffer_[0]); + buffer = &decompression_buffer_[0]; + } + + if (current_page_header_.type == PageType::DICTIONARY_PAGE) { + auto it = decoders_.find(Encoding::RLE_DICTIONARY); + if (it != decoders_.end()) { + throw ParquetException("Column cannot have more than one dictionary."); + } + + PlainDecoder dictionary(schema_); + dictionary.SetData(current_page_header_.dictionary_page_header.num_values, + buffer, uncompressed_len); + std::shared_ptr decoder(new DictionaryDecoder(schema_, &dictionary)); + + decoders_[Encoding::RLE_DICTIONARY] = decoder; + current_decoder_ = decoders_[Encoding::RLE_DICTIONARY].get(); + continue; + } else if (current_page_header_.type == PageType::DATA_PAGE) { + // Read a data page. + num_buffered_values_ = current_page_header_.data_page_header.num_values; + + // Read definition levels. 
+ if (schema_->repetition_type != FieldRepetitionType::REQUIRED) { + int num_definition_bytes = *reinterpret_cast(buffer); + buffer += sizeof(uint32_t); + definition_level_decoder_.reset( + new RleDecoder(buffer, num_definition_bytes, 1)); + buffer += num_definition_bytes; + uncompressed_len -= sizeof(uint32_t); + uncompressed_len -= num_definition_bytes; + } + + // TODO: repetition levels + + // Get a decoder object for this page or create a new decoder if this is the + // first page with this encoding. + Encoding::type encoding = current_page_header_.data_page_header.encoding; + if (IsDictionaryIndexEncoding(encoding)) encoding = Encoding::RLE_DICTIONARY; + + auto it = decoders_.find(encoding); + if (it != decoders_.end()) { + current_decoder_ = it->second.get(); + } else { + switch (encoding) { + case Encoding::PLAIN: { + std::shared_ptr decoder(new PlainDecoder(schema_)); + decoders_[encoding] = decoder; + current_decoder_ = decoder.get(); + break; + } + case Encoding::RLE_DICTIONARY: + throw ParquetException("Dictionary page must be before data page."); + + case Encoding::DELTA_BINARY_PACKED: + case Encoding::DELTA_LENGTH_BYTE_ARRAY: + case Encoding::DELTA_BYTE_ARRAY: + ParquetException::NYI("Unsupported encoding"); + + default: + throw ParquetException("Unknown encoding type."); + } + } + current_decoder_->SetData(num_buffered_values_, buffer, uncompressed_len); + return true; + } else { + // We don't know what this page type is. We're allowed to skip non-data pages. + continue; + } + } + return true; +} + +std::shared_ptr ColumnReader::Make(const parquet::ColumnMetaData* metadata, + const parquet::SchemaElement* element, InputStream* stream) { + switch (metadata->type) { + case Type::BOOLEAN: + return std::make_shared(metadata, element, stream); + case Type::INT32: + return std::make_shared(metadata, element, stream); + case Type::INT64: + return std::make_shared(metadata, element, stream); + case Type::INT96: + return std::make_shared(metadata, element, stream); + case Type::FLOAT: + return std::make_shared(metadata, element, stream); + case Type::DOUBLE: + return std::make_shared(metadata, element, stream); + case Type::BYTE_ARRAY: + return std::make_shared(metadata, element, stream); + default: + ParquetException::NYI("type reader not implemented"); + } + // Unreachable code, but supress compiler warning + return std::shared_ptr(nullptr); +} + +} // namespace parquet_cpp diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h new file mode 100644 index 0000000000000..cd6cc02587471 --- /dev/null +++ b/cpp/src/parquet/column_reader.h @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef PARQUET_COLUMN_READER_H +#define PARQUET_COLUMN_READER_H + +#include +#include +#include +#include +#include +#include +#include + +#include "parquet/exception.h" +#include "parquet/types.h" +#include "parquet/thrift/parquet_constants.h" +#include "parquet/thrift/parquet_types.h" +#include "parquet/util/input_stream.h" +#include "parquet/encodings/encodings.h" +#include "parquet/util/rle-encoding.h" + +namespace std { + +template <> +struct hash { + std::size_t operator()(const parquet::Encoding::type& k) const { + return hash()(static_cast(k)); + } +}; + +} // namespace std + +namespace parquet_cpp { + +class Codec; + +class ColumnReader { + public: + + struct Config { + int batch_size; + + static Config DefaultConfig() { + Config config; + config.batch_size = 128; + return config; + } + }; + + ColumnReader(const parquet::ColumnMetaData*, + const parquet::SchemaElement*, InputStream* stream); + + virtual ~ColumnReader(); + + static std::shared_ptr Make(const parquet::ColumnMetaData*, + const parquet::SchemaElement*, InputStream* stream); + + virtual bool ReadNewPage() = 0; + + // Returns true if there are still values in this column. + bool HasNext() { + if (num_buffered_values_ == 0) { + ReadNewPage(); + if (num_buffered_values_ == 0) return false; + } + return true; + } + + parquet::Type::type type() const { + return metadata_->type; + } + + const parquet::ColumnMetaData* metadata() const { + return metadata_; + } + + protected: + // Reads the next definition and repetition level. Returns true if the value is NULL. + bool ReadDefinitionRepetitionLevels(int* def_level, int* rep_level); + + Config config_; + + const parquet::ColumnMetaData* metadata_; + const parquet::SchemaElement* schema_; + InputStream* stream_; + + // Compression codec to use. + std::unique_ptr decompressor_; + std::vector decompression_buffer_; + + parquet::PageHeader current_page_header_; + + // Not set if field is required. + std::unique_ptr definition_level_decoder_; + // Not set for flat schemas. + std::unique_ptr repetition_level_decoder_; + int num_buffered_values_; + + int num_decoded_values_; + int buffered_values_offset_; +}; + + +// API to read values from a single column. This is the main client facing API. +template +class TypedColumnReader : public ColumnReader { + public: + typedef typename type_traits::value_type T; + + TypedColumnReader(const parquet::ColumnMetaData* metadata, + const parquet::SchemaElement* schema, InputStream* stream) : + ColumnReader(metadata, schema, stream), + current_decoder_(NULL) { + size_t value_byte_size = type_traits::value_byte_size; + values_buffer_.resize(config_.batch_size * value_byte_size); + } + + // Returns the next value of this type. + // TODO: batchify this interface. + T NextValue(int* def_level, int* rep_level) { + if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return T(); + if (buffered_values_offset_ == num_decoded_values_) BatchDecode(); + return reinterpret_cast(&values_buffer_[0])[buffered_values_offset_++]; + } + + private: + void BatchDecode(); + + virtual bool ReadNewPage(); + + typedef Decoder DecoderType; + + // Map of compression type to decompressor object. 
+ std::unordered_map > decoders_; + + DecoderType* current_decoder_; + std::vector values_buffer_; +}; + +typedef TypedColumnReader BoolReader; +typedef TypedColumnReader Int32Reader; +typedef TypedColumnReader Int64Reader; +typedef TypedColumnReader Int96Reader; +typedef TypedColumnReader FloatReader; +typedef TypedColumnReader DoubleReader; +typedef TypedColumnReader ByteArrayReader; + + +template +void TypedColumnReader::BatchDecode() { + buffered_values_offset_ = 0; + T* buf = reinterpret_cast(&values_buffer_[0]); + int batch_size = config_.batch_size; + num_decoded_values_ = current_decoder_->Decode(buf, batch_size); +} + +inline bool ColumnReader::ReadDefinitionRepetitionLevels(int* def_level, int* rep_level) { + *rep_level = 1; + if (definition_level_decoder_ && !definition_level_decoder_->Get(def_level)) { + ParquetException::EofException(); + } + --num_buffered_values_; + return *def_level == 0; +} + +} // namespace parquet_cpp + +#endif // PARQUET_COLUMN_READER_H diff --git a/cpp/src/parquet/compression/codec.h b/cpp/src/parquet/compression/codec.h index 8166847bfc47a..07648d77acfe2 100644 --- a/cpp/src/parquet/compression/codec.h +++ b/cpp/src/parquet/compression/codec.h @@ -15,11 +15,9 @@ #ifndef PARQUET_COMPRESSION_CODEC_H #define PARQUET_COMPRESSION_CODEC_H -#include "parquet/parquet.h" - #include -#include "parquet/thrift/parquet_constants.h" -#include "parquet/thrift/parquet_types.h" + +#include "parquet/exception.h" namespace parquet_cpp { diff --git a/cpp/src/parquet/encodings/CMakeLists.txt b/cpp/src/parquet/encodings/CMakeLists.txt index 72baf4818c487..544b1e127a628 100644 --- a/cpp/src/parquet/encodings/CMakeLists.txt +++ b/cpp/src/parquet/encodings/CMakeLists.txt @@ -15,7 +15,6 @@ # Headers: encodings install(FILES encodings.h - bool-encoding.h delta-bit-pack-encoding.h delta-byte-array-encoding.h delta-length-byte-array-encoding.h diff --git a/cpp/src/parquet/encodings/bool-encoding.h b/cpp/src/parquet/encodings/bool-encoding.h deleted file mode 100644 index 8eb55bc88f2e8..0000000000000 --- a/cpp/src/parquet/encodings/bool-encoding.h +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2012 Cloudera Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -#ifndef PARQUET_BOOL_ENCODING_H -#define PARQUET_BOOL_ENCODING_H - -#include "parquet/encodings/encodings.h" - -#include - -namespace parquet_cpp { - -class BoolDecoder : public Decoder { - public: - BoolDecoder() : Decoder(parquet::Type::BOOLEAN, parquet::Encoding::PLAIN) { } - - virtual void SetData(int num_values, const uint8_t* data, int len) { - num_values_ = num_values; - decoder_ = RleDecoder(data, len, 1); - } - - virtual int GetBool(bool* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - for (int i = 0; i < max_values; ++i) { - if (!decoder_.Get(&buffer[i])) ParquetException::EofException(); - } - num_values_ -= max_values; - return max_values; - } - - private: - RleDecoder decoder_; -}; - -} // namespace parquet_cpp - -#endif diff --git a/cpp/src/parquet/encodings/delta-bit-pack-encoding.h b/cpp/src/parquet/encodings/delta-bit-pack-encoding.h index 77a3b2625080f..b4377348874b6 100644 --- a/cpp/src/parquet/encodings/delta-bit-pack-encoding.h +++ b/cpp/src/parquet/encodings/delta-bit-pack-encoding.h @@ -22,10 +22,16 @@ namespace parquet_cpp { -class DeltaBitPackDecoder : public Decoder { +template +class DeltaBitPackDecoder : public Decoder { public: - explicit DeltaBitPackDecoder(const parquet::Type::type& type) - : Decoder(type, parquet::Encoding::DELTA_BINARY_PACKED) { + typedef typename type_traits::value_type T; + + explicit DeltaBitPackDecoder(const parquet::SchemaElement* schema) + : Decoder(schema, parquet::Encoding::DELTA_BINARY_PACKED) { + + parquet::Type::type type = type_traits::parquet_type; + if (type != parquet::Type::INT32 && type != parquet::Type::INT64) { throw ParquetException("Delta bit pack encoding should only be for integer data."); } @@ -38,15 +44,13 @@ class DeltaBitPackDecoder : public Decoder { values_current_mini_block_ = 0; } - virtual int GetInt32(int32_t* buffer, int max_values) { - return GetInternal(buffer, max_values); - } - - virtual int GetInt64(int64_t* buffer, int max_values) { + virtual int Decode(T* buffer, int max_values) { return GetInternal(buffer, max_values); } private: + using Decoder::num_values_; + void InitBlock() { uint64_t block_size; if (!decoder_.GetVlqInt(&block_size)) ParquetException::EofException(); diff --git a/cpp/src/parquet/encodings/delta-byte-array-encoding.h b/cpp/src/parquet/encodings/delta-byte-array-encoding.h index 3396586e8fe84..a1b5b489b66e0 100644 --- a/cpp/src/parquet/encodings/delta-byte-array-encoding.h +++ b/cpp/src/parquet/encodings/delta-byte-array-encoding.h @@ -21,12 +21,12 @@ namespace parquet_cpp { -class DeltaByteArrayDecoder : public Decoder { +class DeltaByteArrayDecoder : public Decoder { public: - DeltaByteArrayDecoder() - : Decoder(parquet::Type::BYTE_ARRAY, parquet::Encoding::DELTA_BYTE_ARRAY), - prefix_len_decoder_(parquet::Type::INT32), - suffix_decoder_() { + explicit DeltaByteArrayDecoder(const parquet::SchemaElement* schema) + : Decoder(schema, parquet::Encoding::DELTA_BYTE_ARRAY), + prefix_len_decoder_(nullptr), + suffix_decoder_(nullptr) { } virtual void SetData(int num_values, const uint8_t* data, int len) { @@ -43,13 +43,13 @@ class DeltaByteArrayDecoder : public Decoder { // TODO: this doesn't work and requires memory management. We need to allocate // new strings to store the results. 
- virtual int GetByteArray(ByteArray* buffer, int max_values) { + virtual int Decode(ByteArray* buffer, int max_values) { max_values = std::min(max_values, num_values_); for (int i = 0; i < max_values; ++i) { int prefix_len = 0; - prefix_len_decoder_.GetInt32(&prefix_len, 1); + prefix_len_decoder_.Decode(&prefix_len, 1); ByteArray suffix; - suffix_decoder_.GetByteArray(&suffix, 1); + suffix_decoder_.Decode(&suffix, 1); buffer[i].len = prefix_len + suffix.len; uint8_t* result = reinterpret_cast(malloc(buffer[i].len)); @@ -64,7 +64,9 @@ class DeltaByteArrayDecoder : public Decoder { } private: - DeltaBitPackDecoder prefix_len_decoder_; + using Decoder::num_values_; + + DeltaBitPackDecoder prefix_len_decoder_; DeltaLengthByteArrayDecoder suffix_decoder_; ByteArray last_value_; }; diff --git a/cpp/src/parquet/encodings/delta-length-byte-array-encoding.h b/cpp/src/parquet/encodings/delta-length-byte-array-encoding.h index 06bf39d4bb1c6..a6e4c58b2bc09 100644 --- a/cpp/src/parquet/encodings/delta-length-byte-array-encoding.h +++ b/cpp/src/parquet/encodings/delta-length-byte-array-encoding.h @@ -21,11 +21,12 @@ namespace parquet_cpp { -class DeltaLengthByteArrayDecoder : public Decoder { +class DeltaLengthByteArrayDecoder : public Decoder { public: - DeltaLengthByteArrayDecoder() - : Decoder(parquet::Type::BYTE_ARRAY, parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY), - len_decoder_(parquet::Type::INT32) { + explicit DeltaLengthByteArrayDecoder(const parquet::SchemaElement* schema) + : Decoder( + schema, parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY), + len_decoder_(nullptr) { } virtual void SetData(int num_values, const uint8_t* data, int len) { @@ -38,10 +39,10 @@ class DeltaLengthByteArrayDecoder : public Decoder { len_ = len - 4 - total_lengths_len; } - virtual int GetByteArray(ByteArray* buffer, int max_values) { + virtual int Decode(ByteArray* buffer, int max_values) { max_values = std::min(max_values, num_values_); int lengths[max_values]; - len_decoder_.GetInt32(lengths, max_values); + len_decoder_.Decode(lengths, max_values); for (int i = 0; i < max_values; ++i) { buffer[i].len = lengths[i]; buffer[i].ptr = data_; @@ -53,7 +54,8 @@ class DeltaLengthByteArrayDecoder : public Decoder { } private: - DeltaBitPackDecoder len_decoder_; + using Decoder::num_values_; + DeltaBitPackDecoder len_decoder_; const uint8_t* data_; int len_; }; diff --git a/cpp/src/parquet/encodings/dictionary-encoding.h b/cpp/src/parquet/encodings/dictionary-encoding.h index 2501b2aa2c2b3..cb8fb3072ff5c 100644 --- a/cpp/src/parquet/encodings/dictionary-encoding.h +++ b/cpp/src/parquet/encodings/dictionary-encoding.h @@ -22,56 +22,22 @@ namespace parquet_cpp { -class DictionaryDecoder : public Decoder { +template +class DictionaryDecoder : public Decoder { public: + typedef typename type_traits::value_type T; + // Initializes the dictionary with values from 'dictionary'. The data in dictionary // is not guaranteed to persist in memory after this call so the dictionary decoder // needs to copy the data out if necessary. 
- DictionaryDecoder(const parquet::Type::type& type, Decoder* dictionary) - : Decoder(type, parquet::Encoding::RLE_DICTIONARY) { - int num_dictionary_values = dictionary->values_left(); - switch (type) { - case parquet::Type::BOOLEAN: - throw ParquetException("Boolean cols should not be dictionary encoded."); - - case parquet::Type::INT32: - int32_dictionary_.resize(num_dictionary_values); - dictionary->GetInt32(&int32_dictionary_[0], num_dictionary_values); - break; - case parquet::Type::INT64: - int64_dictionary_.resize(num_dictionary_values); - dictionary->GetInt64(&int64_dictionary_[0], num_dictionary_values); - break; - case parquet::Type::FLOAT: - float_dictionary_.resize(num_dictionary_values); - dictionary->GetFloat(&float_dictionary_[0], num_dictionary_values); - break; - case parquet::Type::DOUBLE: - double_dictionary_.resize(num_dictionary_values); - dictionary->GetDouble(&double_dictionary_[0], num_dictionary_values); - break; - case parquet::Type::BYTE_ARRAY: { - byte_array_dictionary_.resize(num_dictionary_values); - dictionary->GetByteArray(&byte_array_dictionary_[0], num_dictionary_values); - int total_size = 0; - for (int i = 0; i < num_dictionary_values; ++i) { - total_size += byte_array_dictionary_[i].len; - } - byte_array_data_.resize(total_size); - int offset = 0; - for (int i = 0; i < num_dictionary_values; ++i) { - memcpy(&byte_array_data_[offset], - byte_array_dictionary_[i].ptr, byte_array_dictionary_[i].len); - byte_array_dictionary_[i].ptr = &byte_array_data_[offset]; - offset += byte_array_dictionary_[i].len; - } - break; - } - default: - ParquetException::NYI("Unsupported dictionary type"); - } + DictionaryDecoder(const parquet::SchemaElement* schema, Decoder* dictionary) + : Decoder(schema, parquet::Encoding::RLE_DICTIONARY) { + Init(dictionary); } + // Perform type-specific initiatialization + void Init(Decoder* dictionary); + virtual void SetData(int num_values, const uint8_t* data, int len) { num_values_ = num_values; if (len == 0) return; @@ -81,47 +47,17 @@ class DictionaryDecoder : public Decoder { idx_decoder_ = RleDecoder(data, len, bit_width); } - virtual int GetInt32(int32_t* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - for (int i = 0; i < max_values; ++i) { - buffer[i] = int32_dictionary_[index()]; - } - return max_values; - } - - virtual int GetInt64(int64_t* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - for (int i = 0; i < max_values; ++i) { - buffer[i] = int64_dictionary_[index()]; - } - return max_values; - } - - virtual int GetFloat(float* buffer, int max_values) { + virtual int Decode(T* buffer, int max_values) { max_values = std::min(max_values, num_values_); for (int i = 0; i < max_values; ++i) { - buffer[i] = float_dictionary_[index()]; - } - return max_values; - } - - virtual int GetDouble(double* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - for (int i = 0; i < max_values; ++i) { - buffer[i] = double_dictionary_[index()]; - } - return max_values; - } - - virtual int GetByteArray(ByteArray* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - for (int i = 0; i < max_values; ++i) { - buffer[i] = byte_array_dictionary_[index()]; + buffer[i] = dictionary_[index()]; } return max_values; } private: + using Decoder::num_values_; + int index() { int idx = 0; if (!idx_decoder_.Get(&idx)) ParquetException::EofException(); @@ -130,11 +66,7 @@ class DictionaryDecoder : public Decoder { } // Only one is set. 
- std::vector int32_dictionary_; - std::vector int64_dictionary_; - std::vector float_dictionary_; - std::vector double_dictionary_; - std::vector byte_array_dictionary_; + std::vector dictionary_; // Data that contains the byte array data (byte_array_dictionary_ just has the // pointers). @@ -143,6 +75,39 @@ class DictionaryDecoder : public Decoder { RleDecoder idx_decoder_; }; +template +inline void DictionaryDecoder::Init(Decoder* dictionary) { + int num_dictionary_values = dictionary->values_left(); + dictionary_.resize(num_dictionary_values); + dictionary->Decode(&dictionary_[0], num_dictionary_values); +} + +template <> +inline void DictionaryDecoder::Init( + Decoder* dictionary) { + ParquetException::NYI("Dictionary encoding is not implemented for boolean values"); +} + +template <> +inline void DictionaryDecoder::Init( + Decoder* dictionary) { + int num_dictionary_values = dictionary->values_left(); + dictionary_.resize(num_dictionary_values); + dictionary->Decode(&dictionary_[0], num_dictionary_values); + + int total_size = 0; + for (int i = 0; i < num_dictionary_values; ++i) { + total_size += dictionary_[i].len; + } + byte_array_data_.resize(total_size); + int offset = 0; + for (int i = 0; i < num_dictionary_values; ++i) { + memcpy(&byte_array_data_[offset], dictionary_[i].ptr, dictionary_[i].len); + dictionary_[i].ptr = &byte_array_data_[offset]; + offset += dictionary_[i].len; + } +} + } // namespace parquet_cpp #endif diff --git a/cpp/src/parquet/encodings/encodings.h b/cpp/src/parquet/encodings/encodings.h index 9211bf8e9f6de..2017fcac37f0b 100644 --- a/cpp/src/parquet/encodings/encodings.h +++ b/cpp/src/parquet/encodings/encodings.h @@ -17,6 +17,8 @@ #include +#include "parquet/types.h" + #include "parquet/thrift/parquet_constants.h" #include "parquet/thrift/parquet_types.h" #include "parquet/util/rle-encoding.h" @@ -24,8 +26,12 @@ namespace parquet_cpp { +// The Decoder template is parameterized on parquet::Type::type +template class Decoder { public: + typedef typename type_traits::value_type T; + virtual ~Decoder() {} // Sets the data for a new page. This will be called multiple times on the same @@ -36,22 +42,7 @@ class Decoder { // the decoder would decode put to 'max_values', storing the result in 'buffer'. // The function returns the number of values decoded, which should be max_values // except for end of the current data page. 
- virtual int GetBool(bool* buffer, int max_values) { - throw ParquetException("Decoder does not implement this type."); - } - virtual int GetInt32(int32_t* buffer, int max_values) { - throw ParquetException("Decoder does not implement this type."); - } - virtual int GetInt64(int64_t* buffer, int max_values) { - throw ParquetException("Decoder does not implement this type."); - } - virtual int GetFloat(float* buffer, int max_values) { - throw ParquetException("Decoder does not implement this type."); - } - virtual int GetDouble(double* buffer, int max_values) { - throw ParquetException("Decoder does not implement this type."); - } - virtual int GetByteArray(ByteArray* buffer, int max_values) { + virtual int Decode(T* buffer, int max_values) { throw ParquetException("Decoder does not implement this type."); } @@ -62,19 +53,22 @@ class Decoder { const parquet::Encoding::type encoding() const { return encoding_; } protected: - Decoder(const parquet::Type::type& type, const parquet::Encoding::type& encoding) - : type_(type), encoding_(encoding), num_values_(0) {} + explicit Decoder(const parquet::SchemaElement* schema, + const parquet::Encoding::type& encoding) + : schema_(schema), encoding_(encoding), num_values_(0) {} + + // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY + const parquet::SchemaElement* schema_; - const parquet::Type::type type_; const parquet::Encoding::type encoding_; int num_values_; }; } // namespace parquet_cpp -#include "parquet/encodings/bool-encoding.h" #include "parquet/encodings/plain-encoding.h" #include "parquet/encodings/dictionary-encoding.h" + #include "parquet/encodings/delta-bit-pack-encoding.h" #include "parquet/encodings/delta-length-byte-array-encoding.h" #include "parquet/encodings/delta-byte-array-encoding.h" diff --git a/cpp/src/parquet/encodings/plain-encoding.h b/cpp/src/parquet/encodings/plain-encoding.h index b094cdbd47d47..5fb460e70e66e 100644 --- a/cpp/src/parquet/encodings/plain-encoding.h +++ b/cpp/src/parquet/encodings/plain-encoding.h @@ -21,11 +21,15 @@ namespace parquet_cpp { -class PlainDecoder : public Decoder { +template +class PlainDecoder : public Decoder { public: - explicit PlainDecoder(const parquet::Type::type& type) - : Decoder(type, parquet::Encoding::PLAIN), data_(NULL), len_(0) { - } + typedef typename type_traits::value_type T; + using Decoder::num_values_; + + explicit PlainDecoder(const parquet::SchemaElement* schema) : + Decoder(schema, parquet::Encoding::PLAIN), + data_(NULL), len_(0) {} virtual void SetData(int num_values, const uint8_t* data, int len) { num_values_ = num_values; @@ -33,49 +37,61 @@ class PlainDecoder : public Decoder { len_ = len; } - int GetValues(void* buffer, int max_values, int byte_size) { - max_values = std::min(max_values, num_values_); - int size = max_values * byte_size; - if (len_ < size) ParquetException::EofException(); - memcpy(buffer, data_, size); - data_ += size; - len_ -= size; - num_values_ -= max_values; - return max_values; - } + virtual int Decode(T* buffer, int max_values); + private: + const uint8_t* data_; + int len_; +}; - virtual int GetInt32(int32_t* buffer, int max_values) { - return GetValues(buffer, max_values, sizeof(int32_t)); - } +template +inline int PlainDecoder::Decode(T* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + int size = max_values * sizeof(T); + if (len_ < size) ParquetException::EofException(); + memcpy(buffer, data_, size); + data_ += size; + len_ -= size; + num_values_ -= max_values; + return max_values; +} - 
virtual int GetInt64(int64_t* buffer, int max_values) { - return GetValues(buffer, max_values, sizeof(int64_t)); +// Template specialization for BYTE_ARRAY +template <> +inline int PlainDecoder::Decode(ByteArray* buffer, + int max_values) { + max_values = std::min(max_values, num_values_); + for (int i = 0; i < max_values; ++i) { + buffer[i].len = *reinterpret_cast(data_); + if (len_ < sizeof(uint32_t) + buffer[i].len) ParquetException::EofException(); + buffer[i].ptr = data_ + sizeof(uint32_t); + data_ += sizeof(uint32_t) + buffer[i].len; + len_ -= sizeof(uint32_t) + buffer[i].len; } + num_values_ -= max_values; + return max_values; +} - virtual int GetFloat(float* buffer, int max_values) { - return GetValues(buffer, max_values, sizeof(float)); - } +template <> +class PlainDecoder : public Decoder { + public: + explicit PlainDecoder(const parquet::SchemaElement* schema) : + Decoder(schema, parquet::Encoding::PLAIN) {} - virtual int GetDouble(double* buffer, int max_values) { - return GetValues(buffer, max_values, sizeof(double)); + virtual void SetData(int num_values, const uint8_t* data, int len) { + num_values_ = num_values; + decoder_ = RleDecoder(data, len, 1); } - virtual int GetByteArray(ByteArray* buffer, int max_values) { + virtual int Decode(bool* buffer, int max_values) { max_values = std::min(max_values, num_values_); for (int i = 0; i < max_values; ++i) { - buffer[i].len = *reinterpret_cast(data_); - if (len_ < sizeof(uint32_t) + buffer[i].len) ParquetException::EofException(); - buffer[i].ptr = data_ + sizeof(uint32_t); - data_ += sizeof(uint32_t) + buffer[i].len; - len_ -= sizeof(uint32_t) + buffer[i].len; + if (!decoder_.Get(&buffer[i])) ParquetException::EofException(); } num_values_ -= max_values; return max_values; } - private: - const uint8_t* data_; - int len_; + RleDecoder decoder_; }; } // namespace parquet_cpp diff --git a/cpp/src/parquet/parquet.h b/cpp/src/parquet/parquet.h index 4469a82dd2ea9..0fd3e97c3b976 100644 --- a/cpp/src/parquet/parquet.h +++ b/cpp/src/parquet/parquet.h @@ -27,199 +27,8 @@ #include #include "parquet/exception.h" -#include "parquet/thrift/parquet_constants.h" -#include "parquet/thrift/parquet_types.h" -#include "parquet/util/rle-encoding.h" - -namespace std { - -template <> -struct hash { - std::size_t operator()(const parquet::Encoding::type& k) const { - return hash()(static_cast(k)); - } -}; - -} // namespace std - -namespace parquet_cpp { - -class Codec; -class Decoder; - -struct ByteArray { - uint32_t len; - const uint8_t* ptr; -}; - -// Interface for the column reader to get the bytes. The interface is a stream -// interface, meaning the bytes in order and once a byte is read, it does not -// need to be read again. -class InputStream { - public: - // Returns the next 'num_to_peek' without advancing the current position. - // *num_bytes will contain the number of bytes returned which can only be - // less than num_to_peek at end of stream cases. - // Since the position is not advanced, calls to this function are idempotent. - // The buffer returned to the caller is still owned by the input stream and must - // stay valid until the next call to Peek() or Read(). - virtual const uint8_t* Peek(int num_to_peek, int* num_bytes) = 0; - - // Identical to Peek(), except the current position in the stream is advanced by - // *num_bytes. - virtual const uint8_t* Read(int num_to_read, int* num_bytes) = 0; - - virtual ~InputStream() {} - - protected: - InputStream() {} -}; - -// Implementation of an InputStream when all the bytes are in memory. 
-class InMemoryInputStream : public InputStream { - public: - InMemoryInputStream(const uint8_t* buffer, int64_t len); - virtual const uint8_t* Peek(int num_to_peek, int* num_bytes); - virtual const uint8_t* Read(int num_to_read, int* num_bytes); - - private: - const uint8_t* buffer_; - int64_t len_; - int64_t offset_; -}; - -// A wrapper for InMemoryInputStream to manage the memory. -class ScopedInMemoryInputStream : public InputStream { - public: - ScopedInMemoryInputStream(int64_t len); - uint8_t* data(); - int64_t size(); - virtual const uint8_t* Peek(int num_to_peek, int* num_bytes); - virtual const uint8_t* Read(int num_to_read, int* num_bytes); - - private: - std::vector buffer_; - std::unique_ptr stream_; -}; - -// API to read values from a single column. This is the main client facing API. -class ColumnReader { - public: - struct Config { - int batch_size; - - static Config DefaultConfig() { - Config config; - config.batch_size = 128; - return config; - } - }; - - ColumnReader(const parquet::ColumnMetaData*, - const parquet::SchemaElement*, InputStream* stream); - - ~ColumnReader(); - - // Returns true if there are still values in this column. - bool HasNext(); - - // Returns the next value of this type. - // TODO: batchify this interface. - bool GetBool(int* definition_level, int* repetition_level); - int32_t GetInt32(int* definition_level, int* repetition_level); - int64_t GetInt64(int* definition_level, int* repetition_level); - float GetFloat(int* definition_level, int* repetition_level); - double GetDouble(int* definition_level, int* repetition_level); - ByteArray GetByteArray(int* definition_level, int* repetition_level); - - private: - bool ReadNewPage(); - // Reads the next definition and repetition level. Returns true if the value is NULL. - bool ReadDefinitionRepetitionLevels(int* def_level, int* rep_level); - - void BatchDecode(); - - Config config_; - - const parquet::ColumnMetaData* metadata_; - const parquet::SchemaElement* schema_; - InputStream* stream_; - - // Compression codec to use. - std::unique_ptr decompressor_; - std::vector decompression_buffer_; - - // Map of compression type to decompressor object. - std::unordered_map > decoders_; - - parquet::PageHeader current_page_header_; - - // Not set if field is required. - std::unique_ptr definition_level_decoder_; - // Not set for flat schemas. 
- std::unique_ptr repetition_level_decoder_; - Decoder* current_decoder_; - int num_buffered_values_; - - std::vector values_buffer_; - int num_decoded_values_; - int buffered_values_offset_; -}; - - -inline bool ColumnReader::HasNext() { - if (num_buffered_values_ == 0) { - ReadNewPage(); - if (num_buffered_values_ == 0) return false; - } - return true; -} - -inline bool ColumnReader::GetBool(int* def_level, int* rep_level) { - if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return bool(); - if (buffered_values_offset_ == num_decoded_values_) BatchDecode(); - return reinterpret_cast(&values_buffer_[0])[buffered_values_offset_++]; -} - -inline int32_t ColumnReader::GetInt32(int* def_level, int* rep_level) { - if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return int32_t(); - if (buffered_values_offset_ == num_decoded_values_) BatchDecode(); - return reinterpret_cast(&values_buffer_[0])[buffered_values_offset_++]; -} - -inline int64_t ColumnReader::GetInt64(int* def_level, int* rep_level) { - if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return int64_t(); - if (buffered_values_offset_ == num_decoded_values_) BatchDecode(); - return reinterpret_cast(&values_buffer_[0])[buffered_values_offset_++]; -} - -inline float ColumnReader::GetFloat(int* def_level, int* rep_level) { - if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return float(); - if (buffered_values_offset_ == num_decoded_values_) BatchDecode(); - return reinterpret_cast(&values_buffer_[0])[buffered_values_offset_++]; -} - -inline double ColumnReader::GetDouble(int* def_level, int* rep_level) { - if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return double(); - if (buffered_values_offset_ == num_decoded_values_) BatchDecode(); - return reinterpret_cast(&values_buffer_[0])[buffered_values_offset_++]; -} - -inline ByteArray ColumnReader::GetByteArray(int* def_level, int* rep_level) { - if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return ByteArray(); - if (buffered_values_offset_ == num_decoded_values_) BatchDecode(); - return reinterpret_cast(&values_buffer_[0])[buffered_values_offset_++]; -} - -inline bool ColumnReader::ReadDefinitionRepetitionLevels(int* def_level, int* rep_level) { - *rep_level = 1; - if (definition_level_decoder_ && !definition_level_decoder_->Get(def_level)) { - ParquetException::EofException(); - } - --num_buffered_values_; - return *def_level == 0; -} - -} // namespace parquet_cpp +#include "parquet/reader.h" +#include "parquet/column_reader.h" +#include "parquet/util/input_stream.h" #endif diff --git a/cpp/src/parquet/reader-test.cc b/cpp/src/parquet/reader-test.cc index 0f06f3fac7d12..1459afc2975ad 100644 --- a/cpp/src/parquet/reader-test.cc +++ b/cpp/src/parquet/reader-test.cc @@ -42,9 +42,7 @@ class TestAllTypesPlain : public ::testing::Test { reader_.Open(&file_); } - void TearDown() { - reader_.Close(); - } + void TearDown() {} protected: LocalFile file_; @@ -56,4 +54,14 @@ TEST_F(TestAllTypesPlain, ParseMetaData) { reader_.ParseMetaData(); } +TEST_F(TestAllTypesPlain, DebugPrintWorks) { + std::stringstream ss; + + // Automatically parses metadata + reader_.DebugPrint(ss); + + std::string result = ss.str(); + ASSERT_TRUE(result.size() > 0); +} + } // namespace parquet_cpp diff --git a/cpp/src/parquet/reader.cc b/cpp/src/parquet/reader.cc index 7ccd98ca4dafc..7c727ba1446fb 100644 --- a/cpp/src/parquet/reader.cc +++ b/cpp/src/parquet/reader.cc @@ -18,18 +18,30 @@ #include "parquet/reader.h" #include +#include +#include +#include +#include 
#include +#include "parquet/column_reader.h" #include "parquet/exception.h" + #include "parquet/thrift/util.h" +#include "parquet/util/input_stream.h" + +using std::string; +using std::vector; +using parquet::Type; + namespace parquet_cpp { // ---------------------------------------------------------------------- // LocalFile methods LocalFile::~LocalFile() { - // You must explicitly call Close + CloseFile(); } void LocalFile::Open(const std::string& path) { @@ -39,6 +51,11 @@ void LocalFile::Open(const std::string& path) { } void LocalFile::Close() { + // Pure virtual + CloseFile(); +} + +void LocalFile::CloseFile() { if (is_open_) { fclose(file_); is_open_ = false; @@ -58,9 +75,51 @@ size_t LocalFile::Tell() { return ftell(file_); } -void LocalFile::Read(size_t nbytes, uint8_t* buffer, - size_t* bytes_read) { - *bytes_read = fread(buffer, 1, nbytes, file_); +size_t LocalFile::Read(size_t nbytes, uint8_t* buffer) { + return fread(buffer, 1, nbytes, file_); +} + +// ---------------------------------------------------------------------- +// RowGroupReader + +ColumnReader* RowGroupReader::Column(size_t i) { + // TODO: boundschecking + auto it = column_readers_.find(i); + if (it != column_readers_.end()) { + // Already have constructed the ColumnReader + return it->second.get(); + } + + const parquet::ColumnChunk& col = row_group_->columns[i]; + + size_t col_start = col.meta_data.data_page_offset; + if (col.meta_data.__isset.dictionary_page_offset && + col_start > col.meta_data.dictionary_page_offset) { + col_start = col.meta_data.dictionary_page_offset; + } + + std::unique_ptr input( + new ScopedInMemoryInputStream(col.meta_data.total_compressed_size)); + + FileLike* source = this->parent_->buffer_; + + source->Seek(col_start); + + // TODO(wesm): Law of demeter violation + size_t bytes_read = source->Read(input->size(), input->data()); + + if (bytes_read != input->size()) { + std::cout << "Bytes needed: " << col.meta_data.total_compressed_size << std::endl; + std::cout << "Bytes read: " << bytes_read << std::endl; + throw ParquetException("Unable to read column chunk data"); + } + + // TODO(wesm): This presumes a flat schema + std::shared_ptr reader = ColumnReader::Make(&col.meta_data, + &this->parent_->metadata_.schema[i + 1], input.release()); + column_readers_[i] = reader; + + return reader.get(); } // ---------------------------------------------------------------------- @@ -70,6 +129,12 @@ void LocalFile::Read(size_t nbytes, uint8_t* buffer, static constexpr uint32_t FOOTER_SIZE = 8; static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'}; +ParquetFileReader::ParquetFileReader() : + parsed_metadata_(false), + buffer_(nullptr) {} + +ParquetFileReader::~ParquetFileReader() {} + void ParquetFileReader::Open(FileLike* buffer) { buffer_ = buffer; } @@ -78,6 +143,29 @@ void ParquetFileReader::Close() { buffer_->Close(); } +RowGroupReader* ParquetFileReader::RowGroup(size_t i) { + if (i >= num_row_groups()) { + std::stringstream ss; + ss << "The file only has " << num_row_groups() + << "row groups, requested reader for: " + << i; + throw ParquetException(ss.str()); + } + + auto it = row_group_readers_.find(i); + if (it != row_group_readers_.end()) { + // Constructed the RowGroupReader already + return it->second.get(); + } + if (!parsed_metadata_) { + ParseMetaData(); + } + + // Construct the RowGroupReader + row_group_readers_[i] = std::make_shared(this, &metadata_.row_groups[i]); + return row_group_readers_[i].get(); +} + void ParquetFileReader::ParseMetaData() { size_t filesize = 
buffer_->Size(); @@ -85,11 +173,11 @@ void ParquetFileReader::ParseMetaData() { throw ParquetException("Corrupted file, smaller than file footer"); } - size_t bytes_read; uint8_t footer_buffer[FOOTER_SIZE]; buffer_->Seek(filesize - FOOTER_SIZE); - buffer_->Read(FOOTER_SIZE, footer_buffer, &bytes_read); + + size_t bytes_read = buffer_->Read(FOOTER_SIZE, footer_buffer); if (bytes_read != FOOTER_SIZE) { throw ParquetException("Invalid parquet file. Corrupt footer."); @@ -107,11 +195,192 @@ void ParquetFileReader::ParseMetaData() { buffer_->Seek(metadata_start); std::vector metadata_buffer(metadata_len); - buffer_->Read(metadata_len, &metadata_buffer[0], &bytes_read); + bytes_read = buffer_->Read(metadata_len, &metadata_buffer[0]); if (bytes_read != metadata_len) { throw ParquetException("Invalid parquet file. Could not read metadata bytes."); } DeserializeThriftMsg(&metadata_buffer[0], &metadata_len, &metadata_); + parsed_metadata_ = true; +} + +// ---------------------------------------------------------------------- +// ParquetFileReader::DebugPrint + +static string parquet_type_to_string(Type::type t) { + switch (t) { + case Type::BOOLEAN: + return "BOOLEAN"; + break; + case Type::INT32: + return "INT32"; + break; + case Type::INT64: + return "INT64"; + break; + case Type::FLOAT: + return "FLOAT"; + break; + case Type::DOUBLE: + return "DOUBLE"; + break; + case Type::BYTE_ARRAY: + return "BYTE_ARRAY"; + break; + case Type::INT96: + return "INT96"; + break; + case Type::FIXED_LEN_BYTE_ARRAY: + return "FIXED_LEN_BYTE_ARRAY"; + break; + default: + return "UNKNOWN"; + break; + } +} + +// the fixed initial size is just for an example +#define COL_WIDTH "17" + +void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) { + if (!parsed_metadata_) { + ParseMetaData(); + } + + stream << "File statistics:\n"; + stream << "Total rows: " << metadata_.num_rows << "\n"; + for (int c = 1; c < metadata_.schema.size(); ++c) { + stream << "Column " << c-1 << ": " << metadata_.schema[c].name << " (" + << parquet_type_to_string(metadata_.schema[c].type); + if (metadata_.schema[c].type == Type::INT96 || + metadata_.schema[c].type == Type::FIXED_LEN_BYTE_ARRAY) { + stream << " - not supported"; + } + stream << ")\n"; + } + + for (int i = 0; i < metadata_.row_groups.size(); ++i) { + stream << "--- Row Group " << i << " ---\n"; + + RowGroupReader* group_reader = RowGroup(i); + + // Print column metadata + size_t nColumns = group_reader->num_columns(); + + for (int c = 0; c < group_reader->num_columns(); ++c) { + const parquet::ColumnMetaData* meta_data = group_reader->Column(c)->metadata(); + stream << "Column " << c + << ": " << meta_data->num_values << " rows, " + << meta_data->statistics.null_count << " null values, " + << meta_data->statistics.distinct_count << " distinct values, " + << "min value: " << (meta_data->statistics.min.length()>0 ? + meta_data->statistics.min : "N/A") + << ", max value: " << (meta_data->statistics.max.length()>0 ? 
+ meta_data->statistics.max : "N/A") << ".\n"; + } + + if (!print_values) { + continue; + } + + // Create readers for all columns and print contents + vector readers(nColumns, NULL); + for (int c = 0; c < nColumns; ++c) { + ColumnReader* col_reader = group_reader->Column(c); + + Type::type col_type = col_reader->type(); + + printf("%-" COL_WIDTH"s", metadata_.schema[c+1].name.c_str()); + + if (col_type == Type::INT96 || col_type == Type::FIXED_LEN_BYTE_ARRAY) { + continue; + } + + // This is OK in this method as long as the RowGroupReader does not get deleted + readers[c] = col_reader; + } + stream << "\n"; + + vector def_level(nColumns, 0); + vector rep_level(nColumns, 0); + + static constexpr size_t bufsize = 25; + char buffer[bufsize]; + + bool hasRow; + do { + hasRow = false; + for (int c = 0; c < nColumns; ++c) { + if (readers[c] == NULL) { + snprintf(buffer, bufsize, "%-" COL_WIDTH"s", " "); + stream << buffer; + continue; + } + if (readers[c]->HasNext()) { + hasRow = true; + switch (readers[c]->type()) { + case Type::BOOLEAN: { + bool val = reinterpret_cast(readers[c])->NextValue( + &def_level[c], &rep_level[c]); + if (def_level[c] >= rep_level[c]) { + snprintf(buffer, bufsize, "%-" COL_WIDTH"d",val); + stream << buffer; + } + break; + } + case Type::INT32: { + int32_t val = reinterpret_cast(readers[c])->NextValue( + &def_level[c], &rep_level[c]); + if (def_level[c] >= rep_level[c]) { + snprintf(buffer, bufsize, "%-" COL_WIDTH"d",val); + stream << buffer; + } + break; + } + case Type::INT64: { + int64_t val = reinterpret_cast(readers[c])->NextValue( + &def_level[c], &rep_level[c]); + if (def_level[c] >= rep_level[c]) { + snprintf(buffer, bufsize, "%-" COL_WIDTH"ld",val); + stream << buffer; + } + break; + } + case Type::FLOAT: { + float val = reinterpret_cast(readers[c])->NextValue( + &def_level[c], &rep_level[c]); + if (def_level[c] >= rep_level[c]) { + snprintf(buffer, bufsize, "%-" COL_WIDTH"f",val); + stream << buffer; + } + break; + } + case Type::DOUBLE: { + double val = reinterpret_cast(readers[c])->NextValue( + &def_level[c], &rep_level[c]); + if (def_level[c] >= rep_level[c]) { + snprintf(buffer, bufsize, "%-" COL_WIDTH"lf",val); + stream << buffer; + } + break; + } + case Type::BYTE_ARRAY: { + ByteArray val = reinterpret_cast(readers[c])->NextValue( + &def_level[c], &rep_level[c]); + if (def_level[c] >= rep_level[c]) { + string result = ByteArrayToString(val); + snprintf(buffer, bufsize, "%-" COL_WIDTH"s", result.c_str()); + stream << buffer; + } + break; + } + default: + continue; + } + } + } + stream << "\n"; + } while (hasRow); + } } } // namespace parquet_cpp diff --git a/cpp/src/parquet/reader.h b/cpp/src/parquet/reader.h index 4a40e0482bad0..e8a6806eb2ab8 100644 --- a/cpp/src/parquet/reader.h +++ b/cpp/src/parquet/reader.h @@ -19,14 +19,18 @@ #define PARQUET_FILE_READER_H #include +#include #include #include +#include #include "parquet/thrift/parquet_types.h" -#include "parquet/parquet.h" +#include "parquet/types.h" namespace parquet_cpp { +class ColumnReader; + class FileLike { public: virtual ~FileLike() {} @@ -35,7 +39,9 @@ class FileLike { virtual size_t Size() = 0; virtual size_t Tell() = 0; virtual void Seek(size_t pos) = 0; - virtual void Read(size_t nbytes, uint8_t* out, size_t* bytes_read) = 0; + + // Returns actual number of bytes read + virtual size_t Read(size_t nbytes, uint8_t* out) = 0; }; @@ -50,36 +56,83 @@ class LocalFile : public FileLike { virtual size_t Size(); virtual size_t Tell(); virtual void Seek(size_t pos); - virtual void Read(size_t 
nbytes, uint8_t* out, size_t* bytes_read); + + // Returns actual number of bytes read + virtual size_t Read(size_t nbytes, uint8_t* out); bool is_open() const { return is_open_;} const std::string& path() const { return path_;} private: + void CloseFile(); + std::string path_; FILE* file_; bool is_open_; }; +class ParquetFileReader; + +class RowGroupReader { + public: + RowGroupReader(ParquetFileReader* parent, parquet::RowGroup* group) : + parent_(parent), + row_group_(group) {} + + // Construct a ColumnReader for the indicated row group-relative column. The + // returned object is owned by the RowGroupReader + ColumnReader* Column(size_t i); + + size_t num_columns() const { + return row_group_->columns.size(); + } + + private: + friend class ParquetFileReader; + + ParquetFileReader* parent_; + parquet::RowGroup* row_group_; + + // Column index -> ColumnReader + std::unordered_map > column_readers_; +}; + class ParquetFileReader { public: - ParquetFileReader() : buffer_(nullptr) {} - ~ParquetFileReader() {} + ParquetFileReader(); + ~ParquetFileReader(); - // The class takes ownership of the passed file-like object + // This class does _not_ take ownership of the file. You must manage its + // lifetime separately void Open(FileLike* buffer); void Close(); void ParseMetaData(); + // The RowGroupReader is owned by the FileReader + RowGroupReader* RowGroup(size_t i); + + size_t num_row_groups() const { + return metadata_.row_groups.size(); + } + const parquet::FileMetaData& metadata() const { return metadata_; } + void DebugPrint(std::ostream& stream, bool print_values = true); + private: + friend class RowGroupReader; + parquet::FileMetaData metadata_; + bool parsed_metadata_; + + // Row group index -> RowGroupReader + std::unordered_map > row_group_readers_; + FileLike* buffer_; }; diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h new file mode 100644 index 0000000000000..37f538aa704ea --- /dev/null +++ b/cpp/src/parquet/types.h @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef PARQUET_TYPES_H +#define PARQUET_TYPES_H + +#include +#include +#include +#include + +#include "parquet/thrift/parquet_types.h" + +namespace parquet_cpp { + +struct ByteArray { + uint32_t len; + const uint8_t* ptr; +}; + + +static inline std::string ByteArrayToString(const ByteArray& a) { + return std::string(reinterpret_cast(a.ptr), a.len); +} + +static inline int ByteCompare(const ByteArray& x1, const ByteArray& x2) { + int len = std::min(x1.len, x2.len); + int cmp = memcmp(x1.ptr, x2.ptr, len); + if (cmp != 0) return cmp; + if (len < x1.len) return 1; + if (len < x2.len) return -1; + return 0; +} + +template +struct type_traits { +}; + +template <> +struct type_traits { + typedef bool value_type; + static constexpr parquet::Type::type parquet_type = parquet::Type::BOOLEAN; + + static constexpr size_t value_byte_size = 1; +}; + +template <> +struct type_traits { + typedef int32_t value_type; + static constexpr parquet::Type::type parquet_type = parquet::Type::INT32; + + static constexpr size_t value_byte_size = 4; +}; + +template <> +struct type_traits { + typedef int64_t value_type; + static constexpr parquet::Type::type parquet_type = parquet::Type::INT64; + + static constexpr size_t value_byte_size = 8; +}; + +template <> +struct type_traits { + // TODO + typedef void* value_type; + static constexpr parquet::Type::type parquet_type = parquet::Type::INT96; + + static constexpr size_t value_byte_size = 12; +}; + +template <> +struct type_traits { + typedef float value_type; + static constexpr parquet::Type::type parquet_type = parquet::Type::FLOAT; + + static constexpr size_t value_byte_size = 4; +}; + +template <> +struct type_traits { + typedef double value_type; + static constexpr parquet::Type::type parquet_type = parquet::Type::DOUBLE; + + static constexpr size_t value_byte_size = 8; +}; + +template <> +struct type_traits { + typedef ByteArray value_type; + static constexpr parquet::Type::type parquet_type = parquet::Type::BYTE_ARRAY; + + static constexpr size_t value_byte_size = sizeof(ByteArray); +}; + +} // namespace parquet_cpp + +#endif // PARQUET_TYPES_H diff --git a/cpp/src/parquet/util/CMakeLists.txt b/cpp/src/parquet/util/CMakeLists.txt index 766214bcdaae5..b3c817dca4aea 100644 --- a/cpp/src/parquet/util/CMakeLists.txt +++ b/cpp/src/parquet/util/CMakeLists.txt @@ -21,8 +21,13 @@ install(FILES logging.h rle-encoding.h stopwatch.h + input_stream.h DESTINATION include/parquet/util) +add_library(parquet_util STATIC + input_stream.cc +) + add_library(parquet_test_main test_main.cc) diff --git a/cpp/src/parquet/util/input_stream.cc b/cpp/src/parquet/util/input_stream.cc new file mode 100644 index 0000000000000..d0e53ed54b481 --- /dev/null +++ b/cpp/src/parquet/util/input_stream.cc @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/util/input_stream.h" + +#include + +#include "parquet/exception.h" + +namespace parquet_cpp { + +InMemoryInputStream::InMemoryInputStream(const uint8_t* buffer, int64_t len) : + buffer_(buffer), len_(len), offset_(0) {} + +const uint8_t* InMemoryInputStream::Peek(int num_to_peek, int* num_bytes) { + *num_bytes = std::min(static_cast(num_to_peek), len_ - offset_); + return buffer_ + offset_; +} + +const uint8_t* InMemoryInputStream::Read(int num_to_read, int* num_bytes) { + const uint8_t* result = Peek(num_to_read, num_bytes); + offset_ += *num_bytes; + return result; +} + +ScopedInMemoryInputStream::ScopedInMemoryInputStream(int64_t len) { + buffer_.resize(len); + stream_.reset(new InMemoryInputStream(buffer_.data(), buffer_.size())); +} + +uint8_t* ScopedInMemoryInputStream::data() { + return buffer_.data(); +} + +int64_t ScopedInMemoryInputStream::size() { + return buffer_.size(); +} + +const uint8_t* ScopedInMemoryInputStream::Peek(int num_to_peek, + int* num_bytes) { + return stream_->Peek(num_to_peek, num_bytes); +} + +const uint8_t* ScopedInMemoryInputStream::Read(int num_to_read, + int* num_bytes) { + return stream_->Read(num_to_read, num_bytes); +} + +} // namespace parquet_cpp diff --git a/cpp/src/parquet/util/input_stream.h b/cpp/src/parquet/util/input_stream.h new file mode 100644 index 0000000000000..ece2488aba267 --- /dev/null +++ b/cpp/src/parquet/util/input_stream.h @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PARQUET_INPUT_STREAM_H +#define PARQUET_INPUT_STREAM_H + +#include +#include +#include + +namespace parquet_cpp { + +// Interface for the column reader to get the bytes. The interface is a stream +// interface, meaning the bytes in order and once a byte is read, it does not +// need to be read again. +class InputStream { + public: + // Returns the next 'num_to_peek' without advancing the current position. + // *num_bytes will contain the number of bytes returned which can only be + // less than num_to_peek at end of stream cases. + // Since the position is not advanced, calls to this function are idempotent. + // The buffer returned to the caller is still owned by the input stream and must + // stay valid until the next call to Peek() or Read(). + virtual const uint8_t* Peek(int num_to_peek, int* num_bytes) = 0; + + // Identical to Peek(), except the current position in the stream is advanced by + // *num_bytes. + virtual const uint8_t* Read(int num_to_read, int* num_bytes) = 0; + + virtual ~InputStream() {} + + protected: + InputStream() {} +}; + +// Implementation of an InputStream when all the bytes are in memory. 
+class InMemoryInputStream : public InputStream { + public: + InMemoryInputStream(const uint8_t* buffer, int64_t len); + virtual const uint8_t* Peek(int num_to_peek, int* num_bytes); + virtual const uint8_t* Read(int num_to_read, int* num_bytes); + + private: + const uint8_t* buffer_; + int64_t len_; + int64_t offset_; +}; + + +// A wrapper for InMemoryInputStream to manage the memory. +class ScopedInMemoryInputStream : public InputStream { + public: + explicit ScopedInMemoryInputStream(int64_t len); + uint8_t* data(); + int64_t size(); + virtual const uint8_t* Peek(int num_to_peek, int* num_bytes); + virtual const uint8_t* Read(int num_to_read, int* num_bytes); + + private: + std::vector buffer_; + std::unique_ptr stream_; +}; + +} // namespace parquet_cpp + +#endif // PARQUET_INPUT_STREAM_H