-
Notifications
You must be signed in to change notification settings - Fork 241
fix: use cgroup memory limits for multipart upload concurrency in containers #447
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1597,33 +1597,27 @@ impl Bucket { | |
| } | ||
|
|
||
/// Calculate the maximum number of concurrent chunks based on available memory.
/// Returns a value between 2 and 100, defaulting to 3 if memory detection fails.
///
/// On Linux, this first attempts to read cgroup memory limits (v2 then v1)
/// to get the container's actual memory constraint. Falls back to sysinfo
/// (which reads /proc/meminfo and reports host memory) when cgroup detection
/// fails or returns unlimited.
#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
fn calculate_max_concurrent_chunks() -> usize {
    // Budget each in-flight chunk at CHUNK_SIZE (8 MB) times a safety factor
    // of 3, leaving headroom for other allocations made during the upload.
    let safety_factor: u64 = 3;

    // Prefer the container's cgroup constraint; only consult sysinfo (which
    // reports host-wide /proc/meminfo figures) when no cgroup limit applies.
    let available_memory = match cgroup_available_memory() {
        Some(bytes) => bytes,
        None => {
            let mut system = System::new();
            system.refresh_memory_specifics(MemoryRefreshKind::everything());
            system.available_memory()
        }
    };

    // Zero means memory detection failed outright; use a conservative default.
    if available_memory == 0 {
        return 3;
    }

    let memory_per_chunk = CHUNK_SIZE as u64 * safety_factor;
    let budgeted = (available_memory / memory_per_chunk) as usize;

    // Clamp: minimum 2 keeps some parallelism, maximum 100 avoids opening
    // too many concurrent connections.
    budgeted.clamp(2, 100)
}
|
|
||
|
|
@@ -3011,6 +3005,142 @@ impl Bucket { | |
| } | ||
| } | ||
|
|
||
/// Attempt to read available memory from Linux cgroup limits.
///
/// Tries cgroup v2 first, then v1. Returns `None` on non-Linux platforms,
/// when cgroup files are absent, or when the limit is effectively unlimited.
#[cfg(all(
    target_os = "linux",
    any(feature = "with-tokio", feature = "with-async-std")
))]
fn cgroup_available_memory() -> Option<u64> {
    // v2 is consulted first; v1 is only probed when v2 yields nothing.
    let detected = cgroup_v2_available_memory()
        .map(|mem| ("v2", mem))
        .or_else(|| cgroup_v1_available_memory().map(|mem| ("v1", mem)));

    match detected {
        Some((version, mem)) => {
            log::debug!("cgroup {} memory detected: {} bytes available", version, mem);
            Some(mem)
        }
        None => {
            log::debug!("no cgroup memory limit detected, falling back to sysinfo");
            None
        }
    }
}
|
|
||
/// Non-Linux stub: cgroups are a Linux kernel feature, so there is never a
/// container memory limit to read on other platforms. Always returns `None`,
/// which makes the caller fall back to sysinfo-based detection.
#[cfg(all(
    not(target_os = "linux"),
    any(feature = "with-tokio", feature = "with-async-std")
))]
fn cgroup_available_memory() -> Option<u64> {
    None
}
|
|
||
/// Parse a cgroup path from the contents of `/proc/self/cgroup`.
///
/// For cgroup v2 (`controller == "v2"`), looks for the `0::` unified hierarchy entry.
/// For cgroup v1, looks for the entry whose controllers field contains `controller`.
#[cfg(all(
    target_os = "linux",
    any(feature = "with-tokio", feature = "with-async-std")
))]
fn parse_cgroup_path(content: &str, controller: &str) -> Option<String> {
    content.lines().find_map(|line| {
        // Each entry is "<hierarchy-id>:<controllers>:<path>"; the path
        // itself may contain ':' so only split on the first two separators.
        let mut fields = line.splitn(3, ':');
        let hierarchy_id = fields.next()?;
        let controllers = fields.next()?;
        let raw_path = fields.next()?.trim();

        if controller == "v2" {
            // cgroup v2 unified hierarchy: "0::<path>".
            // Path may be "/" when running with a private cgroup namespace
            // (Docker default, Kubernetes since 1.25), in which case the
            // memory files live directly at /sys/fs/cgroup/.
            if hierarchy_id == "0" && controllers.is_empty() && !raw_path.is_empty() {
                return Some(raw_path.to_string());
            }
        } else if controllers.split(',').any(|c| c == controller)
            && !raw_path.is_empty()
            && raw_path != "/"
        {
            // cgroup v1: a root path "/" means no per-container constraint.
            return Some(raw_path.to_string());
        }
        None
    })
}
|
|
||
/// Read the process's cgroup path from /proc/self/cgroup.
///
/// For cgroup v2, looks for the `0::` unified hierarchy entry.
/// For cgroup v1, looks for the `memory` controller entry.
#[cfg(all(
    target_os = "linux",
    any(feature = "with-tokio", feature = "with-async-std")
))]
fn read_cgroup_path(controller: &str) -> Option<String> {
    // Any read failure (file missing, permission) degrades to None so the
    // caller can fall back to sysinfo.
    std::fs::read_to_string("/proc/self/cgroup")
        .ok()
        .and_then(|content| parse_cgroup_path(&content, controller))
}
|
|
||
/// Read available memory from cgroup v2 memory controller.
#[cfg(all(
    target_os = "linux",
    any(feature = "with-tokio", feature = "with-async-std")
))]
fn cgroup_v2_available_memory() -> Option<u64> {
    let cgroup_path = read_cgroup_path("v2")?;
    // A relative path cannot be joined onto the v2 mount point safely.
    if !cgroup_path.starts_with('/') {
        return None;
    }
    // "/" trims to "" so the root namespace reads /sys/fs/cgroup directly.
    let base = format!("/sys/fs/cgroup{}", cgroup_path.trim_end_matches('/'));
    let read_file = |name: &str| std::fs::read_to_string(format!("{}/{}", base, name)).ok();

    let raw_limit = read_file("memory.max")?;
    let raw_limit = raw_limit.trim();
    if raw_limit == "max" {
        return None; // unlimited, fall back to sysinfo
    }
    let limit: u64 = raw_limit.parse().ok()?;

    // NOTE(review): memory.current includes reclaimable page cache, so this
    // can underestimate truly available memory — conservative by design.
    let current: u64 = read_file("memory.current")?.trim().parse().ok()?;

    Some(limit.saturating_sub(current))
}
|
|
||
/// Read available memory from cgroup v1 memory controller.
///
/// Returns `None` when no memory-controller path is found, when the path is
/// not absolute, when the accounting files cannot be read or parsed, or when
/// the limit is effectively unlimited.
#[cfg(all(
    target_os = "linux",
    any(feature = "with-tokio", feature = "with-async-std")
))]
fn cgroup_v1_available_memory() -> Option<u64> {
    let cgroup_path = read_cgroup_path("memory")?;
    // Guard hoisted out of the format! arguments: the original embedded a
    // diverging `return None` inside the macro call, which obscured control
    // flow. A relative path cannot be joined onto the v1 mount point safely.
    if !cgroup_path.starts_with('/') {
        return None;
    }
    let base = format!("/sys/fs/cgroup/memory{}", cgroup_path);

    let limit_str = std::fs::read_to_string(format!("{}/memory.limit_in_bytes", base)).ok()?;
    let limit: u64 = limit_str.trim().parse().ok()?;

    // Values near u64::MAX indicate no limit (PAGE_COUNTER_MAX * page_size)
    if limit >= 0x7FFF_FFFF_FFFF_F000 {
        return None; // unlimited, fall back to sysinfo
    }

    let usage_str = std::fs::read_to_string(format!("{}/memory.usage_in_bytes", base)).ok()?;
    let usage: u64 = usage_str.trim().parse().ok()?;

    // saturating_sub guards against usage transiently exceeding the limit.
    Some(limit.saturating_sub(usage))
}
|
Comment on lines
+3119
to
+3142
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Consider that `memory.current` includes reclaimable page cache, so `limit - memory.current` can understate the memory that is actually reclaimable and available.
That's a safe conservative direction (favours "too little" over OOMKill), so this isn't a blocker. If you want a tighter number, cgroup v2 exposes `memory.stat` (e.g. the `inactive_file` counter), which could be subtracted from usage to get a closer estimate.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed this is worth knowing, but the conservative direction (underestimating available memory) is the right trade-off for an OOM-fix PR. Parsing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
✏️ Learnings added
|
||
|
|
||
| #[cfg(test)] | ||
| mod test { | ||
|
|
||
|
|
@@ -4231,3 +4361,144 @@ mod test { | |
| assert!(exists, "Test bucket should exist"); | ||
| } | ||
| } | ||
|
|
||
// Unit tests for the cgroup memory-detection helpers. Only parse_cgroup_path
// is exercised directly; the file-reading helpers are simulated with temp
// files because the real functions read fixed paths under /sys and /proc.
#[cfg(test)]
#[cfg(target_os = "linux")]
mod cgroup_tests {

