Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
305 changes: 288 additions & 17 deletions s3/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1597,33 +1597,27 @@ impl Bucket {
}

/// Calculate the maximum number of concurrent chunks based on available memory.
/// Returns a value between 2 and 100, defaulting to 3 if memory detection fails.
///
/// On Linux, this first attempts to read cgroup memory limits (v2 then v1)
/// to get the container's actual memory constraint. Falls back to sysinfo
/// (which reads /proc/meminfo and reports host memory) when cgroup detection
/// fails or returns unlimited.
#[cfg(any(feature = "with-tokio", feature = "with-async-std"))]
fn calculate_max_concurrent_chunks() -> usize {
    // Prefer the container-aware cgroup limit; only when no cgroup limit
    // applies do we fall back to sysinfo's host-wide available memory.
    let available_memory = cgroup_available_memory().unwrap_or_else(|| {
        let mut system = System::new();
        system.refresh_memory_specifics(MemoryRefreshKind::everything());
        system.available_memory()
    });

    // If we can't get memory info, use a conservative default
    if available_memory == 0 {
        return 3;
    }

    // CHUNK_SIZE is 8MB (8_388_608 bytes)
    // Use a safety factor of 3 to leave room for other operations
    // and account for memory that might be allocated during upload
    let safety_factor = 3;
    let memory_per_chunk = CHUNK_SIZE as u64 * safety_factor;

    // Calculate how many chunks we can safely handle concurrently
    let calculated_chunks = (available_memory / memory_per_chunk) as usize;

    // Clamp between 2 and 100 for safety
    // Minimum 2 to maintain some parallelism
    // Maximum 100 to prevent too many concurrent connections
    calculated_chunks.clamp(2, 100)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

Expand Down Expand Up @@ -3011,6 +3005,142 @@ impl Bucket {
}
}

/// Attempt to read available memory from Linux cgroup limits.
///
/// Tries cgroup v2 first, then v1. Returns `None` on non-Linux platforms,
/// when cgroup files are absent, or when the limit is effectively unlimited.
/// Attempt to read available memory from Linux cgroup limits.
///
/// Tries cgroup v2 first, then v1. Returns `None` on non-Linux platforms,
/// when cgroup files are absent, or when the limit is effectively unlimited.
#[cfg(all(
    target_os = "linux",
    any(feature = "with-tokio", feature = "with-async-std")
))]
fn cgroup_available_memory() -> Option<u64> {
    // v2 (unified hierarchy) takes precedence; v1 is the legacy fallback.
    match cgroup_v2_available_memory() {
        Some(mem) => {
            log::debug!("cgroup v2 memory detected: {} bytes available", mem);
            Some(mem)
        }
        None => match cgroup_v1_available_memory() {
            Some(mem) => {
                log::debug!("cgroup v1 memory detected: {} bytes available", mem);
                Some(mem)
            }
            None => {
                log::debug!("no cgroup memory limit detected, falling back to sysinfo");
                None
            }
        },
    }
}

/// Stub for non-Linux targets.
///
/// cgroups are a Linux kernel feature, so there is no container memory limit
/// to consult here; returning `None` makes the caller fall back to sysinfo.
#[cfg(all(
    not(target_os = "linux"),
    any(feature = "with-tokio", feature = "with-async-std")
))]
fn cgroup_available_memory() -> Option<u64> {
    None
}

/// Parse a cgroup path from the contents of `/proc/self/cgroup`.
///
/// For cgroup v2 (`controller == "v2"`), looks for the `0::` unified hierarchy entry.
/// For cgroup v1, looks for the entry whose controllers field contains `controller`.
#[cfg(all(
    target_os = "linux",
    any(feature = "with-tokio", feature = "with-async-std")
))]
fn parse_cgroup_path(content: &str, controller: &str) -> Option<String> {
    // Each line has the shape "<id>:<controllers>:<path>"; lines with fewer
    // than three fields are skipped (the `?` bails out of this closure only,
    // so `find_map` moves on to the next line).
    content.lines().find_map(|line| {
        let mut fields = line.splitn(3, ':');
        let hierarchy_id = fields.next()?;
        let controllers = fields.next()?;
        let path = fields.next()?.trim();

        if controller == "v2" {
            // cgroup v2 unified hierarchy: "0::<path>". The path may be "/"
            // when running with a private cgroup namespace (Docker default,
            // Kubernetes since 1.25), in which case the memory files live
            // directly at /sys/fs/cgroup/ — so "/" is a valid result here.
            (hierarchy_id == "0" && controllers.is_empty() && !path.is_empty())
                .then(|| path.to_string())
        } else {
            // cgroup v1: the comma-separated controllers field must contain
            // the requested controller. A root path of "/" means the process
            // is not in a dedicated cgroup, so it is rejected.
            (controllers.split(',').any(|c| c == controller)
                && !path.is_empty()
                && path != "/")
                .then(|| path.to_string())
        }
    })
}

