diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index d16246ab6d4f..a747334d4cd2 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -89,6 +89,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.| |`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.| |`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| +|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| @@ -113,6 +114,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.| |`mergeBuffer/queries`|Number of groupBy queries that 
acquired a batch of buffers from the merge buffer pool.|This metric is only available if the `GroupByStatsMonitor` module is included.|Depends on the number of groupBy queries needing merge buffers.| |`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| +|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| |`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes.|This metric is only available if the `GroupByStatsMonitor` module is included.|Varies| @@ -140,6 +142,7 @@ to represent the task ID are deprecated and will be removed in a future release. |`mergeBuffer/used`|Number of merge buffers used from the merge buffer pool. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the number of groupBy queries needing merge buffers.| |`mergeBuffer/queries`|Number of groupBy queries that acquired a batch of buffers from the merge buffer pool. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Depends on the number of groupBy queries needing merge buffers.| |`mergeBuffer/acquisitionTimeNs`|Total time in nanoseconds to acquire merge buffer for groupBy queries. 
This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| +|`mergeBuffer/bytesUsed`|Number of bytes used by merge buffers to process groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| |`groupBy/spilledQueries`|Number of groupBy queries that have spilled onto the disk. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| |`groupBy/spilledBytes`|Number of bytes spilled on the disk by the groupBy queries. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| |`groupBy/mergeDictionarySize`|Size of on-heap merge dictionary in bytes. This metric is only available if the `GroupByStatsMonitor` module is included.|`dataSource`, `taskId`|Varies| diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java index a5ce31cb5f98..e207b7df5168 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByStatsProvider.java @@ -27,7 +27,8 @@ import java.util.concurrent.atomic.AtomicLong; /** - * Metrics collector for groupBy queries like spilled bytes, merge buffer acquistion time, dictionary size. + * Metrics collector for groupBy queries like spilled bytes, merge buffer acquisition time, merge buffer memory usage, + * and dictionary footprint.
*/ @LazySingleton public class GroupByStatsProvider @@ -60,13 +61,16 @@ public synchronized void closeQuery(QueryResourceId resourceId) public synchronized AggregateStats getStatsSince() { - return aggregateStatsContainer.reset(); + AggregateStats aggregateStats = new AggregateStats(aggregateStatsContainer); + aggregateStatsContainer.reset(); + return aggregateStats; } public static class AggregateStats { private long mergeBufferQueries = 0; private long mergeBufferAcquisitionTimeNs = 0; + private long mergeBufferTotalUsage = 0; private long spilledQueries = 0; private long spilledBytes = 0; private long mergeDictionarySize = 0; @@ -75,9 +79,22 @@ public AggregateStats() { } + public AggregateStats(AggregateStats aggregateStats) + { + this( + aggregateStats.mergeBufferQueries, + aggregateStats.mergeBufferAcquisitionTimeNs, + aggregateStats.mergeBufferTotalUsage, + aggregateStats.spilledQueries, + aggregateStats.spilledBytes, + aggregateStats.mergeDictionarySize + ); + } + public AggregateStats( long mergeBufferQueries, long mergeBufferAcquisitionTimeNs, + long mergeBufferTotalUsage, long spilledQueries, long spilledBytes, long mergeDictionarySize @@ -85,6 +102,7 @@ public AggregateStats( { this.mergeBufferQueries = mergeBufferQueries; this.mergeBufferAcquisitionTimeNs = mergeBufferAcquisitionTimeNs; + this.mergeBufferTotalUsage = mergeBufferTotalUsage; this.spilledQueries = spilledQueries; this.spilledBytes = spilledBytes; this.mergeDictionarySize = mergeDictionarySize; @@ -100,6 +118,11 @@ public long getMergeBufferAcquisitionTimeNs() return mergeBufferAcquisitionTimeNs; } + public long getMergeBufferTotalUsage() + { + return mergeBufferTotalUsage; + } + public long getSpilledQueries() { return spilledQueries; @@ -120,6 +143,7 @@ public void addQueryStats(PerQueryStats perQueryStats) if (perQueryStats.getMergeBufferAcquisitionTimeNs() > 0) { mergeBufferQueries++; mergeBufferAcquisitionTimeNs += perQueryStats.getMergeBufferAcquisitionTimeNs(); + 
mergeBufferTotalUsage += perQueryStats.getMergeBufferTotalUsage(); } if (perQueryStats.getSpilledBytes() > 0) { @@ -130,30 +154,21 @@ public void addQueryStats(PerQueryStats perQueryStats) mergeDictionarySize += perQueryStats.getMergeDictionarySize(); } - public AggregateStats reset() + public void reset() { - AggregateStats aggregateStats = - new AggregateStats( - mergeBufferQueries, - mergeBufferAcquisitionTimeNs, - spilledQueries, - spilledBytes, - mergeDictionarySize - ); - this.mergeBufferQueries = 0; this.mergeBufferAcquisitionTimeNs = 0; + this.mergeBufferTotalUsage = 0; this.spilledQueries = 0; this.spilledBytes = 0; this.mergeDictionarySize = 0; - - return aggregateStats; } } public static class PerQueryStats { private final AtomicLong mergeBufferAcquisitionTimeNs = new AtomicLong(0); + private final AtomicLong mergeBufferTotalUsage = new AtomicLong(0); private final AtomicLong spilledBytes = new AtomicLong(0); private final AtomicLong mergeDictionarySize = new AtomicLong(0); @@ -162,6 +177,11 @@ public void mergeBufferAcquisitionTime(long delay) mergeBufferAcquisitionTimeNs.addAndGet(delay); } + public void mergeBufferTotalUsage(long bytes) + { + mergeBufferTotalUsage.addAndGet(bytes); + } + public void spilledBytes(long bytes) { spilledBytes.addAndGet(bytes); @@ -177,6 +197,11 @@ public long getMergeBufferAcquisitionTimeNs() return mergeBufferAcquisitionTimeNs.get(); } + public long getMergeBufferTotalUsage() + { + return mergeBufferTotalUsage.get(); + } + public long getSpilledBytes() { return spilledBytes.get(); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java index 70cf5832cf33..e07409430b3b 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java +++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java @@ -173,6 +173,15 @@ public void close() aggregators.reset(); } + /** + * This method is implemented to return the highest memory value claimed by the Grouper. This is only + * used for monitoring the size of the merge buffers used. + */ + public long getMergeBufferUsage() + { + return hashTable.getMaxTableBufferUsage(); + } + /** * Populate a {@link ReusableEntry} with values from a particular bucket. */ diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferHashTable.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferHashTable.java index 62c65f7cecb7..0b93b5f5a5a9 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferHashTable.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferHashTable.java @@ -79,6 +79,9 @@ public static int calculateTableArenaSizeWithFixedAdditionalSize( @Nullable protected BucketUpdateHandler bucketUpdateHandler; + // Tracks the maximum number of bytes ever used by the table in the merge buffer.
+ protected long maxTableBufferUsage; + public ByteBufferHashTable( float maxLoadFactor, int initialBuckets, @@ -97,6 +100,7 @@ public ByteBufferHashTable( this.maxSizeForTesting = maxSizeForTesting; this.tableArenaSize = buffer.capacity(); this.bucketUpdateHandler = bucketUpdateHandler; + this.maxTableBufferUsage = 0; } public void reset() @@ -139,6 +143,7 @@ public void reset() bufferDup.position(tableStart); bufferDup.limit(tableStart + maxBuckets * bucketSizeWithHash); tableBuffer = bufferDup.slice(); + updateMaxTableBufferUsage(); // Clear used bits of new table for (int i = 0; i < maxBuckets; i++) { @@ -225,6 +230,7 @@ public void adjustTableWhenFull() maxBuckets = newBuckets; regrowthThreshold = newMaxSize; tableBuffer = newTableBuffer; + updateMaxTableBufferUsage(); tableStart = newTableStart; growthCount++; @@ -381,6 +387,16 @@ public int getGrowthCount() return growthCount; } + protected void updateMaxTableBufferUsage() + { + maxTableBufferUsage = Math.max(maxTableBufferUsage, tableBuffer.capacity()); + } + + public long getMaxTableBufferUsage() + { + return maxTableBufferUsage; + } + public interface BucketUpdateHandler { void handleNewBucket(int bucketOffset); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferIntList.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferIntList.java index 28de255c13a0..d474dcbc7aa4 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferIntList.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferIntList.java @@ -30,6 +30,8 @@ public class ByteBufferIntList private final int maxElements; private int numElements; + private int maxMergeBufferUsageBytes; + public ByteBufferIntList( ByteBuffer buffer, int maxElements @@ -38,6 +40,7 @@ public ByteBufferIntList( this.buffer = buffer; this.maxElements = maxElements; this.numElements = 0; + this.maxMergeBufferUsageBytes = 0; if 
(buffer.capacity() < (maxElements * Integer.BYTES)) { throw new IAE( @@ -55,6 +58,7 @@ public void add(int val) } buffer.putInt(numElements * Integer.BYTES, val); numElements++; + maxMergeBufferUsageBytes = Math.max(maxMergeBufferUsageBytes, numElements * Integer.BYTES); } public void set(int index, int val) @@ -71,4 +75,9 @@ public void reset() { numElements = 0; } + + public int getMaxMergeBufferUsageBytes() + { + return maxMergeBufferUsageBytes; + } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferMinMaxOffsetHeap.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferMinMaxOffsetHeap.java index cfa7295e6b43..d4585b34b41a 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferMinMaxOffsetHeap.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ByteBufferMinMaxOffsetHeap.java @@ -44,6 +44,7 @@ public class ByteBufferMinMaxOffsetHeap private int heapSize; private int maxHeapSize; + private int maxMergeBufferUsageBytes; public ByteBufferMinMaxOffsetHeap( ByteBuffer buf, @@ -55,6 +56,7 @@ public ByteBufferMinMaxOffsetHeap( this.buf = buf; this.limit = limit; this.heapSize = 0; + this.maxMergeBufferUsageBytes = 0; this.minComparator = minComparator; this.maxComparator = Ordering.from(minComparator).reverse(); this.heapIndexUpdater = heapIndexUpdater; @@ -71,9 +73,9 @@ public int addOffset(int offset) int pos = heapSize; buf.putInt(pos * Integer.BYTES, offset); heapSize++; - if (heapSize > maxHeapSize) { - maxHeapSize = heapSize; - } + + maxHeapSize = Math.max(maxHeapSize, heapSize); + maxMergeBufferUsageBytes = Math.max(maxMergeBufferUsageBytes, maxHeapSize * Integer.BYTES); if (heapIndexUpdater != null) { heapIndexUpdater.updateHeapIndexForOffset(offset, pos); @@ -226,6 +228,11 @@ public int getHeapSize() return heapSize; } + public int getMaxMergeBufferUsageBytes() + { + return maxMergeBufferUsageBytes; + } + private void 
bubbleUp(int pos) { if (isEvenLevel(pos)) { diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java index 8242c9d8cf5c..b4b4cb347019 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java @@ -332,7 +332,7 @@ public void reset() throw new ISE("Grouper is closed"); } - groupers.forEach(Grouper::reset); + groupers.forEach(SpillingGrouper::reset); } @Override @@ -496,7 +496,7 @@ public void close() { if (!closed) { closed = true; - groupers.forEach(Grouper::close); + groupers.forEach(SpillingGrouper::close); } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java index 0627fba0333d..57e29bfd6bd5 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java @@ -458,6 +458,14 @@ public boolean validateBufferCapacity(int bufferCapacity) } } + @Override + public long getMergeBufferUsage() + { + long hashTableUsage = super.getMergeBufferUsage(); + long offSetHeapUsage = offsetHeap.getMaxMergeBufferUsageBytes(); + return hashTableUsage + offSetHeapUsage; + } + private class AlternatingByteBufferHashTable extends ByteBufferHashTable { // The base buffer is split into two alternating halves, with one sub-buffer in use at a given time. 
@@ -503,6 +511,7 @@ public AlternatingByteBufferHashTable( subHashTable2Buffer = subHashTable2Buffer.slice(); subHashTableBuffers = new ByteBuffer[]{subHashTable1Buffer, subHashTable2Buffer}; + updateMaxTableBufferUsage(); } @Override @@ -515,6 +524,7 @@ public void reset() subHashTableBuffers[0].put(i * bucketSizeWithHash, (byte) 0); } tableBuffer = subHashTableBuffers[0]; + updateMaxTableBufferUsage(); } @Override @@ -571,7 +581,19 @@ public void adjustTableWhenFull() size = numCopied; tableBuffer = newTableBuffer; + updateMaxTableBufferUsage(); growthCount++; } + + @Override + protected void updateMaxTableBufferUsage() + { + long currentBufferUsage = 0; + for (ByteBuffer buffer : subHashTableBuffers) { + currentBufferUsage += buffer.capacity(); + } + + maxTableBufferUsage = Math.max(maxTableBufferUsage, currentBufferUsage); + } } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java index fadcfa02c95d..57e678de4480 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java @@ -68,7 +68,7 @@ public class SpillingGrouper implements Grouper "Not enough disk space to execute this query. Try raising druid.query.groupBy.maxOnDiskStorage." 
); - private final Grouper grouper; + private final AbstractBufferHashGrouper grouper; private final KeySerde keySerde; private final LimitedTemporaryStorage temporaryStorage; private final ObjectMapper spillMapper; @@ -218,12 +218,23 @@ public void reset() @Override public void close() { - perQueryStats.dictionarySize(keySerde.getDictionarySize()); + perQueryStats.dictionarySize(getDictionarySizeEstimate()); + perQueryStats.mergeBufferTotalUsage(getMergeBufferUsage()); grouper.close(); keySerde.reset(); deleteFiles(); } + private long getMergeBufferUsage() + { + return grouper.isInitialized() ? grouper.getMergeBufferUsage() : 0L; + } + + private long getDictionarySizeEstimate() + { + return keySerde.getDictionarySize(); + } + /** * Returns a dictionary of string keys added to this grouper. Note that the dictionary of keySerde is spilled on * local storage whenever the inner grouper is spilled. If there are spilled dictionaries, this method loads them diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java index 565a5ab97bc3..592506eee020 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByStatsProviderTest.java @@ -35,6 +35,7 @@ public void testMetricCollection() stats1.mergeBufferAcquisitionTime(300); stats1.mergeBufferAcquisitionTime(400); + stats1.mergeBufferTotalUsage(50); stats1.spilledBytes(200); stats1.spilledBytes(400); stats1.dictionarySize(100); @@ -45,6 +46,7 @@ stats2.mergeBufferAcquisitionTime(500); stats2.mergeBufferAcquisitionTime(600); + stats2.mergeBufferTotalUsage(100); stats2.spilledBytes(400); stats2.spilledBytes(600); stats2.dictionarySize(300); @@ -53,6 +55,7 @@ GroupByStatsProvider.AggregateStats aggregateStats =
statsProvider.getStatsSince(); Assert.assertEquals(0L, aggregateStats.getMergeBufferQueries()); Assert.assertEquals(0L, aggregateStats.getMergeBufferAcquisitionTimeNs()); + Assert.assertEquals(0L, aggregateStats.getMergeBufferTotalUsage()); Assert.assertEquals(0L, aggregateStats.getSpilledQueries()); Assert.assertEquals(0L, aggregateStats.getSpilledBytes()); Assert.assertEquals(0L, aggregateStats.getMergeDictionarySize()); @@ -63,6 +66,7 @@ public void testMetricCollection() aggregateStats = statsProvider.getStatsSince(); Assert.assertEquals(2, aggregateStats.getMergeBufferQueries()); Assert.assertEquals(1800L, aggregateStats.getMergeBufferAcquisitionTimeNs()); + Assert.assertEquals(150L, aggregateStats.getMergeBufferTotalUsage()); Assert.assertEquals(2L, aggregateStats.getSpilledQueries()); Assert.assertEquals(1600L, aggregateStats.getSpilledBytes()); Assert.assertEquals(1000L, aggregateStats.getMergeDictionarySize()); diff --git a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java index 0f07bd2894be..ca7dc4b16c5b 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/GroupByStatsMonitor.java @@ -74,10 +74,10 @@ public boolean doMonitor(ServiceEmitter emitter) if (statsContainer.getMergeBufferQueries() > 0) { emitter.emit(builder.setMetric("mergeBuffer/queries", statsContainer.getMergeBufferQueries())); - emitter.emit(builder.setMetric( - "mergeBuffer/acquisitionTimeNs", - statsContainer.getMergeBufferAcquisitionTimeNs() - )); + emitter.emit( + builder.setMetric("mergeBuffer/acquisitionTimeNs", statsContainer.getMergeBufferAcquisitionTimeNs()) + ); + emitter.emit(builder.setMetric("mergeBuffer/bytesUsed", statsContainer.getMergeBufferTotalUsage())); } if (statsContainer.getSpilledQueries() > 0) { diff --git 
a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java index 5931fba677c1..606f6e981549 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java @@ -63,6 +63,7 @@ public synchronized AggregateStats getStatsSince() return new AggregateStats( 1L, 100L, + 200L, 2L, 200L, 300L @@ -70,7 +71,7 @@ public synchronized AggregateStats getStatsSince() } }; - mergeBufferPool = new DefaultBlockingPool(() -> ByteBuffer.allocate(1024), 5); + mergeBufferPool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1024), 5); executorService = Executors.newSingleThreadExecutor(); } @@ -91,11 +92,12 @@ public void testMonitor() // Trigger metric emission monitor.doMonitor(emitter); - Assert.assertEquals(7, emitter.getNumEmittedEvents()); + Assert.assertEquals(8, emitter.getNumEmittedEvents()); emitter.verifyValue("mergeBuffer/pendingRequests", 0L); emitter.verifyValue("mergeBuffer/used", 0L); emitter.verifyValue("mergeBuffer/queries", 1L); emitter.verifyValue("mergeBuffer/acquisitionTimeNs", 100L); + emitter.verifyValue("mergeBuffer/bytesUsed", 200L); emitter.verifyValue("groupBy/spilledQueries", 2L); emitter.verifyValue("groupBy/spilledBytes", 200L); emitter.verifyValue("groupBy/mergeDictionarySize", 300L); @@ -133,11 +135,12 @@ public void testMonitorWithServiceDimensions() final Map dimFilters = Map.of( "taskId", List.of(taskId), "dataSource", List.of(dataSource), "id", List.of(taskId) ); - Assert.assertEquals(7, emitter.getNumEmittedEvents()); + Assert.assertEquals(8, emitter.getNumEmittedEvents()); emitter.verifyValue("mergeBuffer/pendingRequests", dimFilters, 0L); emitter.verifyValue("mergeBuffer/used", dimFilters, 0L); emitter.verifyValue("mergeBuffer/queries", dimFilters, 1L); emitter.verifyValue("mergeBuffer/acquisitionTimeNs", dimFilters, 
100L); + emitter.verifyValue("mergeBuffer/bytesUsed", dimFilters, 200L); emitter.verifyValue("groupBy/spilledQueries", dimFilters, 2L); emitter.verifyValue("groupBy/spilledBytes", dimFilters, 200L); emitter.verifyValue("groupBy/mergeDictionarySize", dimFilters, 300L);