Skip to content

Commit

Permalink
Merge pull request #11110 from rystsov/issue-10588
Browse files Browse the repository at this point in the history
Fix txn consumer group issues leading to undefined behavior
  • Loading branch information
piyushredpanda authored Jun 9, 2023
2 parents 35a0949 + 7ca5707 commit 2edd0fe
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 36 deletions.
48 changes: 37 additions & 11 deletions src/v/kafka/server/group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ group::group(
kafka::group_id id,
group_state s,
config::configuration& conf,
ss::lw_shared_ptr<cluster::partition> partition,
ss::lw_shared_ptr<attached_partition> p,
model::term_id term,
ss::sharded<cluster::tx_gateway_frontend>& tx_frontend,
ss::sharded<features::feature_table>& feature_table,
group_metadata_serializer serializer,
Expand All @@ -61,11 +62,13 @@ group::group(
, _num_members_joining(0)
, _new_member_added(false)
, _conf(conf)
, _partition(std::move(partition))
, _p(p)
, _partition(p != nullptr ? p->partition : nullptr)
, _probe(_members, _static_members, _offsets)
, _ctxlog(klog, *this)
, _ctx_txlog(cluster::txlog, *this)
, _md_serializer(std::move(serializer))
, _term(term)
, _enable_group_metrics(group_metrics)
, _abort_interval_ms(config::shard_local_cfg()
.abort_timed_out_transactions_interval_ms.value())
Expand All @@ -82,7 +85,8 @@ group::group(
kafka::group_id id,
group_metadata_value& md,
config::configuration& conf,
ss::lw_shared_ptr<cluster::partition> partition,
ss::lw_shared_ptr<attached_partition> p,
model::term_id term,
ss::sharded<cluster::tx_gateway_frontend>& tx_frontend,
ss::sharded<features::feature_table>& feature_table,
group_metadata_serializer serializer,
Expand All @@ -100,11 +104,13 @@ group::group(
, _leader(md.leader)
, _new_member_added(false)
, _conf(conf)
, _partition(std::move(partition))
, _p(p)
, _partition(p->partition)
, _probe(_members, _static_members, _offsets)
, _ctxlog(klog, *this)
, _ctx_txlog(cluster::txlog, *this)
, _md_serializer(std::move(serializer))
, _term(term)
, _enable_group_metrics(group_metrics)
, _abort_interval_ms(config::shard_local_cfg()
.abort_timed_out_transactions_interval_ms.value())
Expand Down Expand Up @@ -1661,8 +1667,15 @@ void group::fail_offset_commit(
}

/// Adopts `term` as the group's term and discards every piece of in-memory
/// transactional state tracked by the group.
///
/// Locking contract: the caller must hold catchup_lock.hold_write_lock().
/// All other tx methods are expected to hold catchup_lock.hold_read_lock(),
/// so that a reset never modifies state underneath a tx method that is
/// still executing.
void group::reset_tx_state(model::term_id term) {
    _term = term;

    // The containers are cleared independently of one another.
    _fence_pid_epoch.clear();
    _tx_data.clear();
    _expiration_info.clear();
    _prepared_txs.clear();
    _volatile_txs.clear();
}

void group::insert_prepared(prepared_tx tx) {
Expand Down Expand Up @@ -1696,14 +1709,14 @@ group::commit_tx(cluster::commit_group_tx_request r) {
if (fence_it == _fence_pid_epoch.end()) {
vlog(
_ctx_txlog.warn,
"Can't prepare tx: fence with pid {} isn't set",
"Can't commit tx: fence with pid {} isn't set",
r.pid);
co_return make_commit_tx_reply(cluster::tx_errc::request_rejected);
}
if (r.pid.get_epoch() != fence_it->second) {
vlog(
_ctx_txlog.trace,
"Can't prepare tx with pid {} - the fence doesn't match {}",
"Can't commit tx with pid {} - the fence doesn't match {}",
r.pid,
fence_it->second);
co_return make_commit_tx_reply(cluster::tx_errc::request_rejected);
Expand Down Expand Up @@ -1902,17 +1915,22 @@ group::begin_tx(cluster::begin_group_tx_request r) {

auto reader = model::make_memory_record_batch_reader(
std::move(batch.value()));
auto e = co_await _partition->raft()->replicate(
auto res = co_await _partition->raft()->replicate(
_term,
std::move(reader),
raft::replicate_options(raft::consistency_level::quorum_ack));

if (!e) {
if (!res) {
vlog(
_ctx_txlog.warn,
"Error \"{}\" on replicating pid:{} fencing batch",
e.error(),
res.error(),
r.pid);
if (
_partition->raft()->is_leader()
&& _partition->raft()->term() == _term) {
co_await _partition->raft()->step_down("group begin_tx failed");
}
co_return make_begin_tx_reply(cluster::tx_errc::leader_not_found);
}

Expand All @@ -1923,9 +1941,9 @@ group::begin_tx(cluster::begin_group_tx_request r) {
_volatile_txs[r.pid] = volatile_tx{.tx_seq = r.tx_seq};
}

auto res = _expiration_info.insert_or_assign(
auto [it, _] = _expiration_info.insert_or_assign(
r.pid, expiration_info(r.timeout));
try_arm(res.first->second.deadline());
try_arm(it->second.deadline());

cluster::begin_group_tx_reply reply;
reply.etag = _term;
Expand Down Expand Up @@ -2258,6 +2276,12 @@ group::store_txn_offsets(txn_offset_commit_request r) {
raft::replicate_options(raft::consistency_level::quorum_ack));

if (!e) {
if (
_partition->raft()->is_leader()
&& _partition->raft()->term() == _term) {
co_await _partition->raft()->step_down(
"group store_txn_offsets failed");
}
co_return txn_offset_commit_response(
r, error_code::unknown_server_error);
}
Expand Down Expand Up @@ -3210,6 +3234,8 @@ void group::maybe_rearm_timer() {
}

ss::future<> group::do_abort_old_txes() {
auto units = co_await _p->catchup_lock.hold_read_lock();

std::vector<model::producer_identity> pids;
for (auto& [id, _] : _prepared_txs) {
pids.push_back(id);
Expand Down
20 changes: 18 additions & 2 deletions src/v/kafka/server/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,19 @@ struct configuration;
namespace kafka {
struct group_log_group_metadata;

/// State kept for a partition that has been attached; a group holds a shared
/// pointer to this record alongside the partition itself.
struct attached_partition {
    // Always true right after construction.
    // NOTE(review): presumably flipped once loading of the partition's
    // groups completes -- confirm against the owner of this struct.
    bool loading{true};
    // Semaphore created with a single unit, labelled "k/group-mgr".
    ssx::semaphore sem{1, "k/group-mgr"};
    ss::abort_source as;
    // The partition this record wraps.
    ss::lw_shared_ptr<cluster::partition> partition;
    // Reader/writer lock: resetting a group's tx state is expected to happen
    // under the write lock, while tx methods run under the read lock.
    ss::basic_rwlock<> catchup_lock;
    // Defaults to -1 until something assigns a term.
    model::term_id term{-1};

    explicit attached_partition(ss::lw_shared_ptr<cluster::partition> p)
      : partition(std::move(p)) {}
};

/**
* \defgroup kafka-groups Kafka group membership API
*
Expand Down Expand Up @@ -202,7 +215,8 @@ class group final : public ss::enable_lw_shared_from_this<group> {
kafka::group_id id,
group_state s,
config::configuration& conf,
ss::lw_shared_ptr<cluster::partition> partition,
ss::lw_shared_ptr<attached_partition>,
model::term_id,
ss::sharded<cluster::tx_gateway_frontend>& tx_frontend,
ss::sharded<features::feature_table>&,
group_metadata_serializer,
Expand All @@ -213,7 +227,8 @@ class group final : public ss::enable_lw_shared_from_this<group> {
kafka::group_id id,
group_metadata_value& md,
config::configuration& conf,
ss::lw_shared_ptr<cluster::partition> partition,
ss::lw_shared_ptr<attached_partition>,
model::term_id,
ss::sharded<cluster::tx_gateway_frontend>& tx_frontend,
ss::sharded<features::feature_table>&,
group_metadata_serializer,
Expand Down Expand Up @@ -914,6 +929,7 @@ class group final : public ss::enable_lw_shared_from_this<group> {
ss::timer<clock_type> _join_timer;
bool _new_member_added;
config::configuration& _conf;
ss::lw_shared_ptr<attached_partition> _p;
ss::lw_shared_ptr<cluster::partition> _partition;
absl::node_hash_map<
model::topic_partition,
Expand Down
20 changes: 10 additions & 10 deletions src/v/kafka/server/group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -723,12 +723,12 @@ ss::future<> group_manager::do_recover_group(
group_id,
group_stm.get_metadata(),
_conf,
p->partition,
p,
term,
_tx_frontend,
_feature_table,
_serializer_factory(),
_enable_group_metrics);
group->reset_tx_state(term);
_groups.emplace(group_id, group);
group->reschedule_all_member_heartbeats();
}
Expand Down Expand Up @@ -918,17 +918,17 @@ group::join_group_stages group_manager::join_group(join_group_request&& r) {
return group::join_group_stages(
make_join_error(r.data.member_id, error_code::not_coordinator));
}
auto p = it->second->partition;
auto p = it->second;
group = ss::make_lw_shared<kafka::group>(
r.data.group_id,
group_state::empty,
_conf,
p,
it->second->term,
_tx_frontend,
_feature_table,
_serializer_factory(),
_enable_group_metrics);
group->reset_tx_state(it->second->term);
_groups.emplace(r.data.group_id, group);
_groups.rehash(0);
is_new_group = true;
Expand Down Expand Up @@ -1071,12 +1071,12 @@ group_manager::txn_offset_commit(txn_offset_commit_request&& r) {
r.data.group_id,
group_state::empty,
_conf,
p->partition,
p,
p->term,
_tx_frontend,
_feature_table,
_serializer_factory(),
_enable_group_metrics);
group->reset_tx_state(p->term);
_groups.emplace(r.data.group_id, group);
_groups.rehash(0);
}
Expand Down Expand Up @@ -1157,12 +1157,12 @@ group_manager::begin_tx(cluster::begin_group_tx_request&& r) {
r.group_id,
group_state::empty,
_conf,
p->partition,
p,
p->term,
_tx_frontend,
_feature_table,
_serializer_factory(),
_enable_group_metrics);
group->reset_tx_state(p->term);
_groups.emplace(r.group_id, group);
_groups.rehash(0);
}
Expand Down Expand Up @@ -1265,12 +1265,12 @@ group_manager::offset_commit(offset_commit_request&& r) {
r.data.group_id,
group_state::empty,
_conf,
p->partition,
p,
p->term,
_tx_frontend,
_feature_table,
_serializer_factory(),
_enable_group_metrics);
group->reset_tx_state(p->term);
_groups.emplace(r.data.group_id, group);
_groups.rehash(0);
} else {
Expand Down
13 changes: 0 additions & 13 deletions src/v/kafka/server/group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,19 +203,6 @@ class group_manager {
void detach_partition(const model::ntp&);
ss::future<> do_detach_partition(model::ntp);

// Per-partition record: the partition plus the synchronization primitives
// and term tracked for it.
struct attached_partition {
    // Always true right after construction.
    bool loading{true};
    // Semaphore created with a single unit, labelled "k/group-mgr".
    ssx::semaphore sem{1, "k/group-mgr"};
    ss::abort_source as;
    // The partition this record wraps.
    ss::lw_shared_ptr<cluster::partition> partition;
    ss::basic_rwlock<> catchup_lock;
    // Defaults to -1 until something assigns a term.
    model::term_id term{-1};

    explicit attached_partition(ss::lw_shared_ptr<cluster::partition> p)
      : partition(std::move(p)) {}
};

cluster::notification_id_type _leader_notify_handle;
cluster::notification_id_type _topic_table_notify_handle;

Expand Down
1 change: 1 addition & 0 deletions src/v/kafka/server/tests/group_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ static group get() {
group_state::empty,
conf,
nullptr,
model::term_id(),
fr,
feature_table,
make_consumer_offsets_serializer(),
Expand Down

0 comments on commit 2edd0fe

Please sign in to comment.