Skip to content
This repository was archived by the owner on May 10, 2024. It is now read-only.

Commit 041f66d

Browse files
committed
PARQUET-782: Support writing to Arrow sinks
1 parent 3e0e5da commit 041f66d

File tree

8 files changed

+177
-12
lines changed

8 files changed

+177
-12
lines changed

src/parquet/arrow/arrow-reader-writer-test.cc

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "parquet/arrow/test-util.h"
2525
#include "parquet/arrow/writer.h"
2626

27+
#include "arrow/io/memory.h"
2728
#include "arrow/test-util.h"
2829
#include "arrow/types/construct.h"
2930
#include "arrow/types/primitive.h"
@@ -342,6 +343,29 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
342343
this->ReadAndCheckSingleColumnTable(values);
343344
}
344345

346+
TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) {
347+
std::shared_ptr<Array> values;
348+
ASSERT_OK(NonNullArray<TypeParam>(LARGE_SIZE, &values));
349+
std::shared_ptr<Table> table = MakeSimpleTable(values, false);
350+
this->sink_ = std::make_shared<InMemoryOutputStream>();
351+
auto buffer = std::make_shared<::arrow::PoolBuffer>();
352+
auto arrow_sink_ = std::make_shared<::arrow::io::BufferOutputStream>(buffer);
353+
ASSERT_OK_NO_THROW(WriteFlatTable(
354+
table.get(), default_memory_pool(), arrow_sink_, 512, default_writer_properties()));
355+
356+
std::shared_ptr<ParquetBuffer> pbuffer =
357+
std::make_shared<ParquetBuffer>(buffer->data(), buffer->size());
358+
std::unique_ptr<RandomAccessSource> source(new BufferReader(pbuffer));
359+
std::shared_ptr<::arrow::Table> out;
360+
this->ReadTableFromFile(ParquetFileReader::Open(std::move(source)), &out);
361+
ASSERT_EQ(1, out->num_columns());
362+
ASSERT_EQ(values->length(), out->num_rows());
363+
364+
std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
365+
ASSERT_EQ(1, chunked_array->num_chunks());
366+
ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
367+
}
368+
345369
TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
346370
int64_t chunk_size = SMALL_SIZE / 4;
347371
std::shared_ptr<Array> values;
@@ -456,10 +480,20 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) {
456480
template <typename T>
457481
using ParquetCDataType = typename ParquetDataType<T>::c_type;
458482

483+
template <typename T>
484+
struct c_type_trait {
485+
using ArrowCType = typename T::c_type;
486+
};
487+
488+
template <>
489+
struct c_type_trait<::arrow::BooleanType> {
490+
using ArrowCType = uint8_t;
491+
};
492+
459493
template <typename TestType>
460494
class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
461495
public:
462-
typedef typename TestType::c_type T;
496+
typedef typename c_type_trait<TestType>::ArrowCType T;
463497

464498
void MakeTestFile(std::vector<T>& values, int num_chunks,
465499
std::unique_ptr<ParquetFileReader>* file_reader) {
@@ -497,7 +531,7 @@ class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
497531

498532
std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
499533
ASSERT_EQ(1, chunked_array->num_chunks());
500-
ExpectArray<TestType>(values.data(), chunked_array->chunk(0).get());
534+
ExpectArrayT<TestType>(values.data(), chunked_array->chunk(0).get());
501535
}
502536

503537
void CheckSingleColumnRequiredRead(int num_chunks) {
@@ -508,7 +542,7 @@ class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
508542
std::shared_ptr<Array> out;
509543
this->ReadSingleColumnFile(std::move(file_reader), &out);
510544

511-
ExpectArray<TestType>(values.data(), out.get());
545+
ExpectArrayT<TestType>(values.data(), out.get());
512546
}
513547
};
514548

src/parquet/arrow/io.cc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,5 +103,25 @@ std::shared_ptr<Buffer> ParquetReadSource::Read(int64_t nbytes) {
103103
return result;
104104
}
105105

106+
ParquetWriteSink::ParquetWriteSink(
107+
const std::shared_ptr<::arrow::io::OutputStream>& stream)
108+
: stream_(stream) {}
109+
110+
ParquetWriteSink::~ParquetWriteSink() {}
111+
112+
void ParquetWriteSink::Close() {
113+
PARQUET_THROW_NOT_OK(stream_->Close());
114+
}
115+
116+
int64_t ParquetWriteSink::Tell() {
117+
int64_t position;
118+
PARQUET_THROW_NOT_OK(stream_->Tell(&position));
119+
return position;
120+
}
121+
122+
void ParquetWriteSink::Write(const uint8_t* data, int64_t length) {
123+
PARQUET_THROW_NOT_OK(stream_->Write(data, length));
124+
}
125+
106126
} // namespace arrow
107127
} // namespace parquet

src/parquet/arrow/io.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,25 @@ class PARQUET_EXPORT ParquetReadSource : public RandomAccessSource {
7676
ParquetAllocator* allocator_;
7777
};
7878

