15 changes: 15 additions & 0 deletions docs/operations/metrics.md
@@ -89,9 +89,14 @@ Most metric values reset each emission period, as specified in `druid.monitoring.emissionPeriod`.
|`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
> Reviewer comment (Contributor): nit: move this after `mergeBuffer/acquisitionTimeNs`, similar to the other corresponding max metrics.

|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled to disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled to disk by groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/maxMergeDictionarySize`|Maximum size of the on-heap merge dictionary in bytes observed for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
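
The totals and their `max*` counterparts differ only in how per-query values are folded together over an emission period: totals are sums across queries, while the `max*` metrics keep the largest single-query value seen. A minimal sketch of that aggregation, mirroring the `AggregateStats.addQueryStats` logic added later in this PR (the class name and the sample values below are illustrative only):

```java
public class MergeBufferAggregationSketch
{
  public static void main(String[] args)
  {
    // Hypothetical per-query merge buffer acquisition times (ns) within one emission period.
    long[] perQueryAcquisitionNs = {1_200_000L, 350_000L, 9_800_000L};

    long totalNs = 0; // reported as mergeBuffer/acquisitionTimeNs
    long maxNs = 0;   // reported as mergeBuffer/maxAcquisitionTimeNs

    for (long ns : perQueryAcquisitionNs) {
      totalNs += ns;               // sum across all queries
      maxNs = Math.max(maxNs, ns); // largest single-query value
    }

    System.out.println("mergeBuffer/acquisitionTimeNs    = " + totalNs); // 11350000
    System.out.println("mergeBuffer/maxAcquisitionTimeNs = " + maxNs);   // 9800000
  }
}
```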

### Historical

@@ -113,9 +118,14 @@ Most metric values reset each emission period, as specified in `druid.monitoring.emissionPeriod`.
|`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
> Reviewer comment (Contributor): ditto here and below.

|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled to disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled to disk by groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/maxMergeDictionarySize`|Maximum size of the on-heap merge dictionary in bytes observed for any single groupBy query within the emission period.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|

### Real-time

@@ -140,9 +150,14 @@ to represent the task ID are deprecated and will be removed in a future release.
|`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`mergeBuffer/maxAcquisitionTimeNs`|Maximum time in nanoseconds to acquire merge buffer for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`mergeBuffer/maxBytesUsed`|Maximum number of bytes used by merge buffers for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled to disk. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled to disk by groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/maxSpilledBytes`|Maximum number of bytes spilled to disk by any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/maxMergeDictionarySize`|Maximum size of the on-heap merge dictionary in bytes observed for any single groupBy query within the emission period. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|

### Jetty

GroupByStatsProvider.java
@@ -27,7 +27,8 @@
import java.util.concurrent.atomic.AtomicLong;

/**
* Metrics collector for groupBy queries like spilled bytes, merge buffer acquistion time, dictionary size.
* Collects groupBy query metrics (spilled bytes, merge buffer usage, dictionary size) per-query, then
* aggregates them when queries complete. Stats are retrieved and reset periodically via {@link #getStatsSince()}.
*/
@LazySingleton
public class GroupByStatsProvider
@@ -60,34 +61,67 @@ public synchronized void closeQuery(QueryResourceId resourceId)

public synchronized AggregateStats getStatsSince()
{
return aggregateStatsContainer.reset();
AggregateStats aggregateStats = new AggregateStats(aggregateStatsContainer);
aggregateStatsContainer.reset();
return aggregateStats;
}

public static class AggregateStats
{
private long mergeBufferQueries = 0;
private long mergeBufferAcquisitionTimeNs = 0;
private long mergeBufferTotalUsage = 0;
private long maxMergeBufferAcquisitionTimeNs = 0;
private long maxMergeBufferUsage = 0;
private long spilledQueries = 0;
private long spilledBytes = 0;
private long maxSpilledBytes = 0;
private long mergeDictionarySize = 0;
private long maxMergeDictionarySize = 0;

public AggregateStats()
{
}

public AggregateStats(AggregateStats aggregateStats)
{
this(
aggregateStats.mergeBufferQueries,
aggregateStats.mergeBufferAcquisitionTimeNs,
aggregateStats.mergeBufferTotalUsage,
aggregateStats.maxMergeBufferAcquisitionTimeNs,
aggregateStats.maxMergeBufferUsage,
aggregateStats.spilledQueries,
aggregateStats.spilledBytes,
aggregateStats.maxSpilledBytes,
aggregateStats.mergeDictionarySize,
aggregateStats.maxMergeDictionarySize
);
}

public AggregateStats(
long mergeBufferQueries,
long mergeBufferAcquisitionTimeNs,
long mergeBufferTotalUsage,
long maxMergeBufferAcquisitionTimeNs,
long maxMergeBufferUsage,
long spilledQueries,
long spilledBytes,
long mergeDictionarySize
long maxSpilledBytes,
long mergeDictionarySize,
long maxMergeDictionarySize
)
{
this.mergeBufferQueries = mergeBufferQueries;
this.mergeBufferAcquisitionTimeNs = mergeBufferAcquisitionTimeNs;
this.mergeBufferTotalUsage = mergeBufferTotalUsage;
this.maxMergeBufferAcquisitionTimeNs = maxMergeBufferAcquisitionTimeNs;
this.maxMergeBufferUsage = maxMergeBufferUsage;
this.spilledQueries = spilledQueries;
this.spilledBytes = spilledBytes;
this.maxSpilledBytes = maxSpilledBytes;
this.mergeDictionarySize = mergeDictionarySize;
this.maxMergeDictionarySize = maxMergeDictionarySize;
}

public long getMergeBufferQueries()
@@ -100,6 +134,21 @@ public long getMergeBufferAcquisitionTimeNs()
return mergeBufferAcquisitionTimeNs;
}

public long getMergeBufferTotalUsage()
{
return mergeBufferTotalUsage;
}

public long getMaxMergeBufferAcquisitionTimeNs()
{
return maxMergeBufferAcquisitionTimeNs;
}

public long getMaxMergeBufferUsage()
{
return maxMergeBufferUsage;
}

public long getSpilledQueries()
{
return spilledQueries;
@@ -110,50 +159,63 @@ public long getSpilledBytes()
return spilledBytes;
}

public long getMaxSpilledBytes()
{
return maxSpilledBytes;
}

public long getMergeDictionarySize()
{
return mergeDictionarySize;
}

public long getMaxMergeDictionarySize()
{
return maxMergeDictionarySize;
}

public void addQueryStats(PerQueryStats perQueryStats)
{
if (perQueryStats.getMergeBufferAcquisitionTimeNs() > 0) {
mergeBufferQueries++;
mergeBufferAcquisitionTimeNs += perQueryStats.getMergeBufferAcquisitionTimeNs();
maxMergeBufferAcquisitionTimeNs = Math.max(
maxMergeBufferAcquisitionTimeNs,
perQueryStats.getMergeBufferAcquisitionTimeNs()
);
mergeBufferTotalUsage += perQueryStats.getMergeBufferTotalUsage();
maxMergeBufferUsage = Math.max(maxMergeBufferUsage, perQueryStats.getMergeBufferTotalUsage());
}

if (perQueryStats.getSpilledBytes() > 0) {
spilledQueries++;
spilledBytes += perQueryStats.getSpilledBytes();
maxSpilledBytes = Math.max(maxSpilledBytes, perQueryStats.getSpilledBytes());
}

mergeDictionarySize += perQueryStats.getMergeDictionarySize();
maxMergeDictionarySize = Math.max(maxMergeDictionarySize, perQueryStats.getMergeDictionarySize());
}

public AggregateStats reset()
public void reset()
{
AggregateStats aggregateStats =
new AggregateStats(
mergeBufferQueries,
mergeBufferAcquisitionTimeNs,
spilledQueries,
spilledBytes,
mergeDictionarySize
);

Comment on lines -133 to -143
> Reviewer comment (Contributor): Any reason to change this contract? I see the caller now calls `AggregateStats()` explicitly before `reset()` to save the state; I think the old approach was better.

> Reviewer comment (Contributor, follow-up): Actually, on second thought, I think the new `void reset()` contract is better. It aligns with the `reset()` signature defined in the grouper, and generally in other places too.

this.mergeBufferQueries = 0;
this.mergeBufferAcquisitionTimeNs = 0;
this.mergeBufferTotalUsage = 0;
this.maxMergeBufferAcquisitionTimeNs = 0;
this.maxMergeBufferUsage = 0;
this.spilledQueries = 0;
this.spilledBytes = 0;
this.maxSpilledBytes = 0;
this.mergeDictionarySize = 0;

return aggregateStats;
this.maxMergeDictionarySize = 0;
}
}

public static class PerQueryStats
{
private final AtomicLong mergeBufferAcquisitionTimeNs = new AtomicLong(0);
private final AtomicLong mergeBufferTotalUsage = new AtomicLong(0);
private final AtomicLong spilledBytes = new AtomicLong(0);
private final AtomicLong mergeDictionarySize = new AtomicLong(0);

@@ -162,6 +224,11 @@ public void mergeBufferAcquisitionTime(long delay)
mergeBufferAcquisitionTimeNs.addAndGet(delay);
}

public void mergeBufferTotalUsage(long bytes)
{
mergeBufferTotalUsage.addAndGet(bytes);
}

public void spilledBytes(long bytes)
{
spilledBytes.addAndGet(bytes);
@@ -177,6 +244,11 @@ public long getMergeBufferAcquisitionTimeNs()
return mergeBufferAcquisitionTimeNs.get();
}

public long getMergeBufferTotalUsage()
{
return mergeBufferTotalUsage.get();
}

public long getSpilledBytes()
{
return spilledBytes.get();
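
The revised `getStatsSince()` contract is snapshot-then-reset: it copies the accumulated values into a fresh `AggregateStats` and then clears the container, so each call reports only what accumulated since the previous call (the contract change discussed in the review comments above). Below is a minimal sketch of how a periodic monitor might consume it, using only the getters added in this diff. The wrapper class and the logging are illustrative, not the actual `GroupByStatsMonitor` implementation; the provider instance is assumed to be injected, and the mapping of getters to the metric names in the documentation table is an assumption.

```java
public class GroupByStatsLogger
{
  private final GroupByStatsProvider statsProvider;

  public GroupByStatsLogger(GroupByStatsProvider statsProvider)
  {
    this.statsProvider = statsProvider; // assumed to be supplied by the injector
  }

  /** Invoked once per emission period; the provider resets its counters after each snapshot. */
  public void logOnce()
  {
    GroupByStatsProvider.AggregateStats stats = statsProvider.getStatsSince();
    System.out.println("mergeBuffer/queries              = " + stats.getMergeBufferQueries());
    System.out.println("mergeBuffer/acquisitionTimeNs    = " + stats.getMergeBufferAcquisitionTimeNs());
    System.out.println("mergeBuffer/maxAcquisitionTimeNs = " + stats.getMaxMergeBufferAcquisitionTimeNs());
    System.out.println("mergeBuffer/bytesUsed            = " + stats.getMergeBufferTotalUsage());
    System.out.println("mergeBuffer/maxBytesUsed         = " + stats.getMaxMergeBufferUsage());
    System.out.println("groupBy/spilledQueries           = " + stats.getSpilledQueries());
    System.out.println("groupBy/spilledBytes             = " + stats.getSpilledBytes());
    System.out.println("groupBy/maxSpilledBytes          = " + stats.getMaxSpilledBytes());
    System.out.println("groupBy/mergeDictionarySize      = " + stats.getMergeDictionarySize());
    System.out.println("groupBy/maxMergeDictionarySize   = " + stats.getMaxMergeDictionarySize());
  }
}
```
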
@@ -173,6 +173,15 @@ public void close()
aggregators.reset();
}

/**
* Returns the largest amount of buffer memory this grouper has claimed. This is only used for
* monitoring the size of the merge buffers.
*/
public long getMergeBufferUsage()
{
return hashTable.getMaxTableBufferUsage();
}

/**
* Populate a {@link ReusableEntry} with values from a particular bucket.
*/
BufferHashGrouper.java
@@ -26,7 +26,6 @@
import org.apache.druid.query.aggregation.AggregatorAdapters;
import org.apache.druid.query.aggregation.AggregatorFactory;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.AbstractList;
import java.util.Collections;
@@ -50,7 +49,6 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyType>
// to get a comparator that uses the ordering defined by the OrderByColumnSpec of a query.
private final boolean useDefaultSorting;

@Nullable
private ByteBufferIntList offsetList;

public BufferHashGrouper(
@@ -154,6 +152,18 @@ public void reset()
aggregators.reset();
}

@Override
public long getMergeBufferUsage()
{
if (!initialized) {
return 0L;
}

long hashTableUsage = hashTable.getMaxTableBufferUsage();
long offsetListUsage = offsetList.getMaxMergeBufferUsageBytes();
return hashTableUsage + offsetListUsage;
}

@Override
public CloseableIterator<Entry<KeyType>> iterator(boolean sorted)
{
Expand Down Expand Up @@ -199,18 +209,15 @@ public int size()
}

// Sort offsets in-place.
Collections.sort(
wrappedOffsets,
(lhs, rhs) -> {
final ByteBuffer tableBuffer = hashTable.getTableBuffer();
return comparator.compare(
tableBuffer,
tableBuffer,
lhs + HASH_SIZE,
rhs + HASH_SIZE
);
}
);
wrappedOffsets.sort((lhs, rhs) -> {
final ByteBuffer tableBuffer = hashTable.getTableBuffer();
return comparator.compare(
tableBuffer,
tableBuffer,
lhs + HASH_SIZE,
rhs + HASH_SIZE
);
});

return new CloseableIterator<>()
{
ByteBufferHashTable.java
@@ -79,6 +79,9 @@ public static int calculateTableArenaSizeWithFixedAdditionalSize(
@Nullable
protected BucketUpdateHandler bucketUpdateHandler;

// Tracks the maximum number of bytes this hash table has used from the merge buffer.
protected long maxTableBufferUsage;

public ByteBufferHashTable(
float maxLoadFactor,
int initialBuckets,
@@ -97,6 +100,7 @@ public ByteBufferHashTable(
this.maxSizeForTesting = maxSizeForTesting;
this.tableArenaSize = buffer.capacity();
this.bucketUpdateHandler = bucketUpdateHandler;
this.maxTableBufferUsage = 0;
}

public void reset()
@@ -139,6 +143,7 @@ public void reset()
bufferDup.position(tableStart);
bufferDup.limit(tableStart + maxBuckets * bucketSizeWithHash);
tableBuffer = bufferDup.slice();
updateMaxTableBufferUsage();

// Clear used bits of new table
for (int i = 0; i < maxBuckets; i++) {
@@ -225,6 +230,7 @@ public void adjustTableWhenFull()
maxBuckets = newBuckets;
regrowthThreshold = newMaxSize;
tableBuffer = newTableBuffer;
updateMaxTableBufferUsage();
tableStart = newTableStart;

growthCount++;
@@ -381,6 +387,16 @@ public int getGrowthCount()
return growthCount;
}

protected void updateMaxTableBufferUsage()
{
maxTableBufferUsage = Math.max(maxTableBufferUsage, tableBuffer.capacity());
}

public long getMaxTableBufferUsage()
{
return maxTableBufferUsage;
}

public interface BucketUpdateHandler
{
void handleNewBucket(int bucketOffset);
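
`maxTableBufferUsage` is a plain high-water mark: `updateMaxTableBufferUsage()` bumps it to the current table buffer capacity whenever the table is rebuilt or grown, so `getMaxTableBufferUsage()` reports the largest buffer the table has ever claimed rather than its current size, and (in the hunks shown here) the value is updated rather than cleared in `reset()`. A self-contained illustration of the same pattern, not Druid code; the `HighWaterMarkSketch` class and its capacities are made up:

```java
import java.nio.ByteBuffer;

public class HighWaterMarkSketch
{
  private ByteBuffer tableBuffer;
  private long maxTableBufferUsage = 0;

  /** Simulates the hash table being rebuilt over a (possibly larger) slice of the merge buffer. */
  public void rebuild(int capacityBytes)
  {
    tableBuffer = ByteBuffer.allocate(capacityBytes);
    // Same idea as updateMaxTableBufferUsage(): remember the largest capacity ever observed.
    maxTableBufferUsage = Math.max(maxTableBufferUsage, tableBuffer.capacity());
  }

  public long getMaxTableBufferUsage()
  {
    return maxTableBufferUsage;
  }

  public static void main(String[] args)
  {
    HighWaterMarkSketch table = new HighWaterMarkSketch();
    table.rebuild(64 * 1024);  // initial table
    table.rebuild(128 * 1024); // table grows when full
    table.rebuild(64 * 1024);  // rebuilt smaller after a reset
    System.out.println(table.getMaxTableBufferUsage()); // 131072: the high-water mark survives
  }
}
```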