    use super::parse_cgroup_path;
    use std::fs;

    #[test]
    fn test_parse_cgroup_path_v2() {
        // Typical Kubernetes cgroup v2 unified-hierarchy entry.
        let content = "0::/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-podabc.slice/cri-containerd-xyz.scope\n";
        assert_eq!(
            parse_cgroup_path(content, "v2"),
            Some("/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-podabc.slice/cri-containerd-xyz.scope".to_string())
        );
    }

    #[test]
    fn test_parse_cgroup_path_v1() {
        // v1 lists one line per hierarchy; only the "memory" controller entry
        // should match, regardless of its position.
        let content = "12:devices:/kubepods/podabc/xyz\n5:memory:/kubepods/podabc/xyz\n1:name=systemd:/kubepods/podabc/xyz\n";
        assert_eq!(
            parse_cgroup_path(content, "memory"),
            Some("/kubepods/podabc/xyz".to_string())
        );
    }

    #[test]
    fn test_parse_cgroup_path_v2_root_returns_root() {
        // When cgroup path is "/", this is common with private cgroup namespaces
        // (Docker default, Kubernetes since 1.25). The memory files live at
        // /sys/fs/cgroup/ directly, so we return Some("/").
        let content = "0::/\n";
        assert_eq!(parse_cgroup_path(content, "v2"), Some("/".to_string()));
    }

