1414
1515use std:: collections:: HashMap ;
1616use std:: fmt;
17+ use std:: num:: NonZeroUsize ;
1718use std:: sync:: { Arc , RwLock } ;
1819
1920use async_trait:: async_trait;
@@ -34,6 +35,7 @@ use datafusion::execution::SessionStateBuilder;
3435use datafusion:: execution:: context:: { QueryPlanner , SessionConfig , SessionContext , SessionState } ;
3536use datafusion:: execution:: memory_pool:: {
3637 GreedyMemoryPool , MemoryConsumer , MemoryLimit , MemoryPool , MemoryReservation ,
38+ TrackConsumersPool ,
3739} ;
3840use datafusion:: execution:: runtime_env:: { RuntimeEnv , RuntimeEnvBuilder } ;
3941use datafusion:: physical_optimizer:: PhysicalOptimizerRule ;
@@ -437,19 +439,25 @@ impl DfQueryPlanner {
437439 }
438440}
439441
440- /// A wrapper around GreedyMemoryPool that records metrics.
442+ /// A wrapper around TrackConsumersPool that records metrics.
441443///
442444/// This wrapper intercepts all memory pool operations and updates
443445/// Prometheus metrics for monitoring query memory usage and rejections.
444446#[ derive( Debug ) ]
445447struct MetricsMemoryPool {
446- inner : Arc < GreedyMemoryPool > ,
448+ inner : Arc < TrackConsumersPool < GreedyMemoryPool > > ,
447449}
448450
449451impl MetricsMemoryPool {
452+ // Number of top memory consumers to report in OOM error messages
453+ const TOP_CONSUMERS_TO_REPORT : usize = 5 ;
454+
450455 fn new ( limit : usize ) -> Self {
451456 Self {
452- inner : Arc :: new ( GreedyMemoryPool :: new ( limit) ) ,
457+ inner : Arc :: new ( TrackConsumersPool :: new (
458+ GreedyMemoryPool :: new ( limit) ,
459+ NonZeroUsize :: new ( Self :: TOP_CONSUMERS_TO_REPORT ) . unwrap ( ) ,
460+ ) ) ,
453461 }
454462 }
455463
0 commit comments