diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index 68ced23a5f75c..a07f9a12fbb36 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -1202,11 +1202,11 @@ private boolean archiveRecords(long startOffset, long endOffset, NavigableMap 20, 21 -> 30, 31 -> 40, 41 -> 50. + fetchAcquiredRecords(sharePartition, memoryRecords(10, 11), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 21), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 31), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 41), 10); + + // After the acknowledgements, the state of share partition will be: + // 1. 11 -> 20: AVAILABLE + // 2. 21 -> 30: ACQUIRED + // 3. 31 -> 40: AVAILABLE + // 4. 41 -> 50: ACQUIRED + sharePartition.acknowledge(MEMBER_ID, List.of( + new ShareAcknowledgementBatch(11, 20, List.of((byte) 2)), + new ShareAcknowledgementBatch(31, 40, List.of((byte) 2)) + )); + + // Move the LSO to 41. When the LSO moves ahead, all batches that are AVAILABLE before the new LSO will be ARCHIVED. + // Thus, the state of the share partition will be: + // 1. 11 -> 20: ARCHIVED + // 2. 21 -> 30: ACQUIRED + // 3. 31 -> 40: ARCHIVED + // 4. 41 -> 50: ACQUIRED + // Note, the records that are in ACQUIRED state will remain in ACQUIRED state and will be transitioned to a Terminal + // state when the corresponding acquisition lock timer task expires. + sharePartition.updateCacheAndOffsets(41); + + assertEquals(51, sharePartition.nextFetchOffset()); + assertEquals(41, sharePartition.startOffset()); + assertEquals(50, sharePartition.endOffset()); + + assertEquals(4, sharePartition.cachedState().size()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(11L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(21L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(41L).batchState()); + + // The client acknowledges the batch 21 -> 30. Since this batch is before the LSO, nothing will be done and these + // records will remain in the ACQUIRED state. + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21L, 30L, List.of((byte) 2)))); + + // The batch is still in ACQUIRED state. + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(21L).batchState()); + + // Once the acquisition lock timer task for the batch 21 -> 30 is expired, these records will directly be + // ARCHIVED. + sharePartition.cachedState().get(21L).batchAcquisitionLockTimeoutTask().run(); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(21L).batchState()); + } + + @Test + public void testLsoMovementForArchivingAllAvailableOffsets() { + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); + + // A client acquires 4 batches, 11 -> 20, 21 -> 30, 31 -> 40, 41 -> 50. + fetchAcquiredRecords(sharePartition, memoryRecords(10, 11), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 21), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 31), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 41), 10); + + // After the acknowledgements, the share partition state will be: + // 1. 11 -> 20: AVAILABLE + // 2. 21 -> 30: ACQUIRED + // 3. 31 -> 40: AVAILABLE + // 4. 41 -> 50: ACQUIRED + sharePartition.acknowledge(MEMBER_ID, List.of( + new ShareAcknowledgementBatch(11, 20, List.of((byte) 2)), + new ShareAcknowledgementBatch(31, 40, List.of((byte) 2)) + )); + + // Move the LSO to 36. When the LSO moves ahead, all records that are AVAILABLE before the new LSO will be ARCHIVED. + // Thus, the state of the share partition will be: + // 1. 11 -> 20: ARCHIVED + // 2. 21 -> 30: ACQUIRED + // 3. 31 -> 35: ARCHIVED + // 3. 36 -> 40: AVAILABLE + // 4. 41 -> 50: ACQUIRED + // Note, the records that are in ACQUIRED state will remain in ACQUIRED state and will be transitioned to a Terminal + // state when the corresponding acquisition lock timer task expires. + sharePartition.updateCacheAndOffsets(36); + + assertEquals(36, sharePartition.nextFetchOffset()); + assertEquals(36, sharePartition.startOffset()); + assertEquals(50, sharePartition.endOffset()); + + assertEquals(4, sharePartition.cachedState().size()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(11L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(21L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).offsetState().get(31L).state()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).offsetState().get(32L).state()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).offsetState().get(33L).state()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).offsetState().get(34L).state()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(31L).offsetState().get(35L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(31L).offsetState().get(36L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(31L).offsetState().get(37L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(31L).offsetState().get(38L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(31L).offsetState().get(39L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(31L).offsetState().get(40L).state()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(41L).batchState()); + + // The client acknowledges the batch 21 -> 30. Since this batch is before the LSO, nothing will be done and these + // records will remain in the ACQUIRED state. + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21L, 30L, List.of((byte) 2)))); + + // The batch is still in ACQUIRED state. + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(21L).batchState()); + + // Once the acquisition lock timer task for the batch 21 -> 30 is expired, these records will directly be + // ARCHIVED. + sharePartition.cachedState().get(21L).batchAcquisitionLockTimeoutTask().run(); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(21L).batchState()); + } + @Test public void testLsoMovementForArchivingOffsets() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();