@@ -39,9 +39,22 @@ impl AnalyzerRule for CustomTypeCoercionRewriter {
}

fn analyze_internal(plan: &LogicalPlan) -> DFResult<Transformed<LogicalPlan>> {
// get schema representing all available input fields. This is used for data type
// resolution only, so order does not matter here
let schema = merge_schema(&plan.inputs());
// Get schema representing all available input fields. Used for data-type
// resolution only, so order doesn't matter.
//
// For leaf plan nodes (e.g. `TableScan`), `plan.inputs()` is empty and
// `merge_schema` returns an empty schema. If we relied on that, filter
// expressions attached to the leaf itself — such as the target filter
// that `UserQuery::merge_query` injects via
// `LogicalPlanBuilder::scan_with_filters` when the MERGE source is a
// partitioned `DataFusionTable` — would see no fields and fail with
// "Schema error: No field named …". Fall back to `plan.schema()` in
// that case so the rewriter can actually look up the column types.
let schema = if plan.inputs().is_empty() {
plan.schema().as_ref().clone()
} else {
merge_schema(&plan.inputs())
};
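As an aside, a minimal sketch of the failure mode this fallback guards against, assuming DataFusion's test-oriented `LogicalTableSource`; the table name `t` and column `part` are illustrative, not taken from this PR:

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_expr::{
    col, lit, logical_plan::builder::LogicalTableSource, LogicalPlanBuilder,
};

fn leaf_scan_fields_need_the_fallback() -> Result<()> {
    let schema = Schema::new(vec![Field::new("part", DataType::Int32, false)]);
    let source = Arc::new(LogicalTableSource::new(Arc::new(schema)));

    // `scan_with_filters` attaches the filter to the `TableScan` itself, so
    // the resulting plan is a leaf node that still carries an expression the
    // rewriter must type-resolve.
    let plan = LogicalPlanBuilder::scan_with_filters(
        "t",
        source,
        None,
        vec![col("part").eq(lit(1))],
    )?
    .build()?;

    // No inputs => `merge_schema(&plan.inputs())` would be empty, but the
    // leaf's own schema still resolves the column the filter refers to.
    assert!(plan.inputs().is_empty());
    assert!(plan.schema().field_with_unqualified_name("part").is_ok());
    Ok(())
}
```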

let name_preserver = NamePreserver::new(plan);
let new_plan = plan.clone().map_expressions(|expr| {
62 changes: 61 additions & 1 deletion crates/executor/src/datafusion/physical_plan/merge.rs
@@ -19,6 +19,7 @@ use datafusion_physical_plan::{
SendableRecordBatchStream,
coalesce_partitions::CoalescePartitionsExec,
execution_plan::{Boundedness, EmissionType},
metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
stream::RecordBatchStreamAdapter,
};
use futures::{Stream, StreamExt};
@@ -52,6 +53,11 @@ pub struct MergeIntoCOWSinkExec {
input: Arc<dyn ExecutionPlan>,
target: DataFusionTable,
properties: PlanProperties,
/// Per-node metrics surfaced via `EXPLAIN ANALYZE`. Populated with
/// `updated_rows` / `inserted_rows` / `deleted_rows` counters after the
/// write transaction commits, so `EXPLAIN ANALYZE MERGE INTO …` reports
/// how many rows each clause produced alongside the child scan metrics.
metrics: ExecutionPlanMetricsSet,
}

impl MergeIntoCOWSinkExec {
@@ -73,6 +79,7 @@ impl MergeIntoCOWSinkExec {
input,
target,
properties,
metrics: ExecutionPlanMetricsSet::new(),
}
}
}
@@ -109,6 +116,13 @@ impl ExecutionPlan for MergeIntoCOWSinkExec {
vec![&self.input]
}

/// Surface per-clause row counts (updated / inserted / deleted) as
/// `EXPLAIN ANALYZE` metrics. Values are populated by `execute()` after
/// the write transaction commits; they're zero until then.
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
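
A self-contained sketch of the counter plumbing used here; the partition index `0` and the value `42` are illustrative:

```rust
use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};

fn counter_roundtrip() {
    let metrics = ExecutionPlanMetricsSet::new();

    // Same pattern as `execute()` below: one named counter per MERGE clause.
    let updated: Count = MetricBuilder::new(&metrics).counter("updated_rows", 0);
    updated.add(42);

    // `clone_inner()` snapshots the shared registry; this snapshot is what
    // `metrics()` hands to `EXPLAIN ANALYZE`.
    let snapshot = metrics.clone_inner();
    assert_eq!(
        snapshot.sum_by_name("updated_rows").map(|m| m.as_usize()),
        Some(42)
    );
}
```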

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
@@ -142,6 +156,16 @@ impl ExecutionPlan for MergeIntoCOWSinkExec {
let updated_rows: Arc<AtomicI64> = Arc::new(AtomicI64::new(0));
let inserted_rows: Arc<AtomicI64> = Arc::new(AtomicI64::new(0));

// `Count` metrics that surface in `EXPLAIN ANALYZE` as
// `metrics=[updated_rows=…, inserted_rows=…, deleted_rows=…]` on this
// node. Populated below after the write transaction commits.
let updated_rows_metric: Count =
MetricBuilder::new(&self.metrics).counter("updated_rows", partition);
let inserted_rows_metric: Count =
MetricBuilder::new(&self.metrics).counter("inserted_rows", partition);
let deleted_rows_metric: Count =
MetricBuilder::new(&self.metrics).counter("deleted_rows", partition);

let coalesce = CoalescePartitionsExec::new(self.input.clone());

// Filter out rows whose __data_file_path doesn't have a matching row
@@ -163,6 +187,9 @@ impl ExecutionPlan for MergeIntoCOWSinkExec {
let schema = schema.clone();
let updated_rows = Arc::clone(&updated_rows);
let inserted_rows = Arc::clone(&inserted_rows);
let updated_rows_metric = updated_rows_metric.clone();
let inserted_rows_metric = inserted_rows_metric.clone();
let deleted_rows_metric = deleted_rows_metric.clone();
let projected_schema = count_and_project_stream.projected_schema();
let batches: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
projected_schema,
@@ -222,6 +249,13 @@ impl ExecutionPlan for MergeIntoCOWSinkExec {
// MERGE DELETE is not supported yet
let deleted = 0i64;

// Publish per-clause counts to the `MetricsSet` so
// `EXPLAIN ANALYZE` shows them on the MergeIntoSinkExec line.
// `usize::try_from(..).unwrap_or(usize::MAX)` saturates out-of-range
// counts instead of panicking.
updated_rows_metric.add(usize::try_from(updated).unwrap_or(usize::MAX));
inserted_rows_metric.add(usize::try_from(inserted).unwrap_or(usize::MAX));
deleted_rows_metric.add(usize::try_from(deleted).unwrap_or(usize::MAX));

let arrays = schema
.fields()
.iter()
@@ -655,7 +689,13 @@ impl Stream for MergeCOWFilterStream {
.push(filtered_batch);
}

if matching_data_and_manifest_files.is_empty() {
// Only take the fast paths if the current batch references no target file
// that will be (or has been) overwritten. Otherwise the full filter path
// below is required so target rows belonging to `all_matching_data_files`
// are re-emitted into the rewritten data file.
if matching_data_and_manifest_files.is_empty()
&& all_matching_data_files.is_empty()
{
// Return early if all rows come only from the source
if matching_data_file_array.len() == source_exists_array.len() {
return Poll::Ready(Some(Ok(batch)));
@@ -1210,4 +1250,24 @@ mod tests {
&[(0, 2), (0, 1), (0, 6), (0, 5)],
60
);
// Regression test for https://github.com/Embucket/embucket/issues/128
//
// If a target file has been seen as "matching" in an earlier batch and a subsequent
// batch contains only target rows (no `__source_exists` = true rows) for that same
// file, the rows in the later batch must still be passed through the filter so they
// land in the rewritten data file. Previously the "no matches, no source" fast path
// dropped them, causing silent data loss during `MERGE INTO` on unsorted inputs.
test_merge_cow_filter_stream!(matching_then_target, &[(0, 4), (0, 1)], 20);
test_merge_cow_filter_stream!(
matching_then_target_then_matching,
&[(0, 4), (0, 1), (0, 4)],
30
);
// Mixed scenario: several target-only batches arriving AFTER the target file has
// been matched.
test_merge_cow_filter_stream!(
matching_then_multiple_target_batches,
&[(0, 4), (0, 1), (0, 1), (0, 1)],
40
);
}
67 changes: 63 additions & 4 deletions crates/executor/src/query.rs
@@ -57,7 +57,7 @@ use datafusion::sql::statement::object_name_to_string;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{
Column, DFSchema, DataFusionError, ParamValues, ResolvedTableReference, SchemaReference,
TableReference, plan_datafusion_err,
TableReference, ToDFSchema, plan_datafusion_err,
};
use datafusion_expr::conditional_expressions::CaseBuilder;
use datafusion_expr::logical_plan::dml::{DmlStatement, InsertOp, WriteOp};
@@ -464,6 +464,50 @@ impl UserQuery {
}
} else if let DFStatement::CreateExternalTable(cetable) = statement {
return Box::pin(self.create_external_table_query(cetable)).await;
} else if let DFStatement::Explain(explain) = &statement {
// DataFusion's default planner rejects `EXPLAIN MERGE INTO ...` as
// "Unsupported SQL statement: MERGE INTO" because MERGE has its
// own Embucket-side planner (`merge_query`). Intercept the case
// where the inner statement is a MERGE: build the merge logical
// plan ourselves, then wrap it in the equivalent `LogicalPlan::Explain`
// / `LogicalPlan::Analyze` that DataFusion's SQL path would have
// produced. This lets callers actually inspect the plan and see
// physical-level metrics via `EXPLAIN ANALYZE MERGE`.
if let DFStatement::Statement(inner) = explain.statement.as_ref()
&& matches!(inner.as_ref(), Statement::Merge { .. })
{
let analyze = explain.analyze;
let verbose = explain.verbose;
let format = explain.format.clone();
let merge_stmt = (**inner).clone();
let merge_plan = Box::pin(self.merge_to_logical_plan(merge_stmt)).await?;
let merge_plan = Arc::new(merge_plan);
let schema = datafusion_expr::LogicalPlan::explain_schema()
.to_dfschema_ref()
.context(ex_error::DataFusionSnafu)?;
let wrapped = if analyze {
LogicalPlan::Analyze(datafusion_expr::logical_plan::Analyze {
verbose,
input: merge_plan,
schema,
})
} else {
let explain_format = match format.as_deref() {
Some(f) => datafusion_expr::logical_plan::ExplainFormat::from_str(f)
.unwrap_or(datafusion_expr::logical_plan::ExplainFormat::Indent),
None => datafusion_expr::logical_plan::ExplainFormat::Indent,
};
LogicalPlan::Explain(datafusion_expr::logical_plan::Explain {
verbose,
explain_format,
plan: merge_plan,
stringified_plans: vec![],
schema,
logical_optimization_succeeded: false,
})
};
return self.execute_logical_plan(wrapped).await;
}
}
self.execute_sql(&self.query).await
}
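
The wrapping above, extracted into a standalone sketch; `wrap_in_explain` is a hypothetical helper, while the `Explain` fields and the `explain_schema()` / `to_dfschema_ref()` chain mirror the code in this hunk:

```rust
use std::str::FromStr;
use std::sync::Arc;

use datafusion_common::{Result, ToDFSchema};
use datafusion_expr::logical_plan::{Explain, ExplainFormat};
use datafusion_expr::LogicalPlan;

// Hypothetical helper: wrap an already-built plan the way the MERGE routing
// wraps the merge plan for a plain (non-ANALYZE) EXPLAIN.
fn wrap_in_explain(
    plan: LogicalPlan,
    verbose: bool,
    format: Option<&str>,
) -> Result<LogicalPlan> {
    let schema = LogicalPlan::explain_schema().to_dfschema_ref()?;
    let explain_format = format
        .map(|f| ExplainFormat::from_str(f).unwrap_or(ExplainFormat::Indent))
        .unwrap_or(ExplainFormat::Indent);
    Ok(LogicalPlan::Explain(Explain {
        verbose,
        explain_format,
        plan: Arc::new(plan),
        stringified_plans: vec![],
        schema,
        logical_optimization_succeeded: false,
    }))
}
```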
@@ -1281,9 +1325,25 @@ impl UserQuery {
}
}

#[allow(clippy::too_many_lines)]
#[instrument(name = "UserQuery::merge_query", level = "trace", skip(self), err)]
pub async fn merge_query(&self, statement: Statement) -> Result<QueryResult> {
let plan = self.merge_to_logical_plan(statement).await?;
self.execute_logical_plan(plan).await
}

/// Builds the logical plan for a `MERGE INTO` statement without executing
/// it. Shared between `merge_query` (which runs the plan) and the
/// `DFStatement::Explain` routing in `execute` (which wraps it in
/// `LogicalPlan::Explain` / `LogicalPlan::Analyze` so callers can see the
/// plan or live physical metrics without a separate SQL path).
#[allow(clippy::too_many_lines)]
#[instrument(
name = "UserQuery::merge_to_logical_plan",
level = "trace",
skip(self),
err
)]
pub async fn merge_to_logical_plan(&self, statement: Statement) -> Result<LogicalPlan> {
let Statement::Merge {
table: target,
source,
@@ -1499,10 +1559,9 @@ impl UserQuery {
)
.context(ex_error::DataFusionSnafu)?;

self.execute_logical_plan(LogicalPlan::Extension(Extension {
Ok(LogicalPlan::Extension(Extension {
node: Arc::new(merge_into_plan),
}))
.await
}

#[instrument(name = "UserQuery::create_database", level = "trace", skip(self), err)]
5 changes: 5 additions & 0 deletions crates/executor/src/tests/query.rs
@@ -201,6 +201,11 @@ macro_rules! test_query {
settings.add_filter(r"(?i)\b(metadata_load_time|time_elapsed_opening|time_elapsed_processing|time_elapsed_scanning_total|time_elapsed_scanning_until_data|elapsed_compute|bloom_filter_eval_time|page_index_eval_time|row_pushdown_eval_time|statistics_eval_time)\s*=\s*[0-9]+(?:\.[0-9]+)?\s*(?:ns|µs|us|ms|s)", "$1=[TIME]");
settings.add_filter(r"(-{130})(-{1,})", "$1");
settings.add_filter(r"( {100})( {1,})", "$1");
// RoundRobinBatch fan-out equals the DataFusion planner's partition
// target, which in practice is the host CPU count. Normalize it so
// EXPLAIN snapshots don't flake between 4-core CI and dev boxes with
// different core counts.
settings.add_filter(r"RoundRobinBatch\(\d+\)", "RoundRobinBatch([N])");

let setup: Vec<&str> = vec![$($($setup_queries),*)?];
if !setup.is_empty() {
45 changes: 45 additions & 0 deletions crates/executor/src/tests/sql/ddl/merge_into.rs
@@ -1,5 +1,28 @@
use crate::test_query;

// Observability: `EXPLAIN MERGE INTO ...` must work. Before the routing
// fix, Embucket rejected it with
// "SQL compilation error: unsupported feature: Unsupported SQL statement:
// MERGE INTO" because `execute()` never unwrapped
// `DFStatement::Explain(..MERGE..)` and fell through to DataFusion's default
// SQL path, which doesn't know about Embucket's MERGE planner.
//
// This test covers the plan shape only. `EXPLAIN ANALYZE MERGE` is
// exercised end-to-end against the deployed Lambda: its output contains
// per-run metric values whose widths change the formatted table's column
// padding, which is too unstable for an insta snapshot.
test_query!(
merge_into_explain,
"EXPLAIN MERGE INTO merge_target USING merge_source ON merge_target.id = merge_source.id WHEN MATCHED THEN UPDATE SET merge_target.description = merge_source.description",
setup_queries = [
"CREATE TABLE embucket.public.merge_target (ID INTEGER, description VARCHAR)",
"CREATE TABLE embucket.public.merge_source (ID INTEGER, description VARCHAR)",
"INSERT INTO embucket.public.merge_target VALUES (1, 'existing row')",
"INSERT INTO embucket.public.merge_source VALUES (1, 'updated row')",
],
snapshot_path = "merge_into"
);

test_query!(
merge_into_only_update,
"SELECT count(CASE WHEN description = 'updated row' THEN 1 ELSE NULL END) updated, count(CASE WHEN description = 'existing row' THEN 1 ELSE NULL END) existing FROM embucket.public.merge_target",
@@ -299,3 +322,25 @@ test_query!(
],
snapshot_path = "merge_into"
);

// Regression test for https://github.com/Embucket/embucket/issues/128.
//
// Target is one data file with many rows; source is a mix of updates (matches) and
// inserts (no match), and the target rows of the join land in the filter stream in
// batches where some contain source_exists=true rows and some only contain target
// rows. Previously the "no matches, no source" fast path would silently drop the
// target-only batches for a file that had already been marked as matching in an
// earlier batch, causing the final row count to fall short of the expected
// total, target_rows + new_source_rows (here 10 + 2 = 12). This test asserts
// that no target row is lost.
test_query!(
merge_into_mixed_unsorted_multi_row_no_data_loss,
"SELECT COUNT(*) as total_rows, COUNT(CASE WHEN description = 'updated row' THEN 1 END) as updated_rows, COUNT(CASE WHEN description = 'original row' THEN 1 END) as preserved_rows, COUNT(CASE WHEN description = 'new row' THEN 1 END) as inserted_rows FROM embucket.public.merge_target",
setup_queries = [
"CREATE TABLE embucket.public.merge_target (id INTEGER, description VARCHAR)",
"CREATE TABLE embucket.public.merge_source (id INTEGER, description VARCHAR)",
"INSERT INTO embucket.public.merge_target VALUES (1, 'original row'), (2, 'original row'), (3, 'original row'), (4, 'original row'), (5, 'original row'), (6, 'original row'), (7, 'original row'), (8, 'original row'), (9, 'original row'), (10, 'original row')",
"INSERT INTO embucket.public.merge_source VALUES (3, 'updated row'), (7, 'updated row'), (11, 'new row'), (12, 'new row')",
"MERGE INTO merge_target t USING merge_source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET t.description = s.description WHEN NOT MATCHED THEN INSERT (id, description) VALUES (s.id, s.description)",
],
snapshot_path = "merge_into"
);
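// For the reader, the expected snapshot values follow from the setup data:
// total_rows = 12 (10 original + 2 inserted), updated_rows = 2 (ids 3 and 7),
// preserved_rows = 8, inserted_rows = 2.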