Skip to content

Commit 2d34fbd

Browse files
committed
Introduce workload.Finish helper function
1 parent 108c05b commit 2d34fbd

File tree

5 files changed

+73
-39
lines changed

5 files changed

+73
-39
lines changed

pkg/controller/admissionchecks/multikueue/workload.go

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,8 @@ import (
4646
config "sigs.k8s.io/kueue/apis/config/v1beta2"
4747
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta2"
4848
"sigs.k8s.io/kueue/pkg/controller/jobframework"
49-
"sigs.k8s.io/kueue/pkg/features"
5049
"sigs.k8s.io/kueue/pkg/util/admissioncheck"
5150
"sigs.k8s.io/kueue/pkg/util/api"
52-
clientutil "sigs.k8s.io/kueue/pkg/util/client"
5351
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
5452
"sigs.k8s.io/kueue/pkg/workload"
5553
"sigs.k8s.io/kueue/pkg/workloadslicing"
@@ -359,24 +357,8 @@ func (w *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco
359357
log.V(3).Info("Group with no adapter, skip owner status copy", "workerCluster", remote)
360358
}
361359

362-
// copy the status to the local one
363-
finishCond := metav1.Condition{
364-
Type: kueue.WorkloadFinished,
365-
Status: metav1.ConditionTrue,
366-
Reason: remoteFinishedCond.Reason,
367-
Message: remoteFinishedCond.Message,
368-
LastTransitionTime: metav1.NewTime(w.clock.Now()),
369-
}
370-
if features.Enabled(features.WorkloadRequestUseMergePatch) {
371-
return reconcile.Result{}, clientutil.PatchStatus(ctx, w.client, group.local, func() (client.Object, bool, error) {
372-
apimeta.SetStatusCondition(&group.local.Status.Conditions, finishCond)
373-
return group.local, true, nil
374-
})
375-
}
376-
377-
wlPatch := workload.BaseSSAWorkload(group.local, false)
378-
apimeta.SetStatusCondition(&wlPatch.Status.Conditions, finishCond)
379-
return reconcile.Result{}, w.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(kueue.MultiKueueControllerName+"-finish"), client.ForceOwnership)
360+
// finish workload and copy the status to the local one
361+
return reconcile.Result{}, workload.Finish(ctx, w.client, group.local, remoteFinishedCond.Reason, remoteFinishedCond.Message, kueue.MultiKueueControllerName, w.clock, workload.WithForceOwnership())
380362
}
381363

382364
// 2. delete all workloads that are out of sync (other than scaled-down elastic workloads)

pkg/controller/jobframework/reconciler.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -452,12 +452,12 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
452452
// 2. handle job is finished.
453453
if message, success, finished := job.Finished(ctx); finished {
454454
log.V(3).Info("The workload is already finished")
455-
if wl != nil && !apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
455+
if wl != nil && !workload.IsFinished(wl) {
456456
reason := kueue.WorkloadFinishedReasonSucceeded
457457
if !success {
458458
reason = kueue.WorkloadFinishedReasonFailed
459459
}
460-
err := workload.UpdateStatus(ctx, r.client, wl, kueue.WorkloadFinished, metav1.ConditionTrue, reason, message, constants.JobControllerName, r.clock)
460+
err := workload.Finish(ctx, r.client, wl, reason, message, constants.JobControllerName, r.clock)
461461
if err != nil && !apierrors.IsNotFound(err) {
462462
return ctrl.Result{}, err
463463
}
@@ -584,7 +584,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
584584
log.Error(err, "Unsuspending job")
585585
if podset.IsPermanent(err) {
586586
// Mark the workload as finished with failure since the is no point to retry.
587-
errUpdateStatus := workload.UpdateStatus(ctx, r.client, wl, kueue.WorkloadFinished, metav1.ConditionTrue, FailedToStartFinishedReason, err.Error(), constants.JobControllerName, r.clock)
587+
errUpdateStatus := workload.Finish(ctx, r.client, wl, FailedToStartFinishedReason, err.Error(), constants.JobControllerName, r.clock)
588588
if errUpdateStatus != nil {
589589
log.Error(errUpdateStatus, "Updating workload status, on start failure", "err", err)
590590
}
@@ -1022,13 +1022,8 @@ func (r *JobReconciler) ensurePrebuiltWorkloadInSync(ctx context.Context, wl *ku
10221022
return false, err
10231023
}
10241024
// mark the workload as finished
1025-
err := workload.UpdateStatus(ctx, r.client, wl,
1026-
kueue.WorkloadFinished,
1027-
metav1.ConditionTrue,
1028-
kueue.WorkloadFinishedReasonOutOfSync,
1029-
"The prebuilt workload is out of sync with its user job",
1030-
constants.JobControllerName, r.clock)
1031-
return false, err
1025+
msg := "The prebuilt workload is out of sync with its user job"
1026+
return false, workload.Finish(ctx, r.client, wl, kueue.WorkloadFinishedReasonOutOfSync, msg, constants.JobControllerName, r.clock)
10321027
}
10331028
return true, nil
10341029
}

pkg/scheduler/scheduler_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6872,6 +6872,7 @@ func TestSchedule(t *testing.T) {
68726872
Status: metav1.ConditionTrue,
68736873
Reason: kueue.WorkloadSliceReplaced,
68746874
Message: "Replaced to accommodate a workload (UID: , JobUID: ) due to workload slice aggregation",
6875+
ObservedGeneration: 1,
68756876
LastTransitionTime: metav1.NewTime(now),
68766877
}).
68776878
Obj(),

