Skip to content

Commit 8028f3f

Browse files
sutaakaropenshift-merge-bot[bot]
authored andcommitted
Add test coverage for sequential admission of PytorchJob
1 parent 02b6637 commit 8028f3f

File tree

1 file changed

+104
-22
lines changed

1 file changed

+104
-22
lines changed

tests/kfto/kfto_kueue_sft_test.go

Lines changed: 104 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ func PytorchJobConditionSucceeded(job *kftov1.PyTorchJob) corev1.ConditionStatus
4646
return PytorchJobCondition(job, kftov1.JobSucceeded)
4747
}
4848

49+
func PytorchJobConditionSuspended(job *kftov1.PyTorchJob) corev1.ConditionStatus {
50+
return PytorchJobCondition(job, kftov1.JobSuspended)
51+
}
52+
4953
func PytorchJobCondition(job *kftov1.PyTorchJob, conditionType kftov1.JobConditionType) corev1.ConditionStatus {
5054
for _, condition := range job.Status.Conditions {
5155
if condition.Type == conditionType {
@@ -55,9 +59,12 @@ func PytorchJobCondition(job *kftov1.PyTorchJob, conditionType kftov1.JobConditi
5559
return corev1.ConditionUnknown
5660
}
5761

62+
func OwnerReferenceName(meta metav1.Object) string {
63+
return meta.GetOwnerReferences()[0].Name
64+
}
65+
5866
func TestPytorchjobWithSFTtrainer(t *testing.T) {
5967
test := With(t)
60-
test.T().Parallel()
6168

6269
// Create a namespace
6370
namespace := test.NewTestNamespace()
@@ -99,17 +106,107 @@ func TestPytorchjobWithSFTtrainer(t *testing.T) {
99106
defer test.Client().Kueue().KueueV1beta1().ClusterQueues().Delete(test.Ctx(), clusterQueue.Name, metav1.DeleteOptions{})
100107
localQueue := CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name)
101108

102-
// Run training PyTorch job
109+
// Create training PyTorch job
110+
tuningJob := createPyTorchJob(test, namespace.Name, localQueue.Name, *config)
111+
112+
// Make sure the Kueue Workload is admitted
113+
test.Eventually(KueueWorkloads(test, namespace.Name), TestTimeoutLong).
114+
Should(
115+
And(
116+
HaveLen(1),
117+
ContainElement(WithTransform(KueueWorkloadAdmitted, BeTrueBecause("Workload failed to be admitted"))),
118+
),
119+
)
120+
121+
// Make sure the PyTorch job is running
122+
test.Eventually(PytorchJob(test, namespace.Name, tuningJob.Name), TestTimeoutShort).
123+
Should(WithTransform(PytorchJobConditionRunning, Equal(corev1.ConditionTrue)))
124+
125+
// Make sure the PyTorch job succeed
126+
test.Eventually(PytorchJob(test, namespace.Name, tuningJob.Name), TestTimeoutShort).Should(WithTransform(PytorchJobConditionSucceeded, Equal(corev1.ConditionTrue)))
127+
test.T().Logf("PytorchJob %s/%s ran successfully", tuningJob.Namespace, tuningJob.Name)
128+
}
129+
130+
func TestPytorchjobUsingKueueQuota(t *testing.T) {
131+
test := With(t)
132+
133+
// Create a namespace
134+
namespace := test.NewTestNamespace()
135+
136+
// Create a ConfigMap with training dataset and configuration
137+
configData := map[string][]byte{
138+
"config.json": ReadFile(test, "config.json"),
139+
"twitter_complaints_small.json": ReadFile(test, "twitter_complaints_small.json"),
140+
}
141+
config := CreateConfigMap(test, namespace.Name, configData)
142+
143+
// Create limited Kueue resources to run just one Pytorchjob at a time
144+
resourceFlavor := CreateKueueResourceFlavor(test, kueuev1beta1.ResourceFlavorSpec{})
145+
defer test.Client().Kueue().KueueV1beta1().ResourceFlavors().Delete(test.Ctx(), resourceFlavor.Name, metav1.DeleteOptions{})
146+
cqSpec := kueuev1beta1.ClusterQueueSpec{
147+
NamespaceSelector: &metav1.LabelSelector{},
148+
ResourceGroups: []kueuev1beta1.ResourceGroup{
149+
{
150+
CoveredResources: []corev1.ResourceName{corev1.ResourceName("cpu"), corev1.ResourceName("memory")},
151+
Flavors: []kueuev1beta1.FlavorQuotas{
152+
{
153+
Name: kueuev1beta1.ResourceFlavorReference(resourceFlavor.Name),
154+
Resources: []kueuev1beta1.ResourceQuota{
155+
{
156+
Name: corev1.ResourceCPU,
157+
NominalQuota: resource.MustParse("3"),
158+
},
159+
{
160+
Name: corev1.ResourceMemory,
161+
NominalQuota: resource.MustParse("6Gi"),
162+
},
163+
},
164+
},
165+
},
166+
},
167+
},
168+
}
169+
clusterQueue := CreateKueueClusterQueue(test, cqSpec)
170+
defer test.Client().Kueue().KueueV1beta1().ClusterQueues().Delete(test.Ctx(), clusterQueue.Name, metav1.DeleteOptions{})
171+
localQueue := CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name)
172+
173+
// Create first training PyTorch job
174+
tuningJob := createPyTorchJob(test, namespace.Name, localQueue.Name, *config)
175+
176+
// Make sure the PyTorch job is running
177+
test.Eventually(PytorchJob(test, namespace.Name, tuningJob.Name), TestTimeoutShort).
178+
Should(WithTransform(PytorchJobConditionRunning, Equal(corev1.ConditionTrue)))
179+
180+
// Create second training PyTorch job
181+
secondTuningJob := createPyTorchJob(test, namespace.Name, localQueue.Name, *config)
182+
183+
// Make sure the second PyTorch job is suspended, waiting for first job to finish
184+
test.Eventually(PytorchJob(test, namespace.Name, secondTuningJob.Name), TestTimeoutShort).
185+
Should(WithTransform(PytorchJobConditionSuspended, Equal(corev1.ConditionTrue)))
186+
187+
// Make sure the first PyTorch job succeed
188+
test.Eventually(PytorchJob(test, namespace.Name, tuningJob.Name), TestTimeoutLong).Should(WithTransform(PytorchJobConditionSucceeded, Equal(corev1.ConditionTrue)))
189+
test.T().Logf("PytorchJob %s/%s ran successfully", tuningJob.Namespace, tuningJob.Name)
190+
191+
// Second PyTorch job should be started now
192+
test.Eventually(PytorchJob(test, namespace.Name, secondTuningJob.Name), TestTimeoutShort).
193+
Should(WithTransform(PytorchJobConditionRunning, Equal(corev1.ConditionTrue)))
194+
195+
// Make sure the second PyTorch job succeed
196+
test.Eventually(PytorchJob(test, namespace.Name, secondTuningJob.Name), TestTimeoutLong).Should(WithTransform(PytorchJobConditionSucceeded, Equal(corev1.ConditionTrue)))
197+
test.T().Logf("PytorchJob %s/%s ran successfully", secondTuningJob.Namespace, secondTuningJob.Name)
198+
}
199+
200+
func createPyTorchJob(test Test, namespace, localQueueName string, config corev1.ConfigMap) *kftov1.PyTorchJob {
103201
tuningJob := &kftov1.PyTorchJob{
104202
TypeMeta: metav1.TypeMeta{
105203
APIVersion: corev1.SchemeGroupVersion.String(),
106204
Kind: "PyTorchJob",
107205
},
108206
ObjectMeta: metav1.ObjectMeta{
109-
Name: "kfto-sft",
110-
Namespace: namespace.Name,
207+
GenerateName: "kfto-sft-",
111208
Labels: map[string]string{
112-
"kueue.x-k8s.io/queue-name": localQueue.Name,
209+
"kueue.x-k8s.io/queue-name": localQueueName,
113210
},
114211
},
115212
Spec: kftov1.PyTorchJobSpec{
@@ -174,24 +271,9 @@ func TestPytorchjobWithSFTtrainer(t *testing.T) {
174271
},
175272
}
176273

177-
tuningJob, err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace.Name).Create(test.Ctx(), tuningJob, metav1.CreateOptions{})
274+
tuningJob, err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Create(test.Ctx(), tuningJob, metav1.CreateOptions{})
178275
test.Expect(err).NotTo(HaveOccurred())
179276
test.T().Logf("Created PytorchJob %s/%s successfully", tuningJob.Namespace, tuningJob.Name)
180277

181-
// Make sure the Kueue Workload is admitted
182-
test.Eventually(KueueWorkloads(test, namespace.Name), TestTimeoutLong).
183-
Should(
184-
And(
185-
HaveLen(1),
186-
ContainElement(WithTransform(KueueWorkloadAdmitted, BeTrueBecause("Workload failed to be admitted"))),
187-
),
188-
)
189-
190-
// Make sure the PyTorch job is running
191-
test.Eventually(PytorchJob(test, namespace.Name, tuningJob.Name), TestTimeoutShort).
192-
Should(WithTransform(PytorchJobConditionRunning, Equal(corev1.ConditionTrue)))
193-
194-
// Make sure the PyTorch job succeed
195-
test.Eventually(PytorchJob(test, namespace.Name, tuningJob.Name), TestTimeoutLong).Should(WithTransform(PytorchJobConditionSucceeded, Equal(corev1.ConditionTrue)))
196-
test.T().Logf("PytorchJob %s/%s ran successfully", tuningJob.Namespace, tuningJob.Name)
278+
return tuningJob
197279
}

0 commit comments

Comments
 (0)