fix(merge): partition pruning, EXPLAIN routing, and data-loss fix on the MERGE path (rebased)#134
Merged
rampage644 merged 8 commits into main on Apr 23, 2026
Conversation
`CustomTypeCoercionRewriter::analyze_internal` built its lookup schema from `merge_schema(&plan.inputs())`. For leaf nodes like `LogicalPlan::TableScan`, `plan.inputs()` is empty and the merged schema has no fields, so any binary-op expression attached directly to the leaf — e.g. via `LogicalPlanBuilder::scan_with_filters` — would fail coercion with "Schema error: No field named <col>" during the analyzer pass.

This broke the target-side partition pruning hint path that `UserQuery::merge_query` wires up when a MERGE source is a partitioned `DataFusionTable`: `target_filter_expression()` builds a per-partition `col(source) >= min AND col(source) <= max` predicate and pushes it into the target `TableScan`'s filters via `scan_with_filters`, expecting Iceberg's file pruner to use it at manifest level. The filter never made it past the analyzer.

Fix: when `plan.inputs().is_empty()`, use `plan.schema()` directly for type resolution, mirroring the pattern DataFusion's built-in `TypeCoercion` analyzer uses.

All existing `custom_type_coercion` snapshot tests still pass, and the full `merge_into` suite (22 tests) stays green. Verified end-to-end against a deployed Embucket Lambda: `MERGE INTO demo.atomic.events_hooli_tiny USING demo.atomic.events_hooli_ident` where the source is partitioned by `identity(event_name)` — previously failed with `custom_type_coercion / Schema error: No field named event_name`, now returns 100 matched rows and the update lands on disk.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Embucket has its own MERGE planner (`UserQuery::merge_query`) because
DataFusion's SQL path doesn't produce a usable plan for `MERGE INTO`.
The side effect was that `EXPLAIN MERGE INTO …` and
`EXPLAIN ANALYZE MERGE INTO …` both fell through `execute()` to
`execute_sql`, which hands the statement to DataFusion's planner and
bounces back with:
```
SQL compilation error: unsupported feature: Unsupported SQL statement: MERGE INTO …
```
This left no observability for MERGE plans or for per-scan metrics, which made it impossible to verify partition-pruning behaviour on partitioned Iceberg targets (files scanned, bytes scanned, manifest-level pruning counters).
Changes:
1. Split `merge_query` into a pure plan-builder `merge_to_logical_plan`
and a thin wrapper that calls `execute_logical_plan`.
2. In `execute()`, when the parsed statement is
`DFStatement::Explain(..)` whose inner statement is
`Statement::Merge { .. }`, build the MERGE logical plan via
`merge_to_logical_plan`, then wrap it in the same
`LogicalPlan::Explain` / `LogicalPlan::Analyze` shape DataFusion's
own `explain_to_plan` constructs. Everything downstream (physical
planning, execution, output formatting) is unchanged.
3. Add a snapshot test `merge_into_explain` over a minimal
unpartitioned target + source — asserts the logical and physical
plans render. `EXPLAIN ANALYZE` is exercised end-to-end through the
deployed Lambda rather than via snapshot because the formatted-table
column widths depend on the pre-redaction metric value widths and
aren't stable across runs.
After this change:
- `EXPLAIN MERGE INTO t USING s ON ... WHEN MATCHED THEN UPDATE ...`
returns the logical plan + physical plan (including
`MergeIntoSinkExec`, `HashJoinExec`, `DataSourceExec { file_groups,
projection, file_type }` for each side).
- `EXPLAIN ANALYZE` of the same statement executes the MERGE and
additionally reports per-node runtime metrics. The
`DataSourceExec` rows now surface the DataFusion/Parquet scan
counters that were previously invisible: `bytes_scanned`,
`files_ranges_pruned_statistics`, `row_groups_pruned_statistics`,
`pushdown_rows_pruned`, `page_index_rows_pruned`. That's the signal
you need to verify source-side partition-hint pruning actually prunes.
All 23 `merge_into` tests pass (22 existing + 1 new). Full
`cargo test -p executor --lib` is 359/0.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
`MergeIntoCOWSinkExec` tracked per-clause row counts in `AtomicI64` purely to populate the final result batch. After the EXPLAIN / EXPLAIN ANALYZE routing fix on this branch, `EXPLAIN ANALYZE MERGE INTO …` reports rich per-scan metrics on every `DataSourceExec` in the plan, but the sink line was still rendering as `MergeIntoSinkExec, metrics=[]` because this node didn't own an `ExecutionPlanMetricsSet`.

Wire one up: register `Count` metrics `updated_rows`, `inserted_rows`, and `deleted_rows` via `MetricBuilder::new(&self.metrics).counter(..)` at the start of `execute()`, clone them into the async write closure, and `add()` the final `AtomicI64` values after the transaction commits. Implement `ExecutionPlan::metrics()` to return `Some(self.metrics.clone_inner())` so DataFusion's plan formatter picks them up. Row counts that exceed `usize::MAX` saturate via `try_from` rather than panicking.

