kafka: fix timequery failing for empty topics
This bug was introduced accidentally while trying to fix another
off-by-one bug in commit 8f2de96. I forgot to account for the fact that
for an empty topic the "start offset" is equal to the "last offset", so
calling `model::prev_offset` results in an offset below the start, which
is invalid and throws an exception if passed downstream.

To avoid the problem, we now short-circuit with an "offset not found"
response right away in that case.
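
To make the failure mode concrete, here is a minimal, self-contained sketch of
the boundary check, not the actual handler code: offset_t, prev_offset and
timequery_bounds are hypothetical stand-ins assumed to behave like
model::offset and model::prev_offset from model/fundamental.h.

    #include <cstdint>
    #include <optional>
    #include <utility>

    // Hypothetical stand-in types, for illustration only.
    using offset_t = int64_t;
    constexpr offset_t prev_offset(offset_t o) { return o - 1; }

    // Compute the inclusive [min, max] range a timequery may scan, or
    // nullopt when the partition is empty. For an empty partition the
    // start offset equals the query boundary, so prev_offset(boundary)
    // falls below the start and the range is invalid.
    std::optional<std::pair<offset_t, offset_t>>
    timequery_bounds(offset_t start_offset, offset_t query_boundary) {
        auto min_offset = start_offset;
        auto max_offset = prev_offset(query_boundary);
        if (max_offset < min_offset) {
            return std::nullopt; // empty partition: report "offset not found"
        }
        return std::make_pair(min_offset, max_offset);
    }

With start_offset == query_boundary (an empty partition), max_offset ends up
one below min_offset, the guard trips, and the handler answers with
timestamp -1 / offset -1 instead of passing an out-of-range offset downstream.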

nvartolomei committed Jun 20, 2024
1 parent f94b94a commit 3e11d09
Showing 2 changed files with 27 additions and 3 deletions.
17 changes: 15 additions & 2 deletions src/v/kafka/server/handlers/list_offsets.cc
@@ -20,6 +20,7 @@
 #include "kafka/server/replicated_partition.h"
 #include "kafka/server/request_context.h"
 #include "kafka/server/response.h"
+#include "model/fundamental.h"
 #include "model/namespace.h"
 #include "resource_mgmt/io_priority.h"

@@ -129,10 +130,22 @@ static ss::future<list_offset_partition_response> list_offsets_partition(
           offset,
           kafka_partition->leader_epoch());
     }
+    auto min_offset = kafka_partition->start_offset();
+    auto max_offset = model::prev_offset(offset);
+
+    // Empty topic.
+    if (max_offset < min_offset) {
+        co_return list_offsets_response::make_partition(
+          ktp.get_partition(),
+          model::timestamp(-1),
+          model::offset(-1),
+          kafka_partition->leader_epoch());
+    }
+
     auto res = co_await kafka_partition->timequery(storage::timequery_config{
-      kafka_partition->start_offset(),
+      min_offset,
       timestamp,
-      model::prev_offset(offset),
+      max_offset,
       kafka_read_priority(),
       {model::record_batch_type::raft_data},
       octx.rctx.abort_source().local()});
13 changes: 12 additions & 1 deletion tests/rptest/tests/timequery_test.py
@@ -103,6 +103,18 @@ def _test_timequery(self, cluster, cloud_storage: bool, batch_cache: bool):
         base_ts = 1664453149000
         msg_count = (self.log_segment_size * total_segments) // record_size
         local_retention = self.log_segment_size * 4
+        kcat = KafkaCat(cluster)
+
+        # Test the base case with an empty topic.
+        empty_topic = TopicSpec(name="tq_empty_topic",
+                                partition_count=1,
+                                replication_factor=3)
+        self.client().create_topic(empty_topic)
+        offset = kcat.query_offset(empty_topic.name, 0, base_ts)
+        self.logger.info(f"Time query returned offset {offset}")
+        assert offset == -1, f"Expected -1, got {offset}"
+
+        # Create a topic and produce a run of messages we will query.
         topic, timestamps = self._create_and_produce(cluster, cloud_storage,
                                                      local_retention, base_ts,
                                                      record_size, msg_count)
@@ -163,7 +175,6 @@ def __init__(self, offset, ts=None, expect_read=True):
         # offset should cause cloud downloads.
         hit_offsets = set()
 
-        kcat = KafkaCat(cluster)
         cloud_metrics = None
         local_metrics = None
