@@ -26,6 +26,7 @@ import (
2626 kueueacv1beta1 "sigs.k8s.io/kueue/client-go/applyconfiguration/kueue/v1beta1"
2727
2828 corev1 "k8s.io/api/core/v1"
29+ "k8s.io/apimachinery/pkg/api/errors"
2930 "k8s.io/apimachinery/pkg/api/resource"
3031 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3132
4243func TestSetupPytorchjob (t * testing.T ) {
4344 test := With (t )
4445
45- // Create a namespace
46- namespace := & corev1.Namespace {
47- ObjectMeta : metav1.ObjectMeta {
48- Name : namespaceName ,
49- },
50- }
51- _ , err := test .Client ().Core ().CoreV1 ().Namespaces ().Create (test .Ctx (), namespace , metav1.CreateOptions {})
52- test .Expect (err ).NotTo (HaveOccurred ())
46+ createOrGetUpgradeTestNamespace (test , namespaceName )
5347
5448 // Create a ConfigMap with training dataset and configuration
5549 configData := map [string ][]byte {
@@ -59,47 +53,31 @@ func TestSetupPytorchjob(t *testing.T) {
5953 config := CreateConfigMap (test , namespaceName , configData )
6054
6155 // Create Kueue resources
62- resourceFlavor := & kueuev1beta1.ResourceFlavor {
63- ObjectMeta : metav1.ObjectMeta {
64- Name : resourceFlavorName ,
65- },
66- }
67- resourceFlavor , err = test .Client ().Kueue ().KueueV1beta1 ().ResourceFlavors ().Create (test .Ctx (), resourceFlavor , metav1.CreateOptions {})
56+ resourceFlavor := kueueacv1beta1 .ResourceFlavor (resourceFlavorName )
57+ _ , err := test .Client ().Kueue ().KueueV1beta1 ().ResourceFlavors ().Apply (test .Ctx (), resourceFlavor , metav1.ApplyOptions {FieldManager : "setup-PyTorchJob" , Force : true })
6858 test .Expect (err ).NotTo (HaveOccurred ())
6959
70- clusterQueue := & kueuev1beta1.ClusterQueue {
71- ObjectMeta : metav1.ObjectMeta {
72- Name : clusterQueueName ,
73- },
74- Spec : kueuev1beta1.ClusterQueueSpec {
75- NamespaceSelector : & metav1.LabelSelector {},
76- ResourceGroups : []kueuev1beta1.ResourceGroup {
77- {
78- CoveredResources : []corev1.ResourceName {corev1 .ResourceName ("cpu" ), corev1 .ResourceName ("memory" )},
79- Flavors : []kueuev1beta1.FlavorQuotas {
80- {
81- Name : kueuev1beta1 .ResourceFlavorReference (resourceFlavor .Name ),
82- Resources : []kueuev1beta1.ResourceQuota {
83- {
84- Name : corev1 .ResourceCPU ,
85- NominalQuota : resource .MustParse ("8" ),
86- },
87- {
88- Name : corev1 .ResourceMemory ,
89- NominalQuota : resource .MustParse ("12Gi" ),
90- },
91- },
92- },
93- },
94- },
95- },
96- StopPolicy : Ptr (kueuev1beta1 .Hold ),
97- },
98- }
99- clusterQueue , err = test .Client ().Kueue ().KueueV1beta1 ().ClusterQueues ().Create (test .Ctx (), clusterQueue , metav1.CreateOptions {})
60+ clusterQueue := kueueacv1beta1 .ClusterQueue (clusterQueueName ).WithSpec (
61+ kueueacv1beta1 .ClusterQueueSpec ().
62+ WithNamespaceSelector (metav1.LabelSelector {}).
63+ WithResourceGroups (
64+ kueueacv1beta1 .ResourceGroup ().WithCoveredResources (
65+ corev1 .ResourceName ("cpu" ), corev1 .ResourceName ("memory" ),
66+ ).WithFlavors (
67+ kueueacv1beta1 .FlavorQuotas ().
68+ WithName (kueuev1beta1 .ResourceFlavorReference (resourceFlavorName )).
69+ WithResources (
70+ kueueacv1beta1 .ResourceQuota ().WithName (corev1 .ResourceCPU ).WithNominalQuota (resource .MustParse ("8" )),
71+ kueueacv1beta1 .ResourceQuota ().WithName (corev1 .ResourceMemory ).WithNominalQuota (resource .MustParse ("12Gi" )),
72+ ),
73+ ),
74+ ).
75+ WithStopPolicy (kueuev1beta1 .Hold ),
76+ )
77+ _ , err = test .Client ().Kueue ().KueueV1beta1 ().ClusterQueues ().Apply (test .Ctx (), clusterQueue , metav1.ApplyOptions {FieldManager : "setup-PyTorchJob" , Force : true })
10078 test .Expect (err ).NotTo (HaveOccurred ())
10179
102- localQueue := CreateKueueLocalQueue (test , namespaceName , clusterQueue . Name , AsDefaultQueue )
80+ localQueue := CreateKueueLocalQueue (test , namespaceName , clusterQueueName , AsDefaultQueue )
10381
10482 // Create training PyTorch job
10583 tuningJob := createPyTorchJob (test , namespaceName , localQueue .Name , * config )
@@ -133,6 +111,17 @@ func TestRunPytorchjob(t *testing.T) {
133111}
134112
135113func createPyTorchJob (test Test , namespace , localQueueName string , config corev1.ConfigMap ) * kftov1.PyTorchJob {
114+ // Does PyTorchJob already exist?
115+ _ , err := test .Client ().Kubeflow ().KubeflowV1 ().PyTorchJobs (namespace ).Get (test .Ctx (), pyTorchJobName , metav1.GetOptions {})
116+ if err == nil {
117+ // If yes then delete it and wait until there are no PyTorchJobs in the namespace
118+ err := test .Client ().Kubeflow ().KubeflowV1 ().PyTorchJobs (namespace ).Delete (test .Ctx (), pyTorchJobName , metav1.DeleteOptions {})
119+ test .Expect (err ).NotTo (HaveOccurred ())
120+ test .Eventually (kftocore .PytorchJobs (test , namespace ), TestTimeoutShort ).Should (BeEmpty ())
121+ } else if ! errors .IsNotFound (err ) {
122+ test .T ().Fatalf ("Error retrieving PyTorchJob with name `%s`: %v" , pyTorchJobName , err )
123+ }
124+
136125 tuningJob := & kftov1.PyTorchJob {
137126 ObjectMeta : metav1.ObjectMeta {
138127 Name : pyTorchJobName ,
@@ -186,6 +175,10 @@ func createPyTorchJob(test Test, namespace, localQueueName string, config corev1
186175 Name : "tmp-volume" ,
187176 MountPath : "/tmp" ,
188177 },
178+ {
179+ Name : "output-volume" ,
180+ MountPath : "/mnt/output" ,
181+ },
189182 },
190183 Resources : corev1.ResourceRequirements {
191184 Requests : corev1.ResourceList {
@@ -226,6 +219,12 @@ func createPyTorchJob(test Test, namespace, localQueueName string, config corev1
226219 EmptyDir : & corev1.EmptyDirVolumeSource {},
227220 },
228221 },
222+ {
223+ Name : "output-volume" ,
224+ VolumeSource : corev1.VolumeSource {
225+ EmptyDir : & corev1.EmptyDirVolumeSource {},
226+ },
227+ },
229228 },
230229 },
231230 },
@@ -234,9 +233,23 @@ func createPyTorchJob(test Test, namespace, localQueueName string, config corev1
234233 },
235234 }
236235
237- tuningJob , err : = test .Client ().Kubeflow ().KubeflowV1 ().PyTorchJobs (namespace ).Create (test .Ctx (), tuningJob , metav1.CreateOptions {})
236+ tuningJob , err = test .Client ().Kubeflow ().KubeflowV1 ().PyTorchJobs (namespace ).Create (test .Ctx (), tuningJob , metav1.CreateOptions {})
238237 test .Expect (err ).NotTo (HaveOccurred ())
239238 test .T ().Logf ("Created PytorchJob %s/%s successfully" , tuningJob .Namespace , tuningJob .Name )
240239
241240 return tuningJob
242241}
242+
243+ func createOrGetUpgradeTestNamespace (test Test , name string , options ... Option [* corev1.Namespace ]) (namespace * corev1.Namespace ) {
244+ // Verify that the namespace really exists and return it, create it if doesn't exist yet
245+ namespace , err := test .Client ().Core ().CoreV1 ().Namespaces ().Get (test .Ctx (), name , metav1.GetOptions {})
246+ if err == nil {
247+ return
248+ } else if errors .IsNotFound (err ) {
249+ test .T ().Logf ("%s namespace doesn't exists. Creating ..." , name )
250+ return CreateTestNamespaceWithName (test , name , options ... )
251+ } else {
252+ test .T ().Fatalf ("Error retrieving namespace with name `%s`: %v" , name , err )
253+ }
254+ return
255+ }
0 commit comments