From eb9b45735ed762b7be8206b6451ad5e41c76f281 Mon Sep 17 00:00:00 2001 From: Amy Chen Date: Mon, 3 Nov 2025 16:00:15 -0800 Subject: [PATCH 1/2] Transition QuotaReserved to false whenever setting Finished or Deactivated conditions --- .../admissionchecks/multikueue/workload.go | 3 +- pkg/controller/core/workload_controller.go | 24 ++ .../core/workload_controller_test.go | 314 +++++++++++++++++- pkg/workload/admissionchecks.go | 8 + pkg/workload/workload.go | 17 +- test/e2e/singlecluster/e2e_test.go | 6 +- test/e2e/singlecluster/e2e_v1beta1_test.go | 2 +- test/e2e/singlecluster/tas_test.go | 10 +- test/e2e/tas/job_test.go | 12 +- .../jobs/job/job_controller_test.go | 22 +- .../jobs/pod/pod_controller_test.go | 6 +- .../raycluster/raycluster_controller_test.go | 2 +- .../scheduler/preemption_test.go | 6 +- 13 files changed, 388 insertions(+), 44 deletions(-) diff --git a/pkg/controller/admissionchecks/multikueue/workload.go b/pkg/controller/admissionchecks/multikueue/workload.go index 7c3f692d96d..132fb643488 100644 --- a/pkg/controller/admissionchecks/multikueue/workload.go +++ b/pkg/controller/admissionchecks/multikueue/workload.go @@ -326,9 +326,10 @@ func (w *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco // 0. Ignore Elastic workloads Finished when: // - Workload is "Finished" as a result workload slice replacement, OR // - Workload doesn't have quota reservation as a result of scale-up, i.e., scaling-up in progress. + // But don't ignore if the workload is actually finished (not just replaced). if group.IsElasticWorkload() && ((group.IsFinished() && workloadslicing.IsReplaced(group.local.Status)) || - (!workload.HasQuotaReservation(group.local) && workloadslicing.ScaledUp(group.local))) { + (!workload.HasQuotaReservation(group.local) && workloadslicing.ScaledUp(group.local) && !group.IsFinished())) { return reconcile.Result{}, nil } diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go index b248d6ce51b..ac74a39bd00 100644 --- a/pkg/controller/core/workload_controller.go +++ b/pkg/controller/core/workload_controller.go @@ -21,6 +21,7 @@ import ( "context" "fmt" "slices" + "strings" "time" "github.com/go-logr/logr" @@ -170,6 +171,16 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c finishedCond := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadFinished) if finishedCond != nil && finishedCond.Status == metav1.ConditionTrue { + // Unset quota reservation for finished workloads + if workload.HasQuotaReservation(&wl) { + log.V(2).Info("Unsetting quota reservation for finished workload") + if err := workload.PatchAdmissionStatus(ctx, r.client, &wl, r.clock, func() (*kueue.Workload, bool, error) { + return &wl, workload.UnsetQuotaReservationWithCondition(&wl, kueue.WorkloadFinished, "Workload finished", r.clock.Now()), nil + }); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to unset quota reservation for finished workload: %w", err) + } + } + if !features.Enabled(features.ObjectRetentionPolicies) || r.workloadRetention == nil || r.workloadRetention.afterFinished == nil { return ctrl.Result{}, nil } @@ -226,6 +237,7 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c requeuedCond := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadRequeued) var conditionsCleared bool + //if requeuedCond != nil && requeuedCond.Status == metav1.ConditionFalse && requeuedCond.Reason == kueue.WorkloadInadmissible { if quotaReservedCond != nil && quotaReservedCond.Status == metav1.ConditionFalse { apimeta.RemoveStatusCondition(&wl.Status.Conditions, kueue.WorkloadQuotaReserved) conditionsCleared = true @@ -321,6 +333,18 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c if updated { if evicted { if err := workload.Evict(ctx, r.client, r.recorder, wlOrig, reason, message, underlyingCause, r.clock, workload.WithCustomPrepare(func() (*kueue.Workload, error) { + // Unset quota reservation for deactivated workloads + if reason == kueue.WorkloadDeactivated || strings.HasPrefix(reason, kueue.WorkloadDeactivated) { + if workload.HasQuotaReservation(&wl) { + quotaReason := reason + if underlyingCause != "" { + quotaReason = workload.ReasonWithCause(reason, string(underlyingCause)) + } + workload.UnsetQuotaReservationWithCondition(&wl, quotaReason, message, r.clock.Now()) + // Set the WorkloadRequeued condition to false for deactivated workloads + workload.SetRequeuedCondition(&wl, quotaReason, message, false) + } + } return &wl, nil })); err != nil { if !apierrors.IsNotFound(err) { diff --git a/pkg/controller/core/workload_controller_test.go b/pkg/controller/core/workload_controller_test.go index f7c6c3be9ec..d044801858b 100644 --- a/pkg/controller/core/workload_controller_test.go +++ b/pkg/controller/core/workload_controller_test.go @@ -1484,12 +1484,65 @@ func TestReconcile(t *testing.T) { Reason: kueue.WorkloadDeactivated, Message: "The workload is deactivated", }). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: kueue.WorkloadDeactivated, + Message: "The workload is deactivated", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionFalse, + Reason: "Deactivated", + Message: "The workload is deactivated", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadRequeued, + Status: metav1.ConditionFalse, + Reason: kueue.WorkloadDeactivated, + Message: "The workload is deactivated", + }). + SchedulingStatsEviction( + kueue.WorkloadSchedulingStatsEviction{ + Reason: kueue.WorkloadDeactivated, + Count: 1, + }, + ). + PastAdmittedTime(0). + Obj(), + wantWorkloadUseMergePatch: utiltestingapi.MakeWorkload("wl", "ns"). + Active(false). + Condition(metav1.Condition{ + Type: kueue.WorkloadEvicted, + Status: metav1.ConditionTrue, + Reason: kueue.WorkloadDeactivated, + Message: "The workload is deactivated", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: kueue.WorkloadDeactivated, + Message: "The workload is deactivated", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadRequeued, + Status: metav1.ConditionFalse, + Reason: kueue.WorkloadDeactivated, + Message: "The workload is deactivated", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionFalse, + Reason: "Deactivated", + Message: "The workload is deactivated", + }). SchedulingStatsEviction( kueue.WorkloadSchedulingStatsEviction{ Reason: kueue.WorkloadDeactivated, Count: 1, }, ). + PastAdmittedTime(0). Obj(), wantEvents: []utiltesting.EventRecord{ { @@ -1529,6 +1582,24 @@ func TestReconcile(t *testing.T) { Reason: kueue.WorkloadDeactivated, Message: "The workload is deactivated", }). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: kueue.WorkloadDeactivated, + Message: "The workload is deactivated", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionFalse, + Reason: "Deactivated", + Message: "The workload is deactivated", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadRequeued, + Status: metav1.ConditionFalse, + Reason: kueue.WorkloadDeactivated, + Message: "The workload is deactivated", + }). SchedulingStatsEviction( kueue.WorkloadSchedulingStatsEviction{ Reason: kueue.WorkloadEvictedByPodsReadyTimeout, @@ -1542,6 +1613,48 @@ func TestReconcile(t *testing.T) { Count: 1, }, ). + PastAdmittedTime(0). + Obj(), + wantWorkloadUseMergePatch: utiltestingapi.MakeWorkload("wl", "ns"). + Active(false). + Condition(metav1.Condition{ + Type: kueue.WorkloadEvicted, + Status: metav1.ConditionTrue, + Reason: kueue.WorkloadDeactivated, + Message: "The workload is deactivated", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: kueue.WorkloadDeactivated, + Message: "The workload is deactivated", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionFalse, + Reason: "Deactivated", + Message: "The workload is deactivated", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadRequeued, + Status: metav1.ConditionFalse, + Reason: kueue.WorkloadDeactivated, + Message: "The workload is deactivated", + }). + SchedulingStatsEviction( + kueue.WorkloadSchedulingStatsEviction{ + Reason: kueue.WorkloadEvictedByPodsReadyTimeout, + UnderlyingCause: kueue.WorkloadWaitForStart, + Count: 1, + }, + ). + SchedulingStatsEviction( + kueue.WorkloadSchedulingStatsEviction{ + Reason: kueue.WorkloadDeactivated, + Count: 1, + }, + ). + PastAdmittedTime(0). Obj(), wantEvents: []utiltesting.EventRecord{ { @@ -1595,6 +1708,24 @@ func TestReconcile(t *testing.T) { Reason: "DeactivatedDueToRequeuingLimitExceeded", Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", }). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: "DeactivatedDueToRequeuingLimitExceeded", + Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionFalse, + Reason: "Deactivated", + Message: "The workload is deactivated", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadRequeued, + Status: metav1.ConditionFalse, + Reason: "DeactivatedDueToRequeuingLimitExceeded", + Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", + }). // DeactivationTarget condition should be deleted in the real cluster, but the fake client doesn't allow us to do it. Condition(metav1.Condition{ Type: kueue.WorkloadDeactivationTarget, @@ -1609,11 +1740,10 @@ func TestReconcile(t *testing.T) { Count: 1, }, ). + PastAdmittedTime(0). Obj(), wantWorkloadUseMergePatch: utiltestingapi.MakeWorkload("wl", "ns"). Active(false). - ReserveQuota(utiltestingapi.MakeAdmission("q1").Obj()). - Admitted(true). Condition(metav1.Condition{ Type: kueue.WorkloadPodsReady, Status: metav1.ConditionFalse, @@ -1626,6 +1756,24 @@ func TestReconcile(t *testing.T) { Reason: "DeactivatedDueToRequeuingLimitExceeded", Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", }). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: "DeactivatedDueToRequeuingLimitExceeded", + Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionFalse, + Reason: "Deactivated", + Message: "The workload is deactivated", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadRequeued, + Status: metav1.ConditionFalse, + Reason: "DeactivatedDueToRequeuingLimitExceeded", + Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", + }). SchedulingStatsEviction( kueue.WorkloadSchedulingStatsEviction{ Reason: "Deactivated", @@ -1633,6 +1781,7 @@ func TestReconcile(t *testing.T) { Count: 1, }, ). + PastAdmittedTime(0). Obj(), wantEvents: []utiltesting.EventRecord{ { @@ -1686,6 +1835,24 @@ func TestReconcile(t *testing.T) { Reason: "DeactivatedDueToRequeuingLimitExceeded", Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", }). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: "DeactivatedDueToRequeuingLimitExceeded", + Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionFalse, + Reason: "Deactivated", + Message: "The workload is deactivated", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadRequeued, + Status: metav1.ConditionFalse, + Reason: "DeactivatedDueToRequeuingLimitExceeded", + Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", + }). // DeactivationTarget condition should be deleted in the real cluster, but the fake client doesn't allow us to do it. Condition(metav1.Condition{ Type: kueue.WorkloadDeactivationTarget, @@ -1700,11 +1867,10 @@ func TestReconcile(t *testing.T) { Count: 1, }, ). + PastAdmittedTime(0). Obj(), wantWorkloadUseMergePatch: utiltestingapi.MakeWorkload("wl", "ns"). Active(false). - ReserveQuota(utiltestingapi.MakeAdmission("q1").Obj()). - Admitted(true). Condition(metav1.Condition{ Type: kueue.WorkloadPodsReady, Status: metav1.ConditionFalse, @@ -1717,6 +1883,24 @@ func TestReconcile(t *testing.T) { Reason: "DeactivatedDueToRequeuingLimitExceeded", Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", }). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: "DeactivatedDueToRequeuingLimitExceeded", + Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionFalse, + Reason: "Deactivated", + Message: "The workload is deactivated", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadRequeued, + Status: metav1.ConditionFalse, + Reason: "DeactivatedDueToRequeuingLimitExceeded", + Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", + }). SchedulingStatsEviction( kueue.WorkloadSchedulingStatsEviction{ Reason: "Deactivated", @@ -1724,6 +1908,7 @@ func TestReconcile(t *testing.T) { Count: 1, }, ). + PastAdmittedTime(0). Obj(), wantEvents: []utiltesting.EventRecord{ { @@ -1778,6 +1963,24 @@ func TestReconcile(t *testing.T) { Reason: "DeactivatedDueToRequeuingLimitExceeded", Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", }). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: "DeactivatedDueToRequeuingLimitExceeded", + Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionFalse, + Reason: "Deactivated", + Message: "The workload is deactivated", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadRequeued, + Status: metav1.ConditionFalse, + Reason: "DeactivatedDueToRequeuingLimitExceeded", + Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", + }). // DeactivationTarget condition should be deleted in the real cluster, but the fake client doesn't allow us to do it. Condition(metav1.Condition{ Type: kueue.WorkloadDeactivationTarget, @@ -1794,11 +1997,10 @@ func TestReconcile(t *testing.T) { Count: 1, }, ). + PastAdmittedTime(0). Obj(), wantWorkloadUseMergePatch: utiltestingapi.MakeWorkload("wl", "ns"). Active(false). - ReserveQuota(utiltestingapi.MakeAdmission("q1").Obj()). - Admitted(true). Condition(metav1.Condition{ Type: kueue.WorkloadPodsReady, Status: metav1.ConditionFalse, @@ -1811,6 +2013,24 @@ func TestReconcile(t *testing.T) { Reason: "DeactivatedDueToRequeuingLimitExceeded", Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", }). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: "DeactivatedDueToRequeuingLimitExceeded", + Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionFalse, + Reason: "Deactivated", + Message: "The workload is deactivated", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadRequeued, + Status: metav1.ConditionFalse, + Reason: "DeactivatedDueToRequeuingLimitExceeded", + Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", + }). SchedulingStatsEviction( kueue.WorkloadSchedulingStatsEviction{ Reason: "Deactivated", @@ -1818,6 +2038,7 @@ func TestReconcile(t *testing.T) { Count: 1, }, ). + PastAdmittedTime(0). Obj(), wantEvents: []utiltesting.EventRecord{ { @@ -1872,6 +2093,24 @@ func TestReconcile(t *testing.T) { Reason: "DeactivatedDueToRequeuingLimitExceeded", Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", }). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: "DeactivatedDueToRequeuingLimitExceeded", + Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionFalse, + Reason: "Deactivated", + Message: "The workload is deactivated", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadRequeued, + Status: metav1.ConditionFalse, + Reason: "DeactivatedDueToRequeuingLimitExceeded", + Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", + }). // DeactivationTarget condition should be deleted in the real cluster, but the fake client doesn't allow us to do it. Condition(metav1.Condition{ Type: kueue.WorkloadDeactivationTarget, @@ -1888,11 +2127,10 @@ func TestReconcile(t *testing.T) { Count: 1, }, ). + PastAdmittedTime(0). Obj(), wantWorkloadUseMergePatch: utiltestingapi.MakeWorkload("wl", "ns"). Active(false). - ReserveQuota(utiltestingapi.MakeAdmission("q1").Obj()). - Admitted(true). Condition(metav1.Condition{ Type: kueue.WorkloadPodsReady, Status: metav1.ConditionFalse, @@ -1905,6 +2143,24 @@ func TestReconcile(t *testing.T) { Reason: "DeactivatedDueToRequeuingLimitExceeded", Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", }). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: "DeactivatedDueToRequeuingLimitExceeded", + Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionFalse, + Reason: "Deactivated", + Message: "The workload is deactivated", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadRequeued, + Status: metav1.ConditionFalse, + Reason: "DeactivatedDueToRequeuingLimitExceeded", + Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", + }). SchedulingStatsEviction( kueue.WorkloadSchedulingStatsEviction{ Reason: "Deactivated", @@ -1912,6 +2168,7 @@ func TestReconcile(t *testing.T) { Count: 1, }, ). + PastAdmittedTime(0). Obj(), wantEvents: []utiltesting.EventRecord{ { @@ -2414,6 +2671,47 @@ func TestReconcile(t *testing.T) { }, wantError: nil, }, + "should unset quota reservation for finished workload": { + workload: utiltestingapi.MakeWorkload("wl", "ns"). + ReserveQuota(utiltestingapi.MakeAdmission("cq").Obj()). + Condition(metav1.Condition{ + Type: kueue.WorkloadFinished, + Status: metav1.ConditionTrue, + Reason: kueue.WorkloadFinishedReasonSucceeded, + Message: "Job finished successfully", + }). + Obj(), + wantWorkload: utiltestingapi.MakeWorkload("wl", "ns"). + ReserveQuota(utiltestingapi.MakeAdmission("cq").Obj()). + Condition(metav1.Condition{ + Type: kueue.WorkloadFinished, + Status: metav1.ConditionTrue, + Reason: kueue.WorkloadFinishedReasonSucceeded, + Message: "Job finished successfully", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: kueue.WorkloadFinished, + Message: "Workload finished", + }). + Obj(), + wantWorkloadUseMergePatch: utiltestingapi.MakeWorkload("wl", "ns"). + Condition(metav1.Condition{ + Type: kueue.WorkloadFinished, + Status: metav1.ConditionTrue, + Reason: kueue.WorkloadFinishedReasonSucceeded, + Message: "Job finished successfully", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: kueue.WorkloadFinished, + Message: "Workload finished", + }). + Obj(), + wantError: nil, + }, } for name, tc := range cases { for _, enabled := range []bool{false, true} { diff --git a/pkg/workload/admissionchecks.go b/pkg/workload/admissionchecks.go index 4769739a348..3f3a8ef4f36 100644 --- a/pkg/workload/admissionchecks.go +++ b/pkg/workload/admissionchecks.go @@ -17,6 +17,7 @@ limitations under the License. package workload import ( + "strings" "time" apimeta "k8s.io/apimachinery/pkg/api/meta" @@ -36,6 +37,9 @@ func SyncAdmittedCondition(w *kueue.Workload, now time.Time) bool { hasAllChecksReady := HasAllChecksReady(w) isAdmitted := IsAdmitted(w) hasAllTopologyAssignmentsReady := !HasTopologyAssignmentsPending(w) + quotaCondition := apimeta.FindStatusCondition(w.Status.Conditions, kueue.WorkloadQuotaReserved) + isDeactivated := quotaCondition != nil && + quotaCondition.Status == metav1.ConditionFalse && (quotaCondition.Reason == "Deactivated" || strings.Contains(quotaCondition.Reason, "Deactivated")) if isAdmitted == (hasReservation && hasAllChecksReady && hasAllTopologyAssignmentsReady) { return false @@ -53,6 +57,10 @@ func SyncAdmittedCondition(w *kueue.Workload, now time.Time) bool { newCondition.Status = metav1.ConditionFalse newCondition.Reason = "NoReservationUnsatisfiedChecks" newCondition.Message = "The workload has no reservation and not all checks ready" + case !hasReservation && isDeactivated: + newCondition.Status = metav1.ConditionFalse + newCondition.Reason = "Deactivated" + newCondition.Message = "The workload is deactivated" case !hasReservation: newCondition.Status = metav1.ConditionFalse newCondition.Reason = "NoReservation" diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go index 8c6edb7b928..ebea2c77e6c 100644 --- a/pkg/workload/workload.go +++ b/pkg/workload/workload.go @@ -602,6 +602,7 @@ func UnsetQuotaReservationWithCondition(wl *kueue.Workload, reason, message stri if SyncAdmittedCondition(wl, now) { changed = true } + return changed } @@ -1278,22 +1279,30 @@ func Evict(ctx context.Context, c client.Client, recorder record.EventRecorder, } prepareForEviction(wl, clock.Now(), evictionReason, msg) reportWorkloadEvictedOnce := workloadEvictionStateInc(wl, reason, underlyingCause) + + // Save cluster queue name before PatchAdmissionStatus potentially clears it + var clusterQueue kueue.ClusterQueueReference + if wlOrig.Status.Admission != nil { + clusterQueue = wlOrig.Status.Admission.ClusterQueue + } + if err := PatchAdmissionStatus(ctx, c, wlOrig, clock, func() (*kueue.Workload, bool, error) { return wl, true, nil }); err != nil { return err } - if wlOrig.Status.Admission == nil { - // This is an extra safeguard for access to `wl.Status.Admission`. + + if clusterQueue == "" { + // This is an extra safeguard for access to cluster queue info. // This function is expected to be called only for workload which have // Admission. log := log.FromContext(ctx) log.V(3).Info("WARNING: unexpected eviction of workload without status.Admission", "workload", klog.KObj(wl)) return nil } - reportEvictedWorkload(recorder, wl, wl.Status.Admission.ClusterQueue, reason, msg, underlyingCause) + reportEvictedWorkload(recorder, wl, clusterQueue, reason, msg, underlyingCause) if reportWorkloadEvictedOnce { - metrics.ReportEvictedWorkloadsOnce(wl.Status.Admission.ClusterQueue, reason, string(underlyingCause), wl.Spec.PriorityClassName) + metrics.ReportEvictedWorkloadsOnce(clusterQueue, reason, string(underlyingCause), wl.Spec.PriorityClassName) } return nil } diff --git a/test/e2e/singlecluster/e2e_test.go b/test/e2e/singlecluster/e2e_test.go index 563e34e8faa..c1973002658 100644 --- a/test/e2e/singlecluster/e2e_test.go +++ b/test/e2e/singlecluster/e2e_test.go @@ -214,7 +214,7 @@ var _ = ginkgo.Describe("Kueue", func() { wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(sampleJob.Name, sampleJob.UID), Namespace: ns.Name} gomega.Eventually(func(g gomega.Gomega) { g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed()) - g.Expect(workload.HasQuotaReservation(createdWorkload)).Should(gomega.BeTrue()) + g.Expect(workload.HasQuotaReservation(createdWorkload)).Should(gomega.BeFalse()) g.Expect(createdWorkload.Status.Conditions).Should(testing.HaveConditionStatusTrue(kueue.WorkloadFinished)) }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) }) @@ -385,7 +385,7 @@ var _ = ginkgo.Describe("Kueue", func() { wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name, job.UID), Namespace: ns.Name} gomega.Eventually(func(g gomega.Gomega) { g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed()) - g.Expect(workload.HasQuotaReservation(createdWorkload)).Should(gomega.BeTrue()) + g.Expect(workload.HasQuotaReservation(createdWorkload)).Should(gomega.BeFalse()) g.Expect(createdWorkload.Status.Conditions).Should(testing.HaveConditionStatusTrue(kueue.WorkloadFinished)) }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) }) @@ -731,7 +731,7 @@ var _ = ginkgo.Describe("Kueue", func() { }) gomega.Eventually(func(g gomega.Gomega) { g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed()) - g.Expect(workload.HasQuotaReservation(createdWorkload)).Should(gomega.BeTrue()) + g.Expect(workload.HasQuotaReservation(createdWorkload)).Should(gomega.BeFalse()) g.Expect(createdWorkload.Status.Conditions).Should(testing.HaveConditionStatusTrue(kueue.WorkloadFinished)) }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) }) diff --git a/test/e2e/singlecluster/e2e_v1beta1_test.go b/test/e2e/singlecluster/e2e_v1beta1_test.go index 25437afae16..1831eb3a993 100644 --- a/test/e2e/singlecluster/e2e_v1beta1_test.go +++ b/test/e2e/singlecluster/e2e_v1beta1_test.go @@ -130,7 +130,7 @@ var _ = ginkgo.Describe("Kueue v1beta2", func() { wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(sampleJob.Name, sampleJob.UID), Namespace: ns.Name} gomega.Eventually(func(g gomega.Gomega) { g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed()) - g.Expect(hasQuotaReservation(createdWorkload)).Should(gomega.BeTrue()) + g.Expect(hasQuotaReservation(createdWorkload)).Should(gomega.BeFalse()) g.Expect(createdWorkload.Status.Conditions).Should(testing.HaveConditionStatusTrue(kueue.WorkloadFinished)) }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) }) diff --git a/test/e2e/singlecluster/tas_test.go b/test/e2e/singlecluster/tas_test.go index 43f568aca10..7b2ae58f1b5 100644 --- a/test/e2e/singlecluster/tas_test.go +++ b/test/e2e/singlecluster/tas_test.go @@ -140,7 +140,7 @@ var _ = ginkgo.Describe("TopologyAwareScheduling", func() { ginkgo.By(fmt.Sprintf("verify the workload %q gets finished", wlLookupKey), func() { gomega.Eventually(func(g gomega.Gomega) { g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed()) - g.Expect(workload.HasQuotaReservation(createdWorkload)).Should(gomega.BeTrue()) + g.Expect(workload.HasQuotaReservation(createdWorkload)).Should(gomega.BeFalse()) g.Expect(createdWorkload.Status.Conditions).Should(testing.HaveConditionStatusTrue(kueue.WorkloadFinished)) }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) }) @@ -275,7 +275,7 @@ var _ = ginkgo.Describe("TopologyAwareScheduling", func() { ginkgo.By(fmt.Sprintf("verify the workload %q gets finished", wlLookupKey), func() { gomega.Eventually(func(g gomega.Gomega) { g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed()) - g.Expect(workload.HasQuotaReservation(createdWorkload)).Should(gomega.BeTrue()) + g.Expect(workload.HasQuotaReservation(createdWorkload)).Should(gomega.BeFalse()) g.Expect(createdWorkload.Status.Conditions).Should(testing.HaveConditionStatusTrue(kueue.WorkloadFinished)) }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) }) @@ -376,7 +376,7 @@ var _ = ginkgo.Describe("TopologyAwareScheduling", func() { ginkgo.By(fmt.Sprintf("verify the workload %q gets finished", wlLookupKey), func() { gomega.Eventually(func(g gomega.Gomega) { g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed()) - g.Expect(workload.HasQuotaReservation(createdWorkload)).Should(gomega.BeTrue()) + g.Expect(workload.HasQuotaReservation(createdWorkload)).Should(gomega.BeFalse()) g.Expect(createdWorkload.Status.Conditions).Should(testing.HaveConditionStatusTrue(kueue.WorkloadFinished)) }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) }) @@ -440,7 +440,7 @@ var _ = ginkgo.Describe("TopologyAwareScheduling", func() { ginkgo.By(fmt.Sprintf("verify the workload %q gets finished", wlLookupKey), func() { gomega.Eventually(func(g gomega.Gomega) { g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed()) - g.Expect(workload.HasQuotaReservation(createdWorkload)).Should(gomega.BeTrue()) + g.Expect(workload.HasQuotaReservation(createdWorkload)).Should(gomega.BeFalse()) g.Expect(createdWorkload.Status.Conditions).Should(testing.HaveConditionStatusTrue(kueue.WorkloadFinished)) }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) }) @@ -554,7 +554,7 @@ var _ = ginkgo.Describe("TopologyAwareScheduling", func() { ginkgo.By(fmt.Sprintf("verify the workload %q gets finished", wlLookupKey), func() { gomega.Eventually(func(g gomega.Gomega) { g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed()) - g.Expect(workload.HasQuotaReservation(createdWorkload)).Should(gomega.BeTrue()) + g.Expect(workload.HasQuotaReservation(createdWorkload)).Should(gomega.BeFalse()) g.Expect(createdWorkload.Status.Conditions).Should(testing.HaveConditionStatusTrue(kueue.WorkloadFinished)) }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) }) diff --git a/test/e2e/tas/job_test.go b/test/e2e/tas/job_test.go index 2bae3bed4c9..970f6841b00 100644 --- a/test/e2e/tas/job_test.go +++ b/test/e2e/tas/job_test.go @@ -160,7 +160,7 @@ var _ = ginkgo.Describe("TopologyAwareScheduling for Job", func() { ginkgo.By(fmt.Sprintf("verify the workload %q gets finished", wlLookupKey), func() { gomega.Eventually(func(g gomega.Gomega) { g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed()) - g.Expect(workload.HasQuotaReservation(createdWorkload)).Should(gomega.BeTrue()) + g.Expect(workload.HasQuotaReservation(createdWorkload)).Should(gomega.BeFalse()) g.Expect(createdWorkload.Status.Conditions).Should(testing.HaveConditionStatusTrue(kueue.WorkloadFinished)) }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) }) @@ -213,7 +213,7 @@ var _ = ginkgo.Describe("TopologyAwareScheduling for Job", func() { ginkgo.By(fmt.Sprintf("verify the workload %q gets finished", wlLookupKey), func() { gomega.Eventually(func(g gomega.Gomega) { g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed()) - g.Expect(workload.HasQuotaReservation(createdWorkload)).Should(gomega.BeTrue()) + g.Expect(workload.HasQuotaReservation(createdWorkload)).Should(gomega.BeFalse()) g.Expect(createdWorkload.Status.Conditions).Should(testing.HaveConditionStatusTrue(kueue.WorkloadFinished)) }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) }) @@ -235,11 +235,17 @@ var _ = ginkgo.Describe("TopologyAwareScheduling for Job", func() { wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(sampleJob.Name, sampleJob.UID), Namespace: ns.Name} createdWorkload := &kueue.Workload{} - ginkgo.By(fmt.Sprintf("verify the workload %q gets TopologyAssignment becomes finished", wlLookupKey), func() { + ginkgo.By(fmt.Sprintf("verify the workload %q gets admitted with TopologyAssignment", wlLookupKey), func() { gomega.Eventually(func(g gomega.Gomega) { g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed()) + g.Expect(createdWorkload.Status.Admission).ShouldNot(gomega.BeNil()) g.Expect(createdWorkload.Status.Admission.PodSetAssignments).Should(gomega.HaveLen(1)) g.Expect(createdWorkload.Status.Admission.PodSetAssignments[0].TopologyAssignment).ShouldNot(gomega.BeNil()) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + }) + ginkgo.By(fmt.Sprintf("verify the workload %q becomes finished", wlLookupKey), func() { + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed()) g.Expect(createdWorkload.Status.Conditions).Should(testing.HaveConditionStatusTrue(kueue.WorkloadFinished)) }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) }) diff --git a/test/integration/singlecluster/controller/jobs/job/job_controller_test.go b/test/integration/singlecluster/controller/jobs/job/job_controller_test.go index d50563d4041..4d0b916d453 100644 --- a/test/integration/singlecluster/controller/jobs/job/job_controller_test.go +++ b/test/integration/singlecluster/controller/jobs/job/job_controller_test.go @@ -2141,14 +2141,14 @@ var _ = ginkgo.Describe("Interacting with scheduler", ginkgo.Ordered, ginkgo.Con gomega.BeComparableTo(metav1.Condition{ Type: kueue.WorkloadQuotaReserved, Status: metav1.ConditionFalse, - Reason: "Pending", + Reason: kueue.WorkloadDeactivated, Message: "The workload is deactivated", }, util.IgnoreConditionTimestampsAndObservedGeneration), gomega.BeComparableTo(metav1.Condition{ Type: kueue.WorkloadAdmitted, Status: metav1.ConditionFalse, - Reason: "NoReservation", - Message: "The workload has no reservation", + Reason: "Deactivated", + Message: "The workload is deactivated", }, util.IgnoreConditionTimestampsAndObservedGeneration), gomega.BeComparableTo(metav1.Condition{ Type: kueue.WorkloadEvicted, @@ -2194,13 +2194,11 @@ var _ = ginkgo.Describe("Interacting with scheduler", ginkgo.Ordered, ginkgo.Con g.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: sampleJob.Name, Namespace: sampleJob.Namespace}, createdJob)). Should(gomega.Succeed()) g.Expect(createdJob.Spec.Suspend).To(gomega.Equal(ptr.To(true))) - // Workload should get unadmitted + // Workload should get unadmitted and have Evicted condition g.Expect(k8sClient.Get(ctx, wlKey, wll)).Should(gomega.Succeed()) - util.ExpectWorkloadsToBePending(ctx, k8sClient, wll) - // Workload should stay pending - g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(wll), wll)).Should(gomega.Succeed()) - // Should have Evicted condition g.Expect(wll.Status.Conditions).Should(testing.HaveConditionStatusTrue(kueue.WorkloadEvicted)) + // Workload should not be admitted + g.Expect(wll.Status.Conditions).Should(testing.HaveConditionStatusFalse(kueue.WorkloadAdmitted)) }, util.ConsistentDuration, util.ShortInterval).Should(gomega.Succeed()) ginkgo.By("checking the first job becomes unsuspended after we update the Active field back to true") @@ -2869,7 +2867,7 @@ var _ = ginkgo.Describe("Job controller interacting with Workload controller whe gomega.BeComparableTo(metav1.Condition{ Type: kueue.WorkloadQuotaReserved, Status: metav1.ConditionFalse, - Reason: "Pending", + Reason: "DeactivatedDueToRequeuingLimitExceeded", Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", }, util.IgnoreConditionTimestampsAndObservedGeneration), gomega.BeComparableTo(metav1.Condition{ @@ -2881,8 +2879,8 @@ var _ = ginkgo.Describe("Job controller interacting with Workload controller whe gomega.BeComparableTo(metav1.Condition{ Type: kueue.WorkloadAdmitted, Status: metav1.ConditionFalse, - Reason: "NoReservation", - Message: "The workload has no reservation", + Reason: "Deactivated", + Message: "The workload is deactivated", }, util.IgnoreConditionTimestampsAndObservedGeneration), gomega.BeComparableTo(metav1.Condition{ Type: kueue.WorkloadRequeued, @@ -3621,7 +3619,7 @@ var _ = ginkgo.Describe("Job with elastic jobs via workload-slices support", gin g.Expect(workload.IsFinished(&workloads.Items[i])).Should(gomega.BeTrue()) continue } - util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, testJobWorkload) + util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, &workloads.Items[i]) } }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) diff --git a/test/integration/singlecluster/controller/jobs/pod/pod_controller_test.go b/test/integration/singlecluster/controller/jobs/pod/pod_controller_test.go index 29dfc6fb854..3c7434a7bc5 100644 --- a/test/integration/singlecluster/controller/jobs/pod/pod_controller_test.go +++ b/test/integration/singlecluster/controller/jobs/pod/pod_controller_test.go @@ -2139,7 +2139,7 @@ var _ = ginkgo.Describe("Pod controller interacting with Workload controller whe gomega.BeComparableTo(metav1.Condition{ Type: kueue.WorkloadQuotaReserved, Status: metav1.ConditionFalse, - Reason: "Pending", + Reason: "DeactivatedDueToRequeuingLimitExceeded", Message: "The workload is deactivated due to exceeding the maximum number of re-queuing retries", }, util.IgnoreConditionTimestampsAndObservedGeneration), gomega.BeComparableTo(metav1.Condition{ @@ -2151,8 +2151,8 @@ var _ = ginkgo.Describe("Pod controller interacting with Workload controller whe gomega.BeComparableTo(metav1.Condition{ Type: kueue.WorkloadAdmitted, Status: metav1.ConditionFalse, - Reason: "NoReservation", - Message: "The workload has no reservation", + Reason: "Deactivated", + Message: "The workload is deactivated", }, util.IgnoreConditionTimestampsAndObservedGeneration), gomega.BeComparableTo(metav1.Condition{ Type: podcontroller.WorkloadWaitingForReplacementPods, diff --git a/test/integration/singlecluster/controller/jobs/raycluster/raycluster_controller_test.go b/test/integration/singlecluster/controller/jobs/raycluster/raycluster_controller_test.go index 99fa3afd456..406fa095765 100644 --- a/test/integration/singlecluster/controller/jobs/raycluster/raycluster_controller_test.go +++ b/test/integration/singlecluster/controller/jobs/raycluster/raycluster_controller_test.go @@ -844,7 +844,7 @@ var _ = ginkgo.Describe("RayCluster with elastic jobs via workload-slices suppor g.Expect(workloads.Items[i].Spec.PodSets[1].Count).Should(gomega.Equal(int32(1))) } else { g.Expect(workloads.Items[i].Name).ShouldNot(gomega.Equal(testRayClusterWorkload.Name)) - util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, testRayClusterWorkload) + util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, &workloads.Items[i]) testRayClusterWorkload = &workloads.Items[i] } } diff --git a/test/integration/singlecluster/scheduler/preemption_test.go b/test/integration/singlecluster/scheduler/preemption_test.go index 93325336f7f..a4a2be3bed3 100644 --- a/test/integration/singlecluster/scheduler/preemption_test.go +++ b/test/integration/singlecluster/scheduler/preemption_test.go @@ -1063,7 +1063,7 @@ var _ = ginkgo.Describe("Preemption", func() { util.ExpectWorkloadsToBePreempted(ctx, k8sClient, lowWl) util.FinishEvictionForWorkloads(ctx, k8sClient, lowWl) util.ExpectWorkloadToFinish(ctx, k8sClient, client.ObjectKeyFromObject(highWl)) - util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, cq.Name, highWl) + util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, cq.Name, highWlScaledUp) ginkgo.By("Scale down a high priority workload") gomega.Eventually(func(g gomega.Gomega) { @@ -1072,8 +1072,8 @@ var _ = ginkgo.Describe("Preemption", func() { wl.Spec.PodSets[0].Count = 1 // Scaled down. g.Expect(k8sClient.Update(ctx, wl)).To(gomega.Succeed()) }, util.Timeout, util.Interval).Should(gomega.Succeed()) - util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, cq.Name, lowWl, highWl) - util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, lowWl, highWl) + util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, cq.Name, lowWl, highWlScaledUp) + util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, lowWl, highWlScaledUp) }) }) }) From b6f9adc527f37f1b479190192e5b99c8a6e62ec4 Mon Sep 17 00:00:00 2001 From: Amy Chen Date: Wed, 5 Nov 2025 09:56:22 -0800 Subject: [PATCH 2/2] partial admission finish workload test fixes --- pkg/controller/core/workload_controller.go | 1 - pkg/controller/jobframework/reconciler.go | 12 +++++++++--- pkg/util/equality/podset.go | 8 ++++---- pkg/util/equality/podset_test.go | 2 +- 4 files changed, 14 insertions(+), 9 deletions(-) diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go index ac74a39bd00..8a0491ad998 100644 --- a/pkg/controller/core/workload_controller.go +++ b/pkg/controller/core/workload_controller.go @@ -237,7 +237,6 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c requeuedCond := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadRequeued) var conditionsCleared bool - //if requeuedCond != nil && requeuedCond.Status == metav1.ConditionFalse && requeuedCond.Reason == kueue.WorkloadInadmissible { if quotaReservedCond != nil && quotaReservedCond.Status == metav1.ConditionFalse { apimeta.RemoveStatusCondition(&wl.Status.Conditions, kueue.WorkloadQuotaReserved) conditionsCleared = true diff --git a/pkg/controller/jobframework/reconciler.go b/pkg/controller/jobframework/reconciler.go index dd417e7b7ee..ce4a4c897db 100644 --- a/pkg/controller/jobframework/reconciler.go +++ b/pkg/controller/jobframework/reconciler.go @@ -1074,18 +1074,24 @@ func EquivalentToWorkload(ctx context.Context, c client.Client, job GenericJob, } jobPodSets := clearMinCountsIfFeatureDisabled(getPodSets) + // For finished workloads, ignore count differences in pod set comparisons. + // In partial admission, the job's final parallelism may differ from the + // workload's original specification, causing count mismatches that could + // incorrectly trigger workload deletion. + isFinished := apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) + if runningPodSets := expectedRunningPodSets(ctx, c, wl); runningPodSets != nil { - if equality.ComparePodSetSlices(jobPodSets, runningPodSets, workload.IsAdmitted(wl)) { + if equality.ComparePodSetSlices(jobPodSets, runningPodSets, workload.IsAdmitted(wl), isFinished) { return true, nil } // If the workload is admitted but the job is suspended, do the check // against the non-running info. // This might allow some violating jobs to pass equivalency checks, but their // workloads would be invalidated in the next sync after unsuspending. - return job.IsSuspended() && equality.ComparePodSetSlices(jobPodSets, wl.Spec.PodSets, workload.IsAdmitted(wl)), nil + return job.IsSuspended() && equality.ComparePodSetSlices(jobPodSets, wl.Spec.PodSets, workload.IsAdmitted(wl), isFinished), nil } - return equality.ComparePodSetSlices(jobPodSets, wl.Spec.PodSets, workload.IsAdmitted(wl)), nil + return equality.ComparePodSetSlices(jobPodSets, wl.Spec.PodSets, workload.IsAdmitted(wl), isFinished), nil } func (r *JobReconciler) updateWorkloadToMatchJob(ctx context.Context, job GenericJob, object client.Object, wl *kueue.Workload) (*kueue.Workload, error) { diff --git a/pkg/util/equality/podset.go b/pkg/util/equality/podset.go index d07c326b732..29f89a19dff 100644 --- a/pkg/util/equality/podset.go +++ b/pkg/util/equality/podset.go @@ -36,8 +36,8 @@ func comparePodTemplate(a, b *corev1.PodSpec, ignoreTolerations bool) bool { return equality.Semantic.DeepEqual(a.Containers, b.Containers) } -func ComparePodSets(a, b *kueue.PodSet, ignoreTolerations bool) bool { - if a.Count != b.Count { +func ComparePodSets(a, b *kueue.PodSet, ignoreTolerations, ignoreCount bool) bool { + if !ignoreCount && a.Count != b.Count { return false } if ptr.Deref(a.MinCount, -1) != ptr.Deref(b.MinCount, -1) { @@ -47,12 +47,12 @@ func ComparePodSets(a, b *kueue.PodSet, ignoreTolerations bool) bool { return comparePodTemplate(&a.Template.Spec, &b.Template.Spec, ignoreTolerations) } -func ComparePodSetSlices(a, b []kueue.PodSet, ignoreTolerations bool) bool { +func ComparePodSetSlices(a, b []kueue.PodSet, ignoreTolerations, ignoreCount bool) bool { if len(a) != len(b) { return false } for i := range a { - if !ComparePodSets(&a[i], &b[i], ignoreTolerations) { + if !ComparePodSets(&a[i], &b[i], ignoreTolerations, ignoreCount) { return false } } diff --git a/pkg/util/equality/podset_test.go b/pkg/util/equality/podset_test.go index 7b571f0537b..e8a2b00bccb 100644 --- a/pkg/util/equality/podset_test.go +++ b/pkg/util/equality/podset_test.go @@ -139,7 +139,7 @@ func TestComparePodSetSlices(t *testing.T) { } for name, tc := range cases { t.Run(name, func(t *testing.T) { - got := ComparePodSetSlices(tc.a, tc.b, tc.ignoreTolerations) + got := ComparePodSetSlices(tc.a, tc.b, tc.ignoreTolerations, false) if got != tc.wantEquivalent { t.Errorf("Unexpected result, want %v", tc.wantEquivalent) }