From 36ae03c3f45cfb8c6ef7cbfb11d097156e0f5a26 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Tue, 23 Jul 2024 12:43:00 -0700 Subject: [PATCH] iceberg: refactor Avro ostream to avoid iobuf While ultimately we want an iobuf from the ostream, it currently requires pointer stability of the underlying fragments, which isn't an explicit contract of the iobuf. Instead, this tweaks the class to use a container of temporary_buffers directly, and expects callers to pass the buffers to an iobuf. There was some discussion on making iobuf smarter and providing an interface that guaranteed pointer stability of buffers. This seems like it'd be a heavy-handed requirement to impose on iobuf to enfroce. Putting the onus on callers seems like a quick, good-enough approach. --- src/v/iceberg/BUILD | 1 + src/v/iceberg/avro_utils.h | 23 ++++----- src/v/iceberg/manifest_avro.cc | 43 ++++++++++------ src/v/iceberg/tests/BUILD | 1 + .../tests/manifest_serialization_test.cc | 50 ++++++++++++++----- 5 files changed, 77 insertions(+), 41 deletions(-) diff --git a/src/v/iceberg/BUILD b/src/v/iceberg/BUILD index af4fb95d739b5..f33df5f40da80 100644 --- a/src/v/iceberg/BUILD +++ b/src/v/iceberg/BUILD @@ -118,6 +118,7 @@ redpanda_cc_library( "//src/v/strings:string_switch", "//src/v/utils:named_type", "@avro", + "@seastar", ], ) diff --git a/src/v/iceberg/avro_utils.h b/src/v/iceberg/avro_utils.h index fb4ff1c020f21..34d9aac7bbf13 100644 --- a/src/v/iceberg/avro_utils.h +++ b/src/v/iceberg/avro_utils.h @@ -9,6 +9,7 @@ #pragma once #include "bytes/iobuf.h" +#include "container/fragmented_vector.h" #include @@ -17,14 +18,15 @@ namespace iceberg { -// Near-identical implementation of avro::MemoryOutputStream, but backed by an -// iobuf that can be released. +// Near-identical implementation of avro::MemoryOutputStream, but backed by +// temporary buffers that callers can use to construct an iobuf. class avro_iobuf_ostream : public avro::OutputStream { public: + using buf_container_t = chunked_vector>; explicit avro_iobuf_ostream( - size_t chunk_size, iobuf* buf, size_t* byte_count) + size_t chunk_size, buf_container_t* bufs, size_t* byte_count) : chunk_size_(chunk_size) - , buf_(buf) + , bufs_(bufs) , available_(0) , byte_count_(byte_count) {} ~avro_iobuf_ostream() override = default; @@ -36,17 +38,12 @@ class avro_iobuf_ostream : public avro::OutputStream { // space. bool next(uint8_t** data, size_t* len) final { if (available_ == 0) { - // NOTE: it is critical to add the buffer to a separate iobuf first - // and then add that iobuf's fragments, as adding a buffer directly - // to `buf_` may end up packing the fragments together. - iobuf new_frag; - new_frag.append(ss::temporary_buffer{chunk_size_}); - buf_->append_fragments(std::move(new_frag)); + bufs_->emplace_back(chunk_size_); available_ = chunk_size_; } - auto back_frag = buf_->rbegin(); + auto& back_frag = bufs_->back(); *data = reinterpret_cast( - back_frag->share(chunk_size_ - available_, available_).get_write()); + back_frag.share(chunk_size_ - available_, available_).get_write()); *len = available_; *byte_count_ += available_; available_ = 0; @@ -66,7 +63,7 @@ class avro_iobuf_ostream : public avro::OutputStream { // Size in bytes with which to allocate new fragments. const size_t chunk_size_; - iobuf* buf_; + buf_container_t* bufs_; size_t available_; // Total number of bytes. diff --git a/src/v/iceberg/manifest_avro.cc b/src/v/iceberg/manifest_avro.cc index 5bef3e17f9524..5d299e15a2c12 100644 --- a/src/v/iceberg/manifest_avro.cc +++ b/src/v/iceberg/manifest_avro.cc @@ -18,6 +18,8 @@ #include "iceberg/schema_json.h" #include "strings/string_switch.h" +#include + #include namespace iceberg { @@ -136,27 +138,36 @@ metadata_from_reader(avro::DataFileReader& rdr) { } // anonymous namespace iobuf serialize_avro(const manifest& m) { - iobuf buf; size_t bytes_streamed = 0; - auto out = std::make_unique( - 4_KiB, &buf, &bytes_streamed); + avro_iobuf_ostream::buf_container_t bufs; static constexpr size_t avro_default_sync_bytes = 16_KiB; auto meta = metadata_to_map(m.metadata); - avro::DataFileWriter writer( - std::move(out), - manifest_entry::valid_schema(), - avro_default_sync_bytes, - avro::NULL_CODEC, - meta); + { + auto out = std::make_unique( + 4_KiB, &bufs, &bytes_streamed); + avro::DataFileWriter writer( + std::move(out), + manifest_entry::valid_schema(), + avro_default_sync_bytes, + avro::NULL_CODEC, + meta); + + // TODO: the Avro code-generated manifest_entry doesn't have the r102 + // partition field defined, as it relies on runtime information of the + // partition spec! + for (const auto& e : m.entries) { + writer.write(e); + } + writer.flush(); + writer.close(); - // TODO: the Avro code-generated manifest_entry doesn't have the r102 - // partition field defined, as it relies on runtime information of the - // partition spec! - for (const auto& e : m.entries) { - writer.write(e); + // NOTE: ~DataFileWriter does a final sync which may write to the + // chunks. Destruct the writer before moving ownership of the chunks. + } + iobuf buf; + for (auto& b : bufs) { + buf.append(std::move(b)); } - writer.flush(); - writer.close(); buf.trim_back(buf.size_bytes() - bytes_streamed); return buf; } diff --git a/src/v/iceberg/tests/BUILD b/src/v/iceberg/tests/BUILD index c1b42d9246e02..cf6846011d14a 100644 --- a/src/v/iceberg/tests/BUILD +++ b/src/v/iceberg/tests/BUILD @@ -53,6 +53,7 @@ redpanda_cc_gtest( ":test_schemas", "//src/v/base", "//src/v/bytes:iobuf", + "//src/v/container:fragmented_vector", "//src/v/iceberg:avro_utils", "//src/v/iceberg:manifest", "//src/v/iceberg:manifest_avro", diff --git a/src/v/iceberg/tests/manifest_serialization_test.cc b/src/v/iceberg/tests/manifest_serialization_test.cc index d6feb7acd1b46..4b32d93cd264c 100644 --- a/src/v/iceberg/tests/manifest_serialization_test.cc +++ b/src/v/iceberg/tests/manifest_serialization_test.cc @@ -9,6 +9,7 @@ #include "base/units.h" #include "bytes/iobuf.h" +#include "container/fragmented_vector.h" #include "iceberg/avro_utils.h" #include "iceberg/manifest.h" #include "iceberg/manifest_avro.h" @@ -55,10 +56,10 @@ TEST(ManifestSerializationTest, TestManifestEntry) { entry.data_file.record_count = 3; entry.data_file.file_size_in_bytes = 1024; - iobuf buf; size_t bytes_streamed{0}; + avro_iobuf_ostream::buf_container_t bufs; auto out = std::make_unique( - 4096, &buf, &bytes_streamed); + 4096, &bufs, &bytes_streamed); // Encode to the output stream. avro::EncoderPtr encoder = avro::binaryEncoder(); @@ -66,6 +67,10 @@ TEST(ManifestSerializationTest, TestManifestEntry) { avro::encode(*encoder, entry); encoder->flush(); + iobuf buf; + for (auto& b : bufs) { + buf.append(std::move(b)); + } // Decode the iobuf from the input stream. auto in = std::make_unique(std::move(buf)); avro::DecoderPtr decoder = avro::binaryDecoder(); @@ -86,10 +91,10 @@ TEST(ManifestSerializationTest, TestManyManifestEntries) { entry.data_file.record_count = 3; entry.data_file.file_size_in_bytes = 1024; - iobuf buf; size_t bytes_streamed{0}; + avro_iobuf_ostream::buf_container_t bufs; auto out = std::make_unique( - 4096, &buf, &bytes_streamed); + 4096, &bufs, &bytes_streamed); // Encode many entries. This is a regression test for a bug where // serializing large Avro files would handle iobuf fragments improperly, @@ -101,6 +106,10 @@ TEST(ManifestSerializationTest, TestManyManifestEntries) { encoder->flush(); } + iobuf buf; + for (auto& b : bufs) { + buf.append(std::move(b)); + } // Decode the iobuf from the input stream. auto in = std::make_unique(std::move(buf)); avro::DecoderPtr decoder = avro::binaryDecoder(); @@ -127,10 +136,10 @@ TEST(ManifestSerializationTest, TestManifestFile) { manifest.existing_rows_count = 10; manifest.deleted_rows_count = 11; - iobuf buf; size_t bytes_streamed{0}; + avro_iobuf_ostream::buf_container_t bufs; auto out = std::make_unique( - 4096, &buf, &bytes_streamed); + 4096, &bufs, &bytes_streamed); // Encode to the output stream. avro::EncoderPtr encoder = avro::binaryEncoder(); @@ -138,6 +147,10 @@ TEST(ManifestSerializationTest, TestManifestFile) { avro::encode(*encoder, manifest); encoder->flush(); + iobuf buf; + for (auto& b : bufs) { + buf.append(std::move(b)); + } // Decode the iobuf from the input stream. auto in = std::make_unique(std::move(buf)); avro::DecoderPtr decoder = avro::binaryDecoder(); @@ -183,14 +196,27 @@ TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) { metadata["f1"] = f1; metadata["f2"] = f2; - iobuf buf; size_t bytes_streamed{0}; + avro_iobuf_ostream::buf_container_t bufs; auto out = std::make_unique( - 4_KiB, &buf, &bytes_streamed); - avro::DataFileWriter writer( - std::move(out), manifest_file_schema, 16_KiB, avro::NULL_CODEC, metadata); - writer.write(manifest); - writer.flush(); + 4_KiB, &bufs, &bytes_streamed); + { + avro::DataFileWriter writer( + std::move(out), + manifest_file_schema, + 16_KiB, + avro::NULL_CODEC, + metadata); + writer.write(manifest); + writer.flush(); + + // NOTE: ~DataFileWriter does a final sync which may write to the + // chunks. Destruct the writer before moving ownership of the chunks. + } + iobuf buf; + for (auto& b : bufs) { + buf.append(std::move(b)); + } auto in = std::make_unique(buf.copy()); avro::DataFileReader reader( std::move(in), manifest_file_schema);