-
Notifications
You must be signed in to change notification settings - Fork 0
Statistics-augmented Catalog Service API and DataFusion connector #16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
36cfae2
7f74b6d
8716478
deeac21
746468c
ef1a181
e1bf28c
fa8e974
762fae7
f92efb1
40274ab
f2a17dd
bad8c8f
af832ba
a610854
c114adf
b0e5329
a85ad15
ff7ce20
1952737
86dcfa1
c2f31c2
bf988e3
23192a5
dd3f9eb
54731f6
1753080
7bf7e27
d25b78e
83e4cb4
c658be8
e6e9253
ba0bfa0
740e1f3
4b579be
85205c4
4bebc8f
2103a8a
4dba0d4
b5709e9
03ebbbb
11e8240
a487782
5727ced
664e55a
851be1f
f6aa2c7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| #!/usr/bin/env bash | ||
| # CLI smoke test - verifies catalog integration is active | ||
|
|
||
| set -e # Exit on error | ||
|
|
||
| GREEN='\033[0;32m' | ||
| RED='\033[0;31m' | ||
| RESET='\033[0m' | ||
|
|
||
| echo "=== CLI Smoke Test ===" | ||
|
|
||
| # Build | ||
| echo "Building..." | ||
| cargo build --package optd-cli --quiet | ||
| if [ ! -f ./target/debug/optd-cli ]; then | ||
| echo -e "${RED}✗ Build failed${RESET}" | ||
| exit 1 | ||
| fi | ||
|
|
||
| CLI=./target/debug/optd-cli | ||
|
|
||
| # Test 1: Basic functionality | ||
| echo "Test 1: Basic query execution" | ||
| output=$($CLI -c "SELECT 1 as test;" 2>&1) | ||
| if [ $? -eq 0 ] && echo "$output" | grep -q "OptD catalog"; then | ||
| echo -e "${GREEN}✓ PASS${RESET} - CLI runs, catalog integration active" | ||
| else | ||
| echo -e "${RED}✗ FAIL${RESET}" | ||
| exit 1 | ||
| fi | ||
|
|
||
| # Test 2: Session persistence (multiple commands) | ||
| echo "Test 2: Session state persistence" | ||
| output=$($CLI -c "CREATE TABLE t (x INT);" -c "INSERT INTO t VALUES (1);" -c "SELECT * FROM t;" 2>&1) | ||
| if [ $? -eq 0 ] && echo "$output" | grep -q "1 row"; then | ||
| echo -e "${GREEN}✓ PASS${RESET} - Multiple commands work, session persists" | ||
| else | ||
| echo -e "${RED}✗ FAIL${RESET}" | ||
| exit 1 | ||
| fi | ||
|
|
||
| # Test 3: Metadata path configuration | ||
| echo "Test 3: Metadata path environment variable" | ||
| TMPDIR_PATH=$(mktemp -d) | ||
| export OPTD_METADATA_CATALOG_PATH="$TMPDIR_PATH/test.ducklake" | ||
| output=$($CLI -c "SELECT 1;" 2>&1) | ||
| unset OPTD_METADATA_CATALOG_PATH | ||
| rm -rf "$TMPDIR_PATH" | ||
| if echo "$output" | grep -q "Using OptD catalog with metadata path"; then | ||
| echo -e "${GREEN}✓ PASS${RESET} - Metadata path recognized" | ||
| else | ||
| echo -e "${RED}✗ FAIL${RESET}" | ||
| exit 1 | ||
| fi | ||
|
|
||
| echo "" | ||
| echo -e "${GREEN}✓ All smoke tests passed!${RESET}" | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,11 +1,14 @@ | ||
| use std::sync::Arc; | ||
|
|
||
| use datafusion::{ | ||
| common::{DataFusionError, Result, exec_err, not_impl_err}, | ||
| datasource::TableProvider, | ||
| execution::{SessionStateBuilder, runtime_env::RuntimeEnv}, | ||
| logical_expr::{CreateExternalTable, LogicalPlanBuilder}, | ||
| prelude::{DataFrame, SessionConfig, SessionContext}, | ||
| sql::TableReference, | ||
| }; | ||
| use datafusion_cli::cli_context::CliSessionContext; | ||
| use optd_datafusion::{OptdExtensionConfig, SessionStateBuilderOptdExt}; | ||
| use std::sync::Arc; | ||
|
|
||
| pub struct OptdCliSessionContext { | ||
| inner: SessionContext, | ||
|
|
@@ -39,10 +42,62 @@ impl OptdCliSessionContext { | |
| &self.inner | ||
| } | ||
|
|
||
| pub fn return_empty_dataframe(&self) -> datafusion::common::Result<DataFrame> { | ||
| let plan = datafusion::logical_expr::LogicalPlanBuilder::empty(false).build()?; | ||
| pub fn return_empty_dataframe(&self) -> Result<DataFrame> { | ||
| let plan = LogicalPlanBuilder::empty(false).build()?; | ||
| Ok(DataFrame::new(self.inner.state(), plan)) | ||
| } | ||
|
|
||
| async fn create_external_table(&self, cmd: &CreateExternalTable) -> Result<DataFrame> { | ||
| let exist = self.inner.table_exist(cmd.name.clone())?; | ||
|
|
||
| if cmd.temporary { | ||
| return not_impl_err!("Temporary tables not supported"); | ||
| } | ||
|
|
||
| if exist { | ||
| match cmd.if_not_exists { | ||
| true => return self.return_empty_dataframe(), | ||
| false => { | ||
| return exec_err!("Table '{}' already exists", cmd.name); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| let table_provider: Arc<dyn TableProvider> = self.create_custom_table(cmd).await?; | ||
| self.register_table(cmd.name.clone(), table_provider)?; | ||
|
|
||
| self.return_empty_dataframe() | ||
| } | ||
|
Comment on lines
+50
to
+70
|
||
|
|
||
| async fn create_custom_table( | ||
| &self, | ||
| cmd: &CreateExternalTable, | ||
| ) -> Result<Arc<dyn TableProvider>> { | ||
| let state = self.inner.state_ref().read().clone(); | ||
| let file_type = cmd.file_type.to_uppercase(); | ||
| let factory = state | ||
| .table_factories() | ||
| .get(file_type.as_str()) | ||
| .ok_or_else(|| { | ||
| DataFusionError::Execution(format!("Unable to find factory for {}", cmd.file_type)) | ||
| })?; | ||
| let table = (*factory).create(&state, cmd).await?; | ||
| Ok(table) | ||
| } | ||
|
|
||
| pub fn register_table( | ||
| &self, | ||
| table_ref: impl Into<TableReference>, | ||
| provider: Arc<dyn TableProvider>, | ||
| ) -> Result<Option<Arc<dyn TableProvider>>> { | ||
| let table_ref: TableReference = table_ref.into(); | ||
| let table = table_ref.table().to_owned(); | ||
| self.inner | ||
| .state_ref() | ||
| .read() | ||
| .schema_for_ref(table_ref)? | ||
| .register_table(table, provider) | ||
| } | ||
| } | ||
|
|
||
| impl CliSessionContext for OptdCliSessionContext { | ||
|
|
@@ -72,12 +127,8 @@ impl CliSessionContext for OptdCliSessionContext { | |
| plan: datafusion::logical_expr::LogicalPlan, | ||
| ) -> ::core::pin::Pin< | ||
| Box< | ||
| dyn ::core::future::Future< | ||
| Output = Result< | ||
| datafusion::prelude::DataFrame, | ||
| datafusion::common::DataFusionError, | ||
| >, | ||
| > + ::core::marker::Send | ||
| dyn ::core::future::Future<Output = Result<DataFrame, DataFusionError>> | ||
| + ::core::marker::Send | ||
| + 'async_trait, | ||
| >, | ||
| > | ||
|
|
@@ -102,8 +153,14 @@ impl CliSessionContext for OptdCliSessionContext { | |
| } | ||
| _ => (), | ||
| } | ||
| } else if let datafusion::logical_expr::LogicalPlan::Ddl(ddl) = &plan { | ||
| match ddl { | ||
| datafusion::logical_expr::DdlStatement::CreateExternalTable(create_table) => { | ||
| return self.create_external_table(&create_table).await; | ||
| } | ||
| _ => (), | ||
| } | ||
| } | ||
|
|
||
| self.inner.execute_logical_plan(plan).await | ||
| }; | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The shell script uses
set -ebut does not useset -u(fail on undefined variables) orset -o pipefail(fail if any command in a pipeline fails). This could mask errors in test execution. Consider adding these flags for more robust error detection.