Skip to content

Commit

Permalink
PARQUET-833: C++: Provide API to write spaced arrays
Browse files Browse the repository at this point in the history
Slight performance improvement. It is not as visible as in the read path, since write performance is dominated by the dictionary appends.

Author: Uwe L. Korn <uwelk@xhochy.com>

Closes apache#220 from xhochy/PARQUET-833 and squashes the following commits:

12514cf [Uwe L. Korn] ninja format
d22477d [Uwe L. Korn] Call correct statistics update
25c9747 [Uwe L. Korn] Add more unittests, remove logging from public header
0c5ce71 [Uwe L. Korn] Save the memory copy for numerical types
1ed0407 [Uwe L. Korn] PARQUET-833: C++: Provide API to write spaced arrays

Change-Id: I4159be624d82a0303a2aa2ffbe847c4d13a401a9
  • Loading branch information
xhochy authored and wesm committed Jan 23, 2017
1 parent 97e69b4 commit f3a3c69
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 20 deletions.
61 changes: 52 additions & 9 deletions cpp/src/parquet/arrow/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <algorithm>
#include <vector>

#include "parquet/util/bit-util.h"
#include "parquet/util/logging.h"

#include "parquet/arrow/schema.h"
Expand Down Expand Up @@ -51,6 +52,11 @@ class FileWriter::Impl {
Status TypedWriteBatch(
ColumnWriter* writer, const PrimitiveArray* data, int64_t offset, int64_t length);

template <typename ParquetType, typename ArrowType>
Status WriteNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t length,
const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
int64_t valid_bits_offset, const typename ArrowType::c_type* data_ptr);

// TODO(uwe): Same code as in reader.cc; the only difference is the name of the temporary
// buffer
template <typename InType, typename OutType>
Expand Down Expand Up @@ -136,19 +142,18 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
PARQUET_CATCH_NOT_OK(
writer->WriteBatch(length, def_levels_ptr, nullptr, data_writer_ptr));
} else {
RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(ParquetCType)));
auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
int buffer_idx = 0;
const uint8_t* valid_bits = data->null_bitmap_data();
INIT_BITSET(valid_bits, offset);
for (int i = 0; i < length; i++) {
if (data->IsNull(offset + i)) {
def_levels_ptr[i] = 0;
} else {
if (bitset & (1 << bit_offset)) {
def_levels_ptr[i] = 1;
buffer_ptr[buffer_idx++] = static_cast<ParquetCType>(data_ptr[i]);
} else {
def_levels_ptr[i] = 0;
}
READ_NEXT_BITSET(valid_bits);
}
PARQUET_CATCH_NOT_OK(
writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(
writer, length, def_levels_ptr, nullptr, valid_bits, offset, data_ptr)));
}
} else {
return Status::NotImplemented("no support for max definition level > 1 yet");
Expand All @@ -157,6 +162,44 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
return Status::OK();
}

// Writes a batch of nullable values: valid entries from data_ptr are cast into
// data_buffer_ as spaced Parquet values, which are then handed to
// WriteBatchSpaced(). The NULLABLE_BATCH_FAST_PATH specializations below skip
// this copy for types whose Arrow and Parquet physical layouts are identical.
//
// valid_bits / valid_bits_offset describe which of the `length` slots in
// data_ptr are valid (non-null); def_levels/rep_levels are forwarded as-is.
template <typename ParquetType, typename ArrowType>
Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>* writer,
    int64_t length, const int16_t* def_levels, const int16_t* rep_levels,
    const uint8_t* valid_bits, int64_t valid_bits_offset,
    const typename ArrowType::c_type* data_ptr) {
  using ParquetCType = typename ParquetType::c_type;

  RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(ParquetCType)));
  auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
  INIT_BITSET(valid_bits, valid_bits_offset);
  // BUGFIX: loop index must be int64_t; `length` is int64_t and an `int`
  // counter would truncate/overflow for batches larger than INT_MAX.
  for (int64_t i = 0; i < length; i++) {
    // Only valid slots are converted; null slots stay uninitialized, which is
    // fine because the spaced write path skips them via the validity bitmap.
    if (bitset & (1 << bit_offset)) {
      buffer_ptr[i] = static_cast<ParquetCType>(data_ptr[i]);
    }
    READ_NEXT_BITSET(valid_bits);
  }
  PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
      length, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));

  return Status::OK();
}

