diff --git a/src/v/kafka/server/handlers/list_offsets.cc b/src/v/kafka/server/handlers/list_offsets.cc
index 54c5a52cd1db..b301e60a928a 100644
--- a/src/v/kafka/server/handlers/list_offsets.cc
+++ b/src/v/kafka/server/handlers/list_offsets.cc
@@ -20,6 +20,7 @@
 #include "kafka/server/replicated_partition.h"
 #include "kafka/server/request_context.h"
 #include "kafka/server/response.h"
+#include "model/fundamental.h"
 #include "model/namespace.h"
 #include "resource_mgmt/io_priority.h"
 
@@ -129,10 +130,22 @@ static ss::future<list_offset_partition_response> list_offsets_partition(
           offset,
           kafka_partition->leader_epoch());
     }
+    auto min_offset = kafka_partition->start_offset();
+    auto max_offset = model::prev_offset(offset);
+
+    // Empty partition.
+    if (max_offset < min_offset) {
+        co_return list_offsets_response::make_partition(
+          ktp.get_partition(),
+          model::timestamp(-1),
+          model::offset(-1),
+          kafka_partition->leader_epoch());
+    }
+
     auto res = co_await kafka_partition->timequery(storage::timequery_config{
-      kafka_partition->start_offset(),
+      min_offset,
       timestamp,
-      model::prev_offset(offset),
+      max_offset,
       kafka_read_priority(),
       {model::record_batch_type::raft_data},
       octx.rctx.abort_source().local()});
diff --git a/tests/rptest/tests/timequery_test.py b/tests/rptest/tests/timequery_test.py
index c7e8c9297536..3f8cfe71f45c 100644
--- a/tests/rptest/tests/timequery_test.py
+++ b/tests/rptest/tests/timequery_test.py
@@ -103,6 +103,18 @@ def _test_timequery(self, cluster, cloud_storage: bool, batch_cache: bool):
         base_ts = 1664453149000
         msg_count = (self.log_segment_size * total_segments) // record_size
         local_retention = self.log_segment_size * 4
+        kcat = KafkaCat(cluster)
+
+        # Test the base case with an empty topic.
+        empty_topic = TopicSpec(name="tq_empty_topic",
+                                partition_count=1,
+                                replication_factor=3)
+        self.client().create_topic(empty_topic)
+        offset = kcat.query_offset(empty_topic.name, 0, base_ts)
+        self.logger.info(f"Time query returned offset {offset}")
+        assert offset == -1, f"Expected -1, got {offset}"
+
+        # Create a topic and produce a run of messages we will query.
         topic, timestamps = self._create_and_produce(cluster, cloud_storage,
                                                      local_retention, base_ts,
                                                      record_size, msg_count)
@@ -163,7 +175,6 @@ def __init__(self, offset, ts=None, expect_read=True):
         # offset should cause cloud downloads.
         hit_offsets = set()
 
-        kcat = KafkaCat(cluster)
         cloud_metrics = None
         local_metrics = None
 
@@ -456,8 +467,8 @@ def query_slices(tid):
         assert not any([e > 0 for e in errors])
 
     @cluster(num_nodes=4)
-    # @parametrize(cloud_storage=True, spillover=False)
-    # @parametrize(cloud_storage=True, spillover=True)
+    @parametrize(cloud_storage=True, spillover=False)
+    @parametrize(cloud_storage=True, spillover=True)
     @parametrize(cloud_storage=False, spillover=False)
     def test_timequery_with_trim_prefix(self, cloud_storage: bool,
                                         spillover: bool):
@@ -515,6 +526,12 @@ def test_timequery_with_trim_prefix(self, cloud_storage: bool,
         offset = kcat.query_offset(topic.name, 0, timestamps[0] - 1000)
         assert offset == msg_count - 1, f"Expected {msg_count - 1}, got {offset}"
 
+        # Trim everything, leaving an empty log.
+        rpk.trim_prefix(topic.name, offset=p.high_watermark, partitions=[0])
+        kcat = KafkaCat(self.redpanda)
+        offset = kcat.query_offset(topic.name, 0, timestamps[0] - 1000)
+        assert offset == -1, f"Expected -1, got {offset}"
+
     @cluster(
         num_nodes=4,
         log_allow_list=["Failed to upload spillover manifest {timed_out}"])
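
For reference, the client-visible contract the new test pins down (a ListOffsets timequery on an empty partition resolves to "no offset" rather than erroring) can also be reproduced with a plain Kafka client. Below is a minimal sketch using kafka-python, which is an assumption; the in-tree tests go through the KafkaCat wrapper around kcat -Q. The broker address, topic name, and timestamp are purely illustrative.

# Sketch: a timequery against an empty partition should resolve to
# "no offset". Assumes a broker at localhost:9092 and an existing empty
# topic "tq_empty_topic" (both hypothetical).
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
tp = TopicPartition("tq_empty_topic", 0)

# kafka-python maps the broker's "no such message" (timestamp -1,
# offset -1) response to None; kcat -Q, as used by the ducktape test,
# reports the raw offset -1 for the same condition.
result = consumer.offsets_for_times({tp: 1664453149000})
assert result[tp] is None, f"Expected no offset, got {result[tp]}"
consumer.close()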