From 82c05860b5528892f423d29b309dd75245f1b469 Mon Sep 17 00:00:00 2001 From: Sergei Turukin Date: Wed, 15 Apr 2026 10:54:47 -0500 Subject: [PATCH 1/4] fix(custom_type_coercion): fall back to plan.schema() for leaf nodes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `CustomTypeCoercionRewriter::analyze_internal` built its lookup schema from `merge_schema(&plan.inputs())`. For leaf nodes like `LogicalPlan::TableScan`, `plan.inputs()` is empty and the merged schema has no fields, so any binary-op expression attached directly to the leaf — e.g. via `LogicalPlanBuilder::scan_with_filters` — would fail coercion with "Schema error: No field named " during the analyzer pass. This broke the target-side partition pruning hint path that `UserQuery::merge_query` wires up when a MERGE source is a partitioned `DataFusionTable`: `target_filter_expression()` builds a per-partition `col(source) >= min AND col(source) <= max` predicate and pushes it into the target `TableScan`'s filters via `scan_with_filters`, expecting Iceberg's file pruner to use it at manifest level. The filter never made it past the analyzer. Fix: when `plan.inputs().is_empty()`, use `plan.schema()` directly for type resolution, mirroring the pattern DataFusion's built-in `TypeCoercion` analyzer uses. All existing `custom_type_coercion` snapshot tests still pass, and the full `merge_into` suite (22 tests) stays green. Verified end-to-end against a deployed Embucket Lambda: `MERGE INTO demo.atomic.events_hooli_tiny USING demo.atomic.events_hooli_ident` where the source is partitioned by `identity(event_name)` — previously failed with `custom_type_coercion / Schema error: No field named event_name`, now returns 100 matched rows and the update lands on disk. 
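The shape of the fix can be sketched with a std-only toy model (the `Node` / `merge_input_fields` / `lookup_fields` names below are hypothetical stand-ins for illustration, not DataFusion APIs):

```rust
// Toy stand-in for a plan node: a leaf such as TableScan has no inputs
// but does carry its own output schema.
struct Node {
    fields: Vec<&'static str>, // this node's output schema (field names only)
    inputs: Vec<Node>,         // child plans; empty for leaf nodes
}

// Mirrors `merge_schema(&plan.inputs())`: the union of the inputs' fields.
fn merge_input_fields(node: &Node) -> Vec<&'static str> {
    node.inputs.iter().flat_map(|i| i.fields.clone()).collect()
}

// The fixed lookup: fall back to the node's own schema when it has no inputs.
fn lookup_fields(node: &Node) -> Vec<&'static str> {
    if node.inputs.is_empty() {
        node.fields.clone() // leaf: use plan.schema()
    } else {
        merge_input_fields(node) // inner node: merge the input schemas
    }
}

fn main() {
    let scan = Node { fields: vec!["event_name"], inputs: vec![] };
    // Before the fix, a filter attached directly to the scan resolved
    // against the empty merged-input schema:
    assert!(merge_input_fields(&scan).is_empty());
    // After the fix, `event_name` is visible for type resolution:
    assert_eq!(lookup_fields(&scan), vec!["event_name"]);
}
```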
Co-Authored-By: Claude Opus 4.6 (1M context) --- .../logical_analyzer/custom_type_coercion.rs | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/crates/executor/src/datafusion/logical_analyzer/custom_type_coercion.rs b/crates/executor/src/datafusion/logical_analyzer/custom_type_coercion.rs index 850511f8..00f49ffd 100644 --- a/crates/executor/src/datafusion/logical_analyzer/custom_type_coercion.rs +++ b/crates/executor/src/datafusion/logical_analyzer/custom_type_coercion.rs @@ -39,9 +39,22 @@ impl AnalyzerRule for CustomTypeCoercionRewriter { } fn analyze_internal(plan: &LogicalPlan) -> DFResult> { - // get schema representing all available input fields. This is used for data type - // resolution only, so order does not matter here - let schema = merge_schema(&plan.inputs()); + // Get schema representing all available input fields. Used for data-type + // resolution only, so order doesn't matter. + // + // For leaf plan nodes (e.g. `TableScan`), `plan.inputs()` is empty and + // `merge_schema` returns an empty schema. If we relied on that, filter + // expressions attached to the leaf itself — such as the target filter + // that `UserQuery::merge_query` injects via + // `LogicalPlanBuilder::scan_with_filters` when the MERGE source is a + // partitioned `DataFusionTable` — would see no fields and fail with + // "Schema error: No field named …". Fall back to `plan.schema()` in + // that case so the rewriter can actually look up the column types. 
+ let schema = if plan.inputs().is_empty() { + plan.schema().as_ref().clone() + } else { + merge_schema(&plan.inputs()) + }; let name_preserver = NamePreserver::new(plan); let new_plan = plan.clone().map_expressions(|expr| { From b3985ae86323769039142aafa94b6371c9438988 Mon Sep 17 00:00:00 2001 From: Sergei Turukin Date: Wed, 15 Apr 2026 11:53:09 -0500 Subject: [PATCH 2/4] feat(merge): route EXPLAIN / EXPLAIN ANALYZE into the MERGE planner MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Embucket has its own MERGE planner (`UserQuery::merge_query`) because DataFusion's SQL path doesn't produce a usable plan for `MERGE INTO`. The side effect was that `EXPLAIN MERGE INTO …` and `EXPLAIN ANALYZE MERGE INTO …` both fell through `execute()` to `execute_sql`, which hands the statement to DataFusion's planner and bounces back with: SQL compilation error: unsupported feature: Unsupported SQL statement: MERGE INTO … That left no observability for MERGE plans or per-scan metrics, which made it impossible to verify partition-pruning behaviour on partitioned Iceberg targets (files scanned, bytes scanned, manifest-level pruning counters). Changes: 1. Split `merge_query` into a pure plan-builder `merge_to_logical_plan` and a thin wrapper that calls `execute_logical_plan`. 2. In `execute()`, when the parsed statement is `DFStatement::Explain(..)` whose inner statement is `Statement::Merge { .. }`, build the MERGE logical plan via `merge_to_logical_plan`, then wrap it in the same `LogicalPlan::Explain` / `LogicalPlan::Analyze` shape DataFusion's own `explain_to_plan` constructs. Everything downstream (physical planning, execution, output formatting) is unchanged. 3. Add a snapshot test `merge_into_explain` over a minimal unpartitioned target + source — asserts the logical and physical plans render. 
`EXPLAIN ANALYZE` is exercised end-to-end through the deployed Lambda rather than via snapshot because the formatted-table column widths depend on the pre-redaction metric value widths and aren't stable across runs. After this change: - `EXPLAIN MERGE INTO t USING s ON ... WHEN MATCHED THEN UPDATE ...` returns the logical plan + physical plan (including `MergeIntoSinkExec`, `HashJoinExec`, `DataSourceExec { file_groups, projection, file_type }` for each side). - `EXPLAIN ANALYZE` of the same statement executes the MERGE and additionally reports per-node runtime metrics. The `DataSourceExec` rows now surface the DataFusion/Parquet scan counters that were previously invisible: `bytes_scanned`, `files_ranges_pruned_statistics`, `row_groups_pruned_statistics`, `pushdown_rows_pruned`, `page_index_rows_pruned`. That's the signal you need to verify source-side partition-hint pruning actually prunes. All 23 `merge_into` tests pass (22 existing + 1 new). Full `cargo test -p executor --lib` is 359/0. 
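The new dispatch can be pictured with a std-only sketch of the decision alone (hypothetical `Stmt` / `Parsed` / `Route` enums; the real code matches on `DFStatement` / `Statement` and builds actual logical plans):

```rust
#[derive(Debug, PartialEq)]
enum Stmt { Merge, Select }

#[derive(Debug, PartialEq)]
enum Parsed { Explain { analyze: bool, inner: Stmt }, Plain(Stmt) }

#[derive(Debug, PartialEq)]
enum Route { MergePlanner { explain: bool, analyze: bool }, DefaultSqlPath }

// Statements the default SQL planner can't handle are intercepted before
// they ever reach it; EXPLAIN [ANALYZE] MERGE now joins plain MERGE there.
fn route(p: &Parsed) -> Route {
    match p {
        // EXPLAIN [ANALYZE] MERGE: build the merge plan, then wrap it.
        Parsed::Explain { analyze, inner: Stmt::Merge } =>
            Route::MergePlanner { explain: true, analyze: *analyze },
        // Plain MERGE already had its own planner.
        Parsed::Plain(Stmt::Merge) =>
            Route::MergePlanner { explain: false, analyze: false },
        // Everything else still falls through to the default SQL path.
        _ => Route::DefaultSqlPath,
    }
}

fn main() {
    let q = Parsed::Explain { analyze: true, inner: Stmt::Merge };
    assert_eq!(route(&q), Route::MergePlanner { explain: true, analyze: true });
    assert_eq!(route(&Parsed::Plain(Stmt::Merge)),
               Route::MergePlanner { explain: false, analyze: false });
    assert_eq!(route(&Parsed::Explain { analyze: false, inner: Stmt::Select }),
               Route::DefaultSqlPath);
}
```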
Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/executor/src/query.rs | 65 +++++++++++++++++-- .../executor/src/tests/sql/ddl/merge_into.rs | 23 +++++++ .../merge_into/query_merge_into_explain.snap | 34 ++++++++++ 3 files changed, 118 insertions(+), 4 deletions(-) create mode 100644 crates/executor/src/tests/sql/ddl/snapshots/merge_into/query_merge_into_explain.snap diff --git a/crates/executor/src/query.rs b/crates/executor/src/query.rs index e789ff73..316ffe9d 100644 --- a/crates/executor/src/query.rs +++ b/crates/executor/src/query.rs @@ -57,7 +57,7 @@ use datafusion::sql::statement::object_name_to_string; use datafusion_common::config::ConfigOptions; use datafusion_common::{ Column, DFSchema, DataFusionError, ParamValues, ResolvedTableReference, SchemaReference, - TableReference, plan_datafusion_err, + TableReference, ToDFSchema, plan_datafusion_err, }; use datafusion_expr::conditional_expressions::CaseBuilder; use datafusion_expr::logical_plan::dml::{DmlStatement, InsertOp, WriteOp}; @@ -464,6 +464,50 @@ impl UserQuery { } } else if let DFStatement::CreateExternalTable(cetable) = statement { return Box::pin(self.create_external_table_query(cetable)).await; + } else if let DFStatement::Explain(explain) = &statement { + // DataFusion's default planner rejects `EXPLAIN MERGE INTO ...` as + // "Unsupported SQL statement: MERGE INTO" because MERGE has its + // own Embucket-side planner (`merge_query`). Intercept the case + // where the inner statement is a MERGE: build the merge logical + // plan ourselves, then wrap it in the equivalent `LogicalPlan::Explain` + // / `LogicalPlan::Analyze` that DataFusion's SQL path would have + // produced. This lets callers actually inspect the plan and see + // physical-level metrics via `EXPLAIN ANALYZE MERGE`. + if let DFStatement::Statement(inner) = explain.statement.as_ref() { + if matches!(inner.as_ref(), Statement::Merge { .. 
}) { + let analyze = explain.analyze; + let verbose = explain.verbose; + let format = explain.format.clone(); + let merge_stmt = (**inner).clone(); + let merge_plan = Box::pin(self.merge_to_logical_plan(merge_stmt)).await?; + let merge_plan = Arc::new(merge_plan); + let schema = datafusion_expr::LogicalPlan::explain_schema() + .to_dfschema_ref() + .context(ex_error::DataFusionSnafu)?; + let wrapped = if analyze { + LogicalPlan::Analyze(datafusion_expr::logical_plan::Analyze { + verbose, + input: merge_plan, + schema, + }) + } else { + let explain_format = match format.as_deref() { + Some(f) => datafusion_expr::logical_plan::ExplainFormat::from_str(f) + .unwrap_or(datafusion_expr::logical_plan::ExplainFormat::Indent), + None => datafusion_expr::logical_plan::ExplainFormat::Indent, + }; + LogicalPlan::Explain(datafusion_expr::logical_plan::Explain { + verbose, + explain_format, + plan: merge_plan, + stringified_plans: vec![], + schema, + logical_optimization_succeeded: false, + }) + }; + return self.execute_logical_plan(wrapped).await; + } + } } self.execute_sql(&self.query).await } @@ -1283,9 +1327,23 @@ impl UserQuery { } } - #[allow(clippy::too_many_lines)] #[instrument(name = "UserQuery::merge_query", level = "trace", skip(self), err)] pub async fn merge_query(&self, statement: Statement) -> Result { + let plan = self.merge_to_logical_plan(statement).await?; + self.execute_logical_plan(plan).await + } + + /// Builds the logical plan for a `MERGE INTO` statement without executing + /// it. Shared between `merge_query` (which runs the plan) and the + /// `DFStatement::Explain` routing in `execute` (which wraps it in + /// `LogicalPlan::Explain` / `LogicalPlan::Analyze` so callers can see the + /// plan or live physical metrics without a separate SQL path). 
+ #[allow(clippy::too_many_lines)] + #[instrument(name = "UserQuery::merge_to_logical_plan", level = "trace", skip(self), err)] + pub async fn merge_to_logical_plan( + &self, + statement: Statement, + ) -> Result<LogicalPlan> { let Statement::Merge { table: target, source, @@ -1501,10 +1559,9 @@ impl UserQuery { ) .context(ex_error::DataFusionSnafu)?; - self.execute_logical_plan(LogicalPlan::Extension(Extension { + Ok(LogicalPlan::Extension(Extension { node: Arc::new(merge_into_plan), })) - .await } #[instrument(name = "UserQuery::create_database", level = "trace", skip(self), err)] diff --git a/crates/executor/src/tests/sql/ddl/merge_into.rs b/crates/executor/src/tests/sql/ddl/merge_into.rs index e4acaeec..2eccd596 100644 --- a/crates/executor/src/tests/sql/ddl/merge_into.rs +++ b/crates/executor/src/tests/sql/ddl/merge_into.rs @@ -1,5 +1,28 @@ use crate::test_query; +// Observability: `EXPLAIN MERGE INTO ...` must work. Before the routing +// fix, Embucket rejected it with +// "SQL compilation error: unsupported feature: Unsupported SQL statement: +// MERGE INTO" because `execute()` never unwrapped +// `DFStatement::Explain(..MERGE..)` and fell through to DataFusion's default +// SQL path, which doesn't know about Embucket's MERGE planner. +// +// This test covers the plan shape only. `EXPLAIN ANALYZE MERGE` is +// exercised end-to-end against the deployed Lambda — its output contains +// per-run metric values whose varying widths change the formatted-table +// column padding, which is too unstable for an insta snapshot. 
+test_query!( + merge_into_explain, + "EXPLAIN MERGE INTO merge_target USING merge_source ON merge_target.id = merge_source.id WHEN MATCHED THEN UPDATE SET merge_target.description = merge_source.description", + setup_queries = [ + "CREATE TABLE embucket.public.merge_target (ID INTEGER, description VARCHAR)", + "CREATE TABLE embucket.public.merge_source (ID INTEGER, description VARCHAR)", + "INSERT INTO embucket.public.merge_target VALUES (1, 'existing row')", + "INSERT INTO embucket.public.merge_source VALUES (1, 'updated row')", + ], + snapshot_path = "merge_into" +); + test_query!( merge_into_only_update, "SELECT count(CASE WHEN description = 'updated row' THEN 1 ELSE NULL END) updated, count(CASE WHEN description = 'existing row' THEN 1 ELSE NULL END) existing FROM embucket.public.merge_target", diff --git a/crates/executor/src/tests/sql/ddl/snapshots/merge_into/query_merge_into_explain.snap b/crates/executor/src/tests/sql/ddl/snapshots/merge_into/query_merge_into_explain.snap new file mode 100644 index 00000000..03ecf366 --- /dev/null +++ b/crates/executor/src/tests/sql/ddl/snapshots/merge_into/query_merge_into_explain.snap @@ -0,0 +1,34 @@ +--- +source: crates/executor/src/tests/sql/ddl/merge_into.rs +description: "\"EXPLAIN MERGE INTO merge_target USING merge_source ON merge_target.id = merge_source.id WHEN MATCHED THEN UPDATE SET merge_target.description = merge_source.description\"" +info: "Setup queries: CREATE TABLE embucket.public.merge_target (ID INTEGER, description VARCHAR); CREATE TABLE embucket.public.merge_source (ID INTEGER, description VARCHAR); INSERT INTO embucket.public.merge_target VALUES (1, 'existing row'); INSERT INTO embucket.public.merge_source VALUES (1, 'updated row')" +--- +Ok( + [ + "+---------------+----------------------------------------------------------------------------------------------------------------------------------+", + "| plan_type | plan |", + 
"+---------------+----------------------------------------------------------------------------------------------------------------------------------+", + "| logical_plan | MergeIntoSink |", + "| | Projection: embucket.public.merge_target.id, CASE WHEN __common_expr_1 THEN TRY_CAST(embucket.public.merge_source.description AS Utf8) ELSE embucket.public.merge_target.description END AS description, embucket.public.merge_target.__data_file_path, embucket.public.merge_target.__manifest_file_path, __target_exists, __source_exists, __common_expr_1 AS __merge_row_updated, Boolean(false) AS __merge_row_inserted |", + "| | Projection: __target_exists AND __source_exists AS __common_expr_1, embucket.public.merge_target.id, embucket.public.merge_target.description, embucket.public.merge_target.__data_file_path, embucket.public.merge_target.__manifest_file_path, __target_exists, embucket.public.merge_source.id, embucket.public.merge_source.description, __source_exists |", + "| | Full Join: embucket.public.merge_target.id = embucket.public.merge_source.id |", + "| | Projection: embucket.public.merge_target.id, embucket.public.merge_target.description, embucket.public.merge_target.__data_file_path, embucket.public.merge_target.__manifest_file_path, Boolean(true) AS __target_exists |", + "| | TableScan: embucket.public.merge_target |", + "| | Projection: embucket.public.merge_source.id, embucket.public.merge_source.description, Boolean(true) AS __source_exists |", + "| | TableScan: embucket.public.merge_source |", + "| physical_plan | MergeIntoSinkExec |", + "| | ProjectionExec: expr=[id@1 as id, CASE WHEN __common_expr_1@0 THEN description@7 ELSE description@2 END as description, __data_file_path@3 as __data_file_path, __manifest_file_path@4 as __manifest_file_path, __target_exists@5 as __target_exists, __source_exists@8 as __source_exists, __common_expr_1@0 as __merge_row_updated, false as __merge_row_inserted] |", + "| | ProjectionExec: expr=[__target_exists@4 AND 
__source_exists@7 as __common_expr_1, id@0 as id, description@1 as description, __data_file_path@2 as __data_file_path, __manifest_file_path@3 as __manifest_file_path, __target_exists@4 as __target_exists, id@5 as id, description@6 as description, __source_exists@7 as __source_exists] |", + "| | RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 |", + "| | ProjectionExec: expr=[id@3 as id, description@4 as description, __data_file_path@5 as __data_file_path, __manifest_file_path@6 as __manifest_file_path, __target_exists@7 as __target_exists, id@0 as id, description@1 as description, __source_exists@2 as __source_exists] |", + "| | CoalesceBatchesExec: target_batch_size=8192 |", + "| | CoalesceBatchesExec: target_batch_size=8192 |", + "| | HashJoinExec: mode=CollectLeft, join_type=Full, on=[(id@0, id@0)] |", + "| | ProjectionExec: expr=[id@0 as id, description@1 as description, true as __source_exists] |", + "| | DataSourceExec: file_groups={1 group: [[/[PATH]/testing/data/[HEX]/[UUID].parquet]]}, projection=[id, description], file_type=parquet |", + "| | ProjectionExec: expr=[id@0 as id, description@1 as description, __data_file_path@2 as __data_file_path, __manifest_file_path@3 as __manifest_file_path, true as __target_exists] |", + "| | DataSourceExec: file_groups={1 group: [[/[PATH]/testing/data/[HEX]/[UUID].parquet]]}, projection=[id, description, __data_file_path, __manifest_file_path], file_type=parquet |", + "| | |", + "+---------------+----------------------------------------------------------------------------------------------------------------------------------+", + ], +) From 29ea4cfae5694daad23da3915c584e160fafde2b Mon Sep 17 00:00:00 2001 From: Sergei Turukin Date: Wed, 15 Apr 2026 12:03:34 -0500 Subject: [PATCH 3/4] feat(merge): expose updated/inserted/deleted row counts as MetricsSet MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `MergeIntoCOWSinkExec` tracked per-clause row counts in 
`AtomicI64` purely to populate the final result batch. After the EXPLAIN / EXPLAIN ANALYZE routing fix on this branch, `EXPLAIN ANALYZE MERGE INTO …` reports rich per-scan metrics on every `DataSourceExec` in the plan, but the sink line was still rendering as `MergeIntoSinkExec, metrics=[]` because this node didn't own an `ExecutionPlanMetricsSet`. Wire one up: register `Count` metrics `updated_rows`, `inserted_rows`, and `deleted_rows` via `MetricBuilder::new(&self.metrics).counter(..)` at the start of `execute()`, clone them into the async write closure, and `add()` the final `AtomicI64` values after the transaction commits. Implement `ExecutionPlan::metrics()` to return `Some(self.metrics.clone_inner())` so DataFusion's plan formatter picks them up. Row counts that don't fit in `usize` saturate to `usize::MAX` via `try_from` rather than panicking. After this change, `EXPLAIN ANALYZE MERGE` shows the sink counters alongside the child scan counters, so an operator can read updated / inserted / deleted counts directly off the plan output instead of only from the result row. All 23 `merge_into` tests stay green.
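The publishing step can be sketched with the standard library alone (the `Counter` type here is a hypothetical stand-in for DataFusion's `metrics::Count`, which is likewise a shared, cloneable, add-only counter):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};

// Hypothetical stand-in for `metrics::Count`: cloneable and add-only, so it
// can be shared between the plan node and the async write closure.
#[derive(Clone, Default)]
struct Counter(Arc<AtomicUsize>);
impl Counter {
    fn add(&self, n: usize) { self.0.fetch_add(n, Ordering::Relaxed); }
    fn value(&self) -> usize { self.0.load(Ordering::Relaxed) }
}

// The conversion used when publishing: i64 clause counters go through
// `try_from` and saturate to `usize::MAX` when the value doesn't fit
// (negative, or too large for a 32-bit `usize`) instead of panicking.
fn publish(rows: &AtomicI64, metric: &Counter) {
    let v = rows.load(Ordering::Relaxed);
    metric.add(usize::try_from(v).unwrap_or(usize::MAX));
}

fn main() {
    let updated = AtomicI64::new(100);
    let metric = Counter::default();
    publish(&updated, &metric);
    assert_eq!(metric.value(), 100);

    // A bogus negative count saturates instead of crashing the query.
    let broken = AtomicI64::new(-1);
    let metric2 = Counter::default();
    publish(&broken, &metric2);
    assert_eq!(metric2.value(), usize::MAX);
}
```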
Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/datafusion/physical_plan/merge.rs | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/crates/executor/src/datafusion/physical_plan/merge.rs b/crates/executor/src/datafusion/physical_plan/merge.rs index 22b63bc7..be74224e 100644 --- a/crates/executor/src/datafusion/physical_plan/merge.rs +++ b/crates/executor/src/datafusion/physical_plan/merge.rs @@ -19,6 +19,7 @@ use datafusion_physical_plan::{ SendableRecordBatchStream, coalesce_partitions::CoalescePartitionsExec, execution_plan::{Boundedness, EmissionType}, + metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}, stream::RecordBatchStreamAdapter, }; use futures::{Stream, StreamExt}; @@ -52,6 +53,11 @@ pub struct MergeIntoCOWSinkExec { input: Arc<dyn ExecutionPlan>, target: DataFusionTable, properties: PlanProperties, + /// Per-node metrics surfaced via `EXPLAIN ANALYZE`. Populated with + /// `updated_rows` / `inserted_rows` / `deleted_rows` counters after the + /// write transaction commits, so `EXPLAIN ANALYZE MERGE INTO …` reports + /// how many rows each clause produced alongside the child scan metrics. + metrics: ExecutionPlanMetricsSet, } impl MergeIntoCOWSinkExec { @@ -73,6 +79,7 @@ impl MergeIntoCOWSinkExec { input, target, properties, + metrics: ExecutionPlanMetricsSet::new(), } } } @@ -109,6 +116,13 @@ impl ExecutionPlan for MergeIntoCOWSinkExec { vec![&self.input] } + /// Surface per-clause row counts (updated / inserted / deleted) as + /// `EXPLAIN ANALYZE` metrics. Values are populated by `execute()` after + /// the write transaction commits; they're zero until then. 
+ fn metrics(&self) -> Option<MetricsSet> { + Some(self.metrics.clone_inner()) + } + fn with_new_children( self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>>, @@ -142,6 +156,16 @@ impl ExecutionPlan for MergeIntoCOWSinkExec { let updated_rows: Arc<AtomicI64> = Arc::new(AtomicI64::new(0)); let inserted_rows: Arc<AtomicI64> = Arc::new(AtomicI64::new(0)); + // `Count` metrics that surface in `EXPLAIN ANALYZE` as + // `metrics=[updated_rows=…, inserted_rows=…, deleted_rows=…]` on this + // node. Populated below after the write transaction commits. + let updated_rows_metric: Count = + MetricBuilder::new(&self.metrics).counter("updated_rows", partition); + let inserted_rows_metric: Count = + MetricBuilder::new(&self.metrics).counter("inserted_rows", partition); + let deleted_rows_metric: Count = + MetricBuilder::new(&self.metrics).counter("deleted_rows", partition); + let coalesce = CoalescePartitionsExec::new(self.input.clone()); // Filter out rows whose __data_file_path doesn't have a matching row @@ -163,6 +187,9 @@ impl ExecutionPlan for MergeIntoCOWSinkExec { let schema = schema.clone(); let updated_rows = Arc::clone(&updated_rows); let inserted_rows = Arc::clone(&inserted_rows); + let updated_rows_metric = updated_rows_metric.clone(); + let inserted_rows_metric = inserted_rows_metric.clone(); + let deleted_rows_metric = deleted_rows_metric.clone(); + let projected_schema = count_and_project_stream.projected_schema(); let batches: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new( projected_schema, @@ -222,6 +249,13 @@ impl ExecutionPlan for MergeIntoCOWSinkExec { // MERGE DELETE is not supported yet let deleted = 0i64; + // Publish per-clause counts to the `MetricsSet` so + // `EXPLAIN ANALYZE` shows them on the MergeIntoSinkExec line. + // Rely on `try_from` so out-of-range row counts saturate to + // `usize::MAX` instead of panicking. 
+ updated_rows_metric.add(usize::try_from(updated).unwrap_or(usize::MAX)); + inserted_rows_metric.add(usize::try_from(inserted).unwrap_or(usize::MAX)); + deleted_rows_metric.add(usize::try_from(deleted).unwrap_or(usize::MAX)); + let arrays = schema .fields() .iter() From ce562f5adfa6b8febce43fac3b65851599739ef9 Mon Sep 17 00:00:00 2001 From: Sergei Turukin Date: Wed, 15 Apr 2026 16:07:06 -0500 Subject: [PATCH 4/4] fix(merge): preserve target rows when MERGE batch contains only target The MergeCOWFilterStream "no matches in this batch" fast path short-circuited on `matching_data_and_manifest_files.is_empty()` without checking the cumulative `all_matching_data_files` set. If a target file had been seen as matching in an earlier batch and a later batch contained only target rows for that file, the rows in the later batch were silently dropped. The downstream COW commit then overwrote the original file with the partial result, permanently losing the unmatched target rows whose batch hit the dead path. The fix tightens the guard to also require `all_matching_data_files` to be empty before taking the fast path. When a batch belongs to a file already in the overwrite set, it falls through to the main filter path which correctly emits target rows via `file_predicate OR source_exists`. Adds three unit tests against MergeCOWFilterStream covering the matching-then-target patterns, plus a SQL snapshot test that exercises the same shape end-to-end. 
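The guard change can be simulated with a std-only toy (hypothetical `Batch` / `emitted_rows`, a loose model of the `MergeCOWFilterStream` fast path, not the real stream; `fixed = true` applies the tightened guard from this patch):

```rust
use std::collections::HashSet;

// Toy batch: which target data file its rows came from, and whether any row
// in the batch has `__source_exists = true` (i.e. the batch carries a match).
struct Batch { file: &'static str, has_match: bool, rows: usize }

// Count how many target rows are re-emitted into rewritten data files.
fn emitted_rows(batches: &[Batch], fixed: bool) -> usize {
    let mut all_matching: HashSet<&str> = HashSet::new(); // cumulative set
    let mut emitted = 0;
    for b in batches {
        let batch_matching = if b.has_match { Some(b.file) } else { None };
        if let Some(f) = batch_matching {
            all_matching.insert(f);
        }
        // Fast path: "nothing in this batch matches". The buggy guard only
        // looked at the current batch; the fix also requires the cumulative
        // `all_matching` set to be empty.
        let fast_path = batch_matching.is_none()
            && (!fixed || all_matching.is_empty());
        if fast_path {
            continue; // rows never reach the rewrite of an overwritten file
        }
        if all_matching.contains(b.file) {
            emitted += b.rows; // target rows land in the rewritten file
        }
    }
    emitted
}

fn main() {
    // File "A" matches in batch 1; batch 2 holds only target rows for "A".
    let batches = [
        Batch { file: "A", has_match: true, rows: 4 },
        Batch { file: "A", has_match: false, rows: 1 },
    ];
    assert_eq!(emitted_rows(&batches, false), 4); // buggy guard: 1 row lost
    assert_eq!(emitted_rows(&batches, true), 5);  // fixed guard: all preserved
}
```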
Fixes #128 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/datafusion/physical_plan/merge.rs | 28 ++++++++++++++++++- .../executor/src/tests/sql/ddl/merge_into.rs | 22 +++++++++++++++ ...mixed_unsorted_multi_row_no_data_loss.snap | 14 ++++++++++ 3 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 crates/executor/src/tests/sql/ddl/snapshots/merge_into/query_merge_into_mixed_unsorted_multi_row_no_data_loss.snap diff --git a/crates/executor/src/datafusion/physical_plan/merge.rs b/crates/executor/src/datafusion/physical_plan/merge.rs index be74224e..8caa0af3 100644 --- a/crates/executor/src/datafusion/physical_plan/merge.rs +++ b/crates/executor/src/datafusion/physical_plan/merge.rs @@ -689,7 +689,13 @@ impl Stream for MergeCOWFilterStream { .push(filtered_batch); } - if matching_data_and_manifest_files.is_empty() { + // Only take the fast paths if the current batch references no target file + // that will be (or has been) overwritten. Otherwise the full filter path + // below is required so target rows belonging to `all_matching_data_files` + // are re-emitted into the rewritten data file. + if matching_data_and_manifest_files.is_empty() + && all_matching_data_files.is_empty() + { // Return early if all rows only come from source if matching_data_file_array.len() == source_exists_array.len() { return Poll::Ready(Some(Ok(batch))); @@ -1244,4 +1250,24 @@ mod tests { &[(0, 2), (0, 1), (0, 6), (0, 5)], 60 ); + // Regression test for https://github.com/Embucket/embucket/issues/128 + // + // If a target file has been seen as "matching" in an earlier batch and a subsequent + // batch contains only target rows (no `__source_exists` = true rows) for that same + // file, the rows in the later batch must still be passed through the filter so they + // land in the rewritten data file. Previously the "no matches, no source" fast path + // dropped them, causing silent data loss during `MERGE INTO` on unsorted inputs. 
+ test_merge_cow_filter_stream!(matching_then_target, &[(0, 4), (0, 1)], 20); + test_merge_cow_filter_stream!( + matching_then_target_then_matching, + &[(0, 4), (0, 1), (0, 4)], + 30 + ); + // Mixed scenario: several target-only batches arriving AFTER the target file has + // been matched. + test_merge_cow_filter_stream!( + matching_then_multiple_target_batches, + &[(0, 4), (0, 1), (0, 1), (0, 1)], + 40 + ); } diff --git a/crates/executor/src/tests/sql/ddl/merge_into.rs b/crates/executor/src/tests/sql/ddl/merge_into.rs index 2eccd596..6cda99e0 100644 --- a/crates/executor/src/tests/sql/ddl/merge_into.rs +++ b/crates/executor/src/tests/sql/ddl/merge_into.rs @@ -322,3 +322,25 @@ test_query!( ], snapshot_path = "merge_into" ); + +// Regression test for https://github.com/Embucket/embucket/issues/128. +// +// Target is one data file with many rows; source is a mix of updates (matches) and +// inserts (no match), and the target rows of the join land in the filter stream in +// batches where some contain source_exists=true rows and some only contain target +// rows. Previously the "no matches, no source" fast path would silently drop the +// target-only batches for a file that had already been marked as matching in an +// earlier batch, causing the final row count to be less than the expected +// (target_rows + new_source_rows). This test asserts that no target row is lost. 
+test_query!( + merge_into_mixed_unsorted_multi_row_no_data_loss, + "SELECT COUNT(*) as total_rows, COUNT(CASE WHEN description = 'updated row' THEN 1 END) as updated_rows, COUNT(CASE WHEN description = 'original row' THEN 1 END) as preserved_rows, COUNT(CASE WHEN description = 'new row' THEN 1 END) as inserted_rows FROM embucket.public.merge_target", + setup_queries = [ + "CREATE TABLE embucket.public.merge_target (id INTEGER, description VARCHAR)", + "CREATE TABLE embucket.public.merge_source (id INTEGER, description VARCHAR)", + "INSERT INTO embucket.public.merge_target VALUES (1, 'original row'), (2, 'original row'), (3, 'original row'), (4, 'original row'), (5, 'original row'), (6, 'original row'), (7, 'original row'), (8, 'original row'), (9, 'original row'), (10, 'original row')", + "INSERT INTO embucket.public.merge_source VALUES (3, 'updated row'), (7, 'updated row'), (11, 'new row'), (12, 'new row')", + "MERGE INTO merge_target t USING merge_source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET t.description = s.description WHEN NOT MATCHED THEN INSERT (id, description) VALUES (s.id, s.description)", + ], + snapshot_path = "merge_into" +); diff --git a/crates/executor/src/tests/sql/ddl/snapshots/merge_into/query_merge_into_mixed_unsorted_multi_row_no_data_loss.snap b/crates/executor/src/tests/sql/ddl/snapshots/merge_into/query_merge_into_mixed_unsorted_multi_row_no_data_loss.snap new file mode 100644 index 00000000..3dd7a8b1 --- /dev/null +++ b/crates/executor/src/tests/sql/ddl/snapshots/merge_into/query_merge_into_mixed_unsorted_multi_row_no_data_loss.snap @@ -0,0 +1,14 @@ +--- +source: crates/executor/src/tests/sql/ddl/merge_into.rs +description: "\"SELECT COUNT(*) as total_rows, COUNT(CASE WHEN description = 'updated row' THEN 1 END) as updated_rows, COUNT(CASE WHEN description = 'original row' THEN 1 END) as preserved_rows, COUNT(CASE WHEN description = 'new row' THEN 1 END) as inserted_rows FROM embucket.public.merge_target\"" +info: "Setup queries: 
CREATE TABLE embucket.public.merge_target (id INTEGER, description VARCHAR); CREATE TABLE embucket.public.merge_source (id INTEGER, description VARCHAR); INSERT INTO embucket.public.merge_target VALUES (1, 'original row'), (2, 'original row'), (3, 'original row'), (4, 'original row'), (5, 'original row'), (6, 'original row'), (7, 'original row'), (8, 'original row'), (9, 'original row'), (10, 'original row'); INSERT INTO embucket.public.merge_source VALUES (3, 'updated row'), (7, 'updated row'), (11, 'new row'), (12, 'new row'); MERGE INTO merge_target t USING merge_source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET t.description = s.description WHEN NOT MATCHED THEN INSERT (id, description) VALUES (s.id, s.description)" +--- +Ok( + [ + "+------------+--------------+----------------+---------------+", + "| total_rows | updated_rows | preserved_rows | inserted_rows |", + "+------------+--------------+----------------+---------------+", + "| 12 | 2 | 8 | 2 |", + "+------------+--------------+----------------+---------------+", + ], +)