Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
Expand All @@ -113,6 +114,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
|`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies|
Expand Down Expand Up @@ -140,6 +142,7 @@ to represent the task ID are deprecated and will be removed in a future release.
|`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the number of groupBy queries needing merge buffers.|
|`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
|`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import java.util.concurrent.atomic.AtomicLong;

/**
* Metrics collector for groupBy queries like spilled bytes, merge buffer acquistion time, dictionary size.
* Metrics collector for groupBy queries like spilled bytes, merge buffer acquisition time, merge buffer memory usage,
* and dictionary footprint.
*/
@LazySingleton
public class GroupByStatsProvider
Expand Down Expand Up @@ -60,13 +61,16 @@ public synchronized void closeQuery(QueryResourceId resourceId)

public synchronized AggregateStats getStatsSince()
{
return aggregateStatsContainer.reset();
AggregateStats aggregateStats = new AggregateStats(aggregateStatsContainer);
aggregateStatsContainer.reset();
return aggregateStats;
}

public static class AggregateStats
{
private long mergeBufferQueries = 0;
private long mergeBufferAcquisitionTimeNs = 0;
private long mergeBufferTotalUsage = 0;
private long spilledQueries = 0;
private long spilledBytes = 0;
private long mergeDictionarySize = 0;
Expand All @@ -75,16 +79,30 @@ public AggregateStats()
{
}

public AggregateStats(AggregateStats aggregateStats)
{
this(
aggregateStats.mergeBufferQueries,
aggregateStats.mergeBufferAcquisitionTimeNs,
aggregateStats.mergeBufferTotalUsage,
aggregateStats.spilledQueries,
aggregateStats.spilledBytes,
aggregateStats.mergeDictionarySize
);
}

public AggregateStats(
long mergeBufferQueries,
long mergeBufferAcquisitionTimeNs,
long mergeBufferTotalUsage,
long spilledQueries,
long spilledBytes,
long mergeDictionarySize
)
{
this.mergeBufferQueries = mergeBufferQueries;
this.mergeBufferAcquisitionTimeNs = mergeBufferAcquisitionTimeNs;
this.mergeBufferTotalUsage = mergeBufferTotalUsage;
this.spilledQueries = spilledQueries;
this.spilledBytes = spilledBytes;
this.mergeDictionarySize = mergeDictionarySize;
Expand All @@ -100,6 +118,11 @@ public long getMergeBufferAcquisitionTimeNs()
return mergeBufferAcquisitionTimeNs;
}

public long getMergeBufferTotalUsage()
{
return mergeBufferTotalUsage;
}

public long getSpilledQueries()
{
return spilledQueries;
Expand All @@ -120,6 +143,7 @@ public void addQueryStats(PerQueryStats perQueryStats)
if (perQueryStats.getMergeBufferAcquisitionTimeNs() > 0) {
mergeBufferQueries++;
mergeBufferAcquisitionTimeNs += perQueryStats.getMergeBufferAcquisitionTimeNs();
mergeBufferTotalUsage += perQueryStats.getMergeBufferTotalUsage();
}

if (perQueryStats.getSpilledBytes() > 0) {
Expand All @@ -130,30 +154,21 @@ public void addQueryStats(PerQueryStats perQueryStats)
mergeDictionarySize += perQueryStats.getMergeDictionarySize();
}

public AggregateStats reset()
public void reset()
{
AggregateStats aggregateStats =
new AggregateStats(
mergeBufferQueries,
mergeBufferAcquisitionTimeNs,
spilledQueries,
spilledBytes,
mergeDictionarySize
);

this.mergeBufferQueries = 0;
this.mergeBufferAcquisitionTimeNs = 0;
this.mergeBufferTotalUsage = 0;
this.spilledQueries = 0;
this.spilledBytes = 0;
this.mergeDictionarySize = 0;

return aggregateStats;
}
}

public static class PerQueryStats
{
private final AtomicLong mergeBufferAcquisitionTimeNs = new AtomicLong(0);
private final AtomicLong mergeBufferTotalUsage = new AtomicLong(0);
private final AtomicLong spilledBytes = new AtomicLong(0);
private final AtomicLong mergeDictionarySize = new AtomicLong(0);

Expand All @@ -162,6 +177,11 @@ public void mergeBufferAcquisitionTime(long delay)
mergeBufferAcquisitionTimeNs.addAndGet(delay);
}

public void mergeBufferTotalUsage(long bytes)
{
mergeBufferTotalUsage.addAndGet(bytes);
}

public void spilledBytes(long bytes)
{
spilledBytes.addAndGet(bytes);
Expand All @@ -177,6 +197,11 @@ public long getMergeBufferAcquisitionTimeNs()
return mergeBufferAcquisitionTimeNs.get();
}

public long getMergeBufferTotalUsage()
{
return mergeBufferTotalUsage.get();
}

public long getSpilledBytes()
{
return spilledBytes.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,15 @@ public void close()
aggregators.reset();
}

/**
* This method is implemented to return the highest memory value claimed by the Grouper. This is only
* used for monitoring the size of the merge buffers used.
*/
public long getMergeBufferUsage()
{
return hashTable.getMaxTableBufferUsage();
}

/**
* Populate a {@link ReusableEntry} with values from a particular bucket.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ public static int calculateTableArenaSizeWithFixedAdditionalSize(
@Nullable
protected BucketUpdateHandler bucketUpdateHandler;

// Keeps track on how many bytes is being used in the merge buffer.
protected long maxTableBufferUsage;

public ByteBufferHashTable(
float maxLoadFactor,
int initialBuckets,
Expand All @@ -97,6 +100,7 @@ public ByteBufferHashTable(
this.maxSizeForTesting = maxSizeForTesting;
this.tableArenaSize = buffer.capacity();
this.bucketUpdateHandler = bucketUpdateHandler;
this.maxTableBufferUsage = 0;
}

public void reset()
Expand Down Expand Up @@ -139,6 +143,7 @@ public void reset()
bufferDup.position(tableStart);
bufferDup.limit(tableStart + maxBuckets * bucketSizeWithHash);
tableBuffer = bufferDup.slice();
updateMaxTableBufferUsage();

// Clear used bits of new table
for (int i = 0; i < maxBuckets; i++) {
Expand Down Expand Up @@ -225,6 +230,7 @@ public void adjustTableWhenFull()
maxBuckets = newBuckets;
regrowthThreshold = newMaxSize;
tableBuffer = newTableBuffer;
updateMaxTableBufferUsage();
tableStart = newTableStart;

growthCount++;
Expand Down Expand Up @@ -381,6 +387,16 @@ public int getGrowthCount()
return growthCount;
}

protected void updateMaxTableBufferUsage()
{
maxTableBufferUsage = Math.max(maxTableBufferUsage, tableBuffer.capacity());
}

public long getMaxTableBufferUsage()
{
return maxTableBufferUsage;
}

public interface BucketUpdateHandler
{
void handleNewBucket(int bucketOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class ByteBufferIntList
private final int maxElements;
private int numElements;

private int maxMergeBufferUsageBytes;

public ByteBufferIntList(
ByteBuffer buffer,
int maxElements
Expand All @@ -38,6 +40,7 @@ public ByteBufferIntList(
this.buffer = buffer;
this.maxElements = maxElements;
this.numElements = 0;
this.maxMergeBufferUsageBytes = 0;

if (buffer.capacity() < (maxElements * Integer.BYTES)) {
throw new IAE(
Expand All @@ -55,6 +58,7 @@ public void add(int val)
}
buffer.putInt(numElements * Integer.BYTES, val);
numElements++;
maxMergeBufferUsageBytes = Math.max(maxMergeBufferUsageBytes, numElements * Integer.BYTES);
}

public void set(int index, int val)
Expand All @@ -71,4 +75,9 @@ public void reset()
{
numElements = 0;
}

public int getMaxMergeBufferUsageBytes()
{
return maxMergeBufferUsageBytes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class ByteBufferMinMaxOffsetHeap

private int heapSize;
private int maxHeapSize;
private int maxMergeBufferUsageBytes;

public ByteBufferMinMaxOffsetHeap(
ByteBuffer buf,
Expand All @@ -55,6 +56,7 @@ public ByteBufferMinMaxOffsetHeap(
this.buf = buf;
this.limit = limit;
this.heapSize = 0;
this.maxMergeBufferUsageBytes = 0;
this.minComparator = minComparator;
this.maxComparator = Ordering.from(minComparator).reverse();
this.heapIndexUpdater = heapIndexUpdater;
Expand All @@ -71,9 +73,9 @@ public int addOffset(int offset)
int pos = heapSize;
buf.putInt(pos * Integer.BYTES, offset);
heapSize++;
if (heapSize > maxHeapSize) {
maxHeapSize = heapSize;
}

maxHeapSize = Math.max(maxHeapSize, heapSize);
maxMergeBufferUsageBytes = Math.max(maxMergeBufferUsageBytes, maxHeapSize * Integer.BYTES);

if (heapIndexUpdater != null) {
heapIndexUpdater.updateHeapIndexForOffset(offset, pos);
Expand Down Expand Up @@ -226,6 +228,11 @@ public int getHeapSize()
return heapSize;
}

public int getMaxMergeBufferUsageBytes()
{
return maxMergeBufferUsageBytes;
}

private void bubbleUp(int pos)
{
if (isEvenLevel(pos)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public void reset()
throw new ISE("Grouper is closed");
}

groupers.forEach(Grouper::reset);
groupers.forEach(SpillingGrouper::reset);
}

@Override
Expand Down Expand Up @@ -496,7 +496,7 @@ public void close()
{
if (!closed) {
closed = true;
groupers.forEach(Grouper::close);
groupers.forEach(SpillingGrouper::close);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,14 @@ public boolean validateBufferCapacity(int bufferCapacity)
}
}

@Override
public long getMergeBufferUsage()
{
long hashTableUsage = super.getMergeBufferUsage();
long offSetHeapUsage = offsetHeap.getMaxMergeBufferUsageBytes();
return hashTableUsage + offSetHeapUsage;
}

private class AlternatingByteBufferHashTable extends ByteBufferHashTable
{
// The base buffer is split into two alternating halves, with one sub-buffer in use at a given time.
Expand Down Expand Up @@ -503,6 +511,7 @@ public AlternatingByteBufferHashTable(
subHashTable2Buffer = subHashTable2Buffer.slice();

subHashTableBuffers = new ByteBuffer[]{subHashTable1Buffer, subHashTable2Buffer};
updateMaxTableBufferUsage();
}

@Override
Expand All @@ -515,6 +524,7 @@ public void reset()
subHashTableBuffers[0].put(i * bucketSizeWithHash, (byte) 0);
}
tableBuffer = subHashTableBuffers[0];
updateMaxTableBufferUsage();
}

@Override
Expand Down Expand Up @@ -571,7 +581,19 @@ public void adjustTableWhenFull()

size = numCopied;
tableBuffer = newTableBuffer;
updateMaxTableBufferUsage();
growthCount++;
}

@Override
protected void updateMaxTableBufferUsage()
{
long currentBufferUsage = 0;
for (ByteBuffer buffer : subHashTableBuffers) {
currentBufferUsage += buffer.capacity();
}

maxTableBufferUsage = Math.max(maxTableBufferUsage, currentBufferUsage);
}
}
}
Loading