@rayhhome (Collaborator) commented Nov 19, 2025

Statistics-augmented Catalog Service API and DataFusion connector

Augment the Statistics module with a service wrapper to make it ready for integration and benchmarking. Implement connectors to DataFusion to complete the pipeline, and hook into the datafusion CLI for ease of use.

Architecture

Optd Catalog (optd/catalog/)

The main catalog service module. This package provides a runnable service that serves as a thread-safe metadata-augmented catalog backed by DuckLake. Below is the code architecture.

  • CatalogService: The catalog service running in the background. Uses mpsc channels to provide serialized access to the encapsulated DuckLake catalog and to process incoming requests;
  • CatalogServiceHandle: Handle for sending requests to CatalogService; instantiated alongside the backend service when a new CatalogService is created. The handle can be cloned for use in multi-threaded situations. It currently allows updates to and retrievals from the service for the following information:
    • Snapshot management (current_snapshot, current_snapshot_info);
    • Schema retrieval (current_schema, current_schema_info);
    • Statistics (table_statistics, update_table_column_stats).
  • DuckLakeCatalog: Wrapper around the DuckLake catalog connection; implements the Catalog trait that defines the exposed functionality.

Statistics are versioned: each update_table_column_stats call creates a new snapshot with a begin-to-end validity range, which enables time travel; advanced statistics are stored as JSON.
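The service/handle pattern described above can be sketched in a few lines. This is an illustrative, self-contained sketch only: the real service uses tokio's async mpsc channels, while this version uses std::sync::mpsc and a thread, and the names `Request`, `spawn_service`, and `table_row_count` (plus the hard-coded row counts) are hypothetical, not the actual optd API.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical request enum standing in for the handle's API surface.
enum Request {
    TableRowCount { table: String, reply: mpsc::Sender<u64> },
    Shutdown,
}

// The service owns the catalog state and processes requests one at a
// time off the channel, so the catalog itself needs no locking.
fn spawn_service() -> mpsc::Sender<Request> {
    let (tx, rx) = mpsc::channel::<Request>();
    thread::spawn(move || {
        for req in rx {
            match req {
                Request::TableRowCount { table, reply } => {
                    // Placeholder: a real service would query DuckLake here.
                    let _ = reply.send(if table == "t" { 42 } else { 0 });
                }
                Request::Shutdown => break,
            }
        }
    });
    tx
}

// Handle-side helper: send a request with a fresh reply channel and
// block until the service answers.
fn table_row_count(handle: &mpsc::Sender<Request>, table: &str) -> u64 {
    let (reply_tx, reply_rx) = mpsc::channel();
    handle
        .send(Request::TableRowCount {
            table: table.to_string(),
            reply: reply_tx,
        })
        .expect("service is running");
    reply_rx.recv().expect("service replied")
}
```

Because the sender side of an mpsc channel is cheaply cloneable, the same design lets many threads share one service, which is why CatalogServiceHandle can be cloned.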

DataFusion Connector (connectors/datafusion/src/catalog.rs, connectors/datafusion/src/table.rs)

Integrates the Optd catalog into DataFusion's planning procedure:

  • OptdCatalogProviderList: Wraps DataFusion's catalog list; takes a CatalogServiceHandle to communicate with the corresponding service and passes it down to its subordinate OptdCatalogProviders;
  • OptdCatalogProvider: Wraps DataFusion's catalog; also takes a CatalogServiceHandle;
  • OptdSchemaProvider and OptdTableProvider: Simple wrapper structs for schemas and tables.
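The wrappers all follow the same delegation shape: hold the inner DataFusion object plus a handle, forward metadata calls, and keep the handle available for statistics lookups. A minimal sketch of that shape, using stand-in types (the `SchemaProvider` trait, `CatalogServiceHandle`, and `PlainSchema` here are illustrative placeholders, not the real DataFusion or optd APIs):

```rust
use std::sync::Arc;

// Stand-in for DataFusion's schema-provider trait.
trait SchemaProvider {
    fn table_names(&self) -> Vec<String>;
}

// Placeholder for the real cloneable service handle.
#[derive(Clone)]
struct CatalogServiceHandle;

// The wrapper delegates metadata lookups to the inner provider and
// keeps a handle for fetching statistics from the catalog service.
#[allow(dead_code)]
struct OptdSchemaProvider {
    inner: Arc<dyn SchemaProvider>,
    catalog_handle: CatalogServiceHandle,
}

impl SchemaProvider for OptdSchemaProvider {
    fn table_names(&self) -> Vec<String> {
        self.inner.table_names()
    }
}

// A trivial inner provider so the wrapper can be exercised.
struct PlainSchema;

impl SchemaProvider for PlainSchema {
    fn table_names(&self) -> Vec<String> {
        vec!["t".to_string()]
    }
}
```

The same pattern repeats one level up (catalog list wrapping catalogs) and one level down (tables), with the handle threaded through at construction time.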

CLI (cli/src/main.rs)

Allows the DataFusion connector to be used out of the box against a background Optd catalog service process:

  • The catalog service is started in the background on the tokio runtime;
  • The DuckLake catalog can be instantiated from OPTD_METADATA_CATALOG_PATH or a default location.
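The environment-variable fallback can be sketched as below; the function name and the default file name `optd_metadata.ducklake` are assumptions for illustration, since the CLI's actual default location is not specified here.

```rust
use std::env;
use std::path::PathBuf;

// Resolve the metadata catalog path from OPTD_METADATA_CATALOG_PATH,
// falling back to a default when the variable is unset.
fn metadata_catalog_path() -> PathBuf {
    env::var("OPTD_METADATA_CATALOG_PATH")
        .map(PathBuf::from)
        .unwrap_or_else(|_| PathBuf::from("optd_metadata.ducklake"))
}
```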

Testing

Optd Catalog

  • Unit and edge-case tests for every utility function (including time-travel and multiple-schema cases) of the service and the handle.
  • Stress and concurrency tests for the handle (single-threaded heavy updates and multi-threaded updates/retrievals).

DataFusion Connector

  • Integration tests that check the interaction between the wrapper providers and a running background catalog service.

CLI

  • Two integration tests: one that thoroughly exercises the DuckLakeCatalog, and another for full integration among the catalog service, the DataFusion connector, and the CLI.
  • A smoke test (cli/src/smoke_test_cli.sh) for checking CLI usage in the terminal (it also gives an intuition for how to use the catalog service).

Next Steps

There are two branching options for next steps:

  • Supporting updates to and retrievals from the Optd_query table, which tracks the queries executed at each snapshot;
  • Exploring more generic catalog functionality, such as persistence (requires research on delete vectors).

Note 1: the catalog service's mpsc channel currently buffers up to 1000 requests (fixed). Should this be a parameter passed into the constructor of CatalogService?
Note 2: Should we make DuckLakeCatalog public only within the crate?
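One way to resolve Note 1 without breaking existing callers is a config struct with a default. This is only a sketch of one possible shape; `CatalogServiceConfig` and its methods are hypothetical names, not part of the current code.

```rust
// Keep the current value as the default instead of the only option.
const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1000;

struct CatalogServiceConfig {
    channel_buffer_size: usize,
}

impl Default for CatalogServiceConfig {
    fn default() -> Self {
        Self {
            channel_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE,
        }
    }
}

impl CatalogServiceConfig {
    // Callers with unusual load profiles can tune the capacity.
    fn with_buffer_size(channel_buffer_size: usize) -> Self {
        Self { channel_buffer_size }
    }
}
```

CatalogService::new could then accept a CatalogServiceConfig (or implement Default itself), so existing call sites keep the 1000-request behavior while benchmarks can experiment with other capacities.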

rayhhome and others added 30 commits August 5, 2025 18:33
v0 of the Cascades-style optimizer.

- Exhaustive optimization: expression and group returns only when the
subgraph is optimized.
- Applying enforcer rules and adding generated expressions to the memo
table.
- Special termination logic is required when the child has the same
group + physical requirement as the parent.
- Exhaustive exploration when applying rules, generate all bindings
before doing the transform, but only expand based on specified rule
patterns.

Signed-off-by: Yuchen Liang <[email protected]>
## Problem


Some methods are added to the IR as experimental features. We also got
feedback from the dev meeting that the rule seems hard to read (or
long). We would like to clean up these rough edges.


## Summary of changes

- eliminate `try_bind_ref_xxx` and use `try_borrow`
- add `borrow_raw_parts` so we always refer to `$node_name` instead of
`$ref_name`.
- Plumb through property methods to use shorthand.

**_TODO:_** Pattern builder can also be generated by macros.

---------

Signed-off-by: Yuchen Liang <[email protected]>
@rayhhome added labels enhancement, rust, and dependencies on Nov 19, 2025
.DS_Store (Outdated)

Member:

Let's remove this and add .DS_Store to .gitignore

Collaborator (Author):

Hmm, I've already added that to .gitignore; not sure why it sneaked through. Will fix this later today.

```rust
#[derive(Debug)]
pub struct OptdSchemaProvider {
    inner: Arc<dyn SchemaProvider>,
    catalog_handle: Option<CatalogServiceHandle>,
```
Member:

Why is this an Option?

Collaborator (Author):

Oops, I think this is an artifact left over from when I was trying to add caching to the catalog service and forgot to remove it from SchemaProvider or TableProvider. Let me fix this.

Copilot AI review requested due to automatic review settings December 13, 2025 22:47

Copilot AI left a comment:

Pull request overview

This PR introduces a statistics-augmented catalog service API backed by DuckLake and provides DataFusion integration to complete the query optimization pipeline. The implementation adds a thread-safe catalog service using async message passing, wrapper providers for DataFusion integration, and CLI support with optional persistent metadata.

Key Changes:

  • Implements CatalogService with async request handling via mpsc channels, supporting snapshot-based time travel for statistics
  • Provides DataFusion connector wrappers (OptdCatalogProviderList, OptdCatalogProvider, OptdSchemaProvider, OptdTableProvider) to integrate the catalog service into DataFusion's planning
  • Adds CLI integration with optional DuckLake metadata persistence via OPTD_METADATA_CATALOG_PATH environment variable

Reviewed changes

Copilot reviewed 18 out of 19 changed files in this pull request and generated 7 comments.

| File | Description |
| --- | --- |
| optd/storage/src/lib.rs | Removed unused DuckLake storage module scaffolding |
| optd/storage/Cargo.toml | Removed unused storage crate |
| optd/catalog/src/lib.rs | Core catalog implementation with DuckDB backend, snapshot management, and statistics versioning |
| optd/catalog/src/service.rs | Async catalog service using tokio channels for thread-safe access |
| optd/catalog/tests/statistics_tests.rs | Comprehensive unit tests for catalog functionality including edge cases and time-travel |
| optd/catalog/tests/service_tests.rs | Service-level tests covering concurrency, shutdown, and handle cloning |
| optd/catalog/Cargo.toml | Dependencies for DuckDB, tokio, serde, and testing |
| connectors/datafusion/src/catalog.rs | Wrapper providers for DataFusion catalog integration |
| connectors/datafusion/src/table.rs | Table provider wrapper with statistics support |
| connectors/datafusion/src/lib.rs | Public API exports for connector |
| connectors/datafusion/tests/integration_test.rs | Integration tests for DataFusion connector with catalog service |
| connectors/datafusion/Cargo.toml | Dependencies including optd-catalog and async-trait |
| cli/src/main.rs | CLI integration with optional catalog service initialization |
| cli/src/lib.rs | Session context enhancements for external table creation |
| cli/tests/catalog_service_integration.rs | End-to-end integration tests for CLI with catalog service |
| cli/smoke_test_cli.sh | Shell script for manual CLI smoke testing |
| cli/Cargo.toml | CLI dependencies including optd-catalog |
| Cargo.toml | Workspace member cleanup removing optd-storage |


Comment on lines +15 to +54
```rust
#[allow(dead_code)]
pub struct OptdTable {
    inner: Box<ListingTable>,
    name: String,
    table_reference: TableReference,
}

impl OptdTable {
    pub fn try_new(
        inner: ListingTable,
        name: String,
        table_reference: TableReference,
    ) -> Result<Self> {
        Ok(Self {
            inner: Box::new(inner),
            name,
            table_reference,
        })
    }

    pub fn new_with_inner(
        inner: Box<ListingTable>,
        name: String,
        table_reference: TableReference,
    ) -> Self {
        Self {
            inner,
            name,
            table_reference,
        }
    }

    pub fn name(&self) -> &str {
        &self.name
    }

    pub fn table_reference(&self) -> &TableReference {
        &self.table_reference
    }
}
```
Copilot AI (Dec 13, 2025):
Setting #[allow(dead_code)] on the entire OptdTable struct and its methods suppresses warnings that could indicate unused code. If this is intended for future use, consider documenting why it exists or remove it if not needed in the current implementation.

Comment on lines +112 to +120
```rust
fn supports_filters_pushdown(
    &self,
    filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
    Ok(vec![
        TableProviderFilterPushDown::Unsupported;
        filters.len()
    ])
}
```
Copilot AI (Dec 13, 2025):

The supports_filters_pushdown implementation returns Unsupported for all filters. This means filter pushdown optimizations are disabled. If filter pushdown is not yet implemented, add a TODO comment explaining the plan to enable it in the future.

```bash
#!/usr/bin/env bash
# CLI smoke test - verifies catalog integration is active

set -e # Exit on error
```
Copilot AI (Dec 13, 2025):
The shell script uses set -e but does not use set -u (fail on undefined variables) or set -o pipefail (fail if any command in a pipeline fails). This could mask errors in test execution. Consider adding these flags for more robust error detection.

Suggested change:

```diff
-set -e # Exit on error
+set -euo pipefail # Exit on error, undefined variable, or failed pipeline
```

Comment on lines +7 to +8
```rust
/// Max pending requests
const CHANNEL_BUFFER_SIZE: usize = 1000;
```
Copilot AI (Dec 13, 2025):
The channel buffer size is hardcoded to 1000. As noted in the PR description, this might need to be configurable. Consider making CHANNEL_BUFFER_SIZE a parameter in the constructor to allow tuning based on expected load, or document why 1000 is the appropriate fixed value.

Suggested change:

```diff
-/// Max pending requests
-const CHANNEL_BUFFER_SIZE: usize = 1000;
+// The channel buffer size should be passed as a parameter to the constructor or initialization function
+// to allow tuning based on expected load.
```

Comment on lines +50 to +70
```rust
async fn create_external_table(&self, cmd: &CreateExternalTable) -> Result<DataFrame> {
    let exist = self.inner.table_exist(cmd.name.clone())?;

    if cmd.temporary {
        return not_impl_err!("Temporary tables not supported");
    }

    if exist {
        match cmd.if_not_exists {
            true => return self.return_empty_dataframe(),
            false => {
                return exec_err!("Table '{}' already exists", cmd.name);
            }
        }
    }

    let table_provider: Arc<dyn TableProvider> = self.create_custom_table(cmd).await?;
    self.register_table(cmd.name.clone(), table_provider)?;

    self.return_empty_dataframe()
}
```
Copilot AI (Dec 13, 2025):
Missing documentation for the CreateExternalTable handling. The new create_external_table method adds significant functionality but lacks documentation explaining its purpose, parameters, and behavior. Add docstring comments to clarify the intent and usage.

Comment on lines +117 to +122
```rust
    source: duckdb::Error::ExecuteReturnedResults,
})?;

rx.await.map_err(|_| Error::QueryExecution {
    source: duckdb::Error::ExecuteReturnedResults,
})?
```
Copilot AI (Dec 13, 2025):
The error handling here uses ExecuteReturnedResults as a catch-all error for channel communication failures. This is misleading because ExecuteReturnedResults is a DuckDB error meant for SQL execution, not channel communication errors. Consider creating a more specific error variant for channel/communication errors.

Comment on lines +330 to +337
```rust
/// Get a reference to the underlying DuckLakeCatalog for test setup only.
/// Only available in test/debug builds and should
/// only be used for setting up test fixtures.
#[cfg(any(test, debug_assertions))]
pub fn catalog_for_setup(&self) -> &DuckLakeCatalog {
    &self.backend
}
```
Copilot AI (Dec 13, 2025):
The method catalog_for_setup is only available in debug/test builds but returns a reference to the backend which could enable modifications outside the intended async service pattern. Even though this is test-only, consider returning a read-only interface or documenting that this should only be used before spawning the service.
