diff --git a/src/v/datalake/translation/state_machine.cc b/src/v/datalake/translation/state_machine.cc index 07b0741393a4..a4064c5325b3 100644 --- a/src/v/datalake/translation/state_machine.cc +++ b/src/v/datalake/translation/state_machine.cc @@ -138,7 +138,14 @@ translation_stm::take_local_snapshot(ssx::semaphore_units apply_units) { co_return raft::stm_snapshot::create(0, snapshot_offset, std::move(result)); } -ss::future<> translation_stm::apply_raft_snapshot(const iobuf&) { co_return; } +ss::future<> translation_stm::apply_raft_snapshot(const iobuf&) { + // reset offset to not initalized when handling Raft snapshot, this way + // state machine will not hold any obsolete state that should be overriden + // with the snapshot. + vlog(_log.debug, "Applying raft snapshot, resetting state"); + _highest_translated_offset = kafka::offset{}; + co_return; +} ss::future translation_stm::take_snapshot(model::offset) { co_return iobuf{}; diff --git a/src/v/datalake/translation/tests/state_machine_test.cc b/src/v/datalake/translation/tests/state_machine_test.cc index dc026b9abe67..b276a55e3ba6 100644 --- a/src/v/datalake/translation/tests/state_machine_test.cc +++ b/src/v/datalake/translation/tests/state_machine_test.cc @@ -112,6 +112,7 @@ struct translator_stm_fixture : stm_raft_fixture { }; TEST_F_CORO(translator_stm_fixture, state_machine_ops) { + enable_offset_translation(); co_await initialize_state_machines(); co_await wait_for_leader(5s); scoped_config config; diff --git a/src/v/raft/tests/raft_fixture.cc b/src/v/raft/tests/raft_fixture.cc index c9649f6f62a4..c4f632ec9cab 100644 --- a/src/v/raft/tests/raft_fixture.cc +++ b/src/v/raft/tests/raft_fixture.cc @@ -346,7 +346,8 @@ raft_node_instance::raft_node_instance( leader_update_clb_t leader_update_clb, bool enable_longest_log_detection, config::binding election_timeout, - config::binding heartbeat_interval) + config::binding heartbeat_interval, + bool with_offset_translation) : raft_node_instance( id, revision, @@ -357,7 +358,8 @@ raft_node_instance::raft_node_instance( std::move(leader_update_clb), enable_longest_log_detection, std::move(election_timeout), - std::move(heartbeat_interval)) {} + std::move(heartbeat_interval), + with_offset_translation) {} raft_node_instance::raft_node_instance( model::node_id id, @@ -368,7 +370,8 @@ raft_node_instance::raft_node_instance( leader_update_clb_t leader_update_clb, bool enable_longest_log_detection, config::binding election_timeout, - config::binding heartbeat_interval) + config::binding heartbeat_interval, + bool with_offset_translation) : _id(id) , _revision(revision) , _logger(test_log, fmt::format("[node: {}]", _id)) @@ -391,7 +394,8 @@ raft_node_instance::raft_node_instance( , _leader_clb(std::move(leader_update_clb)) , _enable_longest_log_detection(enable_longest_log_detection) , _election_timeout(std::move(election_timeout)) - , _heartbeat_interval(std::move(heartbeat_interval)) { + , _heartbeat_interval(std::move(heartbeat_interval)) + , _with_offset_translation(with_offset_translation) { config::shard_local_cfg().disable_metrics.set_value(true); } @@ -424,7 +428,11 @@ raft_node_instance::initialise(std::vector initial_nodes) { co_await _storage.invoke_on_all(&storage::api::start); storage::ntp_config ntp_cfg(ntp(), _base_directory); - auto log = co_await _storage.local().log_mgr().manage(std::move(ntp_cfg)); + auto log = co_await _storage.local().log_mgr().manage( + std::move(ntp_cfg), + test_group, + _with_offset_translation ? model::offset_translator_batch_types() + : std::vector{}); _raft = ss::make_lw_shared( _id, @@ -591,7 +599,8 @@ raft_fixture::add_node(model::node_id id, model::revision_id rev) { }, _enable_longest_log_detection, _election_timeout.bind(), - _heartbeat_interval.bind()); + _heartbeat_interval.bind(), + _with_offset_translation); auto [it, success] = _nodes.emplace(id, std::move(instance)); return *it->second; @@ -613,7 +622,8 @@ raft_node_instance& raft_fixture::add_node( }, _enable_longest_log_detection, _election_timeout.bind(), - _heartbeat_interval.bind()); + _heartbeat_interval.bind(), + _with_offset_translation); auto [it, success] = _nodes.emplace(id, std::move(instance)); return *it->second; diff --git a/src/v/raft/tests/raft_fixture.h b/src/v/raft/tests/raft_fixture.h index 1b85afb943d3..3baf664a2b46 100644 --- a/src/v/raft/tests/raft_fixture.h +++ b/src/v/raft/tests/raft_fixture.h @@ -158,7 +158,8 @@ class raft_node_instance : public ss::weakly_referencable { leader_update_clb_t leader_update_clb, bool enable_longest_log_detection, config::binding election_timeout, - config::binding heartbeat_interval); + config::binding heartbeat_interval, + bool with_offset_translation = false); raft_node_instance( model::node_id id, @@ -168,7 +169,8 @@ class raft_node_instance : public ss::weakly_referencable { leader_update_clb_t leader_update_clb, bool enable_longest_log_detection, config::binding election_timeout, - config::binding heartbeat_interval); + config::binding heartbeat_interval, + bool with_offset_translation = false); raft_node_instance(const raft_node_instance&) = delete; raft_node_instance(raft_node_instance&&) noexcept = delete; @@ -265,6 +267,7 @@ class raft_node_instance : public ss::weakly_referencable { bool _enable_longest_log_detection; config::binding _election_timeout; config::binding _heartbeat_interval; + bool _with_offset_translation; }; class raft_fixture @@ -530,6 +533,8 @@ class raft_fixture _heartbeat_interval.update(std::move(timeout)); } + void enable_offset_translation() { _with_offset_translation = true; } + protected: class raft_not_leader_exception : std::exception {}; @@ -561,6 +566,7 @@ class raft_fixture std::optional _leader_clb; config::mock_property _election_timeout{500ms}; config::mock_property _heartbeat_interval{50ms}; + bool _with_offset_translation = false; }; template