Draft
39 commits
b7d9757  spec (lonless9, Nov 1, 2025)
cc0682a  update (lonless9, Nov 1, 2025)
fa7b1d1  update (lonless9, Nov 1, 2025)
c0b89ea  fmt (lonless9, Nov 1, 2025)
d27c578  fmt (lonless9, Nov 2, 2025)
aabc182  update (lonless9, Nov 2, 2025)
5e5aa1d  update (lonless9, Nov 2, 2025)
484d43c  clippy (lonless9, Nov 2, 2025)
5cc721e  update (lonless9, Nov 10, 2025)
b385b05  update (lonless9, Nov 10, 2025)
e172622  update (lonless9, Nov 10, 2025)
4fe717e  Merge remote-tracking branch 'origin/main' into ducklake-integration (lonless9, Nov 10, 2025)
4989b73  postgre (lonless9, Nov 11, 2025)
5a39b6e  Merge remote-tracking branch 'origin/main' into ducklake-integration (lonless9, Nov 11, 2025)
5f38b9d  update (lonless9, Nov 11, 2025)
a715837  update (lonless9, Nov 11, 2025)
73f2e29  update (lonless9, Nov 11, 2025)
c108f31  pruning (lonless9, Nov 11, 2025)
4bb21d5  update (lonless9, Nov 11, 2025)
1ff23c1  update (lonless9, Nov 11, 2025)
81bfd42  clippy (lonless9, Nov 11, 2025)
cdafe3a  fmt (lonless9, Nov 11, 2025)
90ba7a3  Merge remote-tracking branch 'origin/main' into ducklake-integration (lonless9, Nov 11, 2025)
60bbbb8  update (lonless9, Nov 11, 2025)
2e69987  Merge remote-tracking branch 'origin/main' into ducklake-integration (lonless9, Nov 11, 2025)
42dd860  fmt (lonless9, Nov 11, 2025)
00cac69  Merge remote-tracking branch 'origin/main' into ducklake-integration (lonless9, Nov 12, 2025)
551f477  Merge remote-tracking branch 'origin/main' into ducklake-integration (lonless9, Nov 14, 2025)
ee3323b  bindparams (lonless9, Nov 14, 2025)
23cc74f  update (lonless9, Nov 14, 2025)
45cb6b6  update (lonless9, Nov 14, 2025)
4cd036f  Merge remote-tracking branch 'origin/main' into ducklake-integration (lonless9, Nov 21, 2025)
0ce10c4  update (lonless9, Nov 21, 2025)
5f163ff  update (lonless9, Nov 21, 2025)
767ed6d  update (lonless9, Nov 21, 2025)
c82b93e  update (lonless9, Nov 21, 2025)
75acc4c  update (lonless9, Nov 21, 2025)
1ad77b2  update (lonless9, Nov 21, 2025)
aa5ea9c  update (lonless9, Nov 21, 2025)
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/sail-data-source/Cargo.toml
@@ -10,6 +10,7 @@ workspace = true
sail-common = { path = "../sail-common" }
sail-common-datafusion = { path = "../sail-common-datafusion" }
sail-delta-lake = { path = "../sail-delta-lake" }
sail-duck-lake = { path = "../sail-duck-lake" }
sail-iceberg = { path = "../sail-iceberg" }

async-trait = { workspace = true }
2 changes: 2 additions & 0 deletions crates/sail-data-source/build.rs
@@ -136,6 +136,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
build_options("ParquetWriteOptions", "parquet_write")?;
build_options("DeltaReadOptions", "delta_read")?;
build_options("DeltaWriteOptions", "delta_write")?;
build_options("DuckLakeReadOptions", "ducklake_read")?;
build_options("DuckLakeWriteOptions", "ducklake_write")?;
build_options("IcebergReadOptions", "iceberg_read")?;
build_options("IcebergWriteOptions", "iceberg_write")?;
build_options("TextReadOptions", "text_read")?;
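Reviewer note: for orientation, the `DuckLakeReadOptions` struct generated from `ducklake_read.yaml` (shown later in this diff) presumably has the shape sketched below. Only the field names and `Option` types follow from how `apply_ducklake_read_options` consumes it; the derives and serde attributes are assumptions about the codegen output.

// Hypothetical sketch of the codegen output for ducklake_read.yaml;
// the actual generated derives/attributes may differ.
#[derive(Debug, Default, serde::Deserialize)]
pub struct DuckLakeReadOptions {
    pub url: Option<String>,
    pub table: Option<String>,
    pub base_path: Option<String>,
    pub snapshot_id: Option<u64>, // deserialized via deserialize_u64 per the YAML spec
    pub schema: Option<String>,
    pub case_sensitive: Option<bool>, // deserialized via deserialize_bool per the YAML spec
}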
276 changes: 276 additions & 0 deletions crates/sail-data-source/src/formats/ducklake.rs
@@ -0,0 +1,276 @@
use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::catalog::{Session, TableProvider};
use datafusion::common::{not_impl_err, Result};
use datafusion::physical_plan::ExecutionPlan;
use sail_common_datafusion::datasource::{SinkInfo, SourceInfo, TableFormat};
use sail_duck_lake::create_ducklake_provider;
use sail_duck_lake::options::DuckLakeOptions;
use url::Url;

use crate::options::{load_default_options, load_options, DuckLakeReadOptions};

#[derive(Debug, Default)]
pub struct DuckLakeDataSourceFormat;

#[async_trait]
impl TableFormat for DuckLakeDataSourceFormat {
    fn name(&self) -> &str {
        "ducklake"
    }

    async fn create_provider(
        &self,
        ctx: &dyn Session,
        info: SourceInfo,
    ) -> Result<Arc<dyn TableProvider>> {
        let SourceInfo {
            paths,
            schema: _,
            constraints: _,
            partition_by: _,
            bucket_by: _,
            sort_order: _,
            options,
        } = info;

        // Prefer location-first (ducklake+sqlite://...) if provided
        let loc_opts = match paths.as_slice() {
            [single] => parse_ducklake_location(single)?,
            _ => None,
        };

        let ducklake_options = if let Some(mut base_opts) = loc_opts {
            // Merge additive options (snapshot_id, case_sensitive) from defaults and provided options
            let mut merged = DuckLakeOptions::default();
            apply_ducklake_read_options(load_default_options()?, &mut merged)?;
            for opt in options {
                apply_ducklake_read_options(load_options(opt)?, &mut merged)?;
            }
            if let Some(snap) = merged.snapshot_id {
                base_opts.snapshot_id = Some(snap);
            }
            base_opts.case_sensitive = merged.case_sensitive;
            base_opts.validate()?;
            base_opts
        } else {
            log::warn!(
                "DuckLake: location not provided; falling back to options; location is preferred"
            );
            resolve_ducklake_read_options(options)?
        };

        create_ducklake_provider(ctx, ducklake_options).await
    }

    async fn create_writer(
        &self,
        _ctx: &dyn Session,
        _info: SinkInfo,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("Writing to DuckLake tables is not yet supported")
    }
}

fn apply_ducklake_read_options(from: DuckLakeReadOptions, to: &mut DuckLakeOptions) -> Result<()> {
    if let Some(url) = from.url {
        to.url = url;
    }
    if let Some(table) = from.table {
        to.table = table;
    }
    if let Some(base_path) = from.base_path {
        to.base_path = base_path;
    }
    if let Some(snapshot_id) = from.snapshot_id {
        to.snapshot_id = Some(snapshot_id);
    }
    if let Some(schema) = from.schema {
        to.schema = Some(schema);
    }
    if let Some(case_sensitive) = from.case_sensitive {
        to.case_sensitive = case_sensitive;
    }
    Ok(())
}

pub fn resolve_ducklake_read_options(
    options: Vec<HashMap<String, String>>,
) -> Result<DuckLakeOptions> {
    let mut ducklake_options = DuckLakeOptions::default();
    apply_ducklake_read_options(load_default_options()?, &mut ducklake_options)?;
    for opt in options {
        apply_ducklake_read_options(load_options(opt)?, &mut ducklake_options)?;
    }
    ducklake_options.validate()?;
    Ok(ducklake_options)
}

// Parse a location string like:
// ducklake+sqlite:///path/to/metadata.ducklake/analytics/metrics?base_path=file:///path/to/data/&snapshot_id=1
// Returns Ok(None) if the scheme is not ducklake+*
fn parse_ducklake_location(path: &str) -> Result<Option<DuckLakeOptions>> {
    if !path.starts_with("ducklake+") {
        return Ok(None);
    }
    let url =
        Url::parse(path).map_err(|e| datafusion::common::DataFusionError::External(Box::new(e)))?;
    let scheme = url.scheme();
    if !scheme.starts_with("ducklake+") {
        return Ok(None);
    }

    let meta_scheme = &scheme["ducklake+".len()..];
    if meta_scheme != "sqlite" && meta_scheme != "postgres" && meta_scheme != "postgresql" {
        return Err(datafusion::common::DataFusionError::Plan(format!(
            "Unsupported DuckLake meta scheme: {meta_scheme}"
        )));
    }

    // Common: parse query params
    let qp: Vec<(String, String)> = url
        .query_pairs()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();

    // base_path is required
    let base_path = qp
        .iter()
        .find(|(k, _)| k == "base_path")
        .map(|(_, v)| v.to_string())
        .ok_or_else(|| {
            datafusion::common::DataFusionError::Plan("Missing base_path query param".into())
        })?;

    // Optional params
    let snapshot_id = qp
        .iter()
        .find(|(k, _)| k == "snapshot_id")
        .and_then(|(_, v)| v.parse::<u64>().ok());
    let case_sensitive = qp
        .iter()
        .find(|(k, _)| k == "case_sensitive")
        .is_some_and(|(_, v)| v == "true");

    let url_str;
    let table: String;

    if meta_scheme == "sqlite" {
        // Identify the metadata file (*.ducklake) and the table path segments after it
        let segments: Vec<String> = url
            .path_segments()
            .map(|s| s.map(|p| p.to_string()).collect())
            .unwrap_or_default();

        let split_idx = segments
            .iter()
            .position(|s| s.ends_with(".ducklake"))
            .ok_or_else(|| {
                datafusion::common::DataFusionError::Plan(
                    "Missing metadata .ducklake file in location".into(),
                )
            })?;

        let db_parts = &segments[..=split_idx];
        let table_parts = &segments[split_idx + 1..];

        table = match table_parts {
            [t] => t.clone(),
            [s, t] => format!("{s}.{t}"),
            [] => qp
                .iter()
                .find(|(k, _)| k == "table")
                .map(|(_, v)| v.to_string())
                .ok_or_else(|| {
                    datafusion::common::DataFusionError::Plan("Missing table in location".into())
                })?,
            _ => {
                return Err(datafusion::common::DataFusionError::Plan(
                    "Invalid table path in location".into(),
                ));
            }
        };

        // Reconstruct the metadata URL (strip the query, keep the path up to .ducklake)
        let mut meta_url = url.clone();
        meta_url.set_query(None);
        let db_path = format!("/{}", db_parts.join("/"));
        meta_url.set_path(&db_path);
        let auth = meta_url.authority().to_string();
        url_str = if auth.is_empty() {
            format!("{}://{}", meta_scheme, meta_url.path())
        } else {
            format!("{}://{}{}", meta_scheme, auth, meta_url.path())
        };
    } else {
        // postgres/postgresql: expect a path like /dbname[/schema[/table]]
        let segments: Vec<String> = url
            .path_segments()
            .map(|s| s.map(|p| p.to_string()).collect())
            .unwrap_or_default();
        if segments.is_empty() {
            return Err(datafusion::common::DataFusionError::Plan(
                "Missing database name in location".into(),
            ));
        }
        let dbname = &segments[0];
        let table_parts = &segments[1..];
        table = match table_parts {
            [t] => t.clone(),
            [s, t] => format!("{s}.{t}"),
            [] => qp
                .iter()
                .find(|(k, _)| k == "table")
                .map(|(_, v)| v.to_string())
                .ok_or_else(|| {
                    datafusion::common::DataFusionError::Plan("Missing table in location".into())
                })?,
            _ => {
                return Err(datafusion::common::DataFusionError::Plan(
                    "Invalid table path in location".into(),
                ));
            }
        };

        // Reconstruct the metadata URL to include scheme, authority, and /dbname, preserving
        // connection query params but excluding DuckLake-specific params.
        let mut meta_url = url.clone();
        // Preserve only non-DuckLake query params
        let filtered: Vec<(String, String)> = qp
            .into_iter()
            .filter(|(k, _)| {
                k != "base_path" && k != "snapshot_id" && k != "case_sensitive" && k != "table"
            })
            .collect();
        if filtered.is_empty() {
            meta_url.set_query(None);
        } else {
            let mut serializer = url::form_urlencoded::Serializer::new(String::new());
            for (k, v) in filtered {
                serializer.append_pair(&k, &v);
            }
            let q = serializer.finish();
            meta_url.set_query(Some(&q));
        }
        let db_path = format!("/{dbname}");
        meta_url.set_path(&db_path);
        let auth = meta_url.authority().to_string();
        // Append the filtered query so connection params (e.g. sslmode) are preserved
        // in the reconstructed metadata URL.
        let query = meta_url
            .query()
            .map(|q| format!("?{q}"))
            .unwrap_or_default();
        url_str = if auth.is_empty() {
            format!("{}://{}{}", meta_scheme, meta_url.path(), query)
        } else {
            format!("{}://{}{}{}", meta_scheme, auth, meta_url.path(), query)
        };
    }

    Ok(Some(DuckLakeOptions {
        url: url_str,
        table,
        base_path,
        snapshot_id,
        schema: None,
        case_sensitive,
    }))
}
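Reviewer note: to make the location grammar concrete, here is a minimal test sketch (not part of this diff) exercising parse_ducklake_location on the SQLite form documented in the comment above; the expected values follow directly from the parsing logic.

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_sqlite_location() -> Result<()> {
        let loc = "ducklake+sqlite:///tmp/meta.ducklake/analytics/metrics?base_path=file:///tmp/data/&snapshot_id=1";
        let opts = parse_ducklake_location(loc)?.expect("ducklake+ scheme should be recognized");
        // The metadata URL keeps the path up to the .ducklake file; the
        // trailing segments become the schema-qualified table name.
        assert_eq!(opts.url, "sqlite:///tmp/meta.ducklake");
        assert_eq!(opts.table, "analytics.metrics");
        assert_eq!(opts.base_path, "file:///tmp/data/");
        assert_eq!(opts.snapshot_id, Some(1));
        assert!(!opts.case_sensitive);

        // Non-ducklake schemes pass through as None rather than erroring.
        assert!(parse_ducklake_location("s3://bucket/path")?.is_none());
        Ok(())
    }
}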
1 change: 1 addition & 0 deletions crates/sail-data-source/src/formats/mod.rs
@@ -4,6 +4,7 @@ pub mod binary;
pub mod console;
pub mod csv;
pub mod delta;
pub mod ducklake;
pub mod iceberg;
pub mod json;
pub mod listing;
53 changes: 53 additions & 0 deletions crates/sail-data-source/src/options/data/ducklake_read.yaml
@@ -0,0 +1,53 @@
# Options for reading from a DuckLake table.

- key: url
  description: |
    Metadata database connection string.
    Supported formats:
    - SQLite: sqlite:///path/to/metadata.db
    - PostgreSQL: postgres://user:password@host:port/database
    - PostgreSQL: postgresql://user:password@host:port/database
  supported: true
  rust_type: String

- key: table
  description: |
    Table name to read. Format: 'table_name' or 'schema.table_name'.
    If schema is omitted, defaults to 'main'.
  supported: true
  rust_type: String

- key: base_path
  description: |
    Base URL for resolving relative data file paths.
    Must be a valid URL (e.g., s3://bucket/prefix/, file:///data/, gs://bucket/).
  supported: true
  rust_type: String

- key: snapshot_id
  aliases:
    - snapshotId
  description: |
    Specific snapshot ID to read (integer).
    If omitted, reads the current (latest) snapshot.
  supported: true
  rust_type: u64
  rust_deserialize_with: crate::options::serde::deserialize_u64

- key: schema
  description: |
    Schema name override (string).
    Rarely needed; use a qualified table name instead.
  supported: true
  rust_type: String

- key: case_sensitive
  aliases:
    - caseSensitive
  description: |
    Case-sensitive column matching.
  default: "false"
  supported: true
  rust_type: bool
  rust_deserialize_with: crate::options::serde::deserialize_bool

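Reviewer note: as a usage sketch (an assumed call site, not part of this diff), these keys arrive as plain string pairs and are resolved through resolve_ducklake_read_options from formats/ducklake.rs above; string values are coerced by the generated deserializers.

use std::collections::HashMap;

use crate::formats::ducklake::resolve_ducklake_read_options;

fn example() -> datafusion::common::Result<()> {
    // Keys match the YAML spec above; "1" is coerced to u64 for snapshot_id.
    let user_options = HashMap::from([
        ("url".to_string(), "sqlite:///tmp/meta.ducklake".to_string()),
        ("table".to_string(), "analytics.metrics".to_string()),
        ("base_path".to_string(), "file:///tmp/data/".to_string()),
        ("snapshot_id".to_string(), "1".to_string()),
    ]);
    let opts = resolve_ducklake_read_options(vec![user_options])?;
    assert_eq!(opts.snapshot_id, Some(1));
    Ok(())
}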