Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/querying/sql-metadata-tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ Servers table lists all discovered servers in the cluster.
|is_leader|BIGINT|1 if the server is currently the 'leader' (for services which have the concept of leadership), otherwise 0 if the server is not the leader, or null if the server type does not have the concept of leadership|
|start_time|STRING|Timestamp in ISO8601 format when the server was announced in the cluster|
|version|VARCHAR|Druid version running on the server|
|available_processors|BIGINT|Total number of processors available to the server|
|total_memory|BIGINT|Total memory in bytes available to the server|

To retrieve information about all servers, use the query:

```sql
Expand Down
13 changes: 13 additions & 0 deletions processing/src/main/java/org/apache/druid/utils/JvmUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.primitives.Ints;
import com.google.inject.Inject;
import com.sun.management.OperatingSystemMXBean;

import java.io.File;
import java.lang.management.ManagementFactory;
Expand All @@ -44,6 +45,7 @@ public class JvmUtils
private static RuntimeInfo RUNTIME_INFO = new RuntimeInfo();

private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();
private static final OperatingSystemMXBean OPERATING_SYSTEM_MX_BEAN = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();

private static int computeMajorVersion()
{
Expand Down Expand Up @@ -139,4 +141,15 @@ public static List<URL> systemClassPath()
).collect(Collectors.toList());
return jobURLs;
}

/**
* Get the total memory of the machine it is running on. This function is container aware.
* If the machine is running in a container, the function will return the total memory of the container.
* If the machine is not running in a container, the function will return the total memory of the machine.
* @return the total memory of the machine it is running on in bytes.
*/
public static long getTotalMemory()
{
return OPERATING_SYSTEM_MX_BEAN.getTotalPhysicalMemorySize();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import org.apache.druid.client.DruidServer;
import org.apache.druid.jackson.StringObjectPairList;
Expand All @@ -32,6 +33,7 @@
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.DruidNode;
import org.apache.druid.utils.JvmUtils;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
Expand All @@ -54,6 +56,8 @@
private final DruidNode druidNode;
private final NodeRole nodeRole;
private final DateTime startTime;
private final Integer availableProcessors;
private final Long totalMemory;

/**
* Map of service name -> DruidServices.
Expand All @@ -65,20 +69,36 @@
*/
private final Map<String, DruidService> services = new HashMap<>();

/**
* Constructor for tests. In production, the @Inject constructor is used instead.
*/
@VisibleForTesting
public DiscoveryDruidNode(
DruidNode druidNode,
NodeRole nodeRole,
Map<String, DruidService> services,
DateTime startTime
)
{
this(druidNode, nodeRole, services, startTime, JvmUtils.getRuntimeInfo().getAvailableProcessors(), JvmUtils.getTotalMemory());
}

public DiscoveryDruidNode(
DruidNode druidNode,
NodeRole nodeRole,
Map<String, DruidService> services
)
{
this(druidNode, nodeRole, services, DateTimes.nowUtc());
this(druidNode, nodeRole, services, DateTimes.nowUtc(), JvmUtils.getRuntimeInfo().getAvailableProcessors(), JvmUtils.getTotalMemory());
}

public DiscoveryDruidNode(
DruidNode druidNode,
NodeRole nodeRole,
Map<String, DruidService> services,
DateTime startTime
DateTime startTime,
Integer availableProcessors,
Long totalMemory
)
{
this.druidNode = druidNode;
Expand All @@ -88,6 +108,10 @@
this.services.putAll(services);
}
this.startTime = startTime;

// Happens if service is running older version of Druid
this.availableProcessors = availableProcessors != null ? availableProcessors : -1;
this.totalMemory = totalMemory != null ? totalMemory : -1;
}

@JsonCreator
Expand All @@ -96,6 +120,8 @@
@JsonProperty("nodeType") NodeRole nodeRole,
@JsonProperty("services") Map<String, StringObjectPairList> rawServices,
@JsonProperty("startTime") DateTime startTime,
@JsonProperty("availableProcessors") Integer availableProcessors,
@JsonProperty("totalMemory") Long totalMemory,
@JacksonInject ObjectMapper jsonMapper
)
{
Expand All @@ -111,7 +137,7 @@
}
}
}
return new DiscoveryDruidNode(druidNode, nodeRole, services, startTime);
return new DiscoveryDruidNode(druidNode, nodeRole, services, startTime, availableProcessors, totalMemory);
}

/**
Expand Down Expand Up @@ -188,6 +214,18 @@
return startTime;
}

@JsonProperty
public Integer getAvailableProcessors()
{
return availableProcessors;
}

@JsonProperty
public Long getTotalMemory()
{
return totalMemory;
}

@Nullable
@JsonIgnore
public <T extends DruidService> T getService(String key, Class<T> clazz)
Expand Down Expand Up @@ -235,13 +273,15 @@
DiscoveryDruidNode that = (DiscoveryDruidNode) o;
return Objects.equals(druidNode, that.druidNode) &&
Objects.equals(nodeRole, that.nodeRole) &&
Objects.equals(services, that.services);
Objects.equals(services, that.services) &&
Objects.equals(availableProcessors, that.availableProcessors) &&
Objects.equals(totalMemory, that.totalMemory);
}

@Override
public int hashCode()
{
return Objects.hash(druidNode, nodeRole, services);
return Objects.hash(druidNode, nodeRole, services, availableProcessors, totalMemory);
}

@Override
Expand All @@ -252,6 +292,8 @@
", nodeRole='" + nodeRole + '\'' +
", services=" + services + '\'' +
", startTime=" + startTime +
", availableProcessors=" + availableProcessors +
", totalMemory=" + totalMemory +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public DiscoveryDruidNodeTest()
public void testEquals()
{
EqualsVerifier.forClass(DiscoveryDruidNode.class)
.withNonnullFields("druidNode", "nodeRole", "services")
.withNonnullFields("druidNode", "nodeRole", "services", "availableProcessors", "totalMemory")
.withIgnoredFields("startTime")
.usingGetClass()
.verify();
Expand Down Expand Up @@ -155,7 +155,9 @@ public void testDeserializeWithDataNodeServiceWithAWrongPropertyOrder() throws J
+ " \"serverType\" : \"broker\",\n"
+ " \"priority\" : 0\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"availableProcessors\" : 3,\n"
+ " \"totalMemory\" : 1234\n"
+ "}";
Assert.assertEquals(
new DiscoveryDruidNode(
Expand All @@ -173,7 +175,10 @@ public void testDeserializeWithDataNodeServiceWithAWrongPropertyOrder() throws J
ImmutableMap.of(
"dataNodeService",
new DataNodeService("_default_tier", 1000000000, ServerType.BROKER, 0)
)
),
null,
3,
1234L
),
mapper.readValue(json, DiscoveryDruidNode.class)
);
Expand Down Expand Up @@ -204,7 +209,9 @@ public void testDeserialize_duplicateProperties_shouldSucceedToDeserialize() thr
+ " \"serverType\" : \"broker\",\n"
+ " \"priority\" : 0\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"availableProcessors\" : 6,\n"
+ " \"totalMemory\" : 5432\n"
+ "}";
Assert.assertEquals(
new DiscoveryDruidNode(
Expand All @@ -222,7 +229,10 @@ public void testDeserialize_duplicateProperties_shouldSucceedToDeserialize() thr
ImmutableMap.of(
"dataNodeService",
new DataNodeService("_default_tier", 1000000000, ServerType.BROKER, 0)
)
),
null,
6,
5432L
),
mapper.readValue(json, DiscoveryDruidNode.class)
);
Expand Down Expand Up @@ -254,7 +264,9 @@ public void testDeserialize_duplicateKeysWithDifferentValus_shouldIgnoreDataNode
+ " \"serverType\" : \"broker\",\n"
+ " \"priority\" : 0\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"availableProcessors\" : 4,\n"
+ " \"totalMemory\" : 246810\n"
+ "}";
Assert.assertEquals(
new DiscoveryDruidNode(
Expand All @@ -269,7 +281,10 @@ public void testDeserialize_duplicateKeysWithDifferentValus_shouldIgnoreDataNode
true
),
NodeRole.BROKER,
ImmutableMap.of()
ImmutableMap.of(),
null,
4,
246810L
),
mapper.readValue(json, DiscoveryDruidNode.class)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ public class SystemSchema extends AbstractSchema
.add("is_leader", ColumnType.LONG)
.add("start_time", ColumnType.STRING)
.add("version", ColumnType.STRING)
.add("available_processors", ColumnType.LONG)
.add("total_memory", ColumnType.LONG)
.build();

static final RowSignature SERVER_SEGMENTS_SIGNATURE = RowSignature
Expand Down Expand Up @@ -641,7 +643,9 @@ private static Object[] buildRowForNonDataServer(DiscoveryDruidNode discoveryDru
UNKNOWN_SIZE,
null,
toStringOrNull(discoveryDruidNode.getStartTime()),
node.getVersion()
node.getVersion(),
(long) discoveryDruidNode.getAvailableProcessors(),
discoveryDruidNode.getTotalMemory()
};
}

Expand All @@ -665,7 +669,9 @@ private static Object[] buildRowForNonDataServerWithLeadership(
UNKNOWN_SIZE,
isLeader ? 1L : 0L,
toStringOrNull(discoveryDruidNode.getStartTime()),
node.getVersion()
node.getVersion(),
(long) discoveryDruidNode.getAvailableProcessors(),
discoveryDruidNode.getTotalMemory()
};
}

Expand Down Expand Up @@ -701,7 +707,9 @@ private static Object[] buildRowForDiscoverableDataServer(
druidServerToUse.getMaxSize(),
null,
toStringOrNull(discoveryDruidNode.getStartTime()),
node.getVersion()
node.getVersion(),
(long) discoveryDruidNode.getAvailableProcessors(),
discoveryDruidNode.getTotalMemory()
};
}

Expand Down
Loading
Loading