diff --git a/Cargo.lock b/Cargo.lock
index f36ef2d28661..7bc828df0bac 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1786,6 +1786,7 @@ dependencies = [
  "common-recordbatch",
  "common-runtime",
  "common-telemetry",
+ "common-test-util",
  "common-time",
  "common-version",
  "common-wal",
diff --git a/src/cli/Cargo.toml b/src/cli/Cargo.toml
index 3c3e91c403a8..802010a8d49a 100644
--- a/src/cli/Cargo.toml
+++ b/src/cli/Cargo.toml
@@ -67,6 +67,7 @@ tracing-appender.workspace = true
 [dev-dependencies]
 common-meta = { workspace = true, features = ["testing"] }
+common-test-util.workspace = true
 common-version.workspace = true
 serde.workspace = true
 tempfile.workspace = true
diff --git a/src/cli/src/common.rs b/src/cli/src/common.rs
index 1ad80db942e9..d6dc461b8986 100644
--- a/src/cli/src/common.rs
+++ b/src/cli/src/common.rs
@@ -15,5 +15,8 @@
 mod object_store;
 mod store;

-pub use object_store::{ObjectStoreConfig, new_fs_object_store};
+pub use object_store::{
+    ObjectStoreConfig, PrefixedAzblobConnection, PrefixedGcsConnection, PrefixedOssConnection,
+    PrefixedS3Connection, new_fs_object_store,
+};
 pub use store::StoreConfig;
diff --git a/src/cli/src/common/object_store.rs b/src/cli/src/common/object_store.rs
index 601d17145e3b..4e746dbeb346 100644
--- a/src/cli/src/common/object_store.rs
+++ b/src/cli/src/common/object_store.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use common_base::secrets::SecretString;
+use common_base::secrets::{ExposeSecret, SecretString};
 use common_error::ext::BoxedError;
 use object_store::services::{Azblob, Fs, Gcs, Oss, S3};
 use object_store::util::{with_instrument_layers, with_retry_layers};
@@ -22,9 +22,69 @@ use snafu::ResultExt;

 use crate::error::{self};

+/// Trait to convert CLI field types to target struct field types.
+/// This enables `Option<SecretString>` (CLI) -> `SecretString` (target) conversions,
+/// allowing us to distinguish "not provided" from "provided but empty".
+trait IntoField<T> {
+    fn into_field(self) -> T;
+}
+
+/// Identity conversion for types that are the same.
+impl<T> IntoField<T> for T {
+    fn into_field(self) -> T {
+        self
+    }
+}
+
+/// Convert `Option<SecretString>` to `SecretString`, using default for None.
+impl IntoField<SecretString> for Option<SecretString> {
+    fn into_field(self) -> SecretString {
+        self.unwrap_or_default()
+    }
+}
+
+/// Trait for checking if a field is effectively empty.
+///
+/// **`is_empty()`**: Checks if the field has no meaningful value
+/// - Used when backend is enabled to validate required fields
+/// - `None`, `Some("")`, `false`, or `""` are considered empty
+trait FieldValidator {
+    /// Check if the field is empty (has no meaningful value).
+    fn is_empty(&self) -> bool;
+}
+
+/// String fields: empty if the string is empty
+impl FieldValidator for String {
+    fn is_empty(&self) -> bool {
+        self.is_empty()
+    }
+}
+
+/// Bool fields: false is considered "empty", true is "provided"
+impl FieldValidator for bool {
+    fn is_empty(&self) -> bool {
+        !self
+    }
+}
+
+/// Option fields: None or empty content is empty
+impl FieldValidator for Option<String> {
+    fn is_empty(&self) -> bool {
+        self.as_ref().is_none_or(|s| s.is_empty())
+    }
+}
+
+/// Option fields: None or empty secret is empty
+/// For secrets, Some("") is treated as "not provided" for both checks
+impl FieldValidator for Option<SecretString> {
+    fn is_empty(&self) -> bool {
+        self.as_ref().is_none_or(|s| s.expose_secret().is_empty())
+    }
+}
+
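The interplay is easier to see in isolation. A minimal, dependency-free sketch of the same two traits, with plain `String` standing in for `SecretString` (the only substitution here):

```rust
trait IntoField<T> {
    fn into_field(self) -> T;
}

// Identity conversion, mirroring the blanket impl above.
impl<T> IntoField<T> for T {
    fn into_field(self) -> T {
        self
    }
}

// Option -> concrete field, mirroring the Option<SecretString> impl above.
impl IntoField<String> for Option<String> {
    fn into_field(self) -> String {
        self.unwrap_or_default()
    }
}

trait FieldValidator {
    fn is_empty(&self) -> bool;
}

impl FieldValidator for Option<String> {
    fn is_empty(&self) -> bool {
        self.as_ref().is_none_or(|s| s.is_empty())
    }
}

fn main() {
    // Validation treats "missing" and "provided but empty" the same way...
    assert!(FieldValidator::is_empty(&None::<String>));
    assert!(FieldValidator::is_empty(&Some(String::new())));
    assert!(!FieldValidator::is_empty(&Some("ak".to_string())));

    // ...while conversion still yields a usable default for the target struct.
    let converted: String = None::<String>.into_field();
    assert_eq!(converted, "");
}
```

The blanket identity impl keeps plain `String` and `bool` fields flowing through unchanged, so only the optional secret fields get the `unwrap_or_default` treatment.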
 macro_rules! wrap_with_clap_prefix {
     (
-        $new_name:ident, $prefix:literal, $base:ty, {
+        $new_name:ident, $prefix:literal, $enable_flag:literal, $base:ty, {
             $( $( #[doc = $doc:expr] )? $( #[alias = $alias:literal] )? $field:ident : $type:ty $( = $default:expr )? ),* $(,)?
         }
     ) => {
@@ -34,15 +94,16 @@ macro_rules! wrap_with_clap_prefix {
             $(
                 $( #[doc = $doc] )?
                 $( #[clap(alias = $alias)] )?
-                #[clap(long $(, default_value_t = $default )? )]
-                [<$prefix $field>]: $type,
+                #[clap(long, requires = $enable_flag $(, default_value_t = $default )? )]
+                pub [<$prefix $field>]: $type,
             )*
         }

         impl From<$new_name> for $base {
             fn from(w: $new_name) -> Self {
                 Self {
-                    $( $field: w.[<$prefix $field>] ),*
+                    // Use into_field() to handle Option -> SecretString conversion
+                    $( $field: w.[<$prefix $field>].into_field() ),*
                 }
             }
         }
@@ -50,9 +111,90 @@ wrap_with_clap_prefix! {
     };
 }

+/// Macro for declarative backend validation.
+///
+/// # Validation Rules
+///
+/// For each storage backend (S3, OSS, GCS, Azblob), this macro validates:
+/// **When the backend is enabled** (e.g., `--s3`): all required fields must be non-empty.
+///
+/// Note: When the backend is disabled, clap's `requires` attribute ensures no configuration
+/// fields can be provided at parse time.
+///
+/// # Syntax
+///
+/// ```ignore
+/// validate_backend!(
+///     enable: self.enable_s3,
+///     name: "S3",
+///     required: [(field1, "name1"), (field2, "name2"), ...],
+///     custom_validator: |missing| { ... }  // optional
+/// )
+/// ```
+///
+/// # Arguments
+///
+/// - `enable`: Boolean expression indicating if the backend is enabled
+/// - `name`: Human-readable backend name for error messages
+/// - `required`: Array of (field_ref, field_name) tuples for required fields
+/// - `custom_validator`: Optional closure for complex validation logic
+///
+/// # Example
+///
+/// ```ignore
+/// validate_backend!(
+///     enable: self.enable_s3,
+///     name: "S3",
+///     required: [
+///         (&self.s3.s3_bucket, "bucket"),
+///         (&self.s3.s3_access_key_id, "access key ID"),
+///     ]
+/// )
+/// ```
+macro_rules! validate_backend {
+    (
+        enable: $enable:expr,
+        name: $backend_name:expr,
+        required: [ $( ($field:expr, $field_name:expr) ),* $(,)? ]
+        $(, custom_validator: $custom_validator:expr)?
+    ) => {{
+        if $enable {
+            // Check required fields when backend is enabled
+            let mut missing = Vec::new();
+            $(
+                if FieldValidator::is_empty($field) {
+                    missing.push($field_name);
+                }
+            )*
+
+            // Run custom validation if provided
+            $(
+                $custom_validator(&mut missing);
+            )?
+
+            if !missing.is_empty() {
+                return Err(BoxedError::new(
+                    error::MissingConfigSnafu {
+                        msg: format!(
+                            "{} {} must be set when --{} is enabled.",
+                            $backend_name,
+                            missing.join(", "),
+                            $backend_name.to_lowercase().replace(" ", "")
+                        ),
+                    }
+                    .build(),
+                ));
+            }
+        }
+
+        Ok(())
+    }};
+}
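For reference, roughly what `wrap_with_clap_prefix!` pastes together for one backend, hand-expanded with hypothetical `Toy*` names; the enable flag is folded into the same parser here for brevity, whereas in the diff it lives on `ObjectStoreConfig`:

```rust
use clap::Parser;

// Stand-in for a connection struct like S3Connection (illustrative only).
#[derive(Debug, Default)]
struct ToyConnection {
    bucket: String,
    access_key: Option<String>,
}

// Roughly the generated struct: prefixed long flags, each gated on the
// enable flag via `requires`, defaults supplied where the macro was given one.
#[derive(Parser, Debug)]
struct PrefixedToyConnection {
    /// Whether to use the toy backend.
    #[clap(long = "toy")]
    enable_toy: bool,

    /// The bucket of the object store.
    #[clap(long, requires = "enable_toy", default_value_t = Default::default())]
    toy_bucket: String,

    /// The access key of the object store.
    #[clap(long, requires = "enable_toy")]
    toy_access_key: Option<String>,
}

impl From<PrefixedToyConnection> for ToyConnection {
    fn from(w: PrefixedToyConnection) -> Self {
        Self {
            bucket: w.toy_bucket,
            access_key: w.toy_access_key,
        }
    }
}

fn main() {
    // `--toy-bucket` without `--toy` fails at parse time thanks to `requires`.
    assert!(PrefixedToyConnection::try_parse_from(["app", "--toy-bucket", "b"]).is_err());
    let ok = PrefixedToyConnection::try_parse_from(["app", "--toy", "--toy-bucket", "b"]).unwrap();
    assert_eq!(ToyConnection::from(ok).bucket, "b");
}
```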
 wrap_with_clap_prefix! {
     PrefixedAzblobConnection,
     "azblob-",
+    "enable_azblob",
     AzblobConnection,
     {
         #[doc = "The container of the object store."]
@@ -60,9 +202,9 @@ wrap_with_clap_prefix! {
         #[doc = "The root of the object store."]
         root: String = Default::default(),
         #[doc = "The account name of the object store."]
-        account_name: SecretString = Default::default(),
+        account_name: Option<SecretString>,
         #[doc = "The account key of the object store."]
-        account_key: SecretString = Default::default(),
+        account_key: Option<SecretString>,
         #[doc = "The endpoint of the object store."]
         endpoint: String = Default::default(),
         #[doc = "The SAS token of the object store."]
@@ -70,9 +212,33 @@ wrap_with_clap_prefix! {
     }
 }

+impl PrefixedAzblobConnection {
+    pub fn validate(&self) -> Result<(), BoxedError> {
+        validate_backend!(
+            enable: true,
+            name: "Azure Blob",
+            required: [
+                (&self.azblob_container, "container"),
+                (&self.azblob_root, "root"),
+                (&self.azblob_account_name, "account name"),
+                (&self.azblob_endpoint, "endpoint"),
+            ],
+            custom_validator: |missing: &mut Vec<&str>| {
+                // account_key is only required if sas_token is not provided
+                if self.azblob_sas_token.is_none()
+                    && self.azblob_account_key.is_empty()
+                {
+                    missing.push("account key (when sas_token is not provided)");
+                }
+            }
+        )
+    }
+}
+
 wrap_with_clap_prefix! {
     PrefixedS3Connection,
     "s3-",
+    "enable_s3",
     S3Connection,
     {
         #[doc = "The bucket of the object store."]
@@ -80,9 +246,9 @@ wrap_with_clap_prefix! {
         #[doc = "The root of the object store."]
         root: String = Default::default(),
         #[doc = "The access key ID of the object store."]
-        access_key_id: SecretString = Default::default(),
+        access_key_id: Option<SecretString>,
         #[doc = "The secret access key of the object store."]
-        secret_access_key: SecretString = Default::default(),
+        secret_access_key: Option<SecretString>,
         #[doc = "The endpoint of the object store."]
         endpoint: Option<String>,
         #[doc = "The region of the object store."]
@@ -92,9 +258,25 @@ wrap_with_clap_prefix! {
     }
 }

+impl PrefixedS3Connection {
+    pub fn validate(&self) -> Result<(), BoxedError> {
+        validate_backend!(
+            enable: true,
+            name: "S3",
+            required: [
+                (&self.s3_bucket, "bucket"),
+                (&self.s3_access_key_id, "access key ID"),
+                (&self.s3_secret_access_key, "secret access key"),
+                (&self.s3_region, "region"),
+            ]
+        )
+    }
+}
+
 wrap_with_clap_prefix! {
     PrefixedOssConnection,
     "oss-",
+    "enable_oss",
     OssConnection,
     {
         #[doc = "The bucket of the object store."]
@@ -102,17 +284,33 @@ wrap_with_clap_prefix! {
         #[doc = "The root of the object store."]
         root: String = Default::default(),
         #[doc = "The access key ID of the object store."]
-        access_key_id: SecretString = Default::default(),
+        access_key_id: Option<SecretString>,
         #[doc = "The access key secret of the object store."]
-        access_key_secret: SecretString = Default::default(),
+        access_key_secret: Option<SecretString>,
         #[doc = "The endpoint of the object store."]
         endpoint: String = Default::default(),
     }
 }

+impl PrefixedOssConnection {
+    pub fn validate(&self) -> Result<(), BoxedError> {
+        validate_backend!(
+            enable: true,
+            name: "OSS",
+            required: [
+                (&self.oss_bucket, "bucket"),
+                (&self.oss_access_key_id, "access key ID"),
+                (&self.oss_access_key_secret, "access key secret"),
+                (&self.oss_endpoint, "endpoint"),
+            ]
+        )
+    }
+}
+
 wrap_with_clap_prefix! {
     PrefixedGcsConnection,
     "gcs-",
+    "enable_gcs",
     GcsConnection,
     {
         #[doc = "The root of the object store."]
@@ -122,40 +320,72 @@ wrap_with_clap_prefix! {
         #[doc = "The scope of the object store."]
         scope: String = Default::default(),
         #[doc = "The credential path of the object store."]
-        credential_path: SecretString = Default::default(),
+        credential_path: Option<SecretString>,
         #[doc = "The credential of the object store."]
-        credential: SecretString = Default::default(),
+        credential: Option<SecretString>,
         #[doc = "The endpoint of the object store."]
         endpoint: String = Default::default(),
     }
 }

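The error text assembled by `validate_backend!` can be previewed standalone; `backend_error` below is a hypothetical name for the formatting step the macro inlines:

```rust
// Mirror of the error-message formatting inside `validate_backend!` above.
fn backend_error(backend_name: &str, missing: &[&str]) -> String {
    format!(
        "{} {} must be set when --{} is enabled.",
        backend_name,
        missing.join(", "),
        backend_name.to_lowercase().replace(" ", "")
    )
}

fn main() {
    assert_eq!(
        backend_error("S3", &["bucket", "region"]),
        "S3 bucket, region must be set when --s3 is enabled."
    );
    // Note: multi-word names lowercase-and-join, so "Azure Blob" renders as
    // `--azureblob` in the hint even though the actual flag is `--azblob`.
    assert_eq!(
        backend_error("Azure Blob", &["container"]),
        "Azure Blob container must be set when --azureblob is enabled."
    );
}
```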
-/// common config for object store.
+impl PrefixedGcsConnection {
+    pub fn validate(&self) -> Result<(), BoxedError> {
+        validate_backend!(
+            enable: true,
+            name: "GCS",
+            required: [
+                (&self.gcs_bucket, "bucket"),
+                (&self.gcs_root, "root"),
+                (&self.gcs_scope, "scope"),
+            ]
+            // No custom_validator needed: GCS supports Application Default Credentials (ADC)
+            // where neither credential_path nor credential is required.
+            // Endpoint is also optional (defaults to https://storage.googleapis.com).
+        )
+    }
+}
+
+/// Common config for object store.
+///
+/// # Dependency Enforcement
+///
+/// Each backend's configuration fields (e.g., `--s3-bucket`) require the corresponding
+/// enable flag (e.g., `--s3`) to be present. This is enforced by `clap` at parse time
+/// using the `requires` attribute.
+///
+/// For example, attempting to use `--s3-bucket my-bucket` without `--s3` will result in:
+/// ```text
+/// error: The argument '--s3-bucket <S3_BUCKET>' requires '--s3'
+/// ```
+///
+/// This ensures that users cannot accidentally provide backend-specific configuration
+/// without explicitly enabling that backend.
 #[derive(clap::Parser, Debug, Clone, PartialEq, Default)]
+#[clap(group(clap::ArgGroup::new("storage_backend").required(false).multiple(false)))]
 pub struct ObjectStoreConfig {
     /// Whether to use S3 object store.
-    #[clap(long, alias = "s3")]
+    #[clap(long = "s3", group = "storage_backend")]
     pub enable_s3: bool,

     #[clap(flatten)]
     pub s3: PrefixedS3Connection,

     /// Whether to use OSS.
-    #[clap(long, alias = "oss")]
+    #[clap(long = "oss", group = "storage_backend")]
     pub enable_oss: bool,

     #[clap(flatten)]
     pub oss: PrefixedOssConnection,

     /// Whether to use GCS.
-    #[clap(long, alias = "gcs")]
+    #[clap(long = "gcs", group = "storage_backend")]
     pub enable_gcs: bool,

     #[clap(flatten)]
     pub gcs: PrefixedGcsConnection,

     /// Whether to use Azure Blob.
-    #[clap(long, alias = "azblob")]
+    #[clap(long = "azblob", group = "storage_backend")]
     pub enable_azblob: bool,

     #[clap(flatten)]
@@ -173,52 +403,66 @@ pub fn new_fs_object_store(root: &str) -> std::result::Result<ObjectStore, BoxedError> {
+macro_rules! gen_object_store_builder {
+    ($method:ident, $field:ident, $conn_type:ty, $service_type:ty) => {
+        pub fn $method(&self) -> Result<ObjectStore, BoxedError> {
+            let config = <$conn_type>::from(self.$field.clone());
+            common_telemetry::info!(
+                "Building object store with {}: {:?}",
+                stringify!($field),
+                config
+            );
+            let object_store = ObjectStore::new(<$service_type>::from(&config))
+                .context(error::InitBackendSnafu)
+                .map_err(BoxedError::new)?
+                .finish();
+            Ok(with_instrument_layers(
+                with_retry_layers(object_store),
+                false,
+            ))
+        }
+    };
+}
+
 impl ObjectStoreConfig {
+    gen_object_store_builder!(build_s3, s3, S3Connection, S3);
+
+    gen_object_store_builder!(build_oss, oss, OssConnection, Oss);
+
+    gen_object_store_builder!(build_gcs, gcs, GcsConnection, Gcs);
+
+    gen_object_store_builder!(build_azblob, azblob, AzblobConnection, Azblob);
+
+    pub fn validate(&self) -> Result<(), BoxedError> {
+        if self.enable_s3 {
+            self.s3.validate()?;
+        }
+        if self.enable_oss {
+            self.oss.validate()?;
+        }
+        if self.enable_gcs {
+            self.gcs.validate()?;
+        }
+        if self.enable_azblob {
+            self.azblob.validate()?;
+        }
+        Ok(())
+    }
+
     /// Builds the object store from the config.
     pub fn build(&self) -> Result<Option<ObjectStore>, BoxedError> {
-        let object_store = if self.enable_s3 {
-            let s3 = S3Connection::from(self.s3.clone());
-            common_telemetry::info!("Building object store with s3: {:?}", s3);
-            Some(
-                ObjectStore::new(S3::from(&s3))
-                    .context(error::InitBackendSnafu)
-                    .map_err(BoxedError::new)?
-                    .finish(),
-            )
+        self.validate()?;
+
+        if self.enable_s3 {
+            self.build_s3().map(Some)
         } else if self.enable_oss {
-            let oss = OssConnection::from(self.oss.clone());
-            common_telemetry::info!("Building object store with oss: {:?}", oss);
-            Some(
-                ObjectStore::new(Oss::from(&oss))
-                    .context(error::InitBackendSnafu)
-                    .map_err(BoxedError::new)?
-                    .finish(),
-            )
+            self.build_oss().map(Some)
         } else if self.enable_gcs {
-            let gcs = GcsConnection::from(self.gcs.clone());
-            common_telemetry::info!("Building object store with gcs: {:?}", gcs);
-            Some(
-                ObjectStore::new(Gcs::from(&gcs))
-                    .context(error::InitBackendSnafu)
-                    .map_err(BoxedError::new)?
-                    .finish(),
-            )
+            self.build_gcs().map(Some)
         } else if self.enable_azblob {
-            let azblob = AzblobConnection::from(self.azblob.clone());
-            common_telemetry::info!("Building object store with azblob: {:?}", azblob);
-            Some(
-                ObjectStore::new(Azblob::from(&azblob))
-                    .context(error::InitBackendSnafu)
-                    .map_err(BoxedError::new)?
-                    .finish(),
-            )
+            self.build_azblob().map(Some)
         } else {
-            None
-        };
-
-        let object_store = object_store
-            .map(|object_store| with_instrument_layers(with_retry_layers(object_store), false));
-
-        Ok(object_store)
+            Ok(None)
+        }
     }
 }
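A toy reduction of the `storage_backend` group wiring above, showing the parse-time conflict the test suite later asserts (`Opts` is illustrative):

```rust
use clap::Parser;

// Every enable flag joins one non-multiple ArgGroup, so enabling two
// backends at once is rejected before any validation code runs.
#[derive(Parser, Debug)]
#[clap(group(clap::ArgGroup::new("storage_backend").required(false).multiple(false)))]
struct Opts {
    #[clap(long = "s3", group = "storage_backend")]
    enable_s3: bool,

    #[clap(long = "oss", group = "storage_backend")]
    enable_oss: bool,
}

fn main() {
    assert!(Opts::try_parse_from(["app", "--s3"]).is_ok());
    let err = Opts::try_parse_from(["app", "--s3", "--oss"]).unwrap_err();
    assert_eq!(err.kind(), clap::error::ErrorKind::ArgumentConflict);
}
```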
diff --git a/src/cli/src/data.rs b/src/cli/src/data.rs
index be623f63a202..5966040a3ba1 100644
--- a/src/cli/src/data.rs
+++ b/src/cli/src/data.rs
@@ -14,6 +14,7 @@
 mod export;
 mod import;
+mod storage_export;

 use clap::Subcommand;
 use client::DEFAULT_CATALOG_NAME;
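The export command below replaces its dozen per-backend flags with a single flattened `ObjectStoreConfig`. A minimal sketch of how `#[clap(flatten)]` surfaces a shared struct's flags on the parent command (toy types, not the real ones):

```rust
use clap::Parser;

// Shared flag bundle, analogous to ObjectStoreConfig.
#[derive(clap::Args, Debug)]
struct StorageOpts {
    #[clap(long = "s3")]
    enable_s3: bool,
}

// Parent command, analogous to ExportCommand.
#[derive(Parser, Debug)]
struct ExportOpts {
    #[clap(long)]
    addr: String,

    #[clap(flatten)]
    storage: StorageOpts,
}

fn main() {
    let opts = ExportOpts::parse_from(["export", "--addr", "127.0.0.1:4000", "--s3"]);
    assert!(opts.storage.enable_s3);
}
```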
diff --git a/src/cli/src/data/export.rs b/src/cli/src/data/export.rs
index 007f8aa67c3c..f5c030aa1121 100644
--- a/src/cli/src/data/export.rs
+++ b/src/cli/src/data/export.rs
@@ -13,28 +13,27 @@
 // limitations under the License.

 use std::collections::HashSet;
-use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::Duration;

 use async_trait::async_trait;
 use clap::{Parser, ValueEnum};
-use common_base::secrets::{ExposeSecret, SecretString};
 use common_error::ext::BoxedError;
 use common_telemetry::{debug, error, info};
-use object_store::layers::LoggingLayer;
-use object_store::services::Oss;
-use object_store::{ObjectStore, services};
+use object_store::ObjectStore;
 use serde_json::Value;
 use snafu::{OptionExt, ResultExt};
 use tokio::sync::Semaphore;
 use tokio::time::Instant;

+use crate::common::{ObjectStoreConfig, new_fs_object_store};
+use crate::data::storage_export::{
+    AzblobBackend, FsBackend, GcsBackend, OssBackend, S3Backend, StorageExport, StorageType,
+};
 use crate::data::{COPY_PATH_PLACEHOLDER, default_database};
 use crate::database::{DatabaseClient, parse_proxy_opts};
 use crate::error::{
-    EmptyResultSnafu, Error, OpenDalSnafu, OutputDirNotSetSnafu, Result, S3ConfigNotSetSnafu,
-    SchemaNotFoundSnafu,
+    EmptyResultSnafu, Error, OpenDalSnafu, OutputDirNotSetSnafu, Result, SchemaNotFoundSnafu,
 };
 use crate::{Tool, database};

@@ -118,11 +117,7 @@ pub struct ExportCommand {
     #[clap(long)]
     no_proxy: bool,

-    /// if export data to s3
-    #[clap(long)]
-    s3: bool,
-
-    /// if both `ddl_local_dir` and remote storage (s3/oss) are set, `ddl_local_dir` will be only used for
+    /// if both `ddl_local_dir` and remote storage are set, `ddl_local_dir` will only be used for
     /// exported SQL files, and the data will be exported to remote storage.
     ///
     /// Note that `ddl_local_dir` export sql files to **LOCAL** file system, this is useful if export client don't have
     #[clap(long)]
     ddl_local_dir: Option<String>,

-    /// The s3 bucket name
-    /// if s3 is set, this is required
-    #[clap(long)]
-    s3_bucket: Option<String>,
-
-    // The s3 root path
-    /// if s3 is set, this is required
-    #[clap(long)]
-    s3_root: Option<String>,
-
-    /// The s3 endpoint
-    /// if s3 is set, this is required
-    #[clap(long)]
-    s3_endpoint: Option<String>,
-
-    /// The s3 access key
-    /// if s3 is set, this is required
-    #[clap(long)]
-    s3_access_key: Option<String>,
-
-    /// The s3 secret key
-    /// if s3 is set, this is required
-    #[clap(long)]
-    s3_secret_key: Option<String>,
-
-    /// The s3 region
-    /// if s3 is set, this is required
-    #[clap(long)]
-    s3_region: Option<String>,
-
-    /// if export data to oss
-    #[clap(long)]
-    oss: bool,
-
-    /// The oss bucket name
-    /// if oss is set, this is required
-    #[clap(long)]
-    oss_bucket: Option<String>,
-
-    /// The oss endpoint
-    /// if oss is set, this is required
-    #[clap(long)]
-    oss_endpoint: Option<String>,
-
-    /// The oss access key id
-    /// if oss is set, this is required
-    #[clap(long)]
-    oss_access_key_id: Option<String>,
-
-    /// The oss access key secret
-    /// if oss is set, this is required
-    #[clap(long)]
-    oss_access_key_secret: Option<String>,
+    #[clap(flatten)]
+    storage: ObjectStoreConfig,
 }

 impl ExportCommand {
     pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
-        if self.s3
-            && (self.s3_bucket.is_none()
-                || self.s3_endpoint.is_none()
-                || self.s3_access_key.is_none()
-                || self.s3_secret_key.is_none()
-                || self.s3_region.is_none())
-        {
-            return Err(BoxedError::new(S3ConfigNotSetSnafu {}.build()));
-        }
-        if !self.s3 && !self.oss && self.output_dir.is_none() {
+        // Determine storage type
+        let (storage_type, operator) = if self.storage.enable_s3 {
+            (
+                StorageType::S3(S3Backend::new(self.storage.s3.clone())?),
+                self.storage.build_s3()?,
+            )
+        } else if self.storage.enable_oss {
+            (
+                StorageType::Oss(OssBackend::new(self.storage.oss.clone())?),
+                self.storage.build_oss()?,
+            )
+        } else if self.storage.enable_gcs {
+            (
+                StorageType::Gcs(GcsBackend::new(self.storage.gcs.clone())?),
+                self.storage.build_gcs()?,
+            )
+        } else if self.storage.enable_azblob {
+            (
+                StorageType::Azblob(AzblobBackend::new(self.storage.azblob.clone())?),
+                self.storage.build_azblob()?,
+            )
+        } else if let Some(output_dir) = &self.output_dir {
+            (
+                StorageType::Fs(FsBackend::new(output_dir.clone())),
+                new_fs_object_store(output_dir)?,
+            )
+        } else {
             return Err(BoxedError::new(OutputDirNotSetSnafu {}.build()));
-        }
+        };
+
         let (catalog, schema) =
             database::split_database(&self.database).map_err(BoxedError::new)?;
         let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
@@ -217,39 +179,14 @@
             catalog,
             schema,
             database_client,
-            output_dir: self.output_dir.clone(),
             export_jobs: self.db_parallelism,
             target: self.target.clone(),
             start_time: self.start_time.clone(),
             end_time: self.end_time.clone(),
             parallelism: self.table_parallelism,
-            s3: self.s3,
+            storage_type,
             ddl_local_dir: self.ddl_local_dir.clone(),
-            s3_bucket: self.s3_bucket.clone(),
-            s3_root: self.s3_root.clone(),
-            s3_endpoint: self.s3_endpoint.clone(),
-            // Wrap sensitive values in SecretString
-            s3_access_key: self
-                .s3_access_key
-                .as_ref()
-                .map(|k| SecretString::from(k.clone())),
-            s3_secret_key: self
-                .s3_secret_key
-                .as_ref()
-                .map(|k| SecretString::from(k.clone())),
-            s3_region: self.s3_region.clone(),
-            oss: self.oss,
-            oss_bucket: self.oss_bucket.clone(),
-            oss_endpoint: self.oss_endpoint.clone(),
-            // Wrap sensitive values in SecretString
-            oss_access_key_id: self
-                .oss_access_key_id
-                .as_ref()
-                .map(|k| SecretString::from(k.clone())),
-            oss_access_key_secret: self
-                .oss_access_key_secret
-                .as_ref()
-                .map(|k| SecretString::from(k.clone())),
+            operator,
         }))
     }
 }
@@ -259,40 +196,17 @@ pub struct Export {
     catalog: String,
     schema: Option<String>,
     database_client: DatabaseClient,
-    output_dir: Option<String>,
     export_jobs: usize,
     target: ExportTarget,
     start_time: Option<String>,
     end_time: Option<String>,
     parallelism: usize,
-    s3: bool,
+    storage_type: StorageType,
     ddl_local_dir: Option<String>,
-    s3_bucket: Option<String>,
-    s3_root: Option<String>,
-    s3_endpoint: Option<String>,
-    // Changed to SecretString for sensitive data
-    s3_access_key: Option<SecretString>,
-    s3_secret_key: Option<SecretString>,
-    s3_region: Option<String>,
-    oss: bool,
-    oss_bucket: Option<String>,
-    oss_endpoint: Option<String>,
-    // Changed to SecretString for sensitive data
-    oss_access_key_id: Option<SecretString>,
-    oss_access_key_secret: Option<SecretString>,
+    operator: ObjectStore,
 }

 impl Export {
-    fn catalog_path(&self) -> PathBuf {
-        if self.s3 || self.oss {
-            PathBuf::from(&self.catalog)
-        } else if let Some(dir) = &self.output_dir {
-            PathBuf::from(dir).join(&self.catalog)
-        } else {
-            unreachable!("catalog_path: output_dir must be set when not using remote storage")
-        }
-    }
-
     async fn get_db_names(&self) -> Result<Vec<String>> {
         let db_names = self.all_db_names().await?;
         let Some(schema) = &self.schema else {
@@ -462,7 +376,8 @@ impl Export {
                 "Exported {}.{} database creation SQL to {}",
                 self.catalog,
                 schema,
-                self.format_output_path(&file_path)
+                self.storage_type
+                    .format_output_path(&self.catalog, &file_path)
             );
         }

@@ -491,7 +406,7 @@ impl Export {
                     .await?;

                 // Create directory if needed for file system storage
-                if !export_self.s3 && !export_self.oss {
+                if !export_self.storage_type.is_remote_storage() {
                     let db_dir = format!("{}/{}/", export_self.catalog, schema);
                     operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
                 }
@@ -520,7 +435,9 @@ impl Export {
                     "Finished exporting {}.{schema} with {} table schemas to path: {}",
                     export_self.catalog,
                     metric_physical_tables.len() + remaining_tables.len() + views.len(),
-                    export_self.format_output_path(&file_path)
+                    export_self
+                        .storage_type
+                        .format_output_path(&export_self.catalog, &file_path)
                 );

                 Ok::<(), Error>(())
@@ -535,104 +452,23 @@ impl Export {
     }

     async fn build_operator(&self) -> Result<ObjectStore> {
-        if self.s3 {
-            self.build_s3_operator().await
-        } else if self.oss {
-            self.build_oss_operator().await
-        } else {
-            self.build_fs_operator().await
-        }
+        Ok(self.operator.clone())
     }

     /// build operator with preference for file system
     async fn build_prefer_fs_operator(&self) -> Result<ObjectStore> {
-        if (self.s3 || self.oss) && self.ddl_local_dir.is_some() {
+        if self.storage_type.is_remote_storage() && self.ddl_local_dir.is_some() {
             let root = self.ddl_local_dir.as_ref().unwrap().clone();
-            let op = ObjectStore::new(services::Fs::default().root(&root))
-                .context(OpenDalSnafu)?
-                .layer(LoggingLayer::default())
-                .finish();
+            let op = new_fs_object_store(&root).map_err(|e| Error::Other {
+                source: e,
+                location: snafu::location!(),
+            })?;
             Ok(op)
-        } else if self.s3 {
-            self.build_s3_operator().await
-        } else if self.oss {
-            self.build_oss_operator().await
         } else {
-            self.build_fs_operator().await
+            Ok(self.operator.clone())
         }
     }

-    async fn build_s3_operator(&self) -> Result<ObjectStore> {
-        let mut builder = services::S3::default().bucket(
-            self.s3_bucket
-                .as_ref()
-                .expect("s3_bucket must be provided when s3 is enabled"),
-        );
-
-        if let Some(root) = self.s3_root.as_ref() {
-            builder = builder.root(root);
-        }
-
-        if let Some(endpoint) = self.s3_endpoint.as_ref() {
-            builder = builder.endpoint(endpoint);
-        }
-
-        if let Some(region) = self.s3_region.as_ref() {
-            builder = builder.region(region);
-        }
-
-        if let Some(key_id) = self.s3_access_key.as_ref() {
-            builder = builder.access_key_id(key_id.expose_secret());
-        }
-
-        if let Some(secret_key) = self.s3_secret_key.as_ref() {
-            builder = builder.secret_access_key(secret_key.expose_secret());
-        }
-
-        let op = ObjectStore::new(builder)
-            .context(OpenDalSnafu)?
-            .layer(LoggingLayer::default())
-            .finish();
-        Ok(op)
-    }
-
-    async fn build_oss_operator(&self) -> Result<ObjectStore> {
-        let mut builder = Oss::default()
-            .bucket(self.oss_bucket.as_ref().expect("oss_bucket must be set"))
-            .endpoint(
-                self.oss_endpoint
-                    .as_ref()
-                    .expect("oss_endpoint must be set"),
-            );
-
-        // Use expose_secret() to access the actual secret value
-        if let Some(key_id) = self.oss_access_key_id.as_ref() {
-            builder = builder.access_key_id(key_id.expose_secret());
-        }
-        if let Some(secret_key) = self.oss_access_key_secret.as_ref() {
-            builder = builder.access_key_secret(secret_key.expose_secret());
-        }
-
-        let op = ObjectStore::new(builder)
-            .context(OpenDalSnafu)?
-            .layer(LoggingLayer::default())
-            .finish();
-        Ok(op)
-    }
-
-    async fn build_fs_operator(&self) -> Result<ObjectStore> {
-        let root = self
-            .output_dir
-            .as_ref()
-            .context(OutputDirNotSetSnafu)?
-            .clone();
-        let op = ObjectStore::new(services::Fs::default().root(&root))
-            .context(OpenDalSnafu)?
-            .layer(LoggingLayer::default())
-            .finish();
-        Ok(op)
-    }
-
     async fn export_database_data(&self) -> Result<()> {
         let timer = Instant::now();
         let semaphore = Arc::new(Semaphore::new(self.export_jobs));
@@ -654,12 +490,14 @@ impl Export {
                 let _permit = semaphore_moved.acquire().await.unwrap();

                 // Create directory if not using remote storage
-                if !export_self.s3 && !export_self.oss {
+                if !export_self.storage_type.is_remote_storage() {
                     let db_dir = format!("{}/{}/", export_self.catalog, schema);
                     operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
                 }

-                let (path, connection_part) = export_self.get_storage_params(&schema);
+                let (path, connection_part) = export_self
+                    .storage_type
+                    .get_storage_path(&export_self.catalog, &schema);

                 // Execute COPY DATABASE TO command
                 let sql = format!(
                 );

                 // Log SQL command but mask sensitive information
-                let safe_sql = export_self.mask_sensitive_sql(&sql);
+                let safe_sql = export_self.storage_type.mask_sensitive_info(&sql);
                 info!("Executing sql: {}", safe_sql);

                 export_self.database_client.sql_in_public(&sql).await?;
@@ -712,7 +550,9 @@ impl Export {
                     "Finished exporting {}.{} copy_from.sql to {}",
                     export_self.catalog,
                     schema,
-                    export_self.format_output_path(&copy_from_path)
+                    export_self
+                        .storage_type
+                        .format_output_path(&export_self.catalog, &copy_from_path)
                 );

                 Ok::<(), Error>(())
@@ -726,61 +566,10 @@ impl Export {
         Ok(())
     }

-    /// Mask sensitive information in SQL commands for safe logging
-    fn mask_sensitive_sql(&self, sql: &str) -> String {
-        let mut masked_sql = sql.to_string();
-
-        // Mask S3 credentials
-        if let Some(access_key) = &self.s3_access_key {
-            masked_sql = masked_sql.replace(access_key.expose_secret(), "[REDACTED]");
-        }
-        if let Some(secret_key) = &self.s3_secret_key {
-            masked_sql = masked_sql.replace(secret_key.expose_secret(), "[REDACTED]");
-        }
-
-        // Mask OSS credentials
-        if let Some(access_key_id) = &self.oss_access_key_id {
-            masked_sql = masked_sql.replace(access_key_id.expose_secret(), "[REDACTED]");
-        }
-        if let Some(access_key_secret) = &self.oss_access_key_secret {
-            masked_sql = masked_sql.replace(access_key_secret.expose_secret(), "[REDACTED]");
-        }
-
-        masked_sql
-    }
-
     fn get_file_path(&self, schema: &str, file_name: &str) -> String {
         format!("{}/{}/{}", self.catalog, schema, file_name)
     }

-    fn format_output_path(&self, file_path: &str) -> String {
-        if self.s3 {
-            format!(
-                "s3://{}{}/{}",
-                self.s3_bucket.as_ref().unwrap_or(&String::new()),
-                if let Some(root) = &self.s3_root {
-                    format!("/{}", root)
-                } else {
-                    String::new()
-                },
-                file_path
-            )
-        } else if self.oss {
-            format!(
-                "oss://{}/{}/{}",
-                self.oss_bucket.as_ref().unwrap_or(&String::new()),
-                self.catalog,
-                file_path
-            )
-        } else {
-            format!(
-                "{}/{}",
-                self.output_dir.as_ref().unwrap_or(&String::new()),
-                file_path
-            )
-        }
-    }
-
     async fn write_to_storage(
         &self,
         op: &ObjectStore,
             .map(|_| ())
     }

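The old `mask_sensitive_sql` gives way to `StorageExport::mask_sensitive_info`, which bottoms out in the `mask_secrets` helper added in storage_export.rs below — copied here verbatim with a usage check:

```rust
// Verbatim copy of the `mask_secrets` helper from storage_export.rs,
// runnable standalone to show what actually reaches the logs.
fn mask_secrets(mut sql: String, secrets: &[&str]) -> String {
    for secret in secrets {
        if !secret.is_empty() {
            sql = sql.replace(secret, "[REDACTED]");
        }
    }
    sql
}

fn main() {
    let sql = "COPY DATABASE ... CONNECTION (ACCESS_KEY_ID='AKIA123', SECRET_ACCESS_KEY='s3cr3t')";
    let safe = mask_secrets(sql.to_string(), &["AKIA123", "s3cr3t"]);
    assert_eq!(
        safe,
        "COPY DATABASE ... CONNECTION (ACCESS_KEY_ID='[REDACTED]', SECRET_ACCESS_KEY='[REDACTED]')"
    );
}
```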
-    fn get_storage_params(&self, schema: &str) -> (String, String) {
-        if self.s3 {
-            let s3_path = format!(
-                "s3://{}{}/{}/{}/",
-                // Safety: s3_bucket is required when s3 is enabled
-                self.s3_bucket.as_ref().unwrap(),
-                if let Some(root) = &self.s3_root {
-                    format!("/{}", root)
-                } else {
-                    String::new()
-                },
-                self.catalog,
-                schema
-            );
-
-            // endpoint is optional
-            let endpoint_option = if let Some(endpoint) = self.s3_endpoint.as_ref() {
-                format!(", ENDPOINT='{}'", endpoint)
-            } else {
-                String::new()
-            };
-
-            // Safety: All s3 options are required
-            // Use expose_secret() to access the actual secret values
-            let connection_options = format!(
-                "ACCESS_KEY_ID='{}', SECRET_ACCESS_KEY='{}', REGION='{}'{}",
-                self.s3_access_key.as_ref().unwrap().expose_secret(),
-                self.s3_secret_key.as_ref().unwrap().expose_secret(),
-                self.s3_region.as_ref().unwrap(),
-                endpoint_option
-            );
-
-            (s3_path, format!(" CONNECTION ({})", connection_options))
-        } else if self.oss {
-            let oss_path = format!(
-                "oss://{}/{}/{}/",
-                self.oss_bucket.as_ref().unwrap(),
-                self.catalog,
-                schema
-            );
-            let endpoint_option = if let Some(endpoint) = self.oss_endpoint.as_ref() {
-                format!(", ENDPOINT='{}'", endpoint)
-            } else {
-                String::new()
-            };
-
-            let connection_options = format!(
-                "ACCESS_KEY_ID='{}', ACCESS_KEY_SECRET='{}'{}",
-                self.oss_access_key_id.as_ref().unwrap().expose_secret(),
-                self.oss_access_key_secret.as_ref().unwrap().expose_secret(),
-                endpoint_option
-            );
-            (oss_path, format!(" CONNECTION ({})", connection_options))
-        } else {
-            (
-                self.catalog_path()
-                    .join(format!("{schema}/"))
-                    .to_string_lossy()
-                    .to_string(),
-                String::new(),
-            )
-        }
-    }
-
     async fn execute_tasks(
         &self,
         tasks: Vec<impl Future<Output = Result<()>>>,
@@ -913,3 +638,773 @@ fn build_with_options(
     options.push(format!("parallelism = {}", parallelism));
     options.join(", ")
 }
+
+#[cfg(test)]
+mod tests {
+    use clap::Parser;
+    use common_test_util::temp_dir::create_temp_dir;
+
+    use super::*;
+
+    // ==================== Basic Success Cases ====================
+
+    #[tokio::test]
+    async fn test_export_command_build_with_local_fs() {
+        let temp_dir = create_temp_dir("test_export_local_fs");
+        let output_dir = temp_dir.path().to_str().unwrap();
+
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--output-dir",
+            output_dir,
+        ]);
+
+        let result = cmd.build().await;
+        assert!(result.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_s3_success() {
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--s3",
+            "--s3-bucket",
+            "test-bucket",
+            "--s3-root",
+            "test-root",
+            "--s3-access-key-id",
+            "test-key",
+            "--s3-secret-access-key",
+            "test-secret",
+            // Optional fields
+            "--s3-region",
+            "us-west-2",
+            "--s3-endpoint",
+            "https://s3.amazonaws.com",
+        ]);
+
+        let result = cmd.build().await;
+        assert!(result.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_oss_success() {
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--oss",
+            "--oss-bucket",
+            "test-bucket",
+            "--oss-root",
+            "test-root",
+            "--oss-access-key-id",
+            "test-key-id",
+            "--oss-access-key-secret",
+            "test-secret",
+            "--oss-endpoint",
+            "https://oss.example.com",
+        ]);
+
+        let result = cmd.build().await;
+        assert!(result.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_gcs_success() {
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--gcs",
+            "--gcs-bucket",
+            "test-bucket",
+            "--gcs-root",
+            "test-root",
+            "--gcs-scope",
+            "test-scope",
+            "--gcs-credential-path",
+            "/path/to/credential",
+            "--gcs-credential",
+            "test-credential-content",
+            "--gcs-endpoint",
+            "https://storage.googleapis.com",
+        ]);
+
+        let result = cmd.build().await;
+        assert!(result.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_gcs_adc_success() {
+        // Test GCS with Application Default Credentials (no explicit credentials provided)
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--gcs",
+            "--gcs-bucket",
+            "test-bucket",
+            "--gcs-root",
+            "test-root",
+            "--gcs-scope",
+            "test-scope",
+            // No credential_path or credential
+            // No endpoint (optional)
+        ]);
+
+        let result = cmd.build().await;
+        assert!(result.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_azblob_success() {
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--azblob",
+            "--azblob-container",
+            "test-container",
+            "--azblob-root",
+            "test-root",
+            "--azblob-account-name",
+            "test-account",
+            "--azblob-account-key",
+            "test-key",
+            "--azblob-endpoint",
+            "https://account.blob.core.windows.net",
+        ]);
+
+        let result = cmd.build().await;
+        assert!(result.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_azblob_with_sas_token() {
+        // Test Azure Blob with SAS token
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--azblob",
+            "--azblob-container",
+            "test-container",
+            "--azblob-root",
+            "test-root",
+            "--azblob-account-name",
+            "test-account",
+            "--azblob-account-key",
+            "test-key",
+            "--azblob-endpoint",
+            "https://account.blob.core.windows.net",
+            "--azblob-sas-token",
+            "test-sas-token",
+        ]);
+
+        let result = cmd.build().await;
+        assert!(result.is_ok());
+    }
+
+    // ==================== Gap 1: Parse-time dependency checks ====================
+
+    #[test]
+    fn test_export_command_build_with_conflict() {
+        // Try to enable both S3 and OSS
+        let result =
+            ExportCommand::try_parse_from(["export", "--addr", "127.0.0.1:4000", "--s3", "--oss"]);
+
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        // clap error for conflicting arguments
+        assert!(err.kind() == clap::error::ErrorKind::ArgumentConflict);
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_s3_no_enable_flag() {
+        // Test that providing S3 config without --s3 flag fails
+        let result = ExportCommand::try_parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            // Note: no --s3 flag
+            "--s3-bucket",
+            "test-bucket",
+            "--s3-access-key-id",
+            "test-key",
+            "--output-dir",
+            "/tmp/test",
+        ]);
+
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
+        assert!(err.to_string().contains("--s3"));
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_oss_no_enable_flag() {
+        // Test that providing OSS config without --oss flag fails at parse time
+        let result = ExportCommand::try_parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--oss-bucket",
+            "test-bucket",
+            "--output-dir",
+            "/tmp/test",
+        ]);
+
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
+        assert!(err.to_string().contains("--oss"));
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_gcs_no_enable_flag() {
+        // Test that providing GCS config without --gcs flag fails at parse time
+        let result = ExportCommand::try_parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--gcs-bucket",
+            "test-bucket",
+            "--output-dir",
+            "/tmp/test",
+        ]);
+
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
+        assert!(err.to_string().contains("--gcs"));
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_azblob_no_enable_flag() {
+        // Test that providing Azure Blob config without --azblob flag fails at parse time
+        let result = ExportCommand::try_parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--azblob-container",
+            "test-container",
+            "--output-dir",
+            "/tmp/test",
+        ]);
+
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
+        assert!(err.to_string().contains("--azblob"));
+    }
+
+    // ==================== Gap 2: Empty string vs missing tests ====================
+
+    #[tokio::test]
+    async fn test_export_command_build_with_s3_empty_access_key() {
+        // Test S3 with empty access key ID (empty string, not missing)
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--s3",
+            "--s3-bucket",
+            "test-bucket",
+            "--s3-root",
+            "test-root",
+            "--s3-access-key-id",
+            "", // Empty string
+            "--s3-secret-access-key",
+            "test-secret",
+            "--s3-region",
+            "us-west-2",
+        ]);
+
+        let result = cmd.build().await;
+        assert!(result.is_err());
+        if let Err(err) = result {
+            assert!(
+                err.to_string().contains("S3 access key ID must be set"),
+                "Actual error: {}",
+                err
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_s3_missing_secret_key() {
+        // Test S3 with empty secret access key
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--s3",
+            "--s3-bucket",
+            "test-bucket",
+            "--s3-root",
+            "test-root",
+            "--s3-access-key-id",
+            "test-key",
+            // Missing --s3-secret-access-key
+            "--s3-region",
+            "us-west-2",
+        ]);
+
+        let result = cmd.build().await;
+        assert!(result.is_err());
+        if let Err(err) = result {
+            assert!(
+                err.to_string().contains("S3 secret access key must be set"),
+                "Actual error: {}",
+                err
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_s3_empty_root() {
+        // Empty root should be allowed (it's an optional path component)
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--s3",
+            "--s3-bucket",
+            "test-bucket",
+            "--s3-root",
+            "", // Empty root is OK
+            "--s3-access-key-id",
+            "test-key",
+            "--s3-secret-access-key",
+            "test-secret",
+            "--s3-region",
+            "us-west-2",
+        ]);
+
+        let result = cmd.build().await;
+        // Should succeed because root is not a required field
+        assert!(
+            result.is_ok(),
+            "Expected success but got: {:?}",
+            result.err()
+        );
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_oss_empty_access_key_id() {
+        // Test OSS with empty access_key_id (empty string, not missing)
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--oss",
+            "--oss-bucket",
+            "test-bucket",
+            "--oss-access-key-id",
+            "", // Empty string
+            "--oss-access-key-secret",
+            "test-secret",
+            "--oss-endpoint",
+            "https://oss.example.com",
+        ]);
+
+        let result = cmd.build().await;
+        assert!(result.is_err());
+        if let Err(err) = result {
+            assert!(
+                err.to_string().contains("OSS access key ID must be set"),
+                "Actual error: {}",
+                err
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_oss_missing_endpoint() {
+        // Missing endpoint
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--oss",
+            "--oss-bucket",
+            "test-bucket",
+            "--oss-root",
+            "test-root",
+            "--oss-access-key-id",
+            "test-key-id",
+            "--oss-access-key-secret",
+            "test-secret",
+        ]);
+
+        let result = cmd.build().await;
+        assert!(result.is_err());
+        if let Err(err) = result {
+            assert!(
+                err.to_string().contains("OSS endpoint must be set"),
+                "Actual error: {}",
+                err
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_oss_multiple_missing_fields() {
+        // Test OSS with multiple missing required fields
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--oss",
+            "--oss-bucket",
+            "test-bucket",
+            // Missing: root, access_key_id, access_key_secret, endpoint
+        ]);
+
+        let result = cmd.build().await;
+        assert!(result.is_err());
+        if let Err(err) = result {
+            let err_str = err.to_string();
+            // Should mention multiple missing fields
+            assert!(
+                err_str.contains("OSS"),
+                "Error should mention OSS: {}",
+                err_str
+            );
+            assert!(
+                err_str.contains("must be set"),
+                "Error should mention required fields: {}",
+                err_str
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_gcs_empty_bucket() {
+        // Test GCS with empty bucket
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--gcs",
+            "--gcs-bucket",
+            "", // Empty bucket
+            "--gcs-root",
+            "test-root",
+            "--gcs-scope",
+            "test-scope",
+        ]);
+
+        let result = cmd.build().await;
+        assert!(result.is_err());
+        if let Err(err) = result {
+            assert!(
+                err.to_string().contains("GCS bucket must be set"),
+                "Actual error: {}",
+                err
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_gcs_empty_root() {
+        // Test GCS when root is missing (should fail as it's required)
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--gcs",
+            "--gcs-bucket",
+            "test-bucket",
+            "--gcs-root",
+            "", // Empty root
+            "--gcs-scope",
+            "test-scope",
+            "--gcs-credential-path",
+            "/path/to/credential",
+            "--gcs-credential",
+            "test-credential",
+            "--gcs-endpoint",
+            "https://storage.googleapis.com",
+        ]);
+
+        let result = cmd.build().await;
+        assert!(result.is_err());
+        if let Err(err) = result {
+            assert!(
+                err.to_string().contains("GCS root must be set"),
+                "Actual error: {}",
+                err
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_azblob_empty_account_name() {
+        // Test Azure Blob with empty account_name
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--azblob",
+            "--azblob-container",
+            "test-container",
+            "--azblob-root",
+            "test-root",
+            "--azblob-account-name",
+            "", // Empty account name
+            "--azblob-account-key",
+            "test-key",
+            "--azblob-endpoint",
+            "https://account.blob.core.windows.net",
+        ]);
+
+        let result = cmd.build().await;
+        assert!(result.is_err());
+        if let Err(err) = result {
+            assert!(
+                err.to_string()
+                    .contains("Azure Blob account name must be set"),
+                "Actual error: {}",
+                err
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_azblob_missing_account_key() {
+        // Missing account key
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--azblob",
+            "--azblob-container",
+            "test-container",
+            "--azblob-root",
+            "test-root",
+            "--azblob-account-name",
+            "test-account",
+            "--azblob-endpoint",
+            "https://account.blob.core.windows.net",
+        ]);
+
+        let result = cmd.build().await;
+        assert!(result.is_err());
+        if let Err(err) = result {
+            assert!(
+                err.to_string().contains(
+                    "Azure Blob account key (when sas_token is not provided) must be set"
+                ),
+                "Actual error: {}",
+                err
+            );
+        }
+    }
+
+    // ==================== Gap 3: Boundary cases ====================
+
+    #[tokio::test]
+    async fn test_export_command_build_with_no_storage() {
+        // No output-dir and no backend - should fail
+        let cmd = ExportCommand::parse_from(["export", "--addr", "127.0.0.1:4000"]);
+
+        let result = cmd.build().await;
+        assert!(result.is_err());
+        if let Err(err) = result {
+            assert!(
+                err.to_string().contains("Output directory not set"),
+                "Actual error: {}",
+                err
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_s3_minimal_config() {
+        // S3 with only required fields (no optional fields)
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--s3",
+            "--s3-bucket",
+            "test-bucket",
+            "--s3-access-key-id",
+            "test-key",
+            "--s3-secret-access-key",
+            "test-secret",
+            "--s3-region",
+            "us-west-2",
+            // No root, endpoint, or enable_virtual_host_style
+        ]);
+
+        let result = cmd.build().await;
+        assert!(result.is_ok(), "Minimal S3 config should succeed");
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_oss_minimal_config() {
+        // OSS with only required fields
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--oss",
+            "--oss-bucket",
+            "test-bucket",
+            "--oss-access-key-id",
+            "test-key-id",
+            "--oss-access-key-secret",
+            "test-secret",
+            "--oss-endpoint",
+            "https://oss.example.com",
+            // No root
+        ]);
+
+        let result = cmd.build().await;
+        assert!(result.is_ok(), "Minimal OSS config should succeed");
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_gcs_minimal_config() {
+        // GCS with only required fields (using ADC)
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--gcs",
+            "--gcs-bucket",
+            "test-bucket",
+            "--gcs-root",
+            "test-root",
+            "--gcs-scope",
+            "test-scope",
+            // No credential_path, credential, or endpoint
+        ]);
+
+        let result = cmd.build().await;
+        assert!(result.is_ok(), "Minimal GCS config should succeed");
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_azblob_minimal_config() {
+        // Azure Blob with only required fields
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--azblob",
+            "--azblob-container",
+            "test-container",
+            "--azblob-root",
+            "test-root",
+            "--azblob-account-name",
+            "test-account",
+            "--azblob-account-key",
+            "test-key",
+            "--azblob-endpoint",
+            "https://account.blob.core.windows.net",
+            // No sas_token
+        ]);
+
+        let result = cmd.build().await;
+        assert!(result.is_ok(), "Minimal Azure Blob config should succeed");
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_local_and_s3() {
+        // Both output-dir and S3 - S3 should take precedence
+        let temp_dir = create_temp_dir("test_export_local_and_s3");
+        let output_dir = temp_dir.path().to_str().unwrap();
+
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--output-dir",
+            output_dir,
+            "--s3",
+            "--s3-bucket",
+            "test-bucket",
+            "--s3-access-key-id",
+            "test-key",
+            "--s3-secret-access-key",
+            "test-secret",
+            "--s3-region",
+            "us-west-2",
+        ]);
+
+        let result = cmd.build().await;
+        assert!(
+            result.is_ok(),
+            "S3 should be selected when both are provided"
+        );
+    }
+
+    // ==================== Gap 4: Custom validation (Azure Blob) ====================
+
+    #[tokio::test]
+    async fn test_export_command_build_with_azblob_only_sas_token() {
+        // Azure Blob with sas_token but no account_key - should succeed
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--azblob",
+            "--azblob-container",
+            "test-container",
+            "--azblob-root",
+            "test-root",
+            "--azblob-account-name",
+            "test-account",
+            "--azblob-endpoint",
+            "https://account.blob.core.windows.net",
+            "--azblob-sas-token",
+            "test-sas-token",
+            // No account_key
+        ]);
+
+        let result = cmd.build().await;
+        assert!(
+            result.is_ok(),
+            "Azure Blob with only sas_token should succeed: {:?}",
+            result.err()
+        );
+    }
+
+    #[tokio::test]
+    async fn test_export_command_build_with_azblob_empty_account_key_with_sas() {
+        // Azure Blob with empty account_key but valid sas_token - should succeed
+        let cmd = ExportCommand::parse_from([
+            "export",
+            "--addr",
+            "127.0.0.1:4000",
+            "--azblob",
+            "--azblob-container",
+            "test-container",
+            "--azblob-root",
+            "test-root",
+            "--azblob-account-name",
+            "test-account",
+            "--azblob-account-key",
+            "", // Empty account_key is OK if sas_token is provided
+            "--azblob-endpoint",
+            "https://account.blob.core.windows.net",
+            "--azblob-sas-token",
+            "test-sas-token",
+        ]);
+
+        let result = cmd.build().await;
+        assert!(
+            result.is_ok(),
+            "Azure Blob with empty account_key but sas_token should succeed: {:?}",
+            result.err()
+        );
+    }
+}
diff --git a/src/cli/src/data/storage_export.rs b/src/cli/src/data/storage_export.rs
new file mode 100644
index 000000000000..03d4ba5093a0
--- /dev/null
+++ b/src/cli/src/data/storage_export.rs
@@ -0,0 +1,353 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::path::PathBuf;
+
+use async_trait::async_trait;
+use common_base::secrets::{ExposeSecret, SecretString};
+use common_error::ext::BoxedError;
+
+use crate::common::{
+    PrefixedAzblobConnection, PrefixedGcsConnection, PrefixedOssConnection, PrefixedS3Connection,
+};
+
+/// Helper function to extract the secret string from an `Option<SecretString>`.
+/// Returns empty string if None.
+fn expose_optional_secret(secret: &Option<SecretString>) -> &str {
+    secret
+        .as_ref()
+        .map(|s| s.expose_secret().as_str())
+        .unwrap_or("")
+}
+
+/// Helper function to format root path with leading slash if non-empty.
+fn format_root_path(root: &str) -> String {
+    if root.is_empty() {
+        String::new()
+    } else {
+        format!("/{}", root)
+    }
+}
+
+/// Helper function to mask multiple secrets in a string.
+fn mask_secrets(mut sql: String, secrets: &[&str]) -> String {
+    for secret in secrets {
+        if !secret.is_empty() {
+            sql = sql.replace(secret, "[REDACTED]");
+        }
+    }
+    sql
+}
+
+/// Trait for storage backends that can be used for data export.
+#[async_trait]
+pub trait StorageExport: Send + Sync {
+    /// Generate the storage path for COPY DATABASE command.
+    /// Returns (path, connection_string) where connection_string includes the CONNECTION clause.
+    fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String);
+
+    /// Format the output path for logging purposes.
+    fn format_output_path(&self, catalog: &str, file_path: &str) -> String;
+
+    /// Mask sensitive information in SQL commands for safe logging.
+    fn mask_sensitive_info(&self, sql: &str) -> String;
+}
+
+macro_rules! define_backend {
+    ($name:ident, $config:ty) => {
+        #[derive(Clone)]
+        pub struct $name {
+            config: $config,
+        }
+
+        impl $name {
+            pub fn new(config: $config) -> Result<Self, BoxedError> {
+                config.validate()?;
+                Ok(Self { config })
+            }
+        }
+    };
+}
+
+macro_rules! define_storage_type {
+    (
+        pub enum $enum_name:ident {
+            $($variant:ident($backend:ty)),* $(,)?
+        }
+    ) => {
+        #[derive(Clone)]
+        pub enum $enum_name {
+            $($variant($backend)),*
+        }
+
+        #[async_trait]
+        impl StorageExport for $enum_name {
+            fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
+                match self {
+                    $(Self::$variant(backend) => backend.get_storage_path(catalog, schema)),*
+                }
+            }
+
+            fn format_output_path(&self, catalog: &str, file_path: &str) -> String {
+                match self {
+                    $(Self::$variant(backend) => backend.format_output_path(catalog, file_path)),*
+                }
+            }
+
+            fn mask_sensitive_info(&self, sql: &str) -> String {
+                match self {
+                    $(Self::$variant(backend) => backend.mask_sensitive_info(sql)),*
+                }
+            }
+        }
+
+        impl $enum_name {
+            /// Returns true if the storage backend is remote (not local filesystem).
+            pub fn is_remote_storage(&self) -> bool {
+                !matches!(self, Self::Fs(_))
+            }
+        }
+    };
+}
+
+/// Local file system storage backend.
+#[derive(Clone)]
+pub struct FsBackend {
+    output_dir: String,
+}
+
+impl FsBackend {
+    pub fn new(output_dir: String) -> Self {
+        Self { output_dir }
+    }
+}
+
+#[async_trait]
+impl StorageExport for FsBackend {
+    fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
+        if self.output_dir.is_empty() {
+            unreachable!("output_dir must be set when not using remote storage")
+        }
+        let path = PathBuf::from(&self.output_dir)
+            .join(catalog)
+            .join(format!("{schema}/"))
+            .to_string_lossy()
+            .to_string();
+        (path, String::new())
+    }
+
+    fn format_output_path(&self, _catalog: &str, file_path: &str) -> String {
+        format!("{}/{}", self.output_dir, file_path)
+    }
+
+    fn mask_sensitive_info(&self, sql: &str) -> String {
+        sql.to_string()
+    }
+}
+
+define_backend!(S3Backend, PrefixedS3Connection);
+
+#[async_trait]
+impl StorageExport for S3Backend {
+    fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
+        let bucket = &self.config.s3_bucket;
+        let root = format_root_path(&self.config.s3_root);
+
+        let s3_path = format!("s3://{}{}/{}/{}/", bucket, root, catalog, schema);
+
+        let mut connection_options = vec![
+            format!(
+                "ACCESS_KEY_ID='{}'",
+                expose_optional_secret(&self.config.s3_access_key_id)
+            ),
+            format!(
+                "SECRET_ACCESS_KEY='{}'",
+                expose_optional_secret(&self.config.s3_secret_access_key)
+            ),
+        ];
+
+        if let Some(region) = &self.config.s3_region {
+            connection_options.push(format!("REGION='{}'", region));
+        }
+
+        if let Some(endpoint) = &self.config.s3_endpoint {
+            connection_options.push(format!("ENDPOINT='{}'", endpoint));
+        }
+
+        let connection_str = format!(" CONNECTION ({})", connection_options.join(", "));
+        (s3_path, connection_str)
+    }
+
+    fn format_output_path(&self, _catalog: &str, file_path: &str) -> String {
+        let bucket = &self.config.s3_bucket;
+        let root = format_root_path(&self.config.s3_root);
+        format!("s3://{}{}/{}", bucket, root, file_path)
+    }
+
+    fn mask_sensitive_info(&self, sql: &str) -> String {
+        mask_secrets(
+            sql.to_string(),
+            &[
+                expose_optional_secret(&self.config.s3_access_key_id),
+                expose_optional_secret(&self.config.s3_secret_access_key),
+            ],
+        )
+    }
+}
+define_backend!(OssBackend, PrefixedOssConnection);
+
+#[async_trait]
+impl StorageExport for OssBackend {
+    fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
+        let bucket = &self.config.oss_bucket;
+        let oss_path = format!("oss://{}/{}/{}/", bucket, catalog, schema);
+
+        let connection_options = [
+            format!(
+                "ACCESS_KEY_ID='{}'",
+                expose_optional_secret(&self.config.oss_access_key_id)
+            ),
+            format!(
+                "ACCESS_KEY_SECRET='{}'",
+                expose_optional_secret(&self.config.oss_access_key_secret)
+            ),
+        ];
+
+        let connection_str = format!(" CONNECTION ({})", connection_options.join(", "));
+        (oss_path, connection_str)
+    }
+
+    fn format_output_path(&self, catalog: &str, file_path: &str) -> String {
+        let bucket = &self.config.oss_bucket;
+        format!("oss://{}/{}/{}", bucket, catalog, file_path)
+    }
+
+    fn mask_sensitive_info(&self, sql: &str) -> String {
+        mask_secrets(
+            sql.to_string(),
+            &[
+                expose_optional_secret(&self.config.oss_access_key_id),
+                expose_optional_secret(&self.config.oss_access_key_secret),
+            ],
+        )
+    }
+}
+
+define_backend!(GcsBackend, PrefixedGcsConnection);
+
+#[async_trait]
+impl StorageExport for GcsBackend {
+    fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
+        let bucket = &self.config.gcs_bucket;
+        let root = format_root_path(&self.config.gcs_root);
+
+        let gcs_path = format!("gcs://{}{}/{}/{}/", bucket, root, catalog, schema);
+
+        let mut connection_options = Vec::new();
+
+        let credential_path = expose_optional_secret(&self.config.gcs_credential_path);
+        if !credential_path.is_empty() {
+            connection_options.push(format!("CREDENTIAL_PATH='{}'", credential_path));
+        }
+
+        let credential = expose_optional_secret(&self.config.gcs_credential);
+        if !credential.is_empty() {
+            connection_options.push(format!("CREDENTIAL='{}'", credential));
+        }
+
+        if !self.config.gcs_endpoint.is_empty() {
+            connection_options.push(format!("ENDPOINT='{}'", self.config.gcs_endpoint));
+        }
+
+        let connection_str = if connection_options.is_empty() {
+            String::new()
+        } else {
+            format!(" CONNECTION ({})", connection_options.join(", "))
+        };
+
+        (gcs_path, connection_str)
+    }
+
+    fn format_output_path(&self, _catalog: &str, file_path: &str) -> String {
+        let bucket = &self.config.gcs_bucket;
+        let root = format_root_path(&self.config.gcs_root);
+        format!("gcs://{}{}/{}", bucket, root, file_path)
+    }
+
+    fn mask_sensitive_info(&self, sql: &str) -> String {
+        mask_secrets(
+            sql.to_string(),
+            &[
+                expose_optional_secret(&self.config.gcs_credential_path),
+                expose_optional_secret(&self.config.gcs_credential),
+            ],
+        )
+    }
+}
+
+define_backend!(AzblobBackend, PrefixedAzblobConnection);
+
+#[async_trait]
+impl StorageExport for AzblobBackend {
+    fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
+        let container = &self.config.azblob_container;
+        let root = format_root_path(&self.config.azblob_root);
+
+        let azblob_path = format!("azblob://{}{}/{}/{}/", container, root, catalog, schema);
+
+        let mut connection_options = vec![
+            format!(
+                "ACCOUNT_NAME='{}'",
+                expose_optional_secret(&self.config.azblob_account_name)
+            ),
+            format!(
+                "ACCOUNT_KEY='{}'",
+                expose_optional_secret(&self.config.azblob_account_key)
+            ),
+        ];
+
+        if let Some(sas_token) = &self.config.azblob_sas_token {
+            connection_options.push(format!("SAS_TOKEN='{}'", sas_token));
+        }
+
+        let connection_str = format!(" CONNECTION ({})", connection_options.join(", "));
+        (azblob_path, connection_str)
+    }
+
+    fn format_output_path(&self, _catalog: &str, file_path: &str) -> String {
+        let container = &self.config.azblob_container;
+        let root = format_root_path(&self.config.azblob_root);
+        format!("azblob://{}{}/{}", container, root, file_path)
+    }
+
+    fn mask_sensitive_info(&self, sql: &str) -> String {
+        mask_secrets(
+            sql.to_string(),
+            &[
+                expose_optional_secret(&self.config.azblob_account_name),
+                expose_optional_secret(&self.config.azblob_account_key),
+            ],
+        )
+    }
+}
+
+define_storage_type!(
+    pub enum StorageType {
+        Fs(FsBackend),
+        S3(S3Backend),
+        Oss(OssBackend),
+        Gcs(GcsBackend),
+        Azblob(AzblobBackend),
+    }
+);
diff --git a/src/cli/src/error.rs b/src/cli/src/error.rs
index 91a6d35b5c19..aca3e6e29c29 100644
--- a/src/cli/src/error.rs
+++ b/src/cli/src/error.rs
@@ -253,12 +253,6 @@ pub enum Error {
         error: ObjectStoreError,
     },

-    #[snafu(display("S3 config need be set"))]
-    S3ConfigNotSet {
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Output directory not set"))]
     OutputDirNotSet {
         #[snafu(implicit)]
@@ -364,9 +358,9 @@ impl ErrorExt for Error {
             Error::Other { source, .. } => source.status_code(),

             Error::OpenDal { .. } | Error::InitBackend { .. } => StatusCode::Internal,
-            Error::S3ConfigNotSet { .. }
-            | Error::OutputDirNotSet { .. }
-            | Error::EmptyStoreAddrs { .. } => StatusCode::InvalidArguments,
+            Error::OutputDirNotSet { .. } | Error::EmptyStoreAddrs { .. } => {
+                StatusCode::InvalidArguments
+            }

             Error::BuildRuntime { source, .. } => source.status_code(),
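To close the loop, a standalone mirror of `S3Backend::get_storage_path` from storage_export.rs above — same path and CONNECTION shape, with plain `&str` credentials in place of `SecretString` and a free function instead of the method:

```rust
fn format_root_path(root: &str) -> String {
    if root.is_empty() { String::new() } else { format!("/{}", root) }
}

// Mirrors the S3 arm: required credentials always appear, region/endpoint
// only when provided, and the result feeds `COPY DATABASE ... TO '<path>'<connection>`.
fn s3_storage_path(
    bucket: &str, root: &str, key_id: &str, secret: &str,
    region: Option<&str>, endpoint: Option<&str>,
    catalog: &str, schema: &str,
) -> (String, String) {
    let s3_path = format!("s3://{}{}/{}/{}/", bucket, format_root_path(root), catalog, schema);
    let mut opts = vec![
        format!("ACCESS_KEY_ID='{}'", key_id),
        format!("SECRET_ACCESS_KEY='{}'", secret),
    ];
    if let Some(region) = region {
        opts.push(format!("REGION='{}'", region));
    }
    if let Some(endpoint) = endpoint {
        opts.push(format!("ENDPOINT='{}'", endpoint));
    }
    (s3_path, format!(" CONNECTION ({})", opts.join(", ")))
}

fn main() {
    let (path, conn) = s3_storage_path(
        "test-bucket", "test-root", "key", "secret",
        Some("us-west-2"), None, "greptime", "public",
    );
    assert_eq!(path, "s3://test-bucket/test-root/greptime/public/");
    assert_eq!(
        conn,
        " CONNECTION (ACCESS_KEY_ID='key', SECRET_ACCESS_KEY='secret', REGION='us-west-2')"
    );
}
```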