Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 39 additions & 7 deletions datafusion_iceberg/src/pruning_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,14 +357,14 @@ struct DateTransform {

impl DateTransform {
fn new() -> Self {
// Accept any second-argument type via `TypeSignature::UserDefined` and
// normalize it in `coerce_types`. The underlying transform is
// timezone-agnostic (it operates on the i64 microseconds-since-epoch),
// so any `Timestamp(Microsecond, *)` is a valid input — we just need
// to strip the timezone metadata so the physical invocation sees
// `Timestamp(Microsecond, None)`.
let signature = Signature {
type_signature: TypeSignature::OneOf(vec![
TypeSignature::Exact(vec![DataType::Utf8, DataType::Date32]),
TypeSignature::Exact(vec![
DataType::Utf8,
DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Microsecond, None),
]),
]),
type_signature: TypeSignature::UserDefined,
volatility: Volatility::Immutable,
};
Self { signature }
Expand All @@ -388,6 +388,38 @@ impl ScalarUDFImpl for DateTransform {
Ok(DataType::Int32)
}

fn coerce_types(
&self,
arg_types: &[DataType],
) -> datafusion::error::Result<Vec<DataType>> {
use datafusion::arrow::datatypes::TimeUnit;
if arg_types.len() != 2 {
return Err(DataFusionError::Plan(format!(
"date_transform expects 2 arguments, got {}",
arg_types.len()
)));
}
if !matches!(arg_types[0], DataType::Utf8 | DataType::LargeUtf8) {
return Err(DataFusionError::Plan(format!(
"date_transform first argument must be Utf8, got {}",
arg_types[0]
)));
}
let coerced_second = match &arg_types[1] {
DataType::Date32 => DataType::Date32,
DataType::Timestamp(TimeUnit::Microsecond, _) => {
DataType::Timestamp(TimeUnit::Microsecond, None)
}
DataType::Timestamp(unit, _) => DataType::Timestamp(*unit, None),
other => {
return Err(DataFusionError::Plan(format!(
"date_transform second argument must be Date32 or Timestamp, got {other}"
)))
}
};
Ok(vec![DataType::Utf8, coerced_second])
}

fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
Expand Down
53 changes: 46 additions & 7 deletions datafusion_iceberg/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,29 @@ async fn table_scan(
None
};

let mut table_partition_cols = datafusion_partition_columns(partition_fields)?;
// Compute the subset of partition fields that are materialized via the
// Hive-style directory path (i.e. NOT already present in the parquet file
// body). Iceberg identity partitions on source columns duplicate those
// columns into both the file and the path; for the datafusion scan we must
// omit them from `table_partition_cols` so the parquet reader doesn't try
// to project them from the path. Column-level pruning still works for
// identity partitions via the per-file lower/upper bounds in
// PruneDataFiles, so we lose nothing by excluding them here.
let (file_partition_fields, drop_partition_indices): (Vec<&BoundPartitionField<'_>>, Vec<usize>) = {
use iceberg_rust::spec::partition::Transform;
let mut kept: Vec<&BoundPartitionField<'_>> = Vec::new();
let mut dropped: Vec<usize> = Vec::new();
for (i, pf) in partition_fields.iter().enumerate() {
if matches!(pf.transform(), Transform::Identity) && pf.name() == pf.source_name() {
dropped.push(i);
} else {
kept.push(pf);
}
}
(kept, dropped)
};

let mut table_partition_cols = datafusion_partition_columns(&file_partition_fields)?;

let file_schema: SchemaRef = Arc::new((schema.fields()).try_into().unwrap());

Expand Down Expand Up @@ -472,11 +494,14 @@ async fn table_scan(
physical_predicate.clone()
{
let partition_schema = Arc::new(ArrowSchema::new(table_partition_cols.clone()));
let partition_column_names = partition_fields
// Use only the file-path-materialized partition fields here so that
// predicates on identity-self-named columns (which are pruned via
// PruneDataFiles) are NOT routed through the manifest-level
// partition pruner.
let partition_column_names = file_partition_fields
.iter()
.map(|field| Ok(field.source_name().to_owned()))
.collect::<Result<HashSet<_>, Error>>()
.map_err(DataFusionIcebergError::from)?;
.map(|field| field.source_name().to_owned())
.collect::<HashSet<String>>();

let partition_predicates = conjunction(
filters
Expand Down Expand Up @@ -633,6 +658,7 @@ async fn table_scan(
.then(|(partition_value, mut delete_files)| {
let object_store_url = object_store_url.clone();
let table_partition_cols = table_partition_cols.clone();
let drop_partition_indices = drop_partition_indices.clone();
let statistics = statistics.clone();
let physical_predicate = physical_predicate.clone();
let schema = &schema;
Expand Down Expand Up @@ -689,6 +715,7 @@ async fn table_scan(
.try_fold(None, |acc, delete_manifest| {
let object_store_url = object_store_url.clone();
let table_partition_cols = table_partition_cols.clone();
let drop_partition_indices = drop_partition_indices.clone();
let statistics = statistics.clone();
let physical_predicate = physical_predicate.clone();
let schema = &schema;
Expand All @@ -713,6 +740,7 @@ async fn table_scan(
last_updated_ms,
enable_data_file_path_column,
manifest_path,
&drop_partition_indices,
)
.unwrap();
data_files.push(data_file);
Expand Down Expand Up @@ -741,6 +769,7 @@ async fn table_scan(
last_updated_ms,
enable_data_file_path_column,
manifest_path,
&drop_partition_indices,
)?;

let delete_file_source = Arc::new(
Expand Down Expand Up @@ -843,6 +872,7 @@ async fn table_scan(
last_updated_ms,
enable_data_file_path_column,
manifest_path,
&drop_partition_indices,
)
})
.collect::<Result<Vec<_>, _>>()?;
Expand Down Expand Up @@ -892,6 +922,7 @@ async fn table_scan(
last_updated_ms,
enable_data_file_path_column,
manifest_path,
&drop_partition_indices,
)
.unwrap()
})
Expand Down Expand Up @@ -930,7 +961,7 @@ async fn table_scan(
}

fn datafusion_partition_columns(
partition_fields: &[BoundPartitionField<'_>],
partition_fields: &[&BoundPartitionField<'_>],
) -> Result<Vec<Field>, DataFusionError> {
let table_partition_cols: Vec<Field> = partition_fields
.iter()
Expand Down Expand Up @@ -1038,13 +1069,21 @@ fn generate_partitioned_file(
last_updated_ms: i64,
enable_data_file_path: bool,
manifest_file_path: Option<ManifestPath>,
drop_partition_indices: &[usize],
) -> Result<PartitionedFile, DataFusionError> {
let manifest_statistics = manifest_statistics(schema, manifest);
// Iceberg stores partition values in the order of the partition spec
// (one entry per partition field). Drop entries that correspond to
// identity-self-named partitions we've excluded from `table_partition_cols`
// so that `file.partition_values.len()` matches the filtered
// `partition_fields` length passed to datafusion's `FilePruner`.
let mut partition_values = manifest
.data_file()
.partition()
.iter()
.map(|x| {
.enumerate()
.filter(|(i, _)| !drop_partition_indices.contains(i))
.map(|(_, x)| {
x.as_ref()
.map(value_to_scalarvalue)
.unwrap_or(Ok(ScalarValue::Null))
Expand Down
106 changes: 103 additions & 3 deletions iceberg-rust-spec/src/spec/manifest_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,13 @@ mod _serde {
/// ID of the snapshot where the manifest file was added
pub added_snapshot_id: i64,
/// Number of entries in the manifest that have status ADDED (1), when null this is assumed to be non-zero
#[serde(rename = "added_data_files_count", alias = "added_files_count")]
pub added_files_count: i32,
/// Number of entries in the manifest that have status EXISTING (0), when null this is assumed to be non-zero
#[serde(rename = "existing_data_files_count", alias = "existing_files_count")]
pub existing_files_count: i32,
/// Number of entries in the manifest that have status DELETED (2), when null this is assumed to be non-zero
#[serde(rename = "deleted_data_files_count", alias = "deleted_files_count")]
pub deleted_files_count: i32,
/// Number of rows in all of files in the manifest that have status ADDED, when null this is assumed to be non-zero
pub added_rows_count: i64,
Expand Down Expand Up @@ -570,17 +573,17 @@ pub fn manifest_list_schema_v2() -> &'static AvroSchema {
"field-id": 503
},
{
"name": "added_files_count",
"name": "added_data_files_count",
"type": "int",
"field-id": 504
},
{
"name": "existing_files_count",
"name": "existing_data_files_count",
"type": "int",
"field-id": 505
},
{
"name": "deleted_files_count",
"name": "deleted_data_files_count",
"type": "int",
"field-id": 506
},
Expand Down Expand Up @@ -843,4 +846,101 @@ mod tests {
);
}
}

/// Simulates reading a v2 manifest list written by Apache Iceberg >= 1.0,
/// which uses the renamed field names `added_data_files_count`,
/// `existing_data_files_count`, `deleted_data_files_count`. Our reader
/// schema declares these as Avro field aliases, so resolution should succeed.
#[test]
pub fn test_manifest_list_v2_apache_field_names() {
// Writer schema using Apache Iceberg spec-v2 field names.
let apache_writer_schema = apache_avro::Schema::parse_str(
r#"
{
"type": "record",
"name": "manifest_file",
"fields": [
{"name": "manifest_path", "type": "string", "field-id": 500},
{"name": "manifest_length", "type": "long", "field-id": 501},
{"name": "partition_spec_id", "type": "int", "field-id": 502},
{"name": "content", "type": "int", "field-id": 517},
{"name": "sequence_number", "type": "long", "field-id": 515},
{"name": "min_sequence_number", "type": "long", "field-id": 516},
{"name": "added_snapshot_id", "type": "long", "field-id": 503},
{"name": "added_data_files_count", "type": "int", "field-id": 504},
{"name": "existing_data_files_count", "type": "int", "field-id": 505},
{"name": "deleted_data_files_count", "type": "int", "field-id": 506},
{"name": "added_rows_count", "type": "long", "field-id": 512},
{"name": "existing_rows_count", "type": "long", "field-id": 513},
{"name": "deleted_rows_count", "type": "long", "field-id": 514},
{
"name": "partitions",
"type": ["null", {
"type": "array",
"items": {
"type": "record",
"name": "r508",
"fields": [
{"name": "contains_null", "type": "boolean", "field-id": 509},
{"name": "contains_nan", "type": ["null", "boolean"], "field-id": 518},
{"name": "lower_bound", "type": ["null", "bytes"], "field-id": 510},
{"name": "upper_bound", "type": ["null", "bytes"], "field-id": 511}
]
},
"element-id": 508
}],
"default": null,
"field-id": 507
},
{
"name": "key_metadata",
"type": ["null", "bytes"],
"default": null,
"field-id": 519
}
]
}
"#,
)
.unwrap();

// Build an Avro record using the Apache field names.
let mut record = apache_avro::types::Record::new(&apache_writer_schema).unwrap();
record.put("manifest_path", "s3://bucket/path/manifest-0.avro");
record.put("manifest_length", 1200_i64);
record.put("partition_spec_id", 0_i32);
record.put("content", 0_i32);
record.put("sequence_number", 566_i64);
record.put("min_sequence_number", 0_i64);
record.put("added_snapshot_id", 39487483032_i64);
record.put("added_data_files_count", 1_i32);
record.put("existing_data_files_count", 2_i32);
record.put("deleted_data_files_count", 0_i32);
record.put("added_rows_count", 1000_i64);
record.put("existing_rows_count", 8000_i64);
record.put("deleted_rows_count", 0_i64);
record.put("partitions", AvroValue::Union(0, Box::new(AvroValue::Null)));
record.put("key_metadata", AvroValue::Union(0, Box::new(AvroValue::Null)));

let mut writer = apache_avro::Writer::new(&apache_writer_schema, Vec::new());
writer.append(record).unwrap();
let encoded = writer.into_inner().unwrap();

// Read with iceberg-rust's schema (aliases declared).
let reader_schema = manifest_list_schema_v2();
let reader = apache_avro::Reader::with_schema(reader_schema, &*encoded).unwrap();

let mut count = 0;
for record in reader {
let result =
apache_avro::from_value::<_serde::ManifestListEntryV2>(&record.unwrap()).unwrap();
assert_eq!(result.added_files_count, 1);
assert_eq!(result.existing_files_count, 2);
assert_eq!(result.deleted_files_count, 0);
assert_eq!(result.added_rows_count, 1000);
assert_eq!(result.manifest_path, "s3://bucket/path/manifest-0.avro");
count += 1;
}
assert_eq!(count, 1, "expected exactly one record");
}
}