/// Read the process's cgroup path from /proc/self/cgroup.
///
/// For cgroup v2, looks for the `0::` unified hierarchy entry.
/// For cgroup v1, looks for the `memory` controller entry.
#[cfg(all(
    target_os = "linux",
    any(feature = "with-tokio", feature = "with-async-std")
))]
fn read_cgroup_path(controller: &str) -> Option<String> {
    // Read errors (e.g. file absent outside Linux containers) map to None.
    std::fs::read_to_string("/proc/self/cgroup")
        .ok()
        .and_then(|content| parse_cgroup_path(&content, controller))
}

/// Read available memory from cgroup v2 memory controller.
#[cfg(all(
    target_os = "linux",
    any(feature = "with-tokio", feature = "with-async-std")
))]
fn cgroup_v2_available_memory() -> Option<u64> {
    let cgroup_path = read_cgroup_path("v2")?;
    if !cgroup_path.starts_with('/') {
        return None;
    }
    // A root path "/" trims to "", so the private-namespace case resolves to
    // the interface files directly under /sys/fs/cgroup.
    let base = format!("/sys/fs/cgroup{}", cgroup_path.trim_end_matches('/'));

    let raw_limit = std::fs::read_to_string(format!("{}/memory.max", base)).ok()?;
    let limit = match raw_limit.trim() {
        // "max" means no limit is configured; fall back to sysinfo.
        "max" => return None,
        value => value.parse::<u64>().ok()?,
    };

    let raw_current = std::fs::read_to_string(format!("{}/memory.current", base)).ok()?;
    let current = raw_current.trim().parse::<u64>().ok()?;

    // NOTE: memory.current includes reclaimable page cache, so this
    // deliberately underestimates availability — conservative by design
    // to avoid OOM kills during multipart uploads.
    Some(limit.saturating_sub(current))
}

/// Read available memory from cgroup v1 memory controller.
///
/// Returns `None` when no memory cgroup applies, the interface files cannot
/// be read or parsed, or the configured limit is effectively unlimited.
#[cfg(all(
    target_os = "linux",
    any(feature = "with-tokio", feature = "with-async-std")
))]
fn cgroup_v1_available_memory() -> Option<u64> {
    let cgroup_path = read_cgroup_path("memory")?;
    // Guard clause instead of a `return None` buried inside a `format!`
    // argument: reject relative paths up front, then build the base path.
    if !cgroup_path.starts_with('/') {
        return None;
    }
    let base = format!("/sys/fs/cgroup/memory{}", cgroup_path);

    let limit_str = std::fs::read_to_string(format!("{}/memory.limit_in_bytes", base)).ok()?;
    let limit: u64 = limit_str.trim().parse().ok()?;

    // Values near u64::MAX indicate no limit (PAGE_COUNTER_MAX * page_size)
    if limit >= 0x7FFF_FFFF_FFFF_F000 {
        return None; // unlimited, fall back to sysinfo
    }

    let usage_str = std::fs::read_to_string(format!("{}/memory.usage_in_bytes", base)).ok()?;
    let usage: u64 = usage_str.trim().parse().ok()?;

    // usage_in_bytes includes reclaimable page cache, so this is an
    // intentional under-estimate — favouring lower concurrency over OOM.
    Some(limit.saturating_sub(usage))
}
Comment on lines +3119 to +3142
Copy link
Copy Markdown

@coderabbitai coderabbitai bot Apr 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Consider if memory.current / memory.usage_in_bytes is the right signal.

