@@ -24,14 +24,12 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
-	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
-	core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
 	"k8s.io/autoscaler/cluster-autoscaler/metrics"
 	"k8s.io/autoscaler/cluster-autoscaler/processors"
-	"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
@@ -47,187 +45,6 @@ import (
 	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
 )
 
-type scaleDownResourcesLimits map[string]int64
-type scaleDownResourcesDelta map[string]int64
-
-// used as a value in scaleDownResourcesLimits if actual limit could not be obtained due to errors talking to cloud provider
-const scaleDownLimitUnknown = math.MinInt64
-
-func (sd *ScaleDown) computeScaleDownResourcesLeftLimits(nodes []*apiv1.Node, resourceLimiter *cloudprovider.ResourceLimiter, cp cloudprovider.CloudProvider, timestamp time.Time) scaleDownResourcesLimits {
-	totalCores, totalMem := calculateScaleDownCoresMemoryTotal(nodes, timestamp)
-
-	var totalResources map[string]int64
-	var totalResourcesErr error
-	if cloudprovider.ContainsCustomResources(resourceLimiter.GetResources()) {
-		totalResources, totalResourcesErr = sd.calculateScaleDownCustomResourcesTotal(nodes, cp, timestamp)
-	}
-
-	resultScaleDownLimits := make(scaleDownResourcesLimits)
-	for _, resource := range resourceLimiter.GetResources() {
-		min := resourceLimiter.GetMin(resource)
-
-		// we put only actual limits into final map. No entry means no limit.
-		if min > 0 {
-			switch {
-			case resource == cloudprovider.ResourceNameCores:
-				resultScaleDownLimits[resource] = computeAboveMin(totalCores, min)
-			case resource == cloudprovider.ResourceNameMemory:
-				resultScaleDownLimits[resource] = computeAboveMin(totalMem, min)
-			case cloudprovider.IsCustomResource(resource):
-				if totalResourcesErr != nil {
-					resultScaleDownLimits[resource] = scaleDownLimitUnknown
-				} else {
-					resultScaleDownLimits[resource] = computeAboveMin(totalResources[resource], min)
-				}
-			default:
-				klog.Errorf("Scale down limits defined for unsupported resource '%s'", resource)
-			}
-		}
-	}
-	return resultScaleDownLimits
-}
-
-func computeAboveMin(total int64, min int64) int64 {
-	if total > min {
-		return total - min
-	}
-	return 0
-
-}
-
-func calculateScaleDownCoresMemoryTotal(nodes []*apiv1.Node, timestamp time.Time) (int64, int64) {
-	var coresTotal, memoryTotal int64
-	for _, node := range nodes {
-		if actuation.IsNodeBeingDeleted(node, timestamp) {
-			// Nodes being deleted do not count towards total cluster resources
-			continue
-		}
-		cores, memory := core_utils.GetNodeCoresAndMemory(node)
-
-		coresTotal += cores
-		memoryTotal += memory
-	}
-
-	return coresTotal, memoryTotal
-}
-
-func (sd *ScaleDown) calculateScaleDownCustomResourcesTotal(nodes []*apiv1.Node, cp cloudprovider.CloudProvider, timestamp time.Time) (map[string]int64, error) {
-	result := make(map[string]int64)
-	ngCache := make(map[string][]customresources.CustomResourceTarget)
-	for _, node := range nodes {
-		if actuation.IsNodeBeingDeleted(node, timestamp) {
-			// Nodes being deleted do not count towards total cluster resources
-			continue
-		}
-		nodeGroup, err := cp.NodeGroupForNode(node)
-		if err != nil {
-			return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("can not get node group for node %v when calculating cluster gpu usage", node.Name)
-		}
-		if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
-			// We do not trust cloud providers to return properly constructed nil for interface type - hence the reflection check.
-			// See https://golang.org/doc/faq#nil_error
-			// TODO[lukaszos] consider creating cloud_provider sanitizer which will wrap cloud provider and ensure sane behaviour.
-			nodeGroup = nil
-		}
-
-		var resourceTargets []customresources.CustomResourceTarget
-		var cacheHit bool
-
-		if nodeGroup != nil {
-			resourceTargets, cacheHit = ngCache[nodeGroup.Id()]
-		}
-		if !cacheHit {
-			resourceTargets, err = sd.processors.CustomResourcesProcessor.GetNodeResourceTargets(sd.context, node, nodeGroup)
-			if err != nil {
-				return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("can not get gpu count for node %v when calculating cluster gpu usage")
-			}
-			if nodeGroup != nil {
-				ngCache[nodeGroup.Id()] = resourceTargets
-			}
-		}
-
-		for _, resourceTarget := range resourceTargets {
-			if resourceTarget.ResourceType == "" || resourceTarget.ResourceCount == 0 {
-				continue
-			}
-			result[resourceTarget.ResourceType] += resourceTarget.ResourceCount
-		}
-	}
-
-	return result, nil
-}
-
-func noScaleDownLimitsOnResources() scaleDownResourcesLimits {
-	return nil
-}
-
-func copyScaleDownResourcesLimits(source scaleDownResourcesLimits) scaleDownResourcesLimits {
-	copy := scaleDownResourcesLimits{}
-	for k, v := range source {
-		copy[k] = v
-	}
-	return copy
-}
-
-func (sd *ScaleDown) computeScaleDownResourcesDelta(cp cloudprovider.CloudProvider, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup, resourcesWithLimits []string) (scaleDownResourcesDelta, errors.AutoscalerError) {
-	resultScaleDownDelta := make(scaleDownResourcesDelta)
-
-	nodeCPU, nodeMemory := core_utils.GetNodeCoresAndMemory(node)
-	resultScaleDownDelta[cloudprovider.ResourceNameCores] = nodeCPU
-	resultScaleDownDelta[cloudprovider.ResourceNameMemory] = nodeMemory
-
-	if cloudprovider.ContainsCustomResources(resourcesWithLimits) {
-		resourceTargets, err := sd.processors.CustomResourcesProcessor.GetNodeResourceTargets(sd.context, node, nodeGroup)
-		if err != nil {
-			return scaleDownResourcesDelta{}, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get node %v custom resources: %v", node.Name)
-		}
-		for _, resourceTarget := range resourceTargets {
-			resultScaleDownDelta[resourceTarget.ResourceType] = resourceTarget.ResourceCount
-		}
-	}
-	return resultScaleDownDelta, nil
-}
-
-type scaleDownLimitsCheckResult struct {
-	exceeded          bool
-	exceededResources []string
-}
-
-func scaleDownLimitsNotExceeded() scaleDownLimitsCheckResult {
-	return scaleDownLimitsCheckResult{false, []string{}}
-}
-
-func (limits *scaleDownResourcesLimits) checkScaleDownDeltaWithinLimits(delta scaleDownResourcesDelta) scaleDownLimitsCheckResult {
-	exceededResources := sets.NewString()
-	for resource, resourceDelta := range delta {
-		resourceLeft, found := (*limits)[resource]
-		if found {
-			if (resourceDelta > 0) && (resourceLeft == scaleDownLimitUnknown || resourceDelta > resourceLeft) {
-				exceededResources.Insert(resource)
-			}
-		}
-	}
-	if len(exceededResources) > 0 {
-		return scaleDownLimitsCheckResult{true, exceededResources.List()}
-	}
-
-	return scaleDownLimitsNotExceeded()
-}
-
-func (limits *scaleDownResourcesLimits) tryDecrementLimitsByDelta(delta scaleDownResourcesDelta) scaleDownLimitsCheckResult {
-	result := limits.checkScaleDownDeltaWithinLimits(delta)
-	if result.exceeded {
-		return result
-	}
-	for resource, resourceDelta := range delta {
-		resourceLeft, found := (*limits)[resource]
-		if found {
-			(*limits)[resource] = resourceLeft - resourceDelta
-		}
-	}
-	return scaleDownLimitsNotExceeded()
-}
-
 // ScaleDown is responsible for maintaining the state needed to perform unneeded node removals.
 type ScaleDown struct {
 	context              *context.AutoscalingContext
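
Reviewer note: the helpers deleted above are not gone, they move into the new core/scaledown/resource package imported at the top of this diff. The sketch below reconstructs the shape of that package purely from the call sites visible in the hunks that follow (LimitsLeft, DeltaForNode, NoLimits, DeepCopy, CheckDeltaWithinLimits, TryDecrementBy, Exceeded, ExceededResources); the actual signatures and bodies in the resource package may differ.

// Hypothetical sketch of core/scaledown/resource, inferred from this diff only.
package resource

// Limits plays the role of the removed scaleDownResourcesLimits: how much of
// each resource scale-down may still remove. No entry means no limit.
type Limits map[string]int64

// Delta plays the role of the removed scaleDownResourcesDelta: the resources
// freed by removing a single node.
type Delta map[string]int64

// NoLimits replaces noScaleDownLimitsOnResources().
func NoLimits() Limits { return nil }

// DeepCopy replaces copyScaleDownResourcesLimits().
func (l Limits) DeepCopy() Limits {
	out := Limits{}
	for k, v := range l {
		out[k] = v
	}
	return out
}

// LimitsCheckResult replaces scaleDownLimitsCheckResult; per the call sites
// below, Exceeded is now a method while ExceededResources stays a field.
type LimitsCheckResult struct {
	ExceededResources []string
}

// Exceeded reports whether any resource limit would be crossed.
func (r LimitsCheckResult) Exceeded() bool { return len(r.ExceededResources) > 0 }

// CheckDeltaWithinLimits and TryDecrementBy replace the deleted
// checkScaleDownDeltaWithinLimits and tryDecrementLimitsByDelta; their
// intended semantics are exactly the deleted bodies above (elided here).
func (l Limits) CheckDeltaWithinLimits(d Delta) LimitsCheckResult { /* ... */ return LimitsCheckResult{} }
func (l Limits) TryDecrementBy(d Delta) LimitsCheckResult         { /* ... */ return LimitsCheckResult{} }

// A LimitsFinder presumably bundles computeScaleDownResourcesLeftLimits (now
// LimitsLeft) and computeScaleDownResourcesDelta (now DeltaForNode) together
// with the CustomResourcesProcessor they need; see NewScaleDown for the wiring.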
@@ -242,12 +59,13 @@ type ScaleDown struct {
 	nodeDeletionTracker  *deletiontracker.NodeDeletionTracker
 	removalSimulator     *simulator.RemovalSimulator
 	eligibilityChecker   *eligibility.Checker
+	resourceLimitsFinder *resource.LimitsFinder
 }
 
 // NewScaleDown builds new ScaleDown object.
 func NewScaleDown(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker) *ScaleDown {
 	usageTracker := simulator.NewUsageTracker()
-	removalSimulator := simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, usageTracker)
+	removalSimulator := simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, usageTracker, false)
 	unremovableNodes := unremovable.NewNodes()
 	return &ScaleDown{
 		context:              context,
@@ -262,6 +80,7 @@ func NewScaleDown(context *context.AutoscalingContext, processors *processors.Au
 		nodeDeletionTracker:  ndt,
 		removalSimulator:     removalSimulator,
 		eligibilityChecker:   eligibility.NewChecker(processors.NodeGroupConfigProcessor),
+		resourceLimitsFinder: resource.NewLimitsFinder(processors.CustomResourcesProcessor),
 	}
 }
 
@@ -513,7 +332,7 @@ func (sd *ScaleDown) NodesToDelete(currentTime time.Time, pdbs []*policyv1.PodDi
 		return nil, nil, status.ScaleDownError, errors.ToAutoscalerError(errors.CloudProviderError, errCP)
 	}
 
-	scaleDownResourcesLeft := sd.computeScaleDownResourcesLeftLimits(nodesWithoutMaster, resourceLimiter, sd.context.CloudProvider, currentTime)
+	scaleDownResourcesLeft := sd.resourceLimitsFinder.LimitsLeft(sd.context, nodesWithoutMaster, resourceLimiter, currentTime)
 
 	nodeGroupSize := utils.GetNodeGroupSizeMap(sd.context.CloudProvider)
 	resourcesWithLimits := resourceLimiter.GetResources()
@@ -588,18 +407,18 @@ func (sd *ScaleDown) NodesToDelete(currentTime time.Time, pdbs []*policyv1.PodDi
 			continue
 		}
 
-		scaleDownResourcesDelta, err := sd.computeScaleDownResourcesDelta(sd.context.CloudProvider, node, nodeGroup, resourcesWithLimits)
+		scaleDownResourcesDelta, err := sd.resourceLimitsFinder.DeltaForNode(sd.context, node, nodeGroup, resourcesWithLimits)
 		if err != nil {
 			klog.Errorf("Error getting node resources: %v", err)
 			sd.unremovableNodes.AddReason(node, simulator.UnexpectedError)
 			continue
 		}
 
-		checkResult := scaleDownResourcesLeft.checkScaleDownDeltaWithinLimits(scaleDownResourcesDelta)
-		if checkResult.exceeded {
-			klog.V(4).Infof("Skipping %s - minimal limit exceeded for %v", node.Name, checkResult.exceededResources)
+		checkResult := scaleDownResourcesLeft.CheckDeltaWithinLimits(scaleDownResourcesDelta)
+		if checkResult.Exceeded() {
+			klog.V(4).Infof("Skipping %s - minimal limit exceeded for %v", node.Name, checkResult.ExceededResources)
 			sd.unremovableNodes.AddReason(node, simulator.MinimalResourceLimitExceeded)
-			for _, resource := range checkResult.exceededResources {
+			for _, resource := range checkResult.ExceededResources {
 				switch resource {
 				case cloudprovider.ResourceNameCores:
 					metrics.RegisterSkippedScaleDownCPU()
@@ -674,18 +493,18 @@ func updateScaleDownMetrics(scaleDownStart time.Time, findNodesToRemoveDuration
 }
 
 func (sd *ScaleDown) getEmptyNodesToRemoveNoResourceLimits(candidates []string, timestamp time.Time) []simulator.NodeToBeRemoved {
-	return sd.getEmptyNodesToRemove(candidates, noScaleDownLimitsOnResources(), timestamp)
+	return sd.getEmptyNodesToRemove(candidates, resource.NoLimits(), timestamp)
 }
 
 // This function finds empty nodes among the passed candidates and returns a list of empty nodes
 // that can be deleted at the same time.
-func (sd *ScaleDown) getEmptyNodesToRemove(candidates []string, resourcesLimits scaleDownResourcesLimits,
+func (sd *ScaleDown) getEmptyNodesToRemove(candidates []string, resourcesLimits resource.Limits,
 	timestamp time.Time) []simulator.NodeToBeRemoved {
 
 	emptyNodes := sd.removalSimulator.FindEmptyNodesToRemove(candidates, timestamp)
 	availabilityMap := make(map[string]int)
 	nodesToRemove := make([]simulator.NodeToBeRemoved, 0)
-	resourcesLimitsCopy := copyScaleDownResourcesLimits(resourcesLimits) // we do not want to modify input parameter
+	resourcesLimitsCopy := resourcesLimits.DeepCopy() // we do not want to modify input parameter
 	resourcesNames := sets.StringKeySet(resourcesLimits).List()
 	for _, nodeName := range emptyNodes {
 		nodeInfo, err := sd.context.ClusterSnapshot.NodeInfos().Get(nodeName)
@@ -719,13 +538,13 @@ func (sd *ScaleDown) getEmptyNodesToRemove(candidates []string, resourcesLimits
 			availabilityMap[nodeGroup.Id()] = available
 		}
 		if available > 0 {
-			resourcesDelta, err := sd.computeScaleDownResourcesDelta(sd.context.CloudProvider, node, nodeGroup, resourcesNames)
+			resourcesDelta, err := sd.resourceLimitsFinder.DeltaForNode(sd.context, node, nodeGroup, resourcesNames)
 			if err != nil {
 				klog.Errorf("Error: %v", err)
 				continue
 			}
-			checkResult := resourcesLimitsCopy.tryDecrementLimitsByDelta(resourcesDelta)
-			if checkResult.exceeded {
+			checkResult := resourcesLimitsCopy.TryDecrementBy(resourcesDelta)
+			if checkResult.Exceeded() {
 				continue
 			}
 			available--
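
Taken together, the new call sites preserve the two usage patterns of the old helpers: NodesToDelete performs a read-only check against the remaining limits, while getEmptyNodesToRemove consumes the limits as it accepts nodes, working on a DeepCopy so the caller's Limits stay untouched. A minimal usage illustration, using the hypothetical types sketched earlier (the literal values are invented for the example):

// Illustration only; types and values are assumptions from the sketch above.
limitsLeft := Limits{"cpu": 8, "memory": 32 * 1024 * 1024 * 1024}

// Read-only check, as in NodesToDelete: limitsLeft is not modified.
if res := limitsLeft.CheckDeltaWithinLimits(Delta{"cpu": 16}); res.Exceeded() {
	// skip the node and record res.ExceededResources in metrics
}

// Check-and-consume, as in getEmptyNodesToRemove: decrement a private copy.
copyLeft := limitsLeft.DeepCopy()
if res := copyLeft.TryDecrementBy(Delta{"cpu": 4}); !res.Exceeded() {
	// node accepted; copyLeft["cpu"] is now 4
}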