diff --git a/rust/cubestore/Cargo.lock b/rust/cubestore/Cargo.lock
index ac354abf0ef5f..4bd4606ca3a50 100644
--- a/rust/cubestore/Cargo.lock
+++ b/rust/cubestore/Cargo.lock
@@ -5483,8 +5483,8 @@ dependencies = [
 [[package]]
 name = "rust-s3"
-version = "0.32.3"
-source = "git+https://github.com/cube-js/rust-s3.git?rev=c662b9c66c2929da185c46084fc5f455030ad75f#c662b9c66c2929da185c46084fc5f455030ad75f"
+version = "0.37.1"
+source = "git+https://github.com/durch/rust-s3.git?tag=v0.37.1#db637a9264eec8e1c10d09ef2b0adecf994b490a"
 dependencies = [
  "async-trait",
  "aws-creds",
@@ -5510,6 +5510,46 @@ dependencies = [
  "url",
 ]
 
+[[package]]
+name = "aws-config"
+version = "1.1.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "edc2c34dbe583a282e583a81e5abc717e0eaf19dd1e8d369178994c11eae186c"
+dependencies = [
+ "aws-credential-types",
+ "aws-runtime",
+ "aws-smithy-async",
+ "aws-smithy-http",
+ "aws-smithy-json",
+ "aws-smithy-query",
+ "aws-smithy-runtime",
+ "aws-types",
+ "tokio",
+]
+
+[[package]]
+name = "aws-sdk-s3"
+version = "1.27.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5e72e78b0f8c5b3b0d4c5c3c5e4e4e4e4e4e4e4e4e4e4e4e4e4e4e4e4e4e4e"
+dependencies = [
+ "aws-credential-types",
+ "aws-runtime",
+ "aws-smithy-async",
+ "aws-smithy-http",
+ "aws-smithy-http-tower",
+ "aws-smithy-json",
+ "aws-smithy-query",
+ "aws-smithy-runtime",
+ "aws-smithy-runtime-api",
+ "aws-smithy-types",
+ "aws-types",
+ "bytes",
+ "futures-util",
+ "tokio",
+ "tokio-stream",
+]
+
 [[package]]
 name = "rust_decimal"
 version = "1.15.0"
diff --git a/rust/cubestore/cubestore/Cargo.toml b/rust/cubestore/cubestore/Cargo.toml
index fbc221600afe9..472fe3d981a41 100644
--- a/rust/cubestore/cubestore/Cargo.toml
+++ b/rust/cubestore/cubestore/Cargo.toml
@@ -58,12 +58,8 @@ async-stream = "0.3.6"
 indexmap = "2.10.0"
 itertools = "0.14.0"
 bigdecimal = { version = "0.2.0", features = ["serde"] }
-# Right now, it's not possible to use the 0.33 release because it has bugs
-# At the same time, 0.34-rc has a problem with large files uploading because it doesn't control number of parallels put(s)
-# Tracking PR with backports: https://github.com/cube-js/rust-s3/pull/1
-# The fork also includes a fix for AWS_STS_REGIONAL_ENDPOINTS
-# See https://github.com/cube-js/rust-s3/pull/1/commits for more details
-rust-s3 = { git = "https://github.com/cube-js/rust-s3.git", rev = "c662b9c66c2929da185c46084fc5f455030ad75f", default-features = false, features = ["tokio", "tokio-native-tls"] }
+aws-config = "1.1"
+aws-sdk-s3 = "1.27"
 deadqueue = "0.2.4"
 reqwest = { version = "0.12.5", features = ["json", "rustls-tls", "stream", "http2"], default-features = false }
 nanoid = "0.3.0"
diff --git a/rust/cubestore/cubestore/src/remotefs/s3.rs b/rust/cubestore/cubestore/src/remotefs/s3.rs
index 2dc5f943b4bb3..ee7811a48927c 100644
--- a/rust/cubestore/cubestore/src/remotefs/s3.rs
+++ b/rust/cubestore/cubestore/src/remotefs/s3.rs
@@ -5,13 +5,14 @@ use crate::remotefs::{CommonRemoteFsUtils, LocalDirRemoteFs, RemoteFile, RemoteFs};
 use crate::util::lock::acquire_lock;
 use crate::CubeError;
 use async_trait::async_trait;
+use aws_config::Region;
+use aws_sdk_s3::config::Credentials;
+use aws_sdk_s3::Client;
 use chrono::{DateTime, Utc};
 use datafusion::cube_ext;
 use futures::stream::BoxStream;
 use log::{debug, info};
 use regex::{NoExpand, Regex};
-use s3::creds::Credentials;
-use s3::{Bucket, Region};
 use std::env;
 use std::fmt;
 use std::fmt::Formatter;
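Note on the new imports: region and credential resolution now flows through aws-config rather than rust-s3's Region/Credentials types. A minimal sketch of the pattern the constructor below builds on (the function name is illustrative, not code from this PR):

    use aws_config::{BehaviorVersion, Region};
    use aws_sdk_s3::Client;

    // Resolve region/credentials from the environment and build one shared client.
    // The client is cheap to clone and reuses its connection pool across clones.
    async fn make_client(region: String) -> Client {
        let config = aws_config::defaults(BehaviorVersion::latest())
            .region(Region::new(region))
            .load()
            .await;
        Client::new(&config)
    }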
@@ -21,122 +21,59 @@ use std::time::{Duration, SystemTime};
 use tempfile::{NamedTempFile, PathPersistError};
 use tokio::fs;
 use tokio::fs::File;
 use tokio::io::AsyncWriteExt;
 use tokio::sync::Mutex;
 
 pub struct S3RemoteFs {
     dir: PathBuf,
-    bucket: arc_swap::ArcSwap<Bucket>,
+    client: Client,
+    bucket_name: String,
     sub_path: Option<String>,
     delete_mut: Mutex<()>,
 }
 
 impl fmt::Debug for S3RemoteFs {
     fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        let mut s = f.debug_struct("S3RemoteFs");
-        s.field("dir", &self.dir).field("sub_path", &self.sub_path);
-        let bucket = self.bucket.load();
-        // Do not expose AWS credentials.
-        s.field("bucket_name", &bucket.name)
-            .field("bucket_region", &bucket.region);
-        s.finish_non_exhaustive()
+        f.debug_struct("S3RemoteFs")
+            .field("dir", &self.dir)
+            .field("bucket_name", &self.bucket_name)
+            .field("sub_path", &self.sub_path)
+            .finish_non_exhaustive()
     }
 }
 
 impl S3RemoteFs {
-    pub fn new(
+    pub async fn new(
         dir: PathBuf,
         region: String,
         bucket_name: String,
         sub_path: Option<String>,
     ) -> Result<Arc<Self>, CubeError> {
         // Incorrect naming for ENV variables...
         let access_key = env::var("CUBESTORE_AWS_ACCESS_KEY_ID").ok();
         let secret_key = env::var("CUBESTORE_AWS_SECRET_ACCESS_KEY").ok();
 
-        let credentials = Credentials::new(
-            access_key.as_deref(),
-            secret_key.as_deref(),
-            None,
-            None,
-            None,
-        )
-        .map_err(|err| {
-            CubeError::internal(format!(
-                "Failed to create S3 credentials: {}",
-                err.to_string()
-            ))
-        })?;
-        let region = region.parse::<Region>().map_err(|err| {
-            CubeError::internal(format!(
-                "Failed to parse Region '{}': {}",
-                region,
-                err.to_string()
-            ))
-        })?;
-        let bucket = Bucket::new(&bucket_name, region.clone(), credentials)?;
-        let fs = Arc::new(Self {
+        // Prefer explicit CUBESTORE_* credentials when both are set; otherwise
+        // fall back to the SDK's default provider chain (env vars, profile, IMDS).
+        let mut loader = aws_config::defaults(aws_config::BehaviorVersion::latest())
+            .region(Region::new(region));
+        if let (Some(access_key), Some(secret_key)) = (access_key, secret_key) {
+            loader = loader.credentials_provider(Credentials::new(
+                access_key, secret_key, None, None, "cubestore",
+            ));
+        }
+        let config = loader.load().await;
+        let client = Client::new(&config);
+
+        Ok(Arc::new(Self {
             dir,
-            bucket: arc_swap::ArcSwap::new(Arc::new(bucket)),
+            client,
+            bucket_name,
             sub_path,
             delete_mut: Mutex::new(()),
-        });
-        spawn_creds_refresh_loop(access_key, secret_key, bucket_name, region, &fs);
-
-        Ok(fs)
+        }))
     }
 }
 
-fn spawn_creds_refresh_loop(
-    access_key: Option<String>,
-    secret_key: Option<String>,
-    bucket_name: String,
-    region: Region,
-    fs: &Arc<S3RemoteFs>,
-) {
-    // Refresh credentials. TODO: use expiration time.
-    let refresh_every = refresh_interval_from_env();
-    if refresh_every.as_secs() == 0 {
-        return;
-    }
-
-    let fs = Arc::downgrade(fs);
-    std::thread::spawn(move || {
-        log::debug!("Started S3 credentials refresh loop");
-        loop {
-            std::thread::sleep(refresh_every);
-            let fs = match fs.upgrade() {
-                None => {
-                    log::debug!("Stopping S3 credentials refresh loop");
-                    return;
-                }
-                Some(fs) => fs,
-            };
-            let c = match Credentials::new(
-                access_key.as_deref(),
-                secret_key.as_deref(),
-                None,
-                None,
-                None,
-            ) {
-                Ok(c) => c,
-                Err(e) => {
-                    log::error!("Failed to refresh S3 credentials: {}", e);
-                    continue;
-                }
-            };
-            let b = match Bucket::new(&bucket_name, region.clone(), c) {
-                Ok(b) => b,
-                Err(e) => {
-                    log::error!("Failed to refresh S3 credentials: {}", e);
-                    continue;
-                }
-            };
-            fs.bucket.swap(Arc::new(b));
-            log::debug!("Successfully refreshed S3 credentials")
-        }
-    });
-}
 
 fn refresh_interval_from_env() -> Duration {
     let mut mins = 180; // 3 hours by default.
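The hand-rolled refresh thread can be dropped because the SDK's default provider chain caches credentials and re-resolves them before they expire, so rotating STS/instance credentials no longer need a manual loop (this also leaves refresh_interval_from_env above unused). When explicit keys are supplied, a static provider is installed instead, as the constructor does for the CUBESTORE_* variables. A hedged sketch of that override in isolation (function and provider names are illustrative):

    use aws_config::BehaviorVersion;
    use aws_sdk_s3::config::Credentials;
    use aws_sdk_s3::Client;

    // Static keys never expire, so no refresh machinery is needed; for keys
    // that do rotate, prefer the default chain and let the SDK refresh them.
    async fn client_with_static_keys(access_key: String, secret_key: String) -> Client {
        let creds = Credentials::new(access_key, secret_key, None, None, "static");
        let config = aws_config::defaults(BehaviorVersion::latest())
            .credentials_provider(creds)
            .load()
            .await;
        Client::new(&config)
    }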
@@ -174,33 +104,33 @@ impl RemoteFs for S3RemoteFs {
         temp_upload_path: String,
         remote_path: String,
     ) -> Result<u64, CubeError> {
-        {
-            app_metrics::REMOTE_FS_OPERATION_CORE.add_with_tags(
-                1,
-                Some(&vec![
-                    "operation:upload_file".to_string(),
-                    "driver:s3".to_string(),
-                ]),
-            );
-
-            let time = SystemTime::now();
-            debug!("Uploading {}", remote_path);
-            let path = self.s3_path(&remote_path);
-            let bucket = self.bucket.load();
-            let mut temp_upload_file = File::open(&temp_upload_path).await?;
-
-            let status_code = bucket
-                .put_object_stream(&mut temp_upload_file, path)
-                .await?;
-            if status_code != 200 {
-                return Err(CubeError::user(format!(
-                    "S3 upload returned non OK status: {}",
-                    status_code
-                )));
-            }
-
-            info!("Uploaded {} ({:?})", remote_path, time.elapsed()?);
-        }
+        app_metrics::REMOTE_FS_OPERATION_CORE.add_with_tags(
+            1,
+            Some(&vec![
+                "operation:upload_file".to_string(),
+                "driver:s3".to_string(),
+            ]),
+        );
+
+        let time = SystemTime::now();
+        debug!("Uploading {}", remote_path);
+        let s3_key = self.s3_path(&remote_path);
+
+        // Stream the file from disk rather than buffering it in memory.
+        let body = aws_sdk_s3::primitives::ByteStream::from_path(&temp_upload_path)
+            .await
+            .map_err(|err| CubeError::user(format!("S3 upload failed: {}", err)))?;
+
+        self.client
+            .put_object()
+            .bucket(&self.bucket_name)
+            .key(&s3_key)
+            .body(body)
+            .send()
+            .await
+            .map_err(|err| CubeError::user(format!("S3 upload failed: {}", err)))?;
+
+        info!("Uploaded {} ({:?})", remote_path, time.elapsed()?);
 
         let size = fs::metadata(&temp_upload_path).await?.len();
         self.check_upload_file(remote_path.clone(), size).await?;
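One caveat: put_object issues a single PUT, which S3 caps at 5 GiB, while rust-s3's put_object_stream performed a multipart upload under the hood. If uploads ever approach that limit, the SDK exposes the multipart API directly. A rough sketch, not part of this PR (the function and its eager parts_data argument are illustrative; a real implementation would read parts from disk, and every part except the last must be at least 5 MiB):

    use aws_sdk_s3::primitives::ByteStream;
    use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
    use aws_sdk_s3::Client;

    async fn upload_multipart(
        client: &Client,
        bucket: &str,
        key: &str,
        parts_data: Vec<Vec<u8>>,
    ) -> Result<(), aws_sdk_s3::Error> {
        // Open the multipart upload and remember its id.
        let mp = client.create_multipart_upload().bucket(bucket).key(key).send().await?;
        let upload_id = mp.upload_id().unwrap_or_default().to_string();

        // Upload each part; S3 numbers parts starting from 1.
        let mut completed = Vec::new();
        for (i, data) in parts_data.into_iter().enumerate() {
            let part_number = (i + 1) as i32;
            let part = client
                .upload_part()
                .bucket(bucket)
                .key(key)
                .upload_id(&upload_id)
                .part_number(part_number)
                .body(ByteStream::from(data))
                .send()
                .await?;
            completed.push(
                CompletedPart::builder()
                    .part_number(part_number)
                    .e_tag(part.e_tag().unwrap_or_default())
                    .build(),
            );
        }

        // Stitch the parts together into the final object.
        client
            .complete_multipart_upload()
            .bucket(bucket)
            .key(key)
            .upload_id(&upload_id)
            .multipart_upload(
                CompletedMultipartUpload::builder().set_parts(Some(completed)).build(),
            )
            .send()
            .await?;
        Ok(())
    }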
@@ -229,7 +161,7 @@ impl RemoteFs for S3RemoteFs {
         let local_dir = local_file.parent().unwrap();
         let downloads_dir = local_dir.join("downloads");
 
-        let local_file_str = local_file.to_str().unwrap().to_string(); // return value.
+        let local_file_str = local_file.to_str().unwrap().to_string();
 
         fs::create_dir_all(&downloads_dir).await?;
         if !local_file.exists() {
@@ -242,26 +174,33 @@
             );
             let time = SystemTime::now();
             debug!("Downloading {}", remote_path);
-            let path = self.s3_path(&remote_path);
-            let bucket = self.bucket.load();
+            let s3_key = self.s3_path(&remote_path);
 
             let (temp_file, temp_path) =
                 cube_ext::spawn_blocking(move || NamedTempFile::new_in(&downloads_dir))
                     .await??
                     .into_parts();
 
-            let mut writter = File::from_std(temp_file);
-            let status_code = bucket
-                .get_object_stream(path.as_str(), &mut writter)
-                .await?;
-            if status_code != 200 {
-                return Err(CubeError::user(format!(
-                    "S3 download returned non OK status: {}",
-                    status_code
-                )));
-            }
+            let response = self
+                .client
+                .get_object()
+                .bucket(&self.bucket_name)
+                .key(&s3_key)
+                .send()
+                .await
+                .map_err(|err| CubeError::user(format!("S3 download failed: {}", err)))?;
 
-            writter.flush().await?;
+            // Write the body to disk chunk by chunk instead of buffering it all.
+            let mut body = response.body;
+            let mut file = File::from_std(temp_file);
+            while let Some(chunk) = body
+                .try_next()
+                .await
+                .map_err(|err| CubeError::user(format!("S3 download failed: {}", err)))?
+            {
+                file.write_all(&chunk).await?;
+            }
+            file.flush().await?;
 
             cube_ext::spawn_blocking(move || -> Result<(), PathPersistError> {
                 temp_path.persist(&local_file)
@@ -284,16 +217,15 @@
             );
             let time = SystemTime::now();
             debug!("Deleting {}", remote_path);
-            let path = self.s3_path(&remote_path);
-            let bucket = self.bucket.load();
-
-            let res = bucket.delete_object(path).await?;
-            if res.status_code() != 204 {
-                return Err(CubeError::user(format!(
-                    "S3 delete returned non OK status: {}",
-                    res.status_code()
-                )));
-            }
+            let s3_key = self.s3_path(&remote_path);
+
+            self.client
+                .delete_object()
+                .bucket(&self.bucket_name)
+                .key(&s3_key)
+                .send()
+                .await
+                .map_err(|err| CubeError::user(format!("S3 delete failed: {}", err)))?;
 
         let _guard = acquire_lock("delete file", self.delete_mut.lock()).await?;
         let local = self.dir.as_path().join(&remote_path);
@@ -309,8 +243,8 @@
 
     async fn list(&self, remote_prefix: String) -> Result<Vec<String>, CubeError> {
         let leading_subpath = self.leading_subpath_regex();
-        self.list_objects_and_map(remote_prefix, |o: s3::serde_types::Object| {
-            Ok(Self::object_key_to_remote_path(&leading_subpath, &o.key))
+        self.list_objects_and_map(remote_prefix, |(key, _, _): (String, u64, DateTime<Utc>)| {
+            Ok(Self::object_key_to_remote_path(&leading_subpath, &key))
         })
         .await
     }
@@ -320,11 +254,11 @@
         remote_prefix: String,
     ) -> Result<Vec<RemoteFile>, CubeError> {
         let leading_subpath = self.leading_subpath_regex();
-        self.list_objects_and_map(remote_prefix, |o: s3::serde_types::Object| {
+        self.list_objects_and_map(remote_prefix, |(key, size, updated): (String, u64, DateTime<Utc>)| {
             Ok(RemoteFile {
-                remote_path: Self::object_key_to_remote_path(&leading_subpath, &o.key),
-                updated: DateTime::parse_from_rfc3339(&o.last_modified)?.with_timezone(&Utc),
-                file_size: o.size,
+                remote_path: Self::object_key_to_remote_path(&leading_subpath, &key),
+                updated,
+                file_size: size,
             })
         })
        .await
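For the download path above: a GetObject body can be drained either with collect() (buffers the whole object in memory) or with try_next() (one chunk at a time). The chunked form is what the hunk uses so multi-gigabyte partition files never sit fully in RAM. A self-contained sketch (function name and boxed error type are illustrative):

    use aws_sdk_s3::Client;
    use tokio::io::AsyncWriteExt;

    async fn save_object(
        client: &Client,
        bucket: &str,
        key: &str,
        dest: &str,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let resp = client.get_object().bucket(bucket).key(key).send().await?;
        // The body is a ByteStream; pull chunks as they arrive and write them out.
        let mut body = resp.body;
        let mut file = tokio::fs::File::create(dest).await?;
        while let Some(chunk) = body.try_next().await? {
            file.write_all(&chunk).await?;
        }
        file.flush().await?;
        Ok(())
    }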
@@ -347,8 +282,9 @@ impl ExtendedRemoteFs for S3RemoteFs {
         &self,
         remote_prefix: String,
     ) -> Result<BoxStream<'static, Result<Vec<String>, CubeError>>, CubeError> {
-        let path = self.s3_path(&remote_prefix);
-        let bucket = self.bucket.load();
+        let prefix = self.s3_path(&remote_prefix);
+        let client = self.client.clone();
+        let bucket = self.bucket_name.clone();
         let leading_subpath = self.leading_subpath_regex();
 
         let stream = async_stream::stream! {
@@ -356,15 +292,28 @@
             let mut continuation_token = None;
             let mut pages_count: i64 = 0;
 
             loop {
-                let (result, _) = bucket
-                    .list_page(path.clone(), None, continuation_token, None, None)
-                    .await?;
+                let mut list_req = client.list_objects_v2().bucket(&bucket).prefix(&prefix);
+                if let Some(token) = continuation_token.take() {
+                    list_req = list_req.continuation_token(token);
+                }
 
-                pages_count += 1;
+                let response = match list_req.send().await {
+                    Ok(r) => r,
+                    Err(e) => {
+                        yield Err(CubeError::user(format!("S3 list failed: {}", e)));
+                        break;
+                    }
+                };
 
-                let page: Vec<String> = result.contents.into_iter().map(|obj| Self::object_key_to_remote_path(&leading_subpath, &obj.key)).collect();
-                continuation_token = result.next_continuation_token;
+                pages_count += 1;
+                let page: Vec<String> = response
+                    .contents()
+                    .iter()
+                    .map(|obj| Self::object_key_to_remote_path(&leading_subpath, obj.key().unwrap_or("")))
+                    .collect();
+
+                continuation_token = response.next_continuation_token().map(|s| s.to_string());
 
                 yield Ok(page);
                 if continuation_token.is_none() {
@@ -396,26 +346,42 @@ impl S3RemoteFs {
         mut f: F,
     ) -> Result<Vec<T>, CubeError>
     where
-        F: FnMut(s3::serde_types::Object) -> Result<T, CubeError> + Copy,
+        F: FnMut((String, u64, DateTime<Utc>)) -> Result<T, CubeError> + Copy,
     {
-        let path = self.s3_path(&remote_prefix);
-        let bucket = self.bucket.load();
+        let prefix = self.s3_path(&remote_prefix);
 
         let mut mapped_results = Vec::new();
         let mut continuation_token = None;
         let mut pages_count: i64 = 0;
 
         loop {
-            let (result, _) = bucket
-                .list_page(path.clone(), None, continuation_token, None, None)
-                .await?;
+            let mut list_req = self
+                .client
+                .list_objects_v2()
+                .bucket(&self.bucket_name)
+                .prefix(&prefix);
+            if let Some(token) = continuation_token.take() {
+                list_req = list_req.continuation_token(token);
+            }
+
+            let response = list_req
+                .send()
+                .await
+                .map_err(|err| CubeError::user(format!("S3 list failed: {}", err)))?;
 
             pages_count += 1;
 
-            for obj in result.contents.into_iter() {
-                mapped_results.push(f(obj)?);
+            for obj in response.contents() {
+                let key = obj.key().unwrap_or("").to_string();
+                let size = obj.size().unwrap_or(0) as u64;
+                let updated = obj
+                    .last_modified()
+                    .and_then(|t| DateTime::<Utc>::from_timestamp(t.secs(), t.subsec_nanos()))
+                    .unwrap_or_else(Utc::now);
+                mapped_results.push(f((key, size, updated))?);
             }
 
-            continuation_token = result.next_continuation_token;
+            continuation_token = response.next_continuation_token().map(|s| s.to_string());
             if continuation_token.is_none() {
                 break;
            }
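As an aside, both continuation-token loops above could also be written with the SDK's built-in paginator, which hides the token plumbing; they are kept manual here presumably to preserve the pages_count metric. A sketch of the paginator form (function name illustrative):

    use aws_sdk_s3::Client;

    async fn list_keys(
        client: &Client,
        bucket: &str,
        prefix: &str,
    ) -> Result<Vec<String>, aws_sdk_s3::Error> {
        // into_paginator() drives ListObjectsV2 and yields one page per next().
        let mut pages = client
            .list_objects_v2()
            .bucket(bucket)
            .prefix(prefix)
            .into_paginator()
            .send();

        let mut keys = Vec::new();
        while let Some(page) = pages.next().await {
            let page = page?;
            keys.extend(page.contents().iter().filter_map(|o| o.key().map(str::to_string)));
        }
        Ok(keys)
    }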