2 changes: 1 addition & 1 deletion vortex-bench/src/datasets/tpch_l_comment.rs
@@ -77,8 +77,8 @@ impl Dataset for TPCHLCommentChunked {
let file_chunks: Vec<_> = file
.scan()?
.with_projection(pack(vec![("l_comment", col("l_comment"))], NonNullable))
.map(|a| Ok(a.to_canonical().into_array()))
.into_array_stream()?
.map_ok(|a| a.to_canonical().into_array())
.try_collect()
.await?;
chunks.extend(file_chunks);
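For orientation: the builder no longer takes a per-chunk `map` closure; conversion now happens on the stream returned by `into_array_stream()`. A minimal sketch of the new consumption pattern, assuming an already open `VortexFile` named `file` and import paths matching those used elsewhere in this diff:

```rust
use futures::TryStreamExt;
use vortex::array::ArrayRef;
use vortex::error::VortexResult;
use vortex::file::VortexFile;

// Sketch only: collect every chunk of a scan as a canonical array.
async fn collect_chunks(file: &VortexFile) -> VortexResult<Vec<ArrayRef>> {
    file.scan()?                                    // ScanBuilder, no longer generic over an output type
        .into_array_stream()?                       // turn the configured scan into an ArrayStream
        .map_ok(|a| a.to_canonical().into_array())  // per-chunk conversion, applied on the stream
        .try_collect()                              // Vec<ArrayRef>, failing on the first error
        .await
}
```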
15 changes: 9 additions & 6 deletions vortex-cxx/src/read.rs
@@ -14,7 +14,6 @@ use arrow_schema::DataType;
use arrow_schema::Schema;
use arrow_schema::SchemaRef;
use futures::stream::TryStreamExt;
use vortex::array::ArrayRef;
use vortex::array::arrow::IntoArrowArray;
use vortex::buffer::Buffer;
use vortex::file::OpenOptionsSessionExt;
@@ -57,7 +56,7 @@ pub(crate) fn open_file_from_buffer(data: &[u8]) -> Result<Box<VortexFile>> {
}

pub(crate) struct VortexScanBuilder {
inner: ScanBuilder<ArrayRef>,
inner: ScanBuilder,
output_schema: Option<SchemaRef>,
}

@@ -162,11 +161,15 @@ pub(crate) fn scan_builder_into_threadsafe_cloneable_reader(

let stream = builder
.inner
.map(move |b| {
b.into_arrow(&data_type)
.map(|struct_array| RecordBatch::from(struct_array.as_struct()))
})
.into_stream()?
.and_then(move |chunk| {
let data_type = data_type.clone();
async move {
chunk
.into_arrow(&data_type)
.map(|struct_array| RecordBatch::from(struct_array.as_struct()))
}
})
.map_err(|e| ArrowError::ExternalError(Box::new(e)));

let iter = RUNTIME.block_on_stream_thread_safe(|_h| stream);
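The C++ reader follows the same pattern: the Arrow conversion is applied with `and_then` on the stream, cloning the `DataType` into each `'static` async block. A sketch of that shape; the function name, the `vortex::scan::ScanBuilder` and `VortexResult` paths, and the arrow import paths are assumptions, while the calls mirror the hunk above:

```rust
use arrow_array::RecordBatch;
use arrow_array::cast::AsArray;
use arrow_schema::{ArrowError, DataType};
use futures::{Stream, TryStreamExt};
use vortex::array::arrow::IntoArrowArray;
use vortex::error::VortexResult;
use vortex::scan::ScanBuilder;

// Sketch only: adapt a configured ScanBuilder into a stream of Arrow RecordBatches.
fn record_batches(
    builder: ScanBuilder,
    data_type: DataType, // expected to be a DataType::Struct matching the projection
) -> VortexResult<impl Stream<Item = Result<RecordBatch, ArrowError>>> {
    Ok(builder
        .into_stream()?                        // fallible stream of Vortex array chunks
        .and_then(move |chunk| {
            let data_type = data_type.clone(); // clone so the async block owns its inputs
            async move {
                chunk
                    .into_arrow(&data_type)    // Vortex array -> Arrow array
                    .map(|a| RecordBatch::from(a.as_struct()))
            }
        })
        .map_err(|e| ArrowError::ExternalError(Box::new(e))))
}
```

Cloning the `DataType` per chunk keeps the async block `'static`, which the stream combinators require.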
5 changes: 1 addition & 4 deletions vortex-datafusion/src/persistent/access_plan.rs
@@ -24,10 +24,7 @@ impl VortexAccessPlan {

impl VortexAccessPlan {
/// Apply the plan to the scan's builder.
pub fn apply_to_builder<A>(&self, mut scan_builder: ScanBuilder<A>) -> ScanBuilder<A>
where
A: 'static + Send,
{
pub fn apply_to_builder(&self, mut scan_builder: ScanBuilder) -> ScanBuilder {
let Self { selection } = self;

if let Some(selection) = selection {
38 changes: 22 additions & 16 deletions vortex-datafusion/src/persistent/opener.rs
@@ -30,7 +30,6 @@ use object_store::ObjectStore;
use object_store::path::Path;
use tracing::Instrument;
use vortex::array::Array;
use vortex::array::ArrayRef;
use vortex::array::arrow::ArrowArrayExecutor;
use vortex::dtype::FieldName;
use vortex::error::VortexError;
@@ -288,24 +287,30 @@ impl FileOpener for VortexOpener {
scan_builder = scan_builder.with_limit(limit);
}

let chunk_session = session.clone();
let stream = scan_builder
let array_stream = scan_builder
.with_metrics(metrics)
.with_projection(projection_expr)
.with_some_filter(filter)
.with_ordered(has_output_ordering)
.map(move |chunk| {
if *USE_VORTEX_OPERATORS {
let schema = chunk.dtype().to_arrow_schema()?;
chunk.execute_record_batch(&schema, &chunk_session)
} else {
RecordBatch::try_from(chunk.as_ref())
}
})
.into_stream()
.map_err(|e| {
DataFusionError::Execution(format!("Failed to create Vortex stream: {e}"))
})?
})?;

let rb_stream = array_stream
.and_then(move |chunk| {
let session = session.clone();
async move {
let rb = if *USE_VORTEX_OPERATORS {
let schema = chunk.dtype().to_arrow_schema()?;
chunk.execute_record_batch(&schema, &session)?
} else {
RecordBatch::try_from(chunk.as_ref())?
};

Ok(rb)
}
})
.map_ok(move |rb| {
// We try to slice the stream so that it respects DataFusion's configured batch size.
stream::iter(
@@ -338,9 +343,9 @@
.boxed();

if let Some(file_pruner) = file_pruner {
Ok(PrunableStream::new(file_pruner, stream).boxed())
Ok(PrunableStream::new(file_pruner, rb_stream).boxed())
} else {
Ok(stream)
Ok(rb_stream)
}
}
.in_current_span()
@@ -353,8 +358,8 @@ fn apply_byte_range(
file_range: FileRange,
total_size: u64,
row_count: u64,
scan_builder: ScanBuilder<ArrayRef>,
) -> ScanBuilder<ArrayRef> {
scan_builder: ScanBuilder,
) -> ScanBuilder {
let row_range = byte_range_to_row_range(
file_range.start as u64..file_range.end as u64,
row_count,
@@ -401,6 +406,7 @@ mod tests {
use object_store::memory::InMemory;
use rstest::rstest;
use vortex::VortexSessionDefault;
use vortex::array::ArrayRef;
use vortex::array::arrow::FromArrowArray;
use vortex::file::WriteOptionsSessionExt;
use vortex::io::ObjectStoreWriter;
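The comment in the opener hunk above refers to re-slicing each converted `RecordBatch` so that output batches respect DataFusion's configured batch size, feeding the slices into the `map_ok` / `stream::iter` step. A self-contained sketch of that slicing step; the helper name is illustrative rather than the crate's API:

```rust
use arrow_array::RecordBatch;
use futures::stream;
use futures::Stream;

// Sketch: split one RecordBatch into consecutive slices of at most `batch_size` rows
// and expose them as a stream, the shape fed to the map_ok / stream::iter step above.
fn slice_to_batch_size(rb: RecordBatch, batch_size: usize) -> impl Stream<Item = RecordBatch> {
    let batch_size = batch_size.max(1); // guard against a zero step
    let total = rb.num_rows();
    let slices: Vec<RecordBatch> = (0..total)
        .step_by(batch_size)
        .map(|start| rb.slice(start, batch_size.min(total - start)))
        .collect();
    stream::iter(slices)
}
```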
6 changes: 5 additions & 1 deletion vortex-duckdb/src/scan.rs
@@ -17,6 +17,7 @@ use async_compat::Compat;
use futures::FutureExt;
use futures::Stream;
use futures::StreamExt;
use futures::TryStreamExt;
use futures::stream;
use futures::stream::BoxStream;
use futures::stream::SelectAll;
@@ -439,8 +440,11 @@ impl TableFunction for VortexTableFunction {
.with_some_filter(filter_expr)
.with_projection(projection_expr)
.with_ordered(false)
.map(move |split| Ok((split, conversion_cache.clone())))
.into_stream()?
.and_then(move |split| {
let conversion_cache = conversion_cache.clone();
async move { Ok((split, conversion_cache)) }
})
.boxed();

Ok(Some(scan))
3 changes: 1 addition & 2 deletions vortex-file/src/file.rs
@@ -10,7 +10,6 @@ use std::ops::Range;
use std::sync::Arc;

use itertools::Itertools;
use vortex_array::ArrayRef;
use vortex_array::VectorExecutor;
use vortex_array::expr::Expression;
use vortex_array::expr::pruning::checked_pruning_expr;
@@ -96,7 +95,7 @@ impl VortexFile {
}

/// Initiate a scan of the file, returning a builder for configuring the scan.
pub fn scan(&self) -> VortexResult<ScanBuilder<ArrayRef>> {
pub fn scan(&self) -> VortexResult<ScanBuilder> {
Ok(
ScanBuilder::new(self.session.clone(), self.layout_reader()?)
.with_metrics(self.metrics.clone()),
2 changes: 1 addition & 1 deletion vortex-file/src/tests.rs
@@ -1208,7 +1208,7 @@ async fn write_nullable_top_level_struct() {

async fn round_trip(
array: &dyn Array,
f: impl Fn(ScanBuilder<ArrayRef>) -> VortexResult<ScanBuilder<ArrayRef>>,
f: impl Fn(ScanBuilder) -> VortexResult<ScanBuilder>,
) -> VortexResult<ArrayRef> {
let mut writer = vec![];
SESSION
2 changes: 1 addition & 1 deletion vortex-python/src/file.rs
@@ -174,7 +174,7 @@ impl PyVortexFile {
expr: Option<Expression>,
indices: Option<ArrayRef>,
batch_size: Option<usize>,
) -> VortexResult<ScanBuilder<ArrayRef>> {
) -> VortexResult<ScanBuilder> {
let mut builder = self
.vxf
.scan()?
3 changes: 1 addition & 2 deletions vortex-python/src/scan.rs
@@ -4,7 +4,6 @@
use pyo3::exceptions::PyIndexError;
use pyo3::prelude::*;
use vortex::array::Array;
use vortex::array::ArrayRef;
use vortex::scan::RepeatedScan;

use crate::RUNTIME;
@@ -24,7 +23,7 @@ pub(crate) fn init(py: Python, parent: &Bound<PyModule>) -> PyResult<()> {

#[pyclass(name = "RepeatedScan", module = "vortex", frozen)]
pub struct PyRepeatedScan {
pub scan: RepeatedScan<ArrayRef>,
pub scan: RepeatedScan,
pub row_count: u64,
}

24 changes: 18 additions & 6 deletions vortex-scan/src/arrow.rs
@@ -19,7 +19,7 @@ use vortex_session::VortexSession;

use crate::ScanBuilder;

impl ScanBuilder<ArrayRef> {
impl ScanBuilder {
/// Creates a new `RecordBatchReader` from the scan builder.
///
/// The `schema` parameter is used to define the schema of the resulting record batches. In
@@ -33,10 +33,17 @@ impl ScanBuilder<ArrayRef> {
let data_type = DataType::Struct(schema.fields().clone());
let session = self.session().clone();

let iter = self
.map(move |chunk| to_record_batch(chunk, &data_type, &session))
.into_iter(runtime)?
.map(|result| result.map_err(|e| ArrowError::ExternalError(Box::new(e))));
let iter = self.into_iter(runtime)?.map(move |chunk| match chunk {
Ok(chunk) => {
let data_type = data_type.clone();
let session = session.clone();
to_record_batch(chunk, &data_type, &session)
}
.map_err(|e| ArrowError::ExternalError(Box::new(e))),
Err(e) => Err(ArrowError::ExternalError(Box::new(e))),
});

// let iter = runtime.block_on_stream(rb_stream);

Ok(RecordBatchIteratorAdapter { iter, schema })
}
@@ -49,8 +56,13 @@ impl ScanBuilder<ArrayRef> {
let session = self.session().clone();

let stream = self
.map(move |chunk| to_record_batch(chunk, &data_type, &session))
.into_stream()?
.and_then(move |chunk| {
let data_type = data_type.clone();
let session = session.clone();

async move { to_record_batch(chunk, &data_type, &session) }
})
.map_err(|e| ArrowError::ExternalError(Box::new(e)));

Ok(stream)
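On the blocking path, the runtime now hands back `Result` chunks and each chunk then goes through a fallible Arrow conversion, so the match in the hunk above folds two failure modes into `ArrowError`. The generic shape it implements is `Result::and_then` over an iterator; a self-contained sketch with placeholder types:

```rust
/// Sketch with placeholder generics: feed each successfully read chunk through a
/// fallible conversion, keeping a single error type for both failure modes.
fn convert_chunks<T, U, E>(
    chunks: impl Iterator<Item = Result<T, E>>,
    convert: impl Fn(T) -> Result<U, E>,
) -> impl Iterator<Item = Result<U, E>> {
    chunks.map(move |chunk| chunk.and_then(|c| convert(c)))
}

// The combined error can then be mapped to the outer error type afterwards,
// as the hunk above does with ArrowError::ExternalError.
```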
57 changes: 25 additions & 32 deletions vortex-scan/src/repeated_scan.rs
@@ -7,6 +7,7 @@ use std::ops::Range;
use std::sync::Arc;

use futures::Stream;
use futures::StreamExt;
use futures::future::BoxFuture;
use itertools::Either;
use itertools::Itertools;
@@ -33,7 +34,7 @@ use crate::tasks::split_exec;
///
/// The methods of this struct enable, possibly concurrent, scanning of multiple row ranges of this
/// data source.
pub struct RepeatedScan<A: 'static + Send> {
pub struct RepeatedScan {
session: VortexSession,
layout_reader: LayoutReaderRef,
projection: Expression,
Expand All @@ -47,37 +48,13 @@ pub struct RepeatedScan<A: 'static + Send> {
splits: Splits,
/// The number of splits to make progress on concurrently **per-thread**.
concurrency: usize,
/// Function to apply to each [`ArrayRef`] within the spawned split tasks.
map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
/// Maximum number of rows to read (after filtering)
limit: Option<usize>,
/// The dtype of the projected arrays.
dtype: DType,
}

impl RepeatedScan<ArrayRef> {
pub fn execute_array_iter<B: BlockingRuntime>(
&self,
row_range: Option<Range<u64>>,
runtime: &B,
) -> VortexResult<impl ArrayIterator + 'static> {
let dtype = self.dtype.clone();
let stream = self.execute_stream(row_range)?;
let iter = runtime.block_on_stream(stream);
Ok(ArrayIteratorAdapter::new(dtype, iter))
}

pub fn execute_array_stream(
&self,
row_range: Option<Range<u64>>,
) -> VortexResult<impl ArrayStream + Send + 'static> {
let dtype = self.dtype.clone();
let stream = self.execute_stream(row_range)?;
Ok(ArrayStreamAdapter::new(dtype, stream))
}
}

impl<A: 'static + Send> RepeatedScan<A> {
impl RepeatedScan {
/// Constructor just to allow `scan_builder` to create a `RepeatedScan`.
#[expect(
clippy::too_many_arguments,
@@ -93,7 +70,6 @@ impl<A: 'static + Send> RepeatedScan<A> {
selection: Selection,
splits: Splits,
concurrency: usize,
map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
limit: Option<usize>,
dtype: DType,
) -> Self {
@@ -107,22 +83,40 @@ impl<A: 'static + Send> RepeatedScan<A> {
selection,
splits,
concurrency,
map_fn,
limit,
dtype,
}
}

pub fn execute_array_iter<B: BlockingRuntime>(
&self,
row_range: Option<Range<u64>>,
runtime: &B,
) -> VortexResult<impl ArrayIterator + 'static> {
let dtype = self.dtype.clone();
let stream = self.execute_stream(row_range)?;
let iter = runtime.block_on_stream(stream);
Ok(ArrayIteratorAdapter::new(dtype, iter))
}

pub fn execute_array_stream(
&self,
row_range: Option<Range<u64>>,
) -> VortexResult<impl ArrayStream + Send> {
let dtype = self.dtype.clone();
let stream = self.execute_stream(row_range)?;
Ok(ArrayStreamAdapter::new(dtype, stream))
}

pub fn execute(
&self,
row_range: Option<Range<u64>>,
) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<ArrayRef>>>>> {
let ctx = Arc::new(TaskContext {
selection: self.selection.clone(),
filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))),
reader: self.layout_reader.clone(),
projection: self.projection.clone(),
mapper: self.map_fn.clone(),
});

let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
@@ -175,8 +169,7 @@ impl<A: 'static + Send> RepeatedScan<A> {
pub fn execute_stream(
&self,
row_range: Option<Range<u64>>,
) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
use futures::StreamExt;
) -> VortexResult<impl Stream<Item = VortexResult<ArrayRef>> + 'static> {
let num_workers = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
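With the output type parameter gone, `RepeatedScan` always yields `ArrayRef` chunks and the iterator/stream entry points move onto the single impl block. A usage sketch, assuming the `vortex::scan::RepeatedScan` path used by the Python bindings above, a `vortex::error::VortexResult` alias, and that the returned `ArrayStream` implements the `futures` `Stream` trait:

```rust
use std::ops::Range;

use futures::TryStreamExt;
use vortex::array::ArrayRef;
use vortex::error::VortexResult;
use vortex::scan::RepeatedScan;

// Sketch only: read one row range of an already-built RepeatedScan into memory.
async fn read_range(scan: &RepeatedScan, rows: Range<u64>) -> VortexResult<Vec<ArrayRef>> {
    scan.execute_array_stream(Some(rows))?  // plan split tasks for just this range
        .try_collect()                      // drive them and gather the resulting chunks
        .await
}
```

Each call plans its own split tasks, so disjoint ranges can be scanned concurrently from the same `RepeatedScan`.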