PARQUET-451: Add RowGroupReader helper class and refactor parquet_reader.cc into DebugPrint

This also addresses PARQUET-433 and PARQUET-453.

Author: Wes McKinney <wes@cloudera.com>

Closes apache#23 from wesm/PARQUET-451 and squashes the following commits:

748ee0c [Wes McKinney] Turn MakeColumnReader into static ColumnReader::Make
6528497 [Wes McKinney] Incorporate code review comments
4b5575d [Wes McKinney] [PARQUET-451/453]: Implement RowGroupReader class and refactor parquet_reader.cc into ParquetFileReader::DebugPrint
2985e2e [Wes McKinney] [PARQUET-433]: Templatize decoders and column readers and remove most switch-on-type statements. Add parquet::SchemaElement* member to Decoder<T>, for FLBA metadata.

Change-Id: I3c7668b53f4167fdaf3e371955a4833e052589e2
wesm authored and nongli committed Jan 28, 2016
1 parent fb06287 commit 7c33cc9
Showing 20 changed files with 1,133 additions and 423 deletions.
2 changes: 2 additions & 0 deletions cpp/src/parquet/CMakeLists.txt
@@ -18,8 +18,10 @@
# Headers: top level
install(FILES
parquet.h
column_reader.h
reader.h
exception.h
types.h
DESTINATION include/parquet)

ADD_PARQUET_TEST(reader-test)
194 changes: 194 additions & 0 deletions cpp/src/parquet/column_reader.cc
@@ -0,0 +1,194 @@
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "parquet/column_reader.h"

#include <algorithm>
#include <string>
#include <string.h>

#include "parquet/encodings/encodings.h"
#include "parquet/compression/codec.h"
#include "parquet/thrift/util.h"
#include "parquet/util/input_stream.h"

const int DATA_PAGE_SIZE = 64 * 1024;

namespace parquet_cpp {

using parquet::CompressionCodec;
using parquet::Encoding;
using parquet::FieldRepetitionType;
using parquet::PageType;
using parquet::Type;


ColumnReader::~ColumnReader() {
delete stream_;
}

ColumnReader::ColumnReader(const parquet::ColumnMetaData* metadata,
const parquet::SchemaElement* schema, InputStream* stream)
: metadata_(metadata),
schema_(schema),
stream_(stream),
num_buffered_values_(0),
num_decoded_values_(0),
buffered_values_offset_(0) {

switch (metadata->codec) {
case CompressionCodec::UNCOMPRESSED:
break;
case CompressionCodec::SNAPPY:
decompressor_.reset(new SnappyCodec());
break;
default:
ParquetException::NYI("Reading compressed data");
}

config_ = Config::DefaultConfig();
}


// PLAIN_DICTIONARY is deprecated but was formerly used as a dictionary index
// encoding.
static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
}

template <int TYPE>
bool TypedColumnReader<TYPE>::ReadNewPage() {
// Loop until we find the next data page.


while (true) {
int bytes_read = 0;
const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read);
if (bytes_read == 0) return false;
uint32_t header_size = bytes_read;
DeserializeThriftMsg(buffer, &header_size, &current_page_header_);
stream_->Read(header_size, &bytes_read);

int compressed_len = current_page_header_.compressed_page_size;
int uncompressed_len = current_page_header_.uncompressed_page_size;

// Read the compressed data page.
buffer = stream_->Read(compressed_len, &bytes_read);
if (bytes_read != compressed_len) ParquetException::EofException();

// Uncompress it if we need to
if (decompressor_ != NULL) {
// Grow the uncompressed buffer if we need to.
if (uncompressed_len > decompression_buffer_.size()) {
decompression_buffer_.resize(uncompressed_len);
}
decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
&decompression_buffer_[0]);
buffer = &decompression_buffer_[0];
}

if (current_page_header_.type == PageType::DICTIONARY_PAGE) {
auto it = decoders_.find(Encoding::RLE_DICTIONARY);
if (it != decoders_.end()) {
throw ParquetException("Column cannot have more than one dictionary.");
}

PlainDecoder<TYPE> dictionary(schema_);
dictionary.SetData(current_page_header_.dictionary_page_header.num_values,
buffer, uncompressed_len);
std::shared_ptr<DecoderType> decoder(new DictionaryDecoder<TYPE>(schema_, &dictionary));

decoders_[Encoding::RLE_DICTIONARY] = decoder;
current_decoder_ = decoders_[Encoding::RLE_DICTIONARY].get();
continue;
} else if (current_page_header_.type == PageType::DATA_PAGE) {
// Read a data page.
num_buffered_values_ = current_page_header_.data_page_header.num_values;

// Read definition levels.
if (schema_->repetition_type != FieldRepetitionType::REQUIRED) {
int num_definition_bytes = *reinterpret_cast<const uint32_t*>(buffer);
buffer += sizeof(uint32_t);
definition_level_decoder_.reset(
new RleDecoder(buffer, num_definition_bytes, 1));
buffer += num_definition_bytes;
uncompressed_len -= sizeof(uint32_t);
uncompressed_len -= num_definition_bytes;
}

// TODO: repetition levels

// Get a decoder object for this page or create a new decoder if this is the
// first page with this encoding.
Encoding::type encoding = current_page_header_.data_page_header.encoding;
if (IsDictionaryIndexEncoding(encoding)) encoding = Encoding::RLE_DICTIONARY;

auto it = decoders_.find(encoding);
if (it != decoders_.end()) {
current_decoder_ = it->second.get();
} else {
switch (encoding) {
case Encoding::PLAIN: {
std::shared_ptr<DecoderType> decoder(new PlainDecoder<TYPE>(schema_));
decoders_[encoding] = decoder;
current_decoder_ = decoder.get();
break;
}
case Encoding::RLE_DICTIONARY:
throw ParquetException("Dictionary page must be before data page.");

case Encoding::DELTA_BINARY_PACKED:
case Encoding::DELTA_LENGTH_BYTE_ARRAY:
case Encoding::DELTA_BYTE_ARRAY:
ParquetException::NYI("Unsupported encoding");

default:
throw ParquetException("Unknown encoding type.");
}
}
current_decoder_->SetData(num_buffered_values_, buffer, uncompressed_len);
return true;
} else {
// We don't know what this page type is. We're allowed to skip non-data pages.
continue;
}
}
return true;
}

std::shared_ptr<ColumnReader> ColumnReader::Make(const parquet::ColumnMetaData* metadata,
const parquet::SchemaElement* element, InputStream* stream) {
switch (metadata->type) {
case Type::BOOLEAN:
return std::make_shared<BoolReader>(metadata, element, stream);
case Type::INT32:
return std::make_shared<Int32Reader>(metadata, element, stream);
case Type::INT64:
return std::make_shared<Int64Reader>(metadata, element, stream);
case Type::INT96:
return std::make_shared<Int96Reader>(metadata, element, stream);
case Type::FLOAT:
return std::make_shared<FloatReader>(metadata, element, stream);
case Type::DOUBLE:
return std::make_shared<DoubleReader>(metadata, element, stream);
case Type::BYTE_ARRAY:
return std::make_shared<ByteArrayReader>(metadata, element, stream);
default:
ParquetException::NYI("type reader not implemented");
}
// Unreachable code, but suppress compiler warning
return std::shared_ptr<ColumnReader>(nullptr);
}

} // namespace parquet_cpp
183 changes: 183 additions & 0 deletions cpp/src/parquet/column_reader.h
@@ -0,0 +1,183 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef PARQUET_COLUMN_READER_H
#define PARQUET_COLUMN_READER_H

#include <exception>
#include <cstdint>
#include <cstring>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "parquet/exception.h"
#include "parquet/types.h"
#include "parquet/thrift/parquet_constants.h"
#include "parquet/thrift/parquet_types.h"
#include "parquet/util/input_stream.h"
#include "parquet/encodings/encodings.h"
#include "parquet/util/rle-encoding.h"

namespace std {

template <>
struct hash<parquet::Encoding::type> {
std::size_t operator()(const parquet::Encoding::type& k) const {
return hash<int>()(static_cast<int>(k));
}
};

} // namespace std

namespace parquet_cpp {

class Codec;

class ColumnReader {
public:

struct Config {
int batch_size;

static Config DefaultConfig() {
Config config;
config.batch_size = 128;
return config;
}
};

ColumnReader(const parquet::ColumnMetaData*,
const parquet::SchemaElement*, InputStream* stream);

virtual ~ColumnReader();

static std::shared_ptr<ColumnReader> Make(const parquet::ColumnMetaData*,
const parquet::SchemaElement*, InputStream* stream);

virtual bool ReadNewPage() = 0;

// Returns true if there are still values in this column.
bool HasNext() {
if (num_buffered_values_ == 0) {
ReadNewPage();
if (num_buffered_values_ == 0) return false;
}
return true;
}

parquet::Type::type type() const {
return metadata_->type;
}

const parquet::ColumnMetaData* metadata() const {
return metadata_;
}

protected:
// Reads the next definition and repetition level. Returns true if the value is NULL.
bool ReadDefinitionRepetitionLevels(int* def_level, int* rep_level);

Config config_;

const parquet::ColumnMetaData* metadata_;
const parquet::SchemaElement* schema_;
InputStream* stream_;

// Compression codec to use.
std::unique_ptr<Codec> decompressor_;
std::vector<uint8_t> decompression_buffer_;

parquet::PageHeader current_page_header_;

// Not set if field is required.
std::unique_ptr<RleDecoder> definition_level_decoder_;
// Not set for flat schemas.
std::unique_ptr<RleDecoder> repetition_level_decoder_;
int num_buffered_values_;

int num_decoded_values_;
int buffered_values_offset_;
};


// API to read values from a single column. This is the main client facing API.
template <int TYPE>
class TypedColumnReader : public ColumnReader {
public:
typedef typename type_traits<TYPE>::value_type T;

TypedColumnReader(const parquet::ColumnMetaData* metadata,
const parquet::SchemaElement* schema, InputStream* stream) :
ColumnReader(metadata, schema, stream),
current_decoder_(NULL) {
size_t value_byte_size = type_traits<TYPE>::value_byte_size;
values_buffer_.resize(config_.batch_size * value_byte_size);
}

// Returns the next value of this type.
// TODO: batchify this interface.
T NextValue(int* def_level, int* rep_level) {
if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return T();
if (buffered_values_offset_ == num_decoded_values_) BatchDecode();
return reinterpret_cast<T*>(&values_buffer_[0])[buffered_values_offset_++];
}

private:
void BatchDecode();

virtual bool ReadNewPage();

typedef Decoder<TYPE> DecoderType;

// Map of encoding type to the corresponding decoder object.
std::unordered_map<parquet::Encoding::type, std::shared_ptr<DecoderType> > decoders_;

DecoderType* current_decoder_;
std::vector<uint8_t> values_buffer_;
};

typedef TypedColumnReader<parquet::Type::BOOLEAN> BoolReader;
typedef TypedColumnReader<parquet::Type::INT32> Int32Reader;
typedef TypedColumnReader<parquet::Type::INT64> Int64Reader;
typedef TypedColumnReader<parquet::Type::INT96> Int96Reader;
typedef TypedColumnReader<parquet::Type::FLOAT> FloatReader;
typedef TypedColumnReader<parquet::Type::DOUBLE> DoubleReader;
typedef TypedColumnReader<parquet::Type::BYTE_ARRAY> ByteArrayReader;


template <int TYPE>
void TypedColumnReader<TYPE>::BatchDecode() {
buffered_values_offset_ = 0;
T* buf = reinterpret_cast<T*>(&values_buffer_[0]);
int batch_size = config_.batch_size;
num_decoded_values_ = current_decoder_->Decode(buf, batch_size);
}

inline bool ColumnReader::ReadDefinitionRepetitionLevels(int* def_level, int* rep_level) {
*rep_level = 1;
if (definition_level_decoder_ && !definition_level_decoder_->Get(def_level)) {
ParquetException::EofException();
}
--num_buffered_values_;
return *def_level == 0;
}

} // namespace parquet_cpp

#endif // PARQUET_COLUMN_READER_H
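
For orientation, here is a minimal usage sketch of the ColumnReader::Make factory and typed-reader API added in column_reader.h above. It is not part of this commit: the ColumnMetaData, SchemaElement, and InputStream arguments are assumed to be supplied by the surrounding file-reading code (for example the new RowGroupReader), and the INT32 value type is assumed to map to int32_t. Note that ColumnReader's destructor deletes the stream, so the reader takes ownership of it.

#include <cstdint>
#include <memory>

#include "parquet/column_reader.h"

// Sums the non-null values of a flat, optional INT32 column chunk using the
// ColumnReader::Make factory and the Int32Reader typedef added in this commit.
int64_t SumInt32Column(const parquet::ColumnMetaData* metadata,
    const parquet::SchemaElement* element,
    parquet_cpp::InputStream* stream) {
  std::shared_ptr<parquet_cpp::ColumnReader> reader =
      parquet_cpp::ColumnReader::Make(metadata, element, stream);
  parquet_cpp::Int32Reader* typed =
      static_cast<parquet_cpp::Int32Reader*>(reader.get());

  int64_t sum = 0;
  int def_level = 0;
  int rep_level = 0;
  while (typed->HasNext()) {
    int32_t value = typed->NextValue(&def_level, &rep_level);
    // For optional fields a definition level of 0 marks a null value, in which
    // case NextValue() returned a default-constructed T that should be ignored.
    if (def_level > 0) {
      sum += value;
    }
  }
  return sum;
}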