1 change: 1 addition & 0 deletions datafusion/core/tests/sql/explain_analyze.rs
@@ -872,6 +872,7 @@ async fn parquet_explain_analyze() {
&formatted,
"row_groups_pruned_statistics=1 total \u{2192} 1 matched"
);
assert_contains!(&formatted, "scan_efficiency_ratio=14% (259/1851)");

// The order of metrics is expected to be the same as the actual pruning order
// (file-> row-group -> page)
11 changes: 10 additions & 1 deletion datafusion/datasource-parquet/src/metrics.rs
@@ -16,7 +16,8 @@
// under the License.

use datafusion_physical_plan::metrics::{
Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, PruningMetrics, Time,
Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, PruningMetrics,
RatioMetrics, Time,
};

/// Stores metrics about the parquet execution for a particular parquet file.
@@ -66,6 +67,8 @@ pub struct ParquetFileMetrics {
pub page_index_eval_time: Time,
/// Total time spent reading and parsing metadata from the footer
pub metadata_load_time: Time,
/// Scan Efficiency Ratio, calculated as bytes_scanned / total_file_size
pub scan_efficiency_ratio: RatioMetrics,
/// Predicate Cache: number of records read directly from the inner reader.
/// This is the number of rows decoded while evaluating predicates
pub predicate_cache_inner_records: Count,
@@ -114,6 +117,11 @@ impl ParquetFileMetrics {
.with_type(MetricType::SUMMARY)
.pruning_metrics("files_ranges_pruned_statistics", partition);

let scan_efficiency_ratio = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.with_type(MetricType::SUMMARY)
.ratio_metrics("scan_efficiency_ratio", partition);

// -----------------------
// 'dev' level metrics
// -----------------------
@@ -164,6 +172,7 @@ impl ParquetFileMetrics {
bloom_filter_eval_time,
page_index_eval_time,
metadata_load_time,
scan_efficiency_ratio,
predicate_cache_inner_records,
predicate_cache_records,
}
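For context on how the new metric composes: RatioMetrics tracks a part and a total and renders them as a percentage. A minimal sketch using only the calls visible in this diff (the filename label and the numbers are illustrative; the 14% rendering matches the assertion added to explain_analyze.rs above):

use datafusion_physical_plan::metrics::{
    ExecutionPlanMetricsSet, MetricBuilder, MetricType,
};

// Build the ratio metric the same way ParquetFileMetrics::new does.
let metrics = ExecutionPlanMetricsSet::new();
let ratio = MetricBuilder::new(&metrics)
    .with_new_label("filename", "example.parquet".to_string())
    .with_type(MetricType::SUMMARY)
    .ratio_metrics("scan_efficiency_ratio", 0); // partition index 0

// Numerator: bytes actually fetched; denominator: total file size.
ratio.add_part(259);
ratio.add_total(1851);
// EXPLAIN ANALYZE renders this as: scan_efficiency_ratio=14% (259/1851)
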
24 changes: 24 additions & 0 deletions datafusion/datasource-parquet/src/reader.rs
@@ -97,6 +97,7 @@ impl DefaultParquetFileReaderFactory {
pub struct ParquetFileReader {
pub file_metrics: ParquetFileMetrics,
pub inner: ParquetObjectReader,
pub partitioned_file: PartitionedFile,
Comment on lines 97 to +100

@petern48 (Contributor Author) commented on Nov 10, 2025:
I wasn't sure: would this technically be considered a breaking change? This struct is only constructed manually throughout the crate (it has no constructor).

ParquetFileMetrics is in this ParquetFileReader struct, but ParquetFileReader didn't have a way to access the total file size, since the file_size field in ParquetObjectReader is private (and lives in arrow-rs). This is the change that made the most sense to me, since CachedParquetFileReader already has this as a field:

pub struct CachedParquetFileReader {
pub file_metrics: ParquetFileMetrics,
store: Arc<dyn ObjectStore>,
pub inner: ParquetObjectReader,
partitioned_file: PartitionedFile,

Alternatively, maybe I could add a file_size() getter to ParquetObjectReader in arrow-rs?
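
For reference, that alternative might look roughly like the following on the arrow-rs side. This is a hypothetical sketch, not code from this PR or from arrow-rs, and it assumes the private field is stored as Option<u64>:

impl ParquetObjectReader {
    /// Size in bytes of the underlying object, if already known.
    /// (Hypothetical getter; the field is currently private.)
    pub fn file_size(&self) -> Option<u64> {
        self.file_size
    }
}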

Contributor:
I think so, but a breaking change is acceptable here.

}

impl AsyncFileReader for ParquetFileReader {
@@ -129,6 +130,17 @@ impl AsyncFileReader for ParquetFileReader {
}
}

impl Drop for ParquetFileReader {
fn drop(&mut self) {
self.file_metrics
.scan_efficiency_ratio
.add_part(self.file_metrics.bytes_scanned.value());
self.file_metrics
.scan_efficiency_ratio
.add_total(self.partitioned_file.object_meta.size as usize);
Contributor:
I vaguely remember that each parquet reader may read only a sub-range of a file; if that's the case here, we might double-count the total size.

I verified with the following queries. l_orderkey is globally ordered, so with the predicate l_orderkey = 42 only the first row group will be scanned. If we set target_partitions to 2, there are two parquet reader instances, each reading half of the file range. The resulting scan_efficiency_ratio should be the same under either partitioning, but currently the two differ.

Data was generated with https://github.com/clflushopt/tpchgen-rs/tree/main/tpchgen-cli (parquet format, scale factor 1).

-- If target_partitions=1, the ratio is 0.86%
-- If target_partitions=2, the ratio is 0.43%
> set datafusion.execution.target_partitions = 2;
0 row(s) fetched.
Elapsed 0.000 seconds.

> explain analyze select * from '/Users/yongting/data/tpch_sf1/lineitem.parquet' where l_orderkey = 42;
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=0, elapsed_compute=376ns, output_bytes=0.0 B]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
|                   |   FilterExec: l_orderkey@0 = 42, metrics=[output_rows=0, elapsed_compute=25.835µs, output_bytes=0.0 B, selectivity=0% (0/20096)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|                   |     DataSourceExec: file_groups={2 groups: [[Users/yongting/data/tpch_sf1/lineitem.parquet:0..115375407], [Users/yongting/data/tpch_sf1/lineitem.parquet:115375407..230750813]]}, projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment], file_type=parquet, predicate=l_orderkey@0 = 42, pruning_predicate=l_orderkey_null_count@2 != row_count@3 AND l_orderkey_min@0 <= 42 AND 42 <= l_orderkey_max@1, required_guarantees=[l_orderkey in (42)], metrics=[output_rows=20096, elapsed_compute=2ns, output_bytes=6.6 MB, files_ranges_pruned_statistics=2 total → 2 matched, row_groups_pruned_statistics=49 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_rows_pruned=123010 total → 20096 matched, bytes_scanned=1994659, metadata_load_time=123.46µs, scan_efficiency_ratio=0.43% (1994659/461501626)] |
|                   |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched.
Elapsed 0.010 seconds.
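
The double count is visible in the denominator: the file_groups above split one 230750813-byte file into two ranges, and 461501626 is exactly 2 × 230750813, so each reader's Drop added the whole object size. A sketch of one range-aware way to compute the denominator (the pushed fix may differ; the FileRange field names are assumptions):

use datafusion_datasource::PartitionedFile;

// Count only the byte range this reader is responsible for, falling
// back to the whole object size when no sub-range was assigned.
fn accounted_total(file: &PartitionedFile) -> u64 {
    match &file.range {
        Some(r) => (r.end - r.start) as u64, // assumed field names
        None => file.object_meta.size,
    }
}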

@petern48 (Contributor Author):

Pushed a fix for this. I also manually verified that the results are the same.

tpchgen-cli -s 1 --format=parquet --output-dir ...

set datafusion.execution.target_partitions = 2;
explain analyze select * from '<path>/lineitem.parquet' where l_orderkey = 42;
-- scan_efficiency_ratio=0.83% (1919614/231669547)

set datafusion.execution.target_partitions = 1;
explain analyze select * from '<path>/lineitem.parquet' where l_orderkey = 42;
-- scan_efficiency_ratio=0.83% (1919614/231669547)

I was hoping to write a test that would fail on the previous implementation but pass now. I found it difficult to do so with the existing files in datafusion/core/tests/data (which are all fairly small). Then I tried writing a test that mimics this scenario exactly, but the resulting file was quite large (200+ MB) compared to the rest of the files in the directory.

The test I wrote (not pushed):
#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
async fn parquet_scan_efficiency_ratio() {
    let table_name = "tpch_lineitem_sf1";
    let parquet_path = "tests/data/tpch_lineitem_sf1.parquet";

    for num_partitions in [1, 2, 5] {
        let config = SessionConfig::new().with_target_partitions(num_partitions);
        let ctx = SessionContext::new_with_config(config);
        ctx.register_parquet(table_name, parquet_path, ParquetReadOptions::default())
            .await
            .expect("register parquet table for explain analyze test");

        let sql =
            "EXPLAIN ANALYZE select * from tpch_lineitem_sf1 where l_orderkey = 42;";
        let actual = execute_to_batches(&ctx, sql).await;
        let formatted = arrow::util::pretty::pretty_format_batches(&actual)
            .unwrap()
            .to_string();

        assert_contains!(
            &formatted,
            "scan_efficiency_ratio=0.83% (1919614/231669547)"
        );
    }
}

How would you like to proceed? Do you still want a new test, or is the PR good as is? The test that existed before this change passes both before and after.

Contributor:

Perhaps in https://github.com/apache/parquet-testing/tree/107b36603e051aee26bd93e04b871034f6c756c0 we can find an existing parquet test file with multiple row groups, and apply a filter that prunes out some of them.

However, we can proceed as is. Since it's just a metric, the impact is minimal even if it misbehaves.
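
A sketch of what that could look like, assuming a suitable multi-row-group file exists (the path and filter column below are placeholders; the helpers are the same ones used in the test above). Instead of hard-coding byte counts, it extracts the metric token and asserts it is identical across partition counts:

#[tokio::test]
async fn parquet_scan_efficiency_ratio_is_partition_independent() {
    // Placeholder path: substitute a file with multiple row groups whose
    // statistics let the `id = 1` predicate prune some of them.
    let path = "tests/data/multi_row_group.parquet";
    let mut ratios = vec![];
    for partitions in [1, 2] {
        let config = SessionConfig::new().with_target_partitions(partitions);
        let ctx = SessionContext::new_with_config(config);
        ctx.register_parquet("t", path, ParquetReadOptions::default())
            .await
            .unwrap();
        let sql = "EXPLAIN ANALYZE SELECT * FROM t WHERE id = 1";
        let batches = execute_to_batches(&ctx, sql).await;
        let formatted = arrow::util::pretty::pretty_format_batches(&batches)
            .unwrap()
            .to_string();
        // Grab the whole "scan_efficiency_ratio=.. (part/total)" token.
        let ratio = formatted
            .split("scan_efficiency_ratio=")
            .nth(1)
            .and_then(|rest| rest.split(']').next())
            .map(|s| s.to_owned())
            .expect("metric should be present");
        ratios.push(ratio);
    }
    // Same numerator and denominator regardless of partitioning.
    assert_eq!(ratios[0], ratios[1]);
}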

}
}

impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
fn create_reader(
&self,
@@ -156,6 +168,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
Ok(Box::new(ParquetFileReader {
inner,
file_metrics,
partitioned_file,
}))
}
}
@@ -286,6 +299,17 @@ impl AsyncFileReader for CachedParquetFileReader {
}
}

impl Drop for CachedParquetFileReader {
fn drop(&mut self) {
self.file_metrics
.scan_efficiency_ratio
.add_part(self.file_metrics.bytes_scanned.value());
self.file_metrics
.scan_efficiency_ratio
.add_total(self.partitioned_file.object_meta.size as usize);
}
}

/// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`].
pub struct CachedParquetMetaData(Arc<ParquetMetaData>);

13 changes: 12 additions & 1 deletion datafusion/datasource-parquet/src/row_group_filter.rs
@@ -1533,6 +1533,7 @@ mod tests {
data: bytes::Bytes,
pruning_predicate: &PruningPredicate,
) -> Result<RowGroupAccessPlanFilter> {
use datafusion_datasource::PartitionedFile;
use object_store::{ObjectMeta, ObjectStore};

let object_meta = ObjectMeta {
@@ -1551,12 +1552,22 @@
let metrics = ExecutionPlanMetricsSet::new();
let file_metrics =
ParquetFileMetrics::new(0, object_meta.location.as_ref(), &metrics);
let inner = ParquetObjectReader::new(Arc::new(in_memory), object_meta.location)
let inner = ParquetObjectReader::new(Arc::new(in_memory), object_meta.location.clone())
.with_file_size(object_meta.size);

let partitioned_file = PartitionedFile {
object_meta,
partition_values: vec![],
range: None,
statistics: None,
extensions: None,
metadata_size_hint: None,
};

let reader = ParquetFileReader {
inner,
file_metrics: file_metrics.clone(),
partitioned_file,
};
let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
