@@ -26,6 +26,7 @@ import (
2626 kueueacv1beta1 "sigs.k8s.io/kueue/client-go/applyconfiguration/kueue/v1beta1"
2727
2828 corev1 "k8s.io/api/core/v1"
29+ "k8s.io/apimachinery/pkg/api/errors"
2930 "k8s.io/apimachinery/pkg/api/resource"
3031 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3132
@@ -36,20 +37,14 @@ var (
3637 namespaceName = "test-kfto-upgrade"
3738 resourceFlavorName = "rf-upgrade"
3839 clusterQueueName = "cq-upgrade"
40+ localQueueName = "lq-upgrade"
3941 pyTorchJobName = "pytorch-upgrade"
4042)
4143
4244func TestSetupPytorchjob (t * testing.T ) {
4345 test := With (t )
4446
45- // Create a namespace
46- namespace := & corev1.Namespace {
47- ObjectMeta : metav1.ObjectMeta {
48- Name : namespaceName ,
49- },
50- }
51- _ , err := test .Client ().Core ().CoreV1 ().Namespaces ().Create (test .Ctx (), namespace , metav1.CreateOptions {})
52- test .Expect (err ).NotTo (HaveOccurred ())
47+ createOrGetUpgradeTestNamespace (test , namespaceName )
5348
5449 // Create a ConfigMap with training dataset and configuration
5550 configData := map [string ][]byte {
@@ -59,50 +54,43 @@ func TestSetupPytorchjob(t *testing.T) {
5954 config := CreateConfigMap (test , namespaceName , configData )
6055
6156 // Create Kueue resources
62- resourceFlavor := & kueuev1beta1.ResourceFlavor {
63- ObjectMeta : metav1.ObjectMeta {
64- Name : resourceFlavorName ,
65- },
66- }
67- resourceFlavor , err = test .Client ().Kueue ().KueueV1beta1 ().ResourceFlavors ().Create (test .Ctx (), resourceFlavor , metav1.CreateOptions {})
57+ resourceFlavor := kueueacv1beta1 .ResourceFlavor (resourceFlavorName )
58+ appliedResourceFlavor , err := test .Client ().Kueue ().KueueV1beta1 ().ResourceFlavors ().Apply (test .Ctx (), resourceFlavor , metav1.ApplyOptions {FieldManager : "setup-PyTorchJob" , Force : true })
6859 test .Expect (err ).NotTo (HaveOccurred ())
60+ test .T ().Logf ("Applied Kueue ResourceFlavor %s successfully" , appliedResourceFlavor .Name )
6961
70- clusterQueue := & kueuev1beta1.ClusterQueue {
71- ObjectMeta : metav1.ObjectMeta {
72- Name : clusterQueueName ,
73- },
74- Spec : kueuev1beta1.ClusterQueueSpec {
75- NamespaceSelector : & metav1.LabelSelector {},
76- ResourceGroups : []kueuev1beta1.ResourceGroup {
77- {
78- CoveredResources : []corev1.ResourceName {corev1 .ResourceName ("cpu" ), corev1 .ResourceName ("memory" )},
79- Flavors : []kueuev1beta1.FlavorQuotas {
80- {
81- Name : kueuev1beta1 .ResourceFlavorReference (resourceFlavor .Name ),
82- Resources : []kueuev1beta1.ResourceQuota {
83- {
84- Name : corev1 .ResourceCPU ,
85- NominalQuota : resource .MustParse ("8" ),
86- },
87- {
88- Name : corev1 .ResourceMemory ,
89- NominalQuota : resource .MustParse ("12Gi" ),
90- },
91- },
92- },
93- },
94- },
95- },
96- StopPolicy : Ptr (kueuev1beta1 .Hold ),
97- },
98- }
99- clusterQueue , err = test .Client ().Kueue ().KueueV1beta1 ().ClusterQueues ().Create (test .Ctx (), clusterQueue , metav1.CreateOptions {})
62+ clusterQueue := kueueacv1beta1 .ClusterQueue (clusterQueueName ).WithSpec (
63+ kueueacv1beta1 .ClusterQueueSpec ().
64+ WithNamespaceSelector (metav1.LabelSelector {}).
65+ WithResourceGroups (
66+ kueueacv1beta1 .ResourceGroup ().WithCoveredResources (
67+ corev1 .ResourceName ("cpu" ), corev1 .ResourceName ("memory" ),
68+ ).WithFlavors (
69+ kueueacv1beta1 .FlavorQuotas ().
70+ WithName (kueuev1beta1 .ResourceFlavorReference (resourceFlavorName )).
71+ WithResources (
72+ kueueacv1beta1 .ResourceQuota ().WithName (corev1 .ResourceCPU ).WithNominalQuota (resource .MustParse ("8" )),
73+ kueueacv1beta1 .ResourceQuota ().WithName (corev1 .ResourceMemory ).WithNominalQuota (resource .MustParse ("12Gi" )),
74+ ),
75+ ),
76+ ).
77+ WithStopPolicy (kueuev1beta1 .Hold ),
78+ )
79+ appliedClusterQueue , err := test .Client ().Kueue ().KueueV1beta1 ().ClusterQueues ().Apply (test .Ctx (), clusterQueue , metav1.ApplyOptions {FieldManager : "setup-PyTorchJob" , Force : true })
10080 test .Expect (err ).NotTo (HaveOccurred ())
81+ test .T ().Logf ("Applied Kueue ClusterQueue %s successfully" , appliedClusterQueue .Name )
10182
102- localQueue := CreateKueueLocalQueue (test , namespaceName , clusterQueue .Name , AsDefaultQueue )
83+ localQueue := kueueacv1beta1 .LocalQueue (localQueueName , namespaceName ).
84+ WithAnnotations (map [string ]string {"kueue.x-k8s.io/default-queue" : "true" }).
85+ WithSpec (
86+ kueueacv1beta1 .LocalQueueSpec ().WithClusterQueue (kueuev1beta1 .ClusterQueueReference (clusterQueueName )),
87+ )
88+ appliedLocalQueue , err := test .Client ().Kueue ().KueueV1beta1 ().LocalQueues (namespaceName ).Apply (test .Ctx (), localQueue , metav1.ApplyOptions {FieldManager : "setup-PyTorchJob" , Force : true })
89+ test .Expect (err ).NotTo (HaveOccurred ())
90+ test .T ().Logf ("Applied Kueue LocalQueue %s/%s successfully" , appliedLocalQueue .Namespace , appliedLocalQueue .Name )
10391
10492 // Create training PyTorch job
105- tuningJob := createPyTorchJob (test , namespaceName , localQueue .Name , * config )
93+ tuningJob := createPyTorchJob (test , namespaceName , appliedLocalQueue .Name , * config )
10694
10795 // Make sure the PyTorch job is suspended, waiting for ClusterQueue to be enabled
10896 test .Eventually (kftocore .PyTorchJob (test , tuningJob .Namespace , pyTorchJobName ), TestTimeoutShort ).
@@ -133,6 +121,17 @@ func TestRunPytorchjob(t *testing.T) {
133121}
134122
135123func createPyTorchJob (test Test , namespace , localQueueName string , config corev1.ConfigMap ) * kftov1.PyTorchJob {
124+ // Does PyTorchJob already exist?
125+ _ , err := test .Client ().Kubeflow ().KubeflowV1 ().PyTorchJobs (namespace ).Get (test .Ctx (), pyTorchJobName , metav1.GetOptions {})
126+ if err == nil {
127+ // If yes then delete it and wait until there are no PyTorchJobs in the namespace
128+ err := test .Client ().Kubeflow ().KubeflowV1 ().PyTorchJobs (namespace ).Delete (test .Ctx (), pyTorchJobName , metav1.DeleteOptions {})
129+ test .Expect (err ).NotTo (HaveOccurred ())
130+ test .Eventually (kftocore .PyTorchJobs (test , namespace ), TestTimeoutShort ).Should (BeEmpty ())
131+ } else if ! errors .IsNotFound (err ) {
132+ test .T ().Fatalf ("Error retrieving PyTorchJob with name `%s`: %v" , pyTorchJobName , err )
133+ }
134+
136135 tuningJob := & kftov1.PyTorchJob {
137136 ObjectMeta : metav1.ObjectMeta {
138137 Name : pyTorchJobName ,
@@ -244,9 +243,23 @@ func createPyTorchJob(test Test, namespace, localQueueName string, config corev1
244243 },
245244 }
246245
247- tuningJob , err : = test .Client ().Kubeflow ().KubeflowV1 ().PyTorchJobs (namespace ).Create (test .Ctx (), tuningJob , metav1.CreateOptions {})
246+ tuningJob , err = test .Client ().Kubeflow ().KubeflowV1 ().PyTorchJobs (namespace ).Create (test .Ctx (), tuningJob , metav1.CreateOptions {})
248247 test .Expect (err ).NotTo (HaveOccurred ())
249248 test .T ().Logf ("Created PytorchJob %s/%s successfully" , tuningJob .Namespace , tuningJob .Name )
250249
251250 return tuningJob
252251}
252+
253+ func createOrGetUpgradeTestNamespace (test Test , name string , options ... Option [* corev1.Namespace ]) (namespace * corev1.Namespace ) {
254+ // Verify that the namespace really exists and return it, create it if doesn't exist yet
255+ namespace , err := test .Client ().Core ().CoreV1 ().Namespaces ().Get (test .Ctx (), name , metav1.GetOptions {})
256+ if err == nil {
257+ return
258+ } else if errors .IsNotFound (err ) {
259+ test .T ().Logf ("%s namespace doesn't exists. Creating ..." , name )
260+ return CreateTestNamespaceWithName (test , name , options ... )
261+ } else {
262+ test .T ().Fatalf ("Error retrieving namespace with name `%s`: %v" , name , err )
263+ }
264+ return
265+ }
0 commit comments