diff --git a/cpp/src/parquet/stream_writer.cc b/cpp/src/parquet/stream_writer.cc index c578b7e5274..0fd84f49749 100644 --- a/cpp/src/parquet/stream_writer.cc +++ b/cpp/src/parquet/stream_writer.cc @@ -129,20 +129,26 @@ StreamWriter& StreamWriter::operator<<(FixedStringView v) { } StreamWriter& StreamWriter::operator<<(const char* v) { - return WriteVariableLength(v, std::strlen(v)); + return WriteVariableLength(v, std::strlen(v), ConvertedType::UTF8); } StreamWriter& StreamWriter::operator<<(const std::string& v) { - return WriteVariableLength(v.data(), v.size()); + return WriteVariableLength(v.data(), v.size(), ConvertedType::UTF8); } StreamWriter& StreamWriter::operator<<(::std::string_view v) { - return WriteVariableLength(v.data(), v.size()); + return WriteVariableLength(v.data(), v.size(), ConvertedType::UTF8); +} + +StreamWriter& StreamWriter::operator<<(RawDataView v) { + return WriteVariableLength(reinterpret_cast(v.data()), v.size(), + ConvertedType::NONE); } StreamWriter& StreamWriter::WriteVariableLength(const char* data_ptr, - std::size_t data_len) { - CheckColumn(Type::BYTE_ARRAY, ConvertedType::UTF8); + std::size_t data_len, + ConvertedType::type type) { + CheckColumn(Type::BYTE_ARRAY, type); auto writer = static_cast(row_group_writer_->column(column_index_++)); diff --git a/cpp/src/parquet/stream_writer.h b/cpp/src/parquet/stream_writer.h index 7637cf7da24..76265140222 100644 --- a/cpp/src/parquet/stream_writer.h +++ b/cpp/src/parquet/stream_writer.h @@ -26,6 +26,8 @@ #include #include +#include "arrow/util/span.h" + #include "parquet/column_writer.h" #include "parquet/file_writer.h" @@ -151,6 +153,12 @@ class PARQUET_EXPORT StreamWriter { StreamWriter& operator<<(const std::string& v); StreamWriter& operator<<(::std::string_view v); + /// \brief Helper class to write variable length raw data. + using RawDataView = ::arrow::util::span; + + /// \brief Output operators for variable length raw data. + StreamWriter& operator<<(RawDataView v); + /// \brief Output operator for optional fields. template StreamWriter& operator<<(const optional& v) { @@ -190,7 +198,8 @@ class PARQUET_EXPORT StreamWriter { return *this; } - StreamWriter& WriteVariableLength(const char* data_ptr, std::size_t data_len); + StreamWriter& WriteVariableLength(const char* data_ptr, std::size_t data_len, + ConvertedType::type converted_type); StreamWriter& WriteFixedLength(const char* data_ptr, std::size_t data_len); diff --git a/cpp/src/parquet/stream_writer_test.cc b/cpp/src/parquet/stream_writer_test.cc index a36feb429a1..4a972d650c2 100644 --- a/cpp/src/parquet/stream_writer_test.cc +++ b/cpp/src/parquet/stream_writer_test.cc @@ -80,6 +80,9 @@ class TestStreamWriter : public ::testing::Test { fields.push_back(schema::PrimitiveNode::Make("double_field", Repetition::REQUIRED, Type::DOUBLE, ConvertedType::NONE)); + fields.push_back(schema::PrimitiveNode::Make("bytes_field", Repetition::REQUIRED, + Type::BYTE_ARRAY, ConvertedType::NONE)); + return std::static_pointer_cast( schema::GroupNode::Make("schema", Repetition::REQUIRED, fields)); } @@ -99,7 +102,7 @@ TEST_F(TestStreamWriter, DefaultConstructed) { EXPECT_EQ(0, os.current_column()); EXPECT_EQ(0, os.current_row()); EXPECT_EQ(0, os.num_columns()); - EXPECT_EQ(0, os.SkipColumns(10)); + EXPECT_EQ(0, os.SkipColumns(11)); } TEST_F(TestStreamWriter, TypeChecking) { @@ -162,6 +165,17 @@ TEST_F(TestStreamWriter, TypeChecking) { EXPECT_THROW(writer_ << 5.4f, ParquetException); EXPECT_NO_THROW(writer_ << 5.4); + // Required type: Variable length byte array. + // Strings and naked char* are rejected because they should use UTF8 instead of None + // type. + EXPECT_EQ(10, writer_.current_column()); + EXPECT_THROW(writer_ << 5, ParquetException); + EXPECT_THROW(writer_ << char3_array, ParquetException); + EXPECT_THROW(writer_ << char4_array, ParquetException); + EXPECT_THROW(writer_ << char5_array, ParquetException); + EXPECT_THROW(writer_ << std::string("not ok"), ParquetException); + EXPECT_NO_THROW(writer_ << StreamWriter::RawDataView((uint8_t*)"\xff\0ok", 4)); + EXPECT_EQ(0, writer_.current_row()); EXPECT_NO_THROW(writer_ << EndRow); EXPECT_EQ(1, writer_.current_row()); @@ -210,6 +224,11 @@ TEST_F(TestStreamWriter, RequiredFieldChecking) { EXPECT_THROW(writer_ << optional(), ParquetException); EXPECT_NO_THROW(writer_ << optional(5.4)); + // Required field of type: Variable length byte array. + EXPECT_THROW(writer_ << optional(), ParquetException); + EXPECT_NO_THROW( + writer_ << std::make_optional((uint8_t*)"ok", 2)); + EXPECT_NO_THROW(writer_ << EndRow); } @@ -234,6 +253,7 @@ TEST_F(TestStreamWriter, EndRow) { EXPECT_NO_THROW(writer_ << uint64_t((1ull << 60) + 123)); EXPECT_NO_THROW(writer_ << 25.4f); EXPECT_NO_THROW(writer_ << 3.3424); + EXPECT_NO_THROW(writer_ << StreamWriter::RawDataView((uint8_t*)"ok", 2)); // Correct use of end row after all fields have been output. EXPECT_NO_THROW(writer_ << EndRow); EXPECT_EQ(1, writer_.current_row()); @@ -272,6 +292,10 @@ TEST_F(TestStreamWriter, EndRowGroup) { EXPECT_NO_THROW(writer_ << uint64_t((1ull << 60) - i * i)) << "index: " << i; EXPECT_NO_THROW(writer_ << 42325.4f / float(i + 1)) << "index: " << i; EXPECT_NO_THROW(writer_ << 3.2342e5 / double(i + 1)) << "index: " << i; + std::string tmpString = std::to_string(i); + EXPECT_NO_THROW(writer_ << StreamWriter::RawDataView((uint8_t*)tmpString.c_str(), + tmpString.length())) + << "index: " << i; EXPECT_NO_THROW(writer_ << EndRow) << "index: " << i; if (i % 1000 == 0) { @@ -293,7 +317,8 @@ TEST_F(TestStreamWriter, SkipColumns) { writer_ << true << std::string("Cannot skip mandatory columns"); EXPECT_THROW(writer_.SkipColumns(1), ParquetException); writer_ << 'x' << std::array{'A', 'B', 'C', 'D'} << int8_t(2) << uint16_t(3) - << int32_t(4) << uint64_t(5) << 6.0f << 7.0; + << int32_t(4) << uint64_t(5) << 6.0f << 7.0 + << StreamWriter::RawDataView((uint8_t*)"ok", 2); writer_ << EndRow; } @@ -304,7 +329,8 @@ TEST_F(TestStreamWriter, AppendNotImplemented) { writer_ = StreamWriter{ParquetFileWriter::Open(outfile, GetSchema())}; writer_ << false << std::string("Just one row") << 'x' << std::array{'A', 'B', 'C', 'D'} << int8_t(2) << uint16_t(3) - << int32_t(4) << uint64_t(5) << 6.0f << 7.0; + << int32_t(4) << uint64_t(5) << 6.0f << 7.0 + << StreamWriter::RawDataView((uint8_t*)"ok", 2); writer_ << EndRow; writer_ = StreamWriter{};