Skip to content

Commit

Permalink
Merge pull request #18324 from bharathv/fix_expiration_24_1_x
Browse files Browse the repository at this point in the history
[backport] [v24.1.x] rm_stm: consider transactions without data batches for expiration
  • Loading branch information
piyushredpanda authored May 10, 2024
2 parents 6d1636e + 00b7ec4 commit 43b3520
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 18 deletions.
45 changes: 27 additions & 18 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1448,29 +1448,38 @@ void rm_stm::abort_old_txes() {
});
}

ss::future<> rm_stm::do_abort_old_txes() {
if (!co_await sync(_sync_timeout)) {
co_return;
absl::btree_set<model::producer_identity>
rm_stm::get_expired_producers() const {
absl::btree_set<model::producer_identity> expired;
auto maybe_add_to_expired = [&](const auto& pid) {
if (expired.contains(pid)) {
return;
}
auto it = _log_state.expiration.find(pid);
if (
it != _log_state.expiration.end()
&& it->second.is_expired(clock_type::now())) {
expired.insert(pid);
}
};
for (auto& [pid, _] : _mem_state.estimated) {
maybe_add_to_expired(pid);
}

fragmented_vector<model::producer_identity> pids;
for (auto& [k, _] : _mem_state.estimated) {
pids.push_back(k);
for (auto& [id, epoch] : _log_state.fence_pid_epoch) {
maybe_add_to_expired(model::producer_identity{id, epoch});
}
for (auto& [k, _] : _log_state.ongoing_map) {
pids.push_back(k);
for (auto& [pid, _] : _log_state.ongoing_map) {
maybe_add_to_expired(pid);
}
absl::btree_set<model::producer_identity> expired;
for (auto pid : pids) {
auto expiration_it = _log_state.expiration.find(pid);
if (expiration_it != _log_state.expiration.end()) {
if (!expiration_it->second.is_expired(clock_type::now())) {
continue;
}
}
expired.insert(pid);
return expired;
}

ss::future<> rm_stm::do_abort_old_txes() {
if (!co_await sync(_sync_timeout)) {
co_return;
}

auto expired = get_expired_producers();
for (auto pid : expired) {
co_await try_abort_old_tx(pid);
}
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/rm_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,8 @@ class rm_stm final : public raft::persisted_stm<> {

ss::future<std::error_code> do_mark_expired(model::producer_identity pid);

absl::btree_set<model::producer_identity> get_expired_producers() const;

bool is_known_session(model::producer_identity pid) const {
auto is_known = false;
is_known |= _mem_state.estimated.contains(pid);
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/tests/rm_stm_test_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ struct rm_stm_test_fixture : simple_raft_fixture {
return _stm->wait(raft_offset, model::timeout_clock::now() + 10ms);
}

auto get_expired_producers() const { return _stm->get_expired_producers(); }

ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
ss::sharded<cluster::producer_state_manager> producer_state_manager;
ss::shared_ptr<cluster::rm_stm> _stm;
Expand Down
25 changes: 25 additions & 0 deletions src/v/cluster/tests/rm_stm_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -928,3 +928,28 @@ FIXTURE_TEST(test_snapshot_v3_v4_v5_equivalence, rm_stm_test_fixture) {
highest_pid_from_snapshot, _stm->highest_producer_id());
}
}

FIXTURE_TEST(test_tx_expiration_without_data_batches, rm_stm_test_fixture) {
create_stm_and_start_raft();
auto& stm = *_stm;
stm.start().get0();
stm.testing_only_disable_auto_abort();

wait_for_confirmed_leader();
wait_for_meta_initialized();
// Add a fence batch
auto pid = model::producer_identity{0, 0};
auto term_op = stm
.begin_tx(
pid,
model::tx_seq{0},
std::chrono::milliseconds(10),
model::partition_id(0))
.get0();
BOOST_REQUIRE(term_op.has_value());
BOOST_REQUIRE_EQUAL(term_op.value(), _raft->confirmed_term());
tests::cooperative_spin_wait_with_timeout(5s, [this, pid]() {
auto expired = get_expired_producers();
return std::find(expired.begin(), expired.end(), pid) != expired.end();
}).get0();
}

0 comments on commit 43b3520

Please sign in to comment.