Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -576,8 +576,8 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
throw ex;
} catch (Exception ex) {
if (!isCancelled()) {
brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteWriteRequestRate().mark();
brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().mark();
brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteCopyRequestRate().mark();
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark();
logger.error("Error occurred while copying log segments of partition: {}", topicIdPartition, ex);
}
}
Expand Down Expand Up @@ -619,17 +619,17 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
LogSegmentData segmentData = new LogSegmentData(logFile.toPath(), toPathIfExists(segment.lazyOffsetIndex().get().file()),
toPathIfExists(segment.lazyTimeIndex().get().file()), Optional.ofNullable(toPathIfExists(segment.txnIndex().file())),
producerStateSnapshotFile.toPath(), leaderEpochsIndex);
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark();
brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark();
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyRequestRate().mark();
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark();
remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);

RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);

remoteLogMetadataManager.updateRemoteLogSegmentMetadata(copySegmentFinishedRlsm).get();
brokerTopicStats.topicStats(log.topicPartition().topic())
.remoteBytesOutRate().mark(copySegmentStartedRlsm.segmentSizeInBytes());
brokerTopicStats.allTopicsStats().remoteBytesOutRate().mark(copySegmentStartedRlsm.segmentSizeInBytes());
.remoteCopyBytesRate().mark(copySegmentStartedRlsm.segmentSizeInBytes());
brokerTopicStats.allTopicsStats().remoteCopyBytesRate().mark(copySegmentStartedRlsm.segmentSizeInBytes());
copiedOffsetOption = OptionalLong.of(endOffset);
log.updateHighestOffsetInRemoteStorage(endOffset);
logger.info("Copied {} to remote storage with segment-id: {}", logFileName, copySegmentFinishedRlsm.remoteLogSegmentId());
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/java/kafka/log/remote/RemoteLogReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
this.rlm = rlm;
this.brokerTopicStats = brokerTopicStats;
this.callback = callback;
this.brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteReadRequestRate().mark();
this.brokerTopicStats.allTopicsStats().remoteReadRequestRate().mark();
this.brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteFetchRequestRate().mark();
this.brokerTopicStats.allTopicsStats().remoteFetchRequestRate().mark();
logger = new LogContext() {
@Override
public String logPrefix() {
Expand All @@ -60,14 +60,14 @@ public Void call() {
logger.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicPartition);

FetchDataInfo fetchDataInfo = rlm.read(fetchInfo);
brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteBytesInRate().mark(fetchDataInfo.records.sizeInBytes());
brokerTopicStats.allTopicsStats().remoteBytesInRate().mark(fetchDataInfo.records.sizeInBytes());
brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
brokerTopicStats.allTopicsStats().remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
result = new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty());
} catch (OffsetOutOfRangeException e) {
result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
} catch (Exception e) {
brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).failedRemoteReadRequestRate().mark();
brokerTopicStats.allTopicsStats().failedRemoteReadRequestRate().mark();
brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).failedRemoteFetchRequestRate().mark();
brokerTopicStats.allTopicsStats().failedRemoteFetchRequestRate().mark();
logger.error("Error occurred while reading the remote data for {}", fetchInfo.topicPartition, e);
result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
}
Expand Down
48 changes: 24 additions & 24 deletions core/src/main/scala/kafka/server/KafkaRequestHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,12 @@ class BrokerTopicMetrics(name: Option[String]) {
BrokerTopicStats.TotalFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"),
BrokerTopicStats.FetchMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"),
BrokerTopicStats.ProduceMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"),
BrokerTopicStats.RemoteBytesOutPerSec -> MeterWrapper(BrokerTopicStats.RemoteBytesOutPerSec, "bytes"),
BrokerTopicStats.RemoteBytesInPerSec -> MeterWrapper(BrokerTopicStats.RemoteBytesInPerSec, "bytes"),
BrokerTopicStats.RemoteReadRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteReadRequestsPerSec, "requests"),
BrokerTopicStats.RemoteWriteRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteWriteRequestsPerSec, "requests"),
BrokerTopicStats.FailedRemoteReadRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteReadRequestsPerSec, "requests"),
BrokerTopicStats.FailedRemoteWriteRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteWriteRequestsPerSec, "requests"),
BrokerTopicStats.RemoteCopyBytesPerSec -> MeterWrapper(BrokerTopicStats.RemoteCopyBytesPerSec, "bytes"),
BrokerTopicStats.RemoteFetchBytesPerSec -> MeterWrapper(BrokerTopicStats.RemoteFetchBytesPerSec, "bytes"),
BrokerTopicStats.RemoteFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteFetchRequestsPerSec, "requests"),
BrokerTopicStats.RemoteCopyRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteCopyRequestsPerSec, "requests"),
BrokerTopicStats.FailedRemoteFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteFetchRequestsPerSec, "requests"),
BrokerTopicStats.FailedRemoteCopyRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteCopyRequestsPerSec, "requests"),
BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec -> MeterWrapper(BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec, "requests"),
BrokerTopicStats.InvalidMagicNumberRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidMagicNumberRecordsPerSec, "requests"),
BrokerTopicStats.InvalidMessageCrcRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidMessageCrcRecordsPerSec, "requests"),
Expand Down Expand Up @@ -342,17 +342,17 @@ class BrokerTopicMetrics(name: Option[String]) {

def invalidOffsetOrSequenceRecordsPerSec: Meter = metricTypeMap.get(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec).meter()

def remoteBytesOutRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteBytesOutPerSec).meter()
def remoteCopyBytesRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteCopyBytesPerSec).meter()

def remoteBytesInRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteBytesInPerSec).meter()
def remoteFetchBytesRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteFetchBytesPerSec).meter()

def remoteReadRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteReadRequestsPerSec).meter()
def remoteFetchRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteFetchRequestsPerSec).meter()

def remoteWriteRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteWriteRequestsPerSec).meter()
def remoteCopyRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteCopyRequestsPerSec).meter()

def failedRemoteReadRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.FailedRemoteReadRequestsPerSec).meter()
def failedRemoteFetchRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.FailedRemoteFetchRequestsPerSec).meter()

def failedRemoteWriteRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.FailedRemoteWriteRequestsPerSec).meter()
def failedRemoteCopyRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.FailedRemoteCopyRequestsPerSec).meter()

def closeMetric(metricType: String): Unit = {
val meter = metricTypeMap.get(metricType)
Expand All @@ -378,12 +378,12 @@ object BrokerTopicStats {
val ProduceMessageConversionsPerSec = "ProduceMessageConversionsPerSec"
val ReassignmentBytesInPerSec = "ReassignmentBytesInPerSec"
val ReassignmentBytesOutPerSec = "ReassignmentBytesOutPerSec"
val RemoteBytesOutPerSec = "RemoteBytesOutPerSec"
val RemoteBytesInPerSec = "RemoteBytesInPerSec"
val RemoteReadRequestsPerSec = "RemoteReadRequestsPerSec"
val RemoteWriteRequestsPerSec = "RemoteWriteRequestsPerSec"
val FailedRemoteReadRequestsPerSec = "RemoteReadErrorsPerSec"
val FailedRemoteWriteRequestsPerSec = "RemoteWriteErrorsPerSec"
val RemoteCopyBytesPerSec = "RemoteCopyBytesPerSec"
val RemoteFetchBytesPerSec = "RemoteFetchBytesPerSec"
val RemoteFetchRequestsPerSec = "RemoteFetchRequestsPerSec"
val RemoteCopyRequestsPerSec = "RemoteCopyRequestsPerSec"
val FailedRemoteFetchRequestsPerSec = "RemoteFetchErrorsPerSec"
val FailedRemoteCopyRequestsPerSec = "RemoteCopyErrorsPerSec"

// These following topics are for LogValidator for better debugging on failed records
val NoKeyCompactedTopicRecordsPerSec = "NoKeyCompactedTopicRecordsPerSec"
Expand Down Expand Up @@ -439,12 +439,12 @@ class BrokerTopicStats extends Logging {
topicMetrics.closeMetric(BrokerTopicStats.ProduceMessageConversionsPerSec)
topicMetrics.closeMetric(BrokerTopicStats.ReplicationBytesOutPerSec)
topicMetrics.closeMetric(BrokerTopicStats.ReassignmentBytesOutPerSec)
topicMetrics.closeMetric(BrokerTopicStats.RemoteBytesOutPerSec)
topicMetrics.closeMetric(BrokerTopicStats.RemoteBytesInPerSec)
topicMetrics.closeMetric(BrokerTopicStats.RemoteReadRequestsPerSec)
topicMetrics.closeMetric(BrokerTopicStats.RemoteWriteRequestsPerSec)
topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteReadRequestsPerSec)
topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteWriteRequestsPerSec)
topicMetrics.closeMetric(BrokerTopicStats.RemoteCopyBytesPerSec)
topicMetrics.closeMetric(BrokerTopicStats.RemoteFetchBytesPerSec)
topicMetrics.closeMetric(BrokerTopicStats.RemoteFetchRequestsPerSec)
topicMetrics.closeMetric(BrokerTopicStats.RemoteCopyRequestsPerSec)
topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteFetchRequestsPerSec)
topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteCopyRequestsPerSec)
}
}

Expand Down
40 changes: 20 additions & 20 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -320,13 +320,13 @@ void testCopyLogSegmentsToRemoteShouldCopyExpectedLogSegment() throws Exception
doNothing().when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));

// Verify the metrics for remote writes and for failures is zero before attempt to copy log segment
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteWriteRequestRate().count());
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteBytesOutRate().count());
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteWriteRequestRate().count());
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyBytesRate().count());
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
// Verify aggregate metrics
assertEquals(0, brokerTopicStats.allTopicsStats().remoteWriteRequestRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().remoteBytesOutRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());

RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
task.convertToLeader(2);
Expand Down Expand Up @@ -362,14 +362,14 @@ void testCopyLogSegmentsToRemoteShouldCopyExpectedLogSegment() throws Exception
assertEquals(oldSegmentEndOffset, argument.getValue());

// Verify the metric for remote writes is updated correctly
assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteWriteRequestRate().count());
assertEquals(10, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteBytesOutRate().count());
assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
assertEquals(10, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyBytesRate().count());
// Verify we did not report any failure for remote writes
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteWriteRequestRate().count());
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
// Verify aggregate metrics
assertEquals(1, brokerTopicStats.allTopicsStats().remoteWriteRequestRate().count());
assertEquals(10, brokerTopicStats.allTopicsStats().remoteBytesOutRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().count());
assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
assertEquals(10, brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
}

@Test
Expand Down Expand Up @@ -502,11 +502,11 @@ void testMetricsUpdateOnCopyLogSegmentsFailure() throws Exception {
doThrow(new RuntimeException()).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));

// Verify the metrics for remote write requests/failures is zero before attempt to copy log segment
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteWriteRequestRate().count());
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteWriteRequestRate().count());
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
// Verify aggregate metrics
assertEquals(0, brokerTopicStats.allTopicsStats().remoteWriteRequestRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
task.convertToLeader(2);
task.copyLogSegmentsToRemote(mockLog);
Expand All @@ -517,11 +517,11 @@ void testMetricsUpdateOnCopyLogSegmentsFailure() throws Exception {
// Verify we should not have updated the highest offset because of write failure
verify(mockLog, times(0)).updateHighestOffsetInRemoteStorage(anyLong());
// Verify the metric for remote write requests/failures was updated.
assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteWriteRequestRate().count());
assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteWriteRequestRate().count());
assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
// Verify aggregate metrics
assertEquals(1, brokerTopicStats.allTopicsStats().remoteWriteRequestRate().count());
assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().count());
assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
}

@Test
Expand Down
24 changes: 12 additions & 12 deletions core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ public void testRemoteLogReaderWithoutError() throws RemoteStorageException, IOE
assertEquals(fetchDataInfo, actualRemoteLogReadResult.fetchDataInfo.get());

// Verify metrics for remote reads are updated correctly
assertEquals(1, brokerTopicStats.topicStats(TOPIC).remoteReadRequestRate().count());
assertEquals(100, brokerTopicStats.topicStats(TOPIC).remoteBytesInRate().count());
assertEquals(0, brokerTopicStats.topicStats(TOPIC).failedRemoteReadRequestRate().count());
assertEquals(1, brokerTopicStats.topicStats(TOPIC).remoteFetchRequestRate().count());
assertEquals(100, brokerTopicStats.topicStats(TOPIC).remoteFetchBytesRate().count());
assertEquals(0, brokerTopicStats.topicStats(TOPIC).failedRemoteFetchRequestRate().count());
// Verify aggregate metrics
assertEquals(1, brokerTopicStats.allTopicsStats().remoteReadRequestRate().count());
assertEquals(100, brokerTopicStats.allTopicsStats().remoteBytesInRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteReadRequestRate().count());
assertEquals(1, brokerTopicStats.allTopicsStats().remoteFetchRequestRate().count());
assertEquals(100, brokerTopicStats.allTopicsStats().remoteFetchBytesRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteFetchRequestRate().count());
}

@Test
Expand All @@ -99,12 +99,12 @@ public void testRemoteLogReaderWithError() throws RemoteStorageException, IOExce
assertFalse(actualRemoteLogReadResult.fetchDataInfo.isPresent());

// Verify metrics for remote reads are updated correctly
assertEquals(1, brokerTopicStats.topicStats(TOPIC).remoteReadRequestRate().count());
assertEquals(0, brokerTopicStats.topicStats(TOPIC).remoteBytesInRate().count());
assertEquals(1, brokerTopicStats.topicStats(TOPIC).failedRemoteReadRequestRate().count());
assertEquals(1, brokerTopicStats.topicStats(TOPIC).remoteFetchRequestRate().count());
assertEquals(0, brokerTopicStats.topicStats(TOPIC).remoteFetchBytesRate().count());
assertEquals(1, brokerTopicStats.topicStats(TOPIC).failedRemoteFetchRequestRate().count());
// Verify aggregate metrics
assertEquals(1, brokerTopicStats.allTopicsStats().remoteReadRequestRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().remoteBytesInRate().count());
assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteReadRequestRate().count());
assertEquals(1, brokerTopicStats.allTopicsStats().remoteFetchRequestRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().remoteFetchBytesRate().count());
assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteFetchRequestRate().count());
}
}