// Fast path for numeric type pairs whose Parquet and Arrow physical C types
// are identical: the Arrow data buffer is passed to WriteBatchSpaced()
// directly, skipping the per-element cast/copy done by the generic
// WriteNullableBatch above.
#define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \
  template <> \
  Status FileWriter::Impl::WriteNullableBatch<ParquetType, ArrowType>( \
      TypedColumnWriter<ParquetType> * writer, int64_t length, \
      const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, \
      int64_t valid_bits_offset, const CType* data_ptr) { \
    PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced( \
        length, def_levels, rep_levels, valid_bits, valid_bits_offset, data_ptr)); \
    return Status::OK(); \
  }

// Specializations for the types where both systems share a memory layout.
NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)

// This specialization seems quite similar but it significantly differs in two points:
// * the offset is applied to the pointer as late as possible, since we have sub-byte access
// * Arrow data is stored bitwise thus we cannot use std::copy to transform from
Expand Down
30 changes: 29 additions & 1 deletion cpp/src/parquet/column/column-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,17 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
bool enable_dictionary, bool enable_statistics, int64_t num_rows = SMALL_SIZE) {
this->GenerateData(num_rows);

// Test case 1: required and non-repeated, so no definition or repetition levels
this->WriteRequiredWithSettings(
encoding, compression, enable_dictionary, enable_statistics, num_rows);
this->ReadAndCompare(compression, num_rows);

this->WriteRequiredWithSettingsSpaced(
encoding, compression, enable_dictionary, enable_statistics, num_rows);
this->ReadAndCompare(compression, num_rows);
}

void WriteRequiredWithSettings(Encoding::type encoding, Compression::type compression,
bool enable_dictionary, bool enable_statistics, int64_t num_rows) {
ColumnProperties column_properties(
encoding, compression, enable_dictionary, enable_statistics);
std::shared_ptr<TypedColumnWriter<TestType>> writer =
Expand All @@ -117,7 +127,25 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
// The behaviour should be independent from the number of Close() calls
writer->Close();
writer->Close();
}

// Writes the generated values through WriteBatchSpaced() using an all-set
// validity bitmap, exercising the spaced write path for a required column.
void WriteRequiredWithSettingsSpaced(Encoding::type encoding,
    Compression::type compression, bool enable_dictionary, bool enable_statistics,
    int64_t num_rows) {
  ColumnProperties props(encoding, compression, enable_dictionary, enable_statistics);
  auto writer = this->BuildWriter(num_rows, props);
  // Every bit set to 1: all slots valid, i.e. no nulls in the batch.
  const size_t num_bitmap_bytes = BitUtil::RoundUpNumBytes(this->values_.size()) + 1;
  std::vector<uint8_t> all_valid(num_bitmap_bytes, 255);
  writer->WriteBatchSpaced(
      this->values_.size(), nullptr, nullptr, all_valid.data(), 0, this->values_ptr_);
  // The behaviour should be independent from the number of Close() calls
  writer->Close();
  writer->Close();
}

