diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs
index 71235d943ea..88833ca5ccd 100644
--- a/vortex-datafusion/src/persistent/opener.rs
+++ b/vortex-datafusion/src/persistent/opener.rs
@@ -51,7 +51,7 @@ use crate::VortexAccessPlan;
 use crate::convert::exprs::ExpressionConvertor;
 use crate::convert::exprs::can_be_pushed_down;
 use crate::convert::exprs::make_vortex_predicate;
-use crate::persistent::stream::PrunableStream;
+use crate::persistent::stream::EarlyTerminatingStream;
 
 #[derive(Clone)]
 pub(crate) struct VortexOpener {
@@ -282,12 +282,6 @@ impl FileOpener for VortexOpener {
                 .transpose()
                 .map_err(|e| DataFusionError::External(e.into()))?;
 
-            if let Some(limit) = limit
-                && filter.is_none()
-            {
-                scan_builder = scan_builder.with_limit(limit);
-            }
-
             let chunk_session = session.clone();
             let stream = scan_builder
                 .with_metrics(metrics)
@@ -337,11 +331,7 @@ impl FileOpener for VortexOpener {
                 .map(move |batch| batch.and_then(|b| schema_mapping.map_batch(b)))
                 .boxed();
 
-            if let Some(file_pruner) = file_pruner {
-                Ok(PrunableStream::new(file_pruner, stream).boxed())
-            } else {
-                Ok(stream)
-            }
+            Ok(EarlyTerminatingStream::new(file_pruner, limit, stream).boxed())
         }
         .in_current_span()
         .boxed())
diff --git a/vortex-datafusion/src/persistent/stream.rs b/vortex-datafusion/src/persistent/stream.rs
index 52d9b7daecc..be21723a58d 100644
--- a/vortex-datafusion/src/persistent/stream.rs
+++ b/vortex-datafusion/src/persistent/stream.rs
@@ -10,31 +10,69 @@ use datafusion_common::arrow::array::RecordBatch;
 use datafusion_pruning::FilePruner;
 use futures::Stream;
 use futures::StreamExt;
+use futures::ready;
 use futures::stream::BoxStream;
 
-/// Utility to end a stream early if its backing [`PartitionFile`] can be pruned away by an updated dynamic expression.
-pub(crate) struct PrunableStream {
-    file_pruner: FilePruner,
+/// Utility to end a stream early, either because a row limit has been reached or because the
+/// dynamic expression in the [`FilePruner`] shows that the file can be skipped.
+pub(crate) struct EarlyTerminatingStream {
+    /// Consulted on every poll; `None` disables pruning.
+    file_pruner: Option<FilePruner>,
+    /// Number of rows that may still be emitted; `None` means no limit.
+    limit: Option<usize>,
     stream: BoxStream<'static, DFResult<RecordBatch>>,
 }
 
-impl PrunableStream {
-    pub fn new(file_pruner: FilePruner, stream: BoxStream<'static, DFResult<RecordBatch>>) -> Self {
+impl EarlyTerminatingStream {
+    /// Wraps `stream` so that it ends after at most `limit` rows, or as soon as `file_pruner`
+    /// reports that the file should be pruned. Passing `None` disables the respective check.
+    pub fn new(
+        file_pruner: Option<FilePruner>,
+        limit: Option<usize>,
+        stream: BoxStream<'static, DFResult<RecordBatch>>,
+    ) -> Self {
         Self {
             file_pruner,
+            limit,
             stream,
         }
     }
 }
 
-impl Stream for PrunableStream {
+impl Stream for EarlyTerminatingStream {
     type Item = DFResult<RecordBatch>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        if self.as_mut().file_pruner.should_prune()? {
+        // Finish once the row budget is spent or the pruner reports that the file is no longer
+        // needed. The pruner is re-checked on every poll because its dynamic expression may have
+        // been updated since the last batch.
+        if self.limit.is_some_and(|l| l == 0)
+            || self
+                .file_pruner
+                .as_mut()
+                .map(|fp| fp.should_prune())
+                .transpose()?
+                .unwrap_or_default()
+        {
             Poll::Ready(None)
         } else {
-            self.stream.poll_next_unpin(cx)
+            match ready!(self.stream.poll_next_unpin(cx)) {
+                Some(rb) => {
+                    let mut rb = rb?;
+
+                    if let Some(limit) = self.limit.as_mut() {
+                        // Trim the batch that crosses the limit so the stream never emits
+                        // more rows than requested.
+                        if rb.num_rows() > *limit {
+                            rb = rb.slice(0, *limit);
+                        }
+                        *limit -= rb.num_rows();
+                    }
+
+                    Poll::Ready(Some(Ok(rb)))
+                }
+                None => Poll::Ready(None),
+            }
         }
     }
 }