Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 19 additions & 14 deletions vortex-datafusion/src/persistent/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ const DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES: usize = MAX_POSTSCRIPT_SIZE as usi
pub struct VortexFormat {
session: VortexSession,
file_cache: VortexFileCache,
opts: VortexOptions,
opts: VortexTableOptions,
}

impl Debug for VortexFormat {
Expand All @@ -87,7 +87,7 @@ config_namespace! {
/// Can be set through a DataFusion [`SessionConfig`].
///
/// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
pub struct VortexOptions {
pub struct VortexTableOptions {
/// The size of the in-memory [`vortex::file::Footer`] cache.
pub footer_cache_size_mb: usize, default = 64
/// The size of the in-memory segment cache.
Expand All @@ -97,16 +97,20 @@ config_namespace! {
/// Values smaller than `MAX_POSTSCRIPT_SIZE + EOF_SIZE` will be clamped to that minimum
/// during footer parsing.
pub footer_initial_read_size_bytes: usize, default = DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES
/// The intra-partition scan concurrency, controlling the number of row splits to process
/// concurrently per-thread within each file. This does not affect the overall parallelism
/// across partitions, which is controlled by DataFusion's execution configuration.
pub scan_concurrency: Option<usize>, default = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't DF let you set this for an entire sessioncontext? do we want to override this on a per-source basis?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's the copy of VortexOptions that is part of VortexFormat, which propagates it downstream from there through either VortexFormatFactory::create or VortexFormat::file_source

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if it makes sense to use the target_partitions config off the environment, but I realize that's different.

Maybe we can make it clear in the doc comment that this is the intra-partition concurrency

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

intra-partition was the term I was looking for!

}
}

impl Eq for VortexOptions {}
impl Eq for VortexTableOptions {}

/// Minimal factory to create [`VortexFormat`] instances.
#[derive(Debug)]
pub struct VortexFormatFactory {
session: VortexSession,
options: Option<VortexOptions>,
options: Option<VortexTableOptions>,
}

impl GetExt for VortexFormatFactory {
Expand All @@ -131,7 +135,7 @@ impl VortexFormatFactory {
/// Creates a new instance with customized session and default options for all [`VortexFormat`] instances created from this factory.
///
/// The options can be overridden by table-level configuration pass in [`FileFormatFactory::create`].
pub fn new_with_options(session: VortexSession, options: VortexOptions) -> Self {
pub fn new_with_options(session: VortexSession, options: VortexTableOptions) -> Self {
Self {
session,
options: Some(options),
Expand All @@ -142,11 +146,11 @@ impl VortexFormatFactory {
///
/// For example:
/// ```rust
/// use vortex_datafusion::{VortexFormatFactory, VortexOptions};
/// use vortex_datafusion::{VortexFormatFactory, VortexTableOptions};
///
/// let factory = VortexFormatFactory::new().with_options(VortexOptions::default());
/// let factory = VortexFormatFactory::new().with_options(VortexTableOptions::default());
/// ```
pub fn with_options(mut self, options: VortexOptions) -> Self {
pub fn with_options(mut self, options: VortexTableOptions) -> Self {
self.options = Some(options);
self
}
Expand Down Expand Up @@ -186,11 +190,11 @@ impl FileFormatFactory for VortexFormatFactory {
impl VortexFormat {
/// Create a new instance with default options.
pub fn new(session: VortexSession) -> Self {
Self::new_with_options(session, VortexOptions::default())
Self::new_with_options(session, VortexTableOptions::default())
}

/// Creates a new instance with configured by a [`VortexOptions`].
pub fn new_with_options(session: VortexSession, opts: VortexOptions) -> Self {
/// Creates a new instance with configured by a [`VortexTableOptions`].
pub fn new_with_options(session: VortexSession, opts: VortexTableOptions) -> Self {
Self {
session: session.clone(),
file_cache: VortexFileCache::new(
Expand All @@ -204,7 +208,7 @@ impl VortexFormat {
}

/// Return the format specific configuration
pub fn options(&self) -> &VortexOptions {
pub fn options(&self) -> &VortexTableOptions {
&self.opts
}
}
Expand Down Expand Up @@ -428,6 +432,7 @@ impl FileFormat for VortexFormat {
table_schema,
self.session.clone(),
self.file_cache.clone(),
self.opts.clone(),
))
}
}
Expand Down Expand Up @@ -479,7 +484,7 @@ mod tests {
(c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
STORED AS vortex \
LOCATION '{}' \
OPTIONS( segment_cache_size_mb '5', footer_initial_read_size_bytes '12345' );",
OPTIONS( segment_cache_size_mb '5', footer_initial_read_size_bytes '12345', scan_concurrency '3' );",
dir.path().to_str().unwrap()
))
.await
Expand All @@ -491,7 +496,7 @@ mod tests {

#[test]
fn format_plumbs_footer_initial_read_size() {
let mut opts = VortexOptions::default();
let mut opts = VortexTableOptions::default();
opts.set("footer_initial_read_size_bytes", "12345").unwrap();

let format = VortexFormat::new_with_options(VortexSession::default(), opts);
Expand Down
2 changes: 1 addition & 1 deletion vortex-datafusion/src/persistent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ mod stream;
pub use access_plan::VortexAccessPlan;
pub use format::VortexFormat;
pub use format::VortexFormatFactory;
pub use format::VortexOptions;
pub use format::VortexTableOptions;
pub use source::VortexSource;

#[cfg(test)]
Expand Down
12 changes: 12 additions & 0 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub(crate) struct VortexOpener {
pub has_output_ordering: bool,

pub expression_convertor: Arc<dyn ExpressionConvertor>,
pub scan_concurrency: Option<usize>,
}

impl FileOpener for VortexOpener {
Expand All @@ -105,6 +106,7 @@ impl FileOpener for VortexOpener {
let metrics = self.metrics.clone();
let layout_reader = self.layout_readers.clone();
let has_output_ordering = self.has_output_ordering;
let scan_concurrency = self.scan_concurrency;

let expr_convertor = self.expression_convertor.clone();

Expand Down Expand Up @@ -291,6 +293,10 @@ impl FileOpener for VortexOpener {
scan_builder = scan_builder.with_limit(limit);
}

if let Some(concurrency) = scan_concurrency {
scan_builder = scan_builder.with_concurrency(concurrency);
}

let stream = scan_builder
.with_metrics(metrics)
.with_projection(scan_projection)
Expand Down Expand Up @@ -500,6 +506,7 @@ mod tests {
layout_readers: Default::default(),
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
scan_concurrency: None,
}
}

Expand Down Expand Up @@ -591,6 +598,7 @@ mod tests {
layout_readers: Default::default(),
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
scan_concurrency: None,
};

let filter = col("a").lt(lit(100_i32));
Expand Down Expand Up @@ -674,6 +682,7 @@ mod tests {
layout_readers: Default::default(),
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
scan_concurrency: None,
};

// The opener should successfully open the file and reorder columns
Expand Down Expand Up @@ -826,6 +835,7 @@ mod tests {
layout_readers: Default::default(),
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
scan_concurrency: None,
};

// This should succeed and return the correctly projected and cast data
Expand Down Expand Up @@ -882,6 +892,7 @@ mod tests {
layout_readers: Default::default(),
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
scan_concurrency: None,
}
}

Expand Down Expand Up @@ -1080,6 +1091,7 @@ mod tests {
layout_readers: Default::default(),
has_output_ordering: false,
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
scan_concurrency: None,
};

let file = PartitionedFile::new(file_path.to_string(), data_size);
Expand Down
16 changes: 16 additions & 0 deletions vortex-datafusion/src/persistent/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use vortex::session::VortexSession;
use vortex_utils::aliases::dash_map::DashMap;

use super::cache::VortexFileCache;
use super::format::VortexTableOptions;
use super::metrics::PARTITION_LABEL;
use super::opener::VortexOpener;
use crate::convert::exprs::DefaultExpressionConvertor;
Expand Down Expand Up @@ -60,13 +61,15 @@ pub struct VortexSource {
/// Sharing the readers allows us to only read every layout once from the file, even across partitions.
layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
expression_convertor: Arc<dyn ExpressionConvertor>,
options: VortexTableOptions,
}

impl VortexSource {
pub(crate) fn new(
table_schema: TableSchema,
session: VortexSession,
file_cache: VortexFileCache,
options: VortexTableOptions,
) -> Self {
let full_schema = table_schema.table_schema();
let indices = (0..full_schema.fields().len()).collect::<Vec<_>>();
Expand All @@ -83,6 +86,7 @@ impl VortexSource {
_unused_df_metrics: Default::default(),
layout_readers: Arc::new(DashMap::default()),
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
options,
}
}

Expand All @@ -94,6 +98,17 @@ impl VortexSource {
self.expression_convertor = expr_convertor;
self
}

/// Returns the table options for this source.
pub fn options(&self) -> &VortexTableOptions {
&self.options
}

/// Set the table options for this source.
pub fn with_options(mut self, opts: VortexTableOptions) -> Self {
self.options = opts;
self
}
}

impl FileSource for VortexSource {
Expand Down Expand Up @@ -132,6 +147,7 @@ impl FileSource for VortexSource {
layout_readers: self.layout_readers.clone(),
has_output_ordering: !base_config.output_ordering.is_empty(),
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
scan_concurrency: self.options.scan_concurrency,
};

Ok(Arc::new(opener))
Expand Down
Loading