feat(parquet): Implement scan_efficiency_ratio metric for parquet reading #18577
Conversation
```rust
pub struct ParquetFileReader {
    pub file_metrics: ParquetFileMetrics,
    pub inner: ParquetObjectReader,
    pub partitioned_file: PartitionedFile,
```
I wasn't sure, would this technically be considered a breaking change? This struct is only constructed manually throughout the crate (there is no constructor).
ParquetFileMetrics lives in this ParquetFileReader struct, but ParquetFileReader had no way to access the total file size, since the file_size field in ParquetObjectReader is private (and lives inside arrow-rs). This is the change that made the most sense to me, since CachedParquetFileReader already has this as a field.
datafusion/datafusion/datasource-parquet/src/reader.rs
Lines 225 to 229 in f162fd3
```rust
pub struct CachedParquetFileReader {
    pub file_metrics: ParquetFileMetrics,
    store: Arc<dyn ObjectStore>,
    pub inner: ParquetObjectReader,
    partitioned_file: PartitionedFile,
```
Alternatively, maybe I could add a file_size() getter to ParquetObjectReader in arrow-rs?
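If the arrow-rs route were taken, the getter could be a one-liner. This is a hypothetical sketch with a mocked-up struct, not the real ParquetObjectReader API:

```rust
// Hypothetical sketch only: a mock of arrow-rs's ParquetObjectReader with a
// file_size() getter. Field and type names are assumed, not the real API.
pub struct ParquetObjectReader {
    file_size: u64, // currently private in arrow-rs
}

impl ParquetObjectReader {
    /// Expose the size without making the field itself public.
    pub fn file_size(&self) -> u64 {
        self.file_size
    }
}

fn main() {
    let reader = ParquetObjectReader { file_size: 230_750_813 };
    assert_eq!(reader.file_size(), 230_750_813);
}
```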
I think so, but here I think a breaking change is acceptable
cc @2010YOUY01
```rust
    .add_part(self.file_metrics.bytes_scanned.value());
self.file_metrics
    .scan_efficiency_ratio
    .add_total(self.partitioned_file.object_meta.size as usize);
```
I vaguely remember that each parquet reader may read only a sub-range of a file; if that's the case here, we might double count the total size.
I verified this with the following queries. The l_orderkey column is globally ordered, so with the predicate l_orderkey = 42 only the first row group will be scanned. With target_partitions = 2 there are two parquet reader instances, each reading half of the file range. The resulting scan_efficiency_ratio should be the same regardless of partitioning, but currently it differs.
Data is generated with https://github.com/clflushopt/tpchgen-rs/tree/main/tpchgen-cli, in parquet format, at scale factor 1 (sf1).
-- If target_partitions=1, the ratio is 0.86%
-- If target_partitions=2, the ratio is 0.43%
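To make the double counting concrete, here is a toy arithmetic sketch (not DataFusion code) using the byte counts reported in the EXPLAIN ANALYZE output in this comment:

```rust
// Toy arithmetic only, using the reported byte counts: with two partitions,
// each reader adds the whole file size to the metric's total, so the
// denominator doubles and the reported ratio halves.
fn main() {
    let file_size: u64 = 230_750_813; // lineitem.parquet size in bytes
    let bytes_scanned: u64 = 1_994_659; // bytes actually fetched

    // target_partitions = 1: one reader, total counted once
    let ratio_one = bytes_scanned as f64 / file_size as f64;
    // target_partitions = 2: two readers, file size double counted
    let ratio_two = bytes_scanned as f64 / (2 * file_size) as f64;

    assert_eq!(2 * file_size, 461_501_626); // the denominator in the plan
    assert!((ratio_one * 100.0 - 0.86).abs() < 0.01); // ~0.86%
    assert!((ratio_two * 100.0 - 0.43).abs() < 0.01); // ~0.43%
}
```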
> set datafusion.execution.target_partitions = 2;
0 row(s) fetched.
Elapsed 0.000 seconds.
> explain analyze select * from '/Users/yongting/data/tpch_sf1/lineitem.parquet' where l_orderkey = 42;
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=0, elapsed_compute=376ns, output_bytes=0.0 B] |
| | FilterExec: l_orderkey@0 = 42, metrics=[output_rows=0, elapsed_compute=25.835µs, output_bytes=0.0 B, selectivity=0% (0/20096)] |
| | DataSourceExec: file_groups={2 groups: [[Users/yongting/data/tpch_sf1/lineitem.parquet:0..115375407], [Users/yongting/data/tpch_sf1/lineitem.parquet:115375407..230750813]]}, projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment], file_type=parquet, predicate=l_orderkey@0 = 42, pruning_predicate=l_orderkey_null_count@2 != row_count@3 AND l_orderkey_min@0 <= 42 AND 42 <= l_orderkey_max@1, required_guarantees=[l_orderkey in (42)], metrics=[output_rows=20096, elapsed_compute=2ns, output_bytes=6.6 MB, files_ranges_pruned_statistics=2 total → 2 matched, row_groups_pruned_statistics=49 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_rows_pruned=123010 total → 20096 matched, bytes_scanned=1994659, metadata_load_time=123.46µs, scan_efficiency_ratio=0.43% (1994659/461501626)] |
| | |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched.
Elapsed 0.010 seconds.
Pushed a fix for this. I also manually verified that the results are the same.
```shell
tpchgen-cli -s 1 --format=parquet --output-dir ...
```

```sql
set datafusion.execution.target_partitions = 2;
explain analyze select * from '<path>/lineitem.parquet' where l_orderkey = 42;
-- scan_efficiency_ratio=0.83% (1919614/231669547)

set datafusion.execution.target_partitions = 1;
explain analyze select * from '<path>/lineitem.parquet' where l_orderkey = 42;
-- scan_efficiency_ratio=0.83% (1919614/231669547)
```
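One way to see the shape of such a fix (a sketch only; the actual field and type names in DataFusion differ) is to have each reader contribute only its assigned byte range to the metric's total, so the per-reader totals sum to the file size exactly once:

```rust
// Sketch only: PartitionedFile is mocked here with invented field names.
// Each reader adds just its assigned byte range as the metric's total, so
// split readers never double count the file size.
struct PartitionedFile {
    file_size: u64,
    range: Option<(u64, u64)>, // (start, end) byte range when the file is split
}

fn total_for(pf: &PartitionedFile) -> u64 {
    match pf.range {
        Some((start, end)) => end - start, // only this reader's slice
        None => pf.file_size,              // single reader scans the whole file
    }
}

fn main() {
    // Two readers splitting a file as in the plan above
    let a = PartitionedFile { file_size: 230_750_813, range: Some((0, 115_375_407)) };
    let b = PartitionedFile { file_size: 230_750_813, range: Some((115_375_407, 230_750_813)) };
    // The per-reader totals sum to the file size exactly once
    assert_eq!(total_for(&a) + total_for(&b), 230_750_813);
}
```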
I was hoping to then write a test that would fail on the previous implementation but pass now. I found it difficult to do so with the existing files in datafusion/core/tests/data (which are all fairly small). Then I tried writing a test that mimics this exactly, but found the file was quite large (200+ MB) compared to the rest of the tests in the directory.
The test I wrote (not pushed):
```rust
#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
async fn parquet_scan_efficiency_ratio() {
    let table_name = "tpch_lineitem_sf1";
    let parquet_path = "tests/data/tpch_lineitem_sf1.parquet";
    for num_partitions in [1, 2, 5] {
        let config = SessionConfig::new().with_target_partitions(num_partitions);
        let ctx = SessionContext::new_with_config(config);
        ctx.register_parquet(table_name, parquet_path, ParquetReadOptions::default())
            .await
            .expect("register parquet table for explain analyze test");
        let sql =
            "EXPLAIN ANALYZE select * from tpch_lineitem_sf1 where l_orderkey = 42;";
        let actual = execute_to_batches(&ctx, sql).await;
        let formatted = arrow::util::pretty::pretty_format_batches(&actual)
            .unwrap()
            .to_string();
        assert_contains!(
            &formatted,
            "scan_efficiency_ratio=0.83% (1919614/231669547)"
        );
    }
}
```

How would you like to proceed? Do you still want a new test, or is the PR good as it is? The test that existed before this change passes both before and after.
```rust
pub fn ratio_metrics_with_strategy(
    self,
    name: impl Into<Cow<'static, str>>,
    partition: usize,
    merge_strategy: RatioMergeStrategy,
) -> RatioMetrics {
    let ratio_metrics = RatioMetrics::new().with_merge_strategy(merge_strategy);
    self.with_partition(partition).build(MetricValue::Ratio {
        name: name.into(),
        ratio_metrics: ratio_metrics.clone(),
```
Originally, I tried builder.ratio_metrics().with_merge_strategy() in metrics.rs, but found the merge strategy wasn't taking effect: the ratio_metrics.clone() call here registered the ratio_metrics before the merge strategy had been added to it. Hence the need for this new ratio_metrics_with_strategy().
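A toy illustration of the ordering problem (the real RatioMetrics and RatioMergeStrategy are simplified here to a plain value type with invented variant names): once the builder clones the metrics for registration, a strategy set afterwards on the original never reaches the registered copy.

```rust
// Toy model only: names invented, real types simplified. RatioMetrics here
// is a plain value type, so a clone taken before the strategy is set keeps
// the default strategy forever.
#[derive(Clone, Debug, PartialEq)]
enum RatioMergeStrategy {
    Default,
    Custom,
}

#[derive(Clone)]
struct RatioMetrics {
    strategy: RatioMergeStrategy,
}

impl RatioMetrics {
    fn new() -> Self {
        Self { strategy: RatioMergeStrategy::Default }
    }
    fn with_merge_strategy(mut self, s: RatioMergeStrategy) -> Self {
        self.strategy = s;
        self
    }
}

fn main() {
    let metrics = RatioMetrics::new();
    let registered = metrics.clone(); // what the builder registered
    let _local = metrics.with_merge_strategy(RatioMergeStrategy::Custom);
    // The registered copy never saw the custom strategy:
    assert_eq!(registered.strategy, RatioMergeStrategy::Default);
}
```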
Which issue does this PR close?
Closes DataSourceExec with Parquet source #18195.

Rationale for this change
What changes are included in this PR?
Are these changes tested?
Added test
Are there any user-facing changes?
Yes, new metric to view.