memory.current (v2) and memory.usage_in_bytes (v1) include reclaimable page cache and other evictable memory. Computing limit - current will therefore under-estimate memory available to the upload whenever the container has warm page cache — which, e.g., a long-running service writing to disk easily does. The effect is that concurrency gets clamped toward the lower bound (2) even when plenty of memory could actually be reclaimed for new allocations.

That's a safe conservative direction (favours "too little" over OOMKill), so this isn't a blocker. If you want a tighter number, cgroup v2 exposes memory.stat with file / inactive_file lines you could subtract from current; v1 has memory.stat with cache / inactive_file similarly. Worth considering, otherwise the current conservative behaviour is fine — just flagging so it's an informed choice.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@s3/src/bucket.rs` around lines 3119 - 3142, The current
cgroup_v1_available_memory() uses memory.usage_in_bytes (and v2 memory.current)
which counts reclaimable page cache and therefore underestimates usable memory;
update cgroup_v1_available_memory (and the v2 counterpart if present) to read
and parse cgroup memory.stat and subtract reclaimable fields (for v1: "cache"
and "inactive_file" or for v2: "file" and "inactive_file") from the
usage/current value before computing limit - usage so the result excludes
reclaimable pages, and ensure you still handle missing keys and fall back to the
existing conservative behavior (or a config flag) if memory.stat parsing fails.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed this is worth knowing, but the conservative direction (underestimating available memory) is the right trade-off for an OOM-fix PR. Parsing memory.stat would add complexity for a marginal improvement. If it becomes a problem in practice (concurrency too low on cache-heavy workloads), it can be revisited separately.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ProbstenHias, that's a sound call — for an OOM-fix PR, erring on the side of conservative concurrency is clearly the right priority. The memory.stat path can always be layered on later as an opt-in refinement if real-world workloads show the concurrency floor being hit too aggressively. Happy to close this out.


✏️ Learnings added
Learnt from: ProbstenHias
Repo: durch/rust-s3 PR: 447
File: s3/src/bucket.rs:3119-3142
Timestamp: 2026-04-18T19:49:49.392Z
Learning: In `s3/src/bucket.rs` (rust-s3 crate), `cgroup_v2_available_memory()` and `cgroup_v1_available_memory()` intentionally use `memory.current` / `memory.usage_in_bytes` (which include reclaimable page cache) rather than subtracting reclaimable fields from `memory.stat`. This deliberately underestimates available memory to be conservative and avoid OOMKills during multipart uploads. Parsing `memory.stat` for a tighter estimate was considered but deferred as an explicit design decision by the author (PR `#447`).

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


#[cfg(test)]
mod test {

Expand Down Expand Up @@ -4231,3 +4361,144 @@ mod test {
assert!(exists, "Test bucket should exist");
}
}

// Unit tests for the cgroup memory-detection helpers. Linux-only because the
// parsing targets /proc/self/cgroup formats that only exist there.
#[cfg(test)]
#[cfg(target_os = "linux")]
mod cgroup_tests {
    use super::parse_cgroup_path;
    use std::fs;

    #[test]
    fn test_parse_cgroup_path_v2() {
        let content = "0::/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-podabc.slice/cri-containerd-xyz.scope\n";
        assert_eq!(
            parse_cgroup_path(content, "v2"),
            Some("/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-podabc.slice/cri-containerd-xyz.scope".to_string())
        );
    }

    #[test]
    fn test_parse_cgroup_path_v1() {
        let content = "12:devices:/kubepods/podabc/xyz\n5:memory:/kubepods/podabc/xyz\n1:name=systemd:/kubepods/podabc/xyz\n";
        assert_eq!(
            parse_cgroup_path(content, "memory"),
            Some("/kubepods/podabc/xyz".to_string())
        );
    }

    #[test]
    fn test_parse_cgroup_path_v2_root_returns_root() {
        // When cgroup path is "/", this is common with private cgroup namespaces
        // (Docker default, Kubernetes since 1.25). The memory files live at
        // /sys/fs/cgroup/ directly, so we return Some("/").
        let content = "0::/\n";
        assert_eq!(parse_cgroup_path(content, "v2"), Some("/".to_string()));
    }

    #[test]
    fn test_parse_cgroup_path_v1_root_returns_none() {
        // cgroup v1 with root path "/" means no constraint, should return None.
        let content = "5:memory:/\n";
        assert_eq!(parse_cgroup_path(content, "memory"), None);
    }

