Skip to content

Commit

Permalink
kafka/group: don't replicate empty batch on offset commit
Browse files Browse the repository at this point in the history
Redpanda generally does not permit empty batches to make their way to
the log. This is problematic in case of an empty offset commit requests.

Kafka returns immediately for empty requests[1], so this tweaks the
return store_offsets() method to do the same. I considered putting this
higher up in the Kafka layer, but there didn't seem to be a specialized
handler for offsets commits where such an early return would be natural.

[1] https://github.com/apache/kafka/blob/98cdf9717049e87ba34bb5161276577fcb8bd1c4/core/src/main/scala/kafka/server/KafkaApis.scala#L504-L505
  • Loading branch information
andrwng committed Aug 9, 2024
1 parent 2c24041 commit c0b2fc7
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/v/kafka/server/group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2290,6 +2290,11 @@ group::offset_commit_stages group::store_offsets(offset_commit_request&& r) {
_pending_offset_commits[tp] = md;
}
}
if (builder.empty()) {
vlog(_ctxlog.debug, "Empty offsets committed request");
return offset_commit_stages(
offset_commit_response(r, error_code::none));
}

auto batch = std::move(builder).build();
auto reader = model::make_memory_record_batch_reader(std::move(batch));
Expand Down
35 changes: 35 additions & 0 deletions src/v/kafka/server/tests/consumer_groups_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,41 @@ FIXTURE_TEST(join_empty_group_static_member, consumer_offsets_fixture) {
}).get();
}

FIXTURE_TEST(empty_offset_commit_request, consumer_offsets_fixture) {
scoped_config cfg;
cfg.get("group_topic_partitions").set_value(1);
add_topic(
model::topic_namespace_view{model::kafka_namespace, model::topic{"foo"}})
.get();
kafka::group_instance_id gr("instance-1");
wait_for_consumer_offsets_topic(gr);
auto client = make_kafka_client().get0();
auto deferred = ss::defer([&client] {
client.stop().then([&client] { client.shutdown(); }).get();
});
client.connect().get();
// Regression test for a crash that we would previously see with empty
// requests.
// NOTE: errors are stored in partition fields of the response. Since the
// requests are empty, there are no errors.
{
auto req = offset_commit_request{
.data{.group_id = kafka::group_id{"foo-topics"}, .topics = {}}};
req.data.group_instance_id = gr;
auto resp = client.dispatch(std::move(req)).get();
BOOST_REQUIRE(!resp.data.errored());
}
{
auto req = offset_commit_request{.data{
.group_id = kafka::group_id{"foo-partitions"},
.topics = {offset_commit_request_topic{
.name = model::topic{"foo"}, .partitions = {}}}}};
req.data.group_instance_id = gr;
auto resp = client.dispatch(std::move(req)).get();
BOOST_REQUIRE(!resp.data.errored());
}
}

FIXTURE_TEST(conditional_retention_test, consumer_offsets_fixture) {
scoped_config cfg;
cfg.get("group_topic_partitions").set_value(1);
Expand Down

0 comments on commit c0b2fc7

Please sign in to comment.