@petern48
Contributor

Which issue does this PR close?

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Added test

Are there any user-facing changes?

Yes, new metric to view.

@github-actions github-actions bot added core Core DataFusion crate datasource Changes to the datasource crate labels Nov 10, 2025
@petern48 petern48 changed the title from "feat(parquet): Implement scan_efficiency_ratio metric for parquet reading stats" to "feat(parquet): Implement scan_efficiency_ratio metric for parquet reading" Nov 10, 2025
Comment on lines 97 to +100
pub struct ParquetFileReader {
pub file_metrics: ParquetFileMetrics,
pub inner: ParquetObjectReader,
pub partitioned_file: PartitionedFile,
Contributor Author

@petern48 petern48 Nov 10, 2025

I wasn't sure: would this technically be considered a breaking change? This struct is only ever constructed manually throughout the crate (it has no constructor).

ParquetFileMetrics lives in this ParquetFileReader struct, but ParquetFileReader had no way to access the total file size, since the file_size field in ParquetObjectReader is private (and defined in arrow-rs). Adding partitioned_file here is the change that made the most sense to me, since CachedParquetFileReader already has it as a field.

pub struct CachedParquetFileReader {
pub file_metrics: ParquetFileMetrics,
store: Arc<dyn ObjectStore>,
pub inner: ParquetObjectReader,
partitioned_file: PartitionedFile,

Alternatively, maybe I could add a file_size() getter to ParquetObjectReader in arrow-rs?

Contributor

I think so, but a breaking change is acceptable here.

@petern48 petern48 marked this pull request as ready for review November 10, 2025 01:08
@petern48
Contributor Author

cc @2010YOUY01

.add_part(self.file_metrics.bytes_scanned.value());
self.file_metrics
.scan_efficiency_ratio
.add_total(self.partitioned_file.object_meta.size as usize);
Contributor

I vaguely remember that each parquet reader may read only a sub-range of a file. If that's the case here, we might double count the total size.

I verified this with the following queries. l_orderkey is globally ordered, so with the predicate l_orderkey = 42 only the first row group is scanned. With target_partitions set to 2, there are two parquet reader instances, each reading half of the file range. The resulting scan_efficiency_ratio should be the same regardless of partitioning, but currently it differs.

The data was generated with https://github.com/clflushopt/tpchgen-rs/tree/main/tpchgen-cli, in Parquet format at scale factor 1 (sf1).

-- If target_partitions=1, the ratio is 0.86%
-- If target_partitions=2, the ratio is 0.43%
> set datafusion.execution.target_partitions = 2;
0 row(s) fetched.
Elapsed 0.000 seconds.

> explain analyze select * from '/Users/yongting/data/tpch_sf1/lineitem.parquet' where l_orderkey = 42;
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=0, elapsed_compute=376ns, output_bytes=0.0 B]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
|                   |   FilterExec: l_orderkey@0 = 42, metrics=[output_rows=0, elapsed_compute=25.835µs, output_bytes=0.0 B, selectivity=0% (0/20096)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|                   |     DataSourceExec: file_groups={2 groups: [[Users/yongting/data/tpch_sf1/lineitem.parquet:0..115375407], [Users/yongting/data/tpch_sf1/lineitem.parquet:115375407..230750813]]}, projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment], file_type=parquet, predicate=l_orderkey@0 = 42, pruning_predicate=l_orderkey_null_count@2 != row_count@3 AND l_orderkey_min@0 <= 42 AND 42 <= l_orderkey_max@1, required_guarantees=[l_orderkey in (42)], metrics=[output_rows=20096, elapsed_compute=2ns, output_bytes=6.6 MB, files_ranges_pruned_statistics=2 total → 2 matched, row_groups_pruned_statistics=49 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_rows_pruned=123010 total → 20096 matched, bytes_scanned=1994659, metadata_load_time=123.46µs, scan_efficiency_ratio=0.43% (1994659/461501626)] |
|                   |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched.
Elapsed 0.010 seconds.
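
To make the double counting concrete, here is a minimal arithmetic sketch using the numbers from the plan above. The `ratio_percent` helper is hypothetical and is not part of DataFusion; the sketch only assumes that each of the two readers adds the full file size (230750813 bytes, the end of the second file range) to the total.

```rust
/// Hypothetical helper (not DataFusion's API): bytes scanned as a percentage of a total.
fn ratio_percent(part: u64, total: u64) -> f64 {
    part as f64 / total as f64 * 100.0
}

fn main() {
    let file_size: u64 = 230_750_813; // end of the second file range in the plan above
    let bytes_scanned: u64 = 1_994_659; // bytes_scanned reported in the plan above
    let num_readers: u64 = 2; // one reader per partition

    // Assumption: every reader adds the whole file size, so the file is counted twice.
    let double_counted_total = file_size * num_readers;
    assert_eq!(double_counted_total, 461_501_626); // the denominator shown in the plan

    // Double-counted total, as with target_partitions = 2.
    println!("{:.2}%", ratio_percent(bytes_scanned, double_counted_total)); // prints 0.43%
    // File counted once, as with target_partitions = 1.
    println!("{:.2}%", ratio_percent(bytes_scanned, file_size)); // prints 0.86%
}
```

The denominator 461501626 in the reported metric is exactly twice the file size, which is consistent with each reader contributing the full file size once.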

Contributor Author

Pushed a fix for this. I also manually verified that the results are the same.

tpchgen-cli -s 1 --format=parquet --output-dir ...

set datafusion.execution.target_partitions = 2;
explain analyze select * from '<path>/lineitem.parquet' where l_orderkey = 42;
-- scan_efficiency_ratio=0.83% (1919614/231669547)

set datafusion.execution.target_partitions = 1;
explain analyze select * from '<path>/lineitem.parquet' where l_orderkey = 42;
-- scan_efficiency_ratio=0.83% (1919614/231669547)

I was then hoping to write a test that fails on the previous implementation but passes now. I found it difficult to do so with the existing files in datafusion/core/tests/data, which are all fairly small. I then tried writing a test that mimics this scenario exactly, but the required file is quite large (200+ MB) compared to the other test files in that directory.

The test I wrote (not pushed)
#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
async fn parquet_scan_efficiency_ratio() {
    let table_name = "tpch_lineitem_sf1";
    let parquet_path = "tests/data/tpch_lineitem_sf1.parquet";

    for num_partitions in [1, 2, 5] {
        let config = SessionConfig::new().with_target_partitions(num_partitions);
        let ctx = SessionContext::new_with_config(config);
        ctx.register_parquet(table_name, parquet_path, ParquetReadOptions::default())
            .await
            .expect("register parquet table for explain analyze test");

        let sql =
            "EXPLAIN ANALYZE select * from tpch_lineitem_sf1 where l_orderkey = 42;";
        let actual = execute_to_batches(&ctx, sql).await;
        let formatted = arrow::util::pretty::pretty_format_batches(&actual)
            .unwrap()
            .to_string();

        assert_contains!(
            &formatted,
            "scan_efficiency_ratio=0.83% (1919614/231669547)"
        );
    }
}

How would you like to proceed? Do you still want a new test, or is the PR good as it is? The test that existed before this change passes both before and after the fix.

@petern48 petern48 marked this pull request as draft November 11, 2025 19:34
@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label Nov 11, 2025
Comment on lines +282 to 291
pub fn ratio_metrics_with_strategy(
self,
name: impl Into<Cow<'static, str>>,
partition: usize,
merge_strategy: RatioMergeStrategy,
) -> RatioMetrics {
let ratio_metrics = RatioMetrics::new().with_merge_strategy(merge_strategy);
self.with_partition(partition).build(MetricValue::Ratio {
name: name.into(),
ratio_metrics: ratio_metrics.clone(),
Contributor Author

Originally, I tried builder.ratio_metrics().with_merge_strategy() in metrics.rs, but found that the merge strategy had no effect: the ratio_metrics.clone() call here registers the ratio_metrics before the merge strategy is added to it. Hence the need for this new ratio_metrics_with_strategy().
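
As a rough illustration of the ordering problem, here is a toy sketch. All types and names in it (`ToyRatio`, `ToyRegistry`, `Strategy` and its variants) are hypothetical stand-ins, not the real DataFusion API. It assumes the counters are shared between clones through an `Arc`, while the merge strategy is a plain field copied at clone time.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Hypothetical strategy names for illustration only.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Strategy {
    AddTotals,
    KeepOneTotal,
}

// Toy ratio metric: the counter is shared across clones, the strategy is copied per clone.
#[derive(Clone, Debug)]
struct ToyRatio {
    part: Arc<AtomicUsize>,
    strategy: Strategy,
}

impl ToyRatio {
    fn new() -> Self {
        Self { part: Arc::new(AtomicUsize::new(0)), strategy: Strategy::AddTotals }
    }

    fn with_strategy(mut self, strategy: Strategy) -> Self {
        self.strategy = strategy;
        self
    }
}

// Toy registry that keeps a clone of every metric it hands out.
#[derive(Default)]
struct ToyRegistry {
    registered: Vec<ToyRatio>,
}

impl ToyRegistry {
    // Registers a clone first, then returns the metric: later strategy changes are not seen.
    fn ratio(&mut self) -> ToyRatio {
        let ratio = ToyRatio::new();
        self.registered.push(ratio.clone());
        ratio
    }

    // Sets the strategy before cloning, so the registered copy carries it too.
    fn ratio_with_strategy(&mut self, strategy: Strategy) -> ToyRatio {
        let ratio = ToyRatio::new().with_strategy(strategy);
        self.registered.push(ratio.clone());
        ratio
    }
}

fn main() {
    let mut registry = ToyRegistry::default();

    // Strategy applied after registration: counter updates are shared, the strategy is not.
    let late = registry.ratio().with_strategy(Strategy::KeepOneTotal);
    late.part.fetch_add(10, Ordering::Relaxed);
    assert_eq!(registry.registered[0].part.load(Ordering::Relaxed), 10);
    assert_eq!(registry.registered[0].strategy, Strategy::AddTotals);

    // Strategy applied before registration: both copies agree.
    let early = registry.ratio_with_strategy(Strategy::KeepOneTotal);
    assert_eq!(registry.registered[1].strategy, early.strategy);
}
```

In this toy model the fix is purely about ordering: the strategy has to be set before the clone that gets registered is taken.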

@petern48 petern48 marked this pull request as ready for review November 11, 2025 21:58
@petern48 petern48 requested a review from 2010YOUY01 November 11, 2025 22:11