void ReadAndCompare(Compression::type compression, int64_t num_rows) {
this->SetupValuesOut(num_rows);
this->ReadColumnFully(compression);
Compare<T> compare(this->descr_);
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/parquet/column/statistics-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,22 @@ class TestRowGroupStatistics : public PrimitiveTypedTest<TestType> {
TypedStats statistics2(this->schema_.Column(0), encoded_min, encoded_max,
this->values_.size(), 0, 0, true);

TypedStats statistics3(this->schema_.Column(0));
std::vector<uint8_t> valid_bits(
BitUtil::RoundUpNumBytes(this->values_.size()) + 1, 255);
statistics3.UpdateSpaced(
this->values_ptr_, valid_bits.data(), 0, this->values_.size(), 0);
std::string encoded_min_spaced = statistics3.EncodeMin();
std::string encoded_max_spaced = statistics3.EncodeMax();

ASSERT_EQ(encoded_min, statistics2.EncodeMin());
ASSERT_EQ(encoded_max, statistics2.EncodeMax());
ASSERT_EQ(statistics1.min(), statistics2.min());
ASSERT_EQ(statistics1.max(), statistics2.max());
ASSERT_EQ(encoded_min_spaced, statistics2.EncodeMin());
ASSERT_EQ(encoded_max_spaced, statistics2.EncodeMax());
ASSERT_EQ(statistics3.min(), statistics2.min());
ASSERT_EQ(statistics3.max(), statistics2.max());
}

void TestReset() {
Expand Down
44 changes: 44 additions & 0 deletions cpp/src/parquet/column/statistics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,50 @@ void TypedRowGroupStatistics<DType>::Update(
}
}

// Updates null/value counts and the running min/max from a "spaced" value
// array: values laid out with gaps at null slots, with validity described by
// the valid_bits bitmap starting at bit position valid_bits_offset.
//
// values:            spaced value array of (num_null + num_not_null) slots
// valid_bits:        validity bitmap, one bit per slot (LSB-first per byte,
//                    as consumed by INIT_BITSET/READ_NEXT_BITSET)
// valid_bits_offset: bit index of the first slot within valid_bits
// num_not_null:      number of valid slots
// num_null:          number of null slots
template <typename DType>
void TypedRowGroupStatistics<DType>::UpdateSpaced(const T* values,
    const uint8_t* valid_bits, int64_t valid_bits_offset, int64_t num_not_null,
    int64_t num_null) {
  DCHECK(num_not_null >= 0);
  DCHECK(num_null >= 0);

  IncrementNullCount(num_null);
  IncrementNumValues(num_not_null);
  // TODO: support distinct count?
  if (num_not_null == 0) return;

  Compare<T> compare(descr_);
  INIT_BITSET(valid_bits, valid_bits_offset);
  // Find first valid entry and use that for min/max
  // As (num_not_null != 0) there must be one
  int64_t length = num_null + num_not_null;
  int64_t i = 0;
  for (; i < length; i++) {
    if (bitset & (1 << bit_offset)) { break; }
    READ_NEXT_BITSET(valid_bits);
  }
  // NOTE: the loop below re-examines slot i; the bitset cursor still points at
  // it because the search loop breaks before advancing past the valid bit.
  T min = values[i];
  T max = values[i];
  for (; i < length; i++) {
    if (bitset & (1 << bit_offset)) {
      if (compare(values[i], min)) {
        min = values[i];
      } else if (compare(max, values[i])) {
        max = values[i];
      }
    }
    READ_NEXT_BITSET(valid_bits);
  }
  // Merge this batch's extrema into the accumulated statistics; Copy() stores
  // the value using min_buffer_/max_buffer_ (presumably owned storage for
  // non-trivial value types — see Copy()).
  if (!has_min_max_) {
    has_min_max_ = true;
    Copy(min, &min_, min_buffer_.get());
    Copy(max, &max_, max_buffer_.get());
  } else {
    Copy(std::min(min_, min, compare), &min_, min_buffer_.get());
    Copy(std::max(max_, max, compare), &max_, max_buffer_.get());
  }
}

template <typename DType>
const typename DType::c_type& TypedRowGroupStatistics<DType>::min() const {
return min_;
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/parquet/column/statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ class TypedRowGroupStatistics : public RowGroupStatistics {
void Merge(const TypedRowGroupStatistics<DType>& other);

void Update(const T* values, int64_t num_not_null, int64_t num_null);
void UpdateSpaced(const T* values, const uint8_t* valid_bits, int64_t valid_bits_spaced,
int64_t num_not_null, int64_t num_null);

const T& min() const;
const T& max() const;
Expand Down
82 changes: 82 additions & 0 deletions cpp/src/parquet/column/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,59 @@ inline int64_t TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
return values_to_write;
}

// Writes one chunk of spaced values plus their def/rep levels, updates row
// counts and page statistics, and triggers page/dictionary flushes when size
// thresholds are hit. Returns the number of non-null values written.
//
// def_levels/rep_levels may be absent (see the max-level checks below);
// valid_bits marks which of the num_values slots in `values` are non-null.
template <typename DType>
inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced(int64_t num_values,
    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
    int64_t valid_bits_offset, const T* values) {
  int64_t values_to_write = 0;
  // If the field is required and non-repeated, there are no definition levels
  if (descr_->max_definition_level() > 0) {
    // Only slots at the max definition level carry an actual value.
    for (int64_t i = 0; i < num_values; ++i) {
      if (def_levels[i] == descr_->max_definition_level()) { ++values_to_write; }
    }

    WriteDefinitionLevels(num_values, def_levels);
  } else {
    // Required field, write all values
    values_to_write = num_values;
  }

  // Not present for non-repeated fields
  if (descr_->max_repetition_level() > 0) {
    // A row could include more than one value
    // Count the occasions where we start a new row
    for (int64_t i = 0; i < num_values; ++i) {
      if (rep_levels[i] == 0) { num_rows_++; }
    }

    WriteRepetitionLevels(num_values, rep_levels);
  } else {
    // Each value is exactly one row
    num_rows_ += num_values;
  }

  // Guard against writing more rows than the column chunk was declared with;
  // checked before encoding values so the error surfaces early.
  if (num_rows_ > expected_rows_) {
    throw ParquetException("More rows were written in the column chunk than expected");
  }

  WriteValuesSpaced(num_values, valid_bits, valid_bits_offset, values);

  // Statistics consume the spaced layout directly (no compaction needed).
  if (page_statistics_ != nullptr) {
    page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset, values_to_write,
        num_values - values_to_write);
  }

  num_buffered_values_ += num_values;
  num_buffered_encoded_values_ += values_to_write;

  // Flush a data page once the encoder's estimate crosses the page size limit.
  if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
    AddDataPage();
  }
  // While dictionary-encoding, check whether we must fall back to plain.
  if (has_dictionary_ && !fallback_) { CheckDictionarySizeLimit(); }

  return values_to_write;
}

