Skip to content

Commit

Permalink
PARQUET-428: Support INT96 and FIXED_LEN_BYTE_ARRAY types
Browse files Browse the repository at this point in the history
This PR adds support for INT96 and FIXED_LEN_BYTE_ARRAY types.
It modifies the examples and DebugPrint to handle these types.

Author: Deepak Majeti <deepak.majeti@hp.com>

Closes apache#27 from majetideepak/master and squashes the following commits:

5ba0a03 [Deepak Majeti] PARQUET-428: Support INT96 and FIXED_LEN_BYTE_ARRAY types

Change-Id: I8c86350ae1245582dba6167b18f5cc92cd35e7f9
  • Loading branch information
Deepak Majeti authored and julienledem committed Jan 29, 2016
1 parent aeaff37 commit aa03414
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 15 deletions.
2 changes: 2 additions & 0 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ std::shared_ptr<ColumnReader> ColumnReader::Make(const parquet::ColumnMetaData*
return std::make_shared<DoubleReader>(metadata, element, stream);
case Type::BYTE_ARRAY:
return std::make_shared<ByteArrayReader>(metadata, element, stream);
case Type::FIXED_LEN_BYTE_ARRAY:
return std::make_shared<FixedLenByteArrayReader>(metadata, element, stream);
default:
ParquetException::NYI("type reader not implemented");
}
Expand Down
1 change: 1 addition & 0 deletions cpp/src/parquet/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ typedef TypedColumnReader<parquet::Type::INT96> Int96Reader;
typedef TypedColumnReader<parquet::Type::FLOAT> FloatReader;
typedef TypedColumnReader<parquet::Type::DOUBLE> DoubleReader;
typedef TypedColumnReader<parquet::Type::BYTE_ARRAY> ByteArrayReader;
typedef TypedColumnReader<parquet::Type::FIXED_LEN_BYTE_ARRAY> FixedLenByteArrayReader;


template <int TYPE>
Expand Down
19 changes: 19 additions & 0 deletions cpp/src/parquet/encodings/dictionary-encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,25 @@ inline void DictionaryDecoder<parquet::Type::BYTE_ARRAY>::Init(
}
}

// Specialization of Init for FIXED_LEN_BYTE_ARRAY dictionaries.
//
// Decodes every remaining value of `dictionary` into dictionary_, then copies the
// bytes of each value (schema_->type_length bytes apiece) back-to-back into
// byte_array_data_ and re-points each entry at that copy, so the entries no longer
// refer to wherever `dictionary` placed them.
//
// Empty dictionaries and zero-width values are handled without indexing into an
// empty container.
template <>
inline void DictionaryDecoder<parquet::Type::FIXED_LEN_BYTE_ARRAY>::Init(
    Decoder<parquet::Type::FIXED_LEN_BYTE_ARRAY>* dictionary) {
  const int num_dictionary_values = dictionary->values_left();
  dictionary_.resize(num_dictionary_values);

  // With no values there is no element 0 whose address could be handed to Decode().
  if (num_dictionary_values == 0) {
    byte_array_data_.resize(0);
    return;
  }
  dictionary->Decode(&dictionary_[0], num_dictionary_values);

  const int fixed_len = schema_->type_length;
  // Widen before multiplying so a large dictionary cannot overflow int.
  const int64_t total_size = static_cast<int64_t>(num_dictionary_values) * fixed_len;
  byte_array_data_.resize(total_size);

  // Zero-width values carry no bytes: there is nothing to copy and no valid slot in
  // byte_array_data_ to point at, so the entries are left as decoded.
  if (total_size == 0) return;

  int64_t offset = 0;
  for (int i = 0; i < num_dictionary_values; ++i) {
    memcpy(&byte_array_data_[offset], dictionary_[i].ptr, fixed_len);
    dictionary_[i].ptr = &byte_array_data_[offset];
    offset += fixed_len;
  }
}

} // namespace parquet_cpp

#endif
16 changes: 16 additions & 0 deletions cpp/src/parquet/encodings/plain-encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,22 @@ inline int PlainDecoder<parquet::Type::BYTE_ARRAY>::Decode(ByteArray* buffer,
return max_values;
}

