diff --git a/src/v/cluster/tx_gateway_frontend.cc b/src/v/cluster/tx_gateway_frontend.cc index dae1458400d7e..1cd287b0ccaa4 100644 --- a/src/v/cluster/tx_gateway_frontend.cc +++ b/src/v/cluster/tx_gateway_frontend.cc @@ -1215,6 +1215,13 @@ ss::future tx_gateway_frontend::limit_init_tm_tx( } auto term = term_opt.value(); + auto units = co_await stm->lock_tx(tx_id, "init_tm_tx"); + if ((co_await stm->get_tx(tx_id)).has_value()) { + co_return co_await do_init_tm_tx( + stm, term, tx_id, transaction_timeout_ms, timeout, expected_pid); + } + units.return_all(); + if (stm->tx_cache_size() > _max_transactions_per_coordinator()) { // lock is sloppy and doesn't guarantee that tx_cache_size // never exceeds _max_transactions_per_coordinator. init_tm_tx @@ -1227,43 +1234,57 @@ ss::future tx_gateway_frontend::limit_init_tm_tx( // similar to double-checked locking pattern // it protects concurrent access to oldest_tx - if (stm->tx_cache_size() > _max_transactions_per_coordinator()) { - auto tx_opt = stm->oldest_tx(); - if (!tx_opt) { + while (stm->tx_cache_size() > _max_transactions_per_coordinator()) { + auto old_tx_opt = stm->oldest_tx(); + if (!old_tx_opt) { vlog( txlog.warn, - "oldest_tx shouldn't return empty when tx cache is at " - "capacity"); + "oldest_tx should return oldest tx when the tx cache size " + "({}) is beyond capacity ({})", + stm->tx_cache_size(), + _max_transactions_per_coordinator()); co_return init_tm_tx_reply{tx_errc::not_coordinator}; } - auto tx = tx_opt.value(); + auto old_tx = old_tx_opt.value(); vlog( txlog.info, - "tx cache is at capacity; expiring oldest tx with id:{}", - tx.id); - auto tx_units = co_await stm->lock_tx(tx_id, "init_tm_tx"); - auto ec = co_await do_expire_old_tx( - stm, - term, - tx.id, - config::shard_local_cfg().create_topic_timeout_ms(), - true); - if (ec != tx_errc::none) { + "tx cache size ({}) is beyond capacity ({}); expiring oldest tx " + "(tx.id={})", + stm->tx_cache_size(), + _max_transactions_per_coordinator(), + old_tx.id); + auto tx_units = co_await stm->lock_tx(old_tx.id, "init_tm_tx"); + + auto timeout = config::shard_local_cfg().create_topic_timeout_ms(); + auto tx_maybe = co_await get_tx(term, stm, old_tx.id, timeout); + if (tx_maybe.has_value()) { + old_tx = tx_maybe.value(); + auto ec = co_await do_expire_old_tx( + stm, term, old_tx, timeout, true); + if (ec != tx_errc::none) { + vlog( + txlog.warn, + "expiring old tx (tx.id={}) failed with ec={}", + old_tx.id, + ec); + co_return init_tm_tx_reply{tx_errc::not_coordinator}; + } + } else if (tx_maybe.error() != tx_errc::tx_not_found) { vlog( - txlog.trace, - "do_expire_old_tx with tx_id={} returned ec={}", - tx.id, - ec); + txlog.warn, + "can't look up a tx (tx.id={}): ec={}", + old_tx.id, + tx_maybe.error()); co_return init_tm_tx_reply{tx_errc::not_coordinator}; } tx_units.return_all(); } - + vlog(txlog.info, "tx cache size is reduced"); init_units.return_all(); } - auto units = co_await stm->lock_tx(tx_id, "init_tm_tx"); + units = co_await stm->lock_tx(tx_id, "init_tm_tx"); co_return co_await do_init_tm_tx( stm, term, tx_id, transaction_timeout_ms, timeout, expected_pid); @@ -3166,27 +3187,23 @@ ss::future<> tx_gateway_frontend::expire_old_tx( } auto term = term_opt.value(); + auto timeout = config::shard_local_cfg().create_topic_timeout_ms(); - co_await do_expire_old_tx( - stm, - term, - tx_id, - config::shard_local_cfg().create_topic_timeout_ms(), - false); + auto tx_maybe = co_await get_tx(term, stm, tx_id, timeout); + if (!tx_maybe.has_value()) { + co_return; + } + auto tx = tx_maybe.value(); + + co_await do_expire_old_tx(stm, term, tx, timeout, false); } ss::future tx_gateway_frontend::do_expire_old_tx( ss::shared_ptr stm, model::term_id term, - kafka::transactional_id tx_id, + tm_transaction tx, model::timeout_clock::duration timeout, bool ignore_update_ts) { - auto r0 = co_await get_tx(term, stm, tx_id, timeout); - if (!r0.has_value()) { - // either timeout or already expired - co_return tx_errc::tx_not_found; - } - auto tx = r0.value(); if (!ignore_update_ts && !stm->is_expired(tx)) { co_return tx_errc::none; } @@ -3207,14 +3224,13 @@ ss::future tx_gateway_frontend::do_expire_old_tx( r = co_await do_abort_tm_tx(term, stm, tx, timeout); } if (!r.has_value()) { - vlog(txlog.warn, "got error {} on aborting tx.id={}", r.error(), tx_id); - + vlog(txlog.warn, "got error {} on aborting tx.id={}", r.error(), tx.id); co_return r.error(); } // it's ok not to check ec because if the expiration isn't passed // it will be retried and it's an idempotent operation - auto ec = co_await stm->expire_tx(term, tx_id); + auto ec = co_await stm->expire_tx(term, tx.id); if (ec != tm_stm::op_status::success) { vlog(txlog.warn, "got error {} on expiring tx.id={}", ec, tx.id); co_return tx_errc::not_coordinator; diff --git a/src/v/cluster/tx_gateway_frontend.h b/src/v/cluster/tx_gateway_frontend.h index f22981d42f16c..6a1fc6cba7687 100644 --- a/src/v/cluster/tx_gateway_frontend.h +++ b/src/v/cluster/tx_gateway_frontend.h @@ -281,7 +281,7 @@ class tx_gateway_frontend final ss::future do_expire_old_tx( ss::shared_ptr, model::term_id term, - kafka::transactional_id, + tm_transaction, model::timeout_clock::duration, bool ignore_update_ts);