From 915b62d5fa4f011ebb40d16c2a20235e7202f924 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Mon, 22 Jul 2024 00:42:49 -0700 Subject: [PATCH 1/7] iceberg: fix avro output stream We were previously using the wrong iobuf append method, which would transparently pack fragments rather than leaving the fragments their respective sizes. This led to exceptions being thrown when serializing larger Avro files. --- src/v/iceberg/avro_utils.h | 9 ++- .../tests/manifest_serialization_test.cc | 60 ++++++++++++++++--- 2 files changed, 61 insertions(+), 8 deletions(-) diff --git a/src/v/iceberg/avro_utils.h b/src/v/iceberg/avro_utils.h index 83b5dae4e5c3a..a1aa161cd5336 100644 --- a/src/v/iceberg/avro_utils.h +++ b/src/v/iceberg/avro_utils.h @@ -10,6 +10,8 @@ #include "bytes/iobuf.h" +#include + #include #include @@ -31,7 +33,12 @@ class avro_iobuf_ostream : public avro::OutputStream { // space. bool next(uint8_t** data, size_t* len) final { if (available_ == 0) { - buf_->append(ss::temporary_buffer{chunk_size_}); + // NOTE: it is critical to add the buffer to a separate iobuf first + // and then add that iobuf's fragments, as adding a buffer directly + // to `buf_` may end up packing the fragments together. + iobuf new_frag; + new_frag.append(ss::temporary_buffer{chunk_size_}); + buf_->append_fragments(std::move(new_frag)); available_ = chunk_size_; } auto back_frag = buf_->rbegin(); diff --git a/src/v/iceberg/tests/manifest_serialization_test.cc b/src/v/iceberg/tests/manifest_serialization_test.cc index 240c307da04b3..dcc29a40bcaf4 100644 --- a/src/v/iceberg/tests/manifest_serialization_test.cc +++ b/src/v/iceberg/tests/manifest_serialization_test.cc @@ -21,6 +21,24 @@ using namespace iceberg; +namespace { + +// Returns true if the trivial, non-union type fields match between the two +// manifest entries. +// TODO: define a manifest_entry struct that isn't tied to Avro codegen, that +// has a trivial operator==. +bool trivial_fields_eq(const manifest_entry& lhs, const manifest_entry& rhs) { + return lhs.status == rhs.status + && lhs.data_file.content == rhs.data_file.content + && lhs.data_file.file_format == rhs.data_file.file_format + && lhs.data_file.file_path == rhs.data_file.file_path + && lhs.data_file.record_count == rhs.data_file.record_count + && lhs.data_file.file_size_in_bytes + == rhs.data_file.file_size_in_bytes; +} + +} // anonymous namespace + TEST(ManifestSerializationTest, TestManifestEntry) { manifest_entry entry; entry.status = 1; @@ -47,13 +65,41 @@ TEST(ManifestSerializationTest, TestManifestEntry) { manifest_entry dentry; avro::decode(*decoder, dentry); - EXPECT_EQ(entry.status, dentry.status); - EXPECT_EQ(entry.data_file.content, dentry.data_file.content); - EXPECT_EQ(entry.data_file.file_path, dentry.data_file.file_path); - EXPECT_EQ(entry.data_file.file_format, dentry.data_file.file_format); - EXPECT_EQ(entry.data_file.record_count, dentry.data_file.record_count); - EXPECT_EQ( - entry.data_file.file_size_in_bytes, dentry.data_file.file_size_in_bytes); + EXPECT_TRUE(trivial_fields_eq(entry, dentry)); +} + +TEST(ManifestSerializationTest, TestManyManifestEntries) { + manifest_entry entry; + entry.status = 1; + entry.data_file.content = 2; + entry.data_file.file_path = "path/to/file"; + entry.data_file.file_format = "PARQUET"; + entry.data_file.partition = {}; + entry.data_file.record_count = 3; + entry.data_file.file_size_in_bytes = 1024; + + iobuf buf; + auto out = std::make_unique(4096, &buf); + + // Encode many entries. This is a regression test for a bug where + // serializing large Avro files would handle iobuf fragments improperly, + // leading to incorrect serialization. + avro::EncoderPtr encoder = avro::binaryEncoder(); + encoder->init(*out); + for (int i = 0; i < 1024; i++) { + avro::encode(*encoder, entry); + encoder->flush(); + } + + // Decode the iobuf from the input stream. + auto in = std::make_unique(std::move(buf)); + avro::DecoderPtr decoder = avro::binaryDecoder(); + decoder->init(*in); + for (int i = 0; i < 1024; i++) { + manifest_entry dentry; + avro::decode(*decoder, dentry); + EXPECT_TRUE(trivial_fields_eq(entry, dentry)); + } } TEST(ManifestSerializationTest, TestManifestFile) { From 768ddad6d1a57154088127701c75cbe20dc84024 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Fri, 19 Jul 2024 08:50:35 -0700 Subject: [PATCH 2/7] iceberg/tests: script for generating iceberg manifests Adds a script that will be used to generate a test manifest using PyIceberg. The schema in this script matches one that is used in a few iceberg tests already. --- .../tests/gen_test_iceberg_manifest.py | 144 ++++++++++++++++++ 1 file changed, 144 insertions(+) create mode 100644 src/v/iceberg/tests/gen_test_iceberg_manifest.py diff --git a/src/v/iceberg/tests/gen_test_iceberg_manifest.py b/src/v/iceberg/tests/gen_test_iceberg_manifest.py new file mode 100644 index 0000000000000..9a31b4f356fca --- /dev/null +++ b/src/v/iceberg/tests/gen_test_iceberg_manifest.py @@ -0,0 +1,144 @@ +#!/usr/bin/python + +# Copyright 2024 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +import argparse +from pyiceberg.io.fsspec import FsspecFileIO +from pyiceberg.manifest import write_manifest, DataFile, DataFileContent, ManifestEntry +from pyiceberg.partitioning import PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.types import StringType, ListType, IntegerType, StructType, BooleanType, NestedField, MapType, FloatType + +# TODO: support some other schemas. +# Define the nested schema. This matches the one at +# src/v/iceberg/tests/test_schemas.cc +nested_schema = Schema( + NestedField(field_id=1, + required=False, + name="foo", + field_type=StringType()), + NestedField(field_id=2, + required=True, + name="bar", + field_type=IntegerType()), + NestedField(field_id=3, + required=False, + name="baz", + field_type=BooleanType()), + NestedField(field_id=4, + required=True, + name="qux", + field_type=ListType(element_id=5, + element_required=True, + element_type=StringType())), + NestedField(field_id=6, + required=True, + name="quux", + field_type=MapType(key_id=7, + key_type=StringType(), + value_id=8, + value_type=MapType( + key_id=9, + key_type=StringType(), + value_id=10, + value_required=True, + value_type=IntegerType()))), + NestedField(field_id=11, + required=True, + name="location", + field_type=ListType(element_id=12, + element_required=True, + element_type=StructType( + NestedField(field_id=13, + required=False, + name="latitude", + field_type=FloatType()), + NestedField(field_id=14, + required=False, + name="longitude", + field_type=FloatType())))), + NestedField(field_id=15, + required=False, + name="person", + field_type=StructType( + NestedField(field_id=16, + name="name", + required=False, + field_type=StringType()), + NestedField(field_id=17, + name="age", + required=True, + field_type=IntegerType()), + ))) + + +def make_manifest_entries(num_entries: int) -> list[ManifestEntry]: + manifest_entries: list[ManifestEntry] = [] + for i in range(num_entries): + data_file = DataFile( + content=DataFileContent.DATA, + file_path=f"data/path/file-{i}.parquet", + file_format='PARQUET', + partition={}, + record_count=i, + file_size_in_bytes=i, + column_sizes={}, + value_counts={}, + null_value_counts={}, + nan_value_counts={}, + lower_bounds={}, + upper_bounds={}, + key_metadata=None, + split_offsets=[], + equality_ids=[], + sort_order_id=i, + ) + manifest_entry = ManifestEntry(status=0, + snapshot_id=i, + data_sequence_number=i, + file_sequence_number=i, + data_file=data_file) + manifest_entries.append(manifest_entry) + return manifest_entries + + +def main(args): + # TODO: add once we have support serialization of partition specs. + spec = PartitionSpec(fields=[]) + + file_io = FsspecFileIO(properties={}) + output_file = file_io.new_output(args.out_file) + with write_manifest(schema=nested_schema, + snapshot_id=1, + spec=spec, + output_file=output_file, + format_version=2) as writer: + for entry in make_manifest_entries(args.num_entries): + writer.add_entry(entry) + print(f"Successfully generated manifest with {args.num_entries} entries " + f"at {args.out_file}") + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description="Write an Apache Iceberg manifest file") + parser.add_argument("-o", + "--out-file", + type=str, + required=True, + help="Destination file") + parser.add_argument( + "-n", + "--num-entries", + type=int, + default=10, + help="The number of data files to represent in the manifest") + args = parser.parse_args() + main(args) From eea6b0d9765fe53354359dd097c2ee8df21e16be Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Mon, 22 Jul 2024 16:55:21 -0700 Subject: [PATCH 3/7] iceberg: add partition_spec struct Adds a partition_spec, that will be included in a later commit as a member of the Apache Iceberg manifest[1]. The partition spec includes information relating to the Iceberg data files recorded in a given manifest. This commit adds the definition, some comparators, and some tests for the comparators. [1] https://iceberg.apache.org/spec/#manifests --- src/v/iceberg/BUILD | 17 ++++ src/v/iceberg/CMakeLists.txt | 1 + src/v/iceberg/partition.cc | 37 ++++++++ src/v/iceberg/partition.h | 62 +++++++++++++ src/v/iceberg/tests/BUILD | 14 +++ src/v/iceberg/tests/CMakeLists.txt | 1 + src/v/iceberg/tests/partition_test.cc | 125 ++++++++++++++++++++++++++ 7 files changed, 257 insertions(+) create mode 100644 src/v/iceberg/partition.cc create mode 100644 src/v/iceberg/partition.h create mode 100644 src/v/iceberg/tests/partition_test.cc diff --git a/src/v/iceberg/BUILD b/src/v/iceberg/BUILD index b9b01537a5b52..3e9e1a70213b7 100644 --- a/src/v/iceberg/BUILD +++ b/src/v/iceberg/BUILD @@ -78,6 +78,23 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "partition", + srcs = [ + "partition.cc", + ], + hdrs = [ + "partition.h", + ], + include_prefix = "iceberg", + deps = [ + ":datatypes", + "//src/v/container:fragmented_vector", + "//src/v/utils:named_type", + "@seastar", + ], +) + redpanda_cc_library( name = "schema", hdrs = [ diff --git a/src/v/iceberg/CMakeLists.txt b/src/v/iceberg/CMakeLists.txt index 02b427c303002..dd44805deb278 100644 --- a/src/v/iceberg/CMakeLists.txt +++ b/src/v/iceberg/CMakeLists.txt @@ -31,6 +31,7 @@ v_cc_library( datatypes.cc datatypes_json.cc json_utils.cc + partition.cc schema_json.cc DEPS v::container diff --git a/src/v/iceberg/partition.cc b/src/v/iceberg/partition.cc new file mode 100644 index 0000000000000..95839b32738ae --- /dev/null +++ b/src/v/iceberg/partition.cc @@ -0,0 +1,37 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#include "iceberg/partition.h" + +namespace iceberg { + +struct transform_comparison_visitor { + template + bool operator()(const T&, const U&) const { + static_assert(!std::is_same::value); + return false; + } + bool + operator()(const bucket_transform& lhs, const bucket_transform& rhs) const { + return lhs.n == rhs.n; + } + bool operator()( + const truncate_transform& lhs, const truncate_transform& rhs) const { + return lhs.length == rhs.length; + } + template + bool operator()(const T&, const T&) const { + return true; + } +}; + +bool operator==(const transform& lhs, const transform& rhs) { + return std::visit(transform_comparison_visitor{}, lhs, rhs); +} + +} // namespace iceberg diff --git a/src/v/iceberg/partition.h b/src/v/iceberg/partition.h new file mode 100644 index 0000000000000..84ddb94dff68e --- /dev/null +++ b/src/v/iceberg/partition.h @@ -0,0 +1,62 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#pragma once + +#include "container/fragmented_vector.h" +#include "iceberg/datatypes.h" +#include "utils/named_type.h" + +#include + +namespace iceberg { + +struct identity_transform {}; +struct bucket_transform { + uint32_t n; +}; +struct truncate_transform { + uint32_t length; +}; +struct year_transform {}; +struct month_transform {}; +struct day_transform {}; +struct hour_transform {}; +struct void_transform {}; + +using transform = std::variant< + identity_transform, + bucket_transform, + truncate_transform, + year_transform, + month_transform, + hour_transform, + void_transform>; +bool operator==(const transform& lhs, const transform& rhs); + +struct partition_field { + using id_t = named_type; + nested_field::id_t source_id; + id_t field_id; + ss::sstring name; + transform transform; + + friend bool operator==(const partition_field&, const partition_field&) + = default; +}; + +struct partition_spec { + using id_t = named_type; + id_t spec_id; + chunked_vector fields; + + friend bool operator==(const partition_spec&, const partition_spec&) + = default; +}; + +} // namespace iceberg diff --git a/src/v/iceberg/tests/BUILD b/src/v/iceberg/tests/BUILD index b482ac50bc872..4e9db37b3c397 100644 --- a/src/v/iceberg/tests/BUILD +++ b/src/v/iceberg/tests/BUILD @@ -62,6 +62,20 @@ redpanda_cc_gtest( ], ) +redpanda_cc_gtest( + name = "partition_test", + timeout = "short", + srcs = [ + "partition_test.cc", + ], + deps = [ + "//src/v/container:fragmented_vector", + "//src/v/iceberg:partition", + "//src/v/test_utils:gtest", + "@googletest//:gtest", + ], +) + redpanda_cc_gtest( name = "schema_json_test", timeout = "short", diff --git a/src/v/iceberg/tests/CMakeLists.txt b/src/v/iceberg/tests/CMakeLists.txt index e5366c5b900d9..4df8324925adb 100644 --- a/src/v/iceberg/tests/CMakeLists.txt +++ b/src/v/iceberg/tests/CMakeLists.txt @@ -9,6 +9,7 @@ rp_test( datatypes_json_test.cc datatypes_test.cc manifest_serialization_test.cc + partition_test.cc schema_json_test.cc test_schemas.cc LIBRARIES diff --git a/src/v/iceberg/tests/partition_test.cc b/src/v/iceberg/tests/partition_test.cc new file mode 100644 index 0000000000000..c1528fdc773ef --- /dev/null +++ b/src/v/iceberg/tests/partition_test.cc @@ -0,0 +1,125 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "container/fragmented_vector.h" +#include "iceberg/partition.h" + +#include + +using namespace iceberg; + +chunked_vector all_transforms() { + chunked_vector all_transforms; + + all_transforms.emplace_back(identity_transform{}); + all_transforms.emplace_back(bucket_transform{.n = 1024}); + all_transforms.emplace_back(truncate_transform{.length = 2048}); + all_transforms.emplace_back(year_transform{}); + all_transforms.emplace_back(month_transform{}); + all_transforms.emplace_back(hour_transform{}); + all_transforms.emplace_back(void_transform{}); + return all_transforms; +} + +void check_single_transform_exists( + const transform& expected_type, + const chunked_vector& all_transforms) { + size_t num_eq = 0; + size_t num_ne = 0; + for (const auto& t : all_transforms) { + if (t == expected_type) { + ++num_eq; + } + if (t != expected_type) { + ++num_ne; + } + } + ASSERT_EQ(num_eq, 1); + ASSERT_EQ(num_ne, all_transforms.size() - 1); +} + +TEST(PartitionTest, TestTransformsEquality) { + auto transforms = all_transforms(); + for (const auto& t : transforms) { + ASSERT_NO_FATAL_FAILURE( + check_single_transform_exists(t, all_transforms())); + } +} + +TEST(PartitionTest, TestBucketTransform) { + bucket_transform t1{.n = 1024}; + auto t1_copy = t1; + auto t2 = t1_copy; + t2.n = 1025; + ASSERT_EQ(t1, t1_copy); + ASSERT_NE(t1, t2); +} + +TEST(PartitionTest, TestTruncateTransform) { + truncate_transform t1{.length = 1024}; + auto t1_copy = t1; + auto t2 = t1_copy; + t2.length = 1025; + ASSERT_EQ(t1, t1_copy); + ASSERT_NE(t1, t2); +} + +TEST(PartitionTest, TestPartitionField) { + auto make_field = [](int32_t i, const ss::sstring& name, transform t) { + return partition_field{ + .field_id = partition_field::id_t{i}, + .name = name, + .transform = t, + }; + }; + auto t1 = make_field(0, "foo", identity_transform{}); + auto t1_copy = t1; + auto t2 = make_field(1, "foo", identity_transform{}); + auto t3 = make_field(0, "fo", identity_transform{}); + auto t4 = make_field(0, "foo", void_transform{}); + ASSERT_EQ(t1, t1_copy); + ASSERT_NE(t1, t2); + ASSERT_NE(t1, t3); + ASSERT_NE(t1, t4); +} + +TEST(PartitionTest, TestPartitionSpec) { + auto make_spec_single = []( + int32_t spec_id, + int32_t field_id, + const ss::sstring& name, + transform t) { + return partition_spec{ + .spec_id = partition_spec::id_t{spec_id}, + .fields = chunked_vector{partition_field{ + .field_id = partition_field::id_t{field_id}, + .name = name, + .transform = t, + }}}; + }; + auto t1 = make_spec_single(0, 1, "foo", identity_transform{}); + auto t1_dup = make_spec_single(0, 1, "foo", identity_transform{}); + ASSERT_EQ(t1, t1_dup); + auto t2 = make_spec_single(1, 1, "foo", identity_transform{}); + auto t3 = make_spec_single(0, 0, "foo", identity_transform{}); + auto t4 = make_spec_single(0, 1, "fo", identity_transform{}); + auto t5 = make_spec_single(0, 1, "foo", void_transform{}); + auto t6 = make_spec_single(0, 1, "foo", void_transform{}); + t6.fields.pop_back(); + auto t7 = make_spec_single(0, 1, "foo", void_transform{}); + t7.fields.emplace_back(t1.fields[0]); + + ASSERT_EQ(t1, t1_dup); + ASSERT_NE(t1, t2); + ASSERT_NE(t1, t3); + ASSERT_NE(t1, t4); + ASSERT_NE(t1, t5); + ASSERT_NE(t1, t6); + ASSERT_NE(t1, t7); +} From 910e219bf583deaf0594c7086ae42b2c3eb8bf68 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Mon, 15 Jul 2024 23:28:06 -0700 Subject: [PATCH 4/7] iceberg: add manifest struct Adds an implementation of the Apache Iceberg manifest, which represents a set of files to be added or removed from a table. Note, in this implementation we represent the manifest entries (i.e. the data files) using structs generated using avrogencpp. These structs are restrictive in a few ways: - there is no built-in comparator - comparators are difficult to implement since the generated code makes heavy use of std::any in Avro union types - the manifest_entry r102 ("partition"[1]) field is incomplete, as it depends on runtime partitioning and can't be statically generated I've left a TODO to address this, but it's still worth checking this code in to allow for testing of other pieces of manifest serialization. [1] https://iceberg.apache.org/spec/#manifests --- src/v/iceberg/BUILD | 16 ++++++++++++ src/v/iceberg/manifest.h | 53 +++++++++++++++++++++++++++++++++++++++ src/v/iceberg/tests/BUILD | 1 + 3 files changed, 70 insertions(+) create mode 100644 src/v/iceberg/manifest.h diff --git a/src/v/iceberg/BUILD b/src/v/iceberg/BUILD index 3e9e1a70213b7..7ed848484d550 100644 --- a/src/v/iceberg/BUILD +++ b/src/v/iceberg/BUILD @@ -78,6 +78,22 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "manifest", + hdrs = [ + "manifest.h", + ], + include_prefix = "iceberg", + deps = [ + ":datatypes", + ":manifest_entry", + ":partition", + ":schema", + "//src/v/container:fragmented_vector", + "//src/v/utils:named_type", + ], +) + redpanda_cc_library( name = "partition", srcs = [ diff --git a/src/v/iceberg/manifest.h b/src/v/iceberg/manifest.h new file mode 100644 index 0000000000000..23f0d87db01f5 --- /dev/null +++ b/src/v/iceberg/manifest.h @@ -0,0 +1,53 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#pragma once + +#include "container/fragmented_vector.h" +#include "iceberg/manifest_entry.h" +#include "iceberg/partition.h" +#include "iceberg/schema.h" + +namespace iceberg { + +enum class format_version : uint8_t { + v1, + v2, +}; + +enum class manifest_content_type { + data, + deletes, +}; + +struct manifest_metadata { + schema schema; + partition_spec partition_spec; + format_version format_version; + manifest_content_type manifest_content_type; + + friend bool operator==(const manifest_metadata&, const manifest_metadata&) + = default; +}; + +struct manifest { + manifest_metadata metadata; + + // TODO: the manifest_entry is code-generated with avrogencpp, which is + // restrictive for a few reasons: + // - there is no built-in comparator + // - comparators are difficult to implement since the generated code makes + // heavy use of std::any in Avro union types + // - the manifest_entry r102 (partition) field is incomplete, as it depends + // on runtime partitioning and can't be statically generated + // Rather than using these generated structs, write explicit structs to + // represent the entries, and only use the Avro structs for serialization. + chunked_vector entries; +}; + +} // namespace iceberg diff --git a/src/v/iceberg/tests/BUILD b/src/v/iceberg/tests/BUILD index 4e9db37b3c397..963577b61c782 100644 --- a/src/v/iceberg/tests/BUILD +++ b/src/v/iceberg/tests/BUILD @@ -53,6 +53,7 @@ redpanda_cc_gtest( "//src/v/base", "//src/v/bytes:iobuf", "//src/v/iceberg:avro_utils", + "//src/v/iceberg:manifest", "//src/v/iceberg:manifest_entry", "//src/v/iceberg:manifest_file", "//src/v/test_utils:gtest", From d56755e667f7d1e89b88f13598b531ed5087033b Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Mon, 22 Jul 2024 17:41:27 -0700 Subject: [PATCH 5/7] iceberg: tweak Avro ostream utility to return size Because Avro's serializer utilities works in chunks, we can end up with slightly oversized Avro files. This tweaks the interface to our iobuf converter to pass in mutable size that can be used by callers to trim the iobuf when done serializing. --- src/v/iceberg/avro_utils.h | 19 +++++++++++-------- .../tests/manifest_serialization_test.cc | 16 ++++++++++++---- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/src/v/iceberg/avro_utils.h b/src/v/iceberg/avro_utils.h index a1aa161cd5336..fb4ff1c020f21 100644 --- a/src/v/iceberg/avro_utils.h +++ b/src/v/iceberg/avro_utils.h @@ -15,15 +15,18 @@ #include #include +namespace iceberg { + // Near-identical implementation of avro::MemoryOutputStream, but backed by an // iobuf that can be released. class avro_iobuf_ostream : public avro::OutputStream { public: - explicit avro_iobuf_ostream(size_t chunk_size, iobuf* buf) + explicit avro_iobuf_ostream( + size_t chunk_size, iobuf* buf, size_t* byte_count) : chunk_size_(chunk_size) , buf_(buf) , available_(0) - , byte_count_(0) {} + , byte_count_(byte_count) {} ~avro_iobuf_ostream() override = default; // If there's no available space in the buffer, allocates `chunk_size_` @@ -45,17 +48,17 @@ class avro_iobuf_ostream : public avro::OutputStream { *data = reinterpret_cast( back_frag->share(chunk_size_ - available_, available_).get_write()); *len = available_; - byte_count_ += available_; + *byte_count_ += available_; available_ = 0; return true; } void backup(size_t len) final { available_ += len; - byte_count_ -= len; + *byte_count_ -= len; } - uint64_t byteCount() const final { return byte_count_; } + uint64_t byteCount() const final { return *byte_count_; } void flush() final {} @@ -64,12 +67,10 @@ class avro_iobuf_ostream : public avro::OutputStream { const size_t chunk_size_; iobuf* buf_; - - // Bytes remaining in the last fragment in the buffer. size_t available_; // Total number of bytes. - size_t byte_count_; + size_t* byte_count_; }; // InputStream implementation that takes an iobuf as input. @@ -145,3 +146,5 @@ class avro_iobuf_istream : public avro::InputStream { size_t cur_frag_pos_; size_t cur_pos_; }; + +} // namespace iceberg diff --git a/src/v/iceberg/tests/manifest_serialization_test.cc b/src/v/iceberg/tests/manifest_serialization_test.cc index dcc29a40bcaf4..223a8bfb3e3d5 100644 --- a/src/v/iceberg/tests/manifest_serialization_test.cc +++ b/src/v/iceberg/tests/manifest_serialization_test.cc @@ -50,7 +50,9 @@ TEST(ManifestSerializationTest, TestManifestEntry) { entry.data_file.file_size_in_bytes = 1024; iobuf buf; - auto out = std::make_unique(4096, &buf); + size_t bytes_streamed{0}; + auto out = std::make_unique( + 4096, &buf, &bytes_streamed); // Encode to the output stream. avro::EncoderPtr encoder = avro::binaryEncoder(); @@ -79,7 +81,9 @@ TEST(ManifestSerializationTest, TestManyManifestEntries) { entry.data_file.file_size_in_bytes = 1024; iobuf buf; - auto out = std::make_unique(4096, &buf); + size_t bytes_streamed{0}; + auto out = std::make_unique( + 4096, &buf, &bytes_streamed); // Encode many entries. This is a regression test for a bug where // serializing large Avro files would handle iobuf fragments improperly, @@ -118,7 +122,9 @@ TEST(ManifestSerializationTest, TestManifestFile) { manifest.deleted_rows_count = 11; iobuf buf; - auto out = std::make_unique(4096, &buf); + size_t bytes_streamed{0}; + auto out = std::make_unique( + 4096, &buf, &bytes_streamed); // Encode to the output stream. avro::EncoderPtr encoder = avro::binaryEncoder(); @@ -172,7 +178,9 @@ TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) { metadata["f2"] = f2; iobuf buf; - auto out = std::make_unique(4_KiB, &buf); + size_t bytes_streamed{0}; + auto out = std::make_unique( + 4_KiB, &buf, &bytes_streamed); avro::DataFileWriter writer( std::move(out), manifest_file_schema, 16_KiB, avro::NULL_CODEC, metadata); writer.write(manifest); From 95a93cd60f231ba30e80a06c2c287862f23b68e1 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Wed, 17 Jul 2024 17:32:50 -0700 Subject: [PATCH 6/7] iceberg: add serialization of manifests into Avro Adds some serialization code of Iceberg manifests into Avro. We are using code-generated Avro code to serialize manifest_entry structs, and are adding additional metadata as specified by Iceberg's spec[1]. This adds a test to exercise a roundtrip of deserialization, using data written with PyIceberg. Note that this implementation is incomplete, as serializing the partition_spec will require some more foundational code to represent literal values. For now, serialization in either direction regarding partitioning results in an empty partition spec. I've manually verified that PyIceberg can at least read back the files after a serialization roundtrip. [1] https://iceberg.apache.org/spec/#manifests --- src/v/iceberg/BUILD | 27 +++ src/v/iceberg/CMakeLists.txt | 3 + src/v/iceberg/manifest_avro.cc | 184 ++++++++++++++++++ src/v/iceberg/manifest_avro.h | 19 ++ src/v/iceberg/tests/BUILD | 4 + src/v/iceberg/tests/CMakeLists.txt | 5 + .../tests/manifest_serialization_test.cc | 28 +++ src/v/iceberg/tests/testdata/README.md | 7 + .../tests/testdata/nested_manifest.avro | Bin 0 -> 13307 bytes 9 files changed, 277 insertions(+) create mode 100644 src/v/iceberg/manifest_avro.cc create mode 100644 src/v/iceberg/manifest_avro.h create mode 100644 src/v/iceberg/tests/testdata/README.md create mode 100644 src/v/iceberg/tests/testdata/nested_manifest.avro diff --git a/src/v/iceberg/BUILD b/src/v/iceberg/BUILD index 7ed848484d550..af4fb95d739b5 100644 --- a/src/v/iceberg/BUILD +++ b/src/v/iceberg/BUILD @@ -94,6 +94,33 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "manifest_avro", + srcs = [ + "manifest_avro.cc", + ], + hdrs = [ + "manifest_avro.h", + ], + include_prefix = "iceberg", + deps = [ + ":avro_utils", + ":datatypes_json", + ":json_utils", + ":manifest", + ":manifest_entry", + ":partition", + ":schema", + ":schema_json", + "//src/v/base", + "//src/v/bytes:iobuf", + "//src/v/container:fragmented_vector", + "//src/v/strings:string_switch", + "//src/v/utils:named_type", + "@avro", + ], +) + redpanda_cc_library( name = "partition", srcs = [ diff --git a/src/v/iceberg/CMakeLists.txt b/src/v/iceberg/CMakeLists.txt index dd44805deb278..f95777a2c4305 100644 --- a/src/v/iceberg/CMakeLists.txt +++ b/src/v/iceberg/CMakeLists.txt @@ -31,9 +31,12 @@ v_cc_library( datatypes.cc datatypes_json.cc json_utils.cc + manifest_avro.cc partition.cc schema_json.cc DEPS + Avro::avro + v::bytes v::container v::json v::strings diff --git a/src/v/iceberg/manifest_avro.cc b/src/v/iceberg/manifest_avro.cc new file mode 100644 index 0000000000000..5bef3e17f9524 --- /dev/null +++ b/src/v/iceberg/manifest_avro.cc @@ -0,0 +1,184 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#include "iceberg/manifest_avro.h" + +#include "base/units.h" +#include "bytes/iobuf.h" +#include "iceberg/avro_utils.h" +#include "iceberg/datatypes_json.h" +#include "iceberg/json_utils.h" +#include "iceberg/manifest.h" +#include "iceberg/schema.h" +#include "iceberg/schema_json.h" +#include "strings/string_switch.h" + +#include + +namespace iceberg { + +namespace { +ss::sstring schema_to_json_str(const schema& s) { + json::StringBuffer buf; + json::Writer w(buf); + rjson_serialize(w, s); + return buf.GetString(); +} +schema schema_from_str(const std::string& s) { + json::Document parsed_schema; + parsed_schema.Parse(s); + return parse_schema(parsed_schema); +} + +constexpr std::string_view content_type_to_str(manifest_content_type t) { + switch (t) { + case manifest_content_type::data: + return "data"; + case manifest_content_type::deletes: + return "deletes"; + } +} +manifest_content_type content_type_from_str(std::string_view s) { + return string_switch(s) + .match( + content_type_to_str(manifest_content_type::data), + manifest_content_type::data) + .match( + content_type_to_str(manifest_content_type::deletes), + manifest_content_type::deletes); +} + +constexpr std::string_view format_to_str(format_version v) { + switch (v) { + case format_version::v1: + return "1"; + case format_version::v2: + return "2"; + } +} +format_version format_from_str(std::string_view s) { + return string_switch(s) + .match(format_to_str(format_version::v1), format_version::v1) + .match(format_to_str(format_version::v2), format_version::v2); +} + +struct partition_spec_strs { + ss::sstring spec_id_str; + ss::sstring fields_json_str; +}; +partition_spec partition_spec_from_str(const partition_spec_strs& strs) { + auto spec_id = std::stoi(strs.spec_id_str); + json::Document parsed_spec; + parsed_spec.Parse(strs.fields_json_str); + if (!parsed_spec.IsObject()) { + throw std::invalid_argument(fmt::format( + "'partition-spec' metadata has type '{}' instead of object", + parsed_spec.GetType())); + } + auto spec_spec_id = parse_required_i32(parsed_spec, "spec-id"); + if (spec_spec_id != spec_id) { + throw std::invalid_argument(fmt::format( + "Mismatched partition spec id {} vs {}", spec_id, spec_spec_id)); + } + return partition_spec{ + .spec_id = partition_spec::id_t{spec_id}, + // TODO: implement me! + .fields = {}, + }; +} +partition_spec_strs partition_spec_to_str(const partition_spec& spec) { + partition_spec_strs strs; + strs.spec_id_str = fmt::format("{}", spec.spec_id()); + // TODO: implement me! + strs.fields_json_str = fmt::format( + "{{\"spec-id\":{},\"fields\":[]}}", spec.spec_id()); + return strs; +} + +std::map +metadata_to_map(const manifest_metadata& meta) { + auto partition_spec_strs = partition_spec_to_str(meta.partition_spec); + return { + {"schema", schema_to_json_str(meta.schema)}, + {"content", std::string{content_type_to_str(meta.manifest_content_type)}}, + {"partition-spec", partition_spec_strs.fields_json_str}, + {"partition-spec-id", partition_spec_strs.spec_id_str}, + {"format-version", std::string{format_to_str(meta.format_version)}}}; +} +// TODO: make DataFileReader::getMetadata const! +manifest_metadata +metadata_from_reader(avro::DataFileReader& rdr) { + const auto find_required_str = [&rdr](const std::string& key) { + auto val = rdr.getMetadata(key); + if (!val) { + throw std::invalid_argument( + fmt::format("Manifest metadata missing field '{}'", key)); + } + return *val; + }; + manifest_metadata m; + m.manifest_content_type = content_type_from_str( + find_required_str("content")); + m.format_version = format_from_str(find_required_str("format-version")); + m.partition_spec = partition_spec_from_str(partition_spec_strs{ + .spec_id_str = find_required_str("partition-spec-id"), + .fields_json_str = find_required_str("partition-spec"), + }); + m.schema = schema_from_str(find_required_str("schema")); + return m; +} + +} // anonymous namespace + +iobuf serialize_avro(const manifest& m) { + iobuf buf; + size_t bytes_streamed = 0; + auto out = std::make_unique( + 4_KiB, &buf, &bytes_streamed); + static constexpr size_t avro_default_sync_bytes = 16_KiB; + auto meta = metadata_to_map(m.metadata); + avro::DataFileWriter writer( + std::move(out), + manifest_entry::valid_schema(), + avro_default_sync_bytes, + avro::NULL_CODEC, + meta); + + // TODO: the Avro code-generated manifest_entry doesn't have the r102 + // partition field defined, as it relies on runtime information of the + // partition spec! + for (const auto& e : m.entries) { + writer.write(e); + } + writer.flush(); + writer.close(); + buf.trim_back(buf.size_bytes() - bytes_streamed); + return buf; +} + +manifest parse_manifest(iobuf buf) { + auto in = std::make_unique(buf.copy()); + avro::DataFileReader reader( + std::move(in), manifest_entry::valid_schema()); + auto meta = metadata_from_reader(reader); + chunked_vector entries; + while (true) { + manifest_entry e; + auto did_read = reader.read(e); + if (!did_read) { + break; + } + entries.emplace_back(std::move(e)); + } + manifest m; + m.metadata = std::move(meta); + m.entries = std::move(entries); + return m; +} + +} // namespace iceberg diff --git a/src/v/iceberg/manifest_avro.h b/src/v/iceberg/manifest_avro.h new file mode 100644 index 0000000000000..e2e138af8af2c --- /dev/null +++ b/src/v/iceberg/manifest_avro.h @@ -0,0 +1,19 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#pragma once + +#include "bytes/iobuf.h" +#include "iceberg/manifest.h" + +namespace iceberg { + +iobuf serialize_avro(const manifest&); +manifest parse_manifest(iobuf); + +} // namespace iceberg diff --git a/src/v/iceberg/tests/BUILD b/src/v/iceberg/tests/BUILD index 963577b61c782..c1b42d9246e02 100644 --- a/src/v/iceberg/tests/BUILD +++ b/src/v/iceberg/tests/BUILD @@ -50,13 +50,17 @@ redpanda_cc_gtest( "manifest_serialization_test.cc", ], deps = [ + ":test_schemas", "//src/v/base", "//src/v/bytes:iobuf", "//src/v/iceberg:avro_utils", "//src/v/iceberg:manifest", + "//src/v/iceberg:manifest_avro", "//src/v/iceberg:manifest_entry", "//src/v/iceberg:manifest_file", + "//src/v/iceberg:schema_json", "//src/v/test_utils:gtest", + "//src/v/utils:file_io", "@avro", "@googletest//:gtest", "@seastar", diff --git a/src/v/iceberg/tests/CMakeLists.txt b/src/v/iceberg/tests/CMakeLists.txt index 4df8324925adb..3b4a36a8c0df9 100644 --- a/src/v/iceberg/tests/CMakeLists.txt +++ b/src/v/iceberg/tests/CMakeLists.txt @@ -1,5 +1,7 @@ find_package(Avro) +set(testdata_dir "${CMAKE_CURRENT_SOURCE_DIR}/testdata") + rp_test( UNIT_TEST GTEST @@ -18,5 +20,8 @@ rp_test( v::bytes v::gtest_main v::iceberg + v::utils + INPUT_FILES + "${testdata_dir}/nested_manifest.avro" ARGS "-- -c1" ) diff --git a/src/v/iceberg/tests/manifest_serialization_test.cc b/src/v/iceberg/tests/manifest_serialization_test.cc index 223a8bfb3e3d5..d6feb7acd1b46 100644 --- a/src/v/iceberg/tests/manifest_serialization_test.cc +++ b/src/v/iceberg/tests/manifest_serialization_test.cc @@ -10,10 +10,16 @@ #include "base/units.h" #include "bytes/iobuf.h" #include "iceberg/avro_utils.h" +#include "iceberg/manifest.h" +#include "iceberg/manifest_avro.h" #include "iceberg/manifest_entry.h" #include "iceberg/manifest_file.h" +#include "iceberg/schema_json.h" +#include "iceberg/tests/test_schemas.h" +#include "utils/file_io.h" #include +#include #include #include @@ -208,3 +214,25 @@ TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) { EXPECT_EQ(manifest.existing_rows_count, dmanifest.existing_rows_count); EXPECT_EQ(manifest.deleted_rows_count, dmanifest.deleted_rows_count); } + +TEST(ManifestSerializationTest, TestSerializeManifestData) { + auto orig_buf = iobuf{ + ss::util::read_entire_file("nested_manifest.avro").get0()}; + auto m = parse_manifest(orig_buf.copy()); + ASSERT_EQ(100, m.entries.size()); + ASSERT_EQ(m.metadata.manifest_content_type, manifest_content_type::data); + ASSERT_EQ(m.metadata.format_version, format_version::v2); + ASSERT_EQ(m.metadata.partition_spec.spec_id, 0); + ASSERT_EQ(m.metadata.partition_spec.fields.size(), 0); + ASSERT_EQ( + m.metadata.schema.schema_struct, + std::get(test_nested_schema_type())); + + auto serialized_buf = serialize_avro(m); + auto m_roundtrip = parse_manifest(serialized_buf.copy()); + ASSERT_EQ(m.metadata, m_roundtrip.metadata); + ASSERT_EQ(100, m_roundtrip.entries.size()); + + auto roundtrip_buf = serialize_avro(m_roundtrip); + ASSERT_EQ(serialized_buf, roundtrip_buf); +} diff --git a/src/v/iceberg/tests/testdata/README.md b/src/v/iceberg/tests/testdata/README.md new file mode 100644 index 0000000000000..6049435ba8490 --- /dev/null +++ b/src/v/iceberg/tests/testdata/README.md @@ -0,0 +1,7 @@ +# Iceberg test data + +Data generated for use in tests. + +``` +python3 $RP_HOME/srv/v/iceberg/tests/gen_test_iceberg_manifest.py -o $RP_HOME/src/v/iceberg/tests/testdata/nested_manifest.avro -n 100 +``` diff --git a/src/v/iceberg/tests/testdata/nested_manifest.avro b/src/v/iceberg/tests/testdata/nested_manifest.avro new file mode 100644 index 0000000000000000000000000000000000000000..d5da55731c3a2f992cc55bb133512df1dbaa2fa3 GIT binary patch literal 13307 zcmb`L4Qmux7KU>V;)+NR5fKSe6e1#>On=0Dt%xg`;E0>eIvEki*j9H}n~s&Pwz@i+ zXhcLrL_|bHToDlw5s^VeM8pvh|HJ->{SSNYt?ufN_avh+a}HzDeeQGaJ@>q~ZslLO z8P&a^nxA%OohutJ*x`J|wMXn~7}WBiov@1~*DF-*k>4-arGh<@O4wy*R!03CSq37W%7Wu2lqhLgraGA%2Dl7wHVK@w{zu7 z>~o5{rLKUA8&su2EV;gBj~GIuf?90Lswc(mQqSp}ZmEeVAHAmLTB0INCG1Gyrop&H zN(FBI5^Fo?H3H|)wZ^9wdn!)ArRNRqtybK8`huUA~yGT1J$eN_E-~v#jj~G#Zv_4*T6M*E~;uuiM3*;vWib(W!Z%J!0vv z+K_ts*(x7xZaME}%eC2@%codOQg*TAxqrd#Un$$nTPNt4Y`O{j)_th&WCtjf zX_;Ge;BX=RJQvX({-4DS#03rQwo037RGe_SGf}d2&l`e? za~-aqO4e^jezDG#!f9*lSIe}TEAXR9L57z0h$R_kSkY=?jXZeZX#N?W96KxTd@4H6;8Xl){&|$E$8-5Pt+0xdL zQkvPvjSMWmXhnpoRf>|ChQnR0mKNM>shrKthi<2!Q_0poGLmd2n)E}*vozJx44o!o zwB^YAUTwCV)e%S^Vp{#ch?U>$d?yH;dA;;f=+5#v$cIVebZH(=jZHX{N)2Y`QmLW1 zR(eFsiN()&{Z)ft4(ax)rUu()9zE?_hBVgSIGUNy`4;(P*T>cAQr__<%{9%k_03>$ zia&2Zz4Ql>3jgU;^cF?cu}TFi^x1VmO)a@U3Z?3q_Kq+UiW&k-IozD+DwJ}VOPNmo zKq-g6zm)0ZYM0VzC&7)G_i6=xl$$4r<&+E7xHH~fTm5XwE>~c_!Ntw<;{{Hqbb-?= zFK{}wf&!;kufV!qmZ=x_us)&Mrc-vx%N8>;xSGYx3~@0t!z(XlW~goU-z5CZ@M;!Q z-(2*0zbLq#%f&4|G3{usQ{1FWJsN*5=Y#o*e%G-WZ#D2%&hvBaTUf1lr7-Iki&ZxS zA!J(aa&h|R4I;i=H2$EV|Kwn$GWurKxNk&Pt7V_#zv?N3PL3}@jd`>+JinYc95K&t zJvhT}lxoKe2flBHWBM;ee8Z?(HGT8sdq%+x-C(vH zs(l?r_XDnZP+;-kn_R#2lr%3O%kB6tqkLDb1a4Ko{qkeF$|?9&gj47j6}~B#s%9$D zm8x;$3;5D)_NMW{tWz!(-D;R+OfYXRZZ>bWdB5Q1*Xid~*MBbj`MUDA52^pY{@>`# zpP93Cv5Sm#C3OS;q+f7;(rfNb_SIj5x+nf|7~C-K%38}RmY z^470M-g=3*f!KgI(8=4d0eKrFUN^A;Z?Kcs-Hp6%iMNs1fH&01+qe;V8ztT*VgufA zCvVdxRtdO` z*boRB*|rUV+azEQu^|vd($j;$9tpUe*boRR*}fft+a;hyYzPFISQY{;31|}=0zoIX zjX+xh?jSY3tsK;RAuxRclr2uj(x6M;J=;4WeVU>cONYZn4{NxmjxO4+*? zfqNz3K4L>4C}rP11n!f7`-u&Kpp^al5x8Fh9w0UZf>I6~K;QuhI7(~?1f`6QB5+g! zju9IIK`CQn2pp4u2Z;@Vpp=6L5qMAn9wIgbf>I70Lf|0@c$nA_2ueA87=ec+;5e}% z5R@`Lj=*sVI6-Ux><6VxOdxPV0v;hY1cFkI96{g_2{=h?2n3}}P9ktp0v;tc1cFkI z9!20$33!ay5C}>+b_{{XB;avkLm(*S_;Catmw+dT4S}GP6DJUOLIR#7HUxrFPM$>I zNeOt0*boRxIduwwrzBvG*boRx$>k83lYj+c1K=J|N}+(jf&?rQ8v;Qo#UcWW5^$Q> z5C}?{o<`ub1e_r@1cFj#W)L_d0Xz3GO-~Llu|Auuq**9#D+jnN~MCp ziUbUZ4S}GPAV6Rs0YhR#ASfja5g1CqIbuT~C}nOAfpZe@Jh1_A0F-k6JOa;4zy)GM zASh*F0f7q=@FKAx5R`K9A_6Z;z)Qr2Kv2r1O9;FKz#FQIF00593TnBmIJm26(T|sv z$i1S-IC4QWR}@DA_lnHDs>nEUK|5C!M*{b%%)O?_IC4Qk*Azzr_nOSTuE;oYK}pvY zM*{b{%)Oz=IC4QyHxx$#_lC^9smM5R2SHai6-NU1rp&#i$T)IAUbhrS0{528y{*VN zazSOc6-NU1w#>bw$T)IAYW%rpP#Q zL2u6#M*{bm%zduNIC4R6&lN`k_qoh{p~yILL2oY6@=apZ#D{!knV+&^UQYemM93wnF4I1;$8W$qhA#(_HwdV8Zd z61Z<MaGc} zdi$U_61X2^?ngz&kqdhJs5lb1A7$<*MaGc}di$g}61bma?q@~DkqdhJtT+<5pJnbB fMaGc}di$a{61ZPv?pH;|kqdhJsyGt3UxE975%mJ{ literal 0 HcmV?d00001 From 36ae03c3f45cfb8c6ef7cbfb11d097156e0f5a26 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Tue, 23 Jul 2024 12:43:00 -0700 Subject: [PATCH 7/7] iceberg: refactor Avro ostream to avoid iobuf While ultimately we want an iobuf from the ostream, it currently requires pointer stability of the underlying fragments, which isn't an explicit contract of the iobuf. Instead, this tweaks the class to use a container of temporary_buffers directly, and expects callers to pass the buffers to an iobuf. There was some discussion on making iobuf smarter and providing an interface that guaranteed pointer stability of buffers. This seems like it'd be a heavy-handed requirement to impose on iobuf to enfroce. Putting the onus on callers seems like a quick, good-enough approach. --- src/v/iceberg/BUILD | 1 + src/v/iceberg/avro_utils.h | 23 ++++----- src/v/iceberg/manifest_avro.cc | 43 ++++++++++------ src/v/iceberg/tests/BUILD | 1 + .../tests/manifest_serialization_test.cc | 50 ++++++++++++++----- 5 files changed, 77 insertions(+), 41 deletions(-) diff --git a/src/v/iceberg/BUILD b/src/v/iceberg/BUILD index af4fb95d739b5..f33df5f40da80 100644 --- a/src/v/iceberg/BUILD +++ b/src/v/iceberg/BUILD @@ -118,6 +118,7 @@ redpanda_cc_library( "//src/v/strings:string_switch", "//src/v/utils:named_type", "@avro", + "@seastar", ], ) diff --git a/src/v/iceberg/avro_utils.h b/src/v/iceberg/avro_utils.h index fb4ff1c020f21..34d9aac7bbf13 100644 --- a/src/v/iceberg/avro_utils.h +++ b/src/v/iceberg/avro_utils.h @@ -9,6 +9,7 @@ #pragma once #include "bytes/iobuf.h" +#include "container/fragmented_vector.h" #include @@ -17,14 +18,15 @@ namespace iceberg { -// Near-identical implementation of avro::MemoryOutputStream, but backed by an -// iobuf that can be released. +// Near-identical implementation of avro::MemoryOutputStream, but backed by +// temporary buffers that callers can use to construct an iobuf. class avro_iobuf_ostream : public avro::OutputStream { public: + using buf_container_t = chunked_vector>; explicit avro_iobuf_ostream( - size_t chunk_size, iobuf* buf, size_t* byte_count) + size_t chunk_size, buf_container_t* bufs, size_t* byte_count) : chunk_size_(chunk_size) - , buf_(buf) + , bufs_(bufs) , available_(0) , byte_count_(byte_count) {} ~avro_iobuf_ostream() override = default; @@ -36,17 +38,12 @@ class avro_iobuf_ostream : public avro::OutputStream { // space. bool next(uint8_t** data, size_t* len) final { if (available_ == 0) { - // NOTE: it is critical to add the buffer to a separate iobuf first - // and then add that iobuf's fragments, as adding a buffer directly - // to `buf_` may end up packing the fragments together. - iobuf new_frag; - new_frag.append(ss::temporary_buffer{chunk_size_}); - buf_->append_fragments(std::move(new_frag)); + bufs_->emplace_back(chunk_size_); available_ = chunk_size_; } - auto back_frag = buf_->rbegin(); + auto& back_frag = bufs_->back(); *data = reinterpret_cast( - back_frag->share(chunk_size_ - available_, available_).get_write()); + back_frag.share(chunk_size_ - available_, available_).get_write()); *len = available_; *byte_count_ += available_; available_ = 0; @@ -66,7 +63,7 @@ class avro_iobuf_ostream : public avro::OutputStream { // Size in bytes with which to allocate new fragments. const size_t chunk_size_; - iobuf* buf_; + buf_container_t* bufs_; size_t available_; // Total number of bytes. diff --git a/src/v/iceberg/manifest_avro.cc b/src/v/iceberg/manifest_avro.cc index 5bef3e17f9524..5d299e15a2c12 100644 --- a/src/v/iceberg/manifest_avro.cc +++ b/src/v/iceberg/manifest_avro.cc @@ -18,6 +18,8 @@ #include "iceberg/schema_json.h" #include "strings/string_switch.h" +#include + #include namespace iceberg { @@ -136,27 +138,36 @@ metadata_from_reader(avro::DataFileReader& rdr) { } // anonymous namespace iobuf serialize_avro(const manifest& m) { - iobuf buf; size_t bytes_streamed = 0; - auto out = std::make_unique( - 4_KiB, &buf, &bytes_streamed); + avro_iobuf_ostream::buf_container_t bufs; static constexpr size_t avro_default_sync_bytes = 16_KiB; auto meta = metadata_to_map(m.metadata); - avro::DataFileWriter writer( - std::move(out), - manifest_entry::valid_schema(), - avro_default_sync_bytes, - avro::NULL_CODEC, - meta); + { + auto out = std::make_unique( + 4_KiB, &bufs, &bytes_streamed); + avro::DataFileWriter writer( + std::move(out), + manifest_entry::valid_schema(), + avro_default_sync_bytes, + avro::NULL_CODEC, + meta); + + // TODO: the Avro code-generated manifest_entry doesn't have the r102 + // partition field defined, as it relies on runtime information of the + // partition spec! + for (const auto& e : m.entries) { + writer.write(e); + } + writer.flush(); + writer.close(); - // TODO: the Avro code-generated manifest_entry doesn't have the r102 - // partition field defined, as it relies on runtime information of the - // partition spec! - for (const auto& e : m.entries) { - writer.write(e); + // NOTE: ~DataFileWriter does a final sync which may write to the + // chunks. Destruct the writer before moving ownership of the chunks. + } + iobuf buf; + for (auto& b : bufs) { + buf.append(std::move(b)); } - writer.flush(); - writer.close(); buf.trim_back(buf.size_bytes() - bytes_streamed); return buf; } diff --git a/src/v/iceberg/tests/BUILD b/src/v/iceberg/tests/BUILD index c1b42d9246e02..cf6846011d14a 100644 --- a/src/v/iceberg/tests/BUILD +++ b/src/v/iceberg/tests/BUILD @@ -53,6 +53,7 @@ redpanda_cc_gtest( ":test_schemas", "//src/v/base", "//src/v/bytes:iobuf", + "//src/v/container:fragmented_vector", "//src/v/iceberg:avro_utils", "//src/v/iceberg:manifest", "//src/v/iceberg:manifest_avro", diff --git a/src/v/iceberg/tests/manifest_serialization_test.cc b/src/v/iceberg/tests/manifest_serialization_test.cc index d6feb7acd1b46..4b32d93cd264c 100644 --- a/src/v/iceberg/tests/manifest_serialization_test.cc +++ b/src/v/iceberg/tests/manifest_serialization_test.cc @@ -9,6 +9,7 @@ #include "base/units.h" #include "bytes/iobuf.h" +#include "container/fragmented_vector.h" #include "iceberg/avro_utils.h" #include "iceberg/manifest.h" #include "iceberg/manifest_avro.h" @@ -55,10 +56,10 @@ TEST(ManifestSerializationTest, TestManifestEntry) { entry.data_file.record_count = 3; entry.data_file.file_size_in_bytes = 1024; - iobuf buf; size_t bytes_streamed{0}; + avro_iobuf_ostream::buf_container_t bufs; auto out = std::make_unique( - 4096, &buf, &bytes_streamed); + 4096, &bufs, &bytes_streamed); // Encode to the output stream. avro::EncoderPtr encoder = avro::binaryEncoder(); @@ -66,6 +67,10 @@ TEST(ManifestSerializationTest, TestManifestEntry) { avro::encode(*encoder, entry); encoder->flush(); + iobuf buf; + for (auto& b : bufs) { + buf.append(std::move(b)); + } // Decode the iobuf from the input stream. auto in = std::make_unique(std::move(buf)); avro::DecoderPtr decoder = avro::binaryDecoder(); @@ -86,10 +91,10 @@ TEST(ManifestSerializationTest, TestManyManifestEntries) { entry.data_file.record_count = 3; entry.data_file.file_size_in_bytes = 1024; - iobuf buf; size_t bytes_streamed{0}; + avro_iobuf_ostream::buf_container_t bufs; auto out = std::make_unique( - 4096, &buf, &bytes_streamed); + 4096, &bufs, &bytes_streamed); // Encode many entries. This is a regression test for a bug where // serializing large Avro files would handle iobuf fragments improperly, @@ -101,6 +106,10 @@ TEST(ManifestSerializationTest, TestManyManifestEntries) { encoder->flush(); } + iobuf buf; + for (auto& b : bufs) { + buf.append(std::move(b)); + } // Decode the iobuf from the input stream. auto in = std::make_unique(std::move(buf)); avro::DecoderPtr decoder = avro::binaryDecoder(); @@ -127,10 +136,10 @@ TEST(ManifestSerializationTest, TestManifestFile) { manifest.existing_rows_count = 10; manifest.deleted_rows_count = 11; - iobuf buf; size_t bytes_streamed{0}; + avro_iobuf_ostream::buf_container_t bufs; auto out = std::make_unique( - 4096, &buf, &bytes_streamed); + 4096, &bufs, &bytes_streamed); // Encode to the output stream. avro::EncoderPtr encoder = avro::binaryEncoder(); @@ -138,6 +147,10 @@ TEST(ManifestSerializationTest, TestManifestFile) { avro::encode(*encoder, manifest); encoder->flush(); + iobuf buf; + for (auto& b : bufs) { + buf.append(std::move(b)); + } // Decode the iobuf from the input stream. auto in = std::make_unique(std::move(buf)); avro::DecoderPtr decoder = avro::binaryDecoder(); @@ -183,14 +196,27 @@ TEST(ManifestSerializationTest, TestManifestAvroReaderWriter) { metadata["f1"] = f1; metadata["f2"] = f2; - iobuf buf; size_t bytes_streamed{0}; + avro_iobuf_ostream::buf_container_t bufs; auto out = std::make_unique( - 4_KiB, &buf, &bytes_streamed); - avro::DataFileWriter writer( - std::move(out), manifest_file_schema, 16_KiB, avro::NULL_CODEC, metadata); - writer.write(manifest); - writer.flush(); + 4_KiB, &bufs, &bytes_streamed); + { + avro::DataFileWriter writer( + std::move(out), + manifest_file_schema, + 16_KiB, + avro::NULL_CODEC, + metadata); + writer.write(manifest); + writer.flush(); + + // NOTE: ~DataFileWriter does a final sync which may write to the + // chunks. Destruct the writer before moving ownership of the chunks. + } + iobuf buf; + for (auto& b : bufs) { + buf.append(std::move(b)); + } auto in = std::make_unique(buf.copy()); avro::DataFileReader reader( std::move(in), manifest_file_schema);