Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1946,13 +1946,14 @@ The following table lists available monitors and the respective services where t
|`org.apache.druid.java.util.metrics.JvmCpuMonitor`|Reports statistics of CPU consumption by the JVM.|Any|
|`org.apache.druid.java.util.metrics.CpuAcctDeltaMonitor`|Reports consumed CPU as per the cpuacct cgroup.|Any|
|`org.apache.druid.java.util.metrics.JvmThreadsMonitor`|Reports thread statistics in the JVM, such as the numbers of total, daemon, started, and died threads.|Any|
|`org.apache.druid.java.util.metrics.CgroupCpuMonitor`|Reports CPU shares and quotas as per the `cpu` cgroup.|Any|
|`org.apache.druid.java.util.metrics.CgroupCpuSetMonitor`|Reports CPU core/HT and memory node allocations as per the `cpuset` cgroup.|Any|
|`org.apache.druid.java.util.metrics.CgroupDiskMonitor`|Reports disk statistic as per the blkio cgroup.|Any|
|`org.apache.druid.java.util.metrics.CgroupMemoryMonitor`|Reports memory statistic as per the memory cgroup.|Any|
|`org.apache.druid.java.util.metrics.CgroupV2CpuMonitor`| **EXPERIMENTAL** Reports CPU usage from `cpu.stat` file. Only applicable to `cgroupv2`.|Any|
|`org.apache.druid.java.util.metrics.CgroupV2DiskMonitor`| **EXPERIMENTAL** Reports disk usage from `io.stat` file. Only applicable to `cgroupv2`.|Any|
|`org.apache.druid.java.util.metrics.CgroupV2MemoryMonitor`| **EXPERIMENTAL** Reports memory usage from `memory.current` and `memory.max` files. Only applicable to `cgroupv2`.|Any|
|`org.apache.druid.java.util.metrics.CgroupCpuMonitor`|Reports CPU shares and quotas as per the `cpu` cgroup. Automatically switches to `CgroupV2CpuMonitor` if `cgroupv2` is detected.|Any|
|`org.apache.druid.java.util.metrics.CgroupCpuSetMonitor`|Reports CPU core/HT and memory node allocations as per the `cpuset` cgroup. Automatically switches to `CgroupV2CpuSetMonitor` if `cgroupv2` is detected.|Any|
|`org.apache.druid.java.util.metrics.CgroupDiskMonitor`|Reports disk statistics as per the `blkio` cgroup. Automatically switches to `CgroupV2DiskMonitor` if `cgroupv2` is detected.|Any|
|`org.apache.druid.java.util.metrics.CgroupMemoryMonitor`|Reports memory statistics as per the `memory` cgroup. Automatically switches to `CgroupV2MemoryMonitor` if `cgroupv2` is detected.|Any|
|`org.apache.druid.java.util.metrics.CgroupV2CpuMonitor`|Reports CPU usage from the `cpu.stat` file. Only applicable to `cgroupv2`.|Any|
|`org.apache.druid.java.util.metrics.CgroupV2CpuSetMonitor`|Reports CPU core/HT and memory node allocations as per the `cpuset` cgroup. Only applicable to `cgroupv2`.|Any|
|`org.apache.druid.java.util.metrics.CgroupV2DiskMonitor`|Reports disk usage from the `io.stat` file. Only applicable to `cgroupv2`.|Any|
|`org.apache.druid.java.util.metrics.CgroupV2MemoryMonitor`|Reports memory usage from the `memory.current` and `memory.max` files. Only applicable to `cgroupv2`.|Any|
|`org.apache.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical services.|Historical|
|`org.apache.druid.server.metrics.SegmentStatsMonitor` | **EXPERIMENTAL** Reports statistics about segments on Historical services. Not to be used when lazy loading is configured.|Historical|
|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.|Broker, Historical, Router, Indexer, Peon|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.CgroupVersion;
import org.apache.druid.java.util.metrics.cgroups.Cpu;
import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer;

Expand All @@ -41,37 +42,32 @@
final CgroupDiscoverer cgroupDiscoverer;
final Map<String, String[]> dimensions;
private Long userHz;
private KeyedDiff jiffies = new KeyedDiff();
private final KeyedDiff jiffies = new KeyedDiff();
private long prevJiffiesSnapshotAt = 0;
private final boolean isRunningOnCgroupsV2;
private final CgroupV2CpuMonitor cgroupV2CpuMonitor;

public CgroupCpuMonitor(CgroupDiscoverer cgroupDiscoverer, final Map<String, String[]> dimensions, String feed)
{
super(feed);
this.cgroupDiscoverer = cgroupDiscoverer;

Check warning

Code scanning / CodeQL

Executing a command with a relative path Medium

Command with a relative path 'getconf' is executed.
this.dimensions = dimensions;
try {
Process p = new ProcessBuilder("getconf", "CLK_TCK").start();
try (BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
String line = in.readLine();
if (line != null) {
userHz = Long.valueOf(line.trim());
}
}
}
catch (IOException | NumberFormatException e) {
LOG.warn(e, "Error getting the USER_HZ value");
}
finally {
if (userHz == null) {
LOG.warn("Using default value for USER_HZ");
userHz = DEFAULT_USER_HZ;
}

// Check if we're running on cgroups v2
this.isRunningOnCgroupsV2 = cgroupDiscoverer.getCgroupVersion().equals(CgroupVersion.V2);
if (isRunningOnCgroupsV2) {
this.cgroupV2CpuMonitor = new CgroupV2CpuMonitor(cgroupDiscoverer, dimensions, feed);
LOG.info("Detected cgroups v2, using CgroupV2CpuMonitor behavior for accurate metrics");
} else {
this.cgroupV2CpuMonitor = null;
initUzerHz();
}

}

public CgroupCpuMonitor(final Map<String, String[]> dimensions, String feed)
{
this(new ProcSelfCgroupDiscoverer(), dimensions, feed);
this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), dimensions, feed);
}

public CgroupCpuMonitor(final Map<String, String[]> dimensions)
Expand All @@ -87,16 +83,26 @@
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
final Cpu cpu = new Cpu(cgroupDiscoverer);
final Cpu.CpuMetrics cpuSnapshot = cpu.snapshot();
if (isRunningOnCgroupsV2) {
return cgroupV2CpuMonitor.doMonitor(emitter);
} else {
return doMonitorV1(emitter);
}
}

private boolean doMonitorV1(ServiceEmitter emitter)
{
final Cpu.CpuMetrics cpuSnapshot = cgroupDiscoverer.getCpuMetrics();
long now = Instant.now().getEpochSecond();

final ServiceMetricEvent.Builder builder = builder();
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion().name());

emitter.emit(builder.setMetric("cgroup/cpu/shares", cpuSnapshot.getShares()));
emitter.emit(builder.setMetric(
"cgroup/cpu/cores_quota",
computeProcessorQuota(cpuSnapshot.getQuotaUs(), cpuSnapshot.getPeriodUs())
CgroupUtil.computeProcessorQuota(cpuSnapshot.getQuotaUs(), cpuSnapshot.getPeriodUs())
));

long elapsedJiffiesSnapshotSecs = now - prevJiffiesSnapshotAt;
Expand All @@ -122,18 +128,25 @@
return true;
}

/**
* Calculates the total cores allocated through quotas. A negative value indicates that no quota has been specified.
* We use -1 because that's the default value used in the cgroup.
*
* @param quotaUs the cgroup quota value.
* @param periodUs the cgroup period value.
* @return the calculated processor quota, -1 if no quota or period set.
*/
public static double computeProcessorQuota(long quotaUs, long periodUs)
private void initUzerHz()
{
return quotaUs < 0 || periodUs == 0
? -1
: (double) quotaUs / periodUs;
try {
Process p = new ProcessBuilder("getconf", "CLK_TCK").start();
try (BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
String line = in.readLine();
if (line != null) {
userHz = Long.valueOf(line.trim());
}
}
}
catch (IOException | NumberFormatException e) {
LOG.warn(e, "Error getting the USER_HZ value");
}
finally {
if (userHz == null) {
LOG.warn("Using default value for USER_HZ");
userHz = DEFAULT_USER_HZ;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@

import java.util.Map;

/**
* Monitor that reports CPU set metrics for both cgroups v1 and cgroups v2.
*/

public class CgroupCpuSetMonitor extends FeedDefiningMonitor
{
final CgroupDiscoverer cgroupDiscoverer;
Expand All @@ -42,7 +46,7 @@ public CgroupCpuSetMonitor(CgroupDiscoverer cgroupDiscoverer, final Map<String,

public CgroupCpuSetMonitor(final Map<String, String[]> dimensions, String feed)
{
this(new ProcSelfCgroupDiscoverer(), dimensions, feed);
this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), dimensions, feed);
}

public CgroupCpuSetMonitor(final Map<String, String[]> dimensions)
Expand All @@ -58,11 +62,10 @@ public CgroupCpuSetMonitor()
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
final CpuSet cpuset = new CpuSet(cgroupDiscoverer);
final CpuSet.CpuSetMetric cpusetSnapshot = cpuset.snapshot();

final CpuSet.CpuSetMetric cpusetSnapshot = cgroupDiscoverer.getCpuSetMetrics();
final ServiceMetricEvent.Builder builder = builder();
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion().name());
emitter.emit(builder.setMetric(
"cgroup/cpuset/cpu_count",
cpusetSnapshot.getCpuSetCpus().length
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,44 @@
package org.apache.druid.java.util.metrics;

import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.CgroupVersion;
import org.apache.druid.java.util.metrics.cgroups.Disk;
import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer;

import java.util.Map;

public class CgroupDiskMonitor extends FeedDefiningMonitor
{
private static final Logger LOG = new Logger(CgroupDiskMonitor.class);
final CgroupDiscoverer cgroupDiscoverer;
final Map<String, String[]> dimensions;
private final KeyedDiff diff = new KeyedDiff();
private final boolean isRunningOnCgroupsV2;
private final CgroupV2DiskMonitor cgroupV2DiskMonitor;

public CgroupDiskMonitor(CgroupDiscoverer cgroupDiscoverer, final Map<String, String[]> dimensions, String feed)
{
super(feed);
this.cgroupDiscoverer = cgroupDiscoverer;
this.dimensions = dimensions;

// Check if we're running on cgroups v2
this.isRunningOnCgroupsV2 = cgroupDiscoverer.getCgroupVersion().equals(CgroupVersion.V2);
if (isRunningOnCgroupsV2) {
this.cgroupV2DiskMonitor = new CgroupV2DiskMonitor(cgroupDiscoverer, dimensions, feed);
LOG.info("Detected cgroups v2, using CgroupV2DiskMonitor behavior for accurate metrics");
} else {
this.cgroupV2DiskMonitor = null;
}
}

public CgroupDiskMonitor(final Map<String, String[]> dimensions, String feed)
{
this(new ProcSelfCgroupDiscoverer(), dimensions, feed);
this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), dimensions, feed);
}

public CgroupDiskMonitor(final Map<String, String[]> dimensions)
Expand All @@ -58,6 +72,15 @@ public CgroupDiskMonitor()

/**
 * Emits disk metrics, delegating to the cgroups v2 monitor when v2 was detected at construction time
 * and to the v1 code path otherwise.
 *
 * @param emitter emitter that receives the metric events
 * @return whatever the selected code path returns
 */
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
  return isRunningOnCgroupsV2
         ? cgroupV2DiskMonitor.doMonitor(emitter)
         : doMonitorV1(emitter);
}

private boolean doMonitorV1(ServiceEmitter emitter)
{
Map<String, Disk.Metrics> snapshot = new Disk(cgroupDiscoverer).snapshot();
for (Map.Entry<String, Disk.Metrics> entry : snapshot.entrySet()) {
Expand All @@ -75,6 +98,7 @@ public boolean doMonitor(ServiceEmitter emitter)
final ServiceMetricEvent.Builder builder = builder()
.setDimension("diskName", entry.getValue().getDiskName());
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion());
for (Map.Entry<String, Long> stat : stats.entrySet()) {
emitter.emit(builder.setMetric(stat.getKey(), stat.getValue()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,47 @@

import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.CgroupVersion;
import org.apache.druid.java.util.metrics.cgroups.Memory;
import org.apache.druid.java.util.metrics.cgroups.ProcSelfCgroupDiscoverer;

import java.util.Map;

public class CgroupMemoryMonitor extends FeedDefiningMonitor
{
private static final Logger LOG = new Logger(CgroupMemoryMonitor.class);
private static final String MEMORY_USAGE_FILE = "memory.usage_in_bytes";
private static final String MEMORY_LIMIT_FILE = "memory.limit_in_bytes";

final CgroupDiscoverer cgroupDiscoverer;
final Map<String, String[]> dimensions;
private final boolean isRunningOnCgroupsV2;
private final CgroupV2MemoryMonitor cgroupV2MemoryMonitor;


public CgroupMemoryMonitor(CgroupDiscoverer cgroupDiscoverer, final Map<String, String[]> dimensions, String feed)
{
super(feed);
this.cgroupDiscoverer = cgroupDiscoverer;
this.dimensions = dimensions;

// Check if we're running on cgroups v2
this.isRunningOnCgroupsV2 = cgroupDiscoverer.getCgroupVersion().equals(CgroupVersion.V2);
if (isRunningOnCgroupsV2) {
this.cgroupV2MemoryMonitor = new CgroupV2MemoryMonitor(cgroupDiscoverer, dimensions, feed);
LOG.info("Detected cgroups v2, using CgroupV2MemoryMonitor behavior for accurate metrics");
} else {
this.cgroupV2MemoryMonitor = null;
}
}

public CgroupMemoryMonitor(final Map<String, String[]> dimensions, String feed)
{
this(new ProcSelfCgroupDiscoverer(), dimensions, feed);
this(ProcSelfCgroupDiscoverer.autoCgroupDiscoverer(), dimensions, feed);
}

public CgroupMemoryMonitor(final Map<String, String[]> dimensions)
Expand All @@ -58,11 +76,31 @@ public CgroupMemoryMonitor()

/**
 * Emits memory metrics, delegating to the cgroups v2 monitor when v2 was detected at construction time;
 * otherwise runs the shared parser against the cgroups v1 usage and limit files.
 *
 * @param emitter emitter that receives the metric events
 * @return whatever the selected code path returns
 */
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
  if (isRunningOnCgroupsV2) {
    return cgroupV2MemoryMonitor.doMonitor(emitter);
  }
  // cgroups v1 path: same parser, fed with the v1 file names.
  return parseAndEmit(emitter, cgroupDiscoverer, dimensions, MEMORY_USAGE_FILE, MEMORY_LIMIT_FILE, this);
}

/**
* Common metric parser and emitter for both v1 and v2 cgroupMemory monitors.
*/
public static boolean parseAndEmit(
ServiceEmitter emitter,
CgroupDiscoverer cgroupDiscoverer,
Map<String, String[]> dimensions,
String memoryUsageFile,
String memoryLimitFile,
FeedDefiningMonitor feedDefiningMonitor
)
{
final Memory memory = new Memory(cgroupDiscoverer);
final Memory.MemoryStat stat = memory.snapshot(memoryUsageFile(), memoryLimitFile());
final ServiceMetricEvent.Builder builder = builder();
final Memory.MemoryStat stat = memory.snapshot(memoryUsageFile, memoryLimitFile);
final ServiceMetricEvent.Builder builder = feedDefiningMonitor.builder();
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
builder.setDimension("cgroupversion", cgroupDiscoverer.getCgroupVersion().name());
emitter.emit(builder.setMetric("cgroup/memory/usage/bytes", stat.getUsage()));
emitter.emit(builder.setMetric("cgroup/memory/limit/bytes", stat.getLimit()));

Expand All @@ -72,19 +110,10 @@ public boolean doMonitor(ServiceEmitter emitter)
emitter.emit(builder.setMetric(StringUtils.format("cgroup/memory/%s", key), value));
});
stat.getNumaMemoryStats().forEach((key, value) -> {
builder().setDimension("numaZone", Long.toString(key));
feedDefiningMonitor.builder().setDimension("numaZone", Long.toString(key));
value.forEach((k, v) -> emitter.emit(builder.setMetric(StringUtils.format("cgroup/memory_numa/%s/pages", k), v)));
});
return true;
}

public String memoryUsageFile()
{
return "memory.usage_in_bytes";
}

public String memoryLimitFile()
{
return "memory.limit_in_bytes";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,20 @@ public static long readLongValue(CgroupDiscoverer discoverer, String cgroup, Str
return defaultValue;
}
}

/**
 * Converts a cgroup CPU quota/period pair into the number of cores the quota allows.
 * The result is -1 when the quota is negative or the period is zero, mirroring the -1 that the cgroup
 * itself uses to mean "no quota specified".
 *
 * @param quotaUs  the cgroup quota value.
 * @param periodUs the cgroup period value.
 * @return {@code quotaUs / periodUs} as a double, or -1 if no quota or period is set.
 */
public static double computeProcessorQuota(long quotaUs, long periodUs)
{
  final boolean quotaUnset = quotaUs < 0;
  final boolean periodUnset = periodUs == 0;
  if (quotaUnset || periodUnset) {
    return -1;
  }
  // Cast before dividing so fractional core counts (e.g. 0.5) are preserved.
  return (double) quotaUs / periodUs;
}

}
Loading