79+
class PARQUET_EXPORT ParquetWriteSink : public OutputStream {
80+
public:
81+
explicit ParquetWriteSink(const std::shared_ptr<::arrow::io::OutputStream>& stream);
82+
83+
virtual ~ParquetWriteSink();
84+
85+
// Close the output stream
86+
void Close() override;
87+
88+
// Return the current position in the output stream relative to the start
89+
int64_t Tell() override;
90+
91+
// Copy bytes into the output stream
92+
void Write(const uint8_t* data, int64_t length) override;
93+
94+
private:
95+
std::shared_ptr<::arrow::io::OutputStream> stream_;
96+
};
97+
7998
} // namespace arrow
8099
} // namespace parquet
81100

src/parquet/arrow/reader.cc

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,79 @@ Status FlatColumnReader::Impl::TypedReadBatch(
375375
}
376376
}
377377

378+
template <>
379+
Status FlatColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>(
380+
int batch_size, std::shared_ptr<Array>* out) {
381+
int values_to_read = batch_size;
382+
RETURN_NOT_OK(InitDataBuffer<::arrow::BooleanType>(batch_size));
383+
valid_bits_idx_ = 0;
384+
if (descr_->max_definition_level() > 0) {
385+
valid_bits_buffer_ = std::make_shared<PoolBuffer>(pool_);
386+
int valid_bits_size = ::arrow::BitUtil::CeilByte(batch_size) / 8;
387+
valid_bits_buffer_->Resize(valid_bits_size);
388+
valid_bits_ptr_ = valid_bits_buffer_->mutable_data();
389+
memset(valid_bits_ptr_, 0, valid_bits_size);
390+
null_count_ = 0;
391+
}
392+
393+
while ((values_to_read > 0) && column_reader_) {
394+
values_buffer_.Resize(values_to_read * sizeof(bool));
395+
if (descr_->max_definition_level() > 0) {
396+
def_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
397+
}
398+
auto reader = dynamic_cast<TypedColumnReader<BooleanType>*>(column_reader_.get());
399+
int64_t values_read;
400+
int64_t levels_read;
401+
int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
402+
auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data());
403+
PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
404+
values_to_read, def_levels, nullptr, values, &values_read));
405+
values_to_read -= levels_read;
406+
if (descr_->max_definition_level() == 0) {
407+
ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(values, values_read);
408+
} else {
409+
// As per the defintion and checks for flat columns:
410+
// descr_->max_definition_level() == 1
411+
ReadNullableFlatBatch<::arrow::BooleanType, BooleanType>(
412+
def_levels, values, values_read, levels_read);
413+
}
414+
if (!column_reader_->HasNext()) { NextRowGroup(); }
415+
}
416+
417+
if (descr_->max_definition_level() > 0) {
418+
// TODO: Shrink arrays in the case they are too large
419+
if (valid_bits_idx_ < batch_size * 0.8) {
420+
// Shrink arrays as they are larger than the output.
421+
// TODO(PARQUET-761/ARROW-360): Use realloc internally to shrink the arrays
422+
// without the need for a copy. Given a decent underlying allocator this
423+
// should still free some underlying pages to the OS.
424+
425+
auto data_buffer = std::make_shared<PoolBuffer>(pool_);
426+
RETURN_NOT_OK(data_buffer->Resize(valid_bits_idx_ * sizeof(bool)));
427+
memcpy(data_buffer->mutable_data(), data_buffer_->data(), data_buffer->size());
428+
data_buffer_ = data_buffer;
429+
430+
auto valid_bits_buffer = std::make_shared<PoolBuffer>(pool_);
431+
RETURN_NOT_OK(
432+
valid_bits_buffer->Resize(::arrow::BitUtil::CeilByte(valid_bits_idx_) / 8));
433+
memcpy(valid_bits_buffer->mutable_data(), valid_bits_buffer_->data(),
434+
valid_bits_buffer->size());
435+
valid_bits_buffer_ = valid_bits_buffer;
436+
}
437+
*out = std::make_shared<::arrow::BooleanArray>(
438+
field_->type, valid_bits_idx_, data_buffer_, null_count_, valid_bits_buffer_);
439+
// Relase the ownership
440+
data_buffer_.reset();
441+
valid_bits_buffer_.reset();
442+
return Status::OK();
443+
} else {
444+
*out = std::make_shared<::arrow::BooleanArray>(
445+
field_->type, valid_bits_idx_, data_buffer_);
446+
data_buffer_.reset();
447+
return Status::OK();
448+
}
449+
}
450+
378451
template <>
379452
Status FlatColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
380453
int batch_size, std::shared_ptr<Array>* out) {

src/parquet/arrow/test-util.h

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ using is_arrow_int = std::is_integral<typename ArrowType::c_type>;
3737
template <typename ArrowType>
3838
using is_arrow_string = std::is_same<ArrowType, ::arrow::StringType>;
3939

40+
template <typename ArrowType>
41+
using is_arrow_bool = std::is_same<ArrowType, ::arrow::BooleanType>;
42+
4043
template <class ArrowType>
4144
typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NonNullArray(
4245
size_t size, std::shared_ptr<Array>* out) {
@@ -70,8 +73,9 @@ typename std::enable_if<is_arrow_string<ArrowType>::value, Status>::type NonNull
7073
return builder.Finish(out);
7174
}
7275

73-
template <>
74-
Status NonNullArray<::arrow::BooleanType>(size_t size, std::shared_ptr<Array>* out) {
76+
template <class ArrowType>
77+
typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NonNullArray(
78+
size_t size, std::shared_ptr<Array>* out) {
7579
std::vector<uint8_t> values;
7680
::arrow::test::randint<uint8_t>(size, 0, 1, &values);
7781
::arrow::BooleanBuilder builder(
@@ -135,8 +139,8 @@ typename std::enable_if<is_arrow_string<ArrowType>::value, Status>::type Nullabl
135139
}
136140

137141
// This helper function only supports (size/2) nulls yet.
138-
template <>
139-
Status NullableArray<::arrow::BooleanType>(
142+
template <class ArrowType>
143+
typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NullableArray(
140144
size_t size, size_t num_nulls, std::shared_ptr<Array>* out) {
141145
std::vector<uint8_t> values;
142146
::arrow::test::randint<uint8_t>(size, 0, 1, &values);
@@ -176,19 +180,19 @@ void ExpectArray(T* expected, Array* result) {
176180
}
177181

178182
template <typename ArrowType>
179-
void ExpectArray(typename ArrowType::c_type* expected, Array* result) {
183+
void ExpectArrayT(void* expected, Array* result) {
180184
::arrow::PrimitiveArray* p_array = static_cast<::arrow::PrimitiveArray*>(result);
181185
for (int64_t i = 0; i < result->length(); i++) {
182-
EXPECT_EQ(expected[i],
186+
EXPECT_EQ(reinterpret_cast<typename ArrowType::c_type*>(expected)[i],
183187
reinterpret_cast<const typename ArrowType::c_type*>(p_array->data()->data())[i]);
184188
}
185189
}
186190

187191
template <>
188-
void ExpectArray<::arrow::BooleanType>(uint8_t* expected, Array* result) {
192+
void ExpectArrayT<::arrow::BooleanType>(void* expected, Array* result) {
189193
::arrow::BooleanBuilder builder(
190194
::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>());
191-
builder.Append(expected, result->length());
195+
builder.Append(reinterpret_cast<uint8_t*>(expected), result->length());
192196

193197
std::shared_ptr<Array> expected_array;
194198
EXPECT_OK(builder.Finish(&expected_array));

src/parquet/arrow/writer.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <algorithm>
2121
#include <vector>
2222

23+
#include "parquet/arrow/io.h"
2324
#include "parquet/arrow/schema.h"
2425
#include "parquet/arrow/utils.h"
2526

@@ -370,6 +371,13 @@ Status WriteFlatTable(const Table* table, MemoryPool* pool,
370371
return writer.Close();
371372
}
372373

374+
Status WriteFlatTable(const Table* table, MemoryPool* pool,
375+
const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size,
376+
const std::shared_ptr<WriterProperties>& properties) {
377+
auto parquet_sink = std::make_shared<ParquetWriteSink>(sink);
378+
return WriteFlatTable(table, pool, parquet_sink, chunk_size, properties);
379+
}
380+
373381
} // namespace arrow
374382

375383
} // namespace parquet

src/parquet/arrow/writer.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
#include "parquet/api/schema.h"
2424
#include "parquet/api/writer.h"
2525

26+
#include "arrow/io/interfaces.h"
27+
2628
namespace arrow {
2729

2830
class Array;
@@ -71,6 +73,11 @@ ::arrow::Status PARQUET_EXPORT WriteFlatTable(const ::arrow::Table* table,
7173
int64_t chunk_size,
7274
const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
7375

76+
::arrow::Status PARQUET_EXPORT WriteFlatTable(const ::arrow::Table* table,
77+
::arrow::MemoryPool* pool, const std::shared_ptr<::arrow::io::OutputStream>& sink,
78+
int64_t chunk_size,
79+
const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
80+
7481
} // namespace arrow
7582

7683
} // namespace parquet

thirdparty/versions.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
ARROW_VERSION="d946e7917d55cb220becd6469ae93430f2e60764"
18+
ARROW_VERSION="f082b17323354dc2af31f39c15c58b995ba08360"
1919
ARROW_URL="https://github.com/apache/arrow/archive/${ARROW_VERSION}.tar.gz"
2020
ARROW_BASEDIR="arrow-${ARROW_VERSION}"
2121

0 commit comments

Comments
 (0)