Skip to content

Commit 4000068

Browse files
committed
Paved e2e test for SparkApplication integration
1 parent 0e44b59 commit 4000068

File tree

9 files changed

+221
-1
lines changed

9 files changed

+221
-1
lines changed

Makefile-deps.mk

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ KUBEFLOW_MPI_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/ku
3838
KUBERAY_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/ray-project/kuberay/ray-operator)
3939
APPWRAPPER_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/project-codeflare/appwrapper)
4040
LEADERWORKERSET_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" sigs.k8s.io/lws)
41+
SPARKOPERATOR_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/kubeflow/spark-operator/v2)
4142
CERTMANAGER_VERSION=$(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/cert-manager/cert-manager)
4243

4344
GOLANGCI_LINT = $(BIN_DIR)/golangci-lint
@@ -64,6 +65,7 @@ JOBSET_ROOT = $(shell $(GO_CMD) list -m -mod=readonly -f "{{.Dir}}" sigs.k8s.io/
6465
CLUSTER_AUTOSCALER_ROOT = $(shell $(GO_CMD) list -m -mod=readonly -f "{{.Dir}}" k8s.io/autoscaler/cluster-autoscaler/apis)
6566
APPWRAPPER_ROOT = $(shell $(GO_CMD) list -m -mod=readonly -f "{{.Dir}}" github.com/project-codeflare/appwrapper)
6667
LEADERWORKERSET_ROOT = $(shell $(GO_CMD) list -m -mod=readonly -f "{{.Dir}}" sigs.k8s.io/lws)
68+
SPARKOPERATOR_ROOT = $(shell $(GO_CMD) list -m -mod=readonly -f "{{.Dir}}" github.com/kubeflow/spark-operator/v2)
6769

6870
##@ Tools
6971

@@ -221,8 +223,13 @@ leaderworkerset-operator-crd: ## Copy the CRDs from the leaderworkerset-operator
221223
mkdir -p $(EXTERNAL_CRDS_DIR)/leaderworkerset-operator/
222224
cp -f $(LEADERWORKERSET_ROOT)/config/crd/bases/* $(EXTERNAL_CRDS_DIR)/leaderworkerset-operator/
223225

226+
.PHONY: spark-operator-crd
227+
spark-operator-crd: ## Copy the CRDs from the spark-operator to the dep-crds directory.
228+
mkdir -p $(EXTERNAL_CRDS_DIR)/spark-operator/
229+
cp -rf $(SPARKOPERATOR_ROOT)/config/crd/bases/* $(EXTERNAL_CRDS_DIR)/spark-operator/
230+
224231
.PHONY: dep-crds
225-
dep-crds: mpi-operator-crd kf-training-operator-crd kf-trainer-crd kf-trainer-runtimes ray-operator-crd jobset-operator-crd leaderworkerset-operator-crd cluster-autoscaler-crd appwrapper-crd appwrapper-manifests kf-training-operator-manifests ray-operator-manifests kf-trainer-manifests ## Copy the CRDs from the external operators to the dep-crds directory.
232+
dep-crds: mpi-operator-crd kf-training-operator-crd kf-trainer-crd kf-trainer-runtimes ray-operator-crd jobset-operator-crd leaderworkerset-operator-crd cluster-autoscaler-crd appwrapper-crd appwrapper-manifests kf-training-operator-manifests ray-operator-manifests kf-trainer-manifests spark-operator-crd ## Copy the CRDs from the external operators to the dep-crds directory.
226233
@echo "Copying CRDs from external operators to dep-crds directory"
227234

228235
.PHONY: kueuectl-docs

Makefile-test.mk

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ run-test-e2e-singlecluster-%:
140140
KUBEFLOW_TRAINER_VERSION=$(KUBEFLOW_TRAINER_VERSION) \
141141
LEADERWORKERSET_VERSION=$(LEADERWORKERSET_VERSION) \
142142
KUBERAY_VERSION=$(KUBERAY_VERSION) RAY_VERSION=$(RAY_VERSION) RAYMINI_VERSION=$(RAYMINI_VERSION) USE_RAY_FOR_TESTS=$(USE_RAY_FOR_TESTS) \
143+
SPARKOPERATOR_VERSION=$(SPARKOPERATOR_VERSION) SPARKOPERATOR_ROOT=$(SPARKOPERATOR_ROOT) \
143144
KIND_CLUSTER_FILE="kind-cluster.yaml" E2E_TARGET_FOLDER="singlecluster" \
144145
TEST_LOG_LEVEL=$(TEST_LOG_LEVEL) \
145146
E2E_RUN_ONLY_ENV=$(E2E_RUN_ONLY_ENV) \

hack/e2e-common.sh

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ if [[ -n ${LEADERWORKERSET_VERSION:-} ]]; then
6868
export LEADERWORKERSET_IMAGE=registry.k8s.io/lws/lws:${LEADERWORKERSET_VERSION}
6969
fi
7070

71+
if [[ -n ${SPARKOPERATOR_VERSION:-} ]]; then
72+
export SPARKOPERATOR_IMAGE="ghcr.io/kubeflow/spark-operator/controller:${SPARKOPERATOR_VERSION}"
73+
fi
74+
7175
if [[ -n "${CERTMANAGER_VERSION:-}" ]]; then
7276
export CERTMANAGER_MANIFEST="https://github.com/cert-manager/cert-manager/releases/download/${CERTMANAGER_VERSION}/cert-manager.yaml"
7377
fi
@@ -141,6 +145,10 @@ function prepare_docker_images {
141145
if [[ -n ${LEADERWORKERSET_VERSION:-} ]]; then
142146
docker pull "${LEADERWORKERSET_IMAGE}"
143147
fi
148+
# FIXME: pull released version
149+
if [[ -n ${SPARKOPERATOR_VERSION:-} ]]; then
150+
(cd "${SPARKOPERATOR_ROOT}" && chmod +x entrypoint.sh && make docker-build IMAGE="${SPARKOPERATOR_IMAGE}")
151+
fi
144152
}
145153

146154
# $1 cluster
@@ -184,6 +192,9 @@ function kind_load {
184192
if [[ -n ${KUBERAY_VERSION:-} ]]; then
185193
install_kuberay "$1" "$2"
186194
fi
195+
if [[ -n ${SPARKOPERATOR_VERSION:-} ]]; then
196+
install_sparkoperator "$1" "$2"
197+
fi
187198
if [[ -n ${CERTMANAGER_VERSION:-} ]]; then
188199
install_cert_manager "$2"
189200
fi
@@ -343,6 +354,18 @@ function install_lws {
343354
kubectl apply --kubeconfig="$2" --server-side -f "${LEADERWORKERSET_MANIFEST}"
344355
}
345356

357+
# $1 cluster name
358+
# $2 kubeconfig option
359+
function install_sparkoperator {
360+
cluster_kind_load_image "${1}" "${SPARKOPERATOR_IMAGE}"
361+
# FIXME: replace with released helm chart
362+
${HELM} install spark-operator "${SPARKOPERATOR_ROOT}/charts/spark-operator-chart" \
363+
--namespace spark-operator \
364+
--create-namespace \
365+
--set image.tag="${SPARKOPERATOR_VERSION}" \
366+
--set 'spark.jobNamespaces[0]='
367+
}
368+
346369
# $1 kubeconfig option
347370
function install_cert_manager {
348371
kubectl apply --kubeconfig="$1" --server-side -f "${CERTMANAGER_MANIFEST}"

test/e2e/config/default/values.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ managerConfig:
6565
- "deployment"
6666
- "statefulset"
6767
- "leaderworkerset.x-k8s.io/leaderworkerset"
68+
- "sparkoperator.k8s.io/sparkapplication"
6869
featureGates:
6970
MultiKueueBatchJobWithManagedBy: true
7071
LocalQueueMetrics: true
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/*
2+
Copyright The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package e2e
18+
19+
import (
20+
"github.com/onsi/ginkgo/v2"
21+
"github.com/onsi/gomega"
22+
corev1 "k8s.io/api/core/v1"
23+
rbacv1 "k8s.io/api/rbac/v1"
24+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25+
"k8s.io/apimachinery/pkg/types"
26+
"sigs.k8s.io/controller-runtime/pkg/client"
27+
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
28+
sparkapplication "sigs.k8s.io/kueue/pkg/controller/jobs/sparkapplication"
29+
testing "sigs.k8s.io/kueue/pkg/util/testing"
30+
sparkapplicationtesting "sigs.k8s.io/kueue/pkg/util/testingjobs/sparkapplication"
31+
"sigs.k8s.io/kueue/test/util"
32+
33+
sparkv1beta2 "github.com/kubeflow/spark-operator/v2/api/v1beta2"
34+
)
35+
36+
var _ = ginkgo.Describe("SparkApplication integration", func() {
37+
const (
38+
resourceFlavorName = "sparkapplication-rf"
39+
clusterQueueName = "sparkapplication-cq"
40+
localQueueName = "sparkapplication-lq"
41+
serviceAccountName = "sparkapplication-sa"
42+
roleBindingName = "sparkapplication-sa-edit"
43+
)
44+
45+
var (
46+
ns *corev1.Namespace
47+
rf *kueue.ResourceFlavor
48+
cq *kueue.ClusterQueue
49+
lq *kueue.LocalQueue
50+
sa *corev1.ServiceAccount
51+
)
52+
53+
ginkgo.BeforeEach(func() {
54+
ns = util.CreateNamespaceFromPrefixWithLog(ctx, k8sClient, "sparkapplication-e2e-")
55+
56+
rf = testing.MakeResourceFlavor(resourceFlavorName).Obj()
57+
util.MustCreate(ctx, k8sClient, rf)
58+
59+
cq = testing.MakeClusterQueue(clusterQueueName).
60+
ResourceGroup(
61+
*testing.MakeFlavorQuotas(resourceFlavorName).
62+
Resource(corev1.ResourceCPU, "5").
63+
Resource(corev1.ResourceMemory, "10Gi").
64+
Obj(),
65+
).
66+
Preemption(kueue.ClusterQueuePreemption{
67+
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
68+
}).
69+
Obj()
70+
util.MustCreate(ctx, k8sClient, cq)
71+
72+
lq = testing.MakeLocalQueue(localQueueName, ns.Name).ClusterQueue(cq.Name).Obj()
73+
util.MustCreate(ctx, k8sClient, lq)
74+
75+
sa = &corev1.ServiceAccount{
76+
ObjectMeta: metav1.ObjectMeta{
77+
Name: serviceAccountName,
78+
Namespace: ns.Name,
79+
},
80+
}
81+
util.MustCreate(ctx, k8sClient, sa)
82+
83+
rb := &rbacv1.RoleBinding{
84+
ObjectMeta: metav1.ObjectMeta{
85+
Name: roleBindingName,
86+
Namespace: ns.Name,
87+
},
88+
Subjects: []rbacv1.Subject{
89+
{
90+
Kind: rbacv1.ServiceAccountKind,
91+
Name: serviceAccountName,
92+
Namespace: ns.Name,
93+
},
94+
},
95+
RoleRef: rbacv1.RoleRef{
96+
Kind: "ClusterRole",
97+
Name: "edit",
98+
APIGroup: rbacv1.GroupName,
99+
},
100+
}
101+
util.MustCreate(ctx, k8sClient, rb)
102+
})
103+
104+
ginkgo.AfterEach(func() {
105+
gomega.Expect(util.DeleteAllSparkApplicationsInNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
106+
gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
107+
util.ExpectObjectToBeDeleted(ctx, k8sClient, cq, true)
108+
util.ExpectObjectToBeDeleted(ctx, k8sClient, rf, true)
109+
util.ExpectAllPodsInNamespaceDeleted(ctx, k8sClient, ns)
110+
})
111+
112+
ginkgo.When("SparkApplication created", func() {
113+
ginkgo.It("should run if admitted", func() {
114+
sparkApp := sparkapplicationtesting.MakeSparkApplication("sparkapplication-simple", ns.Name).
115+
DriverServiceAccount(sa.Name).
116+
DriverCoreRequest("1").
117+
DriverMemoryRequest("512m"). // 512MB
118+
ExecutorServiceAccount(sa.Name).
119+
ExecutorCoreRequest("1").
120+
ExecutorMemoryRequest("512m"). // 512MB
121+
ExecutorInstances(1).
122+
Queue(lq.Name).
123+
Obj()
124+
125+
ginkgo.By("Create a SparkApplication", func() {
126+
util.MustCreate(ctx, k8sClient, sparkApp)
127+
})
128+
129+
ginkgo.By("Waiting for SparkApplication to be Running", func() {
130+
createdSparkApp := &sparkv1beta2.SparkApplication{}
131+
132+
gomega.Eventually(func(g gomega.Gomega) {
133+
// Fetch the SparkApplication object
134+
err := k8sClient.Get(ctx, client.ObjectKeyFromObject(sparkApp), createdSparkApp)
135+
g.Expect(err).ToNot(gomega.HaveOccurred(), "Failed to fetch SparkApplication")
136+
137+
// Ensure SparkApplication's AppState.State is Running
138+
g.Expect(createdSparkApp.Status.AppState.State).To(gomega.Equal(sparkv1beta2.ApplicationStateRunning))
139+
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
140+
})
141+
142+
wlLookupKey := types.NamespacedName{
143+
Name: sparkapplication.GetWorkloadNameForSparkApplication(sparkApp.Name, sparkApp.UID),
144+
Namespace: ns.Name,
145+
}
146+
createdWorkload := &kueue.Workload{}
147+
ginkgo.By("Check workload is created", func() {
148+
gomega.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
149+
})
150+
151+
ginkgo.By("Check workload is admitted", func() {
152+
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, createdWorkload)
153+
})
154+
155+
ginkgo.By("Check workload is finished", func() {
156+
util.ExpectWorkloadToFinish(ctx, k8sClient, wlLookupKey)
157+
})
158+
159+
ginkgo.By("Delete the SparkApplication", func() {
160+
util.ExpectObjectToBeDeleted(ctx, k8sClient, sparkApp, true)
161+
})
162+
163+
ginkgo.By("Check workload is deleted", func() {
164+
util.ExpectObjectToBeDeletedWithTimeout(ctx, k8sClient, createdWorkload, false, util.LongTimeout)
165+
})
166+
})
167+
})
168+
})

test/e2e/singlecluster/suite_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ var _ = ginkgo.BeforeSuite(func() {
7676
util.WaitForLeaderWorkerSetAvailability(ctx, k8sClient)
7777
util.WaitForAppWrapperAvailability(ctx, k8sClient)
7878
util.WaitForKubeFlowTrainingOperatorAvailability(ctx, k8sClient)
79+
util.WaitForSparkOperatorAvailability(ctx, k8sClient)
7980
ginkgo.GinkgoLogr.Info(
8081
"Kueue and all required operators are available in the cluster",
8182
"waitingTime", time.Since(waitForAvailableStart),

test/util/constants.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ var (
6565
MpiOperatorCrds = filepath.Join(GetProjectBaseDir(), "dep-crds", "mpi-operator")
6666
AppWrapperCrds = filepath.Join(GetProjectBaseDir(), "dep-crds", "appwrapper-crds")
6767
RayOperatorCrds = filepath.Join(GetProjectBaseDir(), "dep-crds", "ray-operator-crds")
68+
SparkOperatorCrds = filepath.Join(GetProjectBaseDir(), "dep-crds", "spark-operator")
6869
WebhookPath = filepath.Join(GetProjectBaseDir(), "config", "components", "webhook")
6970
)
7071

test/util/e2e.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
cmv1 "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1"
3131
"github.com/google/go-cmp/cmp/cmpopts"
3232
kfmpi "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
33+
sparkv1beta2 "github.com/kubeflow/spark-operator/v2/api/v1beta2"
3334
kftrainer "github.com/kubeflow/trainer/v2/pkg/apis/trainer/v1alpha1"
3435
kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
3536
"github.com/onsi/ginkgo/v2"
@@ -173,6 +174,9 @@ func CreateClientUsingCluster(kContext string) (client.WithWatch, *rest.Config,
173174
err = kftrainer.AddToScheme(scheme.Scheme)
174175
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
175176

177+
err = sparkv1beta2.AddToScheme(scheme.Scheme)
178+
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
179+
176180
client, err := client.NewWithWatch(cfg, client.Options{Scheme: scheme.Scheme})
177181
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
178182
return client, cfg, nil
@@ -321,6 +325,15 @@ func WaitForKubeFlowTrainingOperatorAvailability(ctx context.Context, k8sClient
321325
verifyNoControllerRestarts(ctx, k8sClient, kftoKey)
322326
}
323327

328+
func WaitForSparkOperatorAvailability(ctx context.Context, k8sClient client.Client) {
329+
sparkctrKey := types.NamespacedName{Namespace: "spark-operator", Name: "spark-operator-controller"}
330+
waitForDeploymentAvailability(ctx, k8sClient, sparkctrKey)
331+
verifyNoControllerRestarts(ctx, k8sClient, sparkctrKey)
332+
sparkwhKey := types.NamespacedName{Namespace: "spark-operator", Name: "spark-operator-webhook"}
333+
waitForDeploymentAvailability(ctx, k8sClient, sparkwhKey)
334+
verifyNoControllerRestarts(ctx, k8sClient, sparkwhKey)
335+
}
336+
324337
func WaitForKubeFlowMPIOperatorAvailability(ctx context.Context, k8sClient client.Client) {
325338
kftoKey := types.NamespacedName{Namespace: "mpi-operator", Name: "mpi-operator"}
326339
waitForDeploymentAvailability(ctx, k8sClient, kftoKey)

test/util/util.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/go-logr/logr"
3232
"github.com/google/go-cmp/cmp/cmpopts"
3333
kfmpi "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
34+
sparkv1beta2 "github.com/kubeflow/spark-operator/v2/api/v1beta2"
3435
kftrainerapi "github.com/kubeflow/trainer/v2/pkg/apis/trainer/v1alpha1"
3536
kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
3637
"github.com/onsi/ginkgo/v2"
@@ -196,6 +197,10 @@ func DeleteAllRayJobsInNamespace(ctx context.Context, c client.Client, ns *corev
196197
return deleteAllObjectsInNamespace(ctx, c, ns, &rayv1.RayJob{})
197198
}
198199

200+
func DeleteAllSparkApplicationsInNamespace(ctx context.Context, c client.Client, ns *corev1.Namespace) error {
201+
return deleteAllObjectsInNamespace(ctx, c, ns, &sparkv1beta2.SparkApplication{})
202+
}
203+
199204
func DeleteAllPodsInNamespace(ctx context.Context, c client.Client, ns *corev1.Namespace) error {
200205
return deleteAllPodsInNamespace(ctx, c, ns, 2)
201206
}

0 commit comments

Comments
 (0)