diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index 2feaa2dcac7..404794b4ea3 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -70,7 +70,7 @@ const DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES: usize = MAX_POSTSCRIPT_SIZE as usi pub struct VortexFormat { session: VortexSession, file_cache: VortexFileCache, - opts: VortexOptions, + opts: VortexTableOptions, } impl Debug for VortexFormat { @@ -87,7 +87,7 @@ config_namespace! { /// Can be set through a DataFusion [`SessionConfig`]. /// /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html - pub struct VortexOptions { + pub struct VortexTableOptions { /// The size of the in-memory [`vortex::file::Footer`] cache. pub footer_cache_size_mb: usize, default = 64 /// The size of the in-memory segment cache. @@ -97,16 +97,20 @@ config_namespace! { /// Values smaller than `MAX_POSTSCRIPT_SIZE + EOF_SIZE` will be clamped to that minimum /// during footer parsing. pub footer_initial_read_size_bytes: usize, default = DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES + /// The intra-partition scan concurrency, controlling the number of row splits to process + /// concurrently per-thread within each file. This does not affect the overall parallelism + /// across partitions, which is controlled by DataFusion's execution configuration. + pub scan_concurrency: Option, default = None } } -impl Eq for VortexOptions {} +impl Eq for VortexTableOptions {} /// Minimal factory to create [`VortexFormat`] instances. #[derive(Debug)] pub struct VortexFormatFactory { session: VortexSession, - options: Option, + options: Option, } impl GetExt for VortexFormatFactory { @@ -131,7 +135,7 @@ impl VortexFormatFactory { /// Creates a new instance with customized session and default options for all [`VortexFormat`] instances created from this factory. /// /// The options can be overridden by table-level configuration pass in [`FileFormatFactory::create`]. - pub fn new_with_options(session: VortexSession, options: VortexOptions) -> Self { + pub fn new_with_options(session: VortexSession, options: VortexTableOptions) -> Self { Self { session, options: Some(options), @@ -142,11 +146,11 @@ impl VortexFormatFactory { /// /// For example: /// ```rust - /// use vortex_datafusion::{VortexFormatFactory, VortexOptions}; + /// use vortex_datafusion::{VortexFormatFactory, VortexTableOptions}; /// - /// let factory = VortexFormatFactory::new().with_options(VortexOptions::default()); + /// let factory = VortexFormatFactory::new().with_options(VortexTableOptions::default()); /// ``` - pub fn with_options(mut self, options: VortexOptions) -> Self { + pub fn with_options(mut self, options: VortexTableOptions) -> Self { self.options = Some(options); self } @@ -186,11 +190,11 @@ impl FileFormatFactory for VortexFormatFactory { impl VortexFormat { /// Create a new instance with default options. pub fn new(session: VortexSession) -> Self { - Self::new_with_options(session, VortexOptions::default()) + Self::new_with_options(session, VortexTableOptions::default()) } - /// Creates a new instance with configured by a [`VortexOptions`]. - pub fn new_with_options(session: VortexSession, opts: VortexOptions) -> Self { + /// Creates a new instance with configured by a [`VortexTableOptions`]. + pub fn new_with_options(session: VortexSession, opts: VortexTableOptions) -> Self { Self { session: session.clone(), file_cache: VortexFileCache::new( @@ -204,7 +208,7 @@ impl VortexFormat { } /// Return the format specific configuration - pub fn options(&self) -> &VortexOptions { + pub fn options(&self) -> &VortexTableOptions { &self.opts } } @@ -428,6 +432,7 @@ impl FileFormat for VortexFormat { table_schema, self.session.clone(), self.file_cache.clone(), + self.opts.clone(), )) } } @@ -479,7 +484,7 @@ mod tests { (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \ STORED AS vortex \ LOCATION '{}' \ - OPTIONS( segment_cache_size_mb '5', footer_initial_read_size_bytes '12345' );", + OPTIONS( segment_cache_size_mb '5', footer_initial_read_size_bytes '12345', scan_concurrency '3' );", dir.path().to_str().unwrap() )) .await @@ -491,7 +496,7 @@ mod tests { #[test] fn format_plumbs_footer_initial_read_size() { - let mut opts = VortexOptions::default(); + let mut opts = VortexTableOptions::default(); opts.set("footer_initial_read_size_bytes", "12345").unwrap(); let format = VortexFormat::new_with_options(VortexSession::default(), opts); diff --git a/vortex-datafusion/src/persistent/mod.rs b/vortex-datafusion/src/persistent/mod.rs index 21033dbbccb..c0b621a6742 100644 --- a/vortex-datafusion/src/persistent/mod.rs +++ b/vortex-datafusion/src/persistent/mod.rs @@ -14,7 +14,7 @@ mod stream; pub use access_plan::VortexAccessPlan; pub use format::VortexFormat; pub use format::VortexFormatFactory; -pub use format::VortexOptions; +pub use format::VortexTableOptions; pub use source::VortexSource; #[cfg(test)] diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index c6c2badc528..740538ead11 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -84,6 +84,7 @@ pub(crate) struct VortexOpener { pub has_output_ordering: bool, pub expression_convertor: Arc, + pub scan_concurrency: Option, } impl FileOpener for VortexOpener { @@ -105,6 +106,7 @@ impl FileOpener for VortexOpener { let metrics = self.metrics.clone(); let layout_reader = self.layout_readers.clone(); let has_output_ordering = self.has_output_ordering; + let scan_concurrency = self.scan_concurrency; let expr_convertor = self.expression_convertor.clone(); @@ -291,6 +293,10 @@ impl FileOpener for VortexOpener { scan_builder = scan_builder.with_limit(limit); } + if let Some(concurrency) = scan_concurrency { + scan_builder = scan_builder.with_concurrency(concurrency); + } + let stream = scan_builder .with_metrics(metrics) .with_projection(scan_projection) @@ -500,6 +506,7 @@ mod tests { layout_readers: Default::default(), has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), + scan_concurrency: None, } } @@ -591,6 +598,7 @@ mod tests { layout_readers: Default::default(), has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), + scan_concurrency: None, }; let filter = col("a").lt(lit(100_i32)); @@ -674,6 +682,7 @@ mod tests { layout_readers: Default::default(), has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), + scan_concurrency: None, }; // The opener should successfully open the file and reorder columns @@ -826,6 +835,7 @@ mod tests { layout_readers: Default::default(), has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), + scan_concurrency: None, }; // This should succeed and return the correctly projected and cast data @@ -882,6 +892,7 @@ mod tests { layout_readers: Default::default(), has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), + scan_concurrency: None, } } @@ -1080,6 +1091,7 @@ mod tests { layout_readers: Default::default(), has_output_ordering: false, expression_convertor: Arc::new(DefaultExpressionConvertor::default()), + scan_concurrency: None, }; let file = PartitionedFile::new(file_path.to_string(), data_size); diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index cab3ea275d9..62015b24cdf 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -33,6 +33,7 @@ use vortex::session::VortexSession; use vortex_utils::aliases::dash_map::DashMap; use super::cache::VortexFileCache; +use super::format::VortexTableOptions; use super::metrics::PARTITION_LABEL; use super::opener::VortexOpener; use crate::convert::exprs::DefaultExpressionConvertor; @@ -60,6 +61,7 @@ pub struct VortexSource { /// Sharing the readers allows us to only read every layout once from the file, even across partitions. layout_readers: Arc>>, expression_convertor: Arc, + options: VortexTableOptions, } impl VortexSource { @@ -67,6 +69,7 @@ impl VortexSource { table_schema: TableSchema, session: VortexSession, file_cache: VortexFileCache, + options: VortexTableOptions, ) -> Self { let full_schema = table_schema.table_schema(); let indices = (0..full_schema.fields().len()).collect::>(); @@ -83,6 +86,7 @@ impl VortexSource { _unused_df_metrics: Default::default(), layout_readers: Arc::new(DashMap::default()), expression_convertor: Arc::new(DefaultExpressionConvertor::default()), + options, } } @@ -94,6 +98,17 @@ impl VortexSource { self.expression_convertor = expr_convertor; self } + + /// Returns the table options for this source. + pub fn options(&self) -> &VortexTableOptions { + &self.options + } + + /// Set the table options for this source. + pub fn with_options(mut self, opts: VortexTableOptions) -> Self { + self.options = opts; + self + } } impl FileSource for VortexSource { @@ -132,6 +147,7 @@ impl FileSource for VortexSource { layout_readers: self.layout_readers.clone(), has_output_ordering: !base_config.output_ordering.is_empty(), expression_convertor: Arc::new(DefaultExpressionConvertor::default()), + scan_concurrency: self.options.scan_concurrency, }; Ok(Arc::new(opener))