diff --git a/docs/configuration/index.md b/docs/configuration/index.md index c8e1d200cfb9..283e2baf388a 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1946,13 +1946,14 @@ The following table lists available monitors and the respective services where t |`org.apache.druid.java.util.metrics.JvmCpuMonitor`|Reports statistics of CPU consumption by the JVM.|Any| |`org.apache.druid.java.util.metrics.CpuAcctDeltaMonitor`|Reports consumed CPU as per the cpuacct cgroup.|Any| |`org.apache.druid.java.util.metrics.JvmThreadsMonitor`|Reports Thread statistics in the JVM, like numbers of total, daemon, started, died threads.|Any| -|`org.apache.druid.java.util.metrics.CgroupCpuMonitor`|Reports CPU shares and quotas as per the `cpu` cgroup.|Any| -|`org.apache.druid.java.util.metrics.CgroupCpuSetMonitor`|Reports CPU core/HT and memory node allocations as per the `cpuset` cgroup.|Any| -|`org.apache.druid.java.util.metrics.CgroupDiskMonitor`|Reports disk statistic as per the blkio cgroup.|Any| -|`org.apache.druid.java.util.metrics.CgroupMemoryMonitor`|Reports memory statistic as per the memory cgroup.|Any| -|`org.apache.druid.java.util.metrics.CgroupV2CpuMonitor`| **EXPERIMENTAL** Reports CPU usage from `cpu.stat` file. Only applicable to `cgroupv2`.|Any| -|`org.apache.druid.java.util.metrics.CgroupV2DiskMonitor`| **EXPERIMENTAL** Reports disk usage from `io.stat` file. Only applicable to `cgroupv2`.|Any| -|`org.apache.druid.java.util.metrics.CgroupV2MemoryMonitor`| **EXPERIMENTAL** Reports memory usage from `memory.current` and `memory.max` files. Only applicable to `cgroupv2`.|Any| +|`org.apache.druid.java.util.metrics.CgroupCpuMonitor`|Reports CPU shares and quotas as per the `cpu` cgroup. Automatically switches to `CgroupV2CpuMonitor` in case `cgroupv2` type is detected.|Any| +|`org.apache.druid.java.util.metrics.CgroupCpuSetMonitor`|Reports CPU core/HT and memory node allocations as per the `cpuset` cgroup. Automatically switches to `CgroupV2CpuSetMonitor` in case `cgroupv2` type is detected.|Any| +|`org.apache.druid.java.util.metrics.CgroupDiskMonitor`|Reports disk statistic as per the blkio cgroup. Automatically switches to `CgroupV2DiskMonitor` in case `cgroupv2` type is detected.|Any| +|`org.apache.druid.java.util.metrics.CgroupMemoryMonitor`|Reports memory statistic as per the memory cgroup. Automatically switches to `CgroupV2MemoryMonitor` in case `cgroupv2` type is detected.|Any| +|`org.apache.druid.java.util.metrics.CgroupV2CpuMonitor`| Reports CPU usage from `cpu.stat` file. Only applicable to `cgroupv2`.|Any| +|`org.apache.druid.java.util.metrics.CgroupV2CpuSetMonitor`|Reports CPU core/HT and memory node allocations as per the `cpuset` cgroup. Only applicable to `cgroupv2`.|Any| +|`org.apache.druid.java.util.metrics.CgroupV2DiskMonitor`| Reports disk usage from `io.stat` file. Only applicable to `cgroupv2`.|Any| +|`org.apache.druid.java.util.metrics.CgroupV2MemoryMonitor`| Reports memory usage from `memory.current` and `memory.max` files. Only applicable to `cgroupv2`.|Any| |`org.apache.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical services.|Historical| |`org.apache.druid.server.metrics.SegmentStatsMonitor` | **EXPERIMENTAL** Reports statistics about segments on Historical services. Not to be used when lazy loading is configured.|Historical| |`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.|Broker, Historical, Router, Indexer, Peon| diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuMonitor.java index c37291b4b5f4..d4589df73c08 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuMonitor.java @@ -24,6 +24,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; +import org.apache.druid.java.util.metrics.cgroups.CgroupVersion; import org.apache.druid.java.util.metrics.cgroups.Cpu; import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer; @@ -41,37 +42,32 @@ public class CgroupCpuMonitor extends FeedDefiningMonitor final CgroupDiscoverer cgroupDiscoverer; final Map dimensions; private Long userHz; - private KeyedDiff jiffies = new KeyedDiff(); + private final KeyedDiff jiffies = new KeyedDiff(); private long prevJiffiesSnapshotAt = 0; + private final boolean isRunningOnCgroupsV2; + private final CgroupV2CpuMonitor cgroupV2CpuMonitor; public CgroupCpuMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed) { super(feed); this.cgroupDiscoverer = cgroupDiscoverer; this.dimensions = dimensions; - try { - Process p = new ProcessBuilder("getconf", "CLK_TCK").start(); - try (BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) { - String line = in.readLine(); - if (line != null) { - userHz = Long.valueOf(line.trim()); - } - } - } - catch (IOException | NumberFormatException e) { - LOG.warn(e, "Error getting the USER_HZ value"); - } - finally { - if (userHz == null) { - LOG.warn("Using default value for USER_HZ"); - userHz = DEFAULT_USER_HZ; - } + + // Check if we're running on cgroups v2 + this.isRunningOnCgroupsV2 = cgroupDiscoverer.getCgroupVersion().equals(CgroupVersion.V2); + if (isRunningOnCgroupsV2) { + this.cgroupV2CpuMonitor = new CgroupV2CpuMonitor(cgroupDiscoverer, dimensions, feed); + LOG.info("Detected cgroups v2, using CgroupV2CpuMonitor behavior for accurate metrics"); + } else { + this.cgroupV2CpuMonitor = null; + initUzerHz(); } + } public CgroupCpuMonitor(final Map dimensions, String feed) { - this(new ProcSelfCgroupDiscoverer(), dimensions, feed); + this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), dimensions, feed); } public CgroupCpuMonitor(final Map dimensions) @@ -87,16 +83,26 @@ public CgroupCpuMonitor() @Override public boolean doMonitor(ServiceEmitter emitter) { - final Cpu cpu = new Cpu(cgroupDiscoverer); - final Cpu.CpuMetrics cpuSnapshot = cpu.snapshot(); + if (isRunningOnCgroupsV2) { + return cgroupV2CpuMonitor.doMonitor(emitter); + } else { + return doMonitorV1(emitter); + } + } + + private boolean doMonitorV1(ServiceEmitter emitter) + { + final Cpu.CpuMetrics cpuSnapshot = cgroupDiscoverer.getCpuMetrics(); long now = Instant.now().getEpochSecond(); final ServiceMetricEvent.Builder builder = builder(); MonitorUtils.addDimensionsToBuilder(builder, dimensions); + builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion().name()); + emitter.emit(builder.setMetric("cgroup/cpu/shares", cpuSnapshot.getShares())); emitter.emit(builder.setMetric( "cgroup/cpu/cores_quota", - computeProcessorQuota(cpuSnapshot.getQuotaUs(), cpuSnapshot.getPeriodUs()) + CgroupUtil.computeProcessorQuota(cpuSnapshot.getQuotaUs(), cpuSnapshot.getPeriodUs()) )); long elapsedJiffiesSnapshotSecs = now - prevJiffiesSnapshotAt; @@ -122,18 +128,25 @@ public boolean doMonitor(ServiceEmitter emitter) return true; } - /** - * Calculates the total cores allocated through quotas. A negative value indicates that no quota has been specified. - * We use -1 because that's the default value used in the cgroup. - * - * @param quotaUs the cgroup quota value. - * @param periodUs the cgroup period value. - * @return the calculated processor quota, -1 if no quota or period set. - */ - public static double computeProcessorQuota(long quotaUs, long periodUs) + private void initUzerHz() { - return quotaUs < 0 || periodUs == 0 - ? -1 - : (double) quotaUs / periodUs; + try { + Process p = new ProcessBuilder("getconf", "CLK_TCK").start(); + try (BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) { + String line = in.readLine(); + if (line != null) { + userHz = Long.valueOf(line.trim()); + } + } + } + catch (IOException | NumberFormatException e) { + LOG.warn(e, "Error getting the USER_HZ value"); + } + finally { + if (userHz == null) { + LOG.warn("Using default value for USER_HZ"); + userHz = DEFAULT_USER_HZ; + } + } } } diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitor.java index 84de0fd216dd..4d4ae2008bb5 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitor.java @@ -28,6 +28,10 @@ import java.util.Map; +/** + * Monitor that reports CPU set metrics from cgroups both v1 and v2. + */ + public class CgroupCpuSetMonitor extends FeedDefiningMonitor { final CgroupDiscoverer cgroupDiscoverer; @@ -42,7 +46,7 @@ public CgroupCpuSetMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed) { - this(new ProcSelfCgroupDiscoverer(), dimensions, feed); + this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), dimensions, feed); } public CgroupCpuSetMonitor(final Map dimensions) @@ -58,11 +62,10 @@ public CgroupCpuSetMonitor() @Override public boolean doMonitor(ServiceEmitter emitter) { - final CpuSet cpuset = new CpuSet(cgroupDiscoverer); - final CpuSet.CpuSetMetric cpusetSnapshot = cpuset.snapshot(); - + final CpuSet.CpuSetMetric cpusetSnapshot = cgroupDiscoverer.getCpuSetMetrics(); final ServiceMetricEvent.Builder builder = builder(); MonitorUtils.addDimensionsToBuilder(builder, dimensions); + builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion().name()); emitter.emit(builder.setMetric( "cgroup/cpuset/cpu_count", cpusetSnapshot.getCpuSetCpus().length diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupDiskMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupDiskMonitor.java index f13c055c7791..d120cd8a28bf 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupDiskMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupDiskMonitor.java @@ -20,9 +20,11 @@ package org.apache.druid.java.util.metrics; import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; +import org.apache.druid.java.util.metrics.cgroups.CgroupVersion; import org.apache.druid.java.util.metrics.cgroups.Disk; import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer; @@ -30,20 +32,32 @@ public class CgroupDiskMonitor extends FeedDefiningMonitor { + private static final Logger LOG = new Logger(CgroupDiskMonitor.class); final CgroupDiscoverer cgroupDiscoverer; final Map dimensions; private final KeyedDiff diff = new KeyedDiff(); + private final boolean isRunningOnCgroupsV2; + private final CgroupV2DiskMonitor cgroupV2DiskMonitor; public CgroupDiskMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed) { super(feed); this.cgroupDiscoverer = cgroupDiscoverer; this.dimensions = dimensions; + + // Check if we're running on cgroups v2 + this.isRunningOnCgroupsV2 = cgroupDiscoverer.getCgroupVersion().equals(CgroupVersion.V2); + if (isRunningOnCgroupsV2) { + this.cgroupV2DiskMonitor = new CgroupV2DiskMonitor(cgroupDiscoverer, dimensions, feed); + LOG.info("Detected cgroups v2, using CgroupV2DiskMonitor behavior for accurate metrics"); + } else { + this.cgroupV2DiskMonitor = null; + } } public CgroupDiskMonitor(final Map dimensions, String feed) { - this(new ProcSelfCgroupDiscoverer(), dimensions, feed); + this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), dimensions, feed); } public CgroupDiskMonitor(final Map dimensions) @@ -58,6 +72,15 @@ public CgroupDiskMonitor() @Override public boolean doMonitor(ServiceEmitter emitter) + { + if (isRunningOnCgroupsV2) { + return cgroupV2DiskMonitor.doMonitor(emitter); + } else { + return doMonitorV1(emitter); + } + } + + private boolean doMonitorV1(ServiceEmitter emitter) { Map snapshot = new Disk(cgroupDiscoverer).snapshot(); for (Map.Entry entry : snapshot.entrySet()) { @@ -75,6 +98,7 @@ public boolean doMonitor(ServiceEmitter emitter) final ServiceMetricEvent.Builder builder = builder() .setDimension("diskName", entry.getValue().getDiskName()); MonitorUtils.addDimensionsToBuilder(builder, dimensions); + builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion()); for (Map.Entry stat : stats.entrySet()) { emitter.emit(builder.setMetric(stat.getKey(), stat.getValue())); } diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitor.java index 26195f972209..24c2aa35bc59 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitor.java @@ -21,9 +21,11 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; +import org.apache.druid.java.util.metrics.cgroups.CgroupVersion; import org.apache.druid.java.util.metrics.cgroups.Memory; import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer; @@ -31,19 +33,35 @@ public class CgroupMemoryMonitor extends FeedDefiningMonitor { + private static final Logger LOG = new Logger(CgroupMemoryMonitor.class); + private static final String MEMORY_USAGE_FILE = "memory.usage_in_bytes"; + private static final String MEMORY_LIMIT_FILE = "memory.limit_in_bytes"; + final CgroupDiscoverer cgroupDiscoverer; final Map dimensions; + private final boolean isRunningOnCgroupsV2; + private final CgroupV2MemoryMonitor cgroupV2MemoryMonitor; + public CgroupMemoryMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed) { super(feed); this.cgroupDiscoverer = cgroupDiscoverer; this.dimensions = dimensions; + + // Check if we're running on cgroups v2 + this.isRunningOnCgroupsV2 = cgroupDiscoverer.getCgroupVersion().equals(CgroupVersion.V2); + if (isRunningOnCgroupsV2) { + this.cgroupV2MemoryMonitor = new CgroupV2MemoryMonitor(cgroupDiscoverer, dimensions, feed); + LOG.info("Detected cgroups v2, using CgroupV2MemoryMonitor behavior for accurate metrics"); + } else { + this.cgroupV2MemoryMonitor = null; + } } public CgroupMemoryMonitor(final Map dimensions, String feed) { - this(new ProcSelfCgroupDiscoverer(), dimensions, feed); + this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), dimensions, feed); } public CgroupMemoryMonitor(final Map dimensions) @@ -58,11 +76,31 @@ public CgroupMemoryMonitor() @Override public boolean doMonitor(ServiceEmitter emitter) + { + if (isRunningOnCgroupsV2) { + return cgroupV2MemoryMonitor.doMonitor(emitter); + } else { + return parseAndEmit(emitter, cgroupDiscoverer, dimensions, MEMORY_USAGE_FILE, MEMORY_LIMIT_FILE, this); + } + } + + /** + * Common metric parser and emitter for both v1 and v2 cgroupMemory monitors. + */ + public static boolean parseAndEmit( + ServiceEmitter emitter, + CgroupDiscoverer cgroupDiscoverer, + Map dimensions, + String memoryUsageFile, + String memoryLimitFile, + FeedDefiningMonitor feedDefiningMonitor + ) { final Memory memory = new Memory(cgroupDiscoverer); - final Memory.MemoryStat stat = memory.snapshot(memoryUsageFile(), memoryLimitFile()); - final ServiceMetricEvent.Builder builder = builder(); + final Memory.MemoryStat stat = memory.snapshot(memoryUsageFile, memoryLimitFile); + final ServiceMetricEvent.Builder builder = feedDefiningMonitor.builder(); MonitorUtils.addDimensionsToBuilder(builder, dimensions); + builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion().name()); emitter.emit(builder.setMetric("cgroup/memory/usage/bytes", stat.getUsage())); emitter.emit(builder.setMetric("cgroup/memory/limit/bytes", stat.getLimit())); @@ -72,19 +110,10 @@ public boolean doMonitor(ServiceEmitter emitter) emitter.emit(builder.setMetric(StringUtils.format("cgroup/memory/%s", key), value)); }); stat.getNumaMemoryStats().forEach((key, value) -> { - builder().setDimension("numaZone", Long.toString(key)); + feedDefiningMonitor.builder().setDimension("numaZone", Long.toString(key)); value.forEach((k, v) -> emitter.emit(builder.setMetric(StringUtils.format("cgroup/memory_numa/%s/pages", k), v))); }); return true; } - public String memoryUsageFile() - { - return "memory.usage_in_bytes"; - } - - public String memoryLimitFile() - { - return "memory.limit_in_bytes"; - } } diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupUtil.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupUtil.java index 9dd782eea2b1..fb7ab5478e33 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupUtil.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupUtil.java @@ -58,4 +58,20 @@ public static long readLongValue(CgroupDiscoverer discoverer, String cgroup, Str return defaultValue; } } + + /** + * Calculates the total cores allocated through quotas. A negative value indicates that no quota has been specified. + * We use -1 because that's the default value used in the cgroup. + * + * @param quotaUs the cgroup quota value. + * @param periodUs the cgroup period value. + * @return the calculated processor quota, -1 if no quota or period set. + */ + public static double computeProcessorQuota(long quotaUs, long periodUs) + { + return quotaUs < 0 || periodUs == 0 + ? -1 + : (double) quotaUs / periodUs; + } + } diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitor.java index 596215db2c9a..3e9dbb4a3fe5 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitor.java @@ -21,8 +21,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; -import com.google.common.primitives.Longs; -import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; @@ -30,23 +28,15 @@ import org.apache.druid.java.util.metrics.cgroups.ProcCgroupV2Discoverer; import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer; -import java.io.BufferedReader; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; import java.time.Instant; import java.time.temporal.ChronoUnit; -import java.util.HashMap; import java.util.Map; -import java.util.regex.Pattern; /** * Monitor that reports cpu usage stats by reading `cpu.stat` reported by cgroupv2 */ public class CgroupV2CpuMonitor extends FeedDefiningMonitor { - private static final Logger LOG = new Logger(CgroupV2CpuMonitor.class); - private static final String CPU_STAT_FILE = "cpu.stat"; private static final String SNAPSHOT = "snapshot"; final CgroupDiscoverer cgroupDiscoverer; final Map dimensions; @@ -75,13 +65,21 @@ public boolean doMonitor(ServiceEmitter emitter) { final ServiceMetricEvent.Builder builder = builder(); MonitorUtils.addDimensionsToBuilder(builder, dimensions); - Snapshot snapshot = snapshot(); + builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion().name()); + final Cpu.CpuMetrics cpuSnapshot = cgroupDiscoverer.getCpuMetrics(); + emitter.emit(builder.setMetric("cgroup/cpu/shares", cpuSnapshot.getShares())); + emitter.emit(builder.setMetric( + "cgroup/cpu/cores_quota", + CgroupUtil.computeProcessorQuota(cpuSnapshot.getQuotaUs(), cpuSnapshot.getPeriodUs()) + )); + + final Map elapsed = diff.to( "usage", ImmutableMap.builder() - .put(CgroupUtil.USER, snapshot.getUserUsec()) - .put(CgroupUtil.SYSTEM, snapshot.getSystemUsec()) - .put(CgroupUtil.TOTAL, snapshot.getUsageUsec()) + .put(CgroupUtil.USER, cpuSnapshot.getUserUs()) + .put(CgroupUtil.SYSTEM, cpuSnapshot.getSystemUs()) + .put(CgroupUtil.TOTAL, cpuSnapshot.getTotalUs()) .put(SNAPSHOT, ChronoUnit.MICROS.between(Instant.EPOCH, Instant.now())) .build() ); @@ -97,67 +95,4 @@ public boolean doMonitor(ServiceEmitter emitter) } return true; } - - /* - file: cpu.stat - - sample content: - usage_usec 2379951538 - user_usec 1802023024 - system_usec 577928513 - nr_periods 1581231 - nr_throttled 59 - throttled_usec 3095133 - */ - public Snapshot snapshot() - { - Map entries = new HashMap<>(); - try (final BufferedReader reader = Files.newBufferedReader( - Paths.get(cgroupDiscoverer.discover(Cpu.CGROUP).toString(), CPU_STAT_FILE) - )) { - for (String line = reader.readLine(); line != null; line = reader.readLine()) { - final String[] parts = line.split(Pattern.quote(" ")); - if (parts.length != 2) { - // ignore - continue; - } - entries.put(parts[0], Longs.tryParse(parts[1])); - } - } - catch (IOException | RuntimeException ex) { - LOG.error(ex, "Unable to fetch cpu snapshot"); - } - - return new Snapshot(entries.get("usage_usec"), entries.get("user_usec"), entries.get("system_usec")); - } - - - public static class Snapshot - { - private final long usageUsec; - private final long userUsec; - private final long systemUsec; - - public Snapshot(long usageUsec, long userUsec, long systemUsec) - { - this.usageUsec = usageUsec; - this.userUsec = userUsec; - this.systemUsec = systemUsec; - } - - public long getUsageUsec() - { - return usageUsec; - } - - public long getUserUsec() - { - return userUsec; - } - - public long getSystemUsec() - { - return systemUsec; - } - } } diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2CpuSetMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2CpuSetMonitor.java new file mode 100644 index 000000000000..595b136a6c22 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2CpuSetMonitor.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; +import org.apache.druid.java.util.metrics.cgroups.ProcCgroupV2Discoverer; +import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer; + +import java.util.Map; + +/** + * Monitor that reports CPU set metrics from cgroups v2 with native v2 file support. + */ +public class CgroupV2CpuSetMonitor extends CgroupCpuSetMonitor +{ + + public CgroupV2CpuSetMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed) + { + super(cgroupDiscoverer, dimensions, feed); + + } + + @VisibleForTesting + CgroupV2CpuSetMonitor(CgroupDiscoverer cgroupDiscoverer) + { + this(cgroupDiscoverer, ImmutableMap.of(), DEFAULT_METRICS_FEED); + } + + CgroupV2CpuSetMonitor() + { + this(new ProcSelfCgroupDiscoverer(ProcCgroupV2Discoverer.class)); + } +} diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitor.java index 5f50dea3fbf3..b35f780851b0 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitor.java @@ -87,6 +87,7 @@ public boolean doMonitor(ServiceEmitter emitter) final ServiceMetricEvent.Builder builder = builder() .setDimension("diskName", entry.getDiskName()); MonitorUtils.addDimensionsToBuilder(builder, dimensions); + builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion()); for (Map.Entry stat : stats.entrySet()) { emitter.emit(builder.setMetric(stat.getKey(), stat.getValue())); } diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitor.java index 1de1051e23de..43b15cbf728b 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitor.java @@ -21,38 +21,59 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; import org.apache.druid.java.util.metrics.cgroups.ProcCgroupV2Discoverer; import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer; +import java.util.Map; + /** * Monitor that reports memory usage stats by reading `memory.*` files reported by cgroupv2 */ -public class CgroupV2MemoryMonitor extends CgroupMemoryMonitor +public class CgroupV2MemoryMonitor extends FeedDefiningMonitor { + private static final String MEMORY_USAGE_FILE = "memory.current"; + private static final String MEMORY_LIMIT_FILE = "memory.max"; + private final CgroupDiscoverer cgroupDiscoverer; + private final Map dimensions; + @VisibleForTesting - CgroupV2MemoryMonitor(CgroupDiscoverer cgroupDiscoverer) + CgroupV2MemoryMonitor(CgroupDiscoverer cgroupDiscoverer, final Map dimensions, String feed) { - super(cgroupDiscoverer, ImmutableMap.of(), DEFAULT_METRICS_FEED); + super(feed); + this.cgroupDiscoverer = cgroupDiscoverer; + this.dimensions = dimensions; } - // This would be invoked when monitor is specified in config (supressing to satisy intellij-inspections) - @SuppressWarnings("unused") - CgroupV2MemoryMonitor() + + public CgroupV2MemoryMonitor(final Map dimensions, String feed) { - this(new ProcSelfCgroupDiscoverer(ProcCgroupV2Discoverer.class)); + this(new ProcSelfCgroupDiscoverer(ProcCgroupV2Discoverer.class), dimensions, feed); } - @Override - public String memoryUsageFile() + public CgroupV2MemoryMonitor(final Map dimensions) { - return "memory.current"; + this(dimensions, DEFAULT_METRICS_FEED); } + public CgroupV2MemoryMonitor() + { + this(ImmutableMap.of()); + } + + @Override - public String memoryLimitFile() + public boolean doMonitor(ServiceEmitter emitter) { - return "memory.max"; + return CgroupMemoryMonitor.parseAndEmit( + emitter, + cgroupDiscoverer, + dimensions, + MEMORY_USAGE_FILE, + MEMORY_LIMIT_FILE, + this + ); } } diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitor.java index cbaf39729f27..10d46258556c 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitor.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitor.java @@ -22,10 +22,12 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; +import org.apache.druid.java.util.metrics.cgroups.CgroupVersion; import org.apache.druid.java.util.metrics.cgroups.CpuAcct; import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer; import org.joda.time.DateTime; @@ -36,10 +38,16 @@ public class CpuAcctDeltaMonitor extends FeedDefiningMonitor { private static final Logger log = new Logger(CpuAcctDeltaMonitor.class); + private static final String USE_CGROUPS_V2_MESSAGE = StringUtils.format( + "%s doest not function correctly on cgroups v2. Please use %s instead.", + CpuAcctDeltaMonitor.class.getSimpleName(), + CgroupV2CpuMonitor.class.getSimpleName() + ); private final AtomicReference priorSnapshot = new AtomicReference<>(null); private final Map dimensions; private final CgroupDiscoverer cgroupDiscoverer; + private final boolean isRunningOnCgroupsV2; public CpuAcctDeltaMonitor() { @@ -53,7 +61,7 @@ public CpuAcctDeltaMonitor(final Map dimensions) public CpuAcctDeltaMonitor(final Map dimensions, final String feed) { - this(feed, dimensions, new ProcSelfCgroupDiscoverer()); + this(feed, dimensions, ProcSelfCgroupDiscoverer.autoCgroupDiscoverer()); } public CpuAcctDeltaMonitor( @@ -66,11 +74,20 @@ public CpuAcctDeltaMonitor( Preconditions.checkNotNull(dimensions); this.dimensions = ImmutableMap.copyOf(dimensions); this.cgroupDiscoverer = Preconditions.checkNotNull(cgroupDiscoverer, "cgroupDiscoverer required"); + + isRunningOnCgroupsV2 = cgroupDiscoverer.getCgroupVersion().equals(CgroupVersion.V2); + if (isRunningOnCgroupsV2) { + log.warn(USE_CGROUPS_V2_MESSAGE); + } } @Override public boolean doMonitor(ServiceEmitter emitter) { + if (isRunningOnCgroupsV2) { + log.warn(USE_CGROUPS_V2_MESSAGE); + return false; + } final CpuAcct cpuAcct = new CpuAcct(cgroupDiscoverer); final CpuAcct.CpuAcctMetric snapshot = cpuAcct.snapshot(); final long nanoTime = System.nanoTime(); // Approx time... may be influenced by an unlucky GC @@ -96,10 +113,12 @@ public boolean doMonitor(ServiceEmitter emitter) for (int i = 0; i < snapshot.cpuCount(); ++i) { final ServiceMetricEvent.Builder builderUsr = builder() .setDimension("cpuName", Integer.toString(i)) - .setDimension("cpuTime", "usr"); + .setDimension("cpuTime", "usr") + .setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion().name()); final ServiceMetricEvent.Builder builderSys = builder() .setDimension("cpuName", Integer.toString(i)) - .setDimension("cpuTime", "sys"); + .setDimension("cpuTime", "sys") + .setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion().name()); MonitorUtils.addDimensionsToBuilder(builderUsr, dimensions); MonitorUtils.addDimensionsToBuilder(builderSys, dimensions); emitter.emit(builderUsr.setCreatedTime(dateTime).setMetric( diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/CgroupDiscoverer.java b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/CgroupDiscoverer.java index bb3bf9166b72..6f2bc18323c7 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/CgroupDiscoverer.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/CgroupDiscoverer.java @@ -29,4 +29,21 @@ public interface CgroupDiscoverer * @return The path that contains that cgroup's interesting bits. */ Path discover(String cgroup); + + /** + * Gets CPU metrics (shares, quota, period, usage) appropriate for this cgroups version. + * Encapsulates the difference between v1 (cpu.shares, cpu.cfs_*) and v2 (cpu.weight, cpu.max). + * @return CPU metrics compatible with both cgroups v1 and v2 + */ + Cpu.CpuMetrics getCpuMetrics(); + + /** + * Gets CPU set metrics (effective CPU list) appropriate for this cgroups version. + * Encapsulates the difference between v1 (cpuset.effective_cpus) and v2 (cpuset.cpus.effective). + * @return CPU set metrics compatible with both cgroups v1 and v2 + */ + CpuSet.CpuSetMetric getCpuSetMetrics(); + + CgroupVersion getCgroupVersion(); + } diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/CgroupVersion.java b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/CgroupVersion.java new file mode 100644 index 000000000000..49e65ae8881d --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/CgroupVersion.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics.cgroups; + +/** + * Enumeration of supported cgroups versions + */ +public enum CgroupVersion +{ + V1, + V2, + UNKNOWN +} diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/Cpu.java b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/Cpu.java index 5938740dd152..0b1a76920981 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/Cpu.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/Cpu.java @@ -85,7 +85,7 @@ public CpuMetrics snapshot() CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, CPU_SHARES_FILE, -1), CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, CPU_QUOTA_FILE, 0), CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, CPU_PERIOD_FILE, 0), - systemJiffies, userJiffies + systemJiffies, userJiffies, -1, -1, -1 ); } @@ -108,15 +108,33 @@ public static class CpuMetrics // Maps to system value at cpuacct.stat private final long systemJiffies; - CpuMetrics(long shares, long quotaUs, long periodUs, long systemJiffis, long userJiffies) + private final long userUs; + private final long systemUs; + private final long totalUs; + + + CpuMetrics( + long shares, + long quotaUs, + long periodUs, + long systemJiffis, + long userJiffies, + long userUs, + long systemUs, + long totalUs + ) { this.shares = shares; this.quotaUs = quotaUs; this.periodUs = periodUs; this.userJiffies = userJiffies; this.systemJiffies = systemJiffis; + this.userUs = userUs; + this.systemUs = systemUs; + this.totalUs = totalUs; } + public final long getShares() { return shares; @@ -144,7 +162,25 @@ public long getSystemJiffies() public long getTotalJiffies() { + if (userJiffies == -1 && systemJiffies == -1) { + return -1; + } return userJiffies + systemJiffies; } + + public long getUserUs() + { + return userUs; + } + + public long getSystemUs() + { + return systemUs; + } + + public long getTotalUs() + { + return totalUs; + } } } diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/CpuSet.java b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/CpuSet.java index 390bda2ba22b..97a8cc81dac6 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/CpuSet.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/CpuSet.java @@ -69,7 +69,7 @@ private int[] readCpuSetFile(String file) try { List lines = Files.readAllLines( Paths.get(cgroupDiscoverer.discover(CGROUP).toString(), file)); - output = lines.stream().map(this::parseStringRangeToIntArray).findFirst().orElse(output); + output = lines.stream().map(CpuSet::parseStringRangeToIntArray).findFirst().orElse(output); } catch (RuntimeException | IOException ex) { LOG.noStackTrace().warn(ex, "Unable to read %s, these metrics will be skipped", file); @@ -85,24 +85,26 @@ private int[] readCpuSetFile(String file) * # outputs [0, 1, 2, 7, 12, 13, 14] * * This method also works fine for memory nodes. + * The format is identical in both cgroups v1 and v2, so this can be shared. * * @param line The list format cpu value * @return the list of CPU IDs */ - private int[] parseStringRangeToIntArray(String line) + public static int[] parseStringRangeToIntArray(String line) { String[] cpuParts = line.split(","); return Arrays.stream(cpuParts) .flatMapToInt(cpuPart -> { + cpuPart = cpuPart.trim(); // Trim whitespace around each part String[] bits = cpuPart.split("-"); if (bits.length == 2) { - Integer low = Ints.tryParse(bits[0]); - Integer high = Ints.tryParse(bits[1]); + Integer low = Ints.tryParse(bits[0].trim()); + Integer high = Ints.tryParse(bits[1].trim()); if (low != null && high != null) { return IntStream.rangeClosed(low, high); } } else if (bits.length == 1) { - Integer bit = Ints.tryParse(bits[0]); + Integer bit = Ints.tryParse(bits[0].trim()); if (bit != null) { return IntStream.of(bit); } diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/CpuSetV2.java b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/CpuSetV2.java new file mode 100644 index 000000000000..505efc2e6579 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/CpuSetV2.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics.cgroups; + +import org.apache.druid.java.util.common.logger.Logger; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.List; + +/** + * Collect CPU and memory data from cgroups v2 cpuset files. + * This class provides a bridge to return CpuSet.CpuSetMetric compatible data from cgroups v2. + */ +public class CpuSetV2 +{ + private static final Logger LOG = new Logger(CpuSetV2.class); + public static final String CGROUP = ""; // cgroups v2 uses unified hierarchy + // cgroups v2 file names (different from v1) + private static final String CPUS_FILE = "cpuset.cpus"; + private static final String EFFECTIVE_CPUS_FILE = "cpuset.cpus.effective"; + private static final String MEMS_FILE = "cpuset.mems"; + private static final String EFFECTIVE_MEMS_FILE = "cpuset.mems.effective"; + + private final CgroupDiscoverer cgroupDiscoverer; + + public CpuSetV2(CgroupDiscoverer cgroupDiscoverer) + { + this.cgroupDiscoverer = cgroupDiscoverer; + } + + /** + * Take a snapshot of cgroups v2 cpuset data and convert it to CpuSet.CpuSetMetric format. + * + * @return A CpuSet.CpuSetMetric snapshot with v2 data in v1-compatible format. + */ + public CpuSet.CpuSetMetric snapshot() + { + return new CpuSet.CpuSetMetric( + readCpuSetFile(CPUS_FILE), + readCpuSetFile(EFFECTIVE_CPUS_FILE), + readCpuSetFile(MEMS_FILE), + readCpuSetFile(EFFECTIVE_MEMS_FILE) + ); + } + + private int[] readCpuSetFile(String file) + { + int[] output = {}; + try { + List lines = Files.readAllLines( + Paths.get(cgroupDiscoverer.discover(CGROUP).toString(), file)); + output = lines.stream().map(CpuSet::parseStringRangeToIntArray).findFirst().orElse(output); + } + catch (RuntimeException | IOException ex) { + LOG.noStackTrace().warn(ex, "Unable to read %s, these metrics will be skipped", file); + } + return output; + } +} diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/CpuV2.java b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/CpuV2.java new file mode 100644 index 000000000000..226e9adfb522 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/CpuV2.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics.cgroups; + +import com.google.common.primitives.Longs; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.metrics.CgroupUtil; + +import java.io.BufferedReader; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.regex.Pattern; + +/** + * Collect CPU weight, quota and usage information from cgroups v2 files. + * This class provides a bridge to return Cpu.CpuMetrics compatible data from cgroups v2. + */ +public class CpuV2 +{ + public static final String CGROUP = ""; // cgroups v2 uses unified hierarchy + private static final Logger LOG = new Logger(CpuV2.class); + private static final String CPU_STAT_FILE = "cpu.stat"; + private static final String CPU_WEIGHT_FILE = "cpu.weight"; + private static final String CPU_MAX_FILE = "cpu.max"; + + private final CgroupDiscoverer cgroupDiscoverer; + + public CpuV2(CgroupDiscoverer cgroupDiscoverer) + { + this.cgroupDiscoverer = cgroupDiscoverer; + } + + /** + * Take a snapshot of cgroups v2 CPU data with native microsecond precision. + * This provides maximum accuracy for monitoring calculations. + * + * @return A Cpu.CpuMetrics snapshot with native v2 microsecond data. + */ + public Cpu.CpuMetrics snapshot() + { + long usageUsec = -1L; + long userUsec = -1L; + long systemUsec = -1L; + + // Read cpu.stat (equivalent to cpuacct.stat but with different format) + try (final BufferedReader reader = Files.newBufferedReader( + Paths.get(cgroupDiscoverer.discover(CGROUP).toString(), CPU_STAT_FILE) + )) { + for (String line = reader.readLine(); line != null; line = reader.readLine()) { + final String[] parts = line.split(Pattern.quote(" ")); + if (parts.length != 2) { + continue; + } + switch (parts[0]) { + case "usage_usec": + usageUsec = Longs.tryParse(parts[1]); // Capture this! + break; + case "user_usec": + userUsec = Longs.tryParse(parts[1]); + break; + case "system_usec": + systemUsec = Longs.tryParse(parts[1]); + break; + } + } + } + catch (IOException | RuntimeException ex) { + LOG.noStackTrace().warn(ex, "Unable to fetch CPU v2 snapshot. Cgroup metrics will not be emitted."); + } + + // Read cpu.weight (equivalent to cpu.shares but different scale) + long weight = CgroupUtil.readLongValue(cgroupDiscoverer, CGROUP, CPU_WEIGHT_FILE, -1); + + // Read cpu.max (format: "quota period" or "max") + long quotaUs = -1; + long periodUs = -1; + try (final BufferedReader reader = Files.newBufferedReader( + Paths.get(cgroupDiscoverer.discover(CGROUP).toString(), CPU_MAX_FILE) + )) { + String line = reader.readLine(); + if (line != null && !line.trim().startsWith("max")) { + String[] parts = line.trim().split("\\s+"); + if (parts.length == 2) { + quotaUs = Longs.tryParse(parts[0]); + periodUs = Longs.tryParse(parts[1]); + } + } + } + catch (IOException | RuntimeException ex) { + LOG.noStackTrace().warn(ex, "Unable to read cpu.max file."); + } + + // Convert weight to shares for compatibility with existing logic + long shares = convertWeightToShares(weight); + + return new Cpu.CpuMetrics(shares, quotaUs, periodUs, -1, -1, userUsec, systemUsec, usageUsec); + } + + /** + * Convert cgroups v2 cpu.weight to cgroups v1 cpu.shares equivalent. + * v2 weight range: 1-10000 (default: 100) + * v1 shares range: 2-262144 (default: 1024) + * + * @param weight The cgroups v2 weight value + * @return Equivalent shares value for compatibility + */ + private static long convertWeightToShares(long weight) + { + if (weight <= 0) { + return -1; // No limit + } + + // Convert weight to shares: weight=100 -> shares=1024 (defaults match) + // Linear scaling: shares = (weight * 1024) / 100 + return Math.max(2, (weight * 1024) / 100); + } +} diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/ProcCgroupDiscoverer.java b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/ProcCgroupDiscoverer.java index afa1b190cd70..c8e828d49a11 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/ProcCgroupDiscoverer.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/ProcCgroupDiscoverer.java @@ -193,4 +193,22 @@ private PidCgroupEntry(Set controllers, Path path) this.path = path; } } + + @Override + public Cpu.CpuMetrics getCpuMetrics() + { + return new Cpu(this).snapshot(); + } + + @Override + public CpuSet.CpuSetMetric getCpuSetMetrics() + { + return new CpuSet(this).snapshot(); + } + + @Override + public CgroupVersion getCgroupVersion() + { + return CgroupVersion.V1; + } } diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/ProcCgroupV2Discoverer.java b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/ProcCgroupV2Discoverer.java index f75b4e532b9b..523e1bd0e8e3 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/ProcCgroupV2Discoverer.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/ProcCgroupV2Discoverer.java @@ -58,4 +58,22 @@ public Path discover(String cgroup) throw new RE("Cgroup location not found"); } + + @Override + public Cpu.CpuMetrics getCpuMetrics() + { + return new CpuV2(this).snapshot(); + } + + @Override + public CpuSet.CpuSetMetric getCpuSetMetrics() + { + return new CpuSetV2(this).snapshot(); + } + + @Override + public CgroupVersion getCgroupVersion() + { + return CgroupVersion.V2; + } } diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/ProcSelfCgroupDiscoverer.java b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/ProcSelfCgroupDiscoverer.java index 02a8ffab2e76..049c0a04e471 100644 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/ProcSelfCgroupDiscoverer.java +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/cgroups/ProcSelfCgroupDiscoverer.java @@ -19,11 +19,18 @@ package org.apache.druid.java.util.metrics.cgroups; +import com.google.common.io.Files; +import org.apache.druid.java.util.common.logger.Logger; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; public class ProcSelfCgroupDiscoverer implements CgroupDiscoverer { + private static final Logger LOG = new Logger(ProcSelfCgroupDiscoverer.class); private final CgroupDiscoverer delegate; public ProcSelfCgroupDiscoverer() @@ -31,6 +38,12 @@ public ProcSelfCgroupDiscoverer() this(ProcCgroupDiscoverer.class); } + + public static CgroupDiscoverer autoCgroupDiscoverer() + { + return autoCgroupDiscoverer(Paths.get("/proc/self")); + } + public ProcSelfCgroupDiscoverer(Class discoverer) { try { @@ -46,4 +59,89 @@ public Path discover(String cgroup) { return delegate.discover(cgroup); } + + @Override + public Cpu.CpuMetrics getCpuMetrics() + { + return delegate.getCpuMetrics(); + } + + @Override + public CpuSet.CpuSetMetric getCpuSetMetrics() + { + return delegate.getCpuSetMetrics(); + } + + @Override + public CgroupVersion getCgroupVersion() + { + return delegate.getCgroupVersion(); + } + + /** + * Creates the appropriate CgroupDiscoverer based on the cgroups version detected in the system. + */ + public static CgroupDiscoverer autoCgroupDiscoverer(Path procPidDir) + { + CgroupVersion version = detectCgroupVersion(procPidDir); + + switch (version) { + case V2: + LOG.info("Detected cgroups v2, using ProcCgroupV2Discoverer"); + return new ProcCgroupV2Discoverer(procPidDir); + + case V1: + LOG.info("Detected cgroups v1, using ProcCgroupDiscoverer"); + return new ProcCgroupDiscoverer(procPidDir); + + case UNKNOWN: + default: + LOG.warn("Could not detect cgroups version, falling back to cgroups v1 discoverer"); + return new ProcCgroupDiscoverer(procPidDir); + } + } + + /** + * Detects the cgroups version by examining the /proc/mounts file. + */ + private static CgroupVersion detectCgroupVersion(Path procPidDir) + { + File mountsFile = new File(procPidDir.toFile(), "mounts"); + + if (!mountsFile.exists() || !mountsFile.canRead()) { + LOG.warn("Cannot read mounts file at [%s], unable to detect cgroups version", mountsFile); + return CgroupVersion.UNKNOWN; + } + + try { + boolean hasV1 = false; + boolean hasV2 = false; + + for (String line : Files.readLines(mountsFile, StandardCharsets.UTF_8)) { + String[] parts = line.split("\\s+"); + if (parts.length >= 3) { + String fsType = parts[2]; + if ("cgroup".equals(fsType)) { + hasV1 = true; + } else if ("cgroup2".equals(fsType)) { + hasV2 = true; + } + } + } + + // Prefer v2 if both are present (hybrid mode) + if (hasV2) { + return CgroupVersion.V2; + } else if (hasV1) { + return CgroupVersion.V1; + } else { + return CgroupVersion.UNKNOWN; + } + } + catch (IOException e) { + LOG.warn(e, "Error reading mounts file [%s], unable to detect cgroups version", mountsFile); + return CgroupVersion.UNKNOWN; + } + } + } diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuMonitorTest.java index c42d9a61c69d..990addd6d82e 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuMonitorTest.java @@ -24,7 +24,9 @@ import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; +import org.apache.druid.java.util.metrics.cgroups.CgroupVersion; import org.apache.druid.java.util.metrics.cgroups.ProcCgroupDiscoverer; +import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer; import org.apache.druid.java.util.metrics.cgroups.TestUtils; import org.junit.Assert; import org.junit.Before; @@ -35,6 +37,7 @@ import java.io.File; import java.io.IOException; +import java.net.URISyntaxException; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -109,10 +112,55 @@ public void testMonitor() throws IOException, InterruptedException @Test public void testQuotaCompute() { - Assert.assertEquals(-1, CgroupCpuMonitor.computeProcessorQuota(-1, 100000), 0); - Assert.assertEquals(0, CgroupCpuMonitor.computeProcessorQuota(0, 100000), 0); - Assert.assertEquals(-1, CgroupCpuMonitor.computeProcessorQuota(100000, 0), 0); - Assert.assertEquals(2.0D, CgroupCpuMonitor.computeProcessorQuota(200000, 100000), 0); - Assert.assertEquals(0.5D, CgroupCpuMonitor.computeProcessorQuota(50000, 100000), 0); + Assert.assertEquals(-1, CgroupUtil.computeProcessorQuota(-1, 100000), 0); + Assert.assertEquals(0, CgroupUtil.computeProcessorQuota(0, 100000), 0); + Assert.assertEquals(-1, CgroupUtil.computeProcessorQuota(100000, 0), 0); + Assert.assertEquals(2.0D, CgroupUtil.computeProcessorQuota(200000, 100000), 0); + Assert.assertEquals(0.5D, CgroupUtil.computeProcessorQuota(50000, 100000), 0); + } + + @Test + public void testCgroupsV2Detection() throws IOException, URISyntaxException + { + // Set up cgroups v2 structure + File cgroupV2Dir = temporaryFolder.newFolder(); + File procV2Dir = temporaryFolder.newFolder(); + TestUtils.setUpCgroupsV2(procV2Dir, cgroupV2Dir); + + + CgroupDiscoverer v2Discoverer = ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(procV2Dir.toPath()); + + // Constructor should detect v2 and log warning + CgroupCpuMonitor monitor = new CgroupCpuMonitor(v2Discoverer, ImmutableMap.of(), "test-feed"); + + final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + + // doMonitor should return true + Assert.assertTrue(monitor.doMonitor(emitter)); + + Assert.assertEquals(2, emitter.getEvents().size()); + Assert.assertEquals(CgroupVersion.V2.name(), emitter.getEvents().get(0).toMap().get("cgroupversion")); + } + + @Test + public void testCgroupsV1MonitoringContinuesNormally() throws IOException, InterruptedException + { + // This test verifies that the existing v1 monitoring continues to work + // after the v2 detection changes + final CgroupCpuMonitor monitor = new CgroupCpuMonitor(discoverer, ImmutableMap.of(), "some_feed"); + final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + + Assert.assertTrue(monitor.doMonitor(emitter)); + final List actualEvents = emitter.getEvents(); + + // Should emit metrics normally for v1 + Assert.assertEquals(2, actualEvents.size()); + final Map sharesEvent = actualEvents.get(0).toMap(); + final Map coresEvent = actualEvents.get(1).toMap(); + Assert.assertEquals("cgroup/cpu/shares", sharesEvent.get("metric")); + Assert.assertEquals(1024L, sharesEvent.get("value")); + Assert.assertEquals("cgroup/cpu/cores_quota", coresEvent.get("metric")); + Assert.assertEquals(3.0D, coresEvent.get("value")); + Assert.assertEquals(CgroupVersion.V1.name(), coresEvent.get("cgroupversion")); } } diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java index b18aa138fa55..71ea59ef5fa6 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java @@ -22,7 +22,9 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; +import org.apache.druid.java.util.metrics.cgroups.CgroupVersion; import org.apache.druid.java.util.metrics.cgroups.ProcCgroupDiscoverer; +import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer; import org.apache.druid.java.util.metrics.cgroups.TestUtils; import org.junit.Assert; import org.junit.Before; @@ -33,6 +35,9 @@ import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; public class CgroupCpuSetMonitorTest { @@ -75,5 +80,35 @@ public void testMonitor() emitter.verifyValue("cgroup/cpuset/effective_cpu_count", 7); emitter.verifyValue("cgroup/cpuset/mems_count", 4); emitter.verifyValue("cgroup/cpuset/effective_mems_count", 1); + Assert.assertEquals(CgroupVersion.V1.name(), emitter.getEvents().get(0).toMap().get("cgroupversion")); } + + @Test + public void testCgroupsV2DetectionInConstructor() throws IOException + { + // Set up cgroups v2 structure + File cgroupV2Dir = temporaryFolder.newFolder(); + File procV2Dir = temporaryFolder.newFolder(); + TestUtils.setUpCgroupsV2(procV2Dir, cgroupV2Dir); + + // Create v2 cpuset files in unified hierarchy + File cgroupRoot = new File(procV2Dir, "unified"); + FileUtils.mkdirp(cgroupRoot); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.cpus.effective"), "0-3\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.mems.effective"), "0\n".getBytes(StandardCharsets.UTF_8)); + + CgroupDiscoverer v2Discoverer = ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(procV2Dir.toPath()); + Assert.assertEquals(CgroupVersion.V2, v2Discoverer.getCgroupVersion()); + + // Constructor should detect v2 and log warning + CgroupCpuSetMonitor monitor = new CgroupCpuSetMonitor(v2Discoverer, ImmutableMap.of(), "test-feed"); + + final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + + // doMonitor should return true but skip actual monitoring + Assert.assertTrue(monitor.doMonitor(emitter)); + Assert.assertEquals(4, emitter.getNumEmittedEvents()); + Assert.assertEquals(CgroupVersion.V2.name(), emitter.getEvents().get(0).toMap().get("cgroupversion")); + } + } diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java index 07682643148a..9a32dbbf86aa 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java @@ -63,7 +63,7 @@ public void testMonitor() throws IOException, InterruptedException final CgroupV2CpuMonitor monitor = new CgroupV2CpuMonitor(discoverer); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); Assert.assertTrue(monitor.doMonitor(emitter)); - Assert.assertEquals(0, emitter.getNumEmittedEvents()); + Assert.assertEquals(2, emitter.getNumEmittedEvents()); emitter.flush(); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitorTest.java index bb9a6f62f260..eb8f57a9c466 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2MemoryMonitorTest.java @@ -19,6 +19,7 @@ package org.apache.druid.java.util.metrics; +import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; import org.apache.druid.java.util.metrics.cgroups.ProcCgroupV2Discoverer; @@ -62,7 +63,11 @@ public void setUp() throws IOException @Test public void testMonitor() { - final CgroupMemoryMonitor monitor = new CgroupV2MemoryMonitor(discoverer); + final CgroupV2MemoryMonitor monitor = new CgroupV2MemoryMonitor( + discoverer, + ImmutableMap.of(), + FeedDefiningMonitor.DEFAULT_METRICS_FEED + ); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); Assert.assertTrue(monitor.doMonitor(emitter)); final List actualEvents = emitter.getEvents(); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java index 94cb1d9b3aa5..fd7a0d0ef54b 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java @@ -22,6 +22,10 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; +import org.apache.druid.java.util.metrics.cgroups.CgroupVersion; +import org.apache.druid.java.util.metrics.cgroups.Cpu; +import org.apache.druid.java.util.metrics.cgroups.CpuSet; import org.apache.druid.java.util.metrics.cgroups.TestUtils; import org.junit.Assert; import org.junit.Before; @@ -33,6 +37,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.nio.file.Path; public class CpuAcctDeltaMonitorTest { @@ -65,9 +70,7 @@ public void testMonitorWontCrash() final CpuAcctDeltaMonitor monitor = new CpuAcctDeltaMonitor( "some_feed", ImmutableMap.of(), - cgroup -> { - throw new RuntimeException("Should continue"); - } + TestUtils.exceptionThrowingDiscoverer() ); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); monitor.doMonitor(emitter); @@ -89,7 +92,32 @@ public void testSimpleMonitor() throws Exception final CpuAcctDeltaMonitor monitor = new CpuAcctDeltaMonitor( "some_feed", ImmutableMap.of(), - (cgroup) -> cpuacctDir.toPath() + new CgroupDiscoverer() + { + @Override + public Path discover(String cgroup) + { + return cpuacctDir.toPath(); + } + + @Override + public Cpu.CpuMetrics getCpuMetrics() + { + return null; + } + + @Override + public CpuSet.CpuSetMetric getCpuSetMetrics() + { + return null; + } + + @Override + public CgroupVersion getCgroupVersion() + { + return CgroupVersion.V1; + } + } ); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); Assert.assertFalse(monitor.doMonitor(emitter)); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/CpuAcctTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/CpuAcctTest.java index 4f804600d27f..65a03842104a 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/CpuAcctTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/CpuAcctTest.java @@ -63,9 +63,7 @@ public void setUp() throws IOException @Test public void testWontCrash() { - final CpuAcct cpuAcct = new CpuAcct(cgroup -> { - throw new RuntimeException("Should still continue"); - }); + final CpuAcct cpuAcct = new CpuAcct(TestUtils.exceptionThrowingDiscoverer()); final CpuAcct.CpuAcctMetric metric = cpuAcct.snapshot(); Assert.assertEquals(0L, metric.cpuCount()); Assert.assertEquals(0L, metric.usrTime()); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/CpuSetTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/CpuSetTest.java index 4e30c323b6da..4e2bb6730b2f 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/CpuSetTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/CpuSetTest.java @@ -58,9 +58,7 @@ public void setUp() throws IOException @Test public void testWontCrash() { - final CpuSet cpuSet = new CpuSet(cgroup -> { - throw new RuntimeException("Should still continue"); - }); + final CpuSet cpuSet = new CpuSet(TestUtils.exceptionThrowingDiscoverer()); final CpuSet.CpuSetMetric metric = cpuSet.snapshot(); Assert.assertEquals(0, metric.getCpuSetCpus().length); Assert.assertEquals(0, metric.getEffectiveCpuSetCpus().length); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/CpuSetV2Test.java b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/CpuSetV2Test.java new file mode 100644 index 000000000000..6d5d33d42065 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/CpuSetV2Test.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics.cgroups; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; + +public class CpuSetV2Test +{ + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private File cgroupDir; + private File procDir; + private CgroupDiscoverer discoverer; + + @Before + public void setUp() throws IOException + { + cgroupDir = temporaryFolder.newFolder(); + procDir = temporaryFolder.newFolder(); + TestUtils.setUpCgroupsV2(procDir, cgroupDir); + discoverer = new ProcCgroupV2Discoverer(procDir.toPath()); + } + + @Test + public void testCpuSetV2Snapshot() throws IOException + { + // Set up v2 cpuset files directly in cgroupDir (unified hierarchy root) + File cgroupRoot = cgroupDir; + + // Create v2 cpuset files with different names than v1 + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.cpus"), + "0-7\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.cpus.effective"), + "0-3\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.mems"), + "0-1\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.mems.effective"), + "0\n".getBytes(StandardCharsets.UTF_8)); + + CpuSetV2 cpuSetV2 = new CpuSetV2(discoverer); + CpuSet.CpuSetMetric metrics = cpuSetV2.snapshot(); + + // Verify CPU set parsing + int[] expectedCpus = {0, 1, 2, 3, 4, 5, 6, 7}; + int[] expectedEffectiveCpus = {0, 1, 2, 3}; + int[] expectedMems = {0, 1}; + int[] expectedEffectiveMems = {0}; + + Assert.assertArrayEquals("CPU set should be parsed correctly", expectedCpus, metrics.getCpuSetCpus()); + Assert.assertArrayEquals("Effective CPU set should be parsed correctly", expectedEffectiveCpus, metrics.getEffectiveCpuSetCpus()); + Assert.assertArrayEquals("Memory set should be parsed correctly", expectedMems, metrics.getCpuSetMems()); + Assert.assertArrayEquals("Effective memory set should be parsed correctly", expectedEffectiveMems, metrics.getEffectiveCpuSetMems()); + } + + @Test + public void testComplexCpuRangesParsing() throws IOException + { + File cgroupRoot = cgroupDir; + + // Test complex CPU ranges with mixed single CPUs and ranges + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.cpus"), + "0-2,7,12-14\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.cpus.effective"), + "0,2,7\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.mems"), + "0,2-3\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.mems.effective"), + "0\n".getBytes(StandardCharsets.UTF_8)); + + CpuSetV2 cpuSetV2 = new CpuSetV2(discoverer); + CpuSet.CpuSetMetric metrics = cpuSetV2.snapshot(); + + // 0-2,7,12-14 should expand to [0,1,2,7,12,13,14] + int[] expectedCpus = {0, 1, 2, 7, 12, 13, 14}; + // 0,2,7 should expand to [0,2,7] + int[] expectedEffectiveCpus = {0, 2, 7}; + // 0,2-3 should expand to [0,2,3] + int[] expectedMems = {0, 2, 3}; + // 0 should be [0] + int[] expectedEffectiveMems = {0}; + + Assert.assertArrayEquals("Complex CPU ranges should be parsed correctly", expectedCpus, metrics.getCpuSetCpus()); + Assert.assertArrayEquals("Complex effective CPU ranges should be parsed correctly", expectedEffectiveCpus, metrics.getEffectiveCpuSetCpus()); + Assert.assertArrayEquals("Complex memory ranges should be parsed correctly", expectedMems, metrics.getCpuSetMems()); + Assert.assertArrayEquals("Single memory node should be parsed correctly", expectedEffectiveMems, metrics.getEffectiveCpuSetMems()); + } + + @Test + public void testEmptyCpuSetFiles() throws IOException + { + File cgroupRoot = cgroupDir; + + // Create empty cpuset files + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.cpus"), + "\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.cpus.effective"), + "\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.mems"), + "\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.mems.effective"), + "\n".getBytes(StandardCharsets.UTF_8)); + + CpuSetV2 cpuSetV2 = new CpuSetV2(discoverer); + CpuSet.CpuSetMetric metrics = cpuSetV2.snapshot(); + + // Empty files should result in empty arrays + Assert.assertArrayEquals("Empty CPU file should result in empty array", new int[0], metrics.getCpuSetCpus()); + Assert.assertArrayEquals("Empty effective CPU file should result in empty array", new int[0], metrics.getEffectiveCpuSetCpus()); + Assert.assertArrayEquals("Empty memory file should result in empty array", new int[0], metrics.getCpuSetMems()); + Assert.assertArrayEquals("Empty effective memory file should result in empty array", new int[0], metrics.getEffectiveCpuSetMems()); + } + + @Test + public void testMissingCpuSetFiles() throws IOException + { + // Set up directory but don't create the files + File cgroupRoot = cgroupDir; + + CpuSetV2 cpuSetV2 = new CpuSetV2(discoverer); + CpuSet.CpuSetMetric metrics = cpuSetV2.snapshot(); + + // Missing files should result in empty arrays + Assert.assertArrayEquals("Missing CPU file should result in empty array", new int[0], metrics.getCpuSetCpus()); + Assert.assertArrayEquals("Missing effective CPU file should result in empty array", new int[0], metrics.getEffectiveCpuSetCpus()); + Assert.assertArrayEquals("Missing memory file should result in empty array", new int[0], metrics.getCpuSetMems()); + Assert.assertArrayEquals("Missing effective memory file should result in empty array", new int[0], metrics.getEffectiveCpuSetMems()); + } + + @Test + public void testInvalidCpuSetData() throws IOException + { + File cgroupRoot = cgroupDir; + + // Create files with invalid data + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.cpus"), + "invalid-range\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.cpus.effective"), + "not-a-number\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.mems"), + "1-abc\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.mems.effective"), + "5-3\n".getBytes(StandardCharsets.UTF_8)); // Invalid range (high < low) + + CpuSetV2 cpuSetV2 = new CpuSetV2(discoverer); + CpuSet.CpuSetMetric metrics = cpuSetV2.snapshot(); + + // Invalid data should be handled gracefully and result in empty arrays + Assert.assertArrayEquals("Invalid CPU data should result in empty array", new int[0], metrics.getCpuSetCpus()); + Assert.assertArrayEquals("Invalid effective CPU data should result in empty array", new int[0], metrics.getEffectiveCpuSetCpus()); + Assert.assertArrayEquals("Invalid memory data should result in empty array", new int[0], metrics.getCpuSetMems()); + Assert.assertArrayEquals("Invalid effective memory data should result in empty array", new int[0], metrics.getEffectiveCpuSetMems()); + } + + @Test + public void testMixedValidAndInvalidData() throws IOException + { + File cgroupRoot = cgroupDir; + + // Mix valid and invalid data in the same line + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.cpus"), + "0-2,invalid,7,bad-range,10\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.cpus.effective"), + "0,not-valid,2\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.mems"), + "0\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.mems.effective"), + "0\n".getBytes(StandardCharsets.UTF_8)); + + CpuSetV2 cpuSetV2 = new CpuSetV2(discoverer); + CpuSet.CpuSetMetric metrics = cpuSetV2.snapshot(); + + // Should parse only valid parts: 0-2,7,10 -> [0,1,2,7,10] + int[] expectedCpus = {0, 1, 2, 7, 10}; + // Should parse only valid parts: 0,2 -> [0,2] + int[] expectedEffectiveCpus = {0, 2}; + int[] expectedMems = {0}; + int[] expectedEffectiveMems = {0}; + + Assert.assertArrayEquals("Should parse only valid CPU parts", expectedCpus, metrics.getCpuSetCpus()); + Assert.assertArrayEquals("Should parse only valid effective CPU parts", expectedEffectiveCpus, metrics.getEffectiveCpuSetCpus()); + Assert.assertArrayEquals("Valid memory data should parse correctly", expectedMems, metrics.getCpuSetMems()); + Assert.assertArrayEquals("Valid effective memory data should parse correctly", expectedEffectiveMems, metrics.getEffectiveCpuSetMems()); + } + + @Test + public void testSingleCpuValues() throws IOException + { + File cgroupRoot = cgroupDir; + + // Test single CPU values without ranges + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.cpus"), + "5\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.cpus.effective"), + "3\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.mems"), + "1\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.mems.effective"), + "0\n".getBytes(StandardCharsets.UTF_8)); + + CpuSetV2 cpuSetV2 = new CpuSetV2(discoverer); + CpuSet.CpuSetMetric metrics = cpuSetV2.snapshot(); + + Assert.assertArrayEquals("Single CPU should be parsed", new int[]{5}, metrics.getCpuSetCpus()); + Assert.assertArrayEquals("Single effective CPU should be parsed", new int[]{3}, metrics.getEffectiveCpuSetCpus()); + Assert.assertArrayEquals("Single memory node should be parsed", new int[]{1}, metrics.getCpuSetMems()); + Assert.assertArrayEquals("Single effective memory node should be parsed", new int[]{0}, metrics.getEffectiveCpuSetMems()); + } + + @Test + public void testLargeRanges() throws IOException + { + File cgroupRoot = cgroupDir; + + // Test larger ranges + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.cpus"), + "0-15\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.cpus.effective"), + "8-11\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.mems"), + "0-3\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.mems.effective"), + "1-2\n".getBytes(StandardCharsets.UTF_8)); + + CpuSetV2 cpuSetV2 = new CpuSetV2(discoverer); + CpuSet.CpuSetMetric metrics = cpuSetV2.snapshot(); + + int[] expectedCpus = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}; + int[] expectedEffectiveCpus = {8, 9, 10, 11}; + int[] expectedMems = {0, 1, 2, 3}; + int[] expectedEffectiveMems = {1, 2}; + + Assert.assertArrayEquals("Large CPU range should be parsed correctly", expectedCpus, metrics.getCpuSetCpus()); + Assert.assertArrayEquals("Large effective CPU range should be parsed correctly", expectedEffectiveCpus, metrics.getEffectiveCpuSetCpus()); + Assert.assertArrayEquals("Large memory range should be parsed correctly", expectedMems, metrics.getCpuSetMems()); + Assert.assertArrayEquals("Large effective memory range should be parsed correctly", expectedEffectiveMems, metrics.getEffectiveCpuSetMems()); + } + + @Test + public void testWhitespaceInFiles() throws IOException + { + File cgroupRoot = cgroupDir; + + // Test files with extra whitespace + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.cpus"), + " 0-2,7 \n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.cpus.effective"), + "\t0,2\t\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.mems"), + " 0 \n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.mems.effective"), + "0\n".getBytes(StandardCharsets.UTF_8)); + + CpuSetV2 cpuSetV2 = new CpuSetV2(discoverer); + CpuSet.CpuSetMetric metrics = cpuSetV2.snapshot(); + + int[] expectedCpus = {0, 1, 2, 7}; + int[] expectedEffectiveCpus = {0, 2}; + int[] expectedMems = {0}; + int[] expectedEffectiveMems = {0}; + + Assert.assertArrayEquals("CPU data with whitespace should be parsed correctly", expectedCpus, metrics.getCpuSetCpus()); + Assert.assertArrayEquals("Effective CPU data with whitespace should be parsed correctly", expectedEffectiveCpus, metrics.getEffectiveCpuSetCpus()); + Assert.assertArrayEquals("Memory data with whitespace should be parsed correctly", expectedMems, metrics.getCpuSetMems()); + Assert.assertArrayEquals("Effective memory data should be parsed correctly", expectedEffectiveMems, metrics.getEffectiveCpuSetMems()); + } + +} diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/CpuTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/CpuTest.java index 1af0ba427bf9..2473fb6f2e7b 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/CpuTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/CpuTest.java @@ -59,9 +59,7 @@ public void setUp() throws IOException @Test public void testWontCrash() { - final Cpu cpu = new Cpu(cgroup -> { - throw new RuntimeException("Should still continue"); - }); + final Cpu cpu = new Cpu(TestUtils.exceptionThrowingDiscoverer()); final Cpu.CpuMetrics metric = cpu.snapshot(); Assert.assertEquals(-1L, metric.getShares()); Assert.assertEquals(0, metric.getQuotaUs()); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/CpuV2Test.java b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/CpuV2Test.java new file mode 100644 index 000000000000..f789c8829ab8 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/CpuV2Test.java @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics.cgroups; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; + +public class CpuV2Test +{ + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private File cgroupDir; + private File procDir; + private CgroupDiscoverer discoverer; + + @Before + public void setUp() throws IOException + { + cgroupDir = temporaryFolder.newFolder(); + procDir = temporaryFolder.newFolder(); + TestUtils.setUpCgroupsV2(procDir, cgroupDir); + discoverer = new ProcCgroupV2Discoverer(procDir.toPath()); + } + + @Test + public void testCpuV2Snapshot() throws IOException + { + // Set up v2 files directly in cgroupDir (unified hierarchy root) + // Create cpu.stat with microsecond values + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.stat"), + "user_usec 123456789\nsystem_usec 987654321\ncore_sched.force_idle_usec 0\n".getBytes(StandardCharsets.UTF_8) + ); + + // Create cpu.weight (v2 weight) + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.weight"), + "200\n".getBytes(StandardCharsets.UTF_8) + ); + + // Create cpu.max (quota period) + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.max"), + "150000 100000\n".getBytes(StandardCharsets.UTF_8) + ); + + CpuV2 cpuV2 = new CpuV2(discoverer); + Cpu.CpuMetrics metrics = cpuV2.snapshot(); + + // Verify the conversion from v2 to v1 format + Assert.assertEquals("Weight should be converted to shares", 2048L, metrics.getShares()); + Assert.assertEquals("Quota should be preserved", 150000L, metrics.getQuotaUs()); + Assert.assertEquals("Period should be preserved", 100000L, metrics.getPeriodUs()); + + // V2 should not provide jiffies, only microseconds + Assert.assertEquals("V2 should not provide user jiffies", -1L, metrics.getUserJiffies()); + Assert.assertEquals("V2 should not provide system jiffies", -1L, metrics.getSystemJiffies()); + Assert.assertEquals("V2 should not provide total jiffies", -1L, metrics.getTotalJiffies()); + } + + @Test + public void testCpuV2SnapshotWithMaxQuota() throws IOException + { + // Set up v2 files with unlimited quota + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.stat"), + "user_usec 50000000\nsystem_usec 25000000\n".getBytes(StandardCharsets.UTF_8) + ); + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.weight"), + "100\n".getBytes(StandardCharsets.UTF_8) + ); + + // cpu.max with "max" means no limit + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.max"), + "max\n".getBytes(StandardCharsets.UTF_8) + ); + + CpuV2 cpuV2 = new CpuV2(discoverer); + Cpu.CpuMetrics metrics = cpuV2.snapshot(); + Assert.assertEquals("Default weight should convert to default shares", 1024L, metrics.getShares()); + Assert.assertEquals("Max quota should be -1", -1L, metrics.getQuotaUs()); + Assert.assertEquals("Max period should be -1", -1L, metrics.getPeriodUs()); + Assert.assertEquals("V2 should not provide user jiffies", -1L, metrics.getUserJiffies()); + Assert.assertEquals("V2 should not provide system jiffies", -1L, metrics.getSystemJiffies()); + } + + @Test + public void testCpuV2SnapshotWithMissingFiles() throws IOException + { + // Set up directory but don't create the files + CpuV2 cpuV2 = new CpuV2(discoverer); + Cpu.CpuMetrics metrics = cpuV2.snapshot(); + + // Should return default/error values when files are missing + Assert.assertEquals("Missing weight should return default shares", -1L, metrics.getShares()); + Assert.assertEquals("Missing quota should be -1", -1L, metrics.getQuotaUs()); + Assert.assertEquals("Missing period should be -1", -1L, metrics.getPeriodUs()); + Assert.assertEquals("Missing user time should be -1", -1L, metrics.getUserJiffies()); + Assert.assertEquals("Missing system time should be -1", -1L, metrics.getSystemJiffies()); + Assert.assertEquals("Missing total should be -1", -1L, metrics.getTotalJiffies()); + } + + @Test + public void testCpuV2SnapshotWithInvalidData() throws IOException + { + // Set up v2 files with invalid/malformed data + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.stat"), + "user_usec invalid_number\nsystem_usec not_a_number\n".getBytes(StandardCharsets.UTF_8) + ); + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.weight"), + "invalid\n".getBytes(StandardCharsets.UTF_8) + ); + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.max"), + "invalid format\n".getBytes(StandardCharsets.UTF_8) + ); + + CpuV2 cpuV2 = new CpuV2(discoverer); + Cpu.CpuMetrics metrics = cpuV2.snapshot(); + + // Should handle invalid data gracefully + Assert.assertEquals("Invalid weight should return default", -1L, metrics.getShares()); + Assert.assertEquals("Invalid quota should be -1", -1L, metrics.getQuotaUs()); + Assert.assertEquals("Invalid period should be -1", -1L, metrics.getPeriodUs()); + Assert.assertEquals("Invalid user time should be -1", -1L, metrics.getUserJiffies()); + Assert.assertEquals("Invalid system time should be -1", -1L, metrics.getSystemJiffies()); + } + + @Test + public void testWeightToSharesConversion() + { + // Test the weight to shares conversion logic via snapshot + try { + + // Test various weight values + testWeightConversion(cgroupDir, 1, 10); // Minimum weight -> minimum shares (clamped to 2) + testWeightConversion(cgroupDir, 100, 1024); // Default weight -> default shares + testWeightConversion(cgroupDir, 200, 2048); // Double weight -> double shares + testWeightConversion(cgroupDir, 1000, 10240); // 10x weight -> 10x shares + testWeightConversion(cgroupDir, 10000, 102400); // Maximum weight -> maximum shares + } + catch (IOException e) { + Assert.fail("IOException during weight conversion test: " + e.getMessage()); + } + } + + private void testWeightConversion(File cgroupDir, int weight, long expectedShares) throws IOException + { + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.weight"), + (weight + "\n").getBytes(StandardCharsets.UTF_8) + ); + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.stat"), + "user_usec 0\nsystem_usec 0\n".getBytes(StandardCharsets.UTF_8) + ); + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.max"), + "max\n".getBytes(StandardCharsets.UTF_8) + ); + + CpuV2 cpuV2 = new CpuV2(discoverer); + Cpu.CpuMetrics metrics = cpuV2.snapshot(); + + Assert.assertEquals( + "Weight " + weight + " should convert to shares " + expectedShares, + expectedShares, metrics.getShares() + ); + } + + @Test + public void testV2DoesNotConvertMicroSecondToJiffies() throws IOException + { + + // Test microsecond to jiffies conversion (divide by 10000) + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.stat"), + "user_usec 100000000\nsystem_usec 50000000\n".getBytes(StandardCharsets.UTF_8) + ); + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.weight"), + "100\n".getBytes(StandardCharsets.UTF_8) + ); + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.max"), + "max\n".getBytes(StandardCharsets.UTF_8) + ); + + CpuV2 cpuV2 = new CpuV2(discoverer); + Cpu.CpuMetrics metrics = cpuV2.snapshot(); + + // V2 should not provide jiffies, only microseconds + Assert.assertEquals("V2 should not provide user jiffies", -1L, metrics.getUserJiffies()); + Assert.assertEquals("V2 should not provide system jiffies", -1L, metrics.getSystemJiffies()); + Assert.assertEquals("V2 should not provide total jiffies", -1L, metrics.getTotalJiffies()); + } + + @Test + public void testCpuStatFileWithExtraFields() throws IOException + { + // Test parsing cpu.stat with additional fields that should be ignored + + + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.stat"), + ("usage_usec 75000000\n" + + "user_usec 30000000\n" + + "system_usec 45000000\n" + + "core_sched.force_idle_usec 12345\n" + + "nr_periods 5000\n" + + "nr_throttled 100\n" + + "throttled_usec 1000000\n").getBytes(StandardCharsets.UTF_8) + ); + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.weight"), + "150\n".getBytes(StandardCharsets.UTF_8) + ); + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.max"), + "max\n".getBytes(StandardCharsets.UTF_8) + ); + + CpuV2 cpuV2 = new CpuV2(discoverer); + Cpu.CpuMetrics metrics = cpuV2.snapshot(); + + // Should parse weight correctly and ignore extra cpu.stat fields + Assert.assertEquals("Weight should be converted", 1536L, metrics.getShares()); // 150 * 1024 / 100 + } + + @Test + public void testCpuMaxFileWithOnlyQuota() throws IOException + { + // Test cpu.max with only quota value (no period) + + + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.stat"), + "user_usec 10000000\nsystem_usec 5000000\n".getBytes(StandardCharsets.UTF_8) + ); + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.weight"), + "100\n".getBytes(StandardCharsets.UTF_8) + ); + + // Invalid format - single value instead of "quota period" + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.max"), + "75000\n".getBytes(StandardCharsets.UTF_8) + ); + + CpuV2 cpuV2 = new CpuV2(discoverer); + Cpu.CpuMetrics metrics = cpuV2.snapshot(); + + // Should handle malformed cpu.max gracefully + Assert.assertEquals("Invalid cpu.max should result in -1 quota", -1L, metrics.getQuotaUs()); + Assert.assertEquals("Invalid cpu.max should result in -1 period", -1L, metrics.getPeriodUs()); + } + + @Test + public void testZeroMicrosecondValues() throws IOException + { + // Test handling of zero values in cpu.stat + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.stat"), + "user_usec 0\nsystem_usec 0\n".getBytes(StandardCharsets.UTF_8) + ); + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.weight"), + "100\n".getBytes(StandardCharsets.UTF_8) + ); + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.max"), + "max\n".getBytes(StandardCharsets.UTF_8) + ); + + CpuV2 cpuV2 = new CpuV2(discoverer); + Cpu.CpuMetrics metrics = cpuV2.snapshot(); + + // V2 should not provide jiffies, only microseconds + Assert.assertEquals("V2 should not provide user jiffies", -1L, metrics.getUserJiffies()); + Assert.assertEquals("V2 should not provide system jiffies", -1L, metrics.getSystemJiffies()); + Assert.assertEquals("V2 should not provide total jiffies", -1L, metrics.getTotalJiffies()); + Assert.assertEquals(0, metrics.getUserUs()); + Assert.assertEquals(0, metrics.getSystemUs()); + Assert.assertEquals(-1, metrics.getTotalUs()); + } + + @Test + public void testCpuMaxWithExtraWhitespace() throws IOException + { + // Test cpu.max parsing with various whitespace scenarios + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.stat"), + "user_usec 20000000\nsystem_usec 10000000\n".getBytes(StandardCharsets.UTF_8) + ); + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.weight"), + "100\n".getBytes(StandardCharsets.UTF_8) + ); + + // cpu.max with extra whitespace + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.max"), + " 200000 100000 \n".getBytes(StandardCharsets.UTF_8) + ); + + CpuV2 cpuV2 = new CpuV2(discoverer); + Cpu.CpuMetrics metrics = cpuV2.snapshot(); + + // Should handle whitespace correctly + Assert.assertEquals("Should parse quota with whitespace", 200000L, metrics.getQuotaUs()); + Assert.assertEquals("Should parse period with whitespace", 100000L, metrics.getPeriodUs()); + } + + @Test + public void testNegativeWeightValue() throws IOException + { + // Test handling of negative weight (should not happen in practice but test robustness) + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.stat"), + "user_usec 1000000\nsystem_usec 2000000\n".getBytes(StandardCharsets.UTF_8) + ); + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.weight"), + "-5\n".getBytes(StandardCharsets.UTF_8) + ); + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.max"), + "max\n".getBytes(StandardCharsets.UTF_8) + ); + + CpuV2 cpuV2 = new CpuV2(discoverer); + Cpu.CpuMetrics metrics = cpuV2.snapshot(); + + // Negative weight should result in -1 shares (no limit) + Assert.assertEquals("Negative weight should result in -1 shares", -1L, metrics.getShares()); + } + + @Test + public void testZeroWeightValue() throws IOException + { + // Test handling of zero weight (should not happen in practice but test robustness) + + + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.stat"), + "user_usec 1000000\nsystem_usec 2000000\n".getBytes(StandardCharsets.UTF_8) + ); + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.weight"), + "0\n".getBytes(StandardCharsets.UTF_8) + ); + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.max"), + "max\n".getBytes(StandardCharsets.UTF_8) + ); + + CpuV2 cpuV2 = new CpuV2(discoverer); + Cpu.CpuMetrics metrics = cpuV2.snapshot(); + + // Zero weight should result in -1 shares (no limit) + Assert.assertEquals("Zero weight should result in -1 shares", -1L, metrics.getShares()); + } + + @Test + public void testMalformedCpuStatLines() throws IOException + { + // Test cpu.stat with various malformed lines + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.stat"), + ("incomplete_line\n" + + "user_usec 15000000\n" + + "too many parts here extra\n" + + "system_usec 25000000\n" + + "empty_value \n").getBytes(StandardCharsets.UTF_8) + ); + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.weight"), + "100\n".getBytes(StandardCharsets.UTF_8) + ); + Files.write( + Paths.get(cgroupDir.getAbsolutePath(), "cpu.max"), + "max\n".getBytes(StandardCharsets.UTF_8) + ); + + CpuV2 cpuV2 = new CpuV2(discoverer); + Cpu.CpuMetrics metrics = cpuV2.snapshot(); + + // Should parse valid weight and handle malformed cpu.stat lines gracefully + Assert.assertEquals("Should parse valid weight", 1024L, metrics.getShares()); + } +} diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/DiskTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/DiskTest.java index 514b4cea9ea1..0a143f256a63 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/DiskTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/DiskTest.java @@ -59,9 +59,7 @@ public void setUp() throws Exception @Test public void testWontCrash() { - final Disk disk = new Disk((cgroup) -> { - throw new RuntimeException("shouldContinue"); - }); + final Disk disk = new Disk(TestUtils.exceptionThrowingDiscoverer()); final Map stats = disk.snapshot(); Assert.assertEquals(ImmutableMap.of(), stats); } diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/MemoryTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/MemoryTest.java index 131627849005..ba7af8180984 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/MemoryTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/MemoryTest.java @@ -64,9 +64,7 @@ public void setUp() throws Exception @Test public void testWontCrash() { - final Memory memory = new Memory((cgroup) -> { - throw new RuntimeException("shouldContinue"); - }); + final Memory memory = new Memory(TestUtils.exceptionThrowingDiscoverer()); final Memory.MemoryStat stat = memory.snapshot("memory.usage_in_bytes", "memory.limit_in_bytes"); Assert.assertEquals(ImmutableMap.of(), stat.getNumaMemoryStats()); Assert.assertEquals(ImmutableMap.of(), stat.getMemoryStats()); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/TestUtils.java b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/TestUtils.java index 0133dca756df..263f0071abbb 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/TestUtils.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/cgroups/TestUtils.java @@ -25,7 +25,10 @@ import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.nio.file.StandardCopyOption; public class TestUtils @@ -85,4 +88,42 @@ public static void copyOrReplaceResource(String resource, File out) throws IOExc Assert.assertTrue(out.exists()); Assert.assertNotEquals(0, out.length()); } + + public static void writeCpuCgroupV2Files(File cgroupRoot) throws IOException + { + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpu.max"), "150000 100000\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpu.weight"), "100\n".getBytes(StandardCharsets.UTF_8)); + Files.write(Paths.get(cgroupRoot.getAbsolutePath(), "cpuset.cpus.effective"), "0-3\n".getBytes(StandardCharsets.UTF_8)); + } + + + public static CgroupDiscoverer exceptionThrowingDiscoverer() + { + return new CgroupDiscoverer() + { + @Override + public Path discover(String cgroup) + { + throw new RuntimeException("shouldContinue"); + } + + @Override + public Cpu.CpuMetrics getCpuMetrics() + { + throw new RuntimeException("shouldContinue"); + } + + @Override + public CpuSet.CpuSetMetric getCpuSetMetrics() + { + throw new RuntimeException("shouldContinue"); + } + + @Override + public CgroupVersion getCgroupVersion() + { + return CgroupVersion.UNKNOWN; + } + }; + } } diff --git a/processing/src/test/resources/cgroupv2/proc.mounts b/processing/src/test/resources/cgroupv2/proc.mounts index 98cdb26c5bdf..e6e237428c10 100644 --- a/processing/src/test/resources/cgroupv2/proc.mounts +++ b/processing/src/test/resources/cgroupv2/proc.mounts @@ -24,27 +24,9 @@ cgroup /sys/fs/cgroup cgroup2 ro,nosuid,nodev,noexec,relatime 0 0 shm /dev/shm tmpfs rw,nosuid,nodev,noexec,relatime,size=65536k,inode64 0 0 /dev/root /dev/termination-log ext4 rw,relatime,discard,errors=remount-ro 0 0 /dev/root /root/download-bundle ext4 ro,relatime,discard,errors=remount-ro 0 0 -/dev/root /opt/imply/grove-init.yaml ext4 ro,relatime,discard,errors=remount-ro 0 0 /dev/root /etc/syslog-ng/conf.d/udp_source.conf ext4 ro,relatime,discard,errors=remount-ro 0 0 tmpfs /run/secrets/azure/tokens tmpfs ro,relatime,size=7812500k,inode64 0 0 -/dev/root /opt/imply/conf/logging/logging-config ext4 ro,relatime,discard,errors=remount-ro 0 0 tmpfs /run/secrets/kubernetes.io/serviceaccount tmpfs ro,relatime,size=7812500k,inode64 0 0 -/dev/root /opt/imply/conf/supervise/master.conf ext4 ro,relatime,discard,errors=remount-ro 0 0 -/dev/root /opt/imply/conf/supervise/druid.conf ext4 ro,relatime,discard,errors=remount-ro 0 0 -/dev/root /opt/imply/conf/druid/overlord/jvm.config ext4 ro,relatime,discard,errors=remount-ro 0 0 -/dev/root /opt/imply/conf/druid/overlord/1-peon-config.yaml ext4 ro,relatime,discard,errors=remount-ro 0 0 -/dev/root /opt/imply/conf/druid/overlord/2-peon-config.yaml ext4 ro,relatime,discard,errors=remount-ro 0 0 -/dev/root /opt/imply/conf/druid/overlord/3-peon-config.yaml ext4 ro,relatime,discard,errors=remount-ro 0 0 -/dev/root /opt/imply/conf/druid/overlord/4-peon-config.yaml ext4 ro,relatime,discard,errors=remount-ro 0 0 -/dev/root /opt/imply/conf/druid/overlord/base-peon-config.yaml ext4 ro,relatime,discard,errors=remount-ro 0 0 -/dev/root /opt/imply/conf/druid/overlord/runtime.properties ext4 ro,relatime,discard,errors=remount-ro 0 0 -/dev/root /opt/imply/conf/druid/overlord/main.config ext4 ro,relatime,discard,errors=remount-ro 0 0 -/dev/root /opt/imply/conf/druid/coordinator/runtime.properties ext4 ro,relatime,discard,errors=remount-ro 0 0 -/dev/root /opt/imply/conf/druid/coordinator/main.config ext4 ro,relatime,discard,errors=remount-ro 0 0 -/dev/root /opt/imply/conf/druid/coordinator/jvm.config ext4 ro,relatime,discard,errors=remount-ro 0 0 -/dev/root /opt/imply/conf/druid/_common/roles.json ext4 ro,relatime,discard,errors=remount-ro 0 0 -/dev/root /opt/imply/conf/druid/_common/log4j2.xml ext4 ro,relatime,discard,errors=remount-ro 0 0 -/dev/root /opt/imply/conf/druid/_common/common.runtime.properties ext4 ro,relatime,discard,errors=remount-ro 0 0 proc /proc/bus proc ro,nosuid,nodev,noexec,relatime 0 0 proc /proc/fs proc ro,nosuid,nodev,noexec,relatime 0 0 proc /proc/irq proc ro,nosuid,nodev,noexec,relatime 0 0