Skip to content

Commit

Permalink
Merge pull request #22895 from vbotbuildovich/backport-pr-22824-v24.2…
Browse files Browse the repository at this point in the history
….x-513

[v24.2.x] kafka/group: don't replicate empty batch on offset commit
  • Loading branch information
piyushredpanda authored Aug 15, 2024
2 parents 93a60b9 + a00a61d commit 3907600
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/v/kafka/server/group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2240,6 +2240,11 @@ group::offset_commit_stages group::store_offsets(offset_commit_request&& r) {
_pending_offset_commits[tp] = md;
}
}
if (builder.empty()) {
vlog(_ctxlog.debug, "Empty offsets committed request");
return offset_commit_stages(
offset_commit_response(r, error_code::none));
}

auto batch = std::move(builder).build();
auto reader = model::make_memory_record_batch_reader(std::move(batch));
Expand Down
35 changes: 35 additions & 0 deletions src/v/kafka/server/tests/consumer_groups_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,41 @@ FIXTURE_TEST(join_empty_group_static_member, consumer_offsets_fixture) {
}).get();
}

FIXTURE_TEST(empty_offset_commit_request, consumer_offsets_fixture) {
scoped_config cfg;
cfg.get("group_topic_partitions").set_value(1);
add_topic(
model::topic_namespace_view{model::kafka_namespace, model::topic{"foo"}})
.get();
kafka::group_instance_id gr("instance-1");
wait_for_consumer_offsets_topic(gr);
auto client = make_kafka_client().get0();
auto deferred = ss::defer([&client] {
client.stop().then([&client] { client.shutdown(); }).get();
});
client.connect().get();
// Regression test for a crash that we would previously see with empty
// requests.
// NOTE: errors are stored in partition fields of the response. Since the
// requests are empty, there are no errors.
{
auto req = offset_commit_request{
.data{.group_id = kafka::group_id{"foo-topics"}, .topics = {}}};
req.data.group_instance_id = gr;
auto resp = client.dispatch(std::move(req)).get();
BOOST_REQUIRE(!resp.data.errored());
}
{
auto req = offset_commit_request{.data{
.group_id = kafka::group_id{"foo-partitions"},
.topics = {offset_commit_request_topic{
.name = model::topic{"foo"}, .partitions = {}}}}};
req.data.group_instance_id = gr;
auto resp = client.dispatch(std::move(req)).get();
BOOST_REQUIRE(!resp.data.errored());
}
}

FIXTURE_TEST(conditional_retention_test, consumer_offsets_fixture) {
scoped_config cfg;
cfg.get("group_topic_partitions").set_value(1);
Expand Down

0 comments on commit 3907600

Please sign in to comment.