Skip to content

Commit

Permalink
iceberg: refactor Avro ostream to avoid iobuf
Browse files Browse the repository at this point in the history
While ultimately we want an iobuf from the ostream, it currently
requires pointer stability of the underlying fragments, which isn't an
explicit contract of the iobuf. Instead, this tweaks the class to use a
container of temporary_buffers directly, and expects callers to pass the
buffers to an iobuf.

There was some discussion on making iobuf smarter and providing an
interface that guaranteed pointer stability of buffers. This seems like
it'd be a heavy-handed requirement to impose on iobuf to enfroce.
Putting the onus on callers seems like a quick, good-enough approach.
  • Loading branch information
andrwng committed Jul 23, 2024
1 parent 95a93cd commit 36ae03c
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 41 deletions.
1 change: 1 addition & 0 deletions src/v/iceberg/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ redpanda_cc_library(
"//src/v/strings:string_switch",
"//src/v/utils:named_type",
"@avro",
"@seastar",
],
)

Expand Down
23 changes: 10 additions & 13 deletions src/v/iceberg/avro_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#pragma once

#include "bytes/iobuf.h"
#include "container/fragmented_vector.h"

#include <seastar/core/temporary_buffer.hh>

Expand All @@ -17,14 +18,15 @@

namespace iceberg {

// Near-identical implementation of avro::MemoryOutputStream, but backed by an
// iobuf that can be released.
// Near-identical implementation of avro::MemoryOutputStream, but backed by
// temporary buffers that callers can use to construct an iobuf.
class avro_iobuf_ostream : public avro::OutputStream {
public:
using buf_container_t = chunked_vector<ss::temporary_buffer<char>>;
explicit avro_iobuf_ostream(
size_t chunk_size, iobuf* buf, size_t* byte_count)
size_t chunk_size, buf_container_t* bufs, size_t* byte_count)
: chunk_size_(chunk_size)
, buf_(buf)
, bufs_(bufs)
, available_(0)
, byte_count_(byte_count) {}
~avro_iobuf_ostream() override = default;
Expand All @@ -36,17 +38,12 @@ class avro_iobuf_ostream : public avro::OutputStream {
// space.
bool next(uint8_t** data, size_t* len) final {
if (available_ == 0) {
// NOTE: it is critical to add the buffer to a separate iobuf first
// and then add that iobuf's fragments, as adding a buffer directly
// to `buf_` may end up packing the fragments together.
iobuf new_frag;
new_frag.append(ss::temporary_buffer<char>{chunk_size_});
buf_->append_fragments(std::move(new_frag));
bufs_->emplace_back(chunk_size_);
available_ = chunk_size_;
}
auto back_frag = buf_->rbegin();
auto& back_frag = bufs_->back();
*data = reinterpret_cast<uint8_t*>(
back_frag->share(chunk_size_ - available_, available_).get_write());
back_frag.share(chunk_size_ - available_, available_).get_write());
*len = available_;
*byte_count_ += available_;
available_ = 0;
Expand All @@ -66,7 +63,7 @@ class avro_iobuf_ostream : public avro::OutputStream {
// Size in bytes with which to allocate new fragments.
const size_t chunk_size_;

iobuf* buf_;
buf_container_t* bufs_;
size_t available_;

// Total number of bytes.
Expand Down
43 changes: 27 additions & 16 deletions src/v/iceberg/manifest_avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "iceberg/schema_json.h"
#include "strings/string_switch.h"

#include <seastar/core/temporary_buffer.hh>

#include <avro/DataFile.hh>

namespace iceberg {
Expand Down Expand Up @@ -136,27 +138,36 @@ metadata_from_reader(avro::DataFileReader<manifest_entry>& rdr) {
} // anonymous namespace

iobuf serialize_avro(const manifest& m) {
iobuf buf;
size_t bytes_streamed = 0;
auto out = std::make_unique<avro_iobuf_ostream>(
4_KiB, &buf, &bytes_streamed);
avro_iobuf_ostream::buf_container_t bufs;
static constexpr size_t avro_default_sync_bytes = 16_KiB;
auto meta = metadata_to_map(m.metadata);
avro::DataFileWriter<manifest_entry> writer(
std::move(out),
manifest_entry::valid_schema(),
avro_default_sync_bytes,
avro::NULL_CODEC,
meta);
{
auto out = std::make_unique<avro_iobuf_ostream>(
4_KiB, &bufs, &bytes_streamed);
avro::DataFileWriter<manifest_entry> writer(
std::move(out),
manifest_entry::valid_schema(),
avro_default_sync_bytes,
avro::NULL_CODEC,
meta);

// TODO: the Avro code-generated manifest_entry doesn't have the r102
// partition field defined, as it relies on runtime information of the
// partition spec!
for (const auto& e : m.entries) {
writer.write(e);
}
writer.flush();
writer.close();

// TODO: the Avro code-generated manifest_entry doesn't have the r102
// partition field defined, as it relies on runtime information of the
// partition spec!
for (const auto& e : m.entries) {
writer.write(e);
// NOTE: ~DataFileWriter does a final sync which may write to the
// chunks. Destruct the writer before moving ownership of the chunks.
}
iobuf buf;
for (auto& b : bufs) {
buf.append(std::move(b));
}
writer.flush();
writer.close();
buf.trim_back(buf.size_bytes() - bytes_streamed);
return buf;
}
Expand Down
1 change: 1 addition & 0 deletions src/v/iceberg/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ redpanda_cc_gtest(
":test_schemas",
"//src/v/base",
"//src/v/bytes:iobuf",
"//src/v/container:fragmented_vector",
"//src/v/iceberg:avro_utils",
"//src/v/iceberg:manifest",
"//src/v/iceberg:manifest_avro",
Expand Down
50 changes: 38 additions & 12 deletions src/v/iceberg/tests/manifest_serialization_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "base/units.h"
#include "bytes/iobuf.h"
#include "container/fragmented_vector.h"
#include "iceberg/avro_utils.h"
#include "iceberg/manifest.h"
#include "iceberg/manifest_avro.h"
Expand Down Expand Up @@ -55,17 +56,21 @@ TEST(ManifestSerializationTest, TestManifestEntry) {
entry.data_file.record_count = 3;
entry.data_file.file_size_in_bytes = 1024;

iobuf buf;
size_t bytes_streamed{0};
avro_iobuf_ostream::buf_container_t bufs;
auto out = std::make_unique<avro_iobuf_ostream>(
4096, &buf, &bytes_streamed);
4096, &bufs, &bytes_streamed);

// Encode to the output stream.
avro::EncoderPtr encoder = avro::binaryEncoder();
encoder->init(*out);
avro::encode(*encoder, entry);
encoder->flush();

iobuf buf;
for (auto& b : bufs) {
buf.append(std::move(b));
}
// Decode the iobuf from the input stream.
auto in = std::make_unique<avro_iobuf_istream>(std::move(buf));
avro::DecoderPtr decoder = avro::binaryDecoder();
Expand All @@ -86,10 +91,10 @@ TEST(ManifestSerializationTest, TestManyManifestEntries) {
entry.data_file.record_count = 3;
entry.data_file.file_size_in_bytes = 1024;

iobuf buf;
size_t bytes_streamed{0};
avro_iobuf_ostream::buf_container_t bufs;
auto out = std::make_unique<avro_iobuf_ostream>(
4096, &buf, &bytes_streamed);
4096, &bufs, &bytes_streamed);

// Encode many entries. This is a regression test for a bug where
// serializing large Avro files would handle iobuf fragments improperly,
Expand All @@ -101,6 +106,10 @@ TEST(ManifestSerializationTest, TestManyManifestEntries) {
encoder->flush();
}

iobuf buf;
for (auto& b : bufs) {
buf.append(std::move(b));
}
// Decode the iobuf from the input stream.
auto in = std::make_unique<avro_iobuf_istream>(std::move(buf));
avro::DecoderPtr decoder = avro::binaryDecoder();
Expand All @@ -127,17 +136,21 @@ TEST(ManifestSerializationTest, TestManifestFile) {
manifest.existing_rows_count = 10;
manifest.deleted_rows_count = 11;

iobuf buf;
size_t bytes_streamed{0};
avro_iobuf_ostream::buf_container_t bufs;
auto out = std::make_unique<avro_iobuf_ostream>(
4096, &buf, &bytes_streamed);
4096, &bufs, &bytes_streamed);

// Encode to the output stream.
avro::EncoderPtr encoder = avro::binaryEncoder();
encoder->init(*out);
avro::encode(*encoder, manifest);
encoder->flush();

iobuf buf;
for (auto& b : bufs) {
buf.append(std::move(b));
}
// Decode the iobuf from the input stream.
auto in = std::make_unique<avro_iobuf_istream>(std::move(buf));
avro::DecoderPtr decoder = avro::binaryDecoder();
Expand Down Expand Up @@ -183,14 +196,27 @@ TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) {
metadata["f1"] = f1;
metadata["f2"] = f2;

iobuf buf;
size_t bytes_streamed{0};
avro_iobuf_ostream::buf_container_t bufs;
auto out = std::make_unique<avro_iobuf_ostream>(
4_KiB, &buf, &bytes_streamed);
avro::DataFileWriter<manifest_file> writer(
std::move(out), manifest_file_schema, 16_KiB, avro::NULL_CODEC, metadata);
writer.write(manifest);
writer.flush();
4_KiB, &bufs, &bytes_streamed);
{
avro::DataFileWriter<manifest_file> writer(
std::move(out),
manifest_file_schema,
16_KiB,
avro::NULL_CODEC,
metadata);
writer.write(manifest);
writer.flush();

// NOTE: ~DataFileWriter does a final sync which may write to the
// chunks. Destruct the writer before moving ownership of the chunks.
}
iobuf buf;
for (auto& b : bufs) {
buf.append(std::move(b));
}
auto in = std::make_unique<avro_iobuf_istream>(buf.copy());
avro::DataFileReader<manifest_file> reader(
std::move(in), manifest_file_schema);
Expand Down

0 comments on commit 36ae03c

Please sign in to comment.