    #[test]
    fn test_parse_cgroup_path_v1_root_returns_none() {
        // cgroup v1 with root path "/" means no constraint, should return None.
        let content = "5:memory:/\n";
        assert_eq!(parse_cgroup_path(content, "memory"), None);
    }

    #[test]
    fn test_parse_cgroup_path_no_match() {
        // A v2-only entry must not satisfy a v1 "memory" controller lookup.
        let content = "0::/some/path\n";
        assert_eq!(parse_cgroup_path(content, "memory"), None);
    }

    #[test]
    fn test_cgroup_v2_available_memory_with_limit() {
        // NOTE(review): this simulates the v2 arithmetic on temp files rather
        // than calling cgroup_v2_available_memory (which reads a fixed
        // /sys/fs/cgroup path); consider injecting the base path to test the
        // real function.
        let tmpdir = std::env::temp_dir().join(format!(
            "rust_s3_cgroup_test_v2_{}",
            std::process::id()
        ));
        let _ = fs::remove_dir_all(&tmpdir);
        let cgroup_dir = tmpdir.join("kubepods.slice/test.scope");
        fs::create_dir_all(&cgroup_dir).unwrap();

        fs::write(cgroup_dir.join("memory.max"), "134217728\n").unwrap();
        fs::write(cgroup_dir.join("memory.current"), "57053184\n").unwrap();

        // Parse memory.max and memory.current directly (simulating what cgroup_v2 does)
        let max: u64 = fs::read_to_string(cgroup_dir.join("memory.max"))
            .unwrap()
            .trim()
            .parse()
            .unwrap();
        let current: u64 = fs::read_to_string(cgroup_dir.join("memory.current"))
            .unwrap()
            .trim()
            .parse()
            .unwrap();

        // 128 MiB limit minus ~54 MiB current usage.
        assert_eq!(max.saturating_sub(current), 77164544);

        let _ = fs::remove_dir_all(&tmpdir);
    }