// Template specialization for FIXED_LEN_BYTE_ARRAY.
//
// Hands out up to max_values fixed-width values (never more than num_values_).
// No bytes are copied: each buffer[i].ptr is pointed at the current read position
// data_, which then advances by schema_->type_length.
//
// Returns the number of values produced. Calls ParquetException::EofException()
// when fewer bytes remain (len_) than the request needs; that check is made once,
// before any decoder state is touched, so a failed call leaves data_, len_ and
// num_values_ unchanged.
//
// NOTE(review): a negative type_length is not rejected here — confirm the schema
// is validated before decoding.
template <>
inline int PlainDecoder<parquet::Type::FIXED_LEN_BYTE_ARRAY>::Decode(
    FixedLenByteArray* buffer, int max_values) {
  max_values = std::min(max_values, num_values_);
  const int len = schema_->type_length;

  // 64-bit product so that max_values * len cannot overflow int.
  if (len_ < static_cast<int64_t>(max_values) * len) ParquetException::EofException();

  for (int i = 0; i < max_values; ++i) {
    buffer[i].ptr = data_;
    data_ += len;
    len_ -= len;
  }
  num_values_ -= max_values;
  return max_values;
}

template <>
class PlainDecoder<parquet::Type::BOOLEAN> : public Decoder<parquet::Type::BOOLEAN> {
public:
Expand Down
38 changes: 25 additions & 13 deletions cpp/src/parquet/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ static string parquet_type_to_string(Type::type t) {
case Type::INT64:
return "INT64";
break;
case Type::INT96:
return "INT96";
break;
case Type::FLOAT:
return "FLOAT";
break;
Expand All @@ -226,9 +229,6 @@ static string parquet_type_to_string(Type::type t) {
case Type::BYTE_ARRAY:
return "BYTE_ARRAY";
break;
case Type::INT96:
return "INT96";
break;
case Type::FIXED_LEN_BYTE_ARRAY:
return "FIXED_LEN_BYTE_ARRAY";
break;
Expand All @@ -239,7 +239,7 @@ static string parquet_type_to_string(Type::type t) {
}

// the fixed initial size is just for an example
#define COL_WIDTH "17"
#define COL_WIDTH "20"

void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
if (!parsed_metadata_) {
Expand All @@ -251,10 +251,6 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
for (int c = 1; c < metadata_.schema.size(); ++c) {
stream << "Column " << c-1 << ": " << metadata_.schema[c].name << " ("
<< parquet_type_to_string(metadata_.schema[c].type);
if (metadata_.schema[c].type == Type::INT96 ||
metadata_.schema[c].type == Type::FIXED_LEN_BYTE_ARRAY) {
stream << " - not supported";
}
stream << ")\n";
}

Expand Down Expand Up @@ -291,10 +287,6 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {

printf("%-" COL_WIDTH"s", metadata_.schema[c+1].name.c_str());

if (col_type == Type::INT96 || col_type == Type::FIXED_LEN_BYTE_ARRAY) {
continue;
}

// This is OK in this method as long as the RowGroupReader does not get deleted
readers[c] = col_reader;
}
Expand Down Expand Up @@ -345,6 +337,16 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
}
break;
}
case Type::INT96: {
Int96 val = reinterpret_cast<Int96Reader*>(readers[c])->NextValue(
&def_level[c], &rep_level[c]);
if (def_level[c] >= rep_level[c]) {
string result = Int96ToString(val);
snprintf(buffer, bufsize, "%-" COL_WIDTH"s", result.c_str());
stream << buffer;
}
break;
}
case Type::FLOAT: {
float val = reinterpret_cast<FloatReader*>(readers[c])->NextValue(
&def_level[c], &rep_level[c]);
Expand Down Expand Up @@ -373,7 +375,17 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
}
break;
}
default:
case Type::FIXED_LEN_BYTE_ARRAY: {
FixedLenByteArray val = reinterpret_cast<FixedLenByteArrayReader*>(
readers[c])->NextValue(&def_level[c], &rep_level[c]);
if (def_level[c] >= rep_level[c]) {
string result = FixedLenByteArrayToString(val, metadata_.schema[c+1].type_length);
snprintf(buffer, bufsize, "%-" COL_WIDTH"s", result.c_str());
stream << buffer;
}
break;
}
default:
continue;
}
}
Expand Down
38 changes: 36 additions & 2 deletions cpp/src/parquet/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
#include <cstdint>
#include <cstring>
#include <string>
#include <sstream>

#include "parquet/thrift/parquet_types.h"
#include "parquet/util/compiler-util.h"

