Skip to content

Commit

Permalink
rptest: test trim-prefix with timequery
Browse files Browse the repository at this point in the history
  • Loading branch information
nvartolomei committed Apr 30, 2024
1 parent 00857d1 commit fe96ed2
Showing 1 changed file with 48 additions and 0 deletions.
48 changes: 48 additions & 0 deletions tests/rptest/tests/timequery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,54 @@ def query_slices(tid):

assert not any([e > 0 for e in errors])

@cluster(num_nodes=4)
@parametrize(cloud_storage=True, spillover=False)
@parametrize(cloud_storage=True, spillover=True)
@parametrize(cloud_storage=False, spillover=False)
def test_timequery_with_trim_prefix(self, cloud_storage: bool,
spillover: bool):
self.set_up_cluster(cloud_storage=cloud_storage,
batch_cache=False,
spillover=spillover)
total_segments = 12
record_size = 1024
base_ts = 1664453149000
msg_count = (self.log_segment_size * total_segments) // record_size
local_retention = self.log_segment_size * 4
topic, timestamps = self._create_and_produce(self.redpanda, True,
local_retention, base_ts,
record_size, msg_count)

# Confirm messages written
rpk = RpkTool(self.redpanda)
p = next(rpk.describe_topic(topic.name))
assert p.high_watermark == msg_count

if cloud_storage:
# If using cloud storage, we must wait for some segments
# to fall out of local storage, to ensure we are really
# hitting the cloud storage read path when querying.
wait_for_local_storage_truncate(redpanda=self.redpanda,
topic=topic.name,
target_bytes=local_retention)

num_batches_per_segment = self.log_segment_size // record_size
new_lwm = int(num_batches_per_segment * 2.5)
trim_response = rpk.trim_prefix(topic.name,
offset=new_lwm,
partitions=[0])
assert len(trim_response) == 1
assert new_lwm == trim_response[0].new_start_offset

# Double check that the start offset has advanced.
p = next(rpk.describe_topic(topic.name))
assert new_lwm == p.start_offset, f"Expected {new_lwm}, got {p.start_offset}"

# Query below valid timestamps the offset of the first message.
kcat = KafkaCat(self.redpanda)
offset = kcat.query_offset(topic.name, 0, timestamps[0] - 1000)
assert offset == new_lwm, f"Expected {new_lwm}, got {offset}"


class TimeQueryKafkaTest(Test, BaseTimeQuery):
"""
Expand Down

0 comments on commit fe96ed2

Please sign in to comment.