diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index af59687e10dd..bdd1c8390932 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -133,6 +133,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Should be ideally 0, though a higher number isn't representative of a problem.| |`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.| |`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.| +|`mergeBuffer/usedCount`|Number of merge buffers used from the merge buffer pool. This metric is defined per query and not point-in-time like `mergeBuffer/queries`.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.| |`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryResources.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryResources.java index d756de5c5bab..a1f3a2761ab6 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryResources.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryResources.java @@ -173,6 +173,28 @@ public int getNumMergingQueryRunnerMergeBuffers() return mergingQueryRunnerMergeBuffers.size(); } + /** + * Returns the number of the currently used merge buffers for {@link org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunner} + */ + public int getNumMergingQueryRunnerMergeBuffersHolders() + { + if (mergingQueryRunnerMergeBuffersHolders == null) { + return 0; + } + return mergingQueryRunnerMergeBuffersHolders.size(); + } + + /** + * Returns the number of the currently used merge buffers for {@link GroupByQueryQueryToolChest#mergeResults} + */ + public int getNumToolchestMergeBuffersHolders() + { + if (toolchestMergeBuffersHolders == null) { + return 0; + } + return toolchestMergeBuffersHolders.size(); + } + /** * Get a merge buffer from the pre-acquired resources. * diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java index bd585dc7218c..ffab9517e1f2 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByResourcesReservationPool.java @@ -141,6 +141,7 @@ public void reserve( try { // We have reserved a spot in the map. Now begin the blocking call. resources = GroupingEngine.prepareResource(groupByQuery, mergeBufferPool, willMergeRunner, groupByQueryConfig); + perQueryStats.mergeBufferUsedCount(resources.getNumMergingQueryRunnerMergeBuffersHolders() + resources.getNumToolchestMergeBuffersHolders()); } catch (Throwable t) { // Unable to allocate the resources, perform cleanup and rethrow the exception diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java index a5ce31cb5f98..26fa2f60c070 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java @@ -70,6 +70,7 @@ public static class AggregateStats private long spilledQueries = 0; private long spilledBytes = 0; private long mergeDictionarySize = 0; + private long mergeBufferUsedCount = 0; public AggregateStats() { @@ -80,7 +81,8 @@ public AggregateStats( long mergeBufferAcquisitionTimeNs, long spilledQueries, long spilledBytes, - long mergeDictionarySize + long mergeDictionarySize, + long mergeBufferUsedCount ) { this.mergeBufferQueries = mergeBufferQueries; @@ -88,6 +90,7 @@ public AggregateStats( this.spilledQueries = spilledQueries; this.spilledBytes = spilledBytes; this.mergeDictionarySize = mergeDictionarySize; + this.mergeBufferUsedCount = mergeBufferUsedCount; } public long getMergeBufferQueries() @@ -115,10 +118,16 @@ public long getMergeDictionarySize() return mergeDictionarySize; } + public long getMergeBufferUsedCount() + { + return mergeBufferUsedCount; + } + public void addQueryStats(PerQueryStats perQueryStats) { if (perQueryStats.getMergeBufferAcquisitionTimeNs() > 0) { mergeBufferQueries++; + mergeBufferUsedCount += perQueryStats.getMergeBufferUsedCount(); mergeBufferAcquisitionTimeNs += perQueryStats.getMergeBufferAcquisitionTimeNs(); } @@ -138,7 +147,8 @@ public AggregateStats reset() mergeBufferAcquisitionTimeNs, spilledQueries, spilledBytes, - mergeDictionarySize + mergeDictionarySize, + mergeBufferUsedCount ); this.mergeBufferQueries = 0; @@ -146,6 +156,7 @@ public AggregateStats reset() this.spilledQueries = 0; this.spilledBytes = 0; this.mergeDictionarySize = 0; + this.mergeBufferUsedCount = 0; return aggregateStats; } @@ -156,6 +167,7 @@ public static class PerQueryStats private final AtomicLong mergeBufferAcquisitionTimeNs = new AtomicLong(0); private final AtomicLong spilledBytes = new AtomicLong(0); private final AtomicLong mergeDictionarySize = new AtomicLong(0); + private final AtomicLong mergeBufferUsedCount = new AtomicLong(0); public void mergeBufferAcquisitionTime(long delay) { @@ -172,11 +184,21 @@ public void dictionarySize(long size) mergeDictionarySize.addAndGet(size); } + public void mergeBufferUsedCount(long count) + { + mergeBufferUsedCount.addAndGet(count); + } + public long getMergeBufferAcquisitionTimeNs() { return mergeBufferAcquisitionTimeNs.get(); } + public long getMergeBufferUsedCount() + { + return mergeBufferUsedCount.get(); + } + public long getSpilledBytes() { return spilledBytes.get(); diff --git a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java index d49c19c6f882..d34ea35edb44 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java @@ -61,6 +61,7 @@ public boolean doMonitor(ServiceEmitter emitter) "mergeBuffer/acquisitionTimeNs", statsContainer.getMergeBufferAcquisitionTimeNs() )); + emitter.emit(builder.setMetric("mergeBuffer/usedCount", statsContainer.getMergeBufferUsedCount())); } if (statsContainer.getSpilledQueries() > 0) { diff --git a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java index f16acd0b060d..5704508fd49f 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java @@ -58,7 +58,8 @@ public synchronized AggregateStats getStatsSince() 100L, 2L, 200L, - 300L + 300L, + 1L ); } }; @@ -89,9 +90,10 @@ public void testMonitor() event -> (String) event.toMap().get("metric"), event -> (Long) event.toMap().get("value") )); - Assert.assertEquals(7, resultMap.size()); + Assert.assertEquals(8, resultMap.size()); Assert.assertEquals(0, (long) resultMap.get("mergeBuffer/pendingRequests")); Assert.assertEquals(0, (long) resultMap.get("mergeBuffer/used")); + Assert.assertEquals(1, (long) resultMap.get("mergeBuffer/usedCount")); Assert.assertEquals(1, (long) resultMap.get("mergeBuffer/queries")); Assert.assertEquals(100, (long) resultMap.get("mergeBuffer/acquisitionTimeNs")); Assert.assertEquals(2, (long) resultMap.get("groupBy/spilledQueries"));