pkg/workload/workload.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -819,6 +819,18 @@ func SetEvictedCondition(w *kueue.Workload, now time.Time, reason string, messag
819819
return apimeta.SetStatusCondition(&w.Status.Conditions, condition)
820820
}
821821

822+
func SetFinishedCondition(w *kueue.Workload, now time.Time, reason string, message string) bool {
823+
condition := metav1.Condition{
824+
Type: kueue.WorkloadFinished,
825+
Status: metav1.ConditionTrue,
826+
LastTransitionTime: metav1.NewTime(now),
827+
Reason: reason,
828+
Message: api.TruncateConditionMessage(message),
829+
ObservedGeneration: w.Generation,
830+
}
831+
return apimeta.SetStatusCondition(&w.Status.Conditions, condition)
832+
}
833+
822834
// PropagateResourceRequests synchronizes w.Status.ResourceRequests to
823835
// with info.TotalRequests if the feature gate is enabled and returns true if w was updated
824836
func PropagateResourceRequests(w *kueue.Workload, info *Info) bool {
@@ -1301,6 +1313,58 @@ func Evict(ctx context.Context, c client.Client, recorder record.EventRecorder,
13011313
return nil
13021314
}
13031315

1316+
type FinishOption func(*FinishOptions)
1317+
1318+
type FinishOptions struct {
1319+
UsePatch bool
1320+
ForceOwnership bool
1321+
}
1322+
1323+
func DefaultFinishOptions() *FinishOptions {
1324+
return &FinishOptions{
1325+
UsePatch: false,
1326+
ForceOwnership: false,
1327+
}
1328+
}
1329+
1330+
func WithUsePatch() FinishOption {
1331+
return func(o *FinishOptions) {
1332+
o.UsePatch = true
1333+
}
1334+
}
1335+
1336+
func WithForceOwnership() FinishOption {
1337+
return func(o *FinishOptions) {
1338+
o.ForceOwnership = true
1339+
}
1340+
}
1341+
1342+
func Finish(ctx context.Context, c client.Client, wl *kueue.Workload, reason, msg, managerPrefix string, clock clock.Clock, options ...FinishOption) error {
1343+
optsFinish := DefaultFinishOptions()
1344+
for _, opt := range options {
1345+
opt(optsFinish)
1346+
}
1347+
usePatch := optsFinish.UsePatch
1348+
forceOwnership := optsFinish.ForceOwnership
1349+
1350+
if features.Enabled(features.WorkloadRequestUseMergePatch) || usePatch {
1351+
return clientutil.PatchStatus(ctx, c, wl, func() (client.Object, bool, error) {
1352+
update := SetFinishedCondition(wl, clock.Now(), reason, msg)
1353+
return wl, update, nil
1354+
})
1355+
} else {
1356+
newWl := BaseSSAWorkload(wl, false)
1357+
SetFinishedCondition(newWl, clock.Now(), reason, msg)
1358+
1359+
fieldOwner := client.FieldOwner(managerPrefix + "-" + kueue.WorkloadFinished)
1360+
applyOpts := []client.SubResourcePatchOption{fieldOwner}
1361+
if forceOwnership {
1362+
applyOpts = append(applyOpts, client.ForceOwnership)
1363+
}
1364+
return c.Status().Patch(ctx, newWl, client.Apply, applyOpts...)
1365+
}
1366+
}
1367+
13041368
func PriorityClassName(wl *kueue.Workload) string {
13051369
if wl.Spec.PriorityClassRef != nil {
13061370
return wl.Spec.PriorityClassRef.Name

pkg/workloadslicing/workloadslicing.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -104,15 +104,7 @@ func Finish(ctx context.Context, clnt client.Client, clk clock.Clock, workloadSl
104104
if apimeta.IsStatusConditionTrue(workloadSlice.Status.Conditions, kueue.WorkloadFinished) {
105105
return nil
106106
}
107-
if err := clientutil.PatchStatus(ctx, clnt, workloadSlice, func() (client.Object, bool, error) {
108-
return workloadSlice, apimeta.SetStatusCondition(&workloadSlice.Status.Conditions, metav1.Condition{
109-
Type: kueue.WorkloadFinished,
110-
Status: metav1.ConditionTrue,
111-
Reason: reason,
112-
Message: message,
113-
LastTransitionTime: metav1.NewTime(clk.Now()),
114-
}), nil
115-
}); err != nil {
107+
if err := workload.Finish(ctx, clnt, workloadSlice, reason, message, "", clk, workload.WithUsePatch()); err != nil {
116108
return fmt.Errorf("failed to patch workload slice status: %w", err)
117109
}
118110
return nil

0 commit comments

Comments
 (0)