diff --git a/s3/src/bucket.rs b/s3/src/bucket.rs index 7cf34d861b..03d868a88a 100644 --- a/s3/src/bucket.rs +++ b/s3/src/bucket.rs @@ -1597,33 +1597,27 @@ impl Bucket { } /// Calculate the maximum number of concurrent chunks based on available memory. - /// Returns a value between 2 and 10, defaulting to 3 if memory detection fails. + /// Returns a value between 2 and 100, defaulting to 3 if memory detection fails. + /// + /// On Linux, this first attempts to read cgroup memory limits (v2 then v1) + /// to get the container's actual memory constraint. Falls back to sysinfo + /// (which reads /proc/meminfo and reports host memory) when cgroup detection + /// fails or returns unlimited. #[cfg(any(feature = "with-tokio", feature = "with-async-std"))] fn calculate_max_concurrent_chunks() -> usize { - // Create a new System instance and refresh memory info - let mut system = System::new(); - system.refresh_memory_specifics(MemoryRefreshKind::everything()); - - // Get available memory in bytes - let available_memory = system.available_memory(); + let available_memory = cgroup_available_memory().unwrap_or_else(|| { + let mut system = System::new(); + system.refresh_memory_specifics(MemoryRefreshKind::everything()); + system.available_memory() + }); - // If we can't get memory info, use a conservative default if available_memory == 0 { return 3; } - // CHUNK_SIZE is 8MB (8_388_608 bytes) - // Use a safety factor of 3 to leave room for other operations - // and account for memory that might be allocated during upload let safety_factor = 3; let memory_per_chunk = CHUNK_SIZE as u64 * safety_factor; - - // Calculate how many chunks we can safely handle concurrently let calculated_chunks = (available_memory / memory_per_chunk) as usize; - - // Clamp between 2 and 100 for safety - // Minimum 2 to maintain some parallelism - // Maximum 100 to prevent too many concurrent connections calculated_chunks.clamp(2, 100) } @@ -3011,6 +3005,142 @@ impl Bucket { } } +/// Attempt to read 
available memory from Linux cgroup limits.
+///
+/// Tries cgroup v2 first, then v1. Returns `None` on non-Linux platforms,
+/// when cgroup files are absent, or when the limit is effectively unlimited.
+#[cfg(all(
+    target_os = "linux",
+    any(feature = "with-tokio", feature = "with-async-std")
+))]
+fn cgroup_available_memory() -> Option<u64> {
+    if let Some(mem) = cgroup_v2_available_memory() {
+        log::debug!("cgroup v2 memory detected: {} bytes available", mem);
+        return Some(mem);
+    }
+    if let Some(mem) = cgroup_v1_available_memory() {
+        log::debug!("cgroup v1 memory detected: {} bytes available", mem);
+        return Some(mem);
+    }
+    log::debug!("no cgroup memory limit detected, falling back to sysinfo");
+    None
+}
+
+#[cfg(all(
+    not(target_os = "linux"),
+    any(feature = "with-tokio", feature = "with-async-std")
+))]
+fn cgroup_available_memory() -> Option<u64> {
+    None
+}
+
+/// Parse a cgroup path from the contents of `/proc/self/cgroup`.
+///
+/// For cgroup v2 (`controller == "v2"`), looks for the `0::` unified hierarchy entry.
+/// For cgroup v1, looks for the entry whose controllers field contains `controller`.
+#[cfg(all(
+    target_os = "linux",
+    any(feature = "with-tokio", feature = "with-async-std")
+))]
+fn parse_cgroup_path(content: &str, controller: &str) -> Option<String> {
+    for line in content.lines() {
+        let parts: Vec<&str> = line.splitn(3, ':').collect();
+        if parts.len() != 3 {
+            continue;
+        }
+        if controller == "v2" {
+            // cgroup v2 unified hierarchy: "0::<path>"
+            // Path may be "/" when running with a private cgroup namespace
+            // (Docker default, Kubernetes since 1.25), in which case the
+            // memory files live directly at /sys/fs/cgroup/.
+            if parts[0] == "0" && parts[1].is_empty() {
+                let path = parts[2].trim();
+                if !path.is_empty() {
+                    return Some(path.to_string());
+                }
+            }
+        } else {
+            // cgroup v1: "<id>:<controllers>:<path>" where controllers contains "memory"
+            let controllers = parts[1];
+            if controllers.split(',').any(|c| c == controller) {
+                let path = parts[2].trim();
+                if !path.is_empty() && path != "/" {
+                    return Some(path.to_string());
+                }
+            }
+        }
+    }
+    None
+}
+
+/// Read the process's cgroup path from /proc/self/cgroup.
+///
+/// For cgroup v2, looks for the `0::` unified hierarchy entry.
+/// For cgroup v1, looks for the `memory` controller entry.
+#[cfg(all(
+    target_os = "linux",
+    any(feature = "with-tokio", feature = "with-async-std")
+))]
+fn read_cgroup_path(controller: &str) -> Option<String> {
+    let content = std::fs::read_to_string("/proc/self/cgroup").ok()?;
+    parse_cgroup_path(&content, controller)
+}
+
+/// Read available memory from cgroup v2 memory controller.
+#[cfg(all(
+    target_os = "linux",
+    any(feature = "with-tokio", feature = "with-async-std")
+))]
+fn cgroup_v2_available_memory() -> Option<u64> {
+    let cgroup_path = read_cgroup_path("v2")?;
+    if !cgroup_path.starts_with('/') {
+        return None;
+    }
+    let base = format!("/sys/fs/cgroup{}", cgroup_path.trim_end_matches('/'));
+
+    let max_str = std::fs::read_to_string(format!("{}/memory.max", base)).ok()?;
+    let max_str = max_str.trim();
+    if max_str == "max" {
+        return None; // unlimited, fall back to sysinfo
+    }
+    let limit: u64 = max_str.parse().ok()?;
+
+    let current_str = std::fs::read_to_string(format!("{}/memory.current", base)).ok()?;
+    let current: u64 = current_str.trim().parse().ok()?;
+
+    Some(limit.saturating_sub(current))
+}
+
+/// Read available memory from cgroup v1 memory controller.
+#[cfg(all(
+    target_os = "linux",
+    any(feature = "with-tokio", feature = "with-async-std")
+))]
+fn cgroup_v1_available_memory() -> Option<u64> {
+    let cgroup_path = read_cgroup_path("memory")?;
+    let base = format!(
+        "/sys/fs/cgroup/memory{}",
+        if cgroup_path.starts_with('/') {
+            &cgroup_path
+        } else {
+            return None;
+        }
+    );
+
+    let limit_str = std::fs::read_to_string(format!("{}/memory.limit_in_bytes", base)).ok()?;
+    let limit: u64 = limit_str.trim().parse().ok()?;
+
+    // Values near u64::MAX indicate no limit (PAGE_COUNTER_MAX * page_size)
+    if limit >= 0x7FFF_FFFF_FFFF_F000 {
+        return None; // unlimited, fall back to sysinfo
+    }
+
+    let usage_str = std::fs::read_to_string(format!("{}/memory.usage_in_bytes", base)).ok()?;
+    let usage: u64 = usage_str.trim().parse().ok()?;
+
+    Some(limit.saturating_sub(usage))
+}
+
 #[cfg(test)]
 mod test {
@@ -4231,3 +4361,144 @@ mod test {
         assert!(exists, "Test bucket should exist");
     }
 }