namespace parquet_cpp {

Expand All @@ -32,11 +34,36 @@ struct ByteArray {
const uint8_t* ptr;
};

// One FIXED_LEN_BYTE_ARRAY value, represented only by a pointer to its first byte.
// The length is not stored here; code that reads the bytes must be given it
// separately (e.g. FixedLenByteArrayToString takes an explicit len).
struct FixedLenByteArray {
const uint8_t* ptr;
};

// INT96 value held as three 32-bit words. Declared through the packing macros so
// that STRUCT_END can assert at compile time that sizeof(Int96) is exactly 12.
MANUALLY_ALIGNED_STRUCT(1) Int96 {
uint32_t value[3];
};
STRUCT_END(Int96, 12);

// Copies the a.len bytes starting at a.ptr into a new std::string.
static inline std::string ByteArrayToString(const ByteArray& a) {
  const char* first_char = reinterpret_cast<const char*>(a.ptr);
  std::string text(first_char, a.len);
  return text;
}

// Renders the three 32-bit words of `a` in decimal, in array order. Every word,
// including the last, is followed by one space.
static inline std::string Int96ToString(const Int96& a) {
  std::ostringstream out;
  out << a.value[0] << " " << a.value[1] << " " << a.value[2] << " ";
  return out.str();
}

// Renders the first `len` bytes at a.ptr as unsigned decimal numbers, each followed
// by one space. Yields an empty string when len is zero or negative.
static inline std::string FixedLenByteArrayToString(const FixedLenByteArray& a, int len) {
  std::ostringstream out;
  const uint8_t* cursor = a.ptr;
  for (int remaining = len; remaining > 0; --remaining) {
    // Widen first so the byte is printed as a number rather than as a character.
    const uint32_t byte_value = static_cast<uint32_t>(*cursor);
    out << byte_value << " ";
    ++cursor;
  }
  return out.str();
}

static inline int ByteCompare(const ByteArray& x1, const ByteArray& x2) {
int len = std::min(x1.len, x2.len);
int cmp = memcmp(x1.ptr, x2.ptr, len);
Expand Down Expand Up @@ -76,8 +103,7 @@ struct type_traits<parquet::Type::INT64> {

template <>
struct type_traits<parquet::Type::INT96> {
// TODO
typedef void* value_type;
typedef Int96 value_type;
static constexpr parquet::Type::type parquet_type = parquet::Type::INT96;

static constexpr size_t value_byte_size = 12;
Expand Down Expand Up @@ -107,6 +133,14 @@ struct type_traits<parquet::Type::BYTE_ARRAY> {
static constexpr size_t value_byte_size = sizeof(ByteArray);
};

template <>
struct type_traits<parquet::Type::FIXED_LEN_BYTE_ARRAY> {
typedef FixedLenByteArray value_type;
static constexpr parquet::Type::type parquet_type = parquet::Type::FIXED_LEN_BYTE_ARRAY;

static constexpr size_t value_byte_size = sizeof(FixedLenByteArray);
};

} // namespace parquet_cpp

#endif // PARQUET_TYPES_H
21 changes: 21 additions & 0 deletions cpp/src/parquet/util/compiler-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,25 @@

#define PREFETCH(addr) __builtin_prefetch(addr)

// Macros for declaring a struct with 1-byte packing (no padding) and verifying
// its size at compile time. Usage:
//   MANUALLY_ALIGNED_STRUCT(alignment) Name { ...fields... };
//   STRUCT_END(Name, size);
// MANUALLY_ALIGNED_STRUCT issues pack(1) and opens a struct carrying the requested
// alignment attribute; STRUCT_END issues pack() and static_asserts that
// sizeof(Name) == size.
// Implemented for MSVC and for GCC/Clang; any other compiler stops at the #error.
// NOTE(review): pack() is used rather than push/pop, so an enclosing packing
// setting would presumably not be restored — confirm if that ever matters.
// Adapted from FlatBuffers:
// https://github.com/google/flatbuffers/blob/master/include/flatbuffers/flatbuffers.h#L1355
#if defined(_MSC_VER)
#define MANUALLY_ALIGNED_STRUCT(alignment) \
__pragma(pack(1)); \
struct __declspec(align(alignment))
#define STRUCT_END(name, size) \
__pragma(pack()); \
static_assert(sizeof(name) == size, "compiler breaks packing rules")
#elif defined(__GNUC__) || defined(__clang__)
#define MANUALLY_ALIGNED_STRUCT(alignment) \
_Pragma("pack(1)") \
struct __attribute__((aligned(alignment)))
#define STRUCT_END(name, size) \
_Pragma("pack()") \
static_assert(sizeof(name) == size, "compiler breaks packing rules")
#else
#error Unknown compiler, please define structure alignment macros
#endif

#endif // PARQUET_UTIL_COMPILER_UTIL_H

0 comments on commit aa03414

Please sign in to comment.