Commit a40533a

Paved simple e2e test for SparkApplication integration

1 parent 23aea3a commit a40533a

File tree: 10 files changed, +334 -1 lines changed

Makefile-deps.mk

Lines changed: 8 additions & 1 deletion
@@ -38,6 +38,7 @@ KUBEFLOW_MPI_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/ku
 KUBERAY_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/ray-project/kuberay/ray-operator)
 APPWRAPPER_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/project-codeflare/appwrapper)
 LEADERWORKERSET_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" sigs.k8s.io/lws)
+SPARKOPERATOR_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/kubeflow/spark-operator/v2)
 CERTMANAGER_VERSION=$(shell $(GO_CMD) list -m -f "{{.Version}}" github.com/cert-manager/cert-manager)
 
 GOLANGCI_LINT = $(BIN_DIR)/golangci-lint
@@ -64,6 +65,7 @@ JOBSET_ROOT = $(shell $(GO_CMD) list -m -mod=readonly -f "{{.Dir}}" sigs.k8s.io/
 CLUSTER_AUTOSCALER_ROOT = $(shell $(GO_CMD) list -m -mod=readonly -f "{{.Dir}}" k8s.io/autoscaler/cluster-autoscaler/apis)
 APPWRAPPER_ROOT = $(shell $(GO_CMD) list -m -mod=readonly -f "{{.Dir}}" github.com/project-codeflare/appwrapper)
 LEADERWORKERSET_ROOT = $(shell $(GO_CMD) list -m -mod=readonly -f "{{.Dir}}" sigs.k8s.io/lws)
+SPARKOPERATOR_ROOT = $(shell $(GO_CMD) list -m -mod=readonly -f "{{.Dir}}" github.com/kubeflow/spark-operator/v2)
 
 ##@ Tools
 
@@ -221,8 +223,13 @@ leaderworkerset-operator-crd: ## Copy the CRDs from the leaderworkerset-operator
 	mkdir -p $(EXTERNAL_CRDS_DIR)/leaderworkerset-operator/
 	cp -f $(LEADERWORKERSET_ROOT)/config/crd/bases/* $(EXTERNAL_CRDS_DIR)/leaderworkerset-operator/
 
+.PHONY: spark-operator-crd
+spark-operator-crd: ## Copy the CRDs from the spark-operator to the dep-crds directory.
+	mkdir -p $(EXTERNAL_CRDS_DIR)/spark-operator/
+	cp -rf $(SPARKOPERATOR_ROOT)/config/crd/bases/* $(EXTERNAL_CRDS_DIR)/spark-operator/
+
 .PHONY: dep-crds
-dep-crds: mpi-operator-crd kf-training-operator-crd kf-trainer-crd kf-trainer-runtimes ray-operator-crd jobset-operator-crd leaderworkerset-operator-crd cluster-autoscaler-crd appwrapper-crd appwrapper-manifests kf-training-operator-manifests ray-operator-manifests kf-trainer-manifests ## Copy the CRDs from the external operators to the dep-crds directory.
+dep-crds: mpi-operator-crd kf-training-operator-crd kf-trainer-crd kf-trainer-runtimes ray-operator-crd jobset-operator-crd leaderworkerset-operator-crd cluster-autoscaler-crd appwrapper-crd appwrapper-manifests kf-training-operator-manifests ray-operator-manifests kf-trainer-manifests spark-operator-crd ## Copy the CRDs from the external operators to the dep-crds directory.
 	@echo "Copying CRDs from external operators to dep-crds directory"
 
 .PHONY: kueuectl-docs
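
For reference, a minimal sketch of exercising the new target locally, assuming the repository root as the working directory; the example CRD file name in the comment follows controller-gen's <group>_<plural>.yaml convention and is an assumption, not output recorded in this commit:

  # Sketch: copy the spark-operator CRDs into the dep-crds directory.
  make spark-operator-crd
  ls dep-crds/spark-operator/
  # expected, e.g.: sparkoperator.k8s.io_sparkapplications.yaml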

Makefile-test.mk

Lines changed: 1 addition & 0 deletions
@@ -140,6 +140,7 @@ run-test-e2e-singlecluster-%:
 	KUBEFLOW_TRAINER_VERSION=$(KUBEFLOW_TRAINER_VERSION) \
 	LEADERWORKERSET_VERSION=$(LEADERWORKERSET_VERSION) \
 	KUBERAY_VERSION=$(KUBERAY_VERSION) RAY_VERSION=$(RAY_VERSION) RAYMINI_VERSION=$(RAYMINI_VERSION) USE_RAY_FOR_TESTS=$(USE_RAY_FOR_TESTS) \
+	SPARKOPERATOR_VERSION=$(SPARKOPERATOR_VERSION) SPARKOPERATOR_ROOT=$(SPARKOPERATOR_ROOT) \
 	KIND_CLUSTER_FILE="kind-cluster.yaml" E2E_TARGET_FOLDER="singlecluster" \
 	TEST_LOG_LEVEL=$(TEST_LOG_LEVEL) \
 	E2E_RUN_ONLY_ENV=$(E2E_RUN_ONLY_ENV) \
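
With both variables now passed through, the pattern target picks up the SparkApplication plumbing; a hedged usage sketch, where the suffix selects the Kubernetes version for the kind cluster and the exact tag below is an assumption:

  # Sketch: run the singlecluster e2e suite, including the SparkApplication test.
  make run-test-e2e-singlecluster-1.32.0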

hack/e2e-common.sh

Lines changed: 23 additions & 0 deletions
@@ -68,6 +68,10 @@ if [[ -n ${LEADERWORKERSET_VERSION:-} ]]; then
   export LEADERWORKERSET_IMAGE=registry.k8s.io/lws/lws:${LEADERWORKERSET_VERSION}
 fi
 
+if [[ -n ${SPARKOPERATOR_VERSION:-} ]]; then
+  export SPARKOPERATOR_IMAGE="ghcr.io/kubeflow/spark-operator/controller:${SPARKOPERATOR_VERSION}"
+fi
+
 if [[ -n "${CERTMANAGER_VERSION:-}" ]]; then
   export CERTMANAGER_MANIFEST="https://github.com/cert-manager/cert-manager/releases/download/${CERTMANAGER_VERSION}/cert-manager.yaml"
 fi
@@ -141,6 +145,10 @@ function prepare_docker_images {
   if [[ -n ${LEADERWORKERSET_VERSION:-} ]]; then
     docker pull "${LEADERWORKERSET_IMAGE}"
   fi
+  # FIXME: pull released version
+  if [[ -n ${SPARKOPERATOR_VERSION:-} ]]; then
+    (cd "${SPARKOPERATOR_ROOT}" && chmod +x entrypoint.sh && make docker-build IMAGE="${SPARKOPERATOR_IMAGE}")
+  fi
 }
 
 # $1 cluster
@@ -184,6 +192,9 @@ function kind_load {
   if [[ -n ${KUBERAY_VERSION:-} ]]; then
     install_kuberay "$1" "$2"
   fi
+  if [[ -n ${SPARKOPERATOR_VERSION:-} ]]; then
+    install_sparkoperator "$1" "$2"
+  fi
   if [[ -n ${CERTMANAGER_VERSION:-} ]]; then
     install_cert_manager "$2"
   fi
@@ -343,6 +354,18 @@ function install_lws {
   kubectl apply --kubeconfig="$2" --server-side -f "${LEADERWORKERSET_MANIFEST}"
 }
 
+# $1 cluster name
+# $2 kubeconfig option
+function install_sparkoperator {
+  cluster_kind_load_image "${1}" "${SPARKOPERATOR_IMAGE}"
+  # FIXME: replace with released helm chart
+  ${HELM} install spark-operator "${SPARKOPERATOR_ROOT}/charts/spark-operator-chart" \
+    --namespace spark-operator \
+    --create-namespace \
+    --set image.tag="${SPARKOPERATOR_VERSION}" \
+    --set 'spark.jobNamespaces[0]='
+}
+
 # $1 kubeconfig option
 function install_cert_manager {
   kubectl apply --kubeconfig="$1" --server-side -f "${CERTMANAGER_MANIFEST}"
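
Once the FIXMEs are resolved, the from-source docker-build step could give way to the published chart. A hedged sketch of that install, assuming the Helm repo documented by the spark-operator project and the same values keys as the in-tree chart:

  # Sketch: install the released chart instead of building the image from source.
  helm repo add spark-operator https://kubeflow.github.io/spark-operator
  helm install spark-operator spark-operator/spark-operator \
    --namespace spark-operator \
    --create-namespace \
    --set image.tag="${SPARKOPERATOR_VERSION}" \
    --set 'spark.jobNamespaces[0]='

Note that the empty spark.jobNamespaces[0] entry asks the operator to watch all namespaces, which the per-test namespaces created by the e2e suite rely on.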
pkg/util/testingjobs/sparkapplication/wrappers.go

Lines changed: 113 additions & 0 deletions
@@ -0,0 +1,113 @@
+package sparkapplication
+
+import (
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/utils/ptr"
+
+	controllerconstants "sigs.k8s.io/kueue/pkg/controller/constants"
+
+	sparkappv1beta2 "github.com/kubeflow/spark-operator/v2/api/v1beta2"
+)
+
+// SparkApplicationWrapper wraps a SparkApplication.
+type SparkApplicationWrapper struct {
+	sparkappv1beta2.SparkApplication
+}
+
+// MakeSparkApplication creates a wrapper for a SparkApplication with some default values.
+func MakeSparkApplication(name, ns string) *SparkApplicationWrapper {
+	return &SparkApplicationWrapper{sparkappv1beta2.SparkApplication{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: ns,
+		},
+		Spec: sparkappv1beta2.SparkApplicationSpec{
+			Type:                sparkappv1beta2.SparkApplicationTypeScala,
+			Mode:                sparkappv1beta2.DeployModeCluster,
+			SparkVersion:        "3.5.3",
+			Image:               ptr.To("spark:3.5.3"),
+			MainApplicationFile: ptr.To("local:///opt/spark/examples/jars/spark-examples.jar"),
+			MainClass:           ptr.To("org.apache.spark.examples.SparkPi"),
+			Arguments:           []string{"1000"},
+			Driver: sparkappv1beta2.DriverSpec{
+				SparkPodSpec: sparkappv1beta2.SparkPodSpec{
+					Memory:         ptr.To("512Mi"),
+					ServiceAccount: ptr.To("spark-operator-spark"),
+				},
+				CoreRequest: ptr.To("100m"),
+			},
+			Executor: sparkappv1beta2.ExecutorSpec{
+				SparkPodSpec: sparkappv1beta2.SparkPodSpec{
+					Memory:         ptr.To("512Mi"),
+					ServiceAccount: ptr.To("spark-operator-spark"),
+				},
+				CoreRequest:         ptr.To("100m"),
+				Instances:           ptr.To[int32](1),
+				DeleteOnTermination: ptr.To(false),
+			},
+		},
+	}}
+}
+
+// Label sets the label of the SparkApplication.
+func (w *SparkApplicationWrapper) Label(key, value string) *SparkApplicationWrapper {
+	if w.Labels == nil {
+		w.Labels = make(map[string]string)
+	}
+	w.Labels[key] = value
+	return w
+}
+
+// DriverCoreRequest sets the driver core request.
+func (w *SparkApplicationWrapper) DriverCoreRequest(q string) *SparkApplicationWrapper {
+	w.Spec.Driver.CoreRequest = ptr.To(q)
+	return w
+}
+
+// DriverMemoryRequest sets the driver memory request.
+// Note: the string is in JVM format, e.g. "512m", "2g".
+func (w *SparkApplicationWrapper) DriverMemoryRequest(q string) *SparkApplicationWrapper {
+	w.Spec.Driver.Memory = ptr.To(q)
+	return w
+}
+
+// ExecutorCoreRequest sets the executor core request.
+func (w *SparkApplicationWrapper) ExecutorCoreRequest(q string) *SparkApplicationWrapper {
+	w.Spec.Executor.CoreRequest = ptr.To(q)
+	return w
+}
+
+// ExecutorMemoryRequest sets the executor memory request.
+// Note: the string is in JVM format, e.g. "512m", "2g".
+func (w *SparkApplicationWrapper) ExecutorMemoryRequest(q string) *SparkApplicationWrapper {
+	w.Spec.Executor.Memory = ptr.To(q)
+	return w
+}
+
+// ExecutorInstances sets the number of executor instances.
+func (w *SparkApplicationWrapper) ExecutorInstances(n int32) *SparkApplicationWrapper {
+	w.Spec.Executor.Instances = ptr.To(n)
+	return w
+}
+
+// DriverServiceAccount sets the driver service account.
+func (w *SparkApplicationWrapper) DriverServiceAccount(sa string) *SparkApplicationWrapper {
+	w.Spec.Driver.ServiceAccount = ptr.To(sa)
+	return w
+}
+
+// ExecutorServiceAccount sets the executor service account.
+func (w *SparkApplicationWrapper) ExecutorServiceAccount(sa string) *SparkApplicationWrapper {
+	w.Spec.Executor.ServiceAccount = ptr.To(sa)
+	return w
+}
+
+// Queue sets the local queue name in the labels.
+func (w *SparkApplicationWrapper) Queue(lq string) *SparkApplicationWrapper {
+	return w.Label(controllerconstants.QueueLabel, lq)
+}
+
+// Obj returns the inner SparkApplication.
+func (w *SparkApplicationWrapper) Obj() *sparkappv1beta2.SparkApplication {
+	return &w.SparkApplication
+}
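
For orientation, a minimal sketch of how a test can chain the wrapper's builder methods; the application, namespace, and queue names are placeholders:

  // Sketch: build a SparkApplication for a test, then unwrap it with Obj().
  sparkApp := sparkapplicationtesting.MakeSparkApplication("example-app", "default").
      Queue("user-queue").
      DriverCoreRequest("1").
      DriverMemoryRequest("512m").
      ExecutorCoreRequest("1").
      ExecutorMemoryRequest("512m").
      ExecutorInstances(2).
      Obj()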

test/e2e/config/default/values.yaml

Lines changed: 1 addition & 0 deletions
@@ -65,6 +65,7 @@ managerConfig:
     - "deployment"
     - "statefulset"
     - "leaderworkerset.x-k8s.io/leaderworkerset"
+    - "sparkoperator.k8s.io/sparkapplication"
   featureGates:
     MultiKueueBatchJobWithManagedBy: true
     LocalQueueMetrics: true
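
The entry above lands in the integrations.frameworks list of the controller manager's configuration, which is what makes the e2e deployment reconcile SparkApplications. A hedged sketch of the rendered fragment, with unrelated fields elided:

  apiVersion: config.kueue.x-k8s.io/v1beta1
  kind: Configuration
  integrations:
    frameworks:
    - "sparkoperator.k8s.io/sparkapplication"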
test/e2e/singlecluster/sparkapplication_test.go

Lines changed: 168 additions & 0 deletions
@@ -0,0 +1,168 @@
+/*
+Copyright The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+	"github.com/onsi/ginkgo/v2"
+	"github.com/onsi/gomega"
+	corev1 "k8s.io/api/core/v1"
+	rbacv1 "k8s.io/api/rbac/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
+	sparkapplication "sigs.k8s.io/kueue/pkg/controller/jobs/sparkapplication"
+	testing "sigs.k8s.io/kueue/pkg/util/testing"
+	sparkapplicationtesting "sigs.k8s.io/kueue/pkg/util/testingjobs/sparkapplication"
+	"sigs.k8s.io/kueue/test/util"
+
+	sparkv1beta2 "github.com/kubeflow/spark-operator/v2/api/v1beta2"
+)
+
+var _ = ginkgo.Describe("SparkApplication integration", func() {
+	const (
+		resourceFlavorName = "sparkapplication-rf"
+		clusterQueueName   = "sparkapplication-cq"
+		localQueueName     = "sparkapplication-lq"
+		serviceAccountName = "sparkapplication-sa"
+		roleBindingName    = "sparkapplication-sa-edit"
+	)
+
+	var (
+		ns *corev1.Namespace
+		rf *kueue.ResourceFlavor
+		cq *kueue.ClusterQueue
+		lq *kueue.LocalQueue
+		sa *corev1.ServiceAccount
+	)
+
+	ginkgo.BeforeEach(func() {
+		ns = util.CreateNamespaceFromPrefixWithLog(ctx, k8sClient, "sparkapplication-e2e-")
+
+		rf = testing.MakeResourceFlavor(resourceFlavorName).Obj()
+		util.MustCreate(ctx, k8sClient, rf)
+
+		cq = testing.MakeClusterQueue(clusterQueueName).
+			ResourceGroup(
+				*testing.MakeFlavorQuotas(resourceFlavorName).
+					Resource(corev1.ResourceCPU, "5").
+					Resource(corev1.ResourceMemory, "10Gi").
+					Obj(),
+			).
+			Preemption(kueue.ClusterQueuePreemption{
+				WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
+			}).
+			Obj()
+		util.MustCreate(ctx, k8sClient, cq)
+
+		lq = testing.MakeLocalQueue(localQueueName, ns.Name).ClusterQueue(cq.Name).Obj()
+		util.MustCreate(ctx, k8sClient, lq)
+
+		sa = &corev1.ServiceAccount{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      serviceAccountName,
+				Namespace: ns.Name,
+			},
+		}
+		util.MustCreate(ctx, k8sClient, sa)
+
+		rb := &rbacv1.RoleBinding{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      roleBindingName,
+				Namespace: ns.Name,
+			},
+			Subjects: []rbacv1.Subject{
+				{
+					Kind:      rbacv1.ServiceAccountKind,
+					Name:      serviceAccountName,
+					Namespace: ns.Name,
+				},
+			},
+			RoleRef: rbacv1.RoleRef{
+				Kind:     "ClusterRole",
+				Name:     "edit",
+				APIGroup: rbacv1.GroupName,
+			},
+		}
+		util.MustCreate(ctx, k8sClient, rb)
+	})
+
+	ginkgo.AfterEach(func() {
+		gomega.Expect(util.DeleteAllSparkApplicationsInNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
+		gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
+		util.ExpectObjectToBeDeleted(ctx, k8sClient, cq, true)
+		util.ExpectObjectToBeDeleted(ctx, k8sClient, rf, true)
+		util.ExpectAllPodsInNamespaceDeleted(ctx, k8sClient, ns)
+	})
+
+	ginkgo.When("SparkApplication created", func() {
+		ginkgo.It("should run if admitted", func() {
+			sparkApp := sparkapplicationtesting.MakeSparkApplication("sparkapplication-simple", ns.Name).
+				DriverServiceAccount(sa.Name).
+				DriverCoreRequest("1").
+				DriverMemoryRequest("512m"). // 512MB
+				ExecutorServiceAccount(sa.Name).
+				ExecutorCoreRequest("1").
+				ExecutorMemoryRequest("512m"). // 512MB
+				ExecutorInstances(1).
+				Queue(lq.Name).
+				Obj()
+
+			ginkgo.By("Create a SparkApplication", func() {
+				util.MustCreate(ctx, k8sClient, sparkApp)
+			})
+
+			ginkgo.By("Waiting for SparkApplication to be Running", func() {
+				createdSparkApp := &sparkv1beta2.SparkApplication{}
+
+				gomega.Eventually(func(g gomega.Gomega) {
+					// Fetch the SparkApplication object
+					err := k8sClient.Get(ctx, client.ObjectKeyFromObject(sparkApp), createdSparkApp)
+					g.Expect(err).ToNot(gomega.HaveOccurred(), "Failed to fetch SparkApplication")
+
+					// Ensure SparkApplication's AppState.State is Running
+					g.Expect(createdSparkApp.Status.AppState.State).To(gomega.Equal(sparkv1beta2.ApplicationStateRunning))
+				}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
+			})
+
+			wlLookupKey := types.NamespacedName{
+				Name:      sparkapplication.GetWorkloadNameForSparkApplication(sparkApp.Name, sparkApp.UID),
+				Namespace: ns.Name,
+			}
+			createdWorkload := &kueue.Workload{}
+			ginkgo.By("Check workload is created", func() {
+				gomega.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
+			})
+
+			ginkgo.By("Check workload is admitted", func() {
+				util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, createdWorkload)
+			})
+
+			ginkgo.By("Check workload is finished", func() {
+				util.ExpectWorkloadToFinish(ctx, k8sClient, wlLookupKey)
+			})
+
+			ginkgo.By("Delete the SparkApplication", func() {
+				util.ExpectObjectToBeDeleted(ctx, k8sClient, sparkApp, true)
+			})
+
+			ginkgo.By("Check workload is deleted", func() {
+				util.ExpectObjectToBeDeletedWithTimeout(ctx, k8sClient, createdWorkload, false, util.LongTimeout)
+			})
+		})
+	})
+})

test/e2e/singlecluster/suite_test.go

Lines changed: 1 addition & 0 deletions
@@ -76,6 +76,7 @@ var _ = ginkgo.BeforeSuite(func() {
 		util.WaitForLeaderWorkerSetAvailability(ctx, k8sClient)
 		util.WaitForAppWrapperAvailability(ctx, k8sClient)
 		util.WaitForKubeFlowTrainingOperatorAvailability(ctx, k8sClient)
+		util.WaitForSparkOperatorAvailability(ctx, k8sClient)
 		ginkgo.GinkgoLogr.Info(
 			"Kueue and all required operators are available in the cluster",
 			"waitingTime", time.Since(waitForAvailableStart),

test/util/constants.go

Lines changed: 1 addition & 0 deletions
@@ -65,6 +65,7 @@ var (
 	MpiOperatorCrds   = filepath.Join(GetProjectBaseDir(), "dep-crds", "mpi-operator")
 	AppWrapperCrds    = filepath.Join(GetProjectBaseDir(), "dep-crds", "appwrapper-crds")
 	RayOperatorCrds   = filepath.Join(GetProjectBaseDir(), "dep-crds", "ray-operator-crds")
+	SparkOperatorCrds = filepath.Join(GetProjectBaseDir(), "dep-crds", "spark-operator-crds")
 	WebhookPath = filepath.Join(GetProjectBaseDir(), "config", "components", "webhook")
 )
 
