diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index ea065b8c8126f..6a1e73dbbe17a 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -576,8 +576,8 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException throw ex; } catch (Exception ex) { if (!isCancelled()) { - brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteWriteRequestRate().mark(); - brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().mark(); + brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteCopyRequestRate().mark(); + brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark(); logger.error("Error occurred while copying log segments of partition: {}", topicIdPartition, ex); } } @@ -619,8 +619,8 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment LogSegmentData segmentData = new LogSegmentData(logFile.toPath(), toPathIfExists(segment.lazyOffsetIndex().get().file()), toPathIfExists(segment.lazyTimeIndex().get().file()), Optional.ofNullable(toPathIfExists(segment.txnIndex().file())), producerStateSnapshotFile.toPath(), leaderEpochsIndex); - brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark(); - brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark(); + brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyRequestRate().mark(); + brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark(); remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData); RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(), @@ -628,8 +628,8 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment remoteLogMetadataManager.updateRemoteLogSegmentMetadata(copySegmentFinishedRlsm).get(); brokerTopicStats.topicStats(log.topicPartition().topic()) - .remoteBytesOutRate().mark(copySegmentStartedRlsm.segmentSizeInBytes()); - brokerTopicStats.allTopicsStats().remoteBytesOutRate().mark(copySegmentStartedRlsm.segmentSizeInBytes()); + .remoteCopyBytesRate().mark(copySegmentStartedRlsm.segmentSizeInBytes()); + brokerTopicStats.allTopicsStats().remoteCopyBytesRate().mark(copySegmentStartedRlsm.segmentSizeInBytes()); copiedOffsetOption = OptionalLong.of(endOffset); log.updateHighestOffsetInRemoteStorage(endOffset); logger.info("Copied {} to remote storage with segment-id: {}", logFileName, copySegmentFinishedRlsm.remoteLogSegmentId()); diff --git a/core/src/main/java/kafka/log/remote/RemoteLogReader.java b/core/src/main/java/kafka/log/remote/RemoteLogReader.java index b4cea4fa81ecc..5d24b2bbbdfbe 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogReader.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogReader.java @@ -43,8 +43,8 @@ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo, this.rlm = rlm; this.brokerTopicStats = brokerTopicStats; this.callback = callback; - this.brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteReadRequestRate().mark(); - this.brokerTopicStats.allTopicsStats().remoteReadRequestRate().mark(); + this.brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteFetchRequestRate().mark(); + this.brokerTopicStats.allTopicsStats().remoteFetchRequestRate().mark(); logger = new LogContext() { @Override public String logPrefix() { @@ -60,14 +60,14 @@ public Void call() { logger.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicPartition); FetchDataInfo fetchDataInfo = rlm.read(fetchInfo); - brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteBytesInRate().mark(fetchDataInfo.records.sizeInBytes()); - brokerTopicStats.allTopicsStats().remoteBytesInRate().mark(fetchDataInfo.records.sizeInBytes()); + brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes()); + brokerTopicStats.allTopicsStats().remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes()); result = new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty()); } catch (OffsetOutOfRangeException e) { result = new RemoteLogReadResult(Optional.empty(), Optional.of(e)); } catch (Exception e) { - brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).failedRemoteReadRequestRate().mark(); - brokerTopicStats.allTopicsStats().failedRemoteReadRequestRate().mark(); + brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).failedRemoteFetchRequestRate().mark(); + brokerTopicStats.allTopicsStats().failedRemoteFetchRequestRate().mark(); logger.error("Error occurred while reading the remote data for {}", fetchInfo.topicPartition, e); result = new RemoteLogReadResult(Optional.empty(), Optional.of(e)); } diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index f0de624562baa..7a9f166b56df0 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -277,12 +277,12 @@ class BrokerTopicMetrics(name: Option[String]) { BrokerTopicStats.TotalFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"), BrokerTopicStats.FetchMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"), BrokerTopicStats.ProduceMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"), - BrokerTopicStats.RemoteBytesOutPerSec -> MeterWrapper(BrokerTopicStats.RemoteBytesOutPerSec, "bytes"), - BrokerTopicStats.RemoteBytesInPerSec -> MeterWrapper(BrokerTopicStats.RemoteBytesInPerSec, "bytes"), - BrokerTopicStats.RemoteReadRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteReadRequestsPerSec, "requests"), - BrokerTopicStats.RemoteWriteRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteWriteRequestsPerSec, "requests"), - BrokerTopicStats.FailedRemoteReadRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteReadRequestsPerSec, "requests"), - BrokerTopicStats.FailedRemoteWriteRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteWriteRequestsPerSec, "requests"), + BrokerTopicStats.RemoteCopyBytesPerSec -> MeterWrapper(BrokerTopicStats.RemoteCopyBytesPerSec, "bytes"), + BrokerTopicStats.RemoteFetchBytesPerSec -> MeterWrapper(BrokerTopicStats.RemoteFetchBytesPerSec, "bytes"), + BrokerTopicStats.RemoteFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteFetchRequestsPerSec, "requests"), + BrokerTopicStats.RemoteCopyRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteCopyRequestsPerSec, "requests"), + BrokerTopicStats.FailedRemoteFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteFetchRequestsPerSec, "requests"), + BrokerTopicStats.FailedRemoteCopyRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteCopyRequestsPerSec, "requests"), BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec -> MeterWrapper(BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec, "requests"), BrokerTopicStats.InvalidMagicNumberRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidMagicNumberRecordsPerSec, "requests"), BrokerTopicStats.InvalidMessageCrcRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidMessageCrcRecordsPerSec, "requests"), @@ -342,17 +342,17 @@ class BrokerTopicMetrics(name: Option[String]) { def invalidOffsetOrSequenceRecordsPerSec: Meter = metricTypeMap.get(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec).meter() - def remoteBytesOutRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteBytesOutPerSec).meter() + def remoteCopyBytesRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteCopyBytesPerSec).meter() - def remoteBytesInRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteBytesInPerSec).meter() + def remoteFetchBytesRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteFetchBytesPerSec).meter() - def remoteReadRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteReadRequestsPerSec).meter() + def remoteFetchRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteFetchRequestsPerSec).meter() - def remoteWriteRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteWriteRequestsPerSec).meter() + def remoteCopyRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteCopyRequestsPerSec).meter() - def failedRemoteReadRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.FailedRemoteReadRequestsPerSec).meter() + def failedRemoteFetchRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.FailedRemoteFetchRequestsPerSec).meter() - def failedRemoteWriteRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.FailedRemoteWriteRequestsPerSec).meter() + def failedRemoteCopyRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.FailedRemoteCopyRequestsPerSec).meter() def closeMetric(metricType: String): Unit = { val meter = metricTypeMap.get(metricType) @@ -378,12 +378,12 @@ object BrokerTopicStats { val ProduceMessageConversionsPerSec = "ProduceMessageConversionsPerSec" val ReassignmentBytesInPerSec = "ReassignmentBytesInPerSec" val ReassignmentBytesOutPerSec = "ReassignmentBytesOutPerSec" - val RemoteBytesOutPerSec = "RemoteBytesOutPerSec" - val RemoteBytesInPerSec = "RemoteBytesInPerSec" - val RemoteReadRequestsPerSec = "RemoteReadRequestsPerSec" - val RemoteWriteRequestsPerSec = "RemoteWriteRequestsPerSec" - val FailedRemoteReadRequestsPerSec = "RemoteReadErrorsPerSec" - val FailedRemoteWriteRequestsPerSec = "RemoteWriteErrorsPerSec" + val RemoteCopyBytesPerSec = "RemoteCopyBytesPerSec" + val RemoteFetchBytesPerSec = "RemoteFetchBytesPerSec" + val RemoteFetchRequestsPerSec = "RemoteFetchRequestsPerSec" + val RemoteCopyRequestsPerSec = "RemoteCopyRequestsPerSec" + val FailedRemoteFetchRequestsPerSec = "RemoteFetchErrorsPerSec" + val FailedRemoteCopyRequestsPerSec = "RemoteCopyErrorsPerSec" // These following topics are for LogValidator for better debugging on failed records val NoKeyCompactedTopicRecordsPerSec = "NoKeyCompactedTopicRecordsPerSec" @@ -439,12 +439,12 @@ class BrokerTopicStats extends Logging { topicMetrics.closeMetric(BrokerTopicStats.ProduceMessageConversionsPerSec) topicMetrics.closeMetric(BrokerTopicStats.ReplicationBytesOutPerSec) topicMetrics.closeMetric(BrokerTopicStats.ReassignmentBytesOutPerSec) - topicMetrics.closeMetric(BrokerTopicStats.RemoteBytesOutPerSec) - topicMetrics.closeMetric(BrokerTopicStats.RemoteBytesInPerSec) - topicMetrics.closeMetric(BrokerTopicStats.RemoteReadRequestsPerSec) - topicMetrics.closeMetric(BrokerTopicStats.RemoteWriteRequestsPerSec) - topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteReadRequestsPerSec) - topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteWriteRequestsPerSec) + topicMetrics.closeMetric(BrokerTopicStats.RemoteCopyBytesPerSec) + topicMetrics.closeMetric(BrokerTopicStats.RemoteFetchBytesPerSec) + topicMetrics.closeMetric(BrokerTopicStats.RemoteFetchRequestsPerSec) + topicMetrics.closeMetric(BrokerTopicStats.RemoteCopyRequestsPerSec) + topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteFetchRequestsPerSec) + topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteCopyRequestsPerSec) } } diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index c51a34fe3648d..bd4ba87b3e971 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -320,13 +320,13 @@ void testCopyLogSegmentsToRemoteShouldCopyExpectedLogSegment() throws Exception doNothing().when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)); // Verify the metrics for remote writes and for failures is zero before attempt to copy log segment - assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteWriteRequestRate().count()); - assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteBytesOutRate().count()); - assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteWriteRequestRate().count()); + assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count()); + assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyBytesRate().count()); + assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count()); // Verify aggregate metrics - assertEquals(0, brokerTopicStats.allTopicsStats().remoteWriteRequestRate().count()); - assertEquals(0, brokerTopicStats.allTopicsStats().remoteBytesOutRate().count()); - assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().count()); + assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count()); + assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count()); + assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count()); RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition); task.convertToLeader(2); @@ -362,14 +362,14 @@ void testCopyLogSegmentsToRemoteShouldCopyExpectedLogSegment() throws Exception assertEquals(oldSegmentEndOffset, argument.getValue()); // Verify the metric for remote writes is updated correctly - assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteWriteRequestRate().count()); - assertEquals(10, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteBytesOutRate().count()); + assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count()); + assertEquals(10, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyBytesRate().count()); // Verify we did not report any failure for remote writes - assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteWriteRequestRate().count()); + assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count()); // Verify aggregate metrics - assertEquals(1, brokerTopicStats.allTopicsStats().remoteWriteRequestRate().count()); - assertEquals(10, brokerTopicStats.allTopicsStats().remoteBytesOutRate().count()); - assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().count()); + assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count()); + assertEquals(10, brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count()); + assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count()); } @Test @@ -502,11 +502,11 @@ void testMetricsUpdateOnCopyLogSegmentsFailure() throws Exception { doThrow(new RuntimeException()).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)); // Verify the metrics for remote write requests/failures is zero before attempt to copy log segment - assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteWriteRequestRate().count()); - assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteWriteRequestRate().count()); + assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count()); + assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count()); // Verify aggregate metrics - assertEquals(0, brokerTopicStats.allTopicsStats().remoteWriteRequestRate().count()); - assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().count()); + assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count()); + assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count()); RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition); task.convertToLeader(2); task.copyLogSegmentsToRemote(mockLog); @@ -517,11 +517,11 @@ void testMetricsUpdateOnCopyLogSegmentsFailure() throws Exception { // Verify we should not have updated the highest offset because of write failure verify(mockLog, times(0)).updateHighestOffsetInRemoteStorage(anyLong()); // Verify the metric for remote write requests/failures was updated. - assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteWriteRequestRate().count()); - assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteWriteRequestRate().count()); + assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count()); + assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count()); // Verify aggregate metrics - assertEquals(1, brokerTopicStats.allTopicsStats().remoteWriteRequestRate().count()); - assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().count()); + assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count()); + assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count()); } @Test diff --git a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java index 36533fed22df8..3e8596f93cd68 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java @@ -73,13 +73,13 @@ public void testRemoteLogReaderWithoutError() throws RemoteStorageException, IOE assertEquals(fetchDataInfo, actualRemoteLogReadResult.fetchDataInfo.get()); // Verify metrics for remote reads are updated correctly - assertEquals(1, brokerTopicStats.topicStats(TOPIC).remoteReadRequestRate().count()); - assertEquals(100, brokerTopicStats.topicStats(TOPIC).remoteBytesInRate().count()); - assertEquals(0, brokerTopicStats.topicStats(TOPIC).failedRemoteReadRequestRate().count()); + assertEquals(1, brokerTopicStats.topicStats(TOPIC).remoteFetchRequestRate().count()); + assertEquals(100, brokerTopicStats.topicStats(TOPIC).remoteFetchBytesRate().count()); + assertEquals(0, brokerTopicStats.topicStats(TOPIC).failedRemoteFetchRequestRate().count()); // Verify aggregate metrics - assertEquals(1, brokerTopicStats.allTopicsStats().remoteReadRequestRate().count()); - assertEquals(100, brokerTopicStats.allTopicsStats().remoteBytesInRate().count()); - assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteReadRequestRate().count()); + assertEquals(1, brokerTopicStats.allTopicsStats().remoteFetchRequestRate().count()); + assertEquals(100, brokerTopicStats.allTopicsStats().remoteFetchBytesRate().count()); + assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteFetchRequestRate().count()); } @Test @@ -99,12 +99,12 @@ public void testRemoteLogReaderWithError() throws RemoteStorageException, IOExce assertFalse(actualRemoteLogReadResult.fetchDataInfo.isPresent()); // Verify metrics for remote reads are updated correctly - assertEquals(1, brokerTopicStats.topicStats(TOPIC).remoteReadRequestRate().count()); - assertEquals(0, brokerTopicStats.topicStats(TOPIC).remoteBytesInRate().count()); - assertEquals(1, brokerTopicStats.topicStats(TOPIC).failedRemoteReadRequestRate().count()); + assertEquals(1, brokerTopicStats.topicStats(TOPIC).remoteFetchRequestRate().count()); + assertEquals(0, brokerTopicStats.topicStats(TOPIC).remoteFetchBytesRate().count()); + assertEquals(1, brokerTopicStats.topicStats(TOPIC).failedRemoteFetchRequestRate().count()); // Verify aggregate metrics - assertEquals(1, brokerTopicStats.allTopicsStats().remoteReadRequestRate().count()); - assertEquals(0, brokerTopicStats.allTopicsStats().remoteBytesInRate().count()); - assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteReadRequestRate().count()); + assertEquals(1, brokerTopicStats.allTopicsStats().remoteFetchRequestRate().count()); + assertEquals(0, brokerTopicStats.allTopicsStats().remoteFetchBytesRate().count()); + assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteFetchRequestRate().count()); } }