diff --git a/src/parquet/column/properties-test.cc b/src/parquet/column/properties-test.cc index 42be0451..13a155a8 100644 --- a/src/parquet/column/properties-test.cc +++ b/src/parquet/column/properties-test.cc @@ -34,11 +34,12 @@ TEST(TestReaderProperties, Basics) { } TEST(TestWriterProperties, Basics) { - WriterProperties props; + std::shared_ptr props = WriterProperties::Builder().build(); - ASSERT_EQ(DEFAULT_PAGE_SIZE, props.data_pagesize()); - ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE, props.dictionary_pagesize()); - ASSERT_EQ(DEFAULT_IS_DICTIONARY_ENABLED, props.is_dictionary_enabled()); + ASSERT_EQ(DEFAULT_PAGE_SIZE, props->data_pagesize()); + ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE, props->dictionary_pagesize()); + ASSERT_EQ(DEFAULT_IS_DICTIONARY_ENABLED, props->dictionary_enabled()); + ASSERT_EQ(DEFAULT_WRITER_VERSION, props->version()); } } // namespace test diff --git a/src/parquet/column/properties.h b/src/parquet/column/properties.h index 132b1a6e..d2a231b8 100644 --- a/src/parquet/column/properties.h +++ b/src/parquet/column/properties.h @@ -26,6 +26,10 @@ namespace parquet { +struct ParquetVersion { + enum type { PARQUET_1_0, PARQUET_2_0 }; +}; + static int64_t DEFAULT_BUFFER_SIZE = 0; static bool DEFAULT_USE_BUFFERED_STREAM = false; @@ -72,42 +76,94 @@ ReaderProperties default_reader_properties(); static int64_t DEFAULT_PAGE_SIZE = 1024 * 1024; static int64_t DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE; static bool DEFAULT_IS_DICTIONARY_ENABLED = true; +static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION = + ParquetVersion::PARQUET_1_0; class WriterProperties { public: - explicit WriterProperties(MemoryAllocator* allocator = default_allocator()) - : allocator_(allocator) { - pagesize_ = DEFAULT_PAGE_SIZE; - dictionary_pagesize_ = DEFAULT_DICTIONARY_PAGE_SIZE; - dictionary_enabled_ = DEFAULT_IS_DICTIONARY_ENABLED; - } + class Builder { + public: + Builder() + : allocator_(default_allocator()), + dictionary_enabled_(DEFAULT_IS_DICTIONARY_ENABLED), + dictionary_pagesize_(DEFAULT_DICTIONARY_PAGE_SIZE), + pagesize_(DEFAULT_PAGE_SIZE), + version_(DEFAULT_WRITER_VERSION) {} + virtual ~Builder() {} + + Builder* allocator(MemoryAllocator* allocator) { + allocator_ = allocator; + return this; + } - int64_t dictionary_pagesize() const { return dictionary_pagesize_; } + Builder* dictionary_pagesize(int64_t dictionary_psize) { + dictionary_pagesize_ = dictionary_psize; + return this; + } - void set_dictionary_pagesize(int64_t dictionary_psize) { - dictionary_pagesize_ = dictionary_psize; - } + Builder* data_pagesize(int64_t pg_size) { + pagesize_ = pg_size; + return this; + } - int64_t data_pagesize() const { return pagesize_; } + Builder* enable_dictionary() { + dictionary_enabled_ = true; + return this; + } - void set_data_pagesize(int64_t pg_size) { pagesize_ = pg_size; } + Builder* disable_dictionary() { + dictionary_enabled_ = false; + return this; + } - void enable_dictionary() { dictionary_enabled_ = true; } + Builder* version(ParquetVersion::type version) { + version_ = version; + return this; + } - void disable_dictionary() { dictionary_enabled_ = false; } + std::shared_ptr build() { + return std::shared_ptr(new WriterProperties( + allocator_, dictionary_enabled_, dictionary_pagesize_, pagesize_, version_)); + } - bool is_dictionary_enabled() const { return dictionary_enabled_; } + private: + MemoryAllocator* allocator_; + bool dictionary_enabled_; + int64_t dictionary_pagesize_; + int64_t pagesize_; + ParquetVersion::type version_; + }; MemoryAllocator* allocator() { return allocator_; } + bool dictionary_enabled() const { return dictionary_enabled_; } + + int64_t dictionary_pagesize() const { return dictionary_pagesize_; } + + int64_t data_pagesize() const { return pagesize_; } + + ParquetVersion::type version() { return parquet_version_; } + private: - int64_t pagesize_; - int64_t dictionary_pagesize_; - bool dictionary_enabled_; + explicit WriterProperties(MemoryAllocator* allocator, bool dictionary_enabled, + int64_t dictionary_pagesize, int64_t pagesize, ParquetVersion::type version) + : allocator_(allocator), + dictionary_enabled_(dictionary_enabled), + dictionary_pagesize_(dictionary_pagesize), + pagesize_(pagesize), + parquet_version_(version) { + pagesize_ = DEFAULT_PAGE_SIZE; + dictionary_enabled_ = DEFAULT_IS_DICTIONARY_ENABLED; + } + MemoryAllocator* allocator_; + bool dictionary_enabled_; + int64_t dictionary_pagesize_; + int64_t pagesize_; + ParquetVersion::type parquet_version_; }; -WriterProperties default_writer_properties(); +std::shared_ptr default_writer_properties(); } // namespace parquet diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc index c57e0ad9..8856f51d 100644 --- a/src/parquet/column/writer.cc +++ b/src/parquet/column/writer.cc @@ -25,8 +25,9 @@ namespace parquet { // ---------------------------------------------------------------------- // ColumnWriter -WriterProperties default_writer_properties() { - static WriterProperties default_writer_properties; +std::shared_ptr default_writer_properties() { + static std::shared_ptr default_writer_properties = + WriterProperties::Builder().build(); return default_writer_properties; } diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc index 27e8d626..f9ff4865 100644 --- a/src/parquet/file/writer-internal.cc +++ b/src/parquet/file/writer-internal.cc @@ -168,9 +168,9 @@ void RowGroupSerializer::Close() { std::unique_ptr FileSerializer::Open( std::shared_ptr sink, std::shared_ptr& schema, - MemoryAllocator* allocator) { + MemoryAllocator* allocator, const std::shared_ptr& properties) { std::unique_ptr result( - new FileSerializer(sink, schema, allocator)); + new FileSerializer(sink, schema, allocator, properties)); return result; } @@ -200,6 +200,10 @@ int64_t FileSerializer::num_rows() const { return num_rows_; } +const std::shared_ptr& FileSerializer::properties() const { + return properties_; +} + RowGroupWriter* FileSerializer::AppendRowGroup(int64_t num_rows) { if (row_group_writer_) { row_group_writer_->Close(); } num_rows_ += num_rows; @@ -243,12 +247,14 @@ void FileSerializer::WriteMetaData() { } FileSerializer::FileSerializer(std::shared_ptr sink, - std::shared_ptr& schema, MemoryAllocator* allocator = default_allocator()) + std::shared_ptr& schema, MemoryAllocator* allocator, + const std::shared_ptr& properties) : sink_(sink), allocator_(allocator), num_row_groups_(0), num_rows_(0), - is_open_(true) { + is_open_(true), + properties_(properties) { schema_.Init(schema); StartFile(); } diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h index dd595a91..8ab025c4 100644 --- a/src/parquet/file/writer-internal.h +++ b/src/parquet/file/writer-internal.h @@ -110,12 +110,15 @@ class FileSerializer : public ParquetFileWriter::Contents { public: static std::unique_ptr Open( std::shared_ptr sink, std::shared_ptr& schema, - MemoryAllocator* allocator = default_allocator()); + MemoryAllocator* allocator = default_allocator(), + const std::shared_ptr& properties = default_writer_properties()); void Close() override; RowGroupWriter* AppendRowGroup(int64_t num_rows) override; + const std::shared_ptr& properties() const override; + int num_columns() const override; int num_row_groups() const override; int64_t num_rows() const override; @@ -124,7 +127,8 @@ class FileSerializer : public ParquetFileWriter::Contents { private: explicit FileSerializer(std::shared_ptr sink, - std::shared_ptr& schema, MemoryAllocator* allocator); + std::shared_ptr& schema, MemoryAllocator* allocator, + const std::shared_ptr& properties); std::shared_ptr sink_; format::FileMetaData metadata_; @@ -134,6 +138,7 @@ class FileSerializer : public ParquetFileWriter::Contents { int num_rows_; bool is_open_; std::unique_ptr row_group_writer_; + std::shared_ptr properties_; void StartFile(); void WriteMetaData(); diff --git a/src/parquet/file/writer.cc b/src/parquet/file/writer.cc index c516a5e7..8f643c29 100644 --- a/src/parquet/file/writer.cc +++ b/src/parquet/file/writer.cc @@ -55,7 +55,7 @@ ParquetFileWriter::~ParquetFileWriter() { std::unique_ptr ParquetFileWriter::Open( std::shared_ptr sink, std::shared_ptr& schema, - MemoryAllocator* allocator) { + MemoryAllocator* allocator, const std::shared_ptr& properties) { auto contents = FileSerializer::Open(sink, schema, allocator); std::unique_ptr result(new ParquetFileWriter()); @@ -80,4 +80,8 @@ RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) { return contents_->AppendRowGroup(num_rows); } +const std::shared_ptr& ParquetFileWriter::properties() const { + return contents_->properties(); +} + } // namespace parquet diff --git a/src/parquet/file/writer.h b/src/parquet/file/writer.h index 22ee8562..15b590dc 100644 --- a/src/parquet/file/writer.h +++ b/src/parquet/file/writer.h @@ -21,6 +21,7 @@ #include #include +#include "parquet/column/properties.h" #include "parquet/schema/descriptor.h" #include "parquet/schema/types.h" #include "parquet/util/mem-allocator.h" @@ -93,6 +94,8 @@ class ParquetFileWriter { virtual int num_columns() const = 0; virtual int num_row_groups() const = 0; + virtual const std::shared_ptr& properties() const = 0; + // Return const-poitner to make it clear that this object is not to be copied const SchemaDescriptor* schema() const { return &schema_; } SchemaDescriptor schema_; @@ -103,7 +106,8 @@ class ParquetFileWriter { static std::unique_ptr Open(std::shared_ptr sink, std::shared_ptr& schema, - MemoryAllocator* allocator = default_allocator()); + MemoryAllocator* allocator = default_allocator(), + const std::shared_ptr& properties = default_writer_properties()); void Open(std::unique_ptr contents); void Close(); @@ -138,6 +142,11 @@ class ParquetFileWriter { */ int num_row_groups() const; + /** + * Configuartion passed to the writer, e.g. the used Parquet format version. + */ + const std::shared_ptr& properties() const; + /** * Returns the file schema descriptor */