Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
<suppress checks="NPathComplexity|ClassFanOutComplexity|ClassDataAbstractionCoupling" files="(RemoteLogManager|RemoteLogManagerTest).java"/>
<suppress checks="ClassFanOutComplexity" files="RemoteLogManagerTest.java"/>
<suppress checks="MethodLength"
files="(KafkaClusterTestKit).java"/>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log.remote;

class CustomMetadataSizeLimitExceededException extends Exception {
}
39 changes: 34 additions & 5 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
Expand Down Expand Up @@ -480,12 +481,14 @@ InMemoryLeaderEpochCheckpoint getLeaderEpochCheckpoint(UnifiedLog log, long star
class RLMTask extends CancellableRunnable {

private final TopicIdPartition topicIdPartition;
private final int customMetadataSizeLimit;
private final Logger logger;

private volatile int leaderEpoch = -1;

public RLMTask(TopicIdPartition topicIdPartition) {
public RLMTask(TopicIdPartition topicIdPartition, int customMetadataSizeLimit) {
this.topicIdPartition = topicIdPartition;
this.customMetadataSizeLimit = customMetadataSizeLimit;
LogContext logContext = new LogContext("[RemoteLogManager=" + brokerId + " partition=" + topicIdPartition + "] ");
logger = logContext.logger(RLMTask.class);
}
Expand Down Expand Up @@ -586,6 +589,11 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
} else {
logger.debug("Skipping copying segments, current read-offset:{}, and LSO:{}", copiedOffset, lso);
}
} catch (CustomMetadataSizeLimitExceededException e) {
// Only stop this task. Logging is done where the exception is thrown.
brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteCopyRequestRate().mark();
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark();
this.cancel();
} catch (InterruptedException ex) {
throw ex;
} catch (Exception ex) {
Expand All @@ -597,7 +605,8 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
}
}

private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset) throws InterruptedException, ExecutionException, RemoteStorageException, IOException {
private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset) throws InterruptedException, ExecutionException, RemoteStorageException, IOException,
CustomMetadataSizeLimitExceededException {
File logFile = segment.log().file();
String logFileName = logFile.getName();

Expand All @@ -623,10 +632,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
producerStateSnapshotFile.toPath(), leaderEpochsIndex);
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyRequestRate().mark();
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark();
remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
Optional<CustomMetadata> customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);

RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);

if (customMetadata.isPresent()) {
long customMetadataSize = customMetadata.get().value().length;
if (customMetadataSize > this.customMetadataSizeLimit) {
CustomMetadataSizeLimitExceededException e = new CustomMetadataSizeLimitExceededException();
logger.error("Custom metadata size {} exceeds configured limit {}." +
" Copying will be stopped and copied segment will be attempted to clean." +
" Original metadata: {}",
customMetadataSize, this.customMetadataSizeLimit, copySegmentStartedRlsm, e);
try {
// For deletion, we provide back the custom metadata by creating a new metadata object from the update.
// However, the update itself will not be stored in this case.
remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not we need to add respective delete_segment_started and delete_segment finished events?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's my reasoning why not: #13984 (comment)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That looks reasonable to me.

logger.info("Successfully cleaned segment after custom metadata size exceeded");
} catch (RemoteStorageException e1) {
logger.error("Error while cleaning segment after custom metadata size exceeded, consider cleaning manually", e1);
}
throw e;
}
}

remoteLogMetadataManager.updateRemoteLogSegmentMetadata(copySegmentFinishedRlsm).get();
brokerTopicStats.topicStats(log.topicPartition().topic())
Expand Down Expand Up @@ -883,7 +912,7 @@ void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
Consumer<RLMTask> convertToLeaderOrFollower) {
RLMTaskWithFuture rlmTaskWithFuture = leaderOrFollowerTasks.computeIfAbsent(topicPartition,
topicIdPartition -> {
RLMTask task = new RLMTask(topicIdPartition);
RLMTask task = new RLMTask(topicIdPartition, this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
// set this upfront when it is getting initialized instead of doing it after scheduling.
convertToLeaderOrFollower.accept(task);
LOGGER.info("Created a new task: {} and getting scheduled", task);
Expand Down
111 changes: 103 additions & 8 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
Expand Down Expand Up @@ -106,7 +107,6 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -342,7 +342,8 @@ private void assertCopyExpectedLogSegmentsToRemote(long oldSegmentStartOffset,
dummyFuture.complete(null);
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
doNothing().when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)))
.thenReturn(Optional.empty());

// Verify the metrics for remote writes and for failures is zero before attempt to copy log segment
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
Expand All @@ -353,7 +354,7 @@ private void assertCopyExpectedLogSegmentsToRemote(long oldSegmentStartOffset,
assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());

RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
task.convertToLeader(2);
task.copyLogSegmentsToRemote(mockLog);

Expand Down Expand Up @@ -397,6 +398,100 @@ private void assertCopyExpectedLogSegmentsToRemote(long oldSegmentStartOffset,
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
}

// We are verifying that if the size of a piece of custom metadata is bigger than the configured limit,
// the copy task should be cancelled and there should be an attempt to delete the just copied segment.
@Test
void testCustomMetadataSizeExceedsLimit() throws Exception {
long oldSegmentStartOffset = 0L;
long nextSegmentStartOffset = 150L;
long lastStableOffset = 150L;
long logEndOffset = 150L;

when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());

// leader epoch preparation
checkpoint.write(totalEpochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(-1L));

File tempFile = TestUtils.tempFile();
File mockProducerSnapshotIndex = TestUtils.tempFile();
File tempDir = TestUtils.tempDirectory();
// create 2 log segments, with 0 and 150 as log start offset
LogSegment oldSegment = mock(LogSegment.class);
LogSegment activeSegment = mock(LogSegment.class);

when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
verify(oldSegment, times(0)).readNextOffset();
verify(activeSegment, times(0)).readNextOffset();

FileRecords fileRecords = mock(FileRecords.class);
when(oldSegment.log()).thenReturn(fileRecords);
when(fileRecords.file()).thenReturn(tempFile);
when(fileRecords.sizeInBytes()).thenReturn(10);
when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);

when(mockLog.activeSegment()).thenReturn(activeSegment);
when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));

ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
when(mockLog.producerStateManager()).thenReturn(mockStateManager);
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
when(mockLog.lastStableOffset()).thenReturn(lastStableOffset);
when(mockLog.logEndOffset()).thenReturn(logEndOffset);

