Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-8485] Reset translation state on snapshot #24522

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/v/datalake/translation/state_machine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,14 @@ translation_stm::take_local_snapshot(ssx::semaphore_units apply_units) {
co_return raft::stm_snapshot::create(0, snapshot_offset, std::move(result));
}

ss::future<> translation_stm::apply_raft_snapshot(const iobuf&) { co_return; }
ss::future<> translation_stm::apply_raft_snapshot(const iobuf&) {
// reset offset to not initalized when handling Raft snapshot, this way
// state machine will not hold any obsolete state that should be overriden
// with the snapshot.
vlog(_log.debug, "Applying raft snapshot, resetting state");
_highest_translated_offset = kafka::offset{};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit unclear to me why is it okay to throw out the offset like this - as the raft snapshot is empty, STMs on different replicas will necessarily get out of sync. Is it because, if the translation is to be continued from some later point, we hope to get another update in the log?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another update is one thing, the other one is the reconciliation with the datalake coordinator which happens before every translation. The empty snapshot indicates the snapshot is not required by this STM, hence resetting the state here is the only viable option we have

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The empty snapshot indicates the snapshot is not required by this STM, hence resetting the state here is the only viable option we have

Yes, but why is it okay to have an empty snapshot?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering about that, and given that we always commit to the coordinator i think it is safe. Am I right @bharathv ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its ok to reset. Currently this offset is only used to enforce max_collectible_offset on the replica. It is ok to reset because lowering the max collectible offset only delays compaction and has no correctness implications until it catches up again. As Michal said the leader reconciles with the coordinator every time to get the offset_to_translate_from.

@WillemKauf is planning to get rid of translation in this path for read replicas, so we could probably just store the log offset and avoid this kafka offset altogether. This kafka offset was added as an optimization so the coordinator can avoid reconciliation in every round of translation but since then that optimization has been removed to simplify the code, or we could just store a pair of <kafka_offset, log_offset> since it is already in serde and implement the optimization later.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently this offset is only used to enforce max_collectible_offset on the replica

is there a reason to be concerned that the scope could increase, this assumption no longer holds, and now there is a problem?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm can't think of a scope increase for the offset in the near future. The only reason it exists is to enforce max_collectible_offset. Also as noted, the plan is to get rid of offset translation in this path altogether and make this translation state self contained, and that automatically fixes this problem too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh right makes sense. thanks

co_return;
}

