diff --git a/crates/embucketd/src/main.rs b/crates/embucketd/src/main.rs index 0efb990d..b472282d 100644 --- a/crates/embucketd/src/main.rs +++ b/crates/embucketd/src/main.rs @@ -182,7 +182,7 @@ async fn async_main( .layer(TraceLayer::new_for_http()) .layer(TimeoutLayer::with_status_code( StatusCode::REQUEST_TIMEOUT, - std::time::Duration::from_secs(1200), + std::time::Duration::from_mins(20), )) .layer(CatchPanicLayer::new()) .into_make_service_with_connect_info::(); diff --git a/crates/executor/src/datafusion/physical_plan/merge.rs b/crates/executor/src/datafusion/physical_plan/merge.rs index 8caa0af3..9ac7be8c 100644 --- a/crates/executor/src/datafusion/physical_plan/merge.rs +++ b/crates/executor/src/datafusion/physical_plan/merge.rs @@ -1,11 +1,7 @@ use datafusion::{ arrow::{ array::{Array, ArrayRef, BooleanArray, RecordBatch, StringArray, downcast_array}, - compute::{ - filter, filter_record_batch, - kernels::cmp::{distinct, eq}, - or, or_kleene, - }, + compute::{filter, kernels::cmp::distinct}, datatypes::Schema, }, physical_expr::EquivalenceProperties, @@ -15,7 +11,7 @@ use datafusion_iceberg::{ DataFusionTable, error::Error as DataFusionIcebergError, table::write_parquet_data_files, }; use datafusion_physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream, + DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, SendableRecordBatchStream, coalesce_partitions::CoalescePartitionsExec, execution_plan::{Boundedness, EmissionType}, @@ -24,17 +20,14 @@ use datafusion_physical_plan::{ }; use futures::{Stream, StreamExt}; use iceberg_rust::{catalog::tabular::Tabular, error::Error as IcebergError}; -use lru::LruCache; use pin_project_lite::pin_project; use snafu::ResultExt; use std::{ - collections::{HashMap, HashSet}, - num::NonZeroUsize, + collections::HashMap, ops::BitAnd, sync::atomic::{AtomicI64, Ordering}, sync::{Arc, Mutex}, task::Poll, - thread::available_parallelism, }; use crate::error; @@ -45,7 +38,6 @@ pub(crate) static DATA_FILE_PATH_COLUMN: &str = "__data_file_path"; pub(crate) static MANIFEST_FILE_PATH_COLUMN: &str = "__manifest_file_path"; pub(crate) static MERGE_UPDATED_COLUMN: &str = "__merge_row_updated"; pub(crate) static MERGE_INSERTED_COLUMN: &str = "__merge_row_inserted"; -static THREAD_FILE_RATIO: usize = 4; #[derive(Debug)] pub struct MergeIntoCOWSinkExec { @@ -156,29 +148,26 @@ impl ExecutionPlan for MergeIntoCOWSinkExec { let updated_rows: Arc = Arc::new(AtomicI64::new(0)); let inserted_rows: Arc = Arc::new(AtomicI64::new(0)); - // `Count` metrics that surface in `EXPLAIN ANALYZE` as - // `metrics=[updated_rows=…, inserted_rows=…, deleted_rows=…]` on this - // node. Populated below after the write transaction commits. + // `Count` metrics surfaced via `EXPLAIN ANALYZE` on this node. + // Updated incrementally by the stream as rows flow through. let updated_rows_metric: Count = MetricBuilder::new(&self.metrics).counter("updated_rows", partition); let inserted_rows_metric: Count = MetricBuilder::new(&self.metrics).counter("inserted_rows", partition); - let deleted_rows_metric: Count = + // MERGE DELETE is not supported yet; register the metric so it shows as 0. 
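+        // The leading underscore keeps the unused binding warning-free; `MetricBuilder` still registers the counter in `self.metrics`.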
+ let _deleted_rows_metric: Count = MetricBuilder::new(&self.metrics).counter("deleted_rows", partition); let coalesce = CoalescePartitionsExec::new(self.input.clone()); - // Filter out rows whoose __data_file_path doesn't have a matching row - let filtered: Arc = Arc::new(MergeCOWFilterExec::new( - Arc::new(coalesce), - matching_files.clone(), - )); - - let input_batches = filtered.execute(partition, context.clone())?; + let input_batches = coalesce.execute(partition, context.clone())?; let count_and_project_stream = MergeCOWCountAndProjectStream::new( input_batches, updated_rows.clone(), inserted_rows.clone(), + updated_rows_metric.clone(), + inserted_rows_metric.clone(), + matching_files.clone(), ); let stream = futures::stream::once({ @@ -187,9 +176,6 @@ impl ExecutionPlan for MergeIntoCOWSinkExec { let schema = schema.clone(); let updated_rows = Arc::clone(&updated_rows); let inserted_rows = Arc::clone(&inserted_rows); - let updated_rows_metric = updated_rows_metric.clone(); - let inserted_rows_metric = inserted_rows_metric.clone(); - let deleted_rows_metric = deleted_rows_metric.clone(); let projected_schema = count_and_project_stream.projected_schema(); let batches: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new( projected_schema, @@ -249,13 +235,6 @@ impl ExecutionPlan for MergeIntoCOWSinkExec { // MERGE DELETE is not supported yet let deleted = 0i64; - // Publish per-clause counts to the `MetricsSet` so - // `EXPLAIN ANALYZE` shows them on the MergeIntoSinkExec line. - // Rely on `try_from` so huge row counts fall back cleanly. - updated_rows_metric.add(usize::try_from(updated).unwrap_or(usize::MAX)); - inserted_rows_metric.add(usize::try_from(inserted).unwrap_or(usize::MAX)); - deleted_rows_metric.add(usize::try_from(deleted).unwrap_or(usize::MAX)); - let arrays = schema .fields() .iter() @@ -290,8 +269,9 @@ impl ExecutionPlan for MergeIntoCOWSinkExec { } pin_project! { - /// Stream wrapper that counts per-action MERGE rows (insert/update markers) and projects away - /// auxiliary merge columns before writing to data files. + /// Stream wrapper that counts per-action MERGE rows (insert/update markers), collects + /// matching data/manifest file pairs, and projects away auxiliary merge columns before + /// writing to data files. pub struct MergeCOWCountAndProjectStream { projection_indices: Vec, projected_schema: Arc, @@ -299,6 +279,12 @@ pin_project! 
{ inserted_idx: Option, updated_rows: Arc, inserted_rows: Arc, + updated_rows_metric: Count, + inserted_rows_metric: Count, + data_file_path_idx: usize, + manifest_file_path_idx: usize, + matching_files: HashMap, + matching_files_ref: Arc>>, #[pin] input: SendableRecordBatchStream, @@ -310,11 +296,18 @@ impl MergeCOWCountAndProjectStream { input: SendableRecordBatchStream, updated_rows: Arc, inserted_rows: Arc, + updated_rows_metric: Count, + inserted_rows_metric: Count, + matching_files_ref: Arc>>, ) -> Self { let input_schema = input.schema(); let updated_idx = input_schema.index_of(MERGE_UPDATED_COLUMN).ok(); let inserted_idx = input_schema.index_of(MERGE_INSERTED_COLUMN).ok(); + let data_file_path_idx = input_schema.index_of(DATA_FILE_PATH_COLUMN).unwrap_or(0); + let manifest_file_path_idx = input_schema + .index_of(MANIFEST_FILE_PATH_COLUMN) + .unwrap_or(0); // Drop auxiliary columns so we only write table columns to parquet let projection_indices: Vec = input_schema @@ -350,6 +343,12 @@ impl MergeCOWCountAndProjectStream { inserted_idx, updated_rows, inserted_rows, + updated_rows_metric, + inserted_rows_metric, + data_file_path_idx, + manifest_file_path_idx, + matching_files: HashMap::new(), + matching_files_ref, input, } } @@ -369,21 +368,36 @@ impl Stream for MergeCOWCountAndProjectStream { let mut project = self.project(); match project.input.as_mut().poll_next(cx) { Poll::Ready(Some(Ok(batch))) => { + // Collect unique (data_file, manifest_file) pairs + let data_file_col = batch.column(*project.data_file_path_idx); + let manifest_file_col = batch.column(*project.manifest_file_path_idx); + let file_pairs = + unique_files_and_manifests(data_file_col.as_ref(), manifest_file_col.as_ref())?; + project.matching_files.extend(file_pairs); + + // Count updated/inserted rows if let Some(updated_idx) = *project.updated_idx && let Some(col) = batch.columns().get(updated_idx) { let updated = downcast_array::(col.as_ref()); - let n = usize_to_i64_saturating(count_true_and_valid(&updated)); - project.updated_rows.fetch_add(n, Ordering::Relaxed); + let count = count_true_and_valid(&updated); + project + .updated_rows + .fetch_add(usize_to_i64_saturating(count), Ordering::Relaxed); + project.updated_rows_metric.add(count); } if let Some(inserted_idx) = *project.inserted_idx && let Some(col) = batch.columns().get(inserted_idx) { let inserted = downcast_array::(col.as_ref()); - let n = usize_to_i64_saturating(count_true_and_valid(&inserted)); - project.inserted_rows.fetch_add(n, Ordering::Relaxed); + let count = count_true_and_valid(&inserted); + project + .inserted_rows + .fetch_add(usize_to_i64_saturating(count), Ordering::Relaxed); + project.inserted_rows_metric.add(count); } + // Project away auxiliary columns let cols = project .projection_indices .iter() @@ -399,7 +413,21 @@ impl Stream for MergeCOWCountAndProjectStream { Poll::Ready(Some(Ok(projected))) } Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), - Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(None) => { + // Store matching files for the commit phase + let matching_files = std::mem::take(project.matching_files); + let mut grouped: HashMap> = HashMap::new(); + for (file, manifest) in matching_files { + grouped + .entry(manifest) + .and_modify(|v| v.push(file.clone())) + .or_insert_with(|| vec![file]); + } + #[allow(clippy::unwrap_used)] + let mut lock = project.matching_files_ref.lock().unwrap(); + lock.replace(grouped); + Poll::Ready(None) + } Poll::Pending => Poll::Pending, } } @@ -425,399 +453,6 @@ fn 
usize_to_i64_saturating(v: usize) -> i64 { i64::try_from(v).unwrap_or(i64::MAX) } -#[derive(Debug)] -struct MergeCOWFilterExec { - input: Arc, - properties: PlanProperties, - matching_files: Arc>>, -} - -impl MergeCOWFilterExec { - fn new( - input: Arc, - matching_files: Arc>>, - ) -> Self { - let properties = input.properties().clone(); - Self { - input, - properties, - matching_files, - } - } -} - -impl DisplayAs for MergeCOWFilterExec { - fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match t { - DisplayFormatType::Default - | DisplayFormatType::Verbose - | DisplayFormatType::TreeRender => { - write!(f, "SourceExistFilterExec") - } - } - } -} - -impl ExecutionPlan for MergeCOWFilterExec { - fn name(&self) -> &'static str { - "MergeCOWFilterExec" - } - - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn properties(&self) -> &PlanProperties { - &self.properties - } - - fn children(&self) -> Vec<&Arc> { - vec![&self.input] - } - - fn with_new_children( - self: Arc, - children: Vec>, - ) -> datafusion_common::Result> { - if children.len() != 1 { - return Err(DataFusionError::Internal( - error::LogicalExtensionChildCountSnafu { - name: "MergeCOWFilterExec".to_string(), - expected: 1usize, - } - .build() - .to_string(), - )); - } - Ok(Arc::new(Self::new( - children[0].clone(), - self.matching_files.clone(), - ))) - } - - fn execute( - &self, - partition: usize, - context: Arc, - ) -> datafusion_common::Result { - Ok(Box::pin(MergeCOWFilterStream::new( - self.input.execute(partition, context)?, - self.matching_files.clone(), - ))) - } -} - -pin_project! { - /// A streaming filter for Copy-on-Write (COW) merge operations that tracks file matching state. - /// - /// This stream processes record batches and maintains state about which data files have - /// matching rows (where `__source_exists` = true) and which do not. It buffers batches - /// from non-matching files until their matching status is determined, then releases them - /// when appropriate. 
- /// - /// The stream is used to efficiently handle merge operations by: - /// - Tracking files that have already found matching rows - /// - Buffering data from files that haven't found matches yet - /// - Managing the flow of data to optimize merge performance - pub struct MergeCOWFilterStream { - // Files which already encountered a "__source_exists" = true value - matching_files: HashMap, - // Reference to store the matching files after the stream has finished executing - matching_files_ref: Arc>>, - // Files which haven't encountered a "__source_exists" = true value - not_matching_files: HashMap, - // Buffer of RecordBatches whoose data files haven't had a matching row yet - not_matched_buffer: LruCache>, - // Previously buffered RecordBatches that are now ready to be consumed - ready_batches: Vec, - - #[pin] - input: SendableRecordBatchStream, - } -} - -impl MergeCOWFilterStream { - fn new( - input: SendableRecordBatchStream, - matching_files_ref: Arc>>, - ) -> Self { - let buffer_size = (available_parallelism().map(NonZeroUsize::get).unwrap_or(1) - / THREAD_FILE_RATIO) - .max(2); - Self { - matching_files: HashMap::new(), - not_matching_files: HashMap::new(), - #[allow(clippy::unwrap_used)] - not_matched_buffer: LruCache::new(NonZeroUsize::new(buffer_size).unwrap()), - ready_batches: Vec::new(), - matching_files_ref, - input, - } - } -} - -impl Stream for MergeCOWFilterStream { - type Item = Result; - - #[allow(clippy::too_many_lines)] - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let mut project = self.project(); - - loop { - // Return early if a batch is ready - if let Some(batch) = project.ready_batches.pop() { - return Poll::Ready(Some(Ok(batch))); - } - - match project.input.as_mut().poll_next(cx) { - Poll::Ready(Some(Ok(batch))) => { - let schema = batch.schema(); - - let source_exists_array = batch.column(schema.index_of(SOURCE_EXISTS_COLUMN)?); - let data_file_path_array = - batch.column(schema.index_of(DATA_FILE_PATH_COLUMN)?); - let manifest_file_path_array = - batch.column(schema.index_of(MANIFEST_FILE_PATH_COLUMN)?); - - // All data files in the current batch - - let data_and_manifest_files = unique_files_and_manifests( - &data_file_path_array, - &manifest_file_path_array, - )?; - - let current_data_files: HashSet = data_and_manifest_files - .keys() - .map(ToOwned::to_owned) - .collect(); - - // All data files that have a matching row in the current batch - let matching_data_file_array = filter( - &data_file_path_array, - &downcast_array::(source_exists_array), - )?; - - let matching_data_files = unique_values(&matching_data_file_array)?; - - // Data files that have a matching row now, but didn't before - let newly_matched_data_files: HashSet = project - .not_matching_files - .keys() - .map(ToOwned::to_owned) - .collect::>() - .intersection(&matching_data_files) - .map(ToOwned::to_owned) - .collect(); - - // All data files that ever had a matching row, previous batches included - let all_matching_data_files = { - // Datafiles in the current batch that were matched before - let mut previously_matched_data_files: HashSet = project - .matching_files - .keys() - .map(ToOwned::to_owned) - .collect::>() - .intersection(¤t_data_files) - .map(ToOwned::to_owned) - .collect(); - - previously_matched_data_files.extend(matching_data_files.clone()); - previously_matched_data_files - }; - - let not_matched_data_files: HashSet = current_data_files - .difference(&all_matching_data_files) - .map(ToOwned::to_owned) - 
.collect(); - - let matching_data_and_manifest_files: HashMap = - data_and_manifest_files - .iter() - .filter(|(file, _)| matching_data_files.contains(*file)) - .map(|(x, y)| (x.clone(), y.clone())) - .collect(); - - // When datafile didn't match in previous record batches but matches now, the - // previous record batches have to be appended to the output - for file in newly_matched_data_files { - let manifest = - project.not_matching_files.remove(&file).ok_or_else(|| { - DataFusionError::Internal( - error::MergeFilterStreamNotMatchingSnafu { file: file.clone() } - .build() - .to_string(), - ) - })?; - - let batches = project.not_matched_buffer.pop(&file).ok_or_else(|| { - DataFusionError::Internal( - error::MergeFilterStreamNotMatchingSnafu { file: file.clone() } - .build() - .to_string(), - ) - })?; - - for batch in batches { - project.ready_batches.push(batch); - } - - project.matching_files.insert(file, manifest); - } - - // All files without a match are recorded and their record batches are stored - // in the 'not_matched_buffer' - for file in not_matched_data_files { - let manifest = data_and_manifest_files.get(&file).ok_or_else(|| { - DataFusionError::Internal( - error::MergeFilterStreamNotMatchingSnafu { file: file.clone() } - .build() - .to_string(), - ) - })?; - project - .not_matching_files - .insert(file.clone(), manifest.clone()); - - let predicate = eq(&data_file_path_array, &StringArray::new_scalar(&file))?; - let filtered_batch = filter_record_batch(&batch, &predicate)?; - project - .not_matched_buffer - .get_or_insert_mut(file, Vec::new) - .push(filtered_batch); - } - - // Only take the fast paths if the current batch references no target file - // that will be (or has been) overwritten. Otherwise the full filter path - // below is required so target rows belonging to `all_matching_data_files` - // are re-emitted into the rewritten data file. - if matching_data_and_manifest_files.is_empty() - && all_matching_data_files.is_empty() - { - // Return early if all rows only come from source - if matching_data_file_array.len() == source_exists_array.len() { - return Poll::Ready(Some(Ok(batch))); - } else if matching_data_file_array.is_empty() { - //NO matches and no rows from source - continue; - } - } - - let file_predicate = all_matching_data_files.iter().try_fold( - None::, - |acc, x| { - let new = eq(&data_file_path_array, &StringArray::new_scalar(x))?; - if let Some(acc) = acc { - let result = or(&acc, &new)?; - Ok::<_, DataFusionError>(Some(result)) - } else { - Ok(Some(new)) - } - }, - )?; - let predicate = if let Some(file_predicate) = file_predicate { - or_kleene( - &file_predicate, - &downcast_array::(&source_exists_array), - )? 
- } else { - downcast_array::(&source_exists_array) - }; - - project - .matching_files - .extend(matching_data_and_manifest_files); - - let filtered_batch = filter_record_batch(&batch, &predicate)?; - - return Poll::Ready(Some(Ok(filtered_batch))); - } - Poll::Ready(None) => { - // The stream has finished, we now have to pass the list of matched files to the - // matching_files_ref to be accessed from outside of this stream - let mut matching_files = std::mem::take(project.matching_files); - let mut new: HashMap> = HashMap::new(); - for (file, manifest) in matching_files.drain() { - new.entry(manifest) - .and_modify(|v| v.push(file.clone())) - .or_insert_with(|| vec![file]); - } - #[allow(clippy::unwrap_used)] - let mut lock = project.matching_files_ref.lock().unwrap(); - lock.replace(new); - return Poll::Ready(None); - } - x => return x, - } - } - } -} - -impl RecordBatchStream for MergeCOWFilterStream { - fn schema(&self) -> datafusion::arrow::datatypes::SchemaRef { - self.input.schema() - } -} - -/// Extracts unique string values from an array efficiently by only comparing consecutive elements. -/// -/// This function assumes the input array is sorted and leverages this property to find unique values -/// by only checking consecutive pairs rather than comparing all elements. It: -/// 1. Takes the first element as a starting point -/// 2. Identifies positions where consecutive elements differ -/// 3. Filters to keep only the distinct values -/// 4. Returns a `HashSet` containing all unique string values -/// -/// # Arguments -/// * `array` - A reference to an Array (expected to be a `StringArray`) -/// -/// # Returns -/// * `Result, DataFusionError>` - `HashSet` of unique string values or an error -fn unique_values(array: &dyn Array) -> Result, DataFusionError> { - if array.is_empty() { - return Ok(HashSet::new()); - } - - let first = downcast_array::(array).value(0).to_owned(); - - let init = if first.is_empty() { - HashSet::new() - } else { - HashSet::from_iter([first]) - }; - - let slice_len = array.len() - 1; - - if slice_len == 0 { - return Ok(init); - } - - let v1 = array.slice(0, slice_len); - let v2 = array.slice(1, slice_len); - - // Which consecutive entries are different - let mask = distinct(&v1, &v2)?; - - // only keep values that are diffirent from their previous value, this drastically reduces the - // number of values needed to process - let unique = filter(&v2, &mask)?; - - let strings = downcast_array::(&unique); - - let result = strings.iter().fold(init, |mut acc, x| { - if let Some(x) = x - && !x.is_empty() - { - acc.insert(x.to_owned()); - } - acc - }); - - Ok(result) -} - /// Creates a mapping of unique file paths to their corresponding manifest paths. /// /// This function efficiently extracts unique file-manifest pairs from two sorted arrays by @@ -893,88 +528,10 @@ fn unique_files_and_manifests( #[cfg(test)] mod tests { #![allow(clippy::unwrap_used)] - use datafusion::arrow::array::{GenericStringBuilder, Int32Array}; - use datafusion::arrow::compute; - use datafusion::arrow::datatypes::{DataType, Field}; use super::*; use std::sync::Arc; - macro_rules! test_merge_cow_filter_stream { - ($test_name:ident, $input_slice:expr, $expected_sum:expr) => { - paste::paste! 
{ - #[tokio::test] - async fn []() { - use datafusion::arrow::datatypes::{DataType, Field}; - use futures::stream; - - let schema = Arc::new(Schema::new(vec![ - Field::new(SOURCE_EXISTS_COLUMN, DataType::Boolean, true), - Field::new(DATA_FILE_PATH_COLUMN, DataType::Utf8, true), - Field::new(MANIFEST_FILE_PATH_COLUMN, DataType::Utf8, true), - Field::new("data", DataType::Int32, false), - ])); - - let input_stream = stream::iter(build_input_stream($input_slice)); - - let stream = Box::pin(RecordBatchStreamAdapter::new(schema, input_stream)); - - let matching_files = Arc::default(); - - let mut filter_stream = MergeCOWFilterStream::new(stream, matching_files); - - let mut sum = 0; - while let Some(result) = StreamExt::next(&mut filter_stream).await { - let batch = result.unwrap(); - let data = batch.column(3); - - sum += compute::sum(&downcast_array::(&data)).unwrap(); - } - - assert_eq!(sum, $expected_sum); - } - } - }; - } - - #[test] - fn test_unique_values_with_duplicates() { - let array = Arc::new(StringArray::from(vec!["a", "a", "b", "b", "c"])); - let result = unique_values(array.as_ref()).unwrap(); - - let expected: HashSet = ["a", "b", "c"].iter().map(|&s| s.to_string()).collect(); - - assert_eq!(result, expected); - } - - #[test] - fn test_unique_values_all_same() { - let array = Arc::new(StringArray::from(vec!["same", "same", "same"])); - let result = unique_values(array.as_ref()).unwrap(); - - let expected: HashSet = std::iter::once(&"same") - .map(std::string::ToString::to_string) - .collect(); - - assert_eq!(result, expected); - } - - #[test] - fn test_unique_values_with_nulls() { - let array = Arc::new(StringArray::from(vec![ - Some("a"), - None, - Some("b"), - None, - Some("a"), - ])); - let result = unique_values(array.as_ref()).unwrap(); - - let expected: HashSet = ["a", "b"].iter().map(|&s| s.to_string()).collect(); - - assert_eq!(result, expected); - } - #[test] fn test_unique_files_and_manifests_with_duplicates() { let files = Arc::new(StringArray::from(vec![ @@ -999,275 +556,4 @@ mod tests { ]); assert_eq!(result, expected); } - - #[tokio::test] - async fn test_merge_cow_filter_stream_simple() { - use datafusion::arrow::datatypes::{DataType, Field}; - use futures::stream; - - let schema = Arc::new(Schema::new(vec![ - Field::new(SOURCE_EXISTS_COLUMN, DataType::Boolean, false), - Field::new(DATA_FILE_PATH_COLUMN, DataType::Utf8, false), - Field::new(MANIFEST_FILE_PATH_COLUMN, DataType::Utf8, false), - Field::new("data", DataType::Int32, false), - ])); - - let batch1 = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(BooleanArray::from(vec![false, false, true, false])), - Arc::new(StringArray::from(vec!["file1", "file1", "file2", "file2"])), - Arc::new(StringArray::from(vec![ - "manifest1", - "manifest1", - "manifest1", - "manifest1", - ])), - Arc::new(datafusion::arrow::array::Int32Array::from(vec![1, 2, 3, 4])), - ], - ) - .unwrap(); - - let batch2 = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(BooleanArray::from(vec![false, true, false, true])), - Arc::new(StringArray::from(vec!["file2", "file3", "file3", "file3"])), - Arc::new(StringArray::from(vec![ - "manifest1", - "manifest2", - "manifest2", - "manifest2", - ])), - Arc::new(datafusion::arrow::array::Int32Array::from(vec![5, 6, 7, 8])), - ], - ) - .unwrap(); - - let batch3 = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(BooleanArray::from(vec![true, true, false])), - Arc::new(StringArray::from(vec!["file4", "file4", "file4"])), - Arc::new(StringArray::from(vec![ - "manifest3", - 
"manifest3", - "manifest3", - ])), - Arc::new(datafusion::arrow::array::Int32Array::from(vec![9, 10, 11])), - ], - ) - .unwrap(); - - let input_stream = stream::iter(vec![Ok(batch1), Ok(batch2), Ok(batch3)]); - let stream = Box::pin(RecordBatchStreamAdapter::new(schema, input_stream)); - - let matching_files = Arc::default(); - - let mut filter_stream = MergeCOWFilterStream::new(stream, matching_files); - - let mut total_rows = 0; - while let Some(result) = StreamExt::next(&mut filter_stream).await { - let batch = result.unwrap(); - total_rows += batch.num_rows(); - } - - assert!(total_rows == 9); - } - - /// Generates test record batches from a sequence of scenario identifiers. - /// - /// Each tuple in the sequence contains (index, `scenario_type`) where `scenario_type` maps to: - /// 1. Target-only data, 2. Source-only data, 3. Target+Source, 4. Matching data, - /// 5. Target+Matching, 6. Source+Matching, 7. Target+Source+Matching - fn build_input_stream( - sequence: &[(usize, usize)], - ) -> Vec> { - sequence - .iter() - .map(|(n, i)| { - let n: i32 = (*n).try_into().unwrap(); - match i { - 1 => Ok(build_record_batch(&[generate_target(n)])), - 2 => Ok(build_record_batch(&[generate_source(n)])), - 3 => Ok(build_record_batch(&[ - generate_target(n), - generate_source(n), - ])), - 4 => Ok(build_record_batch(&[generate_matching(n)])), - 5 => Ok(build_record_batch(&[ - generate_target(n), - generate_matching(n), - ])), - 6 => Ok(build_record_batch(&[ - generate_source(n), - generate_matching(n), - ])), - 7 => Ok(build_record_batch(&[ - generate_source(n), - generate_target(n), - generate_matching(n), - ])), - - _ => panic!(), - } - }) - .collect() - } - - /// Builds a test record batch from a sequence of input data. - #[allow(clippy::type_complexity)] - fn build_record_batch( - input: &[( - Vec, - Vec>, - Vec>, - Vec, - )], - ) -> RecordBatch { - let schema = Arc::new(Schema::new(vec![ - Field::new(SOURCE_EXISTS_COLUMN, DataType::Boolean, true), - Field::new(DATA_FILE_PATH_COLUMN, DataType::Utf8, true), - Field::new(MANIFEST_FILE_PATH_COLUMN, DataType::Utf8, true), - Field::new("data", DataType::Int32, false), - ])); - - let mut b_builder = BooleanArray::builder(8); - let mut df_builder = GenericStringBuilder::::new(); - let mut mf_builder = GenericStringBuilder::::new(); - let mut d_builder = Int32Array::builder(8); - for (b, df, mf, d) in input { - b_builder.append_slice(b); - df_builder.append_array(&StringArray::from(df.clone())); - mf_builder.append_array(&StringArray::from(mf.clone())); - d_builder.append_slice(d); - } - RecordBatch::try_new( - schema, - vec![ - Arc::new(b_builder.finish()), - Arc::new(df_builder.finish()), - Arc::new(mf_builder.finish()), - Arc::new(d_builder.finish()), - ], - ) - .expect("Failed to build record batch") - } - - // Creates test data for the target table where __source_exists is NULL ==> NOT MATCHED - #[allow(clippy::type_complexity)] - fn generate_target( - i: i32, - ) -> ( - Vec, - Vec>, - Vec>, - Vec, - ) { - ( - vec![false, false, false, false], - vec![ - Some(format!("file{i}")), - Some(format!("file{i}")), - Some(format!("file{i}")), - Some(format!("file{i}")), - ], - vec![ - Some(format!("manifest{i}")), - Some(format!("manifest{i}")), - Some(format!("manifest{i}")), - Some(format!("manifest{i}")), - ], - vec![i * 4 + 1, i * 4 + 2, i * 4 + 3, i * 4 + 4], - ) - } - - // Creates test data for the source table where __data_file_path & __manifest_file_path are - // NULL ==> NOT MATCHED - #[allow(clippy::type_complexity)] - fn generate_source( - i: 
i32, - ) -> ( - Vec, - Vec>, - Vec>, - Vec, - ) { - ( - vec![true, true, true, true], - vec![None, None, None, None], - vec![None, None, None, None], - vec![i * 4 + 1, i * 4 + 2, i * 4 + 3, i * 4 + 4], - ) - } - - // Creates MATCHED test data for target and source table - #[allow(clippy::type_complexity)] - fn generate_matching( - i: i32, - ) -> ( - Vec, - Vec>, - Vec>, - Vec, - ) { - ( - vec![true, true, true, true], - vec![ - Some(format!("file{i}")), - Some(format!("file{i}")), - Some(format!("file{i}")), - Some(format!("file{i}")), - ], - vec![ - Some(format!("manifest{i}")), - Some(format!("manifest{i}")), - Some(format!("manifest{i}")), - Some(format!("manifest{i}")), - ], - vec![i * 4 + 1, i * 4 + 2, i * 4 + 3, i * 4 + 4], - ) - } - - test_merge_cow_filter_stream!(single_target, &[(0, 1)], 0); - test_merge_cow_filter_stream!(single_source, &[(0, 2)], 10); - test_merge_cow_filter_stream!(single_matching, &[(0, 4)], 10); - test_merge_cow_filter_stream!(single_target_source, &[(0, 3)], 10); - test_merge_cow_filter_stream!(single_target_matching, &[(0, 5)], 20); - test_merge_cow_filter_stream!(single_source_matching, &[(0, 6)], 20); - test_merge_cow_filter_stream!(single_target_source_matching, &[(0, 7)], 30); - test_merge_cow_filter_stream!(target_source, &[(0, 1), (0, 2)], 10); - test_merge_cow_filter_stream!(target_matching, &[(0, 1), (0, 4)], 20); - test_merge_cow_filter_stream!(source_matching, &[(0, 2), (0, 4)], 20); - test_merge_cow_filter_stream!(target_target_matching, &[(0, 1), (0, 1), (0, 4)], 30); - test_merge_cow_filter_stream!( - target1_target2_matching2_target3_matching1, - &[(0, 1), (1, 1), (1, 4), (2, 1), (0, 4)], - 72 - ); - test_merge_cow_filter_stream!( - source_target_source_matching, - &[(0, 2), (0, 1), (0, 6), (0, 5)], - 60 - ); - // Regression test for https://github.com/Embucket/embucket/issues/128 - // - // If a target file has been seen as "matching" in an earlier batch and a subsequent - // batch contains only target rows (no `__source_exists` = true rows) for that same - // file, the rows in the later batch must still be passed through the filter so they - // land in the rewritten data file. Previously the "no matches, no source" fast path - // dropped them, causing silent data loss during `MERGE INTO` on unsorted inputs. - test_merge_cow_filter_stream!(matching_then_target, &[(0, 4), (0, 1)], 20); - test_merge_cow_filter_stream!( - matching_then_target_then_matching, - &[(0, 4), (0, 1), (0, 4)], - 30 - ); - // Mixed scenario: several target-only batches arriving AFTER the target file has - // been matched. 
- test_merge_cow_filter_stream!( - matching_then_multiple_target_batches, - &[(0, 4), (0, 1), (0, 1), (0, 1)], - 40 - ); } diff --git a/crates/executor/src/error.rs b/crates/executor/src/error.rs index 3d08461e..6269fd8a 100644 --- a/crates/executor/src/error.rs +++ b/crates/executor/src/error.rs @@ -495,13 +495,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Data for not-matching file {file} is not available"))] - MergeFilterStreamNotMatching { - file: String, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Matching files have already been consumed"))] MatchingFilesAlreadyConsumed { #[snafu(implicit)] diff --git a/crates/executor/src/query.rs b/crates/executor/src/query.rs index fe19f8d8..b7f589f1 100644 --- a/crates/executor/src/query.rs +++ b/crates/executor/src/query.rs @@ -1517,6 +1517,7 @@ impl UserQuery { // Create the LogicalPlan for the join let sql_planner = SqlToRel::new(&session_context_provider); + let schema = build_join_schema(&target_schema, &source_schema, &JoinType::Full) .context(ex_error::DataFusionLogicalPlanMergeJoinSnafu)?; @@ -1524,6 +1525,31 @@ impl UserQuery { .sql_to_expr((*on).clone(), &schema, &mut planner_context) .context(ex_error::DataFusionLogicalPlanMergeJoinSnafu)?; + let affected_files_plan = LogicalPlanBuilder::new(target_plan.clone()) + .join_on(source_plan.clone(), JoinType::Inner, [on_expr.clone(); 1]) + .context(ex_error::DataFusionLogicalPlanMergeJoinSnafu)? + .aggregate( + [col(DATA_FILE_PATH_COLUMN)], + Vec::::new(), + ) + .context(ex_error::DataFusionLogicalPlanMergeJoinSnafu)? + .build() + .context(ex_error::DataFusionLogicalPlanMergeJoinSnafu)?; + + let target_plan = LogicalPlanBuilder::new(target_plan) + .join( + affected_files_plan, + JoinType::LeftSemi, + ( + vec![Column::from_name(DATA_FILE_PATH_COLUMN)], + vec![Column::from_name(DATA_FILE_PATH_COLUMN)], + ), + None, + ) + .context(ex_error::DataFusionLogicalPlanMergeJoinSnafu)? + .build() + .context(ex_error::DataFusionLogicalPlanMergeJoinSnafu)?; + let has_insert = clauses .iter() .any(|c| matches!(c.action, MergeAction::Insert(_))); diff --git a/crates/executor/src/snowflake_error.rs b/crates/executor/src/snowflake_error.rs index 963e7cea..82c60ac8 100644 --- a/crates/executor/src/snowflake_error.rs +++ b/crates/executor/src/snowflake_error.rs @@ -254,7 +254,6 @@ pub fn executor_error(error: &Error) -> SnowflakeError { | Error::CatalogListDowncast { .. } | Error::CatalogDownCast { .. } | Error::LogicalExtensionChildCount { .. } - | Error::MergeFilterStreamNotMatching { .. } | Error::MatchingFilesAlreadyConsumed { .. } | Error::MissingFilterPredicates { .. } => CustomSnafu { message, diff --git a/crates/executor/src/tests/query.rs b/crates/executor/src/tests/query.rs index 74e6842e..9a47a0dc 100644 --- a/crates/executor/src/tests/query.rs +++ b/crates/executor/src/tests/query.rs @@ -206,6 +206,10 @@ macro_rules! test_query { // EXPLAIN snapshots don't flake between 4-core CI and dev boxes with // different core counts. settings.add_filter(r"RoundRobinBatch\(\d+\)", "RoundRobinBatch([N])"); + // Hash-repartition fan-out and input_partitions also depend on + // the host CPU count. Normalize both so snapshots are stable. 
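+            // e.g. "Hash([id@0], 12)" -> "Hash([id@0], [N])" and "input_partitions=12" -> "input_partitions=[N]".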
+ settings.add_filter(r"Hash\((\[[^\]]*\]),\s*\d+\)", "Hash($1, [N])"); + settings.add_filter(r"input_partitions=\d+", "input_partitions=[N]"); let setup: Vec<&str> = vec![$($($setup_queries),*)?]; if !setup.is_empty() { diff --git a/crates/executor/src/tests/sql/ddl/snapshots/merge_into/query_merge_into_explain.snap b/crates/executor/src/tests/sql/ddl/snapshots/merge_into/query_merge_into_explain.snap index 4591fb22..75563860 100644 --- a/crates/executor/src/tests/sql/ddl/snapshots/merge_into/query_merge_into_explain.snap +++ b/crates/executor/src/tests/sql/ddl/snapshots/merge_into/query_merge_into_explain.snap @@ -12,22 +12,46 @@ Ok( "| | Projection: embucket.public.merge_target.id, CASE WHEN __common_expr_1 THEN TRY_CAST(embucket.public.merge_source.description AS Utf8) ELSE embucket.public.merge_target.description END AS description, embucket.public.merge_target.__data_file_path, embucket.public.merge_target.__manifest_file_path, __target_exists, __source_exists, __common_expr_1 AS __merge_row_updated, Boolean(false) AS __merge_row_inserted |", "| | Projection: __target_exists AND __source_exists AS __common_expr_1, embucket.public.merge_target.id, embucket.public.merge_target.description, embucket.public.merge_target.__data_file_path, embucket.public.merge_target.__manifest_file_path, __target_exists, embucket.public.merge_source.id, embucket.public.merge_source.description, __source_exists |", "| | Full Join: embucket.public.merge_target.id = embucket.public.merge_source.id |", - "| | Projection: embucket.public.merge_target.id, embucket.public.merge_target.description, embucket.public.merge_target.__data_file_path, embucket.public.merge_target.__manifest_file_path, Boolean(true) AS __target_exists |", - "| | TableScan: embucket.public.merge_target |", + "| | LeftSemi Join: embucket.public.merge_target.__data_file_path = embucket.public.merge_target.__data_file_path |", + "| | Projection: embucket.public.merge_target.id, embucket.public.merge_target.description, embucket.public.merge_target.__data_file_path, embucket.public.merge_target.__manifest_file_path, Boolean(true) AS __target_exists |", + "| | TableScan: embucket.public.merge_target |", + "| | Aggregate: groupBy=[[embucket.public.merge_target.__data_file_path]], aggr=[[]] |", + "| | Inner Join: embucket.public.merge_target.id = embucket.public.merge_source.id |", + "| | Projection: embucket.public.merge_target.id, embucket.public.merge_target.description, embucket.public.merge_target.__data_file_path, embucket.public.merge_target.__manifest_file_path, Boolean(true) AS __target_exists |", + "| | TableScan: embucket.public.merge_target |", + "| | Projection: embucket.public.merge_source.id, embucket.public.merge_source.description, Boolean(true) AS __source_exists |", + "| | TableScan: embucket.public.merge_source |", "| | Projection: embucket.public.merge_source.id, embucket.public.merge_source.description, Boolean(true) AS __source_exists |", "| | TableScan: embucket.public.merge_source |", "| physical_plan | MergeIntoSinkExec |", "| | ProjectionExec: expr=[id@1 as id, CASE WHEN __common_expr_1@0 THEN description@7 ELSE description@2 END as description, __data_file_path@3 as __data_file_path, __manifest_file_path@4 as __manifest_file_path, __target_exists@5 as __target_exists, __source_exists@8 as __source_exists, __common_expr_1@0 as __merge_row_updated, false as __merge_row_inserted] |", "| | ProjectionExec: expr=[__target_exists@4 AND __source_exists@7 as __common_expr_1, id@0 as id, description@1 as description, 
__data_file_path@2 as __data_file_path, __manifest_file_path@3 as __manifest_file_path, __target_exists@4 as __target_exists, id@5 as id, description@6 as description, __source_exists@7 as __source_exists] |", - "| | RepartitionExec: partitioning=RoundRobinBatch([N]), input_partitions=1 |", - "| | ProjectionExec: expr=[id@3 as id, description@4 as description, __data_file_path@5 as __data_file_path, __manifest_file_path@6 as __manifest_file_path, __target_exists@7 as __target_exists, id@0 as id, description@1 as description, __source_exists@2 as __source_exists] |", + "| | RepartitionExec: partitioning=RoundRobinBatch([N]), input_partitions=[N] |", + "| | CoalesceBatchesExec: target_batch_size=8192 |", "| | CoalesceBatchesExec: target_batch_size=8192 |", - "| | CoalesceBatchesExec: target_batch_size=8192 |", - "| | HashJoinExec: mode=CollectLeft, join_type=Full, on=[(id@0, id@0)] |", - "| | ProjectionExec: expr=[id@0 as id, description@1 as description, true as __source_exists] |", - "| | DataSourceExec: file_groups={1 group: [[/[PATH]/testing/data/[HEX]/[UUID].parquet]]}, projection=[id, description], file_type=parquet |", - "| | ProjectionExec: expr=[id@0 as id, description@1 as description, __data_file_path@2 as __data_file_path, __manifest_file_path@3 as __manifest_file_path, true as __target_exists] |", - "| | DataSourceExec: file_groups={1 group: [[/[PATH]/testing/data/[HEX]/[UUID].parquet]]}, projection=[id, description, __data_file_path, __manifest_file_path], file_type=parquet |", + "| | HashJoinExec: mode=CollectLeft, join_type=Full, on=[(id@0, id@0)] |", + "| | CoalescePartitionsExec |", + "| | CoalesceBatchesExec: target_batch_size=8192 |", + "| | CoalesceBatchesExec: target_batch_size=8192 |", + "| | HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(__data_file_path@2, __data_file_path@0)] |", + "| | ProjectionExec: expr=[id@0 as id, description@1 as description, __data_file_path@2 as __data_file_path, __manifest_file_path@3 as __manifest_file_path, true as __target_exists] |", + "| | DataSourceExec: file_groups={1 group: [[/[PATH]/testing/data/[HEX]/[UUID].parquet]]}, projection=[id, description, __data_file_path, __manifest_file_path], file_type=parquet |", + "| | AggregateExec: mode=FinalPartitioned, gby=[__data_file_path@0 as __data_file_path], aggr=[] |", + "| | CoalesceBatchesExec: target_batch_size=8192 |", + "| | RepartitionExec: partitioning=Hash([__data_file_path@0], [N]), input_partitions=[N] |", + "| | CoalesceBatchesExec: target_batch_size=8192 |", + "| | AggregateExec: mode=Partial, gby=[__data_file_path@2 as __data_file_path], aggr=[] |", + "| | ProjectionExec: expr=[id@3 as id, description@4 as description, __data_file_path@5 as __data_file_path, __manifest_file_path@6 as __manifest_file_path, __target_exists@7 as __target_exists, id@0 as id, description@1 as description, __source_exists@2 as __source_exists] |", + "| | CoalesceBatchesExec: target_batch_size=8192 |", + "| | CoalesceBatchesExec: target_batch_size=8192 |", + "| | HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)] |", + "| | ProjectionExec: expr=[id@0 as id, description@1 as description, true as __source_exists] |", + "| | DataSourceExec: file_groups={1 group: [[/[PATH]/testing/data/[HEX]/[UUID].parquet]]}, projection=[id, description], file_type=parquet |", + "| | RepartitionExec: partitioning=RoundRobinBatch([N]), input_partitions=[N] |", + "| | ProjectionExec: expr=[id@0 as id, description@1 as description, __data_file_path@2 as __data_file_path, __manifest_file_path@3 as 
__manifest_file_path, true as __target_exists] |", + "| | DataSourceExec: file_groups={1 group: [[/[PATH]/testing/data/[HEX]/[UUID].parquet]]}, projection=[id, description, __data_file_path, __manifest_file_path], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] AND DynamicFilterPhysicalExpr [ true ] |", + "| | ProjectionExec: expr=[id@0 as id, description@1 as description, true as __source_exists] |", + "| | DataSourceExec: file_groups={1 group: [[/[PATH]/testing/data/[HEX]/[UUID].parquet]]}, projection=[id, description], file_type=parquet |", "| | |", "+---------------+----------------------------------------------------------------------------------------------------------------------------------+", ],
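
For context on the Poll::Ready(None) branch added to MergeCOWCountAndProjectStream above: a minimal, std-only sketch (free function and names are illustrative, not the crate's API) of the hand-off it performs — grouping the collected (data file, manifest) pairs by manifest and publishing them through the shared Arc<Mutex<Option<...>>> slot that the commit phase later reads.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Group matched data files by manifest and publish the result through a shared
// slot, mirroring what the stream does once its input is exhausted.
fn publish_matching_files(
    pairs: HashMap<String, String>, // data_file -> manifest_file
    slot: &Arc<Mutex<Option<HashMap<String, Vec<String>>>>>,
) {
    let mut grouped: HashMap<String, Vec<String>> = HashMap::new();
    for (file, manifest) in pairs {
        // Equivalent to the and_modify(..).or_insert_with(..) pair in the patch.
        grouped.entry(manifest).or_default().push(file);
    }
    *slot.lock().expect("matching-files mutex poisoned") = Some(grouped);
}

fn main() {
    let pairs = HashMap::from([
        ("data-1.parquet".to_string(), "manifest-a.avro".to_string()),
        ("data-2.parquet".to_string(), "manifest-a.avro".to_string()),
        ("data-3.parquet".to_string(), "manifest-b.avro".to_string()),
    ]);
    let slot = Arc::new(Mutex::new(None));
    publish_matching_files(pairs, &slot);
    let grouped = slot.lock().unwrap();
    assert_eq!(grouped.as_ref().unwrap()["manifest-a.avro"].len(), 2);
    assert_eq!(grouped.as_ref().unwrap()["manifest-b.avro"].len(), 1);
}

In the patch, as in the sketch, the slot starts out as None, so the commit phase can distinguish an input that finished with no matches (Some of an empty map) from a stream that was never driven to completion.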