LazyIndex idx = LazyIndex.forOffset(UnifiedLog.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000);
LazyIndex timeIdx = LazyIndex.forTime(UnifiedLog.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500);
File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
txnFile.createNewFile();
TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
when(oldSegment.lazyTimeIndex()).thenReturn(timeIdx);
when(oldSegment.lazyOffsetIndex()).thenReturn(idx);
when(oldSegment.txnIndex()).thenReturn(txnIndex);

int customMetadataSizeLimit = 128;
CustomMetadata customMetadata = new CustomMetadata(new byte[customMetadataSizeLimit * 2]);

CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
dummyFuture.complete(null);
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)))
.thenReturn(Optional.of(customMetadata));

RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, customMetadataSizeLimit);
task.convertToLeader(2);
task.copyLogSegmentsToRemote(mockLog);

ArgumentCaptor<RemoteLogSegmentMetadata> remoteLogSegmentMetadataArg = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
verify(remoteLogMetadataManager).addRemoteLogSegmentMetadata(remoteLogSegmentMetadataArg.capture());

// Check we attempt to delete the segment data providing the custom metadata back.
RemoteLogSegmentMetadataUpdate expectedMetadataUpdate = new RemoteLogSegmentMetadataUpdate(
remoteLogSegmentMetadataArg.getValue().remoteLogSegmentId(), time.milliseconds(),
Optional.of(customMetadata), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
RemoteLogSegmentMetadata expectedDeleteMetadata = remoteLogSegmentMetadataArg.getValue().createWithUpdates(expectedMetadataUpdate);
verify(remoteStorageManager, times(1)).deleteLogSegmentData(eq(expectedDeleteMetadata));

// Check the task is cancelled in the end.
assertTrue(task.isCancelled());

// The metadata update should not be posted.
verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));

// Verify the metric for remote writes are not updated.
assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyBytesRate().count());
// Verify we did not report any failure for remote writes
assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
// Verify aggregate metrics
assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
}

@Test
void testRemoteLogManagerTasksAvgIdlePercentMetrics() throws Exception {
long oldSegmentStartOffset = 0L;
Expand Down Expand Up @@ -532,7 +627,7 @@ void testMetricsUpdateOnCopyLogSegmentsFailure() throws Exception {
// Verify aggregate metrics
assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
task.convertToLeader(2);
task.copyLogSegmentsToRemote(mockLog);

Expand Down Expand Up @@ -572,7 +667,7 @@ void testCopyLogSegmentsToRemoteShouldNotCopySegmentForFollower() throws Excepti
when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
when(mockLog.lastStableOffset()).thenReturn(250L);

RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
task.convertToFollower();
task.copyLogSegmentsToRemote(mockLog);

Expand Down Expand Up @@ -714,7 +809,7 @@ private MemoryRecords records(long timestamp,

@Test
void testRLMTaskShouldSetLeaderEpochCorrectly() {
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
assertFalse(task.isLeader());
task.convertToLeader(1);
assertTrue(task.isLeader());
Expand Down Expand Up @@ -862,7 +957,7 @@ public void testCandidateLogSegmentsSkipsActiveSegment() {
when(log.logSegments(5L, Long.MAX_VALUE))
.thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment1, segment2, activeSegment)));

RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
List<RemoteLogManager.EnrichedLogSegment> expected =
Arrays.asList(
new RemoteLogManager.EnrichedLogSegment(segment1, 10L),
Expand All @@ -888,7 +983,7 @@ public void testCandidateLogSegmentsSkipsSegmentsAfterLastStableOffset() {
when(log.logSegments(5L, Long.MAX_VALUE))
.thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment1, segment2, segment3, activeSegment)));

RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
List<RemoteLogManager.EnrichedLogSegment> expected =
Arrays.asList(
new RemoteLogManager.EnrichedLogSegment(segment1, 10L),
Expand Down
Loading