template <typename DType>
void TypedColumnWriter<DType>::WriteBatch(int64_t num_values, const int16_t* def_levels,
const int16_t* rep_levels, const T* values) {
Expand All @@ -383,11 +436,40 @@ void TypedColumnWriter<DType>::WriteBatch(int64_t num_values, const int16_t* def
num_remaining, &def_levels[offset], &rep_levels[offset], &values[value_offset]);
}

// Public spaced-write entry point: splits the batch into mini-batches of
// properties_->write_batch_size() values so DataPage size limits are enforced
// at a reasonable granularity (limits are only checked after insertion, so
// unbounded batches could blow well past the page size).
template <typename DType>
void TypedColumnWriter<DType>::WriteBatchSpaced(int64_t num_values,
    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
    int64_t valid_bits_offset, const T* values) {
  int64_t write_batch_size = properties_->write_batch_size();
  // BUGFIX: keep the batch count in int64_t; the previous `int` narrowed the
  // result of an int64_t division.
  int64_t num_batches = num_values / write_batch_size;
  int64_t num_remaining = num_values % write_batch_size;
  for (int64_t round = 0; round < num_batches; round++) {
    int64_t offset = round * write_batch_size;
    // BUGFIX: def_levels/rep_levels may legitimately be null (e.g. required,
    // non-repeated columns); offsetting a null pointer is undefined behavior,
    // so only advance the pointers when they are non-null.
    WriteMiniBatchSpaced(write_batch_size,
        def_levels ? def_levels + offset : nullptr,
        rep_levels ? rep_levels + offset : nullptr,
        valid_bits, valid_bits_offset + offset, values + offset);
  }
  // Write the remaining values
  int64_t offset = num_batches * write_batch_size;
  WriteMiniBatchSpaced(num_remaining,
      def_levels ? def_levels + offset : nullptr,
      rep_levels ? rep_levels + offset : nullptr,
      valid_bits, valid_bits_offset + offset, values + offset);
}

