diff --git a/datafusion_iceberg/src/pruning_statistics.rs b/datafusion_iceberg/src/pruning_statistics.rs index c1be1947..85ebc4c8 100644 --- a/datafusion_iceberg/src/pruning_statistics.rs +++ b/datafusion_iceberg/src/pruning_statistics.rs @@ -357,14 +357,14 @@ struct DateTransform { impl DateTransform { fn new() -> Self { + // Accept any second-argument type via `TypeSignature::UserDefined` and + // normalize it in `coerce_types`. The underlying transform is + // timezone-agnostic (it operates on the i64 microseconds-since-epoch), + // so any `Timestamp(Microsecond, *)` is a valid input — we just need + // to strip the timezone metadata so the physical invocation sees + // `Timestamp(Microsecond, None)`. let signature = Signature { - type_signature: TypeSignature::OneOf(vec![ - TypeSignature::Exact(vec![DataType::Utf8, DataType::Date32]), - TypeSignature::Exact(vec![ - DataType::Utf8, - DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Microsecond, None), - ]), - ]), + type_signature: TypeSignature::UserDefined, volatility: Volatility::Immutable, }; Self { signature } @@ -388,6 +388,38 @@ impl ScalarUDFImpl for DateTransform { Ok(DataType::Int32) } + fn coerce_types( + &self, + arg_types: &[DataType], + ) -> datafusion::error::Result> { + use datafusion::arrow::datatypes::TimeUnit; + if arg_types.len() != 2 { + return Err(DataFusionError::Plan(format!( + "date_transform expects 2 arguments, got {}", + arg_types.len() + ))); + } + if !matches!(arg_types[0], DataType::Utf8 | DataType::LargeUtf8) { + return Err(DataFusionError::Plan(format!( + "date_transform first argument must be Utf8, got {}", + arg_types[0] + ))); + } + let coerced_second = match &arg_types[1] { + DataType::Date32 => DataType::Date32, + DataType::Timestamp(TimeUnit::Microsecond, _) => { + DataType::Timestamp(TimeUnit::Microsecond, None) + } + DataType::Timestamp(unit, _) => DataType::Timestamp(*unit, None), + other => { + return Err(DataFusionError::Plan(format!( + "date_transform second argument must be Date32 or Timestamp, got {other}" + ))) + } + }; + Ok(vec![DataType::Utf8, coerced_second]) + } + fn invoke_with_args( &self, args: ScalarFunctionArgs, diff --git a/datafusion_iceberg/src/table.rs b/datafusion_iceberg/src/table.rs index ee5c64bc..7358664a 100644 --- a/datafusion_iceberg/src/table.rs +++ b/datafusion_iceberg/src/table.rs @@ -432,7 +432,29 @@ async fn table_scan( None }; - let mut table_partition_cols = datafusion_partition_columns(partition_fields)?; + // Compute the subset of partition fields that are materialized via the + // Hive-style directory path (i.e. NOT already present in the parquet file + // body). Iceberg identity partitions on source columns duplicate those + // columns into both the file and the path; for the datafusion scan we must + // omit them from `table_partition_cols` so the parquet reader doesn't try + // to project them from the path. Column-level pruning still works for + // identity partitions via the per-file lower/upper bounds in + // PruneDataFiles, so we lose nothing by excluding them here. + let (file_partition_fields, drop_partition_indices): (Vec<&BoundPartitionField<'_>>, Vec) = { + use iceberg_rust::spec::partition::Transform; + let mut kept: Vec<&BoundPartitionField<'_>> = Vec::new(); + let mut dropped: Vec = Vec::new(); + for (i, pf) in partition_fields.iter().enumerate() { + if matches!(pf.transform(), Transform::Identity) && pf.name() == pf.source_name() { + dropped.push(i); + } else { + kept.push(pf); + } + } + (kept, dropped) + }; + + let mut table_partition_cols = datafusion_partition_columns(&file_partition_fields)?; let file_schema: SchemaRef = Arc::new((schema.fields()).try_into().unwrap()); @@ -472,11 +494,14 @@ async fn table_scan( physical_predicate.clone() { let partition_schema = Arc::new(ArrowSchema::new(table_partition_cols.clone())); - let partition_column_names = partition_fields + // Use only the file-path-materialized partition fields here so that + // predicates on identity-self-named columns (which are pruned via + // PruneDataFiles) are NOT routed through the manifest-level + // partition pruner. + let partition_column_names = file_partition_fields .iter() - .map(|field| Ok(field.source_name().to_owned())) - .collect::, Error>>() - .map_err(DataFusionIcebergError::from)?; + .map(|field| field.source_name().to_owned()) + .collect::>(); let partition_predicates = conjunction( filters @@ -633,6 +658,7 @@ async fn table_scan( .then(|(partition_value, mut delete_files)| { let object_store_url = object_store_url.clone(); let table_partition_cols = table_partition_cols.clone(); + let drop_partition_indices = drop_partition_indices.clone(); let statistics = statistics.clone(); let physical_predicate = physical_predicate.clone(); let schema = &schema; @@ -689,6 +715,7 @@ async fn table_scan( .try_fold(None, |acc, delete_manifest| { let object_store_url = object_store_url.clone(); let table_partition_cols = table_partition_cols.clone(); + let drop_partition_indices = drop_partition_indices.clone(); let statistics = statistics.clone(); let physical_predicate = physical_predicate.clone(); let schema = &schema; @@ -713,6 +740,7 @@ async fn table_scan( last_updated_ms, enable_data_file_path_column, manifest_path, + &drop_partition_indices, ) .unwrap(); data_files.push(data_file); @@ -741,6 +769,7 @@ async fn table_scan( last_updated_ms, enable_data_file_path_column, manifest_path, + &drop_partition_indices, )?; let delete_file_source = Arc::new( @@ -843,6 +872,7 @@ async fn table_scan( last_updated_ms, enable_data_file_path_column, manifest_path, + &drop_partition_indices, ) }) .collect::, _>>()?; @@ -892,6 +922,7 @@ async fn table_scan( last_updated_ms, enable_data_file_path_column, manifest_path, + &drop_partition_indices, ) .unwrap() }) @@ -930,7 +961,7 @@ async fn table_scan( } fn datafusion_partition_columns( - partition_fields: &[BoundPartitionField<'_>], + partition_fields: &[&BoundPartitionField<'_>], ) -> Result, DataFusionError> { let table_partition_cols: Vec = partition_fields .iter() @@ -1038,13 +1069,21 @@ fn generate_partitioned_file( last_updated_ms: i64, enable_data_file_path: bool, manifest_file_path: Option, + drop_partition_indices: &[usize], ) -> Result { let manifest_statistics = manifest_statistics(schema, manifest); + // Iceberg stores partition values in the order of the partition spec + // (one entry per partition field). Drop entries that correspond to + // identity-self-named partitions we've excluded from `table_partition_cols` + // so that `file.partition_values.len()` matches the filtered + // `partition_fields` length passed to datafusion's `FilePruner`. let mut partition_values = manifest .data_file() .partition() .iter() - .map(|x| { + .enumerate() + .filter(|(i, _)| !drop_partition_indices.contains(i)) + .map(|(_, x)| { x.as_ref() .map(value_to_scalarvalue) .unwrap_or(Ok(ScalarValue::Null)) diff --git a/iceberg-rust-spec/src/spec/manifest_list.rs b/iceberg-rust-spec/src/spec/manifest_list.rs index 4d47f8b0..727d479f 100644 --- a/iceberg-rust-spec/src/spec/manifest_list.rs +++ b/iceberg-rust-spec/src/spec/manifest_list.rs @@ -130,10 +130,13 @@ mod _serde { /// ID of the snapshot where the manifest file was added pub added_snapshot_id: i64, /// Number of entries in the manifest that have status ADDED (1), when null this is assumed to be non-zero + #[serde(rename = "added_data_files_count", alias = "added_files_count")] pub added_files_count: i32, /// Number of entries in the manifest that have status EXISTING (0), when null this is assumed to be non-zero + #[serde(rename = "existing_data_files_count", alias = "existing_files_count")] pub existing_files_count: i32, /// Number of entries in the manifest that have status DELETED (2), when null this is assumed to be non-zero + #[serde(rename = "deleted_data_files_count", alias = "deleted_files_count")] pub deleted_files_count: i32, /// Number of rows in all of files in the manifest that have status ADDED, when null this is assumed to be non-zero pub added_rows_count: i64, @@ -570,17 +573,17 @@ pub fn manifest_list_schema_v2() -> &'static AvroSchema { "field-id": 503 }, { - "name": "added_files_count", + "name": "added_data_files_count", "type": "int", "field-id": 504 }, { - "name": "existing_files_count", + "name": "existing_data_files_count", "type": "int", "field-id": 505 }, { - "name": "deleted_files_count", + "name": "deleted_data_files_count", "type": "int", "field-id": 506 }, @@ -843,4 +846,101 @@ mod tests { ); } } + + /// Simulates reading a v2 manifest list written by Apache Iceberg >= 1.0, + /// which uses the renamed field names `added_data_files_count`, + /// `existing_data_files_count`, `deleted_data_files_count`. Our reader + /// schema declares these as Avro field aliases, so resolution should succeed. + #[test] + pub fn test_manifest_list_v2_apache_field_names() { + // Writer schema using Apache Iceberg spec-v2 field names. + let apache_writer_schema = apache_avro::Schema::parse_str( + r#" + { + "type": "record", + "name": "manifest_file", + "fields": [ + {"name": "manifest_path", "type": "string", "field-id": 500}, + {"name": "manifest_length", "type": "long", "field-id": 501}, + {"name": "partition_spec_id", "type": "int", "field-id": 502}, + {"name": "content", "type": "int", "field-id": 517}, + {"name": "sequence_number", "type": "long", "field-id": 515}, + {"name": "min_sequence_number", "type": "long", "field-id": 516}, + {"name": "added_snapshot_id", "type": "long", "field-id": 503}, + {"name": "added_data_files_count", "type": "int", "field-id": 504}, + {"name": "existing_data_files_count", "type": "int", "field-id": 505}, + {"name": "deleted_data_files_count", "type": "int", "field-id": 506}, + {"name": "added_rows_count", "type": "long", "field-id": 512}, + {"name": "existing_rows_count", "type": "long", "field-id": 513}, + {"name": "deleted_rows_count", "type": "long", "field-id": 514}, + { + "name": "partitions", + "type": ["null", { + "type": "array", + "items": { + "type": "record", + "name": "r508", + "fields": [ + {"name": "contains_null", "type": "boolean", "field-id": 509}, + {"name": "contains_nan", "type": ["null", "boolean"], "field-id": 518}, + {"name": "lower_bound", "type": ["null", "bytes"], "field-id": 510}, + {"name": "upper_bound", "type": ["null", "bytes"], "field-id": 511} + ] + }, + "element-id": 508 + }], + "default": null, + "field-id": 507 + }, + { + "name": "key_metadata", + "type": ["null", "bytes"], + "default": null, + "field-id": 519 + } + ] + } + "#, + ) + .unwrap(); + + // Build an Avro record using the Apache field names. + let mut record = apache_avro::types::Record::new(&apache_writer_schema).unwrap(); + record.put("manifest_path", "s3://bucket/path/manifest-0.avro"); + record.put("manifest_length", 1200_i64); + record.put("partition_spec_id", 0_i32); + record.put("content", 0_i32); + record.put("sequence_number", 566_i64); + record.put("min_sequence_number", 0_i64); + record.put("added_snapshot_id", 39487483032_i64); + record.put("added_data_files_count", 1_i32); + record.put("existing_data_files_count", 2_i32); + record.put("deleted_data_files_count", 0_i32); + record.put("added_rows_count", 1000_i64); + record.put("existing_rows_count", 8000_i64); + record.put("deleted_rows_count", 0_i64); + record.put("partitions", AvroValue::Union(0, Box::new(AvroValue::Null))); + record.put("key_metadata", AvroValue::Union(0, Box::new(AvroValue::Null))); + + let mut writer = apache_avro::Writer::new(&apache_writer_schema, Vec::new()); + writer.append(record).unwrap(); + let encoded = writer.into_inner().unwrap(); + + // Read with iceberg-rust's schema (aliases declared). + let reader_schema = manifest_list_schema_v2(); + let reader = apache_avro::Reader::with_schema(reader_schema, &*encoded).unwrap(); + + let mut count = 0; + for record in reader { + let result = + apache_avro::from_value::<_serde::ManifestListEntryV2>(&record.unwrap()).unwrap(); + assert_eq!(result.added_files_count, 1); + assert_eq!(result.existing_files_count, 2); + assert_eq!(result.deleted_files_count, 0); + assert_eq!(result.added_rows_count, 1000); + assert_eq!(result.manifest_path, "s3://bucket/path/manifest-0.avro"); + count += 1; + } + assert_eq!(count, 1, "expected exactly one record"); + } }