From 3e299c44b5afe9c7373c498253381406463957db Mon Sep 17 00:00:00 2001 From: Chang-Chi Hsu Date: Thu, 21 Aug 2025 22:34:36 +0200 Subject: [PATCH 1/4] Remove fetchQuotaMetrics and copyQuotaMetrics --- .../kafka/server/log/remote/quota/RLMQuotaMetrics.java | 7 ++++++- .../kafka/server/log/remote/storage/RemoteLogManager.java | 2 ++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/quota/RLMQuotaMetrics.java b/storage/src/main/java/org/apache/kafka/server/log/remote/quota/RLMQuotaMetrics.java index 4e365e1a13d74..f813f31adea6e 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/quota/RLMQuotaMetrics.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/quota/RLMQuotaMetrics.java @@ -24,7 +24,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; -public class RLMQuotaMetrics { +public class RLMQuotaMetrics implements AutoCloseable { private final SensorAccess sensorAccess; private final Metrics metrics; @@ -51,4 +51,9 @@ public Sensor sensor() { String.format(descriptionFormat, "maximum")), new Max()); }); } + + @Override + public void close() { + this.metrics.removeSensor(name); + } } diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index a9b2c67ba79c9..86c74b03e9bf6 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -2054,6 +2054,8 @@ public void close() { Utils.closeQuietly(indexCache, "RemoteIndexCache"); Utils.closeQuietly(remoteLogMetadataManagerPlugin, "remoteLogMetadataManagerPlugin"); Utils.closeQuietly(remoteStorageManagerPlugin, "remoteStorageManagerPlugin"); + Utils.closeQuietly(fetchQuotaMetrics, "fetchQuotaMetrics"); + Utils.closeQuietly(copyQuotaMetrics, "copyQuotaMetrics"); closed = true; } } From 5d651bf0928a90e6b444404fd09910e07ea78067 Mon Sep 17 00:00:00 2001 From: Chang-Chi Hsu Date: Thu, 21 Aug 2025 23:05:40 +0200 Subject: [PATCH 2/4] Add testClose in RLMQuotaMetricsTest --- .../log/remote/quota/RLMQuotaMetricsTest.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/quota/RLMQuotaMetricsTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/quota/RLMQuotaMetricsTest.java index bf2cddd0f3105..1edefc870ace0 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/quota/RLMQuotaMetricsTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/quota/RLMQuotaMetricsTest.java @@ -28,6 +28,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertNotNull; public class RLMQuotaMetricsTest { private final MockTime time = new MockTime(); @@ -49,4 +51,22 @@ public void testNewSensorWhenExpired() { Sensor newSensor = rlmQuotaMetrics.sensor(); assertNotEquals(sensor, newSensor); } + + @Test + public void testClose() { + RLMQuotaMetrics quotaMetrics = new RLMQuotaMetrics(metrics, "metric", "group", "format", 5); + + // Register the sensor + quotaMetrics.sensor(); + var avg = metrics.metricName("metric" + "-avg", "group", String.format("format", "average")); + + // Verify that metrics are created + assertNotNull(metrics.metric(avg)); + + // Close the quotaMetrics instance + quotaMetrics.close(); + + // After closing, the metrics should be removed + assertNull(metrics.metric(avg)); + } } From 254993a04a9c21640f55f546aafa8a66d3cb3212 Mon Sep 17 00:00:00 2001 From: Chang-Chi Hsu Date: Fri, 22 Aug 2025 02:24:19 +0200 Subject: [PATCH 3/4] Apply spotless and checkstyle fixes --- .../kafka/server/log/remote/quota/RLMQuotaMetricsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/quota/RLMQuotaMetricsTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/quota/RLMQuotaMetricsTest.java index 1edefc870ace0..3c90c1bbc86ae 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/quota/RLMQuotaMetricsTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/quota/RLMQuotaMetricsTest.java @@ -28,8 +28,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; public class RLMQuotaMetricsTest { private final MockTime time = new MockTime(); From f32c9e014fa93a9139ca35c60dd1af7fcee6dcaf Mon Sep 17 00:00:00 2001 From: Chang-Chi Hsu Date: Fri, 22 Aug 2025 21:49:20 +0200 Subject: [PATCH 4/4] Move the closure of fetchQuotaMetrics and copyQuotaMetrics into removeMetrics --- .../log/remote/storage/RemoteLogManager.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index 86c74b03e9bf6..f6480b8668c16 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -314,6 +314,8 @@ private void removeMetrics() { metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC); metricsGroup.removeMetric(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC); remoteStorageReaderThreadPool.removeMetrics(); + Utils.closeQuietly(fetchQuotaMetrics, "fetchQuotaMetrics"); + Utils.closeQuietly(copyQuotaMetrics, "copyQuotaMetrics"); } // Visible for testing @@ -2044,19 +2046,18 @@ public void close() { followerThreadPool.close(); try { shutdownAndAwaitTermination(remoteStorageReaderThreadPool, "RemoteStorageReaderThreadPool", 10, TimeUnit.SECONDS); + + leaderCopyRLMTasks.clear(); + leaderExpirationRLMTasks.clear(); + followerRLMTasks.clear(); + + Utils.closeQuietly(indexCache, "RemoteIndexCache"); + Utils.closeQuietly(remoteLogMetadataManagerPlugin, "remoteLogMetadataManagerPlugin"); + Utils.closeQuietly(remoteStorageManagerPlugin, "remoteStorageManagerPlugin"); + closed = true; } finally { removeMetrics(); } - leaderCopyRLMTasks.clear(); - leaderExpirationRLMTasks.clear(); - followerRLMTasks.clear(); - - Utils.closeQuietly(indexCache, "RemoteIndexCache"); - Utils.closeQuietly(remoteLogMetadataManagerPlugin, "remoteLogMetadataManagerPlugin"); - Utils.closeQuietly(remoteStorageManagerPlugin, "remoteStorageManagerPlugin"); - Utils.closeQuietly(fetchQuotaMetrics, "fetchQuotaMetrics"); - Utils.closeQuietly(copyQuotaMetrics, "copyQuotaMetrics"); - closed = true; } } }