    #[test]
    fn test_parse_cgroup_path_no_match() {
        let content = "0::/some/path\n";
        assert_eq!(parse_cgroup_path(content, "memory"), None);
    }

    #[test]
    fn test_cgroup_v2_available_memory_with_limit() {
        // Simulate the cgroup v2 file layout in a temp dir and verify the
        // limit - current arithmetic the production code performs.
        let tmpdir = std::env::temp_dir().join(format!(
            "rust_s3_cgroup_test_v2_{}",
            std::process::id()
        ));
        let _ = fs::remove_dir_all(&tmpdir);
        let cgroup_dir = tmpdir.join("kubepods.slice/test.scope");
        fs::create_dir_all(&cgroup_dir).unwrap();

        fs::write(cgroup_dir.join("memory.max"), "134217728\n").unwrap();
        fs::write(cgroup_dir.join("memory.current"), "57053184\n").unwrap();

        // Parse memory.max and memory.current directly (simulating what cgroup_v2 does)
        let max: u64 = fs::read_to_string(cgroup_dir.join("memory.max"))
            .unwrap()
            .trim()
            .parse()
            .unwrap();
        let current: u64 = fs::read_to_string(cgroup_dir.join("memory.current"))
            .unwrap()
            .trim()
            .parse()
            .unwrap();

        assert_eq!(max.saturating_sub(current), 77164544);

        let _ = fs::remove_dir_all(&tmpdir);
    }

    #[test]
    fn test_cgroup_v2_unlimited_returns_none() {
        // "max" (after trimming) marks an unconfigured limit; the production
        // function returns None (unlimited) for this value.
        let max_str = "max\n";
        assert_eq!(max_str.trim(), "max");
    }

    #[test]
    fn test_cgroup_v1_available_memory_with_limit() {
        // Simulate the cgroup v1 file layout and verify limit - usage math.
        let tmpdir = std::env::temp_dir().join(format!(
            "rust_s3_cgroup_test_v1_{}",
            std::process::id()
        ));
        let _ = fs::remove_dir_all(&tmpdir);
        let cgroup_dir = tmpdir.join("kubepods/podabc/xyz");
        fs::create_dir_all(&cgroup_dir).unwrap();

        fs::write(cgroup_dir.join("memory.limit_in_bytes"), "536870912\n").unwrap();
        fs::write(cgroup_dir.join("memory.usage_in_bytes"), "268435456\n").unwrap();

        let limit: u64 = fs::read_to_string(cgroup_dir.join("memory.limit_in_bytes"))
            .unwrap()
            .trim()
            .parse()
            .unwrap();
        let usage: u64 = fs::read_to_string(cgroup_dir.join("memory.usage_in_bytes"))
            .unwrap()
            .trim()
            .parse()
            .unwrap();

        assert_eq!(limit.saturating_sub(usage), 268435456);

        let _ = fs::remove_dir_all(&tmpdir);
    }

    #[test]
    fn test_cgroup_v1_unlimited_returns_none() {
        // PAGE_COUNTER_MAX on most systems: values >= 0x7FFFFFFFFFFFF000 are
        // treated as "unlimited" and make the production function return None.
        let limit: u64 = 0x7FFF_FFFF_FFFF_F000;
        assert!(limit >= 0x7FFF_FFFF_FFFF_F000);
    }

    #[test]
    fn test_concurrent_chunks_calculation() {
        // Verify the math: 77164544 bytes available / (8388608 * 3) = 3
        let available: u64 = 77164544; // 128MB limit - ~57MB used
        let chunk_size: u64 = 8_388_608;
        let safety_factor: u64 = 3;
        let calculated = (available / (chunk_size * safety_factor)) as usize;
        assert_eq!(calculated.clamp(2, 100), 3);
    }

    #[test]
    fn test_concurrent_chunks_clamp_minimum() {
        // Very low memory should clamp to 2
        let available: u64 = 1_000_000; // ~1MB
        let chunk_size: u64 = 8_388_608;
        let safety_factor: u64 = 3;
        let calculated = (available / (chunk_size * safety_factor)) as usize;
        assert_eq!(calculated.clamp(2, 100), 2);
    }
}