From 69fbd560d0ab0b08469e57f4eed33c5deb6d0c2c Mon Sep 17 00:00:00 2001 From: likangning <422766572@qq.com> Date: Sat, 14 Sep 2024 09:24:48 +0800 Subject: [PATCH] code optimization --- .../storage/internals/log/LogSegmentTest.java | 715 +++++++++--------- 1 file changed, 368 insertions(+), 347 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java index d542d72f574a5..9d2c197dafb91 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java @@ -48,6 +48,7 @@ import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -124,16 +125,17 @@ public void teardown() throws IOException { "100, 10", "2147483648, 0", "-2147483648, 0", - "2147483648,4294967296" + "2147483648, 4294967296" }) public void testAppendForLogSegmentOffsetOverflowException(long baseOffset, long largestOffset) throws IOException { - LogSegment seg = createSegment(baseOffset, 10, Time.SYSTEM); - long currentTime = Time.SYSTEM.milliseconds(); - long shallowOffsetOfMaxTimestamp = largestOffset; - MemoryRecords memoryRecords = records(0, "hello"); - assertThrows(LogSegmentOffsetOverflowException.class, () -> { - seg.append(largestOffset, currentTime, shallowOffsetOfMaxTimestamp, memoryRecords); - }); + try (LogSegment seg = createSegment(baseOffset, 10, Time.SYSTEM)) { + long currentTime = Time.SYSTEM.milliseconds(); + long shallowOffsetOfMaxTimestamp = largestOffset; + MemoryRecords memoryRecords = records(0, "hello"); + assertThrows(LogSegmentOffsetOverflowException.class, () -> { + seg.append(largestOffset, currentTime, shallowOffsetOfMaxTimestamp, memoryRecords); + }); + } } /** @@ -141,9 +143,10 @@ public void testAppendForLogSegmentOffsetOverflowException(long baseOffset, long */ @Test public void testReadOnEmptySegment() throws IOException { - LogSegment seg = createSegment(40); - FetchDataInfo read = seg.read(40, 300); - assertNull(read, "Read beyond the last offset in the segment should be null"); + try (LogSegment seg = createSegment(40)) { + FetchDataInfo read = seg.read(40, 300); + assertNull(read, "Read beyond the last offset in the segment should be null"); + } } /** @@ -152,11 +155,12 @@ public void testReadOnEmptySegment() throws IOException { */ @Test public void testReadBeforeFirstOffset() throws IOException { - LogSegment seg = createSegment(40); - MemoryRecords ms = records(50, "hello", "there", "little", "bee"); - seg.append(53, RecordBatch.NO_TIMESTAMP, -1L, ms); - Records read = seg.read(41, 300).records; - checkEquals(ms.records().iterator(), read.records().iterator()); + try (LogSegment seg = createSegment(40)) { + MemoryRecords ms = records(50, "hello", "there", "little", "bee"); + seg.append(53, RecordBatch.NO_TIMESTAMP, -1L, ms); + Records read = seg.read(41, 300).records; + checkEquals(ms.records().iterator(), read.records().iterator()); + } } /** @@ -164,11 +168,12 @@ public void testReadBeforeFirstOffset() throws IOException { */ @Test public void testReadAfterLast() throws IOException { - LogSegment seg = createSegment(40); - MemoryRecords ms = records(50, "hello", "there"); - seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms); - FetchDataInfo read = seg.read(52, 200); - assertNull(read, "Read beyond the last offset in 
the segment should give null");
+        try (LogSegment seg = createSegment(40)) {
+            MemoryRecords ms = records(50, "hello", "there");
+            seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
+            FetchDataInfo read = seg.read(52, 200);
+            assertNull(read, "Read beyond the last offset in the segment should give null");
+        }
     }
 
     /**
@@ -177,13 +182,14 @@ public void testReadAfterLast() throws IOException {
      */
     @Test
     public void testReadFromGap() throws IOException {
-        LogSegment seg = createSegment(40);
-        MemoryRecords ms = records(50, "hello", "there");
-        seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
-        MemoryRecords ms2 = records(60, "alpha", "beta");
-        seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
-        FetchDataInfo read = seg.read(55, 200);
-        checkEquals(ms2.records().iterator(), read.records.records().iterator());
+        try (LogSegment seg = createSegment(40)) {
+            MemoryRecords ms = records(50, "hello", "there");
+            seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
+            MemoryRecords ms2 = records(60, "alpha", "beta");
+            seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
+            FetchDataInfo read = seg.read(55, 200);
+            checkEquals(ms2.records().iterator(), read.records.records().iterator());
+        }
     }
 
     @ParameterizedTest(name = "testReadWhenNoMaxPosition minOneMessage = {0}")
@@ -191,32 +197,33 @@ public void testReadFromGap() throws IOException {
     public void testReadWhenNoMaxPosition(boolean minOneMessage) throws IOException {
         Optional<Long> maxPosition = Optional.empty();
         int maxSize = 1;
-        LogSegment seg = createSegment(40);
-        MemoryRecords ms = records(50, "hello", "there");
-        seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
-
-        // read before first offset
-        FetchDataInfo read = seg.read(48, maxSize, maxPosition, minOneMessage);
-        assertEquals(new LogOffsetMetadata(48, 40, 0), read.fetchOffsetMetadata);
-        assertFalse(read.records.records().iterator().hasNext());
-
-        // read at first offset
-        read = seg.read(50, maxSize, maxPosition, minOneMessage);
-        assertEquals(new LogOffsetMetadata(50, 40, 0), read.fetchOffsetMetadata);
-        assertFalse(read.records.records().iterator().hasNext());
-
-        // read at last offset
-        read = seg.read(51, maxSize, maxPosition, minOneMessage);
-        assertEquals(new LogOffsetMetadata(51, 40, 39), read.fetchOffsetMetadata);
-        assertFalse(read.records.records().iterator().hasNext());
-
-        // read at log-end-offset
-        read = seg.read(52, maxSize, maxPosition, minOneMessage);
-        assertNull(read);
-
-        // read beyond log-end-offset
-        read = seg.read(53, maxSize, maxPosition, minOneMessage);
-        assertNull(read);
+        try (LogSegment seg = createSegment(40)) {
+            MemoryRecords ms = records(50, "hello", "there");
+            seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
+
+            // read before first offset
+            FetchDataInfo read = seg.read(48, maxSize, maxPosition, minOneMessage);
+            assertEquals(new LogOffsetMetadata(48, 40, 0), read.fetchOffsetMetadata);
+            assertFalse(read.records.records().iterator().hasNext());
+
+            // read at first offset
+            read = seg.read(50, maxSize, maxPosition, minOneMessage);
+            assertEquals(new LogOffsetMetadata(50, 40, 0), read.fetchOffsetMetadata);
+            assertFalse(read.records.records().iterator().hasNext());
+
+            // read at last offset
+            read = seg.read(51, maxSize, maxPosition, minOneMessage);
+            assertEquals(new LogOffsetMetadata(51, 40, 39), read.fetchOffsetMetadata);
+            assertFalse(read.records.records().iterator().hasNext());
+
+            // read at log-end-offset
+            read = seg.read(52, maxSize, maxPosition, minOneMessage);
+            assertNull(read);
+
+            // read beyond log-end-offset
+            read = seg.read(53,
maxSize, maxPosition, minOneMessage); + assertNull(read); + } } /** @@ -225,24 +232,25 @@ public void testReadWhenNoMaxPosition(boolean minOneMessage) throws IOException */ @Test public void testTruncate() throws IOException { - LogSegment seg = createSegment(40); - long offset = 40; - for (int i = 0; i < 30; i++) { - MemoryRecords ms1 = records(offset, "hello"); - seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1); - MemoryRecords ms2 = records(offset + 1, "hello"); - seg.append(offset + 1, RecordBatch.NO_TIMESTAMP, -1L, ms2); - - // check that we can read back both messages - FetchDataInfo read = seg.read(offset, 10000); - assertEquals(Arrays.asList(ms1.records().iterator().next(), ms2.records().iterator().next()), iteratorToList(read.records.records().iterator())); - - // Now truncate off the last message - seg.truncateTo(offset + 1); - FetchDataInfo read2 = seg.read(offset, 10000); - assertEquals(1, iteratorToList(read2.records.records().iterator()).size()); - checkEquals(ms1.records().iterator(), read2.records.records().iterator()); - offset += 1; + try (LogSegment seg = createSegment(40)) { + long offset = 40; + for (int i = 0; i < 30; i++) { + MemoryRecords ms1 = records(offset, "hello"); + seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1); + MemoryRecords ms2 = records(offset + 1, "hello"); + seg.append(offset + 1, RecordBatch.NO_TIMESTAMP, -1L, ms2); + + // check that we can read back both messages + FetchDataInfo read = seg.read(offset, 10000); + assertEquals(Arrays.asList(ms1.records().iterator().next(), ms2.records().iterator().next()), iteratorToList(read.records.records().iterator())); + + // Now truncate off the last message + seg.truncateTo(offset + 1); + FetchDataInfo read2 = seg.read(offset, 10000); + assertEquals(1, iteratorToList(read2.records.records().iterator()).size()); + checkEquals(ms1.records().iterator(), read2.records.records().iterator()); + offset += 1; + } } } @@ -262,55 +270,57 @@ public void testTruncateEmptySegment() throws IOException { long maxSegmentMs = 300000; MockTime time = new MockTime(); - LogSegment seg = createSegment(0L, time); - seg.timeIndex(); // Force load indexes before closing the segment - seg.offsetIndex(); - seg.close(); - - LogSegment reopened = createSegment(0L, time); - assertEquals(0, seg.timeIndex().sizeInBytes()); - assertEquals(0, seg.offsetIndex().sizeInBytes()); - - time.sleep(500); - reopened.truncateTo(57); - assertEquals(0, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP)); - assertFalse(reopened.timeIndex().isFull()); - assertFalse(reopened.offsetIndex().isFull()); - - RollParams rollParams = new RollParams(maxSegmentMs, Integer.MAX_VALUE, RecordBatch.NO_TIMESTAMP, - 100L, 1024, time.milliseconds()); - assertFalse(reopened.shouldRoll(rollParams)); - - // the segment should not be rolled even if maxSegmentMs has been exceeded - time.sleep(maxSegmentMs + 1); - assertEquals(maxSegmentMs + 1, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP)); - rollParams = new RollParams(maxSegmentMs, Integer.MAX_VALUE, RecordBatch.NO_TIMESTAMP, 100L, 1024, time.milliseconds()); - assertFalse(reopened.shouldRoll(rollParams)); - - // but we should still roll the segment if we cannot fit the next offset - rollParams = new RollParams(maxSegmentMs, Integer.MAX_VALUE, RecordBatch.NO_TIMESTAMP, - (long) Integer.MAX_VALUE + 200L, 1024, time.milliseconds()); - assertTrue(reopened.shouldRoll(rollParams)); + try (LogSegment seg = createSegment(0L, time)) { + seg.timeIndex(); // Force load 
indexes before closing the segment + seg.offsetIndex(); + seg.close(); + + LogSegment reopened = createSegment(0L, time); + assertEquals(0, seg.timeIndex().sizeInBytes()); + assertEquals(0, seg.offsetIndex().sizeInBytes()); + + time.sleep(500); + reopened.truncateTo(57); + assertEquals(0, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP)); + assertFalse(reopened.timeIndex().isFull()); + assertFalse(reopened.offsetIndex().isFull()); + + RollParams rollParams = new RollParams(maxSegmentMs, Integer.MAX_VALUE, RecordBatch.NO_TIMESTAMP, + 100L, 1024, time.milliseconds()); + assertFalse(reopened.shouldRoll(rollParams)); + + // the segment should not be rolled even if maxSegmentMs has been exceeded + time.sleep(maxSegmentMs + 1); + assertEquals(maxSegmentMs + 1, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP)); + rollParams = new RollParams(maxSegmentMs, Integer.MAX_VALUE, RecordBatch.NO_TIMESTAMP, 100L, 1024, time.milliseconds()); + assertFalse(reopened.shouldRoll(rollParams)); + + // but we should still roll the segment if we cannot fit the next offset + rollParams = new RollParams(maxSegmentMs, Integer.MAX_VALUE, RecordBatch.NO_TIMESTAMP, + (long) Integer.MAX_VALUE + 200L, 1024, time.milliseconds()); + assertTrue(reopened.shouldRoll(rollParams)); + } } @Test public void testReloadLargestTimestampAndNextOffsetAfterTruncation() throws IOException { int numMessages = 30; - LogSegment seg = createSegment(40, 2 * records(0, "hello").sizeInBytes() - 1); - int offset = 40; - for (int i = 0; i < numMessages; i++) { - seg.append(offset, offset, offset, records(offset, "hello")); - offset++; - } - assertEquals(offset, seg.readNextOffset()); + try (LogSegment seg = createSegment(40, 2 * records(0, "hello").sizeInBytes() - 1)) { + int offset = 40; + for (int i = 0; i < numMessages; i++) { + seg.append(offset, offset, offset, records(offset, "hello")); + offset++; + } + assertEquals(offset, seg.readNextOffset()); - int expectedNumEntries = numMessages / 2 - 1; - assertEquals(expectedNumEntries, seg.timeIndex().entries(), String.format("Should have %d time indexes", expectedNumEntries)); + int expectedNumEntries = numMessages / 2 - 1; + assertEquals(expectedNumEntries, seg.timeIndex().entries(), String.format("Should have %d time indexes", expectedNumEntries)); - seg.truncateTo(41); - assertEquals(0, seg.timeIndex().entries(), "Should have 0 time indexes"); - assertEquals(400L, seg.largestTimestamp(), "Largest timestamp should be 400"); - assertEquals(41, seg.readNextOffset()); + seg.truncateTo(41); + assertEquals(0, seg.timeIndex().entries(), "Should have 0 time indexes"); + assertEquals(400L, seg.largestTimestamp(), "Largest timestamp should be 400"); + assertEquals(41, seg.readNextOffset()); + } } /** @@ -319,20 +329,22 @@ public void testReloadLargestTimestampAndNextOffsetAfterTruncation() throws IOEx @Test public void testTruncateFull() throws IOException { MockTime time = new MockTime(); - LogSegment seg = createSegment(40, time); - seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there")); + try (LogSegment seg = createSegment(40, time)) { - // If the segment is empty after truncation, the create time should be reset - time.sleep(500); - assertEquals(500, seg.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP)); + seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there")); - seg.truncateTo(0); - assertEquals(0, seg.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP)); - 
assertFalse(seg.timeIndex().isFull()); - assertFalse(seg.offsetIndex().isFull()); - assertNull(seg.read(0, 1024), "Segment should be empty."); + // If the segment is empty after truncation, the create time should be reset + time.sleep(500); + assertEquals(500, seg.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP)); - seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there")); + seg.truncateTo(0); + assertEquals(0, seg.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP)); + assertFalse(seg.timeIndex().isFull()); + assertFalse(seg.offsetIndex().isFull()); + assertNull(seg.read(0, 1024), "Segment should be empty."); + + seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there")); + } } /** @@ -341,25 +353,26 @@ public void testTruncateFull() throws IOException { @Test public void testFindOffsetByTimestamp() throws IOException { int messageSize = records(0, "msg00").sizeInBytes(); - LogSegment seg = createSegment(40, messageSize * 2 - 1); - // Produce some messages - for (int i = 40; i < 50; i++) { - seg.append(i, i * 10, i, records(i, "msg" + i)); - } + try (LogSegment seg = createSegment(40, messageSize * 2 - 1)) { + // Produce some messages + for (int i = 40; i < 50; i++) { + seg.append(i, i * 10, i, records(i, "msg" + i)); + } - assertEquals(490, seg.largestTimestamp()); - // Search for an indexed timestamp - assertEquals(42, seg.findOffsetByTimestamp(420, 0L).get().offset); - assertEquals(43, seg.findOffsetByTimestamp(421, 0L).get().offset); - // Search for an un-indexed timestamp - assertEquals(43, seg.findOffsetByTimestamp(430, 0L).get().offset); - assertEquals(44, seg.findOffsetByTimestamp(431, 0L).get().offset); - // Search beyond the last timestamp - assertEquals(Optional.empty(), seg.findOffsetByTimestamp(491, 0L)); - // Search before the first indexed timestamp - assertEquals(41, seg.findOffsetByTimestamp(401, 0L).get().offset); - // Search before the first timestamp - assertEquals(40, seg.findOffsetByTimestamp(399, 0L).get().offset); + assertEquals(490, seg.largestTimestamp()); + // Search for an indexed timestamp + assertEquals(42, seg.findOffsetByTimestamp(420, 0L).get().offset); + assertEquals(43, seg.findOffsetByTimestamp(421, 0L).get().offset); + // Search for an un-indexed timestamp + assertEquals(43, seg.findOffsetByTimestamp(430, 0L).get().offset); + assertEquals(44, seg.findOffsetByTimestamp(431, 0L).get().offset); + // Search beyond the last timestamp + assertEquals(Optional.empty(), seg.findOffsetByTimestamp(491, 0L)); + // Search before the first indexed timestamp + assertEquals(41, seg.findOffsetByTimestamp(401, 0L).get().offset); + // Search before the first timestamp + assertEquals(40, seg.findOffsetByTimestamp(399, 0L).get().offset); + } } /** @@ -367,10 +380,11 @@ public void testFindOffsetByTimestamp() throws IOException { */ @Test public void testNextOffsetCalculation() throws IOException { - LogSegment seg = createSegment(40); - assertEquals(40, seg.readNextOffset()); - seg.append(52, RecordBatch.NO_TIMESTAMP, -1L, records(50, "hello", "there", "you")); - assertEquals(53, seg.readNextOffset()); + try (LogSegment seg = createSegment(40)) { + assertEquals(40, seg.readNextOffset()); + seg.append(52, RecordBatch.NO_TIMESTAMP, -1L, records(50, "hello", "there", "you")); + assertEquals(53, seg.readNextOffset()); + } } /** @@ -378,28 +392,29 @@ public void testNextOffsetCalculation() throws IOException { */ @Test public void testChangeFileSuffixes() throws IOException { - LogSegment seg = 
createSegment(40); - File logFile = seg.log().file(); - File indexFile = seg.offsetIndexFile(); - File timeIndexFile = seg.timeIndexFile(); - // Ensure that files for offset and time indices have not been created eagerly. - assertFalse(seg.offsetIndexFile().exists()); - assertFalse(seg.timeIndexFile().exists()); - seg.changeFileSuffixes("", ".deleted"); - // Ensure that attempt to change suffixes for non-existing offset and time indices does not create new files. - assertFalse(seg.offsetIndexFile().exists()); - assertFalse(seg.timeIndexFile().exists()); - // Ensure that file names are updated accordingly. - assertEquals(logFile.getAbsolutePath() + ".deleted", seg.log().file().getAbsolutePath()); - assertEquals(indexFile.getAbsolutePath() + ".deleted", seg.offsetIndexFile().getAbsolutePath()); - assertEquals(timeIndexFile.getAbsolutePath() + ".deleted", seg.timeIndexFile().getAbsolutePath()); - assertTrue(seg.log().file().exists()); - // Ensure lazy creation of offset index file upon accessing it. - seg.offsetIndex(); - assertTrue(seg.offsetIndexFile().exists()); - // Ensure lazy creation of time index file upon accessing it. - seg.timeIndex(); - assertTrue(seg.timeIndexFile().exists()); + try (LogSegment seg = createSegment(40)) { + File logFile = seg.log().file(); + File indexFile = seg.offsetIndexFile(); + File timeIndexFile = seg.timeIndexFile(); + // Ensure that files for offset and time indices have not been created eagerly. + assertFalse(seg.offsetIndexFile().exists()); + assertFalse(seg.timeIndexFile().exists()); + seg.changeFileSuffixes("", ".deleted"); + // Ensure that attempt to change suffixes for non-existing offset and time indices does not create new files. + assertFalse(seg.offsetIndexFile().exists()); + assertFalse(seg.timeIndexFile().exists()); + // Ensure that file names are updated accordingly. + assertEquals(logFile.getAbsolutePath() + ".deleted", seg.log().file().getAbsolutePath()); + assertEquals(indexFile.getAbsolutePath() + ".deleted", seg.offsetIndexFile().getAbsolutePath()); + assertEquals(timeIndexFile.getAbsolutePath() + ".deleted", seg.timeIndexFile().getAbsolutePath()); + assertTrue(seg.log().file().exists()); + // Ensure lazy creation of offset index file upon accessing it. + seg.offsetIndex(); + assertTrue(seg.offsetIndexFile().exists()); + // Ensure lazy creation of time index file upon accessing it. 
+            seg.timeIndex();
+            assertTrue(seg.timeIndexFile().exists());
+        }
     }
 
     /**
@@ -408,79 +423,81 @@ public void testChangeFileSuffixes() throws IOException {
      */
     @Test
     public void testRecoveryFixesCorruptIndex() throws Exception {
-        LogSegment seg = createSegment(0);
-        for (int i = 0; i < 100; i++) {
-            seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, Integer.toString(i)));
-        }
-        File indexFile = seg.offsetIndexFile();
-        writeNonsenseToFile(indexFile, 5, (int) indexFile.length());
-        seg.recover(newProducerStateManager(), Optional.empty());
-        for (int i = 0; i < 100; i++) {
-            Iterable<Record> records = seg.read(i, 1, Optional.of((long) seg.size()), true).records.records();
-            assertEquals(i, records.iterator().next().offset());
+        try (LogSegment seg = createSegment(0)) {
+            for (int i = 0; i < 100; i++) {
+                seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, Integer.toString(i)));
+            }
+            File indexFile = seg.offsetIndexFile();
+            writeNonsenseToFile(indexFile, 5, (int) indexFile.length());
+            seg.recover(newProducerStateManager(), Optional.empty());
+            for (int i = 0; i < 100; i++) {
+                Iterable<Record> records = seg.read(i, 1, Optional.of((long) seg.size()), true).records.records();
+                assertEquals(i, records.iterator().next().offset());
+            }
         }
     }
 
     @Test
     public void testRecoverTransactionIndex() throws Exception {
-        LogSegment segment = createSegment(100);
-        short producerEpoch = 0;
-        int partitionLeaderEpoch = 15;
-        int sequence = 100;
-
-        long pid1 = 5L;
-        long pid2 = 10L;
-
-        // append transactional records from pid1
-        segment.append(101L, RecordBatch.NO_TIMESTAMP,
-            100L, MemoryRecords.withTransactionalRecords(100L, Compression.NONE,
-                pid1, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
-
-        // append transactional records from pid2
-        segment.append(103L, RecordBatch.NO_TIMESTAMP,
-            102L, MemoryRecords.withTransactionalRecords(102L, Compression.NONE,
-                pid2, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
-
-        // append non-transactional records
-        segment.append(105L, RecordBatch.NO_TIMESTAMP,
-            104L, MemoryRecords.withRecords(104L, Compression.NONE,
-                partitionLeaderEpoch, new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
-
-        // abort the transaction from pid2
-        segment.append(106L, RecordBatch.NO_TIMESTAMP,
-            106L, endTxnRecords(ControlRecordType.ABORT, pid2, producerEpoch, 106L));
-
-        // commit the transaction from pid1
-        segment.append(107L, RecordBatch.NO_TIMESTAMP,
-            107L, endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, 107L));
-
-        ProducerStateManager stateManager = newProducerStateManager();
-        segment.recover(stateManager, Optional.empty());
-        assertEquals(108L, stateManager.mapEndOffset());
-
-        List<AbortedTxn> abortedTxns = segment.txnIndex().allAbortedTxns();
-        assertEquals(1, abortedTxns.size());
-        AbortedTxn abortedTxn = abortedTxns.get(0);
-        assertEquals(pid2, abortedTxn.producerId());
-        assertEquals(102L, abortedTxn.firstOffset());
-        assertEquals(106L, abortedTxn.lastOffset());
-        assertEquals(100L, abortedTxn.lastStableOffset());
-
-        // recover again, assuming the transaction from pid2 began on a previous segment
-        stateManager = newProducerStateManager();
-        stateManager.loadProducerEntry(new ProducerStateEntry(pid2, producerEpoch, 0,
-            RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L),
-            Optional.of(new BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP))));
-        segment.recover(stateManager, Optional.empty());
-        assertEquals(108L,
stateManager.mapEndOffset());
-
-        abortedTxns = segment.txnIndex().allAbortedTxns();
-        assertEquals(1, abortedTxns.size());
-        abortedTxn = abortedTxns.get(0);
-        assertEquals(pid2, abortedTxn.producerId());
-        assertEquals(75L, abortedTxn.firstOffset());
-        assertEquals(106L, abortedTxn.lastOffset());
-        assertEquals(100L, abortedTxn.lastStableOffset());
+        try (LogSegment segment = createSegment(100)) {
+            short producerEpoch = 0;
+            int partitionLeaderEpoch = 15;
+            int sequence = 100;
+
+            long pid1 = 5L;
+            long pid2 = 10L;
+
+            // append transactional records from pid1
+            segment.append(101L, RecordBatch.NO_TIMESTAMP,
+                100L, MemoryRecords.withTransactionalRecords(100L, Compression.NONE,
+                    pid1, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
+
+            // append transactional records from pid2
+            segment.append(103L, RecordBatch.NO_TIMESTAMP,
+                102L, MemoryRecords.withTransactionalRecords(102L, Compression.NONE,
+                    pid2, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
+
+            // append non-transactional records
+            segment.append(105L, RecordBatch.NO_TIMESTAMP,
+                104L, MemoryRecords.withRecords(104L, Compression.NONE,
+                    partitionLeaderEpoch, new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())));
+
+            // abort the transaction from pid2
+            segment.append(106L, RecordBatch.NO_TIMESTAMP,
+                106L, endTxnRecords(ControlRecordType.ABORT, pid2, producerEpoch, 106L));
+
+            // commit the transaction from pid1
+            segment.append(107L, RecordBatch.NO_TIMESTAMP,
+                107L, endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, 107L));
+
+            ProducerStateManager stateManager = newProducerStateManager();
+            segment.recover(stateManager, Optional.empty());
+            assertEquals(108L, stateManager.mapEndOffset());
+
+            List<AbortedTxn> abortedTxns = segment.txnIndex().allAbortedTxns();
+            assertEquals(1, abortedTxns.size());
+            AbortedTxn abortedTxn = abortedTxns.get(0);
+            assertEquals(pid2, abortedTxn.producerId());
+            assertEquals(102L, abortedTxn.firstOffset());
+            assertEquals(106L, abortedTxn.lastOffset());
+            assertEquals(100L, abortedTxn.lastStableOffset());
+
+            // recover again, assuming the transaction from pid2 began on a previous segment
+            stateManager = newProducerStateManager();
+            stateManager.loadProducerEntry(new ProducerStateEntry(pid2, producerEpoch, 0,
+                RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L),
+                Optional.of(new BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP))));
+            segment.recover(stateManager, Optional.empty());
+            assertEquals(108L, stateManager.mapEndOffset());
+
+            abortedTxns = segment.txnIndex().allAbortedTxns();
+            assertEquals(1, abortedTxns.size());
+            abortedTxn = abortedTxns.get(0);
+            assertEquals(pid2, abortedTxn.producerId());
+            assertEquals(75L, abortedTxn.firstOffset());
+            assertEquals(106L, abortedTxn.lastOffset());
+            assertEquals(100L, abortedTxn.lastStableOffset());
+        }
     }
 
     /**
@@ -489,28 +506,28 @@
      */
     @Test
     public void testRecoveryRebuildsEpochCache() throws Exception {
-        LogSegment seg = createSegment(0);
+        try (LogSegment seg = createSegment(0)) {
+            LeaderEpochCheckpointFile checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1));
-        LeaderEpochCheckpointFile checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1));
+            LeaderEpochFileCache cache = new LeaderEpochFileCache(topicPartition, checkpoint, new MockScheduler(new
MockTime())); + seg.append(105L, RecordBatch.NO_TIMESTAMP, 104L, MemoryRecords.withRecords(104L, Compression.NONE, 0, + new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes()))); - LeaderEpochFileCache cache = new LeaderEpochFileCache(topicPartition, checkpoint, new MockScheduler(new MockTime())); - seg.append(105L, RecordBatch.NO_TIMESTAMP, 104L, MemoryRecords.withRecords(104L, Compression.NONE, 0, - new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes()))); + seg.append(107L, RecordBatch.NO_TIMESTAMP, 106L, MemoryRecords.withRecords(106L, Compression.NONE, 1, + new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes()))); - seg.append(107L, RecordBatch.NO_TIMESTAMP, 106L, MemoryRecords.withRecords(106L, Compression.NONE, 1, - new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes()))); + seg.append(109L, RecordBatch.NO_TIMESTAMP, 108L, MemoryRecords.withRecords(108L, Compression.NONE, 1, + new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes()))); - seg.append(109L, RecordBatch.NO_TIMESTAMP, 108L, MemoryRecords.withRecords(108L, Compression.NONE, 1, - new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes()))); + seg.append(111L, RecordBatch.NO_TIMESTAMP, 110L, MemoryRecords.withRecords(110L, Compression.NONE, 2, + new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes()))); - seg.append(111L, RecordBatch.NO_TIMESTAMP, 110L, MemoryRecords.withRecords(110L, Compression.NONE, 2, - new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes()))); - - seg.recover(newProducerStateManager(), Optional.of(cache)); - assertEquals(Arrays.asList( + seg.recover(newProducerStateManager(), Optional.of(cache)); + assertEquals(Arrays.asList( new EpochEntry(0, 104L), new EpochEntry(1, 106L), new EpochEntry(2, 110L)), cache.epochEntries()); + } } private MemoryRecords endTxnRecords( @@ -549,17 +566,18 @@ private MemoryRecords endTxnRecords( */ @Test public void testRecoveryFixesCorruptTimeIndex() throws IOException { - LogSegment seg = createSegment(0); - for (int i = 0; i < 100; i++) { - seg.append(i, i * 10, i, records(i, String.valueOf(i))); - } - File timeIndexFile = seg.timeIndexFile(); - writeNonsenseToFile(timeIndexFile, 5, (int) timeIndexFile.length()); - seg.recover(newProducerStateManager(), Optional.empty()); - for (int i = 0; i < 100; i++) { - assertEquals(i, seg.findOffsetByTimestamp(i * 10, 0L).get().offset); - if (i < 99) { - assertEquals(i + 1, seg.findOffsetByTimestamp(i * 10 + 1, 0L).get().offset); + try (LogSegment seg = createSegment(0)) { + for (int i = 0; i < 100; i++) { + seg.append(i, i * 10, i, records(i, String.valueOf(i))); + } + File timeIndexFile = seg.timeIndexFile(); + writeNonsenseToFile(timeIndexFile, 5, (int) timeIndexFile.length()); + seg.recover(newProducerStateManager(), Optional.empty()); + for (int i = 0; i < 100; i++) { + assertEquals(i, seg.findOffsetByTimestamp(i * 10, 0L).get().offset); + if (i < 99) { + assertEquals(i + 1, seg.findOffsetByTimestamp(i * 10 + 1, 0L).get().offset); + } } } } @@ -571,28 +589,29 @@ public void testRecoveryFixesCorruptTimeIndex() throws IOException { public void testRecoveryWithCorruptMessage() throws IOException { int messagesAppended = 20; for (int ignore = 0; ignore < 10; ignore++) { - LogSegment seg = createSegment(0); - for (int i = 0; i < messagesAppended; i++) { - seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, String.valueOf(i))); + try (LogSegment seg = createSegment(0)) { + for (int i = 0; i < messagesAppended; i++) { + seg.append(i, 
RecordBatch.NO_TIMESTAMP, -1L, records(i, String.valueOf(i)));
+                }
+                int offsetToBeginCorruption = TestUtils.RANDOM.nextInt(messagesAppended);
+                // start corrupting somewhere in the middle of the chosen record all the way to the end
+
+                FileRecords.LogOffsetPosition recordPosition = seg.log().searchForOffsetWithSize(offsetToBeginCorruption, 0);
+                int position = recordPosition.position + TestUtils.RANDOM.nextInt(15);
+                writeNonsenseToFile(seg.log().file(), position, (int) (seg.log().file().length() - position));
+                seg.recover(newProducerStateManager(), Optional.empty());
+
+                List<Long> expectList = new ArrayList<>();
+                for (long j = 0; j < offsetToBeginCorruption; j++) {
+                    expectList.add(j);
+                }
+                List<Long> actualList = new ArrayList<>();
+                for (FileLogInputStream.FileChannelRecordBatch batch : seg.log().batches()) {
+                    actualList.add(batch.lastOffset());
+                }
+                assertEquals(expectList, actualList, "Should have truncated off bad messages.");
+                seg.deleteIfExists();
             }
-            int offsetToBeginCorruption = TestUtils.RANDOM.nextInt(messagesAppended);
-            // start corrupting somewhere in the middle of the chosen record all the way to the end
-
-            FileRecords.LogOffsetPosition recordPosition = seg.log().searchForOffsetWithSize(offsetToBeginCorruption, 0);
-            int position = recordPosition.position + TestUtils.RANDOM.nextInt(15);
-            writeNonsenseToFile(seg.log().file(), position, (int) (seg.log().file().length() - position));
-            seg.recover(newProducerStateManager(), Optional.empty());
-
-            List<Long> expectList = new ArrayList<>();
-            for (long j = 0; j < offsetToBeginCorruption; j++) {
-                expectList.add(j);
-            }
-            List<Long> actualList = new ArrayList<>();
-            for (FileLogInputStream.FileChannelRecordBatch batch : seg.log().batches()) {
-                actualList.add(batch.lastOffset());
-            }
-            assertEquals(expectList, actualList, "Should have truncated off bad messages.");
-            seg.deleteIfExists();
         }
     }
 
@@ -614,13 +633,14 @@ private LogSegment createSegment(long baseOffset, boolean fileAlreadyExists, int
 
     /* create a segment with pre allocate, put message to it and verify */
     @Test
     public void testCreateWithInitFileSizeAppendMessage() throws IOException {
-        LogSegment seg = createSegment(40, false, 512 * 1024 * 1024, true);
-        MemoryRecords ms = records(50, "hello", "there");
-        seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
-        MemoryRecords ms2 = records(60, "alpha", "beta");
-        seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
-        FetchDataInfo read = seg.read(55, 200);
-        checkEquals(ms2.records().iterator(), read.records.records().iterator());
+        try (LogSegment seg = createSegment(40, false, 512 * 1024 * 1024, true)) {
+            MemoryRecords ms = records(50, "hello", "there");
+            seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
+            MemoryRecords ms2 = records(60, "alpha", "beta");
+            seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
+            FetchDataInfo read = seg.read(55, 200);
+            checkEquals(ms2.records().iterator(), read.records.records().iterator());
+        }
     }
 
     /* create a segment with pre allocate and clearly shut down*/
     @Test
     public void testCreateWithInitFileSizeClearShutdown() throws IOException {
@@ -636,35 +656,34 @@ public void testCreateWithInitFileSizeClearShutdown() throws IOException {
         configMap.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, 0);
         LogConfig logConfig = new LogConfig(configMap);
-        LogSegment seg = LogSegment.open(tempDir, 40, logConfig, Time.SYSTEM,
-                512 * 1024 * 1024, true);
-
-        MemoryRecords ms = records(50, "hello", "there");
-        seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms);
-        MemoryRecords ms2 = records(60, "alpha", "beta");
-        seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2);
-        FetchDataInfo read = seg.read(55, 200);
- checkEquals(ms2.records().iterator(), read.records.records().iterator()); - long oldSize = seg.log().sizeInBytes(); - long oldPosition = seg.log().channel().position(); - long oldFileSize = seg.log().file().length(); - assertEquals(512 * 1024 * 1024, oldFileSize); - seg.close(); - // After close, file should be trimmed - assertEquals(oldSize, seg.log().file().length()); - - LogSegment segReopen = LogSegment.open(tempDir, 40, logConfig, Time.SYSTEM, - true, 512 * 1024 * 1024, true, ""); - segments.add(segReopen); - - FetchDataInfo readAgain = segReopen.read(55, 200); - checkEquals(ms2.records().iterator(), readAgain.records.records().iterator()); - long size = segReopen.log().sizeInBytes(); - long position = segReopen.log().channel().position(); - long fileSize = segReopen.log().file().length(); - assertEquals(oldPosition, position); - assertEquals(oldSize, size); - assertEquals(size, fileSize); + try (LogSegment seg = LogSegment.open(tempDir, 40, logConfig, Time.SYSTEM, 512 * 1024 * 1024, true)) { + MemoryRecords ms = records(50, "hello", "there"); + seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms); + MemoryRecords ms2 = records(60, "alpha", "beta"); + seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2); + FetchDataInfo read = seg.read(55, 200); + checkEquals(ms2.records().iterator(), read.records.records().iterator()); + long oldSize = seg.log().sizeInBytes(); + long oldPosition = seg.log().channel().position(); + long oldFileSize = seg.log().file().length(); + assertEquals(512 * 1024 * 1024, oldFileSize); + seg.close(); + // After close, file should be trimmed + assertEquals(oldSize, seg.log().file().length()); + + LogSegment segReopen = LogSegment.open(tempDir, 40, logConfig, Time.SYSTEM, + true, 512 * 1024 * 1024, true, ""); + segments.add(segReopen); + + FetchDataInfo readAgain = segReopen.read(55, 200); + checkEquals(ms2.records().iterator(), readAgain.records.records().iterator()); + long size = segReopen.log().sizeInBytes(); + long position = segReopen.log().channel().position(); + long fileSize = segReopen.log().file().length(); + assertEquals(oldPosition, position); + assertEquals(oldSize, size); + assertEquals(size, fileSize); + } } private MemoryRecords recordsForTruncateEven(long offset, String record) { @@ -674,22 +693,23 @@ private MemoryRecords recordsForTruncateEven(long offset, String record) { @Test public void shouldTruncateEvenIfOffsetPointsToAGapInTheLog() throws IOException { - LogSegment seg = createSegment(40); - long offset = 40; + try (LogSegment seg = createSegment(40)) { + long offset = 40; - // Given two messages with a gap between them (e.g. mid offset compacted away) - MemoryRecords ms1 = recordsForTruncateEven(offset, "first message"); - seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1); - MemoryRecords ms2 = recordsForTruncateEven(offset + 3, "message after gap"); - seg.append(offset + 3, RecordBatch.NO_TIMESTAMP, -1L, ms2); + // Given two messages with a gap between them (e.g. mid offset compacted away) + MemoryRecords ms1 = recordsForTruncateEven(offset, "first message"); + seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1); + MemoryRecords ms2 = recordsForTruncateEven(offset + 3, "message after gap"); + seg.append(offset + 3, RecordBatch.NO_TIMESTAMP, -1L, ms2); - // When we truncate to an offset without a corresponding log entry - seg.truncateTo(offset + 1); + // When we truncate to an offset without a corresponding log entry + seg.truncateTo(offset + 1); - // Then we should still truncate the record that was present (i.e. 
offset + 3 is gone) - FetchDataInfo log = seg.read(offset, 10000); - assertEquals(offset, log.records.batches().iterator().next().baseOffset()); - assertEquals(1, iteratorToList(log.records.batches().iterator()).size()); + // Then we should still truncate the record that was present (i.e. offset + 3 is gone) + FetchDataInfo log = seg.read(offset, 10000); + assertEquals(offset, log.records.batches().iterator().next().baseOffset()); + assertEquals(1, iteratorToList(log.records.batches().iterator()).size()); + } } private MemoryRecords records(long offset, int size) { @@ -710,35 +730,36 @@ public void testAppendFromFile() throws IOException { fileRecords.append(records(Integer.MAX_VALUE + 5L, 1024)); long sizeAfterOverflow = fileRecords.sizeInBytes(); - LogSegment segment = createSegment(0); - long bytesAppended = segment.appendFromFile(fileRecords, 0); - assertEquals(sizeBeforeOverflow, bytesAppended); - assertEquals(sizeBeforeOverflow, segment.size()); + try (LogSegment segment = createSegment(0)) { + long bytesAppended = segment.appendFromFile(fileRecords, 0); + assertEquals(sizeBeforeOverflow, bytesAppended); + assertEquals(sizeBeforeOverflow, segment.size()); + } - LogSegment overflowSegment = createSegment(Integer.MAX_VALUE); - long overflowBytesAppended = overflowSegment.appendFromFile(fileRecords, (int) sizeBeforeOverflow); - assertEquals(sizeAfterOverflow - sizeBeforeOverflow, overflowBytesAppended); - assertEquals(overflowBytesAppended, overflowSegment.size()); + try (LogSegment overflowSegment = createSegment(Integer.MAX_VALUE)) { + long overflowBytesAppended = overflowSegment.appendFromFile(fileRecords, (int) sizeBeforeOverflow); + assertEquals(sizeAfterOverflow - sizeBeforeOverflow, overflowBytesAppended); + assertEquals(overflowBytesAppended, overflowSegment.size()); + } Utils.delete(tempDir); } @Test public void testGetFirstBatchTimestamp() throws IOException { - LogSegment segment = createSegment(1); - assertEquals(Long.MAX_VALUE, segment.getFirstBatchTimestamp()); - - segment.append(1, 1000L, 1, MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord("one".getBytes()))); - assertEquals(1000L, segment.getFirstBatchTimestamp()); + try (LogSegment segment = createSegment(1)) { + assertEquals(Long.MAX_VALUE, segment.getFirstBatchTimestamp()); - segment.close(); + segment.append(1, 1000L, 1, MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord("one".getBytes()))); + assertEquals(1000L, segment.getFirstBatchTimestamp()); + } } private ProducerStateManager newProducerStateManager() throws IOException { return new ProducerStateManager( topicPartition, logDir, - 5 * 60 * 1000, + (int) (Duration.ofMinutes(5).toMillis()), new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), new MockTime() );
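
--
Editorial note (not part of the applied diff): every hunk above repeats the same idiom. LogSegment is AutoCloseable (the patch relies on that by putting it in a try-with-resources header), so each test now closes its segment on every exit path, including when an assertion throws mid-test, instead of leaving cleanup to the teardown hook. Below is a minimal, self-contained sketch of that idiom; FakeSegment is a hypothetical stand-in for LogSegment, not a Kafka type.

import java.time.Duration;

public class TryWithResourcesSketch {
    // Hypothetical stand-in for LogSegment, used only to show the idiom.
    static class FakeSegment implements AutoCloseable {
        private boolean closed = false;

        void append(String record) {
            if (closed) throw new IllegalStateException("segment is closed");
            // ... write the record to the log file ...
        }

        @Override
        public void close() {
            // Release file handles, trim preallocated space, etc.
            closed = true;
        }
    }

    public static void main(String[] args) {
        // Before the patch: manual lifetime management. If append() or an
        // assertion throws, close() is never reached and the segment stays
        // open until the test's teardown runs.
        FakeSegment leaky = new FakeSegment();
        leaky.append("hello");
        leaky.close();

        // After the patch: close() is guaranteed on every exit path.
        try (FakeSegment seg = new FakeSegment()) {
            seg.append("hello");
        } // seg.close() runs here automatically, even on exceptions

        // The patch also replaces the magic number 5 * 60 * 1000 with an
        // explicit Duration, so the value reads as "five minutes" rather
        // than asking the reader to do the arithmetic.
        int fiveMinutesMs = (int) Duration.ofMinutes(5).toMillis();
        System.out.println(fiveMinutesMs == 5 * 60 * 1000); // prints true
    }
}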