// Hands a dense (no-null) run of values to the current page encoder.
template <typename DType>
void TypedColumnWriter<DType>::WriteValues(int64_t num_values, const T* values) {
  current_encoder_->Put(values, num_values);
}

// Hands a spaced run of values to the current page encoder; the encoder uses
// the validity bitmap to skip the null slots (see Encoder::PutSpaced).
template <typename DType>
void TypedColumnWriter<DType>::WriteValuesSpaced(int64_t num_values,
    const uint8_t* valid_bits, int64_t valid_bits_offset, const T* values) {
  current_encoder_->PutSpaced(values, num_values, valid_bits, valid_bits_offset);
}

template class TypedColumnWriter<BooleanType>;
template class TypedColumnWriter<Int32Type>;
template class TypedColumnWriter<Int64Type>;
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/parquet/column/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
void WriteBatch(int64_t num_values, const int16_t* def_levels,
const int16_t* rep_levels, const T* values);

// Write a batch of repetition levels, definition levels, and values to the
// column.
void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
const T* values);

protected:
std::shared_ptr<Buffer> GetValuesBuffer() override {
return current_encoder_->FlushValues();
Expand All @@ -173,10 +179,16 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
int64_t WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
const int16_t* rep_levels, const T* values);

int64_t WriteMiniBatchSpaced(int64_t num_values, const int16_t* def_levels,
const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
const T* values);

typedef Encoder<DType> EncoderType;

// Write values to a temporary buffer before they are encoded into pages
void WriteValues(int64_t num_values, const T* values);
void WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits,
int64_t valid_bits_offset, const T* values);
std::unique_ptr<EncoderType> current_encoder_;

typedef TypedRowGroupStatistics<DType> TypedStats;
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/parquet/encodings/dictionary-encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "parquet/encodings/decoder.h"
#include "parquet/encodings/encoder.h"
#include "parquet/encodings/plain-encoding.h"
#include "parquet/util/bit-util.h"
#include "parquet/util/cpu-info.h"
#include "parquet/util/hash-util.h"
#include "parquet/util/memory.h"
Expand Down Expand Up @@ -238,6 +239,15 @@ class DictEncoder : public Encoder<DType> {
}
}

// Encodes a "spaced" value array into the dictionary: slots whose validity
// bit is 0 are nulls and are skipped; only valid values are appended via
// Put().
//
// src:               spaced value array of num_values slots
// valid_bits:        validity bitmap (LSB-first, per INIT_BITSET semantics)
// valid_bits_offset: bit index of the first slot within valid_bits
void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
    int64_t valid_bits_offset) override {
  INIT_BITSET(valid_bits, valid_bits_offset);
  for (int32_t i = 0; i < num_values; i++) {
    if (bitset & (1 << bit_offset)) { Put(src[i]); }
    READ_NEXT_BITSET(valid_bits);
  }
}

/// Writes out the encoded dictionary to buffer. buffer must be preallocated to
/// dict_encoded_size() bytes.
void WriteDict(uint8_t* buffer);
Expand Down
Loading

0 comments on commit f3a3c69

Please sign in to comment.