Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v23.2.x] Txn sessions thrashing: expire all old txs instead of one at a time #13746

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 54 additions & 38 deletions src/v/cluster/tx_gateway_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,13 @@ ss::future<cluster::init_tm_tx_reply> tx_gateway_frontend::limit_init_tm_tx(
}
auto term = term_opt.value();

auto units = co_await stm->lock_tx(tx_id, "init_tm_tx");
if ((co_await stm->get_tx(tx_id)).has_value()) {
co_return co_await do_init_tm_tx(
stm, term, tx_id, transaction_timeout_ms, timeout, expected_pid);
}
units.return_all();

if (stm->tx_cache_size() > _max_transactions_per_coordinator()) {
// lock is sloppy and doesn't guarantee that tx_cache_size
// never exceeds _max_transactions_per_coordinator. init_tm_tx
Expand All @@ -1227,43 +1234,57 @@ ss::future<cluster::init_tm_tx_reply> tx_gateway_frontend::limit_init_tm_tx(

// similar to double-checked locking pattern
// it protects concurrent access to oldest_tx
if (stm->tx_cache_size() > _max_transactions_per_coordinator()) {
auto tx_opt = stm->oldest_tx();
if (!tx_opt) {
while (stm->tx_cache_size() > _max_transactions_per_coordinator()) {
auto old_tx_opt = stm->oldest_tx();
if (!old_tx_opt) {
vlog(
txlog.warn,
"oldest_tx shouldn't return empty when tx cache is at "
"capacity");
"oldest_tx should return oldest tx when the tx cache size "
"({}) is beyond capacity ({})",
stm->tx_cache_size(),
_max_transactions_per_coordinator());
co_return init_tm_tx_reply{tx_errc::not_coordinator};
}

auto tx = tx_opt.value();
auto old_tx = old_tx_opt.value();
vlog(
txlog.info,
"tx cache is at capacity; expiring oldest tx with id:{}",
tx.id);
auto tx_units = co_await stm->lock_tx(tx_id, "init_tm_tx");
auto ec = co_await do_expire_old_tx(
stm,
term,
tx.id,
config::shard_local_cfg().create_topic_timeout_ms(),
true);
if (ec != tx_errc::none) {
"tx cache size ({}) is beyond capacity ({}); expiring oldest tx "
"(tx.id={})",
stm->tx_cache_size(),
_max_transactions_per_coordinator(),
old_tx.id);
auto tx_units = co_await stm->lock_tx(old_tx.id, "init_tm_tx");

auto timeout = config::shard_local_cfg().create_topic_timeout_ms();
auto tx_maybe = co_await get_tx(term, stm, old_tx.id, timeout);
if (tx_maybe.has_value()) {
old_tx = tx_maybe.value();
auto ec = co_await do_expire_old_tx(
stm, term, old_tx, timeout, true);
if (ec != tx_errc::none) {
vlog(
txlog.warn,
"expiring old tx (tx.id={}) failed with ec={}",
old_tx.id,
ec);
co_return init_tm_tx_reply{tx_errc::not_coordinator};
}
} else if (tx_maybe.error() != tx_errc::tx_not_found) {
vlog(
txlog.trace,
"do_expire_old_tx with tx_id={} returned ec={}",
tx.id,
ec);
txlog.warn,
"can't look up a tx (tx.id={}): ec={}",
old_tx.id,
tx_maybe.error());
co_return init_tm_tx_reply{tx_errc::not_coordinator};
}
tx_units.return_all();
}

vlog(txlog.info, "tx cache size is reduced");
init_units.return_all();
}

auto units = co_await stm->lock_tx(tx_id, "init_tm_tx");
units = co_await stm->lock_tx(tx_id, "init_tm_tx");

co_return co_await do_init_tm_tx(
stm, term, tx_id, transaction_timeout_ms, timeout, expected_pid);
Expand Down Expand Up @@ -3166,27 +3187,23 @@ ss::future<> tx_gateway_frontend::expire_old_tx(
}

auto term = term_opt.value();
auto timeout = config::shard_local_cfg().create_topic_timeout_ms();

co_await do_expire_old_tx(
stm,
term,
tx_id,
config::shard_local_cfg().create_topic_timeout_ms(),
false);
auto tx_maybe = co_await get_tx(term, stm, tx_id, timeout);
if (!tx_maybe.has_value()) {
co_return;
}
auto tx = tx_maybe.value();

co_await do_expire_old_tx(stm, term, tx, timeout, false);
}

ss::future<tx_errc> tx_gateway_frontend::do_expire_old_tx(
ss::shared_ptr<tm_stm> stm,
model::term_id term,
kafka::transactional_id tx_id,
tm_transaction tx,
model::timeout_clock::duration timeout,
bool ignore_update_ts) {
auto r0 = co_await get_tx(term, stm, tx_id, timeout);
if (!r0.has_value()) {
// either timeout or already expired
co_return tx_errc::tx_not_found;
}
auto tx = r0.value();
if (!ignore_update_ts && !stm->is_expired(tx)) {
co_return tx_errc::none;
}
Expand All @@ -3207,14 +3224,13 @@ ss::future<tx_errc> tx_gateway_frontend::do_expire_old_tx(
r = co_await do_abort_tm_tx(term, stm, tx, timeout);
}
if (!r.has_value()) {
vlog(txlog.warn, "got error {} on aborting tx.id={}", r.error(), tx_id);

vlog(txlog.warn, "got error {} on aborting tx.id={}", r.error(), tx.id);
co_return r.error();
}

// it's ok not to check ec because if the expiration isn't passed
// it will be retried and it's an idempotent operation
auto ec = co_await stm->expire_tx(term, tx_id);
auto ec = co_await stm->expire_tx(term, tx.id);
if (ec != tm_stm::op_status::success) {
vlog(txlog.warn, "got error {} on expiring tx.id={}", ec, tx.id);
co_return tx_errc::not_coordinator;
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/tx_gateway_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ class tx_gateway_frontend final
ss::future<tx_errc> do_expire_old_tx(
ss::shared_ptr<tm_stm>,
model::term_id term,
kafka::transactional_id,
tm_transaction,
model::timeout_clock::duration,
bool ignore_update_ts);

Expand Down
Loading