diff --git a/vortex-datafusion/tests/schema_evolution.rs b/vortex-datafusion/tests/schema_evolution.rs index 637893bf73e..f088b023f96 100644 --- a/vortex-datafusion/tests/schema_evolution.rs +++ b/vortex-datafusion/tests/schema_evolution.rs @@ -25,6 +25,7 @@ use datafusion::datasource::listing::ListingTableConfig; use datafusion::execution::SessionStateBuilder; use datafusion::execution::context::SessionContext; use datafusion_common::assert_batches_eq; +use datafusion_common::assert_batches_sorted_eq; use datafusion_common::create_array; use datafusion_common::record_batch; use datafusion_datasource::ListingTableUrl; @@ -149,11 +150,11 @@ async fn test_filter_with_schema_evolution() { "| a | b |", "+-------+---+", "| one | |", - "| two | |", "| three | |", + "| two | |", "+-------+---+", ]; - assert_batches_eq!(expected, &result); + assert_batches_sorted_eq!(expected, &result); } #[tokio::test] @@ -246,14 +247,14 @@ async fn test_filter_schema_evolution_order() { // a field: present in both files // b field: only present in file2, file1 fills with nulls - assert_batches_eq!( + assert_batches_sorted_eq!( &[ "+---+------+", "| a | b |", "+---+------+", "| 3 | |", - "| 5 | |", "| 4 | four |", + "| 5 | |", "| 6 | six |", "+---+------+", ], @@ -351,7 +352,7 @@ async fn test_filter_schema_evolution_struct_fields() { // Scan all the records, NULLs are filled in for nested optional fields. let full_scan = df.collect().await.unwrap(); - assert_batches_eq!( + assert_batches_sorted_eq!( &[ "+--------------+-----------------------------+", "| hostname | payload |", @@ -385,7 +386,7 @@ async fn test_filter_schema_evolution_struct_fields() { .await .unwrap(); - assert_batches_eq!( + assert_batches_sorted_eq!( &[ "+--------------+-----------------------------+", "| hostname | payload |", diff --git a/vortex-scan/src/arrow.rs b/vortex-scan/src/arrow.rs index cc2ecb3caad..ac15f699f65 100644 --- a/vortex-scan/src/arrow.rs +++ b/vortex-scan/src/arrow.rs @@ -128,7 +128,7 @@ mod tests { use vortex_error::VortexResult; use super::*; - use crate::test::SESSION; + use crate::test::SCAN_SESSION; fn create_test_struct_array() -> VortexResult { // Create Arrow arrays @@ -166,7 +166,7 @@ mod tests { let vortex_array = create_test_struct_array()?; let schema = create_arrow_schema(); let data_type = DataType::Struct(schema.fields().clone()); - let mut ctx = SESSION.create_execution_ctx(); + let mut ctx = SCAN_SESSION.create_execution_ctx(); let batch = to_record_batch(vortex_array, &data_type, &mut ctx)?; assert_eq!(batch.num_columns(), 2); diff --git a/vortex-scan/src/scan_builder.rs b/vortex-scan/src/scan_builder.rs index fd91c0608a7..9f82b142ffd 100644 --- a/vortex-scan/src/scan_builder.rs +++ b/vortex-scan/src/scan_builder.rs @@ -6,6 +6,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::Context; use std::task::Poll; +use std::task::ready; use futures::Stream; use futures::StreamExt; @@ -31,6 +32,9 @@ use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_io::runtime::BlockingRuntime; +use vortex_io::runtime::Handle; +use vortex_io::runtime::Task; +use vortex_io::session::RuntimeSessionExt; use vortex_layout::LayoutReader; use vortex_layout::LayoutReaderRef; use vortex_layout::layouts::row_idx::RowIdxLayoutReader; @@ -308,10 +312,20 @@ impl ScanBuilder { enum LazyScanState { Builder(Option>>), + Preparing(PreparingScan), Stream(BoxStream<'static, VortexResult>), Error(Option), } +type PreparedScanTasks = Vec>>>; + +struct PreparingScan { + ordered: bool, + concurrency: usize, + handle: Handle, + task: Task>>, +} + struct LazyScanStream { state: LazyScanState, } @@ -334,11 +348,40 @@ impl Stream for LazyScanStream { match &mut self.state { LazyScanState::Builder(builder) => { let builder = builder.take().vortex_expect("polled after completion"); - match builder - .prepare() - .and_then(|scan| scan.execute_stream(None).map(|s| s.boxed())) - { - Ok(stream) => self.state = LazyScanState::Stream(stream), + let ordered = builder.ordered; + let num_workers = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1); + let concurrency = builder.concurrency * num_workers; + let handle = builder.session.handle(); + let task = handle.spawn_blocking(move || { + builder.prepare().and_then(|scan| scan.execute(None)) + }); + self.state = LazyScanState::Preparing(PreparingScan { + ordered, + concurrency, + handle, + task, + }); + } + LazyScanState::Preparing(preparing) => { + match ready!(Pin::new(&mut preparing.task).poll(cx)) { + Ok(tasks) => { + let ordered = preparing.ordered; + let concurrency = preparing.concurrency; + let handle = preparing.handle.clone(); + let stream = + futures::stream::iter(tasks).map(move |task| handle.spawn(task)); + let stream = if ordered { + stream.buffered(concurrency).boxed() + } else { + stream.buffer_unordered(concurrency).boxed() + }; + let stream = stream + .filter_map(|chunk| async move { chunk.transpose() }) + .boxed(); + self.state = LazyScanState::Stream(stream); + } Err(err) => self.state = LazyScanState::Error(Some(err)), } } @@ -392,20 +435,30 @@ fn to_field_mask(field: FieldName) -> FieldMask { mod test { use std::collections::BTreeSet; use std::ops::Range; + use std::pin::Pin; use std::sync::Arc; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; - + use std::task::Context; + use std::task::Poll; + use std::time::Duration; + + use futures::Stream; + use futures::task::noop_waker_ref; + use parking_lot::Mutex; + use vortex_array::IntoArray; use vortex_array::MaskFuture; + use vortex_array::ToCanonical; + use vortex_array::arrays::PrimitiveArray; use vortex_array::expr::Expression; use vortex_dtype::DType; use vortex_dtype::FieldMask; use vortex_dtype::Nullability; use vortex_dtype::PType; use vortex_error::VortexResult; + use vortex_error::vortex_err; use vortex_io::runtime::BlockingRuntime; use vortex_io::runtime::single::SingleThreadRuntime; - use vortex_io::session::RuntimeSessionExt; use vortex_layout::ArrayFuture; use vortex_layout::LayoutReader; use vortex_mask::Mask; @@ -490,11 +543,228 @@ mod test { let calls = Arc::new(AtomicUsize::new(0)); let reader = Arc::new(CountingLayoutReader::new(calls.clone())); - let runtime = SingleThreadRuntime::default(); - let session = crate::test::SESSION.clone().with_handle(runtime.handle()); + let session = crate::test::SCAN_SESSION.clone(); let _stream = ScanBuilder::new(session, reader).into_stream().unwrap(); assert_eq!(calls.load(Ordering::Relaxed), 0); } + + #[derive(Debug)] + struct SplittingLayoutReader { + name: Arc, + dtype: DType, + row_count: u64, + register_splits_calls: Arc, + } + + impl SplittingLayoutReader { + fn new(register_splits_calls: Arc) -> Self { + Self { + name: Arc::from("splitting"), + dtype: DType::Primitive(PType::I32, Nullability::NonNullable), + row_count: 4, + register_splits_calls, + } + } + } + + impl LayoutReader for SplittingLayoutReader { + fn name(&self) -> &Arc { + &self.name + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn row_count(&self) -> u64 { + self.row_count + } + + fn register_splits( + &self, + _field_mask: &[FieldMask], + row_range: &Range, + splits: &mut BTreeSet, + ) -> VortexResult<()> { + self.register_splits_calls.fetch_add(1, Ordering::Relaxed); + for split in (row_range.start + 1)..=row_range.end { + splits.insert(split); + } + Ok(()) + } + + fn pruning_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + mask: Mask, + ) -> VortexResult { + Ok(MaskFuture::ready(mask)) + } + + fn filter_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + Ok(mask) + } + + fn projection_evaluation( + &self, + row_range: &Range, + _expr: &Expression, + _mask: MaskFuture, + ) -> VortexResult { + let start = usize::try_from(row_range.start) + .map_err(|_| vortex_err!("row_range.start must fit in usize"))?; + let end = usize::try_from(row_range.end) + .map_err(|_| vortex_err!("row_range.end must fit in usize"))?; + + let values: VortexResult> = (start..end) + .map(|v| i32::try_from(v).map_err(|_| vortex_err!("split value must fit in i32"))) + .collect(); + + let array = PrimitiveArray::from_iter(values?).into_array(); + Ok(Box::pin(async move { Ok(array) })) + } + } + + #[test] + fn into_stream_executes_after_prepare() -> VortexResult<()> { + let calls = Arc::new(AtomicUsize::new(0)); + let reader = Arc::new(SplittingLayoutReader::new(calls.clone())); + + let runtime = SingleThreadRuntime::default(); + let session = crate::test::session_with_handle(runtime.handle()); + + let stream = ScanBuilder::new(session, reader).into_stream().unwrap(); + let mut iter = runtime.block_on_stream(stream); + + let mut values = Vec::new(); + for chunk in &mut iter { + values.push(chunk?.to_primitive().buffer::()[0]); + } + + assert_eq!(calls.load(Ordering::Relaxed), 1); + assert_eq!(values, vec![0, 1, 2, 3]); + + Ok(()) + } + + #[derive(Debug)] + struct BlockingSplitsLayoutReader { + name: Arc, + dtype: DType, + row_count: u64, + register_splits_calls: Arc, + gate: Arc>, + } + + impl BlockingSplitsLayoutReader { + fn new(gate: Arc>, register_splits_calls: Arc) -> Self { + Self { + name: Arc::from("blocking-splits"), + dtype: DType::Primitive(PType::I32, Nullability::NonNullable), + row_count: 1, + register_splits_calls, + gate, + } + } + } + + impl LayoutReader for BlockingSplitsLayoutReader { + fn name(&self) -> &Arc { + &self.name + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn row_count(&self) -> u64 { + self.row_count + } + + fn register_splits( + &self, + _field_mask: &[FieldMask], + row_range: &Range, + splits: &mut BTreeSet, + ) -> VortexResult<()> { + self.register_splits_calls.fetch_add(1, Ordering::Relaxed); + let _guard = self.gate.lock(); + splits.insert(row_range.end); + Ok(()) + } + + fn pruning_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + _mask: Mask, + ) -> VortexResult { + unimplemented!("not needed for this test"); + } + + fn filter_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + _mask: MaskFuture, + ) -> VortexResult { + unimplemented!("not needed for this test"); + } + + fn projection_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + _mask: MaskFuture, + ) -> VortexResult { + Ok(Box::pin(async move { + unreachable!("scan should not be polled in this test") + })) + } + } + + #[test] + fn into_stream_first_poll_does_not_block() { + let gate = Arc::new(Mutex::new(())); + let guard = gate.lock(); + + let calls = Arc::new(AtomicUsize::new(0)); + let reader = Arc::new(BlockingSplitsLayoutReader::new(gate.clone(), calls.clone())); + + let runtime = SingleThreadRuntime::default(); + let session = crate::test::session_with_handle(runtime.handle()); + + let mut stream = ScanBuilder::new(session, reader).into_stream().unwrap(); + + let (send, recv) = std::sync::mpsc::channel::(); + let join = std::thread::spawn(move || { + let waker = noop_waker_ref(); + let mut cx = Context::from_waker(waker); + let poll = Pin::new(&mut stream).poll_next(&mut cx); + let _ = send.send(matches!(poll, Poll::Pending)); + }); + + let polled_pending = recv.recv_timeout(Duration::from_secs(1)).ok(); + + // Always release the gate and join the thread so failures don't hang the test process. + drop(guard); + drop(join.join()); + + let polled_pending = polled_pending.expect("poll_next blocked; expected quick return"); + assert!( + polled_pending, + "expected Poll::Pending while prepare is blocked" + ); + assert_eq!(calls.load(Ordering::Relaxed), 0); + + drop(runtime); + } } diff --git a/vortex-scan/src/split_by.rs b/vortex-scan/src/split_by.rs index 55e11078955..f35f8edfd92 100644 --- a/vortex-scan/src/split_by.rs +++ b/vortex-scan/src/split_by.rs @@ -67,7 +67,7 @@ mod test { use vortex_layout::sequence::SequentialArrayStreamExt; use super::*; - use crate::test::SESSION; + use crate::test::SCAN_SESSION; fn reader() -> LayoutReaderRef { let ctx = ArrayContext::empty(); @@ -89,7 +89,9 @@ mod test { }) .unwrap(); - layout.new_reader("".into(), segments, &SESSION).unwrap() + layout + .new_reader("".into(), segments, &SCAN_SESSION) + .unwrap() } #[test] diff --git a/vortex-scan/src/test.rs b/vortex-scan/src/test.rs index d29bf38d3b5..d150d159716 100644 --- a/vortex-scan/src/test.rs +++ b/vortex-scan/src/test.rs @@ -5,16 +5,24 @@ use std::sync::LazyLock; use vortex_array::expr::session::ExprSession; use vortex_array::session::ArraySession; +use vortex_io::runtime::Handle; use vortex_io::session::RuntimeSession; +use vortex_io::session::RuntimeSessionExt; use vortex_layout::session::LayoutSession; use vortex_metrics::VortexMetrics; use vortex_session::VortexSession; -pub static SESSION: LazyLock = LazyLock::new(|| { +pub fn new_session() -> VortexSession { VortexSession::empty() .with::() .with::() .with::() .with::() .with::() -}); +} + +pub fn session_with_handle(handle: Handle) -> VortexSession { + new_session().with_handle(handle) +} + +pub static SCAN_SESSION: LazyLock = LazyLock::new(new_session);