After this change, `EXPLAIN ANALYZE MERGE` shows the sink counters alongside the child scan counters, so an operator can read updated / inserted / deleted counts directly off the plan output instead of only from the result row. All 23 `merge_into` tests stay green.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The MergeCOWFilterStream "no matches in this batch" fast path short-circuited on `matching_data_and_manifest_files.is_empty()` without checking the cumulative `all_matching_data_files` set. If a target file had been seen as matching in an earlier batch and a later batch contained only target rows for that file, the rows in the later batch were silently dropped. The downstream COW commit then overwrote the original file with the partial result, permanently losing the unmatched target rows whose batch hit the dead path.

The fix tightens the guard to also require `all_matching_data_files` to be empty before taking the fast path. When a batch belongs to a file already in the overwrite set, it falls through to the main filter path which correctly emits target rows via `file_predicate OR source_exists`.

Adds three unit tests against MergeCOWFilterStream covering the matching-then-target patterns, plus a SQL snapshot test that exercises the same shape end-to-end.

Fixes #128

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Rustfmt on Rust 1.94 formats the long #[instrument(...)] attribute and the following fn signature differently than the PR was originally authored against. No semantic change.
Rust 1.94 clippy (clippy::collapsible_if, denied via clippy::all) flags
the nested `if let ... { if matches!(...) { ... } }` guard in execute().
Merge both conditions into a single let-chain so clippy is happy without
changing the observable behaviour of the MERGE EXPLAIN routing.
The DataFusion planner uses the host CPU count as the RoundRobinBatch partition target, so the EXPLAIN snapshot literal differed between the PR author's dev box (10 cores) and the 4-core ubuntu-latest GitHub runner. Add an insta filter to the shared test_query! macro that rewrites `RoundRobinBatch(N)` to `RoundRobinBatch([N])`, and regenerate the `query_merge_into_explain` snapshot to use the normalized token so the test is stable across core counts.
Rebase of #126 on top of latest `main` (through 7482768 fix(datediff)) with the CI failures fixed so `stable / fmt`, `stable / clippy`, and `ubuntu / stable` pass. Content of the four original MERGE-path commits is unchanged — see #126 for the full technical write-up, end-to-end validation, and closes-#128 context.

Rebase on top of main
The 4 original commits replay cleanly onto current `main`:
- f24614f fix(custom_type_coercion): fall back to plan.schema() for leaf nodes
- 16d54c2 feat(merge): route EXPLAIN / EXPLAIN ANALYZE into the MERGE planner
- 99645d6 feat(merge): expose updated/inserted/deleted row counts as MetricsSet
- 31c7bba fix(merge): preserve target rows when MERGE batch contains only target

CI fixes stacked on top
Three small follow-ups needed to turn the CI green — none of them change MERGE semantics.
- 67ee46e style: apply rustfmt after rebase on main — rustc 1.94 rustfmt formats the long `#[instrument(name = "UserQuery::merge_to_logical_plan", level = "trace", skip(self), err)]` attribute and its following `pub async fn merge_to_logical_plan(&self, statement: Statement)` signature differently than the PR was originally authored against. Mechanical `cargo fmt` application only.
- a6f2b3f style(merge): collapse nested if into let-chain for EXPLAIN MERGE route — `clippy::collapsible_if` (denied via `clippy::all`) flags the `if let DFStatement::Statement(inner) = explain.statement.as_ref() { if matches!(inner.as_ref(), Statement::Merge { .. }) { ... } }` guard in `UserQuery::execute`. Merged both conditions into a single let-chain; observable behaviour of the EXPLAIN-MERGE routing is unchanged. This was the hard error that broke `stable / clippy`.
- ac68f7a test(merge): normalize RoundRobinBatch fan-out in EXPLAIN snapshots — the DataFusion planner's `RoundRobinBatch(N)` partition target equals the host CPU count, so the `query_merge_into_explain` snapshot baked in `RoundRobinBatch(10)` from the PR author's 10-core dev box and failed on the 4-core `ubuntu-latest` runner. Added an insta filter in the shared `test_query!` macro that rewrites `RoundRobinBatch(\d+)` → `RoundRobinBatch([N])` (no other snapshots contain this token, verified by grep) and regenerated the snapshot. This was the failure that broke `ubuntu / stable`.

Local verification (rustc 1.94, 4-core box)
- `cargo fmt --check`: clean
- `cargo clippy --all-targets --workspace`: exit 0 (remaining output is `clippy::pedantic` / `clippy::nursery` warnings on pre-existing code, not denied by the workspace config)
- `cargo test --profile=ci -p executor --all-targets`: 364 passed, 0 failed, 1 ignored (the original PR reports 362; the extra 2 are the new `merge_into_mixed_unsorted_multi_row_no_data_loss` SQL snapshot and the new `merge_into_explain` SQL snapshot added by this PR's own tests)

Supersedes #126. Closes #128.
Generated by Claude Code