Skip to content

Commit

Permalink
Merge pull request redpanda-data#21574 from andrwng/iceberg-manifest-…
Browse files Browse the repository at this point in the history
…avro

iceberg: implement manifest serialization
  • Loading branch information
andrwng authored Jul 24, 2024
2 parents 78e5023 + 36ae03c commit 24dcdc1
Show file tree
Hide file tree
Showing 15 changed files with 879 additions and 31 deletions.
61 changes: 61 additions & 0 deletions src/v/iceberg/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,67 @@ redpanda_cc_library(
],
)

redpanda_cc_library(
name = "manifest",
hdrs = [
"manifest.h",
],
include_prefix = "iceberg",
deps = [
":datatypes",
":manifest_entry",
":partition",
":schema",
"//src/v/container:fragmented_vector",
"//src/v/utils:named_type",
],
)

redpanda_cc_library(
name = "manifest_avro",
srcs = [
"manifest_avro.cc",
],
hdrs = [
"manifest_avro.h",
],
include_prefix = "iceberg",
deps = [
":avro_utils",
":datatypes_json",
":json_utils",
":manifest",
":manifest_entry",
":partition",
":schema",
":schema_json",
"//src/v/base",
"//src/v/bytes:iobuf",
"//src/v/container:fragmented_vector",
"//src/v/strings:string_switch",
"//src/v/utils:named_type",
"@avro",
"@seastar",
],
)

redpanda_cc_library(
name = "partition",
srcs = [
"partition.cc",
],
hdrs = [
"partition.h",
],
include_prefix = "iceberg",
deps = [
":datatypes",
"//src/v/container:fragmented_vector",
"//src/v/utils:named_type",
"@seastar",
],
)

redpanda_cc_library(
name = "schema",
hdrs = [
Expand Down
4 changes: 4 additions & 0 deletions src/v/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ v_cc_library(
datatypes.cc
datatypes_json.cc
json_utils.cc
manifest_avro.cc
partition.cc
schema_json.cc
DEPS
Avro::avro
v::bytes
v::container
v::json
v::strings
Expand Down
37 changes: 22 additions & 15 deletions src/v/iceberg/avro_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,26 @@
#pragma once

#include "bytes/iobuf.h"
#include "container/fragmented_vector.h"

#include <seastar/core/temporary_buffer.hh>

#include <avro/DataFile.hh>
#include <avro/Stream.hh>

// Near-identical implementation of avro::MemoryOutputStream, but backed by an
// iobuf that can be released.
namespace iceberg {

// Near-identical implementation of avro::MemoryOutputStream, but backed by
// temporary buffers that callers can use to construct an iobuf.
class avro_iobuf_ostream : public avro::OutputStream {
public:
explicit avro_iobuf_ostream(size_t chunk_size, iobuf* buf)
using buf_container_t = chunked_vector<ss::temporary_buffer<char>>;
explicit avro_iobuf_ostream(
size_t chunk_size, buf_container_t* bufs, size_t* byte_count)
: chunk_size_(chunk_size)
, buf_(buf)
, bufs_(bufs)
, available_(0)
, byte_count_(0) {}
, byte_count_(byte_count) {}
~avro_iobuf_ostream() override = default;

// If there's no available space in the buffer, allocates `chunk_size_`
Expand All @@ -31,38 +38,36 @@ class avro_iobuf_ostream : public avro::OutputStream {
// space.
bool next(uint8_t** data, size_t* len) final {
if (available_ == 0) {
buf_->append(ss::temporary_buffer<char>{chunk_size_});
bufs_->emplace_back(chunk_size_);
available_ = chunk_size_;
}
auto back_frag = buf_->rbegin();
auto& back_frag = bufs_->back();
*data = reinterpret_cast<uint8_t*>(
back_frag->share(chunk_size_ - available_, available_).get_write());
back_frag.share(chunk_size_ - available_, available_).get_write());
*len = available_;
byte_count_ += available_;
*byte_count_ += available_;
available_ = 0;
return true;
}

void backup(size_t len) final {
available_ += len;
byte_count_ -= len;
*byte_count_ -= len;
}

uint64_t byteCount() const final { return byte_count_; }
uint64_t byteCount() const final { return *byte_count_; }

void flush() final {}

private:
// Size in bytes with which to allocate new fragments.
const size_t chunk_size_;

iobuf* buf_;

// Bytes remaining in the last fragment in the buffer.
buf_container_t* bufs_;
size_t available_;

// Total number of bytes.
size_t byte_count_;
size_t* byte_count_;
};

// InputStream implementation that takes an iobuf as input.
Expand Down Expand Up @@ -138,3 +143,5 @@ class avro_iobuf_istream : public avro::InputStream {
size_t cur_frag_pos_;
size_t cur_pos_;
};

} // namespace iceberg
53 changes: 53 additions & 0 deletions src/v/iceberg/manifest.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0
#pragma once

#include "container/fragmented_vector.h"
#include "iceberg/manifest_entry.h"
#include "iceberg/partition.h"
#include "iceberg/schema.h"

namespace iceberg {

enum class format_version : uint8_t {
v1,
v2,
};

enum class manifest_content_type {
data,
deletes,
};

struct manifest_metadata {
schema schema;
partition_spec partition_spec;
format_version format_version;
manifest_content_type manifest_content_type;

friend bool operator==(const manifest_metadata&, const manifest_metadata&)
= default;
};

struct manifest {
manifest_metadata metadata;

// TODO: the manifest_entry is code-generated with avrogencpp, which is
// restrictive for a few reasons:
// - there is no built-in comparator
// - comparators are difficult to implement since the generated code makes
// heavy use of std::any in Avro union types
// - the manifest_entry r102 (partition) field is incomplete, as it depends
// on runtime partitioning and can't be statically generated
// Rather than using these generated structs, write explicit structs to
// represent the entries, and only use the Avro structs for serialization.
chunked_vector<manifest_entry> entries;
};

} // namespace iceberg
Loading

0 comments on commit 24dcdc1

Please sign in to comment.