Skip to content

Commit 68d906e

Browse files
committed
Add executor feature step to create executor kubernetes service
1 parent 0ba9a9b commit 68d906e

File tree

7 files changed

+157
-4
lines changed

7 files changed

+157
-4
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,19 @@ private[spark] object Config extends Logging {
442442
.toSequence
443443
.createWithDefault(Nil)
444444

445+
// Opt-in switch (disabled by default) controlling whether a Kubernetes service is created
// for each executor. The documentation refers to the user-facing configuration key
// spark.network.timeout rather than the internal constant name.
val KUBERNETES_EXECUTOR_ENABLE_SERVICE =
  ConfigBuilder("spark.kubernetes.executor.enableService")
    .doc("Whether to create a Kubernetes service for each executor. " +
      "The service allows connecting to executor ports via the Kubernetes service instead " +
      "of the pod host IP. Connecting to such ports via the service fails instantly with a " +
      "'connection refused' error once the executor has been decommissioned, whereas " +
      "connecting via the pod host IP fails only with a 'connection timeout', which is " +
      "set via spark.network.timeout and defaults to 2 minutes. " +
      "The Kubernetes service provides access to the executor's shuffle service.")
    .version("4.1.0")
    .booleanConf
    .createWithDefault(false)
457+
445458
val KUBERNETES_EXECUTOR_DECOMMISSION_LABEL =
446459
ConfigBuilder("spark.kubernetes.executor.decommissionLabel")
447460
.doc("Label to apply to a pod which is being decommissioned." +
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.k8s.features
18+
19+
import scala.jdk.CollectionConverters._
20+
21+
import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, ServiceBuilder}
22+
23+
import org.apache.spark.SparkException
24+
import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
25+
import org.apache.spark.internal.config.SHUFFLE_SERVICE_PORT
26+
27+
/**
 * Executor feature step that attaches a dedicated Kubernetes service to an executor.
 *
 * The step contributes two things:
 *  - an `EXECUTOR_SERVICE_NAME` environment variable on the executor container, holding the
 *    name of the service, and
 *  - a `Service` resource whose selector consists of the `spark-app-selector` and
 *    `spark-exec-id` labels and which exposes the configured shuffle service port.
 *
 * Both labels must be present in `conf.labels`; otherwise a [[SparkException]] is thrown as
 * soon as the service name is needed.
 */
class ExecutorServiceFeatureStep(conf: KubernetesExecutorConf)
  extends KubernetesFeatureConfigStep {

  private val appSelectorLabelKey = "spark-app-selector"
  private val execIdLabelKey = "spark-exec-id"
  private val selectorLabelKeys = Set(appSelectorLabelKey, execIdLabelKey)

  // Resolved lazily so that a missing label only fails once the service name is used.
  private lazy val appSelectorValue = requireLabel(appSelectorLabelKey)
  private lazy val execIdValue = requireLabel(execIdLabelKey)

  // The fixed parts contribute 4 ("svc-") + 6 ("-exec-") characters. Assuming an application
  // selector of at most 38 characters and an executor id of at most 10 digits (TODO confirm
  // these bounds), the name fits in KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH = 63.
  private lazy val serviceName = s"svc-$appSelectorValue-exec-$execIdValue"

  /**
   * Looks up a mandatory label of the executor.
   *
   * @param key name of the label to read from `conf.labels`
   * @return the value of the label
   * @throws SparkException if the label is not set
   */
  private def requireLabel(key: String): String =
    conf.labels.getOrElse(
      key,
      throw new SparkException(s"This feature step requires label $key"))

  /**
   * Adds the `EXECUTOR_SERVICE_NAME` environment variable to the executor container so the
   * executor entry point knows its Kubernetes service name. The pod itself is left untouched.
   */
  override def configurePod(pod: SparkPod): SparkPod = {
    val containerWithServiceName = new ContainerBuilder(pod.container)
      .addNewEnv()
        .withName("EXECUTOR_SERVICE_NAME")
        .withValue(serviceName)
        .endEnv()
      .build()

    SparkPod(pod.pod, containerWithServiceName)
  }

  /**
   * Builds the Kubernetes service for this executor.
   *
   * @return a single-element sequence holding the service, which exposes the shuffle service
   *         port under the name `spark-shuffle-service`
   */
  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
    // Only the two identifying labels take part in the selector; other labels are ignored.
    val podSelector = conf.labels.view.filterKeys(selectorLabelKeys).toMap

    // The service port and the target port are both the configured shuffle service port.
    val shuffleServicePortName = "spark-shuffle-service"
    val shuffleServicePort = conf.sparkConf.get(SHUFFLE_SERVICE_PORT)

    val executorService = new ServiceBuilder()
      .withNewMetadata()
        .withName(serviceName)
        .endMetadata()
      .withNewSpec()
        .withSelector(podSelector.asJava)
        .addNewPort()
          .withName(shuffleServicePortName)
          .withPort(shuffleServicePort)
          .withNewTargetPort(shuffleServicePort)
          .endPort()
        .endSpec()
      .build()

    Seq(executorService)
  }
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,7 @@ class ExecutorPodsAllocator(
463463
kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
464464
try {
465465
addOwnerReference(createdExecutorPod, resources)
466+
kubernetesClient.resourceList(resources: _*).forceConflicts().serverSideApply()
466467
resources
467468
.filter(_.getKind == "PersistentVolumeClaim")
468469
.foreach { resource =>

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,20 @@ private[spark] class KubernetesExecutorBuilder {
6565
}
6666
}
6767

68+
// Feature steps that are only added when their enabling configuration is set.
// Option.when yields Some(step) only if the flag is true, so a disabled feature
// disappears when the sequence is flattened.
val optionalFeatures = Seq(
  Option.when(conf.get(Config.KUBERNETES_EXECUTOR_ENABLE_SERVICE))(
    new ExecutorServiceFeatureStep(conf))
).flatten
73+
6874
val allFeatures = Seq(
6975
new BasicExecutorFeatureStep(conf, secMgr, resourceProfile),
7076
new ExecutorKubernetesCredentialsFeatureStep(conf),
7177
new MountSecretsFeatureStep(conf),
7278
new EnvSecretsFeatureStep(conf),
7379
new MountVolumesFeatureStep(conf),
7480
new HadoopConfExecutorFeatureStep(conf),
75-
new LocalDirsFeatureStep(conf)) ++ userFeatures
81+
new LocalDirsFeatureStep(conf)) ++ optionalFeatures ++ userFeatures
7682

7783
val features = allFeatures.filterNot(f =>
7884
conf.get(Config.KUBERNETES_EXECUTOR_POD_EXCLUDED_FEATURE_STEPS).contains(f.getClass.getName))

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,15 @@ import scala.jdk.CollectionConverters._
2525

2626
import io.fabric8.kubernetes.api.model._
2727
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException}
28-
import io.fabric8.kubernetes.client.dsl.PodResource
28+
import io.fabric8.kubernetes.client.dsl.{NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource, ServerSideApplicable}
2929
import org.mockito.{Mock, MockitoAnnotations}
3030
import org.mockito.ArgumentMatchers.{any, anyString, eq => meq}
3131
import org.mockito.Mockito.{never, times, verify, when}
3232
import org.mockito.invocation.InvocationOnMock
3333
import org.mockito.stubbing.Answer
3434
import org.scalatest.BeforeAndAfter
3535
import org.scalatest.PrivateMethodTester._
36+
import org.scalatestplus.mockito.MockitoSugar.mock
3637

3738
import org.apache.spark.{SecurityManager, SparkConf, SparkException, SparkFunSuite}
3839
import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesExecutorSpec}
@@ -144,6 +145,11 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
144145
conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
145146
when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty)
146147
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
148+
val apl = mock[NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[HasMetadata]]
149+
val ssa = mock[ServerSideApplicable[java.util.List[HasMetadata]]]
150+
when(apl.forceConflicts()).thenReturn(ssa)
151+
when(kubernetesClient.resourceList()).thenReturn(apl)
152+
when(kubernetesClient.resourceList(any[HasMetadata]())).thenReturn(apl)
147153
when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims)
148154
when(persistentVolumeClaims.inNamespace("default")).thenReturn(pvcWithNamespace)
149155
when(pvcWithNamespace.withLabel(any(), any())).thenReturn(labeledPersistentVolumeClaims)

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,16 @@
1616
*/
1717
package org.apache.spark.scheduler.cluster.k8s
1818

