Skip to content

Commit

Permalink
storage: add log_reader option to translate offsets
Browse files Browse the repository at this point in the history
Previously offset translation on the read path was done in the Kafka
layer, with the storage layer providing primitives to help translate.
As we clarify the requirements of the storage log implementation, offset
translation has made its way into the purview of storage.

To allow the storage layer to own offset translation end-to-end on the
read path, and allow for future log implementations that translate
offsets differently (e.g. by storing translated offsets), this adds an
option to the log reader to translate. The corresponding code is removed
from the Kafka layer.
  • Loading branch information
andrwng committed Mar 28, 2024
1 parent e64151c commit b4f54b1
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 58 deletions.
51 changes: 2 additions & 49 deletions src/v/kafka/server/replicated_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,56 +183,9 @@ ss::future<storage::translating_reader> replicated_partition::make_reader(
cfg.max_offset = _translator->to_log_offset(cfg.max_offset);
cfg.type_filter = {model::record_batch_type::raft_data};

class reader : public model::record_batch_reader::impl {
public:
reader(
std::unique_ptr<model::record_batch_reader::impl> underlying,
ss::lw_shared_ptr<const storage::offset_translator_state> tr)
: _underlying(std::move(underlying))
, _translator(std::move(tr)) {}

bool is_end_of_stream() const final {
return _underlying->is_end_of_stream();
}

void print(std::ostream& os) final {
fmt::print(os, "kafka::partition reader for ");
_underlying->print(os);
}
using storage_t = model::record_batch_reader::storage_t;
using data_t = model::record_batch_reader::data_t;
using foreign_data_t = model::record_batch_reader::foreign_data_t;

model::record_batch_reader::data_t& get_batches(storage_t& st) {
if (std::holds_alternative<data_t>(st)) {
return std::get<data_t>(st);
} else {
return *std::get<foreign_data_t>(st).buffer;
}
}

ss::future<storage_t>
do_load_slice(model::timeout_clock::time_point t) final {
return _underlying->do_load_slice(t).then([this](storage_t recs) {
for (auto& batch : get_batches(recs)) {
batch.header().base_offset = _translator->from_log_offset(
batch.base_offset());
}
return recs;
});
}

ss::future<> finally() noexcept final { return _underlying->finally(); }

private:
std::unique_ptr<model::record_batch_reader::impl> _underlying;
ss::lw_shared_ptr<const storage::offset_translator_state> _translator;
};
cfg.translate_offsets = storage::translate_offsets::yes;
auto rdr = co_await _partition->make_reader(cfg, debounce_deadline);
co_return storage::translating_reader(
model::make_record_batch_reader<reader>(
std::move(rdr).release(), _translator),
_translator);
co_return storage::translating_reader(std::move(rdr), _translator);
}

ss::future<std::vector<cluster::rm_stm::tx_range>>
Expand Down
9 changes: 5 additions & 4 deletions src/v/storage/disk_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1769,7 +1769,7 @@ disk_log_impl::make_unchecked_reader(log_reader_config config) {
return _lock_mngr.range_lock(config).then(
[this, cfg = config](std::unique_ptr<lock_manager::lease> lease) {
return model::make_record_batch_reader<log_reader>(
std::move(lease), cfg, *_probe);
std::move(lease), cfg, *_probe, get_offset_translator_state());
});
}

Expand All @@ -1784,7 +1784,8 @@ disk_log_impl::make_cached_reader(log_reader_config config) {
}
return _lock_mngr.range_lock(config)
.then([this, cfg = config](std::unique_ptr<lock_manager::lease> lease) {
return std::make_unique<log_reader>(std::move(lease), cfg, *_probe);
return std::make_unique<log_reader>(
std::move(lease), cfg, *_probe, get_offset_translator_state());
})
.then([this](auto rdr) { return _readers_cache->put(std::move(rdr)); });
}
Expand Down Expand Up @@ -2311,7 +2312,7 @@ disk_log_impl::make_reader(log_reader_config config) {
if (config.start_offset > config.max_offset) {
auto lease = std::make_unique<lock_manager::lease>(segment_set({}));
auto empty = model::make_record_batch_reader<log_reader>(
std::move(lease), config, *_probe);
std::move(lease), config, *_probe, get_offset_translator_state());
return ss::make_ready_future<model::record_batch_reader>(
std::move(empty));
}
Expand Down Expand Up @@ -2371,7 +2372,7 @@ disk_log_impl::make_reader(timequery_config config) {
cfg.time,
cfg.abort_source);
return model::make_record_batch_reader<log_reader>(
std::move(lease), config, *_probe);
std::move(lease), config, *_probe, get_offset_translator_state());
});
}

