Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Should be ideally 0, though a higher number isn't representative of a problem.|
|`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/usedCount`|Number of merge buffers used from the merge buffer pool. This metric is defined per query and not point-in-time like `mergeBuffer/queries`.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,28 @@ public int getNumMergingQueryRunnerMergeBuffers()
return mergingQueryRunnerMergeBuffers.size();
}

/**
* Returns the number of the currently used merge buffers for {@link org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunner}
*/
public int getNumMergingQueryRunnerMergeBuffersHolders()
{
if (mergingQueryRunnerMergeBuffersHolders == null) {
return 0;
}
return mergingQueryRunnerMergeBuffersHolders.size();
}

/**
* Returns the number of the currently used merge buffers for {@link GroupByQueryQueryToolChest#mergeResults}
*/
public int getNumToolchestMergeBuffersHolders()
{
if (toolchestMergeBuffersHolders == null) {
return 0;
}
return toolchestMergeBuffersHolders.size();
}

/**
* Get a merge buffer from the pre-acquired resources.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ public void reserve(
try {
// We have reserved a spot in the map. Now begin the blocking call.
resources = GroupingEngine.prepareResource(groupByQuery, mergeBufferPool, willMergeRunner, groupByQueryConfig);
perQueryStats.mergeBufferUsedCount(resources.getNumMergingQueryRunnerMergeBuffersHolders() + resources.getNumToolchestMergeBuffersHolders());
}
catch (Throwable t) {
// Unable to allocate the resources, perform cleanup and rethrow the exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public static class AggregateStats
private long spilledQueries = 0;
private long spilledBytes = 0;
private long mergeDictionarySize = 0;
private long mergeBufferUsedCount = 0;

public AggregateStats()
{
Expand All @@ -80,14 +81,16 @@ public AggregateStats(
long mergeBufferAcquisitionTimeNs,
long spilledQueries,
long spilledBytes,
long mergeDictionarySize
long mergeDictionarySize,
long mergeBufferUsedCount
)
{
this.mergeBufferQueries = mergeBufferQueries;
this.mergeBufferAcquisitionTimeNs = mergeBufferAcquisitionTimeNs;
this.spilledQueries = spilledQueries;
this.spilledBytes = spilledBytes;
this.mergeDictionarySize = mergeDictionarySize;
this.mergeBufferUsedCount = mergeBufferUsedCount;
}

public long getMergeBufferQueries()
Expand Down Expand Up @@ -115,10 +118,16 @@ public long getMergeDictionarySize()
return mergeDictionarySize;
}

public long getMergeBufferUsedCount()
{
return mergeBufferUsedCount;
}

public void addQueryStats(PerQueryStats perQueryStats)
{
if (perQueryStats.getMergeBufferAcquisitionTimeNs() > 0) {
mergeBufferQueries++;
mergeBufferUsedCount += perQueryStats.getMergeBufferUsedCount();
mergeBufferAcquisitionTimeNs += perQueryStats.getMergeBufferAcquisitionTimeNs();
}

Expand All @@ -138,14 +147,16 @@ public AggregateStats reset()
mergeBufferAcquisitionTimeNs,
spilledQueries,
spilledBytes,
mergeDictionarySize
mergeDictionarySize,
mergeBufferUsedCount
);

this.mergeBufferQueries = 0;
this.mergeBufferAcquisitionTimeNs = 0;
this.spilledQueries = 0;
this.spilledBytes = 0;
this.mergeDictionarySize = 0;
this.mergeBufferUsedCount = 0;

return aggregateStats;
}
Expand All @@ -156,6 +167,7 @@ public static class PerQueryStats
private final AtomicLong mergeBufferAcquisitionTimeNs = new AtomicLong(0);
private final AtomicLong spilledBytes = new AtomicLong(0);
private final AtomicLong mergeDictionarySize = new AtomicLong(0);
private final AtomicLong mergeBufferUsedCount = new AtomicLong(0);

public void mergeBufferAcquisitionTime(long delay)
{
Expand All @@ -172,11 +184,21 @@ public void dictionarySize(long size)
mergeDictionarySize.addAndGet(size);
}

public void mergeBufferUsedCount(long count)
{
mergeBufferUsedCount.addAndGet(count);
}

public long getMergeBufferAcquisitionTimeNs()
{
return mergeBufferAcquisitionTimeNs.get();
}

public long getMergeBufferUsedCount()
{
return mergeBufferUsedCount.get();
}

public long getSpilledBytes()
{
return spilledBytes.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public boolean doMonitor(ServiceEmitter emitter)
"mergeBuffer/acquisitionTimeNs",
statsContainer.getMergeBufferAcquisitionTimeNs()
));
emitter.emit(builder.setMetric("mergeBuffer/usedCount", statsContainer.getMergeBufferUsedCount()));
}

if (statsContainer.getSpilledQueries() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public synchronized AggregateStats getStatsSince()
100L,
2L,
200L,
300L
300L,
1L
);
}
};
Expand Down Expand Up @@ -89,9 +90,10 @@ public void testMonitor()
event -> (String) event.toMap().get("metric"),
event -> (Long) event.toMap().get("value")
));
Assert.assertEquals(7, resultMap.size());
Assert.assertEquals(8, resultMap.size());
Assert.assertEquals(0, (long) resultMap.get("mergeBuffer/pendingRequests"));
Assert.assertEquals(0, (long) resultMap.get("mergeBuffer/used"));
Assert.assertEquals(1, (long) resultMap.get("mergeBuffer/usedCount"));
Assert.assertEquals(1, (long) resultMap.get("mergeBuffer/queries"));
Assert.assertEquals(100, (long) resultMap.get("mergeBuffer/acquisitionTimeNs"));
Assert.assertEquals(2, (long) resultMap.get("groupBy/spilledQueries"));
Expand Down
Loading