    #[test]
    fn test_cgroup_v2_unlimited_returns_none() {
        // NOTE(review): this asserts a literal against itself and never calls
        // the function under test — it documents the "max" sentinel but
        // verifies nothing. Consider exercising the real parse path instead.
        let max_str = "max\n";
        assert_eq!(max_str.trim(), "max");
        // When "max", our function returns None (unlimited)
    }

    #[test]
    fn test_cgroup_v1_available_memory_with_limit() {
        // NOTE(review): like the v2 test above, this simulates the v1
        // arithmetic on temp files rather than calling the real function.
        let tmpdir = std::env::temp_dir().join(format!(
            "rust_s3_cgroup_test_v1_{}",
            std::process::id()
        ));
        let _ = fs::remove_dir_all(&tmpdir);
        let cgroup_dir = tmpdir.join("kubepods/podabc/xyz");
        fs::create_dir_all(&cgroup_dir).unwrap();

        fs::write(cgroup_dir.join("memory.limit_in_bytes"), "536870912\n").unwrap();
        fs::write(cgroup_dir.join("memory.usage_in_bytes"), "268435456\n").unwrap();

        let limit: u64 = fs::read_to_string(cgroup_dir.join("memory.limit_in_bytes"))
            .unwrap()
            .trim()
            .parse()
            .unwrap();
        let usage: u64 = fs::read_to_string(cgroup_dir.join("memory.usage_in_bytes"))
            .unwrap()
            .trim()
            .parse()
            .unwrap();

        // 512 MiB limit minus 256 MiB usage.
        assert_eq!(limit.saturating_sub(usage), 268435456);

        let _ = fs::remove_dir_all(&tmpdir);
    }

    #[test]
    fn test_cgroup_v1_unlimited_returns_none() {
        // PAGE_COUNTER_MAX on most systems: values >= 0x7FFFFFFFFFFFF000 are "unlimited"
        // NOTE(review): tautological assertion (x >= x); it only records the
        // threshold constant and does not exercise cgroup_v1_available_memory.
        let limit: u64 = 0x7FFF_FFFF_FFFF_F000;
        assert!(limit >= 0x7FFF_FFFF_FFFF_F000);
        // Our function would return None for this
    }

    #[test]
    fn test_concurrent_chunks_calculation() {
        // Verify the math: 77164544 bytes available / (8388608 * 3) = 3
        let available: u64 = 77164544; // 128MB limit - ~57MB used
        let chunk_size: u64 = 8_388_608;
        let safety_factor: u64 = 3;
        let calculated = (available / (chunk_size * safety_factor)) as usize;
        assert_eq!(calculated.clamp(2, 100), 3);
    }

    #[test]
    fn test_concurrent_chunks_clamp_minimum() {
        // Very low memory should clamp to 2
        let available: u64 = 1_000_000; // ~1MB
        let chunk_size: u64 = 8_388_608;
        let safety_factor: u64 = 3;
        let calculated = (available / (chunk_size * safety_factor)) as usize;
        assert_eq!(calculated.clamp(2, 100), 2);
    }
}
Uh oh!
There was an error while loading. Please reload this page.