diff --git a/src/v/iceberg/BUILD b/src/v/iceberg/BUILD index af4fb95d739b5..f33df5f40da80 100644 --- a/src/v/iceberg/BUILD +++ b/src/v/iceberg/BUILD @@ -118,6 +118,7 @@ redpanda_cc_library( "//src/v/strings:string_switch", "//src/v/utils:named_type", "@avro", + "@seastar", ], ) diff --git a/src/v/iceberg/avro_utils.h b/src/v/iceberg/avro_utils.h index fb4ff1c020f21..34d9aac7bbf13 100644 --- a/src/v/iceberg/avro_utils.h +++ b/src/v/iceberg/avro_utils.h @@ -9,6 +9,7 @@ #pragma once #include "bytes/iobuf.h" +#include "container/fragmented_vector.h" #include @@ -17,14 +18,15 @@ namespace iceberg { -// Near-identical implementation of avro::MemoryOutputStream, but backed by an -// iobuf that can be released. +// Near-identical implementation of avro::MemoryOutputStream, but backed by +// temporary buffers that callers can use to construct an iobuf. class avro_iobuf_ostream : public avro::OutputStream { public: + using buf_container_t = chunked_vector>; explicit avro_iobuf_ostream( - size_t chunk_size, iobuf* buf, size_t* byte_count) + size_t chunk_size, buf_container_t* bufs, size_t* byte_count) : chunk_size_(chunk_size) - , buf_(buf) + , bufs_(bufs) , available_(0) , byte_count_(byte_count) {} ~avro_iobuf_ostream() override = default; @@ -36,17 +38,12 @@ class avro_iobuf_ostream : public avro::OutputStream { // space. bool next(uint8_t** data, size_t* len) final { if (available_ == 0) { - // NOTE: it is critical to add the buffer to a separate iobuf first - // and then add that iobuf's fragments, as adding a buffer directly - // to `buf_` may end up packing the fragments together. - iobuf new_frag; - new_frag.append(ss::temporary_buffer{chunk_size_}); - buf_->append_fragments(std::move(new_frag)); + bufs_->emplace_back(chunk_size_); available_ = chunk_size_; } - auto back_frag = buf_->rbegin(); + auto& back_frag = bufs_->back(); *data = reinterpret_cast( - back_frag->share(chunk_size_ - available_, available_).get_write()); + back_frag.share(chunk_size_ - available_, available_).get_write()); *len = available_; *byte_count_ += available_; available_ = 0; @@ -66,7 +63,7 @@ class avro_iobuf_ostream : public avro::OutputStream { // Size in bytes with which to allocate new fragments. const size_t chunk_size_; - iobuf* buf_; + buf_container_t* bufs_; size_t available_; // Total number of bytes. diff --git a/src/v/iceberg/manifest_avro.cc b/src/v/iceberg/manifest_avro.cc index 5bef3e17f9524..5d299e15a2c12 100644 --- a/src/v/iceberg/manifest_avro.cc +++ b/src/v/iceberg/manifest_avro.cc @@ -18,6 +18,8 @@ #include "iceberg/schema_json.h" #include "strings/string_switch.h" +#include + #include namespace iceberg { @@ -136,27 +138,36 @@ metadata_from_reader(avro::DataFileReader& rdr) { } // anonymous namespace iobuf serialize_avro(const manifest& m) { - iobuf buf; size_t bytes_streamed = 0; - auto out = std::make_unique( - 4_KiB, &buf, &bytes_streamed); + avro_iobuf_ostream::buf_container_t bufs; static constexpr size_t avro_default_sync_bytes = 16_KiB; auto meta = metadata_to_map(m.metadata); - avro::DataFileWriter writer( - std::move(out), - manifest_entry::valid_schema(), - avro_default_sync_bytes, - avro::NULL_CODEC, - meta); + { + auto out = std::make_unique( + 4_KiB, &bufs, &bytes_streamed); + avro::DataFileWriter writer( + std::move(out), + manifest_entry::valid_schema(), + avro_default_sync_bytes, + avro::NULL_CODEC, + meta); + + // TODO: the Avro code-generated manifest_entry doesn't have the r102 + // partition field defined, as it relies on runtime information of the + // partition spec! + for (const auto& e : m.entries) { + writer.write(e); + } + writer.flush(); + writer.close(); - // TODO: the Avro code-generated manifest_entry doesn't have the r102 - // partition field defined, as it relies on runtime information of the - // partition spec! - for (const auto& e : m.entries) { - writer.write(e); + // NOTE: ~DataFileWriter does a final sync which may write to the + // chunks. Destruct the writer before moving ownership of the chunks. + } + iobuf buf; + for (auto& b : bufs) { + buf.append(std::move(b)); } - writer.flush(); - writer.close(); buf.trim_back(buf.size_bytes() - bytes_streamed); return buf; } diff --git a/src/v/iceberg/tests/BUILD b/src/v/iceberg/tests/BUILD index c1b42d9246e02..cf6846011d14a 100644 --- a/src/v/iceberg/tests/BUILD +++ b/src/v/iceberg/tests/BUILD @@ -53,6 +53,7 @@ redpanda_cc_gtest( ":test_schemas", "//src/v/base", "//src/v/bytes:iobuf", + "//src/v/container:fragmented_vector", "//src/v/iceberg:avro_utils", "//src/v/iceberg:manifest", "//src/v/iceberg:manifest_avro", diff --git a/src/v/iceberg/tests/manifest_serialization_test.cc b/src/v/iceberg/tests/manifest_serialization_test.cc index d6feb7acd1b46..4b32d93cd264c 100644 --- a/src/v/iceberg/tests/manifest_serialization_test.cc +++ b/src/v/iceberg/tests/manifest_serialization_test.cc @@ -9,6 +9,7 @@ #include "base/units.h" #include "bytes/iobuf.h" +#include "container/fragmented_vector.h" #include "iceberg/avro_utils.h" #include "iceberg/manifest.h" #include "iceberg/manifest_avro.h" @@ -55,10 +56,10 @@ TEST(ManifestSerializationTest, TestManifestEntry) { entry.data_file.record_count = 3; entry.data_file.file_size_in_bytes = 1024; - iobuf buf; size_t bytes_streamed{0}; + avro_iobuf_ostream::buf_container_t bufs; auto out = std::make_unique( - 4096, &buf, &bytes_streamed); + 4096, &bufs, &bytes_streamed); // Encode to the output stream. avro::EncoderPtr encoder = avro::binaryEncoder(); @@ -66,6 +67,10 @@ TEST(ManifestSerializationTest, TestManifestEntry) { avro::encode(*encoder, entry); encoder->flush(); + iobuf buf; + for (auto& b : bufs) { + buf.append(std::move(b)); + } // Decode the iobuf from the input stream. auto in = std::make_unique(std::move(buf)); avro::DecoderPtr decoder = avro::binaryDecoder(); @@ -86,10 +91,10 @@ TEST(ManifestSerializationTest, TestManyManifestEntries) { entry.data_file.record_count = 3; entry.data_file.file_size_in_bytes = 1024; - iobuf buf; size_t bytes_streamed{0}; + avro_iobuf_ostream::buf_container_t bufs; auto out = std::make_unique( - 4096, &buf, &bytes_streamed); + 4096, &bufs, &bytes_streamed); // Encode many entries. This is a regression test for a bug where // serializing large Avro files would handle iobuf fragments improperly, @@ -101,6 +106,10 @@ TEST(ManifestSerializationTest, TestManyManifestEntries) { encoder->flush(); } + iobuf buf; + for (auto& b : bufs) { + buf.append(std::move(b)); + } // Decode the iobuf from the input stream. auto in = std::make_unique(std::move(buf)); avro::DecoderPtr decoder = avro::binaryDecoder(); @@ -127,10 +136,10 @@ TEST(ManifestSerializationTest, TestManifestFile) { manifest.existing_rows_count = 10; manifest.deleted_rows_count = 11; - iobuf buf; size_t bytes_streamed{0}; + avro_iobuf_ostream::buf_container_t bufs; auto out = std::make_unique( - 4096, &buf, &bytes_streamed); + 4096, &bufs, &bytes_streamed); // Encode to the output stream. avro::EncoderPtr encoder = avro::binaryEncoder(); @@ -138,6 +147,10 @@ TEST(ManifestSerializationTest, TestManifestFile) { avro::encode(*encoder, manifest); encoder->flush(); + iobuf buf; + for (auto& b : bufs) { + buf.append(std::move(b)); + } // Decode the iobuf from the input stream. auto in = std::make_unique(std::move(buf)); avro::DecoderPtr decoder = avro::binaryDecoder(); @@ -183,14 +196,27 @@ TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) { metadata["f1"] = f1; metadata["f2"] = f2; - iobuf buf; size_t bytes_streamed{0}; + avro_iobuf_ostream::buf_container_t bufs; auto out = std::make_unique( - 4_KiB, &buf, &bytes_streamed); - avro::DataFileWriter writer( - std::move(out), manifest_file_schema, 16_KiB, avro::NULL_CODEC, metadata); - writer.write(manifest); - writer.flush(); + 4_KiB, &bufs, &bytes_streamed); + { + avro::DataFileWriter writer( + std::move(out), + manifest_file_schema, + 16_KiB, + avro::NULL_CODEC, + metadata); + writer.write(manifest); + writer.flush(); + + // NOTE: ~DataFileWriter does a final sync which may write to the + // chunks. Destruct the writer before moving ownership of the chunks. + } + iobuf buf; + for (auto& b : bufs) { + buf.append(std::move(b)); + } auto in = std::make_unique(buf.copy()); avro::DataFileReader reader( std::move(in), manifest_file_schema);