diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index ffe64e700575..9328f5f736c1 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::fmt; +use std::num::NonZeroUsize; use std::sync::{Arc, RwLock}; use async_trait::async_trait; @@ -34,6 +35,7 @@ use datafusion::execution::SessionStateBuilder; use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState}; use datafusion::execution::memory_pool::{ GreedyMemoryPool, MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation, + TrackConsumersPool, }; use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder}; use datafusion::physical_optimizer::PhysicalOptimizerRule; @@ -437,19 +439,25 @@ impl DfQueryPlanner { } } -/// A wrapper around GreedyMemoryPool that records metrics. +/// A wrapper around TrackConsumersPool that records metrics. /// /// This wrapper intercepts all memory pool operations and updates /// Prometheus metrics for monitoring query memory usage and rejections. #[derive(Debug)] struct MetricsMemoryPool { - inner: Arc, + inner: Arc>, } impl MetricsMemoryPool { + // Number of top memory consumers to report in OOM error messages + const TOP_CONSUMERS_TO_REPORT: usize = 5; + fn new(limit: usize) -> Self { Self { - inner: Arc::new(GreedyMemoryPool::new(limit)), + inner: Arc::new(TrackConsumersPool::new( + GreedyMemoryPool::new(limit), + NonZeroUsize::new(Self::TOP_CONSUMERS_TO_REPORT).unwrap(), + )), } }