Expand Down
20 changes: 18 additions & 2 deletions src/v/storage/log_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "model/fundamental.h"
#include "model/record.h"
#include "storage/logger.h"
#include "storage/offset_translator_state.h"
#include "storage/parser_errc.h"

#include <seastar/core/abort_source.hh>
Expand Down Expand Up @@ -278,15 +279,17 @@ log_segment_batch_reader::read_some(model::timeout_clock::time_point timeout) {
log_reader::log_reader(
std::unique_ptr<lock_manager::lease> l,
log_reader_config config,
probe& probe) noexcept
probe& probe,
ss::lw_shared_ptr<const storage::offset_translator_state> tr) noexcept
: _lease(std::move(l))
, _iterator(_lease->range.begin())
, _config(config)
, _expected_next(
_config.fill_gaps
? std::make_optional<model::offset>(_config.start_offset)
: std::nullopt)
, _probe(probe) {
, _probe(probe)
, _translator(std::move(tr)) {
if (config.abort_source) {
auto op_sub = config.abort_source.value().get().subscribe(
[this]() noexcept { set_end_of_stream(); });
Expand Down Expand Up @@ -430,7 +433,20 @@ log_reader::do_load_slice(model::timeout_clock::time_point timeout) {
return ss::make_ready_future<storage_t>(
std::move(batches_filled));
}
// To keep things consistent, our internal accounting is all done in
// untranslated offsets, even if we've been requested to return
// translated offsets.
_expected_next = model::next_offset(batches.back().last_offset());

if (_config.translate_offsets) {
vassert(
_translator, "Expected offset translactor to be initialized");
for (auto& b : batches) {
b.header().base_offset = _translator->from_log_offset(
b.base_offset());
}
}

return ss::make_ready_future<storage_t>(std::move(batches));
})
.handle_exception([this](std::exception_ptr e) {
Expand Down
8 changes: 7 additions & 1 deletion src/v/storage/log_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "model/limits.h"
#include "model/record_batch_reader.h"
#include "storage/lock_manager.h"
#include "storage/offset_translator_state.h"
#include "storage/parser.h"
#include "storage/probe.h"
#include "storage/segment.h"
Expand Down Expand Up @@ -135,7 +136,10 @@ class log_reader final : public model::record_batch_reader::impl {
using storage_t = model::record_batch_reader::storage_t;

log_reader(
std::unique_ptr<lock_manager::lease>, log_reader_config, probe&) noexcept;
std::unique_ptr<lock_manager::lease>,
log_reader_config,
probe&,
ss::lw_shared_ptr<const storage::offset_translator_state>) noexcept;

~log_reader() final {
vassert(!_iterator.reader, "log reader destroyed with live reader");
Expand Down Expand Up @@ -245,6 +249,8 @@ class log_reader final : public model::record_batch_reader::impl {
std::optional<model::offset> _expected_next;
probe& _probe;
ss::abort_source::subscription _as_sub;

ss::lw_shared_ptr<const storage::offset_translator_state> _translator;
};

/**
Expand Down
2 changes: 1 addition & 1 deletion src/v/storage/segment_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ model::record_batch_reader create_segment_full_reader(
segment_set(std::move(set)));
lease->locks.push_back(std::move(h));
return model::make_record_batch_reader<log_reader>(
std::move(lease), reader_cfg, pb);
std::move(lease), reader_cfg, pb, nullptr);
}

ss::future<> do_swap_data_file_handles(
Expand Down
3 changes: 2 additions & 1 deletion src/v/storage/tests/readers_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ struct readers_cache_test_fixture : seastar_test {
model::offset(0),
model::offset::max(),
ss::default_priority_class()),
probe);
probe,
nullptr);
}

segment_ptr make_segment(model::offset base_offset) {
Expand Down
10 changes: 10 additions & 0 deletions src/v/storage/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ struct truncate_prefix_config {
*
* Start and max offset are inclusive.
*/
using translate_offsets = ss::bool_class<struct translate_tag>;
struct log_reader_config {
model::offset start_offset;
model::offset max_offset;
Expand Down Expand Up @@ -361,6 +362,15 @@ struct log_reader_config {
// terms boundaries.
bool fill_gaps{false};

// If set to true, the offsets returned will be translated from Redpanda
// offset to its data offset, as dictated by the underlying log's offset
// translator types.
//
// NOTE: the translation refers only to the returned batches, not to the
// input min/max offset bounds. Callers are expected to account for inputs
// separately.
translate_offsets translate_offsets{false};

log_reader_config(
model::offset start_offset,
model::offset max_offset,
Expand Down

0 comments on commit b4f54b1

Please sign in to comment.