
Commit fb2899a

Merge pull request #8485 from ttetyanka/feature/deletionlatencytracker
Node removal latency metrics added
2 parents: ffcbfee + 97e45c5

File tree: 17 files changed, +794 -110 lines

cluster-autoscaler/clusterstate/clusterstate.go

Lines changed: 5 additions & 4 deletions
@@ -27,6 +27,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate/api"
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
 	"k8s.io/autoscaler/cluster-autoscaler/metrics"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups"
@@ -775,18 +776,18 @@ func (csr *ClusterStateRegistry) updateCloudProviderDeletedNodes(deletedNodes []
 }

 // UpdateScaleDownCandidates updates scale down candidates
-func (csr *ClusterStateRegistry) UpdateScaleDownCandidates(nodes []*apiv1.Node, now time.Time) {
+func (csr *ClusterStateRegistry) UpdateScaleDownCandidates(nodes []*scaledown.UnneededNode, now time.Time) {
 	result := make(map[string][]string)
 	for _, node := range nodes {
-		group, err := csr.cloudProvider.NodeGroupForNode(node)
+		group, err := csr.cloudProvider.NodeGroupForNode(node.Node)
 		if err != nil {
-			klog.Warningf("Failed to get node group for %s: %v", node.Name, err)
+			klog.Warningf("Failed to get node group for %s: %v", node.Node.Name, err)
 			continue
 		}
 		if group == nil || reflect.ValueOf(group).IsNil() {
 			continue
 		}
-		result[group.Id()] = append(result[group.Id()], node.Name)
+		result[group.Id()] = append(result[group.Id()], node.Node.Name)
 	}
 	csr.candidatesForScaleDown = result
 	csr.lastScaleDownUpdateTime = now
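
The new parameter type is defined elsewhere in this PR (its file is among the 12 not shown on this page). A minimal sketch of scaledown.UnneededNode, inferred purely from the field accesses visible in this commit (node.Node, node.Node.Name, and candidate.RemovalThreshold in the new tracker below), not from the actual definition:

// Sketch only: inferred from usage in this commit, not the real definition.
package scaledown

import (
    "time"

    apiv1 "k8s.io/api/core/v1"
)

// UnneededNode pairs a scale-down candidate with how long it must remain
// unneeded before the autoscaler may remove it.
type UnneededNode struct {
    Node             *apiv1.Node   // underlying Kubernetes node object
    RemovalThreshold time.Duration // per-node-group unneeded-time threshold (assumed)
}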

cluster-autoscaler/clusterstate/clusterstate_test.go

Lines changed: 3 additions & 2 deletions
@@ -28,6 +28,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate/api"
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
 	"k8s.io/autoscaler/cluster-autoscaler/config"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
 	"k8s.io/autoscaler/cluster-autoscaler/metrics"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups"
@@ -306,7 +307,7 @@ func TestNodeWithoutNodeGroupDontCrash(t *testing.T) {
 	err := clusterstate.UpdateNodes([]*apiv1.Node{noNgNode}, nil, now)
 	assert.NoError(t, err)
 	assert.Empty(t, clusterstate.GetScaleUpFailures())
-	clusterstate.UpdateScaleDownCandidates([]*apiv1.Node{noNgNode}, now)
+	clusterstate.UpdateScaleDownCandidates([]*scaledown.UnneededNode{{Node: noNgNode}}, now)
 }

 func TestOKOneUnreadyNodeWithScaleDownCandidate(t *testing.T) {
@@ -331,7 +332,7 @@ func TestOKOneUnreadyNodeWithScaleDownCandidate(t *testing.T) {
 		OkTotalUnreadyCount: 1,
 	}, fakeLogRecorder, newBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
 	err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
-	clusterstate.UpdateScaleDownCandidates([]*apiv1.Node{ng1_1}, now)
+	clusterstate.UpdateScaleDownCandidates([]*scaledown.UnneededNode{{Node: ng1_1}}, now)

 	assert.NoError(t, err)
 	assert.True(t, clusterstate.IsClusterHealthy())

cluster-autoscaler/config/autoscaling_options.go

Lines changed: 2 additions & 0 deletions
@@ -358,6 +358,8 @@ type AutoscalingOptions struct {
 	CapacitybufferPodInjectionEnabled bool
 	// MaxNodeSkipEvalTimeTrackerEnabled is used to enabled/disable the tracking of maximum evaluation time of a node being skipped during ScaleDown.
 	MaxNodeSkipEvalTimeTrackerEnabled bool
+	// NodeRemovalLatencyTrackingEnabled is used to enable/disable node removal latency tracking.
+	NodeRemovalLatencyTrackingEnabled bool
 }

 // KubeClientOptions specify options for kube client
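
Presumably the autoscaler only installs the tracker when this option is set. A hypothetical guard, assuming the usual AutoscalingProcessors wiring pattern (the wiring itself is not part of the diff shown here):

// Hypothetical: how a caller might gate the tracker on the new option.
if opts.NodeRemovalLatencyTrackingEnabled {
    // Wrap the existing status processor so its behavior is preserved.
    processors.ScaleDownStatusProcessor = latencytracker.NewNodeLatencyTracker(processors.ScaleDownStatusProcessor)
}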

cluster-autoscaler/config/flags/flags.go

Lines changed: 2 additions & 0 deletions
@@ -233,6 +233,7 @@ var (
 	nodeDeletionCandidateTTL          = flag.Duration("node-deletion-candidate-ttl", time.Duration(0), "Maximum time a node can be marked as removable before the marking becomes stale. This sets the TTL of Cluster-Autoscaler's state if the Cluste-Autoscaler deployment becomes inactive")
 	capacitybufferControllerEnabled   = flag.Bool("capacity-buffer-controller-enabled", false, "Whether to enable the default controller for capacity buffers or not")
 	capacitybufferPodInjectionEnabled = flag.Bool("capacity-buffer-pod-injection-enabled", false, "Whether to enable pod list processor that processes ready capacity buffers and injects fake pods accordingly")
+	nodeRemovalLatencyTrackingEnabled = flag.Bool("node-removal-latency-tracking-enabled", false, "Whether to track latency from when an unneeded node is eligible for scale down until it is removed or needed again.")
 	maxNodeSkipEvalTimeTrackerEnabled = flag.Bool("max-node-skip-eval-time-tracker-enabled", false, "Whether to enable the tracking of the maximum time of node being skipped during ScaleDown")

 	// Deprecated flags
@@ -425,6 +426,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
 		NodeDeletionCandidateTTL:          *nodeDeletionCandidateTTL,
 		CapacitybufferControllerEnabled:   *capacitybufferControllerEnabled,
 		CapacitybufferPodInjectionEnabled: *capacitybufferPodInjectionEnabled,
+		NodeRemovalLatencyTrackingEnabled: *nodeRemovalLatencyTrackingEnabled,
 		MaxNodeSkipEvalTimeTrackerEnabled: *maxNodeSkipEvalTimeTrackerEnabled,
 	}
 }
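
The metric is opt-in and disabled by default; a deployment turns it on with the new flag. An illustrative invocation (all other arguments elided):

cluster-autoscaler --node-removal-latency-tracking-enabled=true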
New file: package latencytracker

Lines changed: 139 additions & 0 deletions
@@ -0,0 +1,139 @@
+/*
+Copyright 2022 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package latencytracker
+
+import (
+	"maps"
+	"slices"
+	"time"
+
+	ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
+	"k8s.io/autoscaler/cluster-autoscaler/metrics"
+	"k8s.io/klog/v2"
+
+	processor "k8s.io/autoscaler/cluster-autoscaler/processors/status"
+)
+
+const (
+	// scaleDownLatencyLogThreshold is the duration after which a scale-down
+	// deletion is considered "slow". Deletions that take
+	// longer than this threshold will be logged at a more visible level.
+	scaleDownLatencyLogThreshold = 3 * time.Minute
+)
+
+type unneededNodeState struct {
+	unneededSince    time.Time
+	removalThreshold time.Duration
+}
+
+// NodeLatencyTracker keeps track of nodes that are marked as unneeded, when they became unneeded,
+// and removalThresholds to emit node removal latency metrics.
+type NodeLatencyTracker struct {
+	unneededNodes map[string]unneededNodeState
+	wrapped       processor.ScaleDownStatusProcessor
+}
+
+// NewNodeLatencyTracker creates a new tracker.
+func NewNodeLatencyTracker(wrapped processor.ScaleDownStatusProcessor) *NodeLatencyTracker {
+	return &NodeLatencyTracker{
+		unneededNodes: make(map[string]unneededNodeState),
+		wrapped:       wrapped,
+	}
+}
+
+// UpdateScaleDownCandidates updates tracked unneeded nodes and reports those that became needed again.
+func (t *NodeLatencyTracker) UpdateScaleDownCandidates(list []*scaledown.UnneededNode, timestamp time.Time) {
+	currentSet := make(map[string]struct{}, len(list))
+	for _, candidate := range list {
+		nodeName := candidate.Node.Name
+		currentSet[nodeName] = struct{}{}
+		if info, exists := t.unneededNodes[nodeName]; !exists {
+			t.unneededNodes[nodeName] = unneededNodeState{
+				unneededSince:    timestamp,
+				removalThreshold: candidate.RemovalThreshold,
+			}
+			klog.V(6).Infof("Started tracking unneeded node %s at %v with removal threshold %v.", nodeName, timestamp, candidate.RemovalThreshold)
+		} else {
+			if info.removalThreshold != candidate.RemovalThreshold {
+				info.removalThreshold = candidate.RemovalThreshold
+				t.unneededNodes[nodeName] = info
+				klog.V(6).Infof("Updated removal threshold for tracked node %s to %v.", nodeName, candidate.RemovalThreshold)
+			}
+		}
+	}
+	for nodeName := range t.unneededNodes {
+		if _, exists := currentSet[nodeName]; !exists {
+			delete(t.unneededNodes, nodeName)
+			klog.V(6).Infof("Node %s is no longer unneeded (or was removed). Stopped tracking at %v.", nodeName, timestamp)
+		}
+	}
+}
+
+// Process updates unremovableNodes and reports node removal latency based on scale-down status.
+func (t *NodeLatencyTracker) Process(autoscalingCtx *ca_context.AutoscalingContext, status *status.ScaleDownStatus) {
+	if t.wrapped != nil {
+		t.wrapped.Process(autoscalingCtx, status)
+	}
+	for _, unremovableNode := range status.UnremovableNodes {
+		nodeName := unremovableNode.Node.Name
+		if info, exists := t.unneededNodes[nodeName]; exists {
+			duration := time.Since(info.unneededSince)
+			metrics.UpdateScaleDownNodeRemovalLatency(false, duration)
+			klog.V(4).Infof("Node %q is unremovable, became needed again (unneeded for %s).", nodeName, duration)
+			delete(t.unneededNodes, nodeName)
+		}
+	}
+	for _, scaledDownNode := range status.ScaledDownNodes {
+		nodeName := scaledDownNode.Node.Name
+		if info, exists := t.unneededNodes[nodeName]; exists {
+			duration := time.Since(info.unneededSince)
+			latency := duration - info.removalThreshold
+			metrics.UpdateScaleDownNodeRemovalLatency(true, latency)
+			if latency > scaleDownLatencyLogThreshold {
+				klog.V(2).Infof(
+					"Observing deletion for node %s, unneeded for %s (removal threshold was %s).",
+					nodeName, duration, info.removalThreshold,
+				)
+			} else {
+				klog.V(6).Infof(
+					"Observing deletion for node %s, unneeded for %s (removal threshold was %s).",
+					nodeName, duration, info.removalThreshold,
+				)
+			}
+			delete(t.unneededNodes, nodeName)
+		}
+	}
+	if klog.V(6).Enabled() {
+		for nodeName := range t.unneededNodes {
+			klog.Infof("Node %q remains in unneeded list (not scaled down). Continuing to track latency.", nodeName)
+		}
+	}
+}
+
+// getTrackedNodes returns the names of all nodes currently tracked as unneeded.
+func (t *NodeLatencyTracker) getTrackedNodes() []string {
+	return slices.Collect(maps.Keys(t.unneededNodes))
+}
+
+// CleanUp cleans up internal structures.
+func (t *NodeLatencyTracker) CleanUp() {
+	if t.wrapped != nil {
+		t.wrapped.CleanUp()
+	}
+}
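
Taken together, the tracker acts as a decorator around an existing ScaleDownStatusProcessor while also observing the candidate list on each loop iteration. A hypothetical sketch of the call sequence (names other than the latencytracker API are placeholders for values the autoscaler already holds):

// Hypothetical usage; existingProcessor, unneededNodes, autoscalingCtx and
// scaleDownStatus are placeholders, not identifiers from this commit.
tracker := latencytracker.NewNodeLatencyTracker(existingProcessor)

// Once per loop: start/stop per-node latency clocks as the unneeded set changes.
tracker.UpdateScaleDownCandidates(unneededNodes, time.Now())

// After the scale-down attempt: delegate to the wrapped processor, then
// record removal latency for deleted nodes and for nodes that became
// needed again (unremovable).
tracker.Process(autoscalingCtx, scaleDownStatus)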
