Skip to content

Commit

Permalink
PARQUET-701: Ensure that Close can be called multiple times
Browse files Browse the repository at this point in the history
Author: Uwe L. Korn <uwelk@xhochy.com>

Closes apache#149 from xhochy/parquet-701 and squashes the following commits:

6f29d8b [Uwe L. Korn] PARQUET-701: Ensure that Close can be called multiple times

Change-Id: I55fc9bc431365b7b7665a1e065d139b5a8b18651
  • Loading branch information
xhochy authored and wesm committed Sep 1, 2016
1 parent 44b833e commit aa77a95
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 7 deletions.
2 changes: 2 additions & 0 deletions cpp/src/parquet/column/column-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class TestPrimitiveWriter : public ::testing::Test {
std::unique_ptr<TypedColumnWriter<TestType>> writer =
this->BuildWriter(SMALL_SIZE, encoding);
writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
// The behaviour should be independent of the number of Close() calls
writer->Close();
writer->Close();

this->ReadColumn();
Expand Down
20 changes: 13 additions & 7 deletions cpp/src/parquet/column/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ ColumnWriter::ColumnWriter(const ColumnDescriptor* descr,
num_buffered_values_(0),
num_buffered_encoded_values_(0),
num_rows_(0),
total_bytes_written_(0) {
total_bytes_written_(0),
closed_(false) {
InitSinks();
}

Expand All @@ -56,11 +57,13 @@ void ColumnWriter::InitSinks() {
}

// Appends `num_levels` definition levels to definition_levels_sink_ as raw bytes.
//
// `levels` is reinterpreted as a byte buffer; the number of bytes written is
// sizeof(int16_t) * num_levels. The writer must not have been closed yet, which
// is asserted through DCHECK.
void ColumnWriter::WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
  DCHECK(!closed_);
  const auto* raw_levels = reinterpret_cast<const uint8_t*>(levels);
  // Same expression (and therefore same type) as passing it inline to Write().
  const auto num_bytes = sizeof(int16_t) * num_levels;
  definition_levels_sink_->Write(raw_levels, num_bytes);
}

// Appends `num_levels` repetition levels to repetition_levels_sink_ as raw bytes.
//
// `levels` is reinterpreted as a byte buffer; the number of bytes written is
// sizeof(int16_t) * num_levels. The writer must not have been closed yet, which
// is asserted through DCHECK.
void ColumnWriter::WriteRepetitionLevels(int64_t num_levels, const int16_t* levels) {
  DCHECK(!closed_);
  const auto* raw_levels = reinterpret_cast<const uint8_t*>(levels);
  // Same expression (and therefore same type) as passing it inline to Write().
  const auto num_bytes = sizeof(int16_t) * num_levels;
  repetition_levels_sink_->Write(raw_levels, num_bytes);
}
Expand Down Expand Up @@ -129,12 +132,15 @@ void ColumnWriter::WriteDataPage(const DataPage& page) {
}

int64_t ColumnWriter::Close() {
if (has_dictionary_) { WriteDictionaryPage(); }
// Write all outstanding data to a new page
if (num_buffered_values_ > 0) { AddDataPage(); }

for (size_t i = 0; i < data_pages_.size(); i++) {
WriteDataPage(data_pages_[i]);
if (!closed_) {
closed_ = true;
if (has_dictionary_) { WriteDictionaryPage(); }
// Write all outstanding data to a new page
if (num_buffered_values_ > 0) { AddDataPage(); }

for (size_t i = 0; i < data_pages_.size(); i++) {
WriteDataPage(data_pages_[i]);
}
}

if (num_rows_ != expected_rows_) {
Expand Down
1 change: 1 addition & 0 deletions cpp/src/parquet/column/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class PARQUET_EXPORT ColumnWriter {
int num_rows_;

int total_bytes_written_;
bool closed_;

std::unique_ptr<InMemoryOutputStream> definition_levels_sink_;
std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_;
Expand Down

0 comments on commit aa77a95

Please sign in to comment.