ss::future<iobuf> translation_stm::take_snapshot(model::offset) {
co_return iobuf{};
Expand Down
1 change: 1 addition & 0 deletions src/v/datalake/translation/tests/state_machine_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ struct translator_stm_fixture : stm_raft_fixture<stm> {
};

TEST_F_CORO(translator_stm_fixture, state_machine_ops) {
enable_offset_translation();
co_await initialize_state_machines();
co_await wait_for_leader(5s);
scoped_config config;
Expand Down
24 changes: 17 additions & 7 deletions src/v/raft/tests/raft_fixture.cc
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ raft_node_instance::raft_node_instance(
leader_update_clb_t leader_update_clb,
bool enable_longest_log_detection,
config::binding<std::chrono::milliseconds> election_timeout,
config::binding<std::chrono::milliseconds> heartbeat_interval)
config::binding<std::chrono::milliseconds> heartbeat_interval,
bool with_offset_translation)
: raft_node_instance(
id,
revision,
Expand All @@ -357,7 +358,8 @@ raft_node_instance::raft_node_instance(
std::move(leader_update_clb),
enable_longest_log_detection,
std::move(election_timeout),
std::move(heartbeat_interval)) {}
std::move(heartbeat_interval),
with_offset_translation) {}

raft_node_instance::raft_node_instance(
model::node_id id,
Expand All @@ -368,7 +370,8 @@ raft_node_instance::raft_node_instance(
leader_update_clb_t leader_update_clb,
bool enable_longest_log_detection,
config::binding<std::chrono::milliseconds> election_timeout,
config::binding<std::chrono::milliseconds> heartbeat_interval)
config::binding<std::chrono::milliseconds> heartbeat_interval,
bool with_offset_translation)
: _id(id)
, _revision(revision)
, _logger(test_log, fmt::format("[node: {}]", _id))
Expand All @@ -391,7 +394,8 @@ raft_node_instance::raft_node_instance(
, _leader_clb(std::move(leader_update_clb))
, _enable_longest_log_detection(enable_longest_log_detection)
, _election_timeout(std::move(election_timeout))
, _heartbeat_interval(std::move(heartbeat_interval)) {
, _heartbeat_interval(std::move(heartbeat_interval))
, _with_offset_translation(with_offset_translation) {
config::shard_local_cfg().disable_metrics.set_value(true);
}

Expand Down Expand Up @@ -424,7 +428,11 @@ raft_node_instance::initialise(std::vector<raft::vnode> initial_nodes) {
co_await _storage.invoke_on_all(&storage::api::start);
storage::ntp_config ntp_cfg(ntp(), _base_directory);

auto log = co_await _storage.local().log_mgr().manage(std::move(ntp_cfg));
auto log = co_await _storage.local().log_mgr().manage(
std::move(ntp_cfg),
test_group,
_with_offset_translation ? model::offset_translator_batch_types()
: std::vector<model::record_batch_type>{});

_raft = ss::make_lw_shared<consensus>(
_id,
Expand Down Expand Up @@ -591,7 +599,8 @@ raft_fixture::add_node(model::node_id id, model::revision_id rev) {
},
_enable_longest_log_detection,
_election_timeout.bind(),
_heartbeat_interval.bind());
_heartbeat_interval.bind(),
_with_offset_translation);

auto [it, success] = _nodes.emplace(id, std::move(instance));
return *it->second;
Expand All @@ -613,7 +622,8 @@ raft_node_instance& raft_fixture::add_node(
},
_enable_longest_log_detection,
_election_timeout.bind(),
_heartbeat_interval.bind());
_heartbeat_interval.bind(),
_with_offset_translation);

auto [it, success] = _nodes.emplace(id, std::move(instance));
return *it->second;
Expand Down
10 changes: 8 additions & 2 deletions src/v/raft/tests/raft_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
leader_update_clb_t leader_update_clb,
bool enable_longest_log_detection,
config::binding<std::chrono::milliseconds> election_timeout,
config::binding<std::chrono::milliseconds> heartbeat_interval);
config::binding<std::chrono::milliseconds> heartbeat_interval,
bool with_offset_translation = false);

raft_node_instance(
model::node_id id,
Expand All @@ -168,7 +169,8 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
leader_update_clb_t leader_update_clb,
bool enable_longest_log_detection,
config::binding<std::chrono::milliseconds> election_timeout,
config::binding<std::chrono::milliseconds> heartbeat_interval);
config::binding<std::chrono::milliseconds> heartbeat_interval,
bool with_offset_translation = false);

raft_node_instance(const raft_node_instance&) = delete;
raft_node_instance(raft_node_instance&&) noexcept = delete;
Expand Down Expand Up @@ -265,6 +267,7 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
bool _enable_longest_log_detection;
config::binding<std::chrono::milliseconds> _election_timeout;
config::binding<std::chrono::milliseconds> _heartbeat_interval;
bool _with_offset_translation;
};

class raft_fixture
Expand Down Expand Up @@ -530,6 +533,8 @@ class raft_fixture
_heartbeat_interval.update(std::move(timeout));
}

void enable_offset_translation() { _with_offset_translation = true; }

protected:
class raft_not_leader_exception : std::exception {};

Expand Down Expand Up @@ -561,6 +566,7 @@ class raft_fixture
std::optional<leader_update_clb_t> _leader_clb;
config::mock_property<std::chrono::milliseconds> _election_timeout{500ms};
config::mock_property<std::chrono::milliseconds> _heartbeat_interval{50ms};
bool _with_offset_translation = false;
};

template<class... STM>
Expand Down
Loading