diff --git a/Cargo.lock b/Cargo.lock index 1673f49f9..c6100b54e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5841,6 +5841,7 @@ dependencies = [ "sail-common", "sail-common-datafusion", "sail-delta-lake", + "sail-duck-lake", "sail-iceberg", "serde", "serde_yaml", @@ -5880,6 +5881,24 @@ dependencies = [ "uuid", ] +[[package]] +name = "sail-duck-lake" +version = "0.4.3" +dependencies = [ + "async-trait", + "chrono", + "datafusion", + "datafusion-common", + "log", + "object_store", + "pyo3", + "serde", + "serde_json", + "tokio", + "url", + "uuid", +] + [[package]] name = "sail-execution" version = "0.4.3" diff --git a/crates/sail-data-source/Cargo.toml b/crates/sail-data-source/Cargo.toml index 64e7f5728..cee858d48 100644 --- a/crates/sail-data-source/Cargo.toml +++ b/crates/sail-data-source/Cargo.toml @@ -10,6 +10,7 @@ workspace = true sail-common = { path = "../sail-common" } sail-common-datafusion = { path = "../sail-common-datafusion" } sail-delta-lake = { path = "../sail-delta-lake" } +sail-duck-lake = { path = "../sail-duck-lake" } sail-iceberg = { path = "../sail-iceberg" } async-trait = { workspace = true } diff --git a/crates/sail-data-source/build.rs b/crates/sail-data-source/build.rs index d379ca0f2..9fc1c3a7e 100644 --- a/crates/sail-data-source/build.rs +++ b/crates/sail-data-source/build.rs @@ -136,6 +136,8 @@ fn main() -> Result<(), Box> { build_options("ParquetWriteOptions", "parquet_write")?; build_options("DeltaReadOptions", "delta_read")?; build_options("DeltaWriteOptions", "delta_write")?; + build_options("DuckLakeReadOptions", "ducklake_read")?; + build_options("DuckLakeWriteOptions", "ducklake_write")?; build_options("IcebergReadOptions", "iceberg_read")?; build_options("IcebergWriteOptions", "iceberg_write")?; build_options("TextReadOptions", "text_read")?; diff --git a/crates/sail-data-source/src/formats/ducklake.rs b/crates/sail-data-source/src/formats/ducklake.rs new file mode 100644 index 000000000..6e240922b --- /dev/null +++ b/crates/sail-data-source/src/formats/ducklake.rs @@ -0,0 +1,276 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion::catalog::{Session, TableProvider}; +use datafusion::common::{not_impl_err, Result}; +use datafusion::physical_plan::ExecutionPlan; +use sail_common_datafusion::datasource::{SinkInfo, SourceInfo, TableFormat}; +use sail_duck_lake::create_ducklake_provider; +use sail_duck_lake::options::DuckLakeOptions; +use url::Url; + +use crate::options::{load_default_options, load_options, DuckLakeReadOptions}; + +#[derive(Debug, Default)] +pub struct DuckLakeDataSourceFormat; + +#[async_trait] +impl TableFormat for DuckLakeDataSourceFormat { + fn name(&self) -> &str { + "ducklake" + } + + async fn create_provider( + &self, + ctx: &dyn Session, + info: SourceInfo, + ) -> Result> { + let SourceInfo { + paths, + schema: _, + constraints: _, + partition_by: _, + bucket_by: _, + sort_order: _, + options, + } = info; + + // Prefer location-first (ducklake+sqlite://...) 
if provided + let loc_opts = match paths.as_slice() { + [single] => parse_ducklake_location(single)?, + _ => None, + }; + + let ducklake_options = if let Some(mut base_opts) = loc_opts { + // Merge additive options (snapshot_id, case_sensitive) from defaults and provided options + let mut merged = DuckLakeOptions::default(); + apply_ducklake_read_options(load_default_options()?, &mut merged)?; + for opt in options { + apply_ducklake_read_options(load_options(opt)?, &mut merged)?; + } + if let Some(snap) = merged.snapshot_id { + base_opts.snapshot_id = Some(snap); + } + base_opts.case_sensitive = merged.case_sensitive; + base_opts.validate()?; + base_opts + } else { + log::warn!( + "DuckLake: location not provided; falling back to options; location is preferred" + ); + resolve_ducklake_read_options(options)? + }; + + create_ducklake_provider(ctx, ducklake_options).await + } + + async fn create_writer( + &self, + _ctx: &dyn Session, + _info: SinkInfo, + ) -> Result> { + not_impl_err!("Writing to DuckLake tables is not yet supported") + } +} + +fn apply_ducklake_read_options(from: DuckLakeReadOptions, to: &mut DuckLakeOptions) -> Result<()> { + if let Some(url) = from.url { + to.url = url; + } + if let Some(table) = from.table { + to.table = table; + } + if let Some(base_path) = from.base_path { + to.base_path = base_path; + } + if let Some(snapshot_id) = from.snapshot_id { + to.snapshot_id = Some(snapshot_id); + } + if let Some(schema) = from.schema { + to.schema = Some(schema); + } + if let Some(case_sensitive) = from.case_sensitive { + to.case_sensitive = case_sensitive; + } + Ok(()) +} + +pub fn resolve_ducklake_read_options( + options: Vec>, +) -> Result { + let mut ducklake_options = DuckLakeOptions::default(); + apply_ducklake_read_options(load_default_options()?, &mut ducklake_options)?; + for opt in options { + apply_ducklake_read_options(load_options(opt)?, &mut ducklake_options)?; + } + ducklake_options.validate()?; + Ok(ducklake_options) +} + +// Parse a location string like: +// ducklake+sqlite:///path/to/metadata.ducklake/analytics/metrics?base_path=file:///path/to/data/&snapshot_id=1 +// Returns Ok(None) if the scheme is not ducklake+* +fn parse_ducklake_location(path: &str) -> Result> { + if !path.starts_with("ducklake+") { + return Ok(None); + } + let url = + Url::parse(path).map_err(|e| datafusion::common::DataFusionError::External(Box::new(e)))?; + let scheme = url.scheme(); + if !scheme.starts_with("ducklake+") { + return Ok(None); + } + + let meta_scheme = &scheme["ducklake+".len()..]; + if meta_scheme != "sqlite" && meta_scheme != "postgres" && meta_scheme != "postgresql" { + return Err(datafusion::common::DataFusionError::Plan(format!( + "Unsupported DuckLake meta scheme: {}", + meta_scheme + ))); + } + + // Common: parse query params + let qp: Vec<(String, String)> = url + .query_pairs() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); + + // Required base_path + let base_path = qp + .iter() + .find(|(k, _)| k == "base_path") + .map(|(_, v)| v.to_string()) + .ok_or_else(|| { + datafusion::common::DataFusionError::Plan("Missing base_path query param".into()) + })?; + + // Optional params + let snapshot_id = qp + .iter() + .find(|(k, _)| k == "snapshot_id") + .and_then(|(_, v)| v.parse::().ok()); + let case_sensitive = qp + .iter() + .find(|(k, _)| k == "case_sensitive") + .is_some_and(|(_, v)| v == "true"); + + let url_str; + let table: String; + + if meta_scheme == "sqlite" { + // Identify metadata file (*.ducklake) and table path segments after it + let 
segments: Vec = url + .path_segments() + .map(|s| s.map(|p| p.to_string()).collect()) + .unwrap_or_else(Vec::new); + + let split_idx = segments + .iter() + .position(|s| s.ends_with(".ducklake")) + .ok_or_else(|| { + datafusion::common::DataFusionError::Plan( + "Missing metadata .ducklake file in location".into(), + ) + })?; + + let db_parts = &segments[..=split_idx]; + let table_parts = &segments[split_idx + 1..]; + + table = match table_parts { + [t] => t.clone(), + [s, t] => format!("{}.{t}", s), + [] => qp + .iter() + .find(|(k, _)| k == "table") + .map(|(_, v)| v.to_string()) + .ok_or_else(|| { + datafusion::common::DataFusionError::Plan("Missing table in location".into()) + })?, + _ => { + return Err(datafusion::common::DataFusionError::Plan( + "Invalid table path in location".into(), + )); + } + }; + + // Reconstruct metadata URL (strip query, keep path up to .ducklake) + let mut meta_url = url.clone(); + meta_url.set_query(None); + let db_path = format!("/{}", db_parts.join("/")); + meta_url.set_path(&db_path); + let auth = meta_url.authority().to_string(); + url_str = if auth.is_empty() { + format!("{}://{}", meta_scheme, meta_url.path()) + } else { + format!("{}://{}{}", meta_scheme, auth, meta_url.path()) + }; + } else { + // postgres/postgresql: expect path like /dbname[/schema[/table]] + let segments: Vec = url + .path_segments() + .map(|s| s.map(|p| p.to_string()).collect()) + .unwrap_or_else(Vec::new); + if segments.is_empty() { + return Err(datafusion::common::DataFusionError::Plan( + "Missing database name in location".into(), + )); + } + let dbname = &segments[0]; + let table_parts = &segments[1..]; + table = match table_parts { + [t] => t.clone(), + [s, t] => format!("{}.{t}", s), + [] => qp + .iter() + .find(|(k, _)| k == "table") + .map(|(_, v)| v.to_string()) + .ok_or_else(|| { + datafusion::common::DataFusionError::Plan("Missing table in location".into()) + })?, + _ => { + return Err(datafusion::common::DataFusionError::Plan( + "Invalid table path in location".into(), + )); + } + }; + + // Reconstruct metadata URL to include scheme, authority, and /dbname, preserving + // connection query params but excluding DuckLake-specific params. 
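+        // For example, a hypothetical location
+        //   ducklake+postgres://user:pw@host:5432/lakedb/analytics/metrics?base_path=s3://bucket/data/
+        // resolves to the metadata URL postgres://user:pw@host:5432/lakedb with
+        // table "analytics.metrics" and base_path "s3://bucket/data/".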
+ let mut meta_url = url.clone(); + // Preserve only non-ducklake query params + let filtered: Vec<(String, String)> = qp + .into_iter() + .filter(|(k, _)| { + k != "base_path" && k != "snapshot_id" && k != "case_sensitive" && k != "table" + }) + .collect(); + if filtered.is_empty() { + meta_url.set_query(None); + } else { + let mut serializer = url::form_urlencoded::Serializer::new(String::new()); + for (k, v) in filtered { + serializer.append_pair(&k, &v); + } + let q = serializer.finish(); + meta_url.set_query(Some(&q)); + } + let db_path = format!("/{}", dbname); + meta_url.set_path(&db_path); + let auth = meta_url.authority().to_string(); + url_str = if auth.is_empty() { + format!("{}://{}", meta_scheme, meta_url.path()) + } else { + format!("{}://{}{}", meta_scheme, auth, meta_url.path()) + }; + } + + Ok(Some(DuckLakeOptions { + url: url_str, + table, + base_path, + snapshot_id, + schema: None, + case_sensitive, + })) +} diff --git a/crates/sail-data-source/src/formats/mod.rs b/crates/sail-data-source/src/formats/mod.rs index 2998e8a1b..c1d8dfe7a 100644 --- a/crates/sail-data-source/src/formats/mod.rs +++ b/crates/sail-data-source/src/formats/mod.rs @@ -4,6 +4,7 @@ pub mod binary; pub mod console; pub mod csv; pub mod delta; +pub mod ducklake; pub mod iceberg; pub mod json; pub mod listing; diff --git a/crates/sail-data-source/src/options/data/ducklake_read.yaml b/crates/sail-data-source/src/options/data/ducklake_read.yaml new file mode 100644 index 000000000..35b991185 --- /dev/null +++ b/crates/sail-data-source/src/options/data/ducklake_read.yaml @@ -0,0 +1,53 @@ +# Options for reading from a DuckLake table. + +- key: url + description: | + Metadata database connection string. + Supported formats: + - SQLite: sqlite:///path/to/metadata.db + - PostgreSQL: postgres://user:password@host:port/database + - PostgreSQL: postgresql://user:password@host:port/database + supported: true + rust_type: String + +- key: table + description: | + Table name to read. Format: 'table_name' or 'schema.table_name'. + If schema is omitted, defaults to 'main'. + supported: true + rust_type: String + +- key: base_path + description: | + Base URL for resolving relative data file paths. + Must be a valid URL (e.g., s3://bucket/prefix/, file:///data/, gs://bucket/). + supported: true + rust_type: String + +- key: snapshot_id + aliases: + - snapshotId + description: | + Specific snapshot ID to read (integer). + If omitted, reads the current (latest) snapshot. + supported: true + rust_type: u64 + rust_deserialize_with: crate::options::serde::deserialize_u64 + +- key: schema + description: | + Schema name override (string). + Rarely needed; use qualified table name instead. + supported: true + rust_type: String + +- key: case_sensitive + aliases: + - caseSensitive + description: | + Case-sensitive column matching. + default: "false" + supported: true + rust_type: bool + rust_deserialize_with: crate::options::serde::deserialize_bool + diff --git a/crates/sail-data-source/src/options/data/ducklake_write.yaml b/crates/sail-data-source/src/options/data/ducklake_write.yaml new file mode 100644 index 000000000..1025981c0 --- /dev/null +++ b/crates/sail-data-source/src/options/data/ducklake_write.yaml @@ -0,0 +1,144 @@ +- key: data_inlining_row_limit + aliases: + - dataInliningRowLimit + description: | + Maximum number of rows to inline in a single insert. 
+ When set to a positive value and the metadata catalog is DuckDB, small inserts can be stored directly + in the DuckLake metadata instead of creating separate Parquet data files. + supported: true + rust_type: u64 + rust_deserialize_with: crate::options::serde::deserialize_u64 + +- key: parquet_compression + aliases: + - parquetCompression + description: | + Compression algorithm for Parquet files written by DuckLake. + Supported values: uncompressed, snappy, gzip, zstd, brotli, lz4, lz4_raw. + supported: true + rust_type: String + +- key: parquet_version + aliases: + - parquetVersion + description: | + Parquet format version used for newly written data files. + Supported values are 1 and 2. + supported: true + rust_type: u64 + rust_deserialize_with: crate::options::serde::deserialize_u64 + +- key: parquet_compression_level + aliases: + - parquetCompressionLevel + description: | + Compression level for the selected Parquet compression codec. + supported: true + rust_type: u64 + rust_deserialize_with: crate::options::serde::deserialize_u64 + +- key: parquet_row_group_size + aliases: + - parquetRowGroupSize + description: | + Number of rows per row group in Parquet files written by DuckLake. + supported: true + rust_type: u64 + rust_deserialize_with: crate::options::serde::deserialize_u64 + +- key: parquet_row_group_size_bytes + aliases: + - parquetRowGroupSizeBytes + description: | + Approximate target size in bytes per Parquet row group. + Accepts DuckDB-style size strings such as "10KB" or "512MB". + supported: true + rust_type: String + +- key: hive_file_pattern + aliases: + - hiveFilePattern + description: | + Whether partitioned data should be written using a hive-style folder layout (one directory per partition value). + supported: true + rust_type: bool + rust_deserialize_with: crate::options::serde::deserialize_bool + +- key: target_file_size + aliases: + - targetFileSize + description: | + Target data file size for insertion and compaction operations. + Accepts DuckDB-style size strings such as "5.4MB". + supported: true + rust_type: String + +- key: require_commit_message + aliases: + - requireCommitMessage + description: | + If true, every snapshot commit must have an explicit commit message set (for example via ducklake.set_commit_message). + supported: true + rust_type: bool + rust_deserialize_with: crate::options::serde::deserialize_bool + +- key: rewrite_delete_threshold + aliases: + - rewriteDeleteThreshold + description: | + Threshold that determines the minimum fraction of data that must be removed from a file before a rewrite is warranted. + Valid range is from 0 to 1. + supported: true + rust_type: f64 + rust_deserialize_with: crate::options::serde::deserialize_f64 + +- key: delete_older_than + aliases: + - deleteOlderThan + description: | + How old unused files must be in order to be removed by cleanup functions such as + ducklake_delete_orphaned_files or ducklake_cleanup_old_files. + Accepts interval strings such as "1 week" or "1 millisecond". + supported: true + rust_type: String + +- key: expire_older_than + aliases: + - expireOlderThan + description: | + Default age threshold for snapshots to be expired by ducklake_expire_snapshots. + Accepts interval strings such as "1 week" or "1 millisecond". 
+ supported: true + rust_type: String + +- key: compaction_schema + aliases: + - compactionSchema + description: | + Predefined schema used as the default target for compaction helper functions such as + ducklake_flush_inlined_data, ducklake_merge_adjacent_files, ducklake_rewrite_data_files, + and ducklake_delete_orphaned_files. + supported: true + rust_type: String + +- key: compaction_table + aliases: + - compactionTable + description: | + Predefined table used as the default target for compaction helper functions such as + ducklake_flush_inlined_data, ducklake_merge_adjacent_files, ducklake_rewrite_data_files, + and ducklake_delete_orphaned_files. + supported: true + rust_type: String + +- key: per_thread_output + aliases: + - perThreadOutput + description: | + Whether to create separate output files per thread during parallel insertion. + When true, each thread writes its own output files; when false, a single output file is produced. + supported: true + rust_type: bool + rust_deserialize_with: crate::options::serde::deserialize_bool + + diff --git a/crates/sail-data-source/src/options/mod.rs b/crates/sail-data-source/src/options/mod.rs index 5682e4804..8075203b9 100644 --- a/crates/sail-data-source/src/options/mod.rs +++ b/crates/sail-data-source/src/options/mod.rs @@ -3,8 +3,9 @@ mod serde; pub use internal::{ BinaryReadOptions, CsvReadOptions, CsvWriteOptions, DeltaReadOptions, DeltaWriteOptions, - IcebergReadOptions, IcebergWriteOptions, JsonReadOptions, JsonWriteOptions, ParquetReadOptions, - ParquetWriteOptions, TextReadOptions, TextWriteOptions, + DuckLakeReadOptions, DuckLakeWriteOptions, IcebergReadOptions, IcebergWriteOptions, + JsonReadOptions, JsonWriteOptions, ParquetReadOptions, ParquetWriteOptions, TextReadOptions, + TextWriteOptions, }; #[cfg(test)] pub use loader::build_options; @@ -20,6 +21,8 @@ pub(crate) mod internal { include!(concat!(env!("OUT_DIR"), "/options/parquet_write.rs")); include!(concat!(env!("OUT_DIR"), "/options/delta_read.rs")); include!(concat!(env!("OUT_DIR"), "/options/delta_write.rs")); + include!(concat!(env!("OUT_DIR"), "/options/ducklake_read.rs")); + include!(concat!(env!("OUT_DIR"), "/options/ducklake_write.rs")); include!(concat!(env!("OUT_DIR"), "/options/iceberg_read.rs")); include!(concat!(env!("OUT_DIR"), "/options/iceberg_write.rs")); include!(concat!(env!("OUT_DIR"), "/options/text_read.rs")); diff --git a/crates/sail-data-source/src/registry.rs b/crates/sail-data-source/src/registry.rs index 5fd60ceb5..0f4346329 100644 --- a/crates/sail-data-source/src/registry.rs +++ b/crates/sail-data-source/src/registry.rs @@ -11,6 +11,7 @@ use crate::formats::binary::BinaryTableFormat; use crate::formats::console::ConsoleTableFormat; use crate::formats::csv::CsvTableFormat; use crate::formats::delta::DeltaTableFormat; +use crate::formats::ducklake::DuckLakeDataSourceFormat; use crate::formats::iceberg::IcebergDataSourceFormat; use crate::formats::json::JsonTableFormat; use crate::formats::parquet::ParquetTableFormat; @@ -43,6 +44,7 @@ impl TableFormatRegistry { registry.register_format(Arc::new(BinaryTableFormat::default())); registry.register_format(Arc::new(CsvTableFormat::default())); registry.register_format(Arc::new(DeltaTableFormat)); + registry.register_format(Arc::new(DuckLakeDataSourceFormat)); registry.register_format(Arc::new(IcebergDataSourceFormat)); registry.register_format(Arc::new(JsonTableFormat::default())); registry.register_format(Arc::new(ParquetTableFormat::default())); diff --git a/crates/sail-duck-lake/Cargo.toml 
b/crates/sail-duck-lake/Cargo.toml new file mode 100644 index 000000000..0e45559d6 --- /dev/null +++ b/crates/sail-duck-lake/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "sail-duck-lake" +version = { workspace = true } +edition = { workspace = true } + +[dependencies] + +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +uuid = { workspace = true } +chrono = { workspace = true, features = ["serde"] } +async-trait = { workspace = true } +datafusion = { workspace = true } +datafusion-common = { workspace = true } +object_store = { workspace = true } +url = { workspace = true } +log = { workspace = true } +tokio = { workspace = true } +pyo3 = { workspace = true } + +[lints] +workspace = true + diff --git a/crates/sail-duck-lake/src/datasource/arrow.rs b/crates/sail-duck-lake/src/datasource/arrow.rs new file mode 100644 index 000000000..da74aa2ec --- /dev/null +++ b/crates/sail-duck-lake/src/datasource/arrow.rs @@ -0,0 +1,90 @@ +use std::sync::Arc; + +use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit}; +use datafusion::common::{DataFusionError, Result as DataFusionResult}; + +use crate::spec::ColumnInfo; + +impl TryFrom<&ColumnInfo> for Field { + type Error = DataFusionError; + + fn try_from(col: &ColumnInfo) -> Result { + let data_type = parse_ducklake_type(&col.column_type)?; + Ok(Field::new(&col.column_name, data_type, col.nulls_allowed)) + } +} + +pub fn parse_ducklake_type(type_str: &str) -> DataFusionResult { + let type_str = type_str.trim().to_uppercase(); + + match type_str.as_str() { + "BOOLEAN" | "BOOL" => Ok(DataType::Boolean), + "TINYINT" | "INT8" => Ok(DataType::Int8), + "SMALLINT" | "INT16" => Ok(DataType::Int16), + "INTEGER" | "INT" | "INT32" => Ok(DataType::Int32), + "BIGINT" | "INT64" => Ok(DataType::Int64), + "UTINYINT" | "UINT8" => Ok(DataType::UInt8), + "USMALLINT" | "UINT16" => Ok(DataType::UInt16), + "UINTEGER" | "UINT" | "UINT32" => Ok(DataType::UInt32), + "UBIGINT" | "UINT64" => Ok(DataType::UInt64), + "FLOAT" | "FLOAT32" => Ok(DataType::Float32), + "DOUBLE" | "FLOAT64" => Ok(DataType::Float64), + "DATE" | "DATE32" => Ok(DataType::Date32), + "TIMESTAMP" => Ok(DataType::Timestamp(TimeUnit::Microsecond, None)), + "TIME" | "TIME64" => Ok(DataType::Time64(TimeUnit::Microsecond)), + "INTERVAL" => Ok(DataType::Interval( + datafusion::arrow::datatypes::IntervalUnit::MonthDayNano, + )), + "VARCHAR" | "TEXT" | "STRING" | "UTF8" => Ok(DataType::Utf8), + "BLOB" | "BYTEA" | "BINARY" => Ok(DataType::Binary), + "UUID" => Ok(DataType::Utf8), + s if s.starts_with("DECIMAL(") => { + let inner = s + .strip_prefix("DECIMAL(") + .and_then(|s| s.strip_suffix(")")) + .ok_or_else(|| { + DataFusionError::Plan(format!("Invalid DECIMAL type: {}", type_str)) + })?; + let parts: Vec<&str> = inner.split(',').collect(); + match parts.as_slice() { + [precision, scale] => { + let precision = precision.trim().parse::().map_err(|_| { + DataFusionError::Plan(format!("Invalid DECIMAL precision: {}", precision)) + })?; + let scale = scale.trim().parse::().map_err(|_| { + DataFusionError::Plan(format!("Invalid DECIMAL scale: {}", scale)) + })?; + Ok(DataType::Decimal128(precision, scale)) + } + _ => Err(DataFusionError::Plan(format!( + "Invalid DECIMAL format: {}", + type_str + ))), + } + } + s if s.ends_with("[]") => { + #[allow(clippy::unwrap_used)] + let element_type_str = s.strip_suffix("[]").unwrap(); + let element_type = parse_ducklake_type(element_type_str)?; + Ok(DataType::List(Arc::new(Field::new( + "item", + element_type, + 
true, + )))) + } + _ => Err(DataFusionError::Plan(format!( + "Unsupported DuckLake type: {}", + type_str + ))), + } +} + +pub fn columns_to_arrow_schema(columns: &[ColumnInfo]) -> DataFusionResult { + let fields: Vec = columns + .iter() + .filter(|c| c.parent_column.is_none()) + .map(Field::try_from) + .collect::>>()?; + + Ok(ArrowSchema::new(fields)) +} diff --git a/crates/sail-duck-lake/src/datasource/expressions.rs b/crates/sail-duck-lake/src/datasource/expressions.rs new file mode 100644 index 000000000..75c0635f4 --- /dev/null +++ b/crates/sail-duck-lake/src/datasource/expressions.rs @@ -0,0 +1,50 @@ +use std::sync::Arc; + +use datafusion::catalog::Session; +use datafusion::common::DFSchema; +use datafusion::logical_expr::execution_props::ExecutionProps; +use datafusion::logical_expr::simplify::SimplifyContext; +use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; +use datafusion::optimizer::simplify_expressions::ExprSimplifier; +use datafusion::physical_expr::PhysicalExpr; + +pub fn simplify_expr( + session: &dyn Session, + df_schema: &DFSchema, + expr: Expr, +) -> Arc { + let props = ExecutionProps::new(); + let simplify_context = SimplifyContext::new(&props).with_schema(df_schema.clone().into()); + let simplifier = ExprSimplifier::new(simplify_context).with_max_cycles(10); + #[allow(clippy::expect_used)] + let simplified = simplifier + .simplify(expr) + .expect("Failed to simplify expression"); + #[allow(clippy::expect_used)] + session + .create_physical_expr(simplified, df_schema) + .expect("Failed to create physical expression") +} + +pub fn get_pushdown_filters( + filters: &[&Expr], + _partition_cols: &[String], +) -> Vec { + filters + .iter() + .map(|expr| match expr { + Expr::BinaryExpr(be) => match be.op { + datafusion::logical_expr::Operator::Eq + | datafusion::logical_expr::Operator::Lt + | datafusion::logical_expr::Operator::LtEq + | datafusion::logical_expr::Operator::Gt + | datafusion::logical_expr::Operator::GtEq + | datafusion::logical_expr::Operator::And + | datafusion::logical_expr::Operator::Or => TableProviderFilterPushDown::Inexact, + _ => TableProviderFilterPushDown::Unsupported, + }, + Expr::InList(_) => TableProviderFilterPushDown::Inexact, + _ => TableProviderFilterPushDown::Unsupported, + }) + .collect() +} diff --git a/crates/sail-duck-lake/src/datasource/mod.rs b/crates/sail-duck-lake/src/datasource/mod.rs new file mode 100644 index 000000000..f7158c9fb --- /dev/null +++ b/crates/sail-duck-lake/src/datasource/mod.rs @@ -0,0 +1,32 @@ +pub mod arrow; +pub mod expressions; +pub mod provider; +pub mod pruning; + +use std::sync::Arc; + +use datafusion::catalog::{Session, TableProvider}; +use datafusion_common::Result as DataFusionResult; +use provider::DuckLakeTableProvider; +use url::Url; + +use crate::metadata::{DuckLakeMetaStore, PythonMetaStore}; +use crate::options::DuckLakeOptions; + +pub async fn create_ducklake_provider( + ctx: &dyn Session, + opts: DuckLakeOptions, +) -> DataFusionResult> { + let url = Url::parse(&opts.url) + .map_err(|e| datafusion_common::DataFusionError::External(Box::new(e)))?; + + let meta_store: Arc = match url.scheme() { + "sqlite" | "postgres" | "postgresql" => Arc::new(PythonMetaStore::new(&opts.url).await?), + scheme => { + return datafusion_common::plan_err!("Unsupported metadata URL scheme: {}", scheme) + } + }; + + let provider = DuckLakeTableProvider::new(ctx, meta_store, opts).await?; + Ok(Arc::new(provider)) +} diff --git a/crates/sail-duck-lake/src/datasource/provider.rs 
b/crates/sail-duck-lake/src/datasource/provider.rs new file mode 100644 index 000000000..39191598e --- /dev/null +++ b/crates/sail-duck-lake/src/datasource/provider.rs @@ -0,0 +1,429 @@ +use std::any::Any; +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; +use datafusion::catalog::memory::DataSourceExec; +use datafusion::catalog::Session; +use datafusion::common::stats::{ColumnStatistics, Precision, Statistics}; +use datafusion::common::ToDFSchema; +use datafusion::config::TableParquetOptions; +use datafusion::datasource::listing::PartitionedFile; +use datafusion::datasource::physical_plan::{FileGroup, FileScanConfigBuilder, ParquetSource}; +use datafusion::datasource::{TableProvider, TableType}; +use datafusion::execution::object_store::ObjectStoreUrl; +use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; +use datafusion::physical_expr::PhysicalExpr; +use datafusion::physical_plan::ExecutionPlan; +use datafusion_common::{DataFusionError, Result as DataFusionResult}; +use url::Url; + +use crate::datasource::arrow::columns_to_arrow_schema; +use crate::datasource::expressions::{get_pushdown_filters, simplify_expr}; +use crate::datasource::pruning::prune_files; +use crate::metadata::{DuckLakeMetaStore, DuckLakeTable}; +use crate::options::DuckLakeOptions; +use crate::spec::{ColumnInfo, PartitionFieldInfo, PartitionFilter}; + +pub struct DuckLakeTableProvider { + table: DuckLakeTable, + schema: ArrowSchemaRef, + base_path: String, + snapshot_id: Option, + meta_store: Arc, +} + +impl DuckLakeTableProvider { + pub async fn new( + _ctx: &dyn Session, + meta_store: Arc, + opts: DuckLakeOptions, + ) -> DataFusionResult { + let parts: Vec<&str> = opts.table.split('.').collect(); + let (schema_name, table_name) = match parts.as_slice() { + [table] => (None, *table), + [schema, table] => (Some(*schema), *table), + _ => { + return Err(DataFusionError::Plan(format!( + "Invalid table name format: {}", + opts.table + ))) + } + }; + + let table = meta_store.load_table(table_name, schema_name).await?; + let schema = Arc::new(columns_to_arrow_schema(&table.columns)?); + + log::trace!( + "Loaded DuckLake table: {}.{} with {} columns", + table.schema_info.schema_name, + table.table_info.table_name, + table.columns.len() + ); + + Ok(Self { + table, + schema, + base_path: opts.base_path, + snapshot_id: opts.snapshot_id, + meta_store, + }) + } +} + +impl std::fmt::Debug for DuckLakeTableProvider { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DuckLakeTableProvider") + .field("table", &self.table.table_info.table_name) + .field("schema", &self.table.schema_info.schema_name) + .finish() + } +} + +#[async_trait] +impl TableProvider for DuckLakeTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> ArrowSchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> DataFusionResult> { + Ok(get_pushdown_filters(filters, &[])) + } + + async fn scan( + &self, + session: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> DataFusionResult> { + log::trace!( + "Scanning DuckLake table: {}.{}", + self.table.schema_info.schema_name, + self.table.table_info.table_name + ); + + let (partition_filters, remaining_filters) = Self::extract_partition_filters( + filters, + &self.table.columns, + 
&self.table.partition_fields, + ); + let (pruning_filters, pushdown_filters) = self.separate_filters(&remaining_filters); + + let files = self + .meta_store + .list_data_files( + self.table.table_info.table_id, + self.snapshot_id, + if partition_filters.is_empty() { + None + } else { + Some(partition_filters) + }, + ) + .await?; + + log::trace!("Found {} data files", files.len()); + + let prune_schema = if let Some(used_columns) = projection { + let mut fields = vec![]; + for idx in used_columns { + fields.push(self.schema.field(*idx).to_owned()); + } + if let Some(expr) = + datafusion::logical_expr::utils::conjunction(pruning_filters.iter().cloned()) + { + for c in expr.column_refs() { + if let Ok(idx) = self.schema.index_of(c.name.as_str()) { + if !used_columns.contains(&idx) + && !fields.iter().any(|f| f.name() == c.name.as_str()) + { + fields.push(self.schema.field(idx).to_owned()); + } + } + } + } + Arc::new(datafusion::arrow::datatypes::Schema::new(fields)) + } else { + self.schema.clone() + }; + + let mut files = { + let (kept, _mask) = prune_files( + session, + &pruning_filters, + limit, + prune_schema.clone(), + files, + &self.table.columns, + &self.table.partition_fields, + )?; + kept + }; + + // Parse base_path URL and construct ObjectStoreUrl with only scheme + authority + let base_url = + Url::parse(&self.base_path).map_err(|e| DataFusionError::External(Box::new(e)))?; + let object_store_base = format!("{}://{}", base_url.scheme(), base_url.authority()); + let object_store_base_parsed = + Url::parse(&object_store_base).map_err(|e| DataFusionError::External(Box::new(e)))?; + let object_store_url = ObjectStoreUrl::parse(object_store_base_parsed) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + // Build table-level prefix: {base_path}/{schema}/{table} + let base_path_str = base_url.path(); + let mut table_prefix = + object_store::path::Path::parse(base_path_str.trim_start_matches('/')) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + table_prefix = table_prefix + .child(self.table.schema_info.schema_name.as_str()) + .child(self.table.table_info.table_name.as_str()); + + let mut file_groups: HashMap, Vec> = HashMap::new(); + + for file in files.drain(..) { + let object_path = if file.path_is_relative { + let mut p = table_prefix.clone(); + for comp in file.path.split('/') { + if !comp.is_empty() { + p = p.child(comp); + } + } + p + } else if let Ok(path_url) = Url::parse(&file.path) { + let encoded_path = path_url.path(); + let no_leading = encoded_path.strip_prefix('/').unwrap_or(encoded_path); + object_store::path::Path::parse(no_leading) + .map_err(|e| DataFusionError::External(Box::new(e)))? + } else { + let no_leading = file.path.strip_prefix('/').unwrap_or(&file.path); + object_store::path::Path::parse(no_leading) + .map_err(|e| DataFusionError::External(Box::new(e)))? 
+ }; + + let partitioned_file = PartitionedFile::new(object_path.clone(), file.file_size_bytes); + + let partition_key = file.partition_id.map(|p| p.0); + file_groups + .entry(partition_key) + .or_default() + .push(partitioned_file); + } + + log::trace!( + "Created {} file groups from {} files", + file_groups.len(), + file_groups.values().map(|v| v.len()).sum::() + ); + + let file_groups = if file_groups.is_empty() { + log::warn!("No data files found for table"); + vec![FileGroup::from(vec![])] + } else { + file_groups.into_values().map(FileGroup::from).collect() + }; + + let parquet_options = TableParquetOptions { + global: session.config().options().execution.parquet.clone(), + ..Default::default() + }; + + let mut parquet_source = ParquetSource::new(parquet_options); + let pushdown_filter: Option> = if !pushdown_filters.is_empty() { + let df_schema = prune_schema.clone().to_dfschema()?; + let pushdown_expr = datafusion::logical_expr::utils::conjunction(pushdown_filters); + pushdown_expr.map(|expr| simplify_expr(session, &df_schema, expr)) + } else { + None + }; + if let Some(pred) = pushdown_filter { + parquet_source = parquet_source.with_predicate(pred); + } + let parquet_source = Arc::new(parquet_source); + + let table_stats = self.aggregate_statistics(self.schema.as_ref()); + + let file_scan_config = + FileScanConfigBuilder::new(object_store_url, self.schema.clone(), parquet_source) + .with_file_groups(file_groups) + .with_statistics(table_stats) + .with_projection_indices(projection.cloned()) + .with_limit(limit) + .build(); + + Ok(DataSourceExec::from_data_source(file_scan_config)) + } +} + +impl DuckLakeTableProvider { + fn separate_filters(&self, filters: &[Expr]) -> (Vec, Vec) { + let predicates: Vec<&Expr> = filters.iter().collect(); + let pushdown_kinds = get_pushdown_filters(&predicates, &[]); + let mut pruning_filters = Vec::new(); + let mut parquet_pushdown_filters = Vec::new(); + for (filter, kind) in filters.iter().zip(pushdown_kinds) { + match kind { + TableProviderFilterPushDown::Exact => { + pruning_filters.push(filter.clone()); + } + TableProviderFilterPushDown::Inexact => { + pruning_filters.push(filter.clone()); + parquet_pushdown_filters.push(filter.clone()); + } + TableProviderFilterPushDown::Unsupported => {} + } + } + (pruning_filters, parquet_pushdown_filters) + } + + // TODO: Add extraction of stats-based filters for column statistics pushdown. 
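+    // Identity-transformed partition columns are handled below: equality and IN-list
+    // predicates on them (e.g., a hypothetical `region = 'us'`) become `PartitionFilter`s
+    // for the metastore file listing, while the original expressions are still kept in
+    // the remaining filters for file pruning and Parquet predicate pushdown.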
+ + fn extract_partition_filters( + filters: &[Expr], + columns: &[ColumnInfo], + partition_fields: &[PartitionFieldInfo], + ) -> (Vec, Vec) { + let mut name_to_partition_key: HashMap = HashMap::new(); + for field in partition_fields { + if let Some(col) = columns.iter().find(|c| c.column_id == field.column_id) { + if field.transform.trim().eq_ignore_ascii_case("identity") { + name_to_partition_key + .insert(col.column_name.clone(), field.partition_key_index); + } + } + } + + let mut partition_values: HashMap> = HashMap::new(); + let mut remaining_filters = Vec::new(); + + for expr in filters.iter().cloned() { + if let Some((col_name, values)) = Self::extract_values_from_expr(&expr) { + if let Some(partition_key_index) = name_to_partition_key.get(&col_name).copied() { + let entry = partition_values.entry(partition_key_index).or_default(); + for v in values { + if !entry.contains(&v) { + entry.push(v); + } + } + remaining_filters.push(expr); + continue; + } + } + remaining_filters.push(expr); + } + + let mut out_filters = Vec::new(); + for (partition_key_index, values) in partition_values { + if !values.is_empty() { + out_filters.push(PartitionFilter { + partition_key_index, + values, + }); + } + } + + (out_filters, remaining_filters) + } + + fn extract_values_from_expr(expr: &Expr) -> Option<(String, Vec)> { + match expr { + Expr::BinaryExpr(be) => { + use datafusion::logical_expr::Operator; + match be.op { + Operator::Eq => { + if let (Some(col), Some(value)) = ( + Self::column_name(&be.left), + Self::literal_to_string(&be.right), + ) { + return Some((col, vec![value])); + } + if let (Some(col), Some(value)) = ( + Self::column_name(&be.right), + Self::literal_to_string(&be.left), + ) { + return Some((col, vec![value])); + } + None + } + _ => None, + } + } + Expr::InList(in_list) if !in_list.negated => { + if let Some(col) = Self::column_name(&in_list.expr) { + let mut values = Vec::new(); + for v in &in_list.list { + if let Some(s) = Self::literal_to_string(v) { + values.push(s); + } else { + return None; + } + } + if values.is_empty() { + None + } else { + Some((col, values)) + } + } else { + None + } + } + _ => None, + } + } + + fn column_name(expr: &Expr) -> Option { + if let Expr::Column(c) = expr { + Some(c.name.clone()) + } else { + None + } + } + + fn literal_to_string(expr: &Expr) -> Option { + if let Expr::Literal(value, _) = expr { + if value.is_null() { + return None; + } + let s = value.to_string(); + if let Some(stripped) = s + .strip_prefix('\'') + .and_then(|rest| rest.strip_suffix('\'')) + { + Some(stripped.to_string()) + } else { + Some(s) + } + } else { + None + } + } + + fn aggregate_statistics(&self, schema: &datafusion::arrow::datatypes::Schema) -> Statistics { + let column_statistics = (0..schema.fields().len()) + .map(|_| ColumnStatistics { + null_count: Precision::Absent, + max_value: Precision::Absent, + min_value: Precision::Absent, + distinct_count: Precision::Absent, + sum_value: Precision::Absent, + }) + .collect(); + Statistics { + num_rows: Precision::Absent, + total_byte_size: Precision::Absent, + column_statistics, + } + } +} diff --git a/crates/sail-duck-lake/src/datasource/pruning.rs b/crates/sail-duck-lake/src/datasource/pruning.rs new file mode 100644 index 000000000..70a52fa9b --- /dev/null +++ b/crates/sail-duck-lake/src/datasource/pruning.rs @@ -0,0 +1,365 @@ +use std::cell::RefCell; +use std::collections::HashMap; +use std::sync::Arc; + +use chrono::{Datelike, Timelike}; +use datafusion::arrow::array::{ArrayRef, BooleanArray, UInt64Array}; +use 
datafusion::arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema}; +use datafusion::catalog::Session; +use datafusion::common::pruning::PruningStatistics; +use datafusion::common::scalar::ScalarValue; +use datafusion::common::{Column, Result as DataFusionResult, ToDFSchema}; +use datafusion::logical_expr::utils::conjunction; +use datafusion::logical_expr::Expr; +use datafusion::physical_optimizer::pruning::PruningPredicate; + +use crate::spec::{ + parse_year, parse_year_month, parse_year_month_day, parse_year_month_day_hour, ColumnInfo, + ColumnStatsInfo, FieldIndex, FileInfo, PartitionFieldInfo, Transform, +}; + +/// TODO:Implement contains_nan-aware float/double pruning gates +struct DuckLakePruningStats { + files: Vec, + name_to_field_id: HashMap, + field_id_to_datatype: HashMap, + min_cache: RefCell>, + max_cache: RefCell>, + nulls_cache: RefCell>, + rows_cache: RefCell>, + partition_col_map: HashMap>, +} + +impl DuckLakePruningStats { + fn new( + files: Vec, + arrow_schema: Arc, + columns: &[ColumnInfo], + partition_fields: &[PartitionFieldInfo], + ) -> Self { + let mut name_to_field_id: HashMap = HashMap::new(); + let mut field_id_to_datatype: HashMap = HashMap::new(); + for c in columns { + name_to_field_id.insert(c.column_name.clone(), c.column_id); + } + for f in arrow_schema.fields() { + if let Some(fid) = name_to_field_id.get(f.name()) { + field_id_to_datatype.insert(*fid, f.data_type().clone()); + } + } + Self { + files, + name_to_field_id, + field_id_to_datatype, + min_cache: RefCell::new(HashMap::new()), + max_cache: RefCell::new(HashMap::new()), + nulls_cache: RefCell::new(HashMap::new()), + rows_cache: RefCell::new(None), + partition_col_map: Self::build_partition_map(partition_fields), + } + } + + fn field_id_for(&self, column: &Column) -> Option { + self.name_to_field_id.get(&column.name).copied() + } + + fn field_type_for(&self, field_id: &FieldIndex) -> Option { + self.field_id_to_datatype.get(field_id).cloned() + } + + fn parse_stat_value(&self, field_id: &FieldIndex, value: &str) -> Option { + let dt = self.field_type_for(field_id)?; + ScalarValue::try_from_string(value.to_string(), &dt).ok() + } + + fn get_column_stats<'a>( + file: &'a FileInfo, + field_id: &FieldIndex, + ) -> Option<&'a ColumnStatsInfo> { + file.column_stats.iter().find(|s| s.column_id == *field_id) + } + + fn build_partition_map( + partition_fields: &[PartitionFieldInfo], + ) -> HashMap> { + let mut map: HashMap> = HashMap::new(); + for field in partition_fields { + let transform = field.transform.parse().unwrap_or(Transform::Unknown); + map.entry(field.column_id) + .or_default() + .push((field.partition_key_index, transform)); + } + map + } + + fn get_partition_range_for_file( + &self, + file: &FileInfo, + field_id: &FieldIndex, + ) -> Option<(ScalarValue, ScalarValue)> { + let mappings = self.partition_col_map.get(field_id)?; + let data_type = self.field_type_for(field_id)?; + let mut identity_value: Option = None; + let mut year_component: Option = None; + let mut month_component: Option = None; + let mut day_component: Option = None; + let mut hour_component: Option = None; + + for (partition_key_index, transform) in mappings { + let part_value = file + .partition_values + .iter() + .find(|v| v.partition_key_index == *partition_key_index) + .map(|v| v.partition_value.as_str()); + let Some(part_value) = part_value else { + continue; + }; + match transform { + Transform::Identity => { + if identity_value.is_none() { + if let Ok(value) = + 
ScalarValue::try_from_string(part_value.to_string(), &data_type) + { + identity_value = Some(value); + } + } + } + Transform::Year => { + if year_component.is_none() { + year_component = parse_year(part_value); + } + } + Transform::Month => { + if let Some((year, month)) = parse_year_month(part_value) { + if year_component.is_none() { + year_component = Some(year); + } + if Self::is_valid_month(month) { + month_component = Some(month); + } + } else if let Ok(month) = part_value.trim().parse::() { + if Self::is_valid_month(month) { + month_component = Some(month); + } + } + } + Transform::Day => { + if let Some(date) = parse_year_month_day(part_value) { + if year_component.is_none() { + year_component = Some(date.year()); + } + if month_component.is_none() && Self::is_valid_month(date.month()) { + month_component = Some(date.month()); + } + if Self::is_valid_day(date.day()) { + day_component = Some(date.day()); + } + } else if let Ok(day) = part_value.trim().parse::() { + if Self::is_valid_day(day) { + day_component = Some(day); + } + } + } + Transform::Hour => { + if let Some(dt) = parse_year_month_day_hour(part_value) { + if year_component.is_none() { + year_component = Some(dt.year()); + } + if month_component.is_none() && Self::is_valid_month(dt.month()) { + month_component = Some(dt.month()); + } + if day_component.is_none() && Self::is_valid_day(dt.day()) { + day_component = Some(dt.day()); + } + if Self::is_valid_hour(dt.hour()) { + hour_component = Some(dt.hour()); + } + } else if let Ok(hour) = part_value.trim().parse::() { + if Self::is_valid_hour(hour) { + hour_component = Some(hour); + } + } + } + Transform::Unknown => {} + } + } + + if let Some(identity) = identity_value { + return Some((identity.clone(), identity)); + } + + Self::build_temporal_range( + &data_type, + year_component, + month_component, + day_component, + hour_component, + ) + } + + fn build_temporal_range( + data_type: &ArrowDataType, + year: Option, + month: Option, + day: Option, + hour: Option, + ) -> Option<(ScalarValue, ScalarValue)> { + let year = year?; + if let Some(month) = month { + if let Some(day) = day { + if let Some(hour) = hour { + let value = format!("{year:04}-{month:02}-{day:02}-{hour:02}"); + return Transform::Hour.get_range(&value, data_type); + } + let value = format!("{year:04}-{month:02}-{day:02}"); + return Transform::Day.get_range(&value, data_type); + } + let value = format!("{year:04}-{month:02}"); + return Transform::Month.get_range(&value, data_type); + } + let value = format!("{year}"); + Transform::Year.get_range(&value, data_type) + } + + fn is_valid_month(month: u32) -> bool { + (1..=12).contains(&month) + } + + fn is_valid_day(day: u32) -> bool { + (1..=31).contains(&day) + } + + fn is_valid_hour(hour: u32) -> bool { + hour < 24 + } +} + +impl PruningStatistics for DuckLakePruningStats { + fn min_values(&self, column: &Column) -> Option { + let field_id = self.field_id_for(column)?; + if let Some(arr) = self.min_cache.borrow().get(&field_id) { + return Some(arr.clone()); + } + let values = self.files.iter().map(|f| { + self.get_partition_range_for_file(f, &field_id) + .map(|(min, _)| min) + .or_else(|| { + Self::get_column_stats(f, &field_id) + .and_then(|s| s.min_value.as_deref()) + .and_then(|s| self.parse_stat_value(&field_id, s)) + }) + .unwrap_or(ScalarValue::Null) + }); + let arr = ScalarValue::iter_to_array(values).ok()?; + self.min_cache.borrow_mut().insert(field_id, arr.clone()); + Some(arr) + } + + fn max_values(&self, column: &Column) -> Option { + let field_id = 
self.field_id_for(column)?; + if let Some(arr) = self.max_cache.borrow().get(&field_id) { + return Some(arr.clone()); + } + let values = self.files.iter().map(|f| { + self.get_partition_range_for_file(f, &field_id) + .map(|(_, max)| max) + .or_else(|| { + Self::get_column_stats(f, &field_id) + .and_then(|s| s.max_value.as_deref()) + .and_then(|s| self.parse_stat_value(&field_id, s)) + }) + .unwrap_or(ScalarValue::Null) + }); + let arr = ScalarValue::iter_to_array(values).ok()?; + self.max_cache.borrow_mut().insert(field_id, arr.clone()); + Some(arr) + } + + fn null_counts(&self, column: &Column) -> Option { + let field_id = self.field_id_for(column)?; + if let Some(arr) = self.nulls_cache.borrow().get(&field_id) { + return Some(arr.clone()); + } + let counts: Vec = self + .files + .iter() + .map(|f| { + Self::get_column_stats(f, &field_id) + .and_then(|s| s.null_count) + .unwrap_or(0) + }) + .collect(); + let arr: ArrayRef = Arc::new(UInt64Array::from(counts)); + self.nulls_cache.borrow_mut().insert(field_id, arr.clone()); + Some(arr) + } + + fn row_counts(&self, _column: &Column) -> Option { + if let Some(arr) = self.rows_cache.borrow().as_ref() { + return Some(arr.clone()); + } + let rows: Vec = self.files.iter().map(|f| f.record_count).collect(); + let arr: ArrayRef = Arc::new(UInt64Array::from(rows)); + *self.rows_cache.borrow_mut() = Some(arr.clone()); + Some(arr) + } + + fn contained( + &self, + _column: &Column, + _value: &std::collections::HashSet, + ) -> Option { + None + } + + fn num_containers(&self) -> usize { + self.files.len() + } +} + +pub fn prune_files( + session: &dyn Session, + filters: &[Expr], + limit: Option, + logical_schema: Arc, + files: Vec, + columns: &[ColumnInfo], + partition_fields: &[PartitionFieldInfo], +) -> DataFusionResult<(Vec, Option>)> { + let filter_expr = conjunction(filters.iter().cloned()); + if filter_expr.is_none() && limit.is_none() { + return Ok((files, None)); + } + + let stats = DuckLakePruningStats::new(files, logical_schema.clone(), columns, partition_fields); + let files_to_keep = if let Some(predicate) = &filter_expr { + let df_schema = logical_schema.clone().to_dfschema()?; + let physical_predicate = session.create_physical_expr(predicate.clone(), &df_schema)?; + let pruning_predicate = PruningPredicate::try_new(physical_predicate, logical_schema)?; + pruning_predicate.prune(&stats)? 
+ } else { + vec![true; stats.num_containers()] + }; + + let mut kept = Vec::new(); + let mut rows_collected: u64 = 0; + for (file, keep) in stats.files.into_iter().zip(files_to_keep.iter()) { + if *keep { + if let Some(lim) = limit { + if rows_collected <= lim as u64 { + rows_collected = rows_collected.saturating_add(file.record_count); + kept.push(file); + if rows_collected > lim as u64 { + break; + } + } else { + break; + } + } else { + kept.push(file); + } + } + } + + Ok((kept, Some(files_to_keep))) +} diff --git a/crates/sail-duck-lake/src/lib.rs b/crates/sail-duck-lake/src/lib.rs new file mode 100644 index 000000000..99cb3fc5c --- /dev/null +++ b/crates/sail-duck-lake/src/lib.rs @@ -0,0 +1,9 @@ +pub mod datasource; +pub mod metadata; +pub mod options; +mod python; +pub mod spec; + +pub use datasource::create_ducklake_provider; +pub use options::DuckLakeOptions; +pub use spec::*; diff --git a/crates/sail-duck-lake/src/metadata/mod.rs b/crates/sail-duck-lake/src/metadata/mod.rs new file mode 100644 index 000000000..62f165f4c --- /dev/null +++ b/crates/sail-duck-lake/src/metadata/mod.rs @@ -0,0 +1,49 @@ +mod py_impl; + +use async_trait::async_trait; +use datafusion::common::Result as DataFusionResult; +pub use py_impl::PythonMetaStore; + +use crate::spec::{ColumnInfo, FileInfo, PartitionFilter, SchemaInfo, SnapshotInfo, TableInfo}; + +#[derive(Debug, Clone)] +pub struct DuckLakeTable { + pub table_info: TableInfo, + pub schema_info: SchemaInfo, + pub columns: Vec, + pub partition_fields: Vec, +} + +#[derive(Debug, Clone)] +pub struct DuckLakeSnapshot { + pub snapshot: SnapshotInfo, +} + +#[async_trait] +pub trait DuckLakeMetaStore: Send + Sync { + async fn load_table( + &self, + table_name: &str, + schema_name: Option<&str>, + ) -> DataFusionResult; + + async fn current_snapshot(&self) -> DataFusionResult; + + async fn snapshot_by_id(&self, snapshot_id: u64) -> DataFusionResult; + + // TODO: Add paginated or streaming metadata APIs for listing data files. 
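+    // Lists the data files of `table_id` visible in `snapshot_id` (or in the current
+    // snapshot when `None`), optionally restricted to files whose partition values
+    // match the given filters.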
+ async fn list_data_files( + &self, + table_id: crate::spec::TableIndex, + snapshot_id: Option, + partition_filters: Option>, + ) -> DataFusionResult>; + + async fn list_delete_files( + &self, + _table_id: crate::spec::TableIndex, + _snapshot_id: Option, + ) -> DataFusionResult> { + Ok(vec![]) + } +} diff --git a/crates/sail-duck-lake/src/metadata/py_impl.rs b/crates/sail-duck-lake/src/metadata/py_impl.rs new file mode 100644 index 000000000..32918481a --- /dev/null +++ b/crates/sail-duck-lake/src/metadata/py_impl.rs @@ -0,0 +1,461 @@ +use async_trait::async_trait; +use datafusion::common::{DataFusionError, Result as DataFusionResult}; +use pyo3::prelude::PyAnyMethods; +use pyo3::type_object::PyTypeInfo; +use pyo3::{FromPyObject, PyResult, Python}; +use serde::Deserialize; + +use crate::metadata::{DuckLakeMetaStore, DuckLakeSnapshot, DuckLakeTable}; +use crate::python::Modules; +use crate::spec::{ + ColumnInfo, DataFileIndex, FieldIndex, FileInfo, MappingIndex, PartitionFilter, PartitionId, + SchemaInfo, SnapshotInfo, TableIndex, TableInfo, +}; + +#[derive(Deserialize, FromPyObject)] +#[pyo3(from_item_all)] +struct LoadTableResult { + schema_info: PySchemaInfo, + table_info: PyTableInfo, + columns: Vec, + partition_fields: Vec, +} + +#[derive(Deserialize, FromPyObject)] +#[pyo3(from_item_all)] +struct PySchemaInfo { + schema_id: u64, + schema_uuid: String, + begin_snapshot: Option, + end_snapshot: Option, + schema_name: String, + path: String, + path_is_relative: bool, +} + +#[derive(Deserialize, FromPyObject)] +#[pyo3(from_item_all)] +struct PyTableInfo { + table_id: u64, + table_uuid: String, + begin_snapshot: Option, + end_snapshot: Option, + schema_id: u64, + table_name: String, + path: String, + path_is_relative: bool, +} + +#[derive(Deserialize, FromPyObject)] +#[pyo3(from_item_all)] +struct PyColumnInfo { + column_id: u64, + begin_snapshot: Option, + end_snapshot: Option, + #[allow(dead_code)] + table_id: u64, + column_order: u64, + column_name: String, + column_type: String, + initial_default: Option, + default_value: Option, + nulls_allowed: bool, + parent_column: Option, +} + +#[derive(Deserialize, FromPyObject)] +#[pyo3(from_item_all)] +struct PyPartitionField { + partition_key_index: u64, + column_id: u64, + transform: String, +} + +#[derive(Deserialize, FromPyObject)] +#[pyo3(from_item_all)] +struct PySnapshotInfo { + snapshot_id: u64, + snapshot_time: String, + schema_version: u64, + next_catalog_id: u64, + next_file_id: u64, + changes_made: Option, + author: Option, + commit_message: Option, + commit_extra_info: Option, +} + +#[derive(Deserialize, FromPyObject)] +#[pyo3(from_item_all)] +struct PyFileInfo { + data_file_id: u64, + #[allow(dead_code)] + table_id: u64, + begin_snapshot: Option, + end_snapshot: Option, + file_order: u64, + path: String, + path_is_relative: bool, + file_format: Option, + record_count: u64, + file_size_bytes: u64, + footer_size: Option, + row_id_start: Option, + partition_id: Option, + encryption_key: String, + partial_file_info: Option, + mapping_id: u64, + column_stats: Vec, + partition_values: Vec, +} + +#[derive(Deserialize, FromPyObject)] +#[pyo3(from_item_all)] +struct PyColumnStatsInfo { + column_id: u64, + column_size_bytes: Option, + value_count: Option, + null_count: Option, + min_value: Option, + max_value: Option, + contains_nan: Option, + extra_stats: Option, +} + +#[derive(Deserialize, FromPyObject)] +#[pyo3(from_item_all)] +struct PyFilePartitionInfo { + partition_key_index: u64, + partition_value: String, +} + +pub struct 
PythonMetaStore { + url: String, +} + +impl PythonMetaStore { + pub async fn new(url: &str) -> DataFusionResult { + Ok(Self { + url: url.to_string(), + }) + } +} + +fn parse_snapshot_time(s: &str) -> DataFusionResult> { + chrono::DateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f%:z") + .or_else(|_| chrono::DateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f%z")) + .map(|dt| dt.with_timezone(&chrono::Utc)) + .map_err(|e| DataFusionError::Plan(format!("Failed to parse snapshot_time: {}: {}", s, e))) +} + +impl TryFrom for SchemaInfo { + type Error = DataFusionError; + + fn try_from(p: PySchemaInfo) -> Result { + Ok(Self { + schema_id: crate::spec::SchemaIndex(p.schema_id), + schema_uuid: uuid::Uuid::parse_str(&p.schema_uuid) + .map_err(|e| DataFusionError::External(Box::new(e)))?, + begin_snapshot: p.begin_snapshot, + end_snapshot: p.end_snapshot, + schema_name: p.schema_name, + path: p.path, + path_is_relative: p.path_is_relative, + }) + } +} + +impl TryFrom for TableInfo { + type Error = DataFusionError; + + fn try_from(p: PyTableInfo) -> Result { + Ok(Self { + table_id: TableIndex(p.table_id), + table_uuid: uuid::Uuid::parse_str(&p.table_uuid) + .map_err(|e| DataFusionError::External(Box::new(e)))?, + begin_snapshot: p.begin_snapshot, + end_snapshot: p.end_snapshot, + schema_id: crate::spec::SchemaIndex(p.schema_id), + table_name: p.table_name, + path: p.path, + path_is_relative: p.path_is_relative, + columns: vec![], + inlined_data_tables: vec![], + }) + } +} + +impl PyColumnInfo { + fn into_column_info(self, table_id: TableIndex) -> ColumnInfo { + ColumnInfo { + column_id: FieldIndex(self.column_id), + begin_snapshot: self.begin_snapshot, + end_snapshot: self.end_snapshot, + table_id, + column_order: self.column_order, + column_name: self.column_name, + column_type: self.column_type, + initial_default: self.initial_default, + default_value: self.default_value, + nulls_allowed: self.nulls_allowed, + parent_column: self.parent_column.map(FieldIndex), + } + } +} + +impl From for crate::spec::PartitionFieldInfo { + fn from(p: PyPartitionField) -> Self { + Self { + partition_key_index: p.partition_key_index, + column_id: FieldIndex(p.column_id), + transform: p.transform, + } + } +} + +impl TryFrom for SnapshotInfo { + type Error = DataFusionError; + + fn try_from(p: PySnapshotInfo) -> Result { + Ok(Self { + snapshot_id: p.snapshot_id, + snapshot_time: parse_snapshot_time(&p.snapshot_time)?, + schema_version: p.schema_version, + next_catalog_id: p.next_catalog_id, + next_file_id: p.next_file_id, + changes_made: p.changes_made, + author: p.author, + commit_message: p.commit_message, + commit_extra_info: p.commit_extra_info, + }) + } +} + +impl From for crate::spec::ColumnStatsInfo { + fn from(s: PyColumnStatsInfo) -> Self { + Self { + column_id: FieldIndex(s.column_id), + column_size_bytes: s.column_size_bytes, + value_count: s.value_count, + null_count: s.null_count, + min_value: s.min_value, + max_value: s.max_value, + contains_nan: s.contains_nan, + extra_stats: s.extra_stats, + } + } +} + +impl From for crate::spec::FilePartitionInfo { + fn from(p: PyFilePartitionInfo) -> Self { + Self { + partition_key_index: p.partition_key_index, + partition_value: p.partition_value, + } + } +} + +impl PyFileInfo { + fn into_file_info(self, table_id: TableIndex) -> FileInfo { + FileInfo { + data_file_id: DataFileIndex(self.data_file_id), + table_id, + begin_snapshot: self.begin_snapshot, + end_snapshot: self.end_snapshot, + file_order: self.file_order, + path: self.path, + path_is_relative: 
self.path_is_relative, + file_format: self.file_format, + record_count: self.record_count, + file_size_bytes: self.file_size_bytes, + footer_size: self.footer_size, + row_id_start: self.row_id_start, + partition_id: self.partition_id.map(PartitionId), + encryption_key: self.encryption_key, + partial_file_info: self.partial_file_info, + mapping_id: MappingIndex(self.mapping_id), + column_stats: self.column_stats.into_iter().map(Into::into).collect(), + partition_values: self.partition_values.into_iter().map(Into::into).collect(), + } + } +} + +#[async_trait] +impl DuckLakeMetaStore for PythonMetaStore { + async fn load_table( + &self, + table_name: &str, + schema_name: Option<&str>, + ) -> DataFusionResult { + let url = self.url.clone(); + let table_name = table_name.to_string(); + let schema_name = schema_name.map(|s| s.to_string()); + let parsed: LoadTableResult = tokio::task::spawn_blocking(move || { + Python::attach(|py| { + let call: PyResult = (|| { + let m = crate::python::Modules::DUCKLAKE_METADATA.load(py)?; + let obj = m.getattr("load_table")?.call1(( + url.as_str(), + table_name.as_str(), + schema_name.as_deref(), + ))?; + obj.extract() + })(); + match call { + Ok(result) => Ok(result), + Err(e) => { + let is_value_error = e + .matches(py, pyo3::exceptions::PyValueError::type_object(py)) + .unwrap_or(false); + if is_value_error { + let msg = e.to_string(); + Err(DataFusionError::Plan(msg)) + } else { + Err(DataFusionError::External(Box::new(e))) + } + } + } + }) + }) + .await + .map_err(|e| DataFusionError::External(Box::new(e)))??; + + let schema_info: SchemaInfo = parsed.schema_info.try_into()?; + let table_info: TableInfo = parsed.table_info.try_into()?; + let table_id = table_info.table_id; + let columns: Vec = parsed + .columns + .into_iter() + .map(|c| c.into_column_info(table_id)) + .collect(); + + Ok(DuckLakeTable { + table_info, + schema_info, + columns, + partition_fields: parsed + .partition_fields + .into_iter() + .map(Into::into) + .collect(), + }) + } + + async fn current_snapshot(&self) -> DataFusionResult { + let url = self.url.clone(); + let p: PySnapshotInfo = tokio::task::spawn_blocking(move || { + Python::attach(|py| { + let call: PyResult = (|| { + let m = Modules::DUCKLAKE_METADATA.load(py)?; + let obj = m.getattr("current_snapshot")?.call1((url.as_str(),))?; + obj.extract() + })(); + match call { + Ok(result) => Ok(result), + Err(e) => { + let is_value_error = e + .matches(py, pyo3::exceptions::PyValueError::type_object(py)) + .unwrap_or(false); + if is_value_error { + let msg = e.to_string(); + Err(DataFusionError::Plan(msg)) + } else { + Err(DataFusionError::External(Box::new(e))) + } + } + } + }) + }) + .await + .map_err(|e| DataFusionError::External(Box::new(e)))??; + + Ok(DuckLakeSnapshot { + snapshot: p.try_into()?, + }) + } + + async fn snapshot_by_id(&self, snapshot_id: u64) -> DataFusionResult { + let url = self.url.clone(); + let p: PySnapshotInfo = tokio::task::spawn_blocking(move || { + Python::attach(|py| { + let call: PyResult = (|| { + let m = Modules::DUCKLAKE_METADATA.load(py)?; + let obj = m + .getattr("snapshot_by_id")? 
+ .call1((url.as_str(), snapshot_id))?; + obj.extract() + })(); + match call { + Ok(result) => Ok(result), + Err(e) => { + let is_value_error = e + .matches(py, pyo3::exceptions::PyValueError::type_object(py)) + .unwrap_or(false); + if is_value_error { + Err(DataFusionError::Plan(e.to_string())) + } else { + Err(DataFusionError::External(Box::new(e))) + } + } + } + }) + }) + .await + .map_err(|e| DataFusionError::External(Box::new(e)))??; + + Ok(DuckLakeSnapshot { + snapshot: p.try_into()?, + }) + } + + async fn list_data_files( + &self, + table_id: TableIndex, + snapshot_id: Option, + partition_filters: Option>, + ) -> DataFusionResult> { + let url = self.url.clone(); + let py_partition_filters: Option)>> = + partition_filters.map(|filters| { + filters + .into_iter() + .map(|f| (f.partition_key_index, f.values)) + .collect() + }); + let rows: Vec = tokio::task::spawn_blocking(move || { + Python::attach(|py| { + let call: PyResult> = (|| { + let m = Modules::DUCKLAKE_METADATA.load(py)?; + let obj = m.getattr("list_data_files")?.call1(( + url.as_str(), + table_id.0, + snapshot_id, + py_partition_filters, + ))?; + obj.extract() + })(); + match call { + Ok(result) => Ok(result), + Err(e) => { + let is_value_error = e + .matches(py, pyo3::exceptions::PyValueError::type_object(py)) + .unwrap_or(false); + if is_value_error { + Err(DataFusionError::Plan(e.to_string())) + } else { + Err(DataFusionError::External(Box::new(e))) + } + } + } + }) + }) + .await + .map_err(|e| DataFusionError::External(Box::new(e)))??; + + let out = rows + .into_iter() + .map(|r| r.into_file_info(table_id)) + .collect(); + Ok(out) + } +} diff --git a/crates/sail-duck-lake/src/options.rs b/crates/sail-duck-lake/src/options.rs new file mode 100644 index 000000000..b230c0f98 --- /dev/null +++ b/crates/sail-duck-lake/src/options.rs @@ -0,0 +1,80 @@ +use datafusion::common::{DataFusionError, Result}; +use url::Url; + +#[derive(Debug, Clone, Default)] +pub struct DuckLakeOptions { + pub url: String, + pub table: String, + pub base_path: String, + pub snapshot_id: Option, + pub schema: Option, + pub case_sensitive: bool, +} + +impl DuckLakeOptions { + pub fn validate(&self) -> Result<()> { + if self.url.is_empty() { + return Err(DataFusionError::Plan( + "Missing required option: url".to_string(), + )); + } + Self::validate_url(&self.url)?; + + if self.table.is_empty() { + return Err(DataFusionError::Plan( + "Missing required option: table".to_string(), + )); + } + Self::validate_table_name(&self.table)?; + + if self.base_path.is_empty() { + return Err(DataFusionError::Plan( + "Missing required option: base_path".to_string(), + )); + } + Self::validate_base_path(&self.base_path)?; + + Ok(()) + } + + fn validate_url(url: &str) -> Result<()> { + let parsed = Url::parse(url) + .map_err(|e| DataFusionError::Plan(format!("Invalid metadata URL: {}: {}", url, e)))?; + + match parsed.scheme() { + "sqlite" | "postgres" | "postgresql" => Ok(()), + scheme => Err(DataFusionError::Plan(format!( + "Invalid metadata URL scheme. Expected sqlite, postgres, or postgresql, got: {}", + scheme + ))), + } + } + + fn validate_table_name(table: &str) -> Result<()> { + if table.is_empty() { + return Err(DataFusionError::Plan( + "Table name cannot be empty".to_string(), + )); + } + let parts: Vec<&str> = table.split('.').collect(); + if parts.len() > 2 { + return Err(DataFusionError::Plan(format!( + "Invalid table name format. 
Expected 'table' or 'schema.table', got: {}", + table + ))); + } + Ok(()) + } + + fn validate_base_path(base_path: &str) -> Result<()> { + if base_path.is_empty() { + return Err(DataFusionError::Plan( + "Base path cannot be empty".to_string(), + )); + } + Url::parse(base_path).map_err(|e| { + DataFusionError::Plan(format!("Invalid base_path URL: {}: {}", base_path, e)) + })?; + Ok(()) + } +} diff --git a/crates/sail-duck-lake/src/python.rs b/crates/sail-duck-lake/src/python.rs new file mode 100644 index 000000000..72fdab0ae --- /dev/null +++ b/crates/sail-duck-lake/src/python.rs @@ -0,0 +1,51 @@ +use std::ffi::CString; +use std::marker::PhantomData; + +use pyo3::prelude::PyModule; +use pyo3::{Bound, PyResult, Python}; + +pub struct Module { + name: &'static str, + source: &'static str, + _initializer: PhantomData, +} + +impl Module { + pub const fn new(name: &'static str, source: &'static str) -> Self { + Self { + name, + source, + _initializer: PhantomData, + } + } + + pub fn load<'py>(&self, py: Python<'py>) -> PyResult> { + let m = PyModule::from_code( + py, + CString::new(self.source)?.as_c_str(), + CString::new("")?.as_c_str(), + CString::new(self.name)?.as_c_str(), + )?; + I::init(&m)?; + Ok(m) + } +} + +pub trait ModuleInitializer { + fn init(m: &Bound) -> PyResult<()>; +} + +impl ModuleInitializer for () { + fn init(_: &Bound) -> PyResult<()> { + Ok(()) + } +} + +pub struct Modules; + +impl Modules { + pub const DUCKLAKE_METADATA: Module<()> = Module::new( + "_sail_ducklake_metadata", + include_str!("python/ducklake_metadata.py"), + ); +} diff --git a/crates/sail-duck-lake/src/python/ducklake_metadata.py b/crates/sail-duck-lake/src/python/ducklake_metadata.py new file mode 100644 index 000000000..91c4cde17 --- /dev/null +++ b/crates/sail-duck-lake/src/python/ducklake_metadata.py @@ -0,0 +1,408 @@ +from __future__ import annotations + +from typing import Any +from urllib.parse import urlparse, urlunparse + +from sqlalchemy import and_, bindparam, column, create_engine, or_, select, table, text + +_ENGINES: dict[str, Any] = {} + + +def _normalize_sqlalchemy_url(url: str) -> str: + parsed = urlparse(url) + scheme_lower = parsed.scheme.lower() + + if scheme_lower in ("postgres", "postgresql"): + return urlunparse(parsed._replace(scheme="postgresql+psycopg")) + + if scheme_lower == "sqlite" and parsed.netloc: + # Legacy sqlite URLs sometimes encode the path inside the netloc portion. 
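+        # e.g. urlparse("sqlite://metadata.ducklake") puts "metadata.ducklake" in netloc with an
+        # empty path; folding it back into the path yields "sqlite:///metadata.ducklake",
+        # the form SQLAlchemy expects for file-based SQLite databases.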
+ new_path = f"/{parsed.netloc}{parsed.path}" + return urlunparse(parsed._replace(netloc="", path=new_path)) + + return url + + +def _get_engine(url: str): + url = _normalize_sqlalchemy_url(url) + eng = _ENGINES.get(url) + if eng is None: + eng = create_engine(url, future=True) + _ENGINES[url] = eng + return eng + + +def current_snapshot(url: str) -> dict[str, Any]: + with _get_engine(url).connect() as conn: + row = conn.execute( + text( + """ + select snapshot_id, snapshot_time, schema_version, next_catalog_id, next_file_id + from ducklake_snapshot + order by snapshot_id desc + limit 1 + """ + ) + ).first() + if row is None: + msg = "No snapshots found in metadata" + raise ValueError(msg) + return { + "snapshot_id": int(row[0]), + "snapshot_time": str(row[1]), + "schema_version": int(row[2]), + "next_catalog_id": int(row[3]), + "next_file_id": int(row[4]), + "changes_made": None, + "author": None, + "commit_message": None, + "commit_extra_info": None, + } + + +def snapshot_by_id(url: str, snapshot_id: int) -> dict[str, Any]: + with _get_engine(url).connect() as conn: + row = conn.execute( + text( + """ + select snapshot_id, snapshot_time, schema_version, next_catalog_id, next_file_id + from ducklake_snapshot + where snapshot_id = :sid + """ + ), + {"sid": int(snapshot_id)}, + ).first() + if row is None: + msg = f"Snapshot not found: {snapshot_id}" + raise ValueError(msg) + return { + "snapshot_id": int(row[0]), + "snapshot_time": str(row[1]), + "schema_version": int(row[2]), + "next_catalog_id": int(row[3]), + "next_file_id": int(row[4]), + "changes_made": None, + "author": None, + "commit_message": None, + "commit_extra_info": None, + } + + +def load_table(url: str, table_name: str, schema_name: str | None) -> dict[str, Any]: + schema_name = schema_name or "main" + with _get_engine(url).connect() as conn: + srow = conn.execute( + text( + """ + select schema_id, schema_uuid, begin_snapshot, end_snapshot, schema_name, path, path_is_relative + from ducklake_schema + where schema_name = :sname and end_snapshot is null + limit 1 + """ + ), + {"sname": schema_name}, + ).first() + if srow is None: + msg = f"Schema not found: {schema_name}" + raise ValueError(msg) + schema_id = int(srow[0]) + schema_info = { + "schema_id": schema_id, + "schema_uuid": str(srow[1]), + "begin_snapshot": int(srow[2]) if srow[2] is not None else None, + "end_snapshot": int(srow[3]) if srow[3] is not None else None, + "schema_name": str(srow[4]), + "path": str(srow[5]), + "path_is_relative": bool(srow[6]), + } + + trow = conn.execute( + text( + """ + select table_id, table_uuid, begin_snapshot, end_snapshot, schema_id, table_name, path, path_is_relative + from ducklake_table + where table_name = :tname and schema_id = :sid and end_snapshot is null + limit 1 + """ + ), + {"tname": table_name, "sid": schema_id}, + ).first() + if trow is None: + msg = f"Table not found: {schema_name}.{table_name}" + raise ValueError(msg) + table_id = int(trow[0]) + table_info = { + "table_id": table_id, + "table_uuid": str(trow[1]), + "begin_snapshot": int(trow[2]) if trow[2] is not None else None, + "end_snapshot": int(trow[3]) if trow[3] is not None else None, + "schema_id": schema_id, + "table_name": str(trow[5]), + "path": str(trow[6]), + "path_is_relative": bool(trow[7]), + "columns": [], + "inlined_data_tables": [], + } + + cols = conn.execute( + text( + """ + select column_id, begin_snapshot, end_snapshot, table_id, column_order, column_name, + column_type, initial_default, default_value, nulls_allowed, parent_column + from 
ducklake_column + where table_id = :tid and end_snapshot is null + order by column_order asc + """ + ), + {"tid": table_id}, + ).all() + columns: list[dict[str, Any]] = [ + { + "column_id": int(row[0]), + "begin_snapshot": int(row[1]) if row[1] is not None else None, + "end_snapshot": int(row[2]) if row[2] is not None else None, + "table_id": int(row[3]), + "column_order": int(row[4]), + "column_name": str(row[5]), + "column_type": str(row[6]), + "initial_default": str(row[7]) if row[7] is not None else None, + "default_value": str(row[8]) if row[8] is not None else None, + "nulls_allowed": bool(row[9]), + "parent_column": int(row[10]) if row[10] is not None else None, + } + for row in cols + ] + + # Load partition fields (if any) + pf_rows = conn.execute( + text( + """ + SELECT pc.partition_key_index, pc.column_id, pc.transform + FROM ducklake_partition_column pc + JOIN ducklake_partition_info pi USING (partition_id, table_id) + WHERE pc.table_id = :tid AND (pi.end_snapshot IS NULL) + ORDER BY pc.partition_key_index + """ + ), + {"tid": table_id}, + ).all() + partition_fields = [ + { + "partition_key_index": int(r[0]), + "column_id": int(r[1]), + "transform": str(r[2]) if r[2] is not None else "identity", + } + for r in pf_rows + ] + + return { + "schema_info": schema_info, + "table_info": table_info, + "columns": columns, + "partition_fields": partition_fields, + } + + +def list_data_files( + url: str, + table_id: int, + snapshot_id: int | None, + partition_filters: list[tuple[int, list[str]]] | None = None, +) -> list[dict[str, Any]]: + # TODO: Add optional stats-based filter pushdown using ducklake_file_column_stats. + # TODO: Add iterator-based API for lazy or paginated data file loading. + data_file_table = table( + "ducklake_data_file", + column("data_file_id"), + column("table_id"), + column("begin_snapshot"), + column("end_snapshot"), + column("file_order"), + column("path"), + column("path_is_relative"), + column("file_format"), + column("record_count"), + column("file_size_bytes"), + column("footer_size"), + column("row_id_start"), + column("partition_id"), + column("encryption_key"), + column("partial_file_info"), + column("mapping_id"), + ) + + partition_value_table = table( + "ducklake_file_partition_value", + column("data_file_id"), + column("partition_key_index"), + column("partition_value"), + ) + + where_clauses_active: list[Any] = [ + data_file_table.c.table_id == bindparam("tid"), + data_file_table.c.end_snapshot.is_(None), + ] + where_clauses_asof: list[Any] = [ + data_file_table.c.table_id == bindparam("tid"), + data_file_table.c.begin_snapshot <= bindparam("sid"), + or_( + data_file_table.c.end_snapshot.is_(None), + data_file_table.c.end_snapshot > bindparam("sid"), + ), + ] + params_active: dict[str, Any] = {"tid": int(table_id)} + params_asof: dict[str, Any] = {"tid": int(table_id)} + if snapshot_id is not None: + params_asof["sid"] = int(snapshot_id) + + if partition_filters: + for idx, (partition_key_index, values) in enumerate(partition_filters): + if not values: + continue + key_param = f"pf_{idx}_key" + values_param = f"pf_{idx}_values" + partition_exists = ( + select(1) + .select_from(partition_value_table) + .where( + and_( + partition_value_table.c.data_file_id == data_file_table.c.data_file_id, + partition_value_table.c.partition_key_index == bindparam(key_param), + partition_value_table.c.partition_value.in_(bindparam(values_param, expanding=True)), + ) + ) + .exists() + ) + where_clauses_active.append(partition_exists) + 
where_clauses_asof.append(partition_exists) + params_active[key_param] = int(partition_key_index) + params_asof[key_param] = int(partition_key_index) + params_active[values_param] = list(values) + params_asof[values_param] = list(values) + stmt_active = ( + select( + data_file_table.c.data_file_id, + data_file_table.c.table_id, + data_file_table.c.begin_snapshot, + data_file_table.c.end_snapshot, + data_file_table.c.file_order, + data_file_table.c.path, + data_file_table.c.path_is_relative, + data_file_table.c.file_format, + data_file_table.c.record_count, + data_file_table.c.file_size_bytes, + data_file_table.c.footer_size, + data_file_table.c.row_id_start, + data_file_table.c.partition_id, + data_file_table.c.encryption_key, + data_file_table.c.partial_file_info, + data_file_table.c.mapping_id, + ) + .where(and_(*where_clauses_active)) + .order_by(data_file_table.c.file_order.asc()) + ) + stmt_asof = ( + select( + data_file_table.c.data_file_id, + data_file_table.c.table_id, + data_file_table.c.begin_snapshot, + data_file_table.c.end_snapshot, + data_file_table.c.file_order, + data_file_table.c.path, + data_file_table.c.path_is_relative, + data_file_table.c.file_format, + data_file_table.c.record_count, + data_file_table.c.file_size_bytes, + data_file_table.c.footer_size, + data_file_table.c.row_id_start, + data_file_table.c.partition_id, + data_file_table.c.encryption_key, + data_file_table.c.partial_file_info, + data_file_table.c.mapping_id, + ) + .where(and_(*where_clauses_asof)) + .order_by(data_file_table.c.file_order.asc()) + ) + with _get_engine(url).connect() as conn: + if snapshot_id is None: + rows = conn.execute(stmt_active, params_active).all() + else: + rows = conn.execute(stmt_asof, params_asof).all() + out: list[dict[str, Any]] = [ + { + "data_file_id": int(row[0]), + "table_id": int(row[1]), + "begin_snapshot": int(row[2]) if row[2] is not None else None, + "end_snapshot": int(row[3]) if row[3] is not None else None, + "file_order": int(row[4]) if row[4] is not None else 0, + "path": str(row[5]), + "path_is_relative": bool(row[6]), + "file_format": str(row[7]) if row[7] is not None else None, + "record_count": int(row[8]), + "file_size_bytes": int(row[9]), + "footer_size": int(row[10]) if row[10] is not None else None, + "row_id_start": int(row[11]) if row[11] is not None else None, + "partition_id": int(row[12]) if row[12] is not None else None, + "encryption_key": str(row[13]) if row[13] is not None else "", + "partial_file_info": str(row[14]) if row[14] is not None else None, + "mapping_id": int(row[15]) if row[15] is not None else 0, + "column_stats": [], + "partition_values": [], + } + for row in rows + ] + if not out: + return out + file_ids = [int(item["data_file_id"]) for item in out] + stats_sql = text( + """ + select data_file_id, column_id, column_size_bytes, value_count, null_count, + min_value, max_value, contains_nan, extra_stats + from ducklake_file_column_stats + where data_file_id in :file_ids + """ + ).bindparams(bindparam("file_ids", expanding=True)) + pv_sql = text( + """ + select data_file_id, partition_key_index, partition_value + from ducklake_file_partition_value + where data_file_id in :file_ids + """ + ).bindparams(bindparam("file_ids", expanding=True)) + params = {"file_ids": file_ids} + stats_rows = conn.execute(stats_sql, params).all() + pv_rows = conn.execute(pv_sql, params).all() + + stats_map: dict[int, list[dict[str, Any]]] = {} + for r in stats_rows: + fid = int(r[0]) + lst = stats_map.setdefault(fid, []) + lst.append( + { + "column_id": 
int(r[1]), + "column_size_bytes": int(r[2]) if r[2] is not None else None, + "value_count": int(r[3]) if r[3] is not None else None, + "null_count": int(r[4]) if r[4] is not None else None, + "min_value": str(r[5]) if r[5] is not None else None, + "max_value": str(r[6]) if r[6] is not None else None, + "contains_nan": bool(r[7]) if r[7] is not None else None, + "extra_stats": str(r[8]) if r[8] is not None else None, + } + ) + + pv_map: dict[int, list[dict[str, Any]]] = {} + for r in pv_rows: + fid = int(r[0]) + lst = pv_map.setdefault(fid, []) + lst.append( + { + "partition_key_index": int(r[1]), + "partition_value": str(r[2]), + } + ) + + for item in out: + fid = item["data_file_id"] + item["column_stats"] = stats_map.get(fid, []) + item["partition_values"] = pv_map.get(fid, []) + return out diff --git a/crates/sail-duck-lake/src/spec/mod.rs b/crates/sail-duck-lake/src/spec/mod.rs new file mode 100644 index 000000000..8cdff999d --- /dev/null +++ b/crates/sail-duck-lake/src/spec/mod.rs @@ -0,0 +1,252 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +pub mod transform; + +pub use transform::{ + parse_year, parse_year_month, parse_year_month_day, parse_year_month_day_hour, Transform, +}; + +/// Schema identifier in DuckLake metadata. +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct SchemaIndex(pub u64); + +/// Table identifier in DuckLake metadata. +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct TableIndex(pub u64); + +/// Column/field identifier in DuckLake metadata. +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct FieldIndex(pub u64); + +/// Data file identifier in DuckLake metadata. +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct DataFileIndex(pub u64); + +/// Delete file identifier in DuckLake metadata. +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct DeleteFileIndex(pub u64); + +/// Column mapping identifier in DuckLake metadata. +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct MappingIndex(pub u64); + +/// Partition identifier in DuckLake metadata. +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct PartitionId(pub u64); + +/// Compaction operation type for optimizing DuckLake tables. +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[serde(rename_all = "kebab-case")] +pub enum CompactionType { + MergeAdjacentTables, + RewriteDeletes, +} + +/// Cleanup operation type for removing obsolete files. +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[serde(rename_all = "kebab-case")] +pub enum CleanupType { + OldFiles, + OrphanedFiles, +} + +/// Encryption mode for DuckLake data files. +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[serde(rename_all = "kebab-case")] +pub enum DuckLakeEncryption { + Automatic, + Encrypted, + Unencrypted, +} + +/// Snapshot metadata from `ducklake_snapshot` and `ducklake_snapshot_changes` tables. +/// Represents a point-in-time state of the DuckLake catalog. 
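+/// The `snapshot_time` value is stored as text in the metadata database and parsed into UTC by the Python-backed metastore.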
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
+pub struct SnapshotInfo {
+    pub snapshot_id: u64,
+    pub snapshot_time: DateTime<Utc>,
+    pub schema_version: u64,
+    pub next_catalog_id: u64,
+    pub next_file_id: u64,
+    pub changes_made: Option<String>,
+    pub author: Option<String>,
+    pub commit_message: Option<String>,
+    pub commit_extra_info: Option<String>,
+}
+
+/// Schema metadata from `ducklake_schema` table.
+/// Represents a namespace containing tables in DuckLake.
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
+pub struct SchemaInfo {
+    pub schema_id: SchemaIndex,
+    pub schema_uuid: Uuid,
+    pub begin_snapshot: Option<u64>,
+    pub end_snapshot: Option<u64>,
+    pub schema_name: String,
+    pub path: String,
+    pub path_is_relative: bool,
+}
+
+/// Column metadata from `ducklake_column` table.
+/// Supports nested columns via `parent_column` field for complex types.
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
+pub struct ColumnInfo {
+    pub column_id: FieldIndex,
+    pub begin_snapshot: Option<u64>,
+    pub end_snapshot: Option<u64>,
+    pub table_id: TableIndex,
+    pub column_order: u64,
+    pub column_name: String,
+    pub column_type: String,
+    pub initial_default: Option<String>,
+    pub default_value: Option<String>,
+    pub nulls_allowed: bool,
+    pub parent_column: Option<FieldIndex>,
+}
+
+/// Table metadata from `ducklake_table` table.
+/// Aggregates columns and inlined data tables for the complete table definition.
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
+pub struct TableInfo {
+    pub table_id: TableIndex,
+    pub table_uuid: Uuid,
+    pub begin_snapshot: Option<u64>,
+    pub end_snapshot: Option<u64>,
+    pub schema_id: SchemaIndex,
+    pub table_name: String,
+    pub path: String,
+    pub path_is_relative: bool,
+    #[serde(default, skip_serializing_if = "Vec::is_empty")]
+    pub columns: Vec<ColumnInfo>,
+    #[serde(default, skip_serializing_if = "Vec::is_empty")]
+    pub inlined_data_tables: Vec<InlinedTableInfo>,
+}
+
+/// Inlined data table metadata from `ducklake_inlined_data_tables` table.
+/// References small tables stored inline within the metadata database.
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
+pub struct InlinedTableInfo {
+    pub table_name: String,
+    pub schema_version: u64,
+}
+
+/// Data file metadata from `ducklake_data_file` table.
+/// Describes a Parquet or other format file containing table data.
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
+pub struct FileInfo {
+    pub data_file_id: DataFileIndex,
+    pub table_id: TableIndex,
+    pub begin_snapshot: Option<u64>,
+    pub end_snapshot: Option<u64>,
+    pub file_order: u64,
+    pub path: String,
+    pub path_is_relative: bool,
+    pub file_format: Option<String>,
+    pub record_count: u64,
+    pub file_size_bytes: u64,
+    pub footer_size: Option<u64>,
+    pub row_id_start: Option<u64>,
+    pub partition_id: Option<PartitionId>,
+    pub encryption_key: String,
+    pub partial_file_info: Option<String>,
+    pub mapping_id: MappingIndex,
+    #[serde(default, skip_serializing_if = "Vec::is_empty")]
+    pub column_stats: Vec<ColumnStatsInfo>,
+    #[serde(default, skip_serializing_if = "Vec::is_empty")]
+    pub partition_values: Vec<FilePartitionInfo>,
+}
+
+/// Column statistics from `ducklake_file_column_stats` table.
+/// Provides per-column statistics for query optimization and pruning.
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
+pub struct ColumnStatsInfo {
+    pub column_id: FieldIndex,
+    pub column_size_bytes: Option<u64>,
+    pub value_count: Option<u64>,
+    pub null_count: Option<u64>,
+    pub min_value: Option<String>,
+    pub max_value: Option<String>,
+    pub contains_nan: Option<bool>,
+    pub extra_stats: Option<String>,
+}
+
+/// Partition value from `ducklake_file_partition_value` table.
+/// Associates a data file with specific partition key values. +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +pub struct FilePartitionInfo { + pub partition_key_index: u64, + pub partition_value: String, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +pub struct PartitionFilter { + pub partition_key_index: u64, + pub values: Vec, +} + +/// Partition field definition for a table (identity or transformed partition) +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +pub struct PartitionFieldInfo { + pub partition_key_index: u64, + pub column_id: FieldIndex, + pub transform: String, +} + +/// Delete file metadata from `ducklake_delete_file` table. +/// Tracks files containing row-level deletes for a data file. +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +pub struct DeleteFileInfo { + pub delete_file_id: DeleteFileIndex, + pub table_id: TableIndex, + pub begin_snapshot: Option, + pub end_snapshot: Option, + pub data_file_id: DataFileIndex, + pub path: String, + pub path_is_relative: bool, + pub format: Option, + pub delete_count: u64, + pub file_size_bytes: u64, + pub footer_size: u64, + pub encryption_key: String, +} + +/// Column mapping metadata from `ducklake_column_mapping` table. +/// Defines how source file columns map to table schema for schema evolution. +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +pub struct NameMapping { + pub mapping_id: MappingIndex, + pub table_id: TableIndex, + pub map_type: String, +} + +/// Individual name mapping entry from `ducklake_name_mapping` table. +/// Maps a source column name to a target field ID in the table schema. +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +pub struct NameMappingEntry { + pub mapping_id: MappingIndex, + pub column_id: FieldIndex, + pub source_name: String, + pub target_field_id: FieldIndex, + pub parent_column: Option, + pub is_partition: bool, +} + +/// Key-value tag for metadata annotation. +/// Used in `ducklake_tag` and `ducklake_column_tag` tables. +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +pub struct DuckLakeTag { + pub key: String, + pub value: String, +} + +/// Configuration option from `ducklake_metadata` table. +/// Stores catalog-level, schema-level, or table-level configuration. 
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +pub struct DuckLakeConfigOption { + pub key: String, + pub value: String, + pub scope: Option, + pub scope_id: Option, +} diff --git a/crates/sail-duck-lake/src/spec/transform.rs b/crates/sail-duck-lake/src/spec/transform.rs new file mode 100644 index 000000000..2a9d654dc --- /dev/null +++ b/crates/sail-duck-lake/src/spec/transform.rs @@ -0,0 +1,251 @@ +use std::str::FromStr; +use std::sync::Arc; + +use chrono::{Duration, NaiveDate, NaiveDateTime}; +use datafusion::arrow::datatypes::{DataType, TimeUnit}; +use datafusion::common::scalar::ScalarValue; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum Transform { + Identity, + Year, + Month, + Day, + Hour, + Unknown, +} + +impl FromStr for Transform { + type Err = (); + + fn from_str(value: &str) -> Result { + Ok(match value.trim().to_ascii_lowercase().as_str() { + "identity" => Self::Identity, + "year" => Self::Year, + "month" => Self::Month, + "day" => Self::Day, + "hour" => Self::Hour, + _ => Self::Unknown, + }) + } +} + +impl Transform { + pub fn get_range( + &self, + part_value: &str, + target_type: &DataType, + ) -> Option<(ScalarValue, ScalarValue)> { + match self { + Self::Identity => { + let scalar = + ScalarValue::try_from_string(part_value.to_string(), target_type).ok()?; + Some((scalar.clone(), scalar)) + } + Self::Year => { + let year = parse_year(part_value)?; + let kind = classify_temporal_type(target_type)?; + let start = year_start(year)?; + let end = year_end_exclusive(year)?; + build_scalar_range(&kind, start, end) + } + Self::Month => { + let (year, month) = parse_year_month(part_value)?; + let kind = classify_temporal_type(target_type)?; + let start = month_start(year, month)?; + let end = month_end_exclusive(year, month)?; + build_scalar_range(&kind, start, end) + } + Self::Day => { + let date = parse_year_month_day(part_value)?; + let kind = classify_temporal_type(target_type)?; + let start = date.and_hms_opt(0, 0, 0)?; + let end = start.checked_add_signed(Duration::days(1))?; + build_scalar_range(&kind, start, end) + } + Self::Hour => { + let dt = parse_year_month_day_hour(part_value)?; + let kind = classify_temporal_type(target_type)?; + let end = dt.checked_add_signed(Duration::hours(1))?; + build_scalar_range(&kind, dt, end) + } + Self::Unknown => None, + } + } +} + +pub fn parse_year(value: &str) -> Option { + value.trim().parse::().ok() +} + +pub fn parse_year_month(value: &str) -> Option<(i32, u32)> { + let trimmed = value.trim(); + let normalized = trimmed.replace(['_', '/', '.'], "-"); + if let Some((year, month)) = normalized.split_once('-') { + let year_val = year.parse::().ok()?; + let month_val = month.parse::().ok()?; + if (1..=12).contains(&month_val) { + return Some((year_val, month_val)); + } + } + if normalized.len() == 6 { + let (year_part, month_part) = normalized.split_at(4); + let year_val = year_part.parse::().ok()?; + let month_val = month_part.parse::().ok()?; + if (1..=12).contains(&month_val) { + return Some((year_val, month_val)); + } + } + None +} + +pub fn parse_year_month_day(value: &str) -> Option { + let trimmed = value.trim(); + if let Ok(date) = NaiveDate::parse_from_str(trimmed, "%Y-%m-%d") { + return Some(date); + } + if let Ok(date) = NaiveDate::parse_from_str(trimmed, "%Y%m%d") { + return Some(date); + } + let parts: Vec<&str> = trimmed.split('-').collect(); + if parts.len() == 3 { + let year = parts[0].parse::().ok()?; + let month = parts[1].parse::().ok()?; + let day = parts[2].parse::().ok()?; + 
return NaiveDate::from_ymd_opt(year, month, day); + } + None +} + +pub fn parse_year_month_day_hour(value: &str) -> Option { + let trimmed = value.trim(); + for fmt in ["%Y-%m-%d-%H", "%Y-%m-%d %H", "%Y-%m-%dT%H", "%Y%m%d%H"] { + if let Ok(dt) = NaiveDateTime::parse_from_str(trimmed, fmt) { + return Some(dt); + } + } + let parts: Vec<&str> = trimmed.split('-').collect(); + if parts.len() == 4 { + let year = parts[0].parse::().ok()?; + let month = parts[1].parse::().ok()?; + let day = parts[2].parse::().ok()?; + let hour = parts[3].parse::().ok()?; + let date = NaiveDate::from_ymd_opt(year, month, day)?; + return date.and_hms_opt(hour, 0, 0); + } + None +} + +fn year_start(year: i32) -> Option { + NaiveDate::from_ymd_opt(year, 1, 1)?.and_hms_opt(0, 0, 0) +} + +fn year_end_exclusive(year: i32) -> Option { + NaiveDate::from_ymd_opt(year + 1, 1, 1)?.and_hms_opt(0, 0, 0) +} + +fn month_start(year: i32, month: u32) -> Option { + NaiveDate::from_ymd_opt(year, month, 1)?.and_hms_opt(0, 0, 0) +} + +fn month_end_exclusive(year: i32, month: u32) -> Option { + let (next_year, next_month) = if month == 12 { + (year + 1, 1) + } else { + (year, month + 1) + }; + NaiveDate::from_ymd_opt(next_year, next_month, 1)?.and_hms_opt(0, 0, 0) +} + +enum TemporalType { + Timestamp(TimeUnit, Option>), + Date32, + Date64, +} + +fn classify_temporal_type(data_type: &DataType) -> Option { + match data_type { + DataType::Timestamp(unit, tz) => Some(TemporalType::Timestamp(*unit, tz.clone())), + DataType::Date32 => Some(TemporalType::Date32), + DataType::Date64 => Some(TemporalType::Date64), + _ => None, + } +} + +fn to_timestamp_value(dt: NaiveDateTime, unit: TimeUnit) -> Option { + match unit { + TimeUnit::Second => Some(dt.and_utc().timestamp()), + TimeUnit::Millisecond => Some(dt.and_utc().timestamp_millis()), + TimeUnit::Microsecond => Some(dt.and_utc().timestamp_micros()), + TimeUnit::Nanosecond => dt.and_utc().timestamp_nanos_opt(), + } +} + +fn days_since_epoch(date: NaiveDate) -> Option { + let epoch = NaiveDate::from_ymd_opt(1970, 1, 1)?; + let duration = date.signed_duration_since(epoch); + i32::try_from(duration.num_days()).ok() +} + +fn build_scalar_range( + kind: &TemporalType, + start: NaiveDateTime, + end_exclusive: NaiveDateTime, +) -> Option<(ScalarValue, ScalarValue)> { + match kind { + TemporalType::Timestamp(unit, tz) => { + let start_val = to_timestamp_value(start, *unit)?; + let end_exclusive_val = to_timestamp_value(end_exclusive, *unit)?; + if end_exclusive_val <= start_val { + return None; + } + let end_val = end_exclusive_val.checked_sub(1)?; + let min = match unit { + TimeUnit::Second => ScalarValue::TimestampSecond(Some(start_val), tz.clone()), + TimeUnit::Millisecond => { + ScalarValue::TimestampMillisecond(Some(start_val), tz.clone()) + } + TimeUnit::Microsecond => { + ScalarValue::TimestampMicrosecond(Some(start_val), tz.clone()) + } + TimeUnit::Nanosecond => { + ScalarValue::TimestampNanosecond(Some(start_val), tz.clone()) + } + }; + let max = match unit { + TimeUnit::Second => ScalarValue::TimestampSecond(Some(end_val), tz.clone()), + TimeUnit::Millisecond => { + ScalarValue::TimestampMillisecond(Some(end_val), tz.clone()) + } + TimeUnit::Microsecond => { + ScalarValue::TimestampMicrosecond(Some(end_val), tz.clone()) + } + TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(end_val), tz.clone()), + }; + Some((min, max)) + } + TemporalType::Date32 => { + let start_days = days_since_epoch(start.date())?; + let end_days_exclusive = days_since_epoch(end_exclusive.date())?; + if 
end_days_exclusive <= start_days { + return None; + } + let max_days = end_days_exclusive - 1; + Some(( + ScalarValue::Date32(Some(start_days)), + ScalarValue::Date32(Some(max_days)), + )) + } + TemporalType::Date64 => { + let start_ms = start.and_utc().timestamp_millis(); + let end_ms_exclusive = end_exclusive.and_utc().timestamp_millis(); + if end_ms_exclusive <= start_ms { + return None; + } + Some(( + ScalarValue::Date64(Some(start_ms)), + ScalarValue::Date64(Some(end_ms_exclusive - 1)), + )) + } + } +} diff --git a/pyproject.toml b/pyproject.toml index 9ef1a0f01..c5c78c2a9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -84,6 +84,8 @@ dependencies = [ "pillow>=10.3.0", "pyiceberg[sql-sqlite,pyiceberg-core]==0.10.0", "pydantic>=2.11,<2.12", + "SQLAlchemy>=2,<3", + "psycopg[binary]>=3,<4", ] path = ".venvs/default" @@ -126,6 +128,8 @@ dependencies = [ "pillow>=10.3.0", "pyiceberg[sql-sqlite,pyiceberg-core]==0.10.0", "pydantic>=2.11,<2.12", + "SQLAlchemy>=2,<3", + "psycopg[binary]>=3,<4", ] [[tool.hatch.envs.test.matrix]] @@ -152,6 +156,7 @@ dependencies = [ "pytest-xdist>=3.7,<4", "pytest-timeout>=2.4,<3", "pytest-reportlog>=0.4,<0.5", + "SQLAlchemy>=2,<3", ] [[tool.hatch.envs.test-spark.matrix]] @@ -198,6 +203,7 @@ dependencies = [ "tomli>=2.0.1,<3", # The following additional dependencies are needed in our test setup. "pytest-reportlog>=0.4,<0.5", + "SQLAlchemy>=2,<3", ] path = ".venvs/test-ibis" diff --git a/python/pysail/tests/spark/ducklake/__init__.py b/python/pysail/tests/spark/ducklake/__init__.py new file mode 100644 index 000000000..a29508b3b --- /dev/null +++ b/python/pysail/tests/spark/ducklake/__init__.py @@ -0,0 +1 @@ +"""DuckLake integration tests for Sail.""" diff --git a/python/pysail/tests/spark/ducklake/test_ducklake_pruning.py b/python/pysail/tests/spark/ducklake/test_ducklake_pruning.py new file mode 100644 index 000000000..11fc03477 --- /dev/null +++ b/python/pysail/tests/spark/ducklake/test_ducklake_pruning.py @@ -0,0 +1,316 @@ +import duckdb +import pandas as pd +import pyarrow as pa +import pytest + + +@pytest.fixture(scope="module") +def duckdb_conn(): + conn = duckdb.connect(":memory:") + try: + conn.execute("INSTALL ducklake") + conn.execute("LOAD ducklake") + except (RuntimeError, duckdb.Error) as e: + pytest.skip(f"DuckLake extension not available: {e}") + return conn + + +def _attach(conn: duckdb.DuckDBPyConnection, meta_path: str, data_path: str, schema_name: str = "dl"): + conn.execute(f"ATTACH 'ducklake:sqlite:{meta_path}' AS {schema_name} (DATA_PATH '{data_path}/')") + + +def _detach(conn: duckdb.DuckDBPyConnection, schema_name: str = "dl"): + conn.execute(f"DETACH {schema_name}") + + +def _append_batches(conn: duckdb.DuckDBPyConnection, table_ident: str, batches: list[pd.DataFrame]): + for df in batches: + tbl = pa.Table.from_pandas(df) + conn.register("tmp_append", tbl) + conn.execute(f"INSERT INTO {table_ident} SELECT * FROM tmp_append") # noqa: S608 + conn.unregister("tmp_append") + + +def _mk_table_eq_in(conn: duckdb.DuckDBPyConnection, meta_path, data_path, ident="dl.prune_eq_in"): + _attach(conn, meta_path, data_path) + try: + conn.execute( + f""" + CREATE TABLE {ident}( + id BIGINT, + year INTEGER, + month INTEGER, + value VARCHAR + ) + """ + ) + batches = [ + pd.DataFrame({"id": [1, 2], "year": [2023, 2023], "month": [1, 1], "value": ["a", "b"]}).astype( + {"id": "int64", "year": "int32", "month": "int32"} + ), + pd.DataFrame({"id": [3, 4], "year": [2023, 2023], "month": [2, 2], "value": ["c", "d"]}).astype( + {"id": "int64", "year": 
"int32", "month": "int32"} + ), + pd.DataFrame({"id": [5, 6], "year": [2024, 2024], "month": [1, 1], "value": ["e", "f"]}).astype( + {"id": "int64", "year": "int32", "month": "int32"} + ), + pd.DataFrame({"id": [7, 8], "year": [2024, 2024], "month": [2, 2], "value": ["g", "h"]}).astype( + {"id": "int64", "year": "int32", "month": "int32"} + ), + ] + _append_batches(conn, ident, batches) + finally: + _detach(conn) + return ident.split(".")[-1] + + +def _mk_table_cmp(conn: duckdb.DuckDBPyConnection, meta_path, data_path, ident="dl.prune_cmp"): + _attach(conn, meta_path, data_path) + try: + conn.execute( + f""" + CREATE TABLE {ident}( + id BIGINT, + year INTEGER, + month INTEGER + ) + """ + ) + data = [] + for year in [2021, 2022, 2023, 2024]: + for month in [1, 6, 12]: + data.append({"id": len(data) + 1, "year": year, "month": month}) + for i in range(0, len(data), 6): + df = pd.DataFrame(data[i : i + 6]).astype({"id": "int64", "year": "int32", "month": "int32"}) + _append_batches(conn, ident, [df]) + finally: + _detach(conn) + return ident.split(".")[-1] + + +def _mk_table_null_bool(conn: duckdb.DuckDBPyConnection, meta_path, data_path, ident="dl.prune_null_bool"): + _attach(conn, meta_path, data_path) + try: + conn.execute( + f""" + CREATE TABLE {ident}( + id BIGINT, + region VARCHAR, + active BOOLEAN + ) + """ + ) + _append_batches( + conn, + ident, + [ + pd.DataFrame( + [{"id": 1, "region": None, "active": True}, {"id": 2, "region": None, "active": True}] + ).astype({"id": "int64"}), + pd.DataFrame( + [{"id": 3, "region": "US", "active": False}, {"id": 4, "region": "EU", "active": False}] + ).astype({"id": "int64"}), + ], + ) + finally: + _detach(conn) + return ident.split(".")[-1] + + +def _mk_table_string(conn: duckdb.DuckDBPyConnection, meta_path, data_path, ident="dl.prune_string"): + _attach(conn, meta_path, data_path) + try: + conn.execute( + f""" + CREATE TABLE {ident}( + id BIGINT, + dept VARCHAR, + team VARCHAR + ) + """ + ) + _append_batches( + conn, + ident, + [ + pd.DataFrame( + [ + {"id": 1, "dept": "engineering", "team": "backend"}, + {"id": 2, "dept": "engineering", "team": "frontend"}, + {"id": 3, "dept": "marketing", "team": "growth"}, + {"id": 4, "dept": "sales", "team": "enterprise"}, + ] + ).astype({"id": "int64"}) + ], + ) + finally: + _detach(conn) + return ident.split(".")[-1] + + +def _mk_table_limit(conn: duckdb.DuckDBPyConnection, meta_path, data_path, ident="dl.prune_limit"): + _attach(conn, meta_path, data_path) + try: + conn.execute( + f""" + CREATE TABLE {ident}( + id BIGINT, + flag BOOLEAN + ) + """ + ) + rows = [{"id": i, "flag": i % 2 == 0} for i in range(100)] + for i in range(0, len(rows), 20): + df = pd.DataFrame(rows[i : i + 20]).astype({"id": "int64"}) + _append_batches(conn, ident, [df]) + finally: + _detach(conn) + return ident.split(".")[-1] + + +def _add_identity_partition( + conn: duckdb.DuckDBPyConnection, meta_path: str, data_path: str, table_name: str, src_col: str +): + _attach(conn, meta_path, data_path) + try: + meta_schema = "__ducklake_metadata_dl" + table_id = conn.execute( + f"SELECT table_id FROM {meta_schema}.ducklake_table WHERE table_name = ? AND end_snapshot IS NULL", # noqa: S608 + [table_name], + ).fetchone()[0] + col_row = conn.execute( + f"SELECT column_id FROM {meta_schema}.ducklake_column WHERE table_id = ? AND column_name = ? 
AND end_snapshot IS NULL", # noqa: S608 + [table_id, src_col], + ).fetchone() + assert col_row is not None, "column not found in ducklake_column" + column_id = col_row[0] + + partition_id = int(table_id) * 1000 + 1 + + conn.execute( + f"INSERT INTO {meta_schema}.ducklake_partition_info(partition_id, table_id, begin_snapshot, end_snapshot) VALUES (?, ?, 0, NULL)", # noqa: S608 + [partition_id, table_id], + ) + conn.execute( + f"INSERT INTO {meta_schema}.ducklake_partition_column(partition_id, table_id, partition_key_index, column_id, transform) VALUES (?, ?, 0, ?, 'identity')", # noqa: S608 + [partition_id, table_id, column_id], + ) + + files = conn.execute( + f"SELECT data_file_id, file_order FROM {meta_schema}.ducklake_data_file WHERE table_id = ? AND end_snapshot IS NULL ORDER BY file_order", # noqa: S608 + [table_id], + ).fetchall() + n = len(files) + for i, (fid, _order) in enumerate(files): + val = "2023" if i < (n // 2) else "2024" + conn.execute( + f"INSERT INTO {meta_schema}.ducklake_file_partition_value(data_file_id, table_id, partition_key_index, partition_value) VALUES (?, ?, 0, ?)", # noqa: S608 + [fid, table_id, val], + ) + finally: + _detach(conn) + + +@pytest.fixture +def ducklake_paths(tmp_path): + meta = tmp_path / "meta.ducklake" + data = tmp_path / "data" + data.mkdir(exist_ok=True) + return str(meta), str(data) + + +def test_pruning_equality_filters(spark, duckdb_conn, ducklake_paths): + meta_path, data_path = ducklake_paths + table = _mk_table_eq_in(duckdb_conn, meta_path, data_path) + df = ( + spark.read.format("ducklake") + .options(url=f"sqlite:///{meta_path}", table=table, base_path=f"file://{data_path}/") + .load() + ) + assert df.filter("year = 2023").count() == 4 # noqa: PLR2004 + assert df.filter("year = 2023 AND month = 1").count() == 2 # noqa: PLR2004 + + +def test_pruning_in_clause(spark, duckdb_conn, ducklake_paths): + meta_path, data_path = ducklake_paths + table = _mk_table_eq_in(duckdb_conn, meta_path, data_path) + df = ( + spark.read.format("ducklake") + .options(url=f"sqlite:///{meta_path}", table=table, base_path=f"file://{data_path}/") + .load() + ) + assert df.filter("month IN (2)").count() == 4 # noqa: PLR2004 + + +def test_comparison_and_between(spark, duckdb_conn, ducklake_paths): + meta_path, data_path = ducklake_paths + table = _mk_table_cmp(duckdb_conn, meta_path, data_path) + df = ( + spark.read.format("ducklake") + .options(url=f"sqlite:///{meta_path}", table=table, base_path=f"file://{data_path}/") + .load() + ) + assert df.filter("year > 2022").count() == 6 # noqa: PLR2004 + assert df.filter("year BETWEEN 2022 AND 2023").count() == 6 # noqa: PLR2004 + assert df.filter("year >= 2023 AND month >= 6").count() == 4 # noqa: PLR2004 + + +def test_null_and_boolean(spark, duckdb_conn, ducklake_paths): + meta_path, data_path = ducklake_paths + table = _mk_table_null_bool(duckdb_conn, meta_path, data_path) + df = ( + spark.read.format("ducklake") + .options(url=f"sqlite:///{meta_path}", table=table, base_path=f"file://{data_path}/") + .load() + ) + assert df.filter("region IS NULL").count() == 2 # noqa: PLR2004 + assert df.filter("active = true").count() == 2 # noqa: PLR2004 + + +def test_string_in_and_range_pruning(spark, duckdb_conn, ducklake_paths): + meta_path, data_path = ducklake_paths + table = _mk_table_string(duckdb_conn, meta_path, data_path) + df = ( + spark.read.format("ducklake") + .options(url=f"sqlite:///{meta_path}", table=table, base_path=f"file://{data_path}/") + .load() + ) + assert df.filter("team IN 
('backend','frontend')").count() == 2 # noqa: PLR2004 + assert df.filter("dept > 'engineering'").count() == 2 # noqa: PLR2004 + + +def test_limit_pushdown_behavior(spark, duckdb_conn, ducklake_paths): + meta_path, data_path = ducklake_paths + table = _mk_table_limit(duckdb_conn, meta_path, data_path) + df = ( + spark.read.format("ducklake") + .options(url=f"sqlite:///{meta_path}", table=table, base_path=f"file://{data_path}/") + .load() + ) + assert df.filter("flag = true").limit(7).count() == 7 # noqa: PLR2004 + + +def test_identity_partition_eq_in_pruning(spark, duckdb_conn, ducklake_paths): + meta_path, data_path = ducklake_paths + table = _mk_table_eq_in(duckdb_conn, meta_path, data_path) + _add_identity_partition(duckdb_conn, meta_path, data_path, table, "year") + df = ( + spark.read.format("ducklake") + .options(url=f"sqlite:///{meta_path}", table=table, base_path=f"file://{data_path}/") + .load() + ) + assert df.filter("year = 2023").count() == 4 # noqa: PLR2004 + assert df.filter("month IN (2)").count() == 4 # noqa: PLR2004 + + +def test_identity_partition_range_pruning(spark, duckdb_conn, ducklake_paths): + meta_path, data_path = ducklake_paths + table = _mk_table_eq_in(duckdb_conn, meta_path, data_path) + _add_identity_partition(duckdb_conn, meta_path, data_path, table, "year") + df = ( + spark.read.format("ducklake") + .options(url=f"sqlite:///{meta_path}", table=table, base_path=f"file://{data_path}/") + .load() + ) + assert df.filter("year >= 2024").count() == 4 # noqa: PLR2004 + assert df.filter("year BETWEEN 2023 AND 2024").count() == 8 # noqa: PLR2004 diff --git a/python/pysail/tests/spark/ducklake/test_ducklake_read.py b/python/pysail/tests/spark/ducklake/test_ducklake_read.py new file mode 100644 index 000000000..38f93d386 --- /dev/null +++ b/python/pysail/tests/spark/ducklake/test_ducklake_read.py @@ -0,0 +1,608 @@ +""" +DuckLake read tests for Sail. + +This module tests reading DuckLake tables through Sail's DataSource API. +DuckLake is an open Lakehouse format extension for DuckDB that stores +metadata in SQLite/PostgreSQL and data files in object storage. +""" + +import os +import platform + +import duckdb +import pandas as pd +import pytest +from pandas.testing import assert_frame_equal +from pyspark.errors.exceptions.connect import AnalysisException + +from pysail.tests.spark.utils import escape_sql_string_literal + + +@pytest.fixture(scope="module") +def duckdb_conn(): + """Create a DuckDB connection with DuckLake extension loaded.""" + conn = duckdb.connect(":memory:") + try: + conn.execute("INSTALL ducklake") + conn.execute("LOAD ducklake") + except (RuntimeError, duckdb.Error) as e: + pytest.skip(f"DuckLake extension not available: {e}") + return conn + + +@pytest.fixture +def ducklake_setup(tmp_path, duckdb_conn): + """ + Set up a DuckLake database with test data. + + Returns a tuple of (metadata_path, data_path, table_name). 
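+
+    The DuckDB attachment is detached before returning so that Sail can open the
+    SQLite metadata database without hitting the attachment's lock.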
+ """ + metadata_path = tmp_path / "metadata.ducklake" + data_path = tmp_path / "data" + data_path.mkdir(exist_ok=True) + + duckdb_conn.execute( + f""" + ATTACH 'ducklake:sqlite:{metadata_path}' AS my_ducklake + (DATA_PATH '{data_path}/') + """ + ) + + # Create a test table + table_name = "my_ducklake.test_table" + duckdb_conn.execute( + """ + CREATE TABLE my_ducklake.test_table ( + id INTEGER, + name VARCHAR, + score DOUBLE + ) + """ + ) + + duckdb_conn.execute( + """ + INSERT INTO my_ducklake.test_table + VALUES + (1, 'Alice', 95.5), + (2, 'Bob', 87.3), + (3, 'Charlie', 92.1) + """ + ) + + # IMPORTANT: Detach the database to release the SQLite lock + # This allows Sail's Diesel connection to access the database + duckdb_conn.execute("DETACH my_ducklake") + + return str(metadata_path), str(data_path), table_name + + +def test_ducklake_basic_read(spark, ducklake_setup): + """Test basic DuckLake table read.""" + metadata_path, data_path, _table_name = ducklake_setup + + df = ( + spark.read.format("ducklake") + .options(url=f"sqlite:///{metadata_path}", table="test_table", base_path=f"file://{data_path}/") + .load() + ) + + result = df.sort("id").collect() + + expected_count = 3 + score_tolerance = 0.01 + expected_id_alice = 1 + expected_id_bob = 2 + expected_id_charlie = 3 + + assert len(result) == expected_count + assert result[0].id == expected_id_alice + assert result[0].name == "Alice" + assert abs(result[0].score - 95.5) < score_tolerance + assert result[1].id == expected_id_bob + assert result[1].name == "Bob" + assert abs(result[1].score - 87.3) < score_tolerance + assert result[2].id == expected_id_charlie + assert result[2].name == "Charlie" + assert abs(result[2].score - 92.1) < score_tolerance + + +def test_ducklake_read_with_filter(spark, ducklake_setup): + """Test DuckLake read with filter pushdown.""" + metadata_path, data_path, _table_name = ducklake_setup + + df = ( + spark.read.format("ducklake") + .options(url=f"sqlite:///{metadata_path}", table="test_table", base_path=f"file://{data_path}/") + .load() + ) + + filtered_df = df.filter("score > 90").sort("id") + result = filtered_df.collect() + + expected_filtered_count = 2 + assert len(result) == expected_filtered_count + assert result[0].name == "Alice" + assert result[1].name == "Charlie" + + +def test_ducklake_read_with_projection(spark, ducklake_setup): + """Test DuckLake read with column projection.""" + metadata_path, data_path, _table_name = ducklake_setup + + df = ( + spark.read.format("ducklake") + .options(url=f"sqlite:///{metadata_path}", table="test_table", base_path=f"file://{data_path}/") + .load() + ) + + projected_df = df.select("id", "name").sort("id") + result = projected_df.collect() + + expected_count = 3 + assert len(result) == expected_count + assert hasattr(result[0], "id") + assert hasattr(result[0], "name") + assert not hasattr(result[0], "score") + + +def test_ducklake_read_with_limit(spark, ducklake_setup): + """Test DuckLake read with limit.""" + metadata_path, data_path, _table_name = ducklake_setup + + df = ( + spark.read.format("ducklake") + .options(url=f"sqlite:///{metadata_path}", table="test_table", base_path=f"file://{data_path}/") + .load() + ) + + limit_count = 2 + limited_df = df.limit(limit_count) + result = limited_df.collect() + + assert len(result) == limit_count + + +def test_ducklake_read_to_pandas(spark, ducklake_setup): + """Test DuckLake read and conversion to pandas.""" + metadata_path, data_path, _table_name = ducklake_setup + + df = ( + spark.read.format("ducklake") + 
.options(url=f"sqlite:///{metadata_path}", table="test_table", base_path=f"file://{data_path}/") + .load() + ) + + pandas_df = df.sort("id").toPandas() + + expected_df = pd.DataFrame( + {"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"], "score": [95.5, 87.3, 92.1]} + ).astype({"id": "int32", "name": "string", "score": "float64"}) + + assert_frame_equal(pandas_df.reset_index(drop=True), expected_df, check_dtype=False) + + +@pytest.mark.skipif(platform.system() == "Windows", reason="may not work on Windows") +def test_ducklake_read_with_sql(spark, ducklake_setup): + """Test DuckLake read using SQL CREATE TABLE.""" + metadata_path, data_path, _table_name = ducklake_setup + + spark.sql( + f""" + CREATE TABLE ducklake_test + USING ducklake + OPTIONS ( + url 'sqlite:///{metadata_path}', + `table` 'test_table', + base_path 'file://{data_path}/' + ) + """ + ) + + try: + result_df = spark.sql("SELECT * FROM ducklake_test").sort("id") + result = result_df.collect() + + expected_count = 3 + assert len(result) == expected_count + assert result[0].name == "Alice" + assert result[1].name == "Bob" + assert result[2].name == "Charlie" + finally: + spark.sql("DROP TABLE IF EXISTS ducklake_test") + + +def test_ducklake_read_with_sql_location(spark, ducklake_setup): + """Test DuckLake read using SQL LOCATION (location-first).""" + metadata_path, data_path, table_name = ducklake_setup + + tbl_only = table_name.split(".")[-1] + + loc = f"ducklake+sqlite:///{metadata_path}/{tbl_only}?base_path=file://{data_path}/" + + spark.sql( + f""" + CREATE TABLE ducklake_test_loc + USING ducklake + LOCATION '{escape_sql_string_literal(loc)}' + """ + ) + + try: + result_df = spark.sql("SELECT * FROM ducklake_test_loc").sort("id") + result = result_df.collect() + + expected_count = 3 + assert len(result) == expected_count + assert result[0].name == "Alice" + assert result[1].name == "Bob" + assert result[2].name == "Charlie" + finally: + spark.sql("DROP TABLE IF EXISTS ducklake_test_loc") + + +def test_ducklake_read_with_schema_prefix(spark, tmp_path, duckdb_conn): + """Test DuckLake read with schema.table format.""" + metadata_path = tmp_path / "metadata_schema.ducklake" + data_path = tmp_path / "data_schema" + data_path.mkdir(exist_ok=True) + + duckdb_conn.execute(f""" + ATTACH 'ducklake:sqlite:{metadata_path}' AS my_db + (DATA_PATH '{data_path}/') + """) + + duckdb_conn.execute("CREATE SCHEMA IF NOT EXISTS my_db.analytics") + duckdb_conn.execute(""" + CREATE TABLE my_db.analytics.metrics ( + metric_id INTEGER, + metric_name VARCHAR, + value DOUBLE + ) + """) + + duckdb_conn.execute(""" + INSERT INTO my_db.analytics.metrics + VALUES (1, 'cpu_usage', 75.5), (2, 'memory_usage', 82.3) + """) + + duckdb_conn.execute("DETACH my_db") + + df = ( + spark.read.format("ducklake") + .options( + url=f"sqlite:///{metadata_path}", + table="analytics.metrics", + base_path=f"file://{data_path}/", + ) + .load() + ) + + result = df.sort("metric_id").collect() + + expected_count = 2 + assert len(result) == expected_count + assert result[0].metric_name == "cpu_usage" + assert result[1].metric_name == "memory_usage" + + +def test_ducklake_read_empty_table(spark, tmp_path, duckdb_conn): + """Test reading an empty DuckLake table.""" + metadata_path = tmp_path / "metadata_empty.ducklake" + data_path = tmp_path / "data_empty" + data_path.mkdir(exist_ok=True) + + duckdb_conn.execute(f""" + ATTACH 'ducklake:sqlite:{metadata_path}' AS empty_db + (DATA_PATH '{data_path}/') + """) + + duckdb_conn.execute(""" + CREATE TABLE empty_db.empty_table ( + 
id INTEGER, + value VARCHAR + ) + """) + + duckdb_conn.execute("DETACH empty_db") + + df = ( + spark.read.format("ducklake") + .options(url=f"sqlite:///{metadata_path}", table="empty_table", base_path=f"file://{data_path}/") + .load() + ) + + result = df.collect() + assert len(result) == 0 + + +def test_ducklake_read_various_types(spark, tmp_path, duckdb_conn): + """Test DuckLake read with various data types.""" + + metadata_path = tmp_path / "metadata_types.ducklake" + data_path = tmp_path / "data_types" + data_path.mkdir(exist_ok=True) + + duckdb_conn.execute(f""" + ATTACH 'ducklake:sqlite:{metadata_path}' AS types_db + (DATA_PATH '{data_path}/') + """) + + duckdb_conn.execute(""" + CREATE TABLE types_db.type_test ( + int_col INTEGER, + bigint_col BIGINT, + double_col DOUBLE, + varchar_col VARCHAR, + bool_col BOOLEAN, + date_col DATE + ) + """) + + duckdb_conn.execute(""" + INSERT INTO types_db.type_test + VALUES ( + 42, + 9223372036854775807, + 3.14159, + 'test string', + true, + '2024-01-15' + ) + """) + + duckdb_conn.execute("DETACH types_db") + + df = ( + spark.read.format("ducklake") + .options(url=f"sqlite:///{metadata_path}", table="type_test", base_path=f"file://{data_path}/") + .load() + ) + + result = df.collect() + + expected_row_count = 1 + expected_int_val = 42 + expected_bigint_val = 9223372036854775807 + double_tolerance = 0.00001 + + assert len(result) == expected_row_count + row = result[0] + assert row.int_col == expected_int_val + assert row.bigint_col == expected_bigint_val + assert abs(row.double_col - 3.14159) < double_tolerance + assert row.varchar_col == "test string" + assert row.bool_col is True + + +def test_ducklake_read_missing_table_error(spark, tmp_path, duckdb_conn): + """Test error handling when reading non-existent table.""" + metadata_path = tmp_path / "metadata_missing.ducklake" + data_path = tmp_path / "data_missing" + data_path.mkdir(exist_ok=True) + + duckdb_conn.execute(f""" + ATTACH 'ducklake:sqlite:{metadata_path}' AS missing_db + (DATA_PATH '{data_path}/') + """) + + duckdb_conn.execute("DETACH missing_db") + + with pytest.raises(AnalysisException): + spark.read.format("ducklake").options( + url=f"sqlite:///{metadata_path}", table="nonexistent_table", base_path=f"file://{data_path}/" + ).load().collect() + + +def test_ducklake_read_missing_options_error(spark): + """Test error handling when required options are missing.""" + with pytest.raises(AnalysisException): + spark.read.format("ducklake").options(table="test_table", base_path="file:///tmp/data/").load().collect() + + with pytest.raises(AnalysisException): + spark.read.format("ducklake").options( + url="sqlite:///tmp/metadata.db", base_path="file:///tmp/data/" + ).load().collect() + + with pytest.raises(AnalysisException): + spark.read.format("ducklake").options(url="sqlite:///tmp/metadata.db", table="test_table").load().collect() + + +def test_ducklake_read_with_aggregation(spark, ducklake_setup): + """Test DuckLake read with aggregation.""" + metadata_path, data_path, _table_name = ducklake_setup + + df = ( + spark.read.format("ducklake") + .options(url=f"sqlite:///{metadata_path}", table="test_table", base_path=f"file://{data_path}/") + .load() + ) + + avg_score = df.agg({"score": "avg"}).collect()[0][0] + + expected_avg = (95.5 + 87.3 + 92.1) / 3 + avg_tolerance = 0.01 + assert abs(avg_score - expected_avg) < avg_tolerance + + +def test_ducklake_read_with_join(spark, ducklake_setup, duckdb_conn): + """Test DuckLake read with join operations.""" + metadata_path, data_path, _table_name = 
ducklake_setup + + duckdb_conn.execute(f""" + ATTACH 'ducklake:sqlite:{metadata_path}' AS my_ducklake + (DATA_PATH '{data_path}/') + """) + + duckdb_conn.execute(""" + CREATE TABLE my_ducklake.departments ( + id INTEGER, + dept_name VARCHAR + ) + """) + + duckdb_conn.execute(""" + INSERT INTO my_ducklake.departments + VALUES (1, 'Engineering'), (2, 'Sales'), (3, 'Marketing') + """) + + duckdb_conn.execute("DETACH my_ducklake") + + df1 = ( + spark.read.format("ducklake") + .options(url=f"sqlite:///{metadata_path}", table="test_table", base_path=f"file://{data_path}/") + .load() + ) + + df2 = ( + spark.read.format("ducklake") + .options(url=f"sqlite:///{metadata_path}", table="departments", base_path=f"file://{data_path}/") + .load() + ) + + joined_df = df1.join(df2, df1.id == df2.id, "inner").select(df1.name, df2.dept_name).sort(df1.name) + + result = joined_df.collect() + + expected_join_count = 3 + assert len(result) == expected_join_count + assert result[0].name == "Alice" + assert result[0].dept_name == "Engineering" + + +def test_ducklake_read_count(spark, ducklake_setup): + """Test DuckLake read with count operation.""" + metadata_path, data_path, _table_name = ducklake_setup + + df = ( + spark.read.format("ducklake") + .options(url=f"sqlite:///{metadata_path}", table="test_table", base_path=f"file://{data_path}/") + .load() + ) + + count = df.count() + expected_count = 3 + assert count == expected_count + + +@pytest.fixture(scope="module") +def postgres_url(): + url = os.environ.get("DUCKLAKE_PG_URL") + if not url: + pytest.skip("DUCKLAKE_PG_URL not set; skipping DuckLake Postgres tests") + return url + + +@pytest.fixture +def ducklake_pg_setup(tmp_path, duckdb_conn, postgres_url): + """ + Set up a DuckLake Postgres database with test data. + Returns a tuple of (postgres_url, data_path, table_name). 
+ """ + data_path = tmp_path / "data_pg" + data_path.mkdir(exist_ok=True) + + # Attach DuckLake using Postgres metadata + pg_url_sql = postgres_url.replace("'", "''") + duckdb_conn.execute( + f""" + ATTACH 'ducklake:postgresql://{pg_url_sql[len("postgresql://") :]}' AS my_ducklake + (DATA_PATH '{data_path}/') + """ + ) + + try: + duckdb_conn.execute( + """ + CREATE TABLE my_ducklake.test_table ( + id INTEGER, + name VARCHAR, + score DOUBLE + ) + """ + ) + duckdb_conn.execute( + """ + INSERT INTO my_ducklake.test_table + VALUES + (1, 'Alice', 95.5), + (2, 'Bob', 87.3), + (3, 'Charlie', 92.1) + """ + ) + finally: + duckdb_conn.execute("DETACH my_ducklake") + + return postgres_url, str(data_path), "test_table" + + +def test_ducklake_pg_basic_read(spark, ducklake_pg_setup): + postgres_url, data_path, table = ducklake_pg_setup + df = spark.read.format("ducklake").options(url=postgres_url, table=table, base_path=f"file://{data_path}/").load() + result = df.sort("id").collect() + expected_count = 3 + assert len(result) == expected_count + assert result[0].name == "Alice" + assert result[1].name == "Bob" + assert result[2].name == "Charlie" + + +def test_ducklake_pg_with_filter(spark, ducklake_pg_setup): + postgres_url, data_path, table = ducklake_pg_setup + df = spark.read.format("ducklake").options(url=postgres_url, table=table, base_path=f"file://{data_path}/").load() + result = df.filter("score > 90").sort("id").collect() + expected_filtered_count = 2 + assert len(result) == expected_filtered_count + assert result[0].name == "Alice" + assert result[1].name == "Charlie" + + +def test_ducklake_pg_with_projection(spark, ducklake_pg_setup): + postgres_url, data_path, table = ducklake_pg_setup + df = spark.read.format("ducklake").options(url=postgres_url, table=table, base_path=f"file://{data_path}/").load() + rows = df.select("id", "name").sort("id").collect() + expected_count = 3 + assert len(rows) == expected_count + assert hasattr(rows[0], "id") + assert hasattr(rows[0], "name") + assert not hasattr(rows[0], "score") + + +def test_ducklake_pg_with_limit(spark, ducklake_pg_setup): + postgres_url, data_path, table = ducklake_pg_setup + df = spark.read.format("ducklake").options(url=postgres_url, table=table, base_path=f"file://{data_path}/").load() + limit_count = 2 + assert df.limit(limit_count).count() == limit_count + + +def test_ducklake_pg_read_to_pandas(spark, ducklake_pg_setup): + postgres_url, data_path, table = ducklake_pg_setup + df = spark.read.format("ducklake").options(url=postgres_url, table=table, base_path=f"file://{data_path}/").load() + pdf = df.sort("id").toPandas() + expected_count = 3 + assert len(pdf) == expected_count + assert list(pdf["name"]) == ["Alice", "Bob", "Charlie"] + + +def test_ducklake_pg_with_sql_location(spark, ducklake_pg_setup): + postgres_url, data_path, _table = ducklake_pg_setup + loc_base = postgres_url.replace("postgresql://", "ducklake+postgresql://", 1) + loc = f"{loc_base}/test_table?base_path=file://{data_path}/" + + spark.sql( + f""" + CREATE TABLE ducklake_pg_test + USING ducklake + LOCATION '{escape_sql_string_literal(loc)}' + """ + ) + try: + result_df = spark.sql("SELECT * FROM ducklake_pg_test").sort("id") + rows = result_df.collect() + expected_count = 3 + assert len(rows) == expected_count + assert rows[0].name == "Alice" + assert rows[1].name == "Bob" + assert rows[2].name == "Charlie" + finally: + spark.sql("DROP TABLE IF EXISTS ducklake_pg_test")