19+
import scala.jdk.CollectionConverters.IterableHasAsScala
20+
21+
import io.fabric8.kubernetes.api.model.Service
1922
import io.fabric8.kubernetes.client.KubernetesClient
23+
import org.mockito.Mockito.mock
2024

2125
import org.apache.spark.{SecurityManager, SparkConf}
2226
import org.apache.spark.deploy.k8s._
2327
import org.apache.spark.deploy.k8s.features.KubernetesExecutorCustomFeatureConfigStep
24-
import org.apache.spark.internal.config.ConfigEntry
28+
import org.apache.spark.internal.config.{ConfigEntry, SHUFFLE_SERVICE_PORT}
2529
import org.apache.spark.resource.ResourceProfile
2630

2731
class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
@@ -65,6 +69,47 @@ class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
6569
val defaultProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
6670
new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client, defaultProfile).pod
6771
}
72+
73+
// With the default configuration the executor spec must carry neither the
// EXECUTOR_SERVICE_NAME environment variable nor any additional Kubernetes resources.
test("SPARK-XXXXX: check executor kubernetes spec with service disabled by default") {
  val sparkConf = baseConf
  val executorConf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
  val spec = new KubernetesExecutorBuilder().buildFromFeatures(
    executorConf,
    new SecurityManager(sparkConf),
    mock(classOf[KubernetesClient]),
    ResourceProfile.getOrCreateDefaultProfile(sparkConf))

  val envNames = spec.pod.container.getEnv.asScala.map(_.getName)
  assert(!envNames.exists(_ === "EXECUTOR_SERVICE_NAME"))

  assert(spec.executorKubernetesResources.isEmpty)
}
86+
87+
// With the service enabled the executor spec must expose the service name to the container
// and contain exactly one Service resource publishing the shuffle service port. This is
// checked for both the default port and an explicitly configured one.
test("SPARK-XXXXX: check executor kubernetes spec with service enabled") {
  val expectedServiceName = "svc-appId-exec-1"

  for (customPort <- Seq(None, Some(1234))) {
    val sparkConf = baseConf.clone.set(Config.KUBERNETES_EXECUTOR_ENABLE_SERVICE, true)
    customPort.foreach(port => sparkConf.set(SHUFFLE_SERVICE_PORT, port))
    val executorConf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
    val spec = new KubernetesExecutorBuilder().buildFromFeatures(
      executorConf,
      new SecurityManager(sparkConf),
      mock(classOf[KubernetesClient]),
      ResourceProfile.getOrCreateDefaultProfile(sparkConf))

    // the container is told the name of its service
    val serviceNameEnv =
      spec.pod.container.getEnv.asScala.find(_.getName === "EXECUTOR_SERVICE_NAME")
    assert(serviceNameEnv.map(_.getValue) === Some(expectedServiceName))

    // exactly one additional resource is produced, and it is the service
    val resources = spec.executorKubernetesResources
    assert(resources.size === 1)
    assert(resources.head.getKind === "Service")
    val service = resources.head.asInstanceOf[Service]
    assert(service.getMetadata.getName === expectedServiceName)

    // the service publishes the shuffle service port only
    val ports = service.getSpec.getPorts
    assert(ports.size() === 1)
    val servicePort = ports.get(0)
    assert(servicePort.getName === "spark-shuffle-service")
    val expectedPort = customPort.getOrElse(SHUFFLE_SERVICE_PORT.defaultValue.get)
    assert(servicePort.getPort === expectedPort)
  }
}
68113
}
69114

70115
/**

resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ case "$1" in
102102
--executor-id $SPARK_EXECUTOR_ID
103103
--cores $SPARK_EXECUTOR_CORES
104104
--app-id $SPARK_APPLICATION_ID
105-
--hostname $SPARK_EXECUTOR_POD_IP
105+
${EXECUTOR_SERVICE_NAME:+--bind-address $SPARK_EXECUTOR_POD_IP}
106+
--hostname ${EXECUTOR_SERVICE_NAME:-$SPARK_EXECUTOR_POD_IP}
106107
--resourceProfileId $SPARK_RESOURCE_PROFILE_ID
107108
--podName $SPARK_EXECUTOR_POD_NAME
108109
)

0 commit comments

Comments
 (0)