+
+#[cfg(test)]
+#[cfg(target_os = "linux")]
+mod cgroup_tests {
+    use super::parse_cgroup_path;
+    use std::fs;
+
+    #[test]
+    fn test_parse_cgroup_path_v2() {
+        let content = "0::/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-podabc.slice/cri-containerd-xyz.scope\n";
+        assert_eq!(
+            parse_cgroup_path(content, "v2"),
+            Some("/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-podabc.slice/cri-containerd-xyz.scope".to_string())
+        );
+    }
+
+    #[test]
+    fn test_parse_cgroup_path_v1() {
+        let content = "12:devices:/kubepods/podabc/xyz\n5:memory:/kubepods/podabc/xyz\n1:name=systemd:/kubepods/podabc/xyz\n";
+        assert_eq!(
+            parse_cgroup_path(content, "memory"),
+            Some("/kubepods/podabc/xyz".to_string())
+        );
+    }
+
+    #[test]
+    fn test_parse_cgroup_path_v2_root_returns_root() {
+        // When cgroup path is "/", this is common with private cgroup namespaces
+        // (Docker default, Kubernetes since 1.25). The memory files live at
+        // /sys/fs/cgroup/ directly, so we return Some("/").
+ let content = "0::/\n"; + assert_eq!(parse_cgroup_path(content, "v2"), Some("/".to_string())); + } + + #[test] + fn test_parse_cgroup_path_v1_root_returns_none() { + // cgroup v1 with root path "/" means no constraint, should return None. + let content = "5:memory:/\n"; + assert_eq!(parse_cgroup_path(content, "memory"), None); + } + + #[test] + fn test_parse_cgroup_path_no_match() { + let content = "0::/some/path\n"; + assert_eq!(parse_cgroup_path(content, "memory"), None); + } + + #[test] + fn test_cgroup_v2_available_memory_with_limit() { + let tmpdir = std::env::temp_dir().join(format!( + "rust_s3_cgroup_test_v2_{}", + std::process::id() + )); + let _ = fs::remove_dir_all(&tmpdir); + let cgroup_dir = tmpdir.join("kubepods.slice/test.scope"); + fs::create_dir_all(&cgroup_dir).unwrap(); + + fs::write(cgroup_dir.join("memory.max"), "134217728\n").unwrap(); + fs::write(cgroup_dir.join("memory.current"), "57053184\n").unwrap(); + + // Parse memory.max and memory.current directly (simulating what cgroup_v2 does) + let max: u64 = fs::read_to_string(cgroup_dir.join("memory.max")) + .unwrap() + .trim() + .parse() + .unwrap(); + let current: u64 = fs::read_to_string(cgroup_dir.join("memory.current")) + .unwrap() + .trim() + .parse() + .unwrap(); + + assert_eq!(max.saturating_sub(current), 77164544); + + let _ = fs::remove_dir_all(&tmpdir); + } + + #[test] + fn test_cgroup_v2_unlimited_returns_none() { + let max_str = "max\n"; + assert_eq!(max_str.trim(), "max"); + // When "max", our function returns None (unlimited) + } + + #[test] + fn test_cgroup_v1_available_memory_with_limit() { + let tmpdir = std::env::temp_dir().join(format!( + "rust_s3_cgroup_test_v1_{}", + std::process::id() + )); + let _ = fs::remove_dir_all(&tmpdir); + let cgroup_dir = tmpdir.join("kubepods/podabc/xyz"); + fs::create_dir_all(&cgroup_dir).unwrap(); + + fs::write(cgroup_dir.join("memory.limit_in_bytes"), "536870912\n").unwrap(); + fs::write(cgroup_dir.join("memory.usage_in_bytes"), 
"268435456\n").unwrap(); + + let limit: u64 = fs::read_to_string(cgroup_dir.join("memory.limit_in_bytes")) + .unwrap() + .trim() + .parse() + .unwrap(); + let usage: u64 = fs::read_to_string(cgroup_dir.join("memory.usage_in_bytes")) + .unwrap() + .trim() + .parse() + .unwrap(); + + assert_eq!(limit.saturating_sub(usage), 268435456); + + let _ = fs::remove_dir_all(&tmpdir); + } + + #[test] + fn test_cgroup_v1_unlimited_returns_none() { + // PAGE_COUNTER_MAX on most systems: values >= 0x7FFFFFFFFFFFF000 are "unlimited" + let limit: u64 = 0x7FFF_FFFF_FFFF_F000; + assert!(limit >= 0x7FFF_FFFF_FFFF_F000); + // Our function would return None for this + } + + #[test] + fn test_concurrent_chunks_calculation() { + // Verify the math: 77164544 bytes available / (8388608 * 3) = 3 + let available: u64 = 77164544; // 128MB limit - ~57MB used + let chunk_size: u64 = 8_388_608; + let safety_factor: u64 = 3; + let calculated = (available / (chunk_size * safety_factor)) as usize; + assert_eq!(calculated.clamp(2, 100), 3); + } + + #[test] + fn test_concurrent_chunks_clamp_minimum() { + // Very low memory should clamp to 2 + let available: u64 = 1_000_000; // ~1MB + let chunk_size: u64 = 8_388_608; + let safety_factor: u64 = 3; + let calculated = (available / (chunk_size * safety_factor)) as usize; + assert_eq!(calculated.clamp(2, 100), 2); + } +}