30 changes: 24 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -339,8 +339,8 @@ private[spark] class SparkSubmit extends Logging {
val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT
val isKubernetesClusterModeDriver = isKubernetesClient &&
sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
// TODO: does client/cluster mode matter here?
val isArmada = clusterManager == ARMADA
val isArmadaCluster = clusterManager == ARMADA && deployMode == CLUSTER
// TODO: Support armada & client?
val isCustomClasspathInClusterModeDisallowed =
!sparkConf.get(ALLOW_CUSTOM_CLASSPATH_BY_PROXY_USER_IN_CLUSTER_MODE) &&
args.proxyUser != null &&
@@ -706,7 +706,7 @@ private[spark] class SparkSubmit extends Logging {
mergeFn = Some(mergeFileLists(_, _))),

// Other options
OptionAssigner(args.numExecutors, YARN | KUBERNETES, ALL_DEPLOY_MODES,
OptionAssigner(args.numExecutors, YARN | KUBERNETES | ARMADA, ALL_DEPLOY_MODES,
confKey = EXECUTOR_INSTANCES.key),
OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = EXECUTOR_CORES.key),
@@ -878,10 +878,28 @@ private[spark] class SparkSubmit extends Logging {
}
}

if (isArmada) {
// FIXME: Make sure we populate what we need here!
if (isArmadaCluster) {
childMainClass = ARMADA_CLUSTER_SUBMIT_CLASS
childArgs ++= Array("--class", args.mainClass)
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
if (args.isPython) {
childArgs ++= Array("--primary-py-file", args.primaryResource)
childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
} else if (args.isR) {
childArgs ++= Array("--primary-r-file", args.primaryResource)
childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner")
}
else {
childArgs ++= Array("--primary-java-resource", args.primaryResource)
childArgs ++= Array("--main-class", args.mainClass)
}
} else {
childArgs ++= Array("--main-class", args.mainClass)
}
if (args.childArgs != null) {
args.childArgs.foreach { arg =>
childArgs += "--arg" += arg
}
}
}

// Load any properties specified through --conf and the default properties file
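To make the new routing concrete, a minimal sketch of what the Armada cluster-mode branch above produces for a Python application; the flag names and runner class come from the branch above, while the resource path and application argument are purely illustrative:

// Sketch only: childMainClass is set to ARMADA_CLUSTER_SUBMIT_CLASS and childArgs
// ends up roughly as follows for a .py primary resource (values are made up).
val childArgs = Seq(
  "--primary-py-file", "local:///opt/spark/examples/src/main/python/pi.py", // illustrative path
  "--main-class", "org.apache.spark.deploy.PythonRunner",
  "--arg", "100" // one --arg pair per application argument
)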
4 changes: 4 additions & 0 deletions examples/submitSparkPi.sh
@@ -0,0 +1,4 @@
#!/bin/bash

# Run the SparkPi example app on Armada
bin/spark-submit --master armada://localhost:30002 --deploy-mode cluster --name spark-pi --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=2 --conf spark.kubernetes.container.image=spark:testing local:///opt/spark/examples/jars/spark-examples.jar 100
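The armada://localhost:30002 master URL above is split into a host/port pair by ArmadaUtils.parseMasterUrl before any submission call is made. The helper itself is not shown in this diff; a rough sketch of the assumed behavior:

// Assumed behavior, for illustration only:
// "armada://localhost:30002" -> ("localhost", 30002)
def parseMasterUrl(master: String): (String, Int) = {
  val parts = master.split(":")                // Array("armada", "//localhost", "30002")
  (parts(1).stripPrefix("//"), parts(2).toInt) // drop the leading "//" from the host
}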
@@ -16,8 +16,9 @@
*/
package org.apache.spark.deploy.armada.submit

/*
import scala.collection.mutable

/*
import scala.jdk.CollectionConverters._
import scala.util.control.Breaks._
import scala.util.control.NonFatal
@@ -30,6 +31,7 @@ import io.fabric8.kubernetes.client.Watcher.Action
*/
import _root_.io.armadaproject.armada.ArmadaClient
import k8s.io.api.core.v1.generated.{Container, EnvVar, PodSpec, ResourceRequirements}
import k8s.io.api.core.v1.generated.{EnvVarSource, ObjectFieldSelector}
import k8s.io.apimachinery.pkg.api.resource.generated.Quantity

import org.apache.spark.SparkConf
@@ -52,7 +54,6 @@ import org.apache.spark.util.Utils
* @param mainClass the main class of the application to run
* @param driverArgs arguments to the driver
*/
/*
private[spark] case class ClientArguments(
mainAppResource: MainAppResource,
mainClass: String,
@@ -94,7 +95,6 @@ private[spark] object ClientArguments {
proxyUser)
}
}
*/

/**
* Submits a Spark application to run on Kubernetes by creating the driver pod and starting a
@@ -242,14 +242,15 @@ private[spark] class ArmadaClientApplication extends SparkApplication {

override def start(args: Array[String], conf: SparkConf): Unit = {
log("ArmadaClientApplication.start() called!")
run(conf)
val parsedArguments = ClientArguments.fromCommandLineArgs(args)
run(parsedArguments, conf)
}

private def run(sparkConf: SparkConf): Unit = {
private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
val (host, port) = ArmadaUtils.parseMasterUrl(sparkConf.get("spark.master"))
log(s"host is $host, port is $port")
var armadaClient = new ArmadaClient(ArmadaClient.GetChannel(host, port))
if (armadaClient.SubmitHealth().isServing) {
val armadaClient = ArmadaClient(host, port)
if (armadaClient.submitHealth().isServing) {
log("Submit health good!")
} else {
log("Could not contact Armada!")
@@ -258,7 +259,7 @@ private[spark] class ArmadaClientApplication extends SparkApplication {

// # FIXME: Need to check how this is launched whether to submit a job or
// to turn into driver / cluster manager mode.
val jobId = submitDriverJob(armadaClient, sparkConf)
val jobId = submitDriverJob(armadaClient, clientArguments, sparkConf)
log(s"Got job ID: $jobId")
// For constructing the app ID, we can't use the Spark application name, as the app ID is going
// to be added as a label to group resources belonging to the same application. Label values are
@@ -296,26 +297,46 @@ private[spark] class ArmadaClientApplication extends SparkApplication {
()
}

private def submitDriverJob(armadaClient: ArmadaClient, conf: SparkConf): String = {
private def submitDriverJob(armadaClient: ArmadaClient, clientArguments: ClientArguments,
conf: SparkConf): String = {
val source = EnvVarSource().withFieldRef(ObjectFieldSelector()
.withApiVersion("v1").withFieldPath("status.podIP"))
val envVars = Seq(
EnvVar().withName("SPARK_DRIVER_BIND_ADDRESS").withValue("0.0.0.0:1234")
new EnvVar().withName("SPARK_DRIVER_BIND_ADDRESS").withValueFrom(source)
)

val primaryResource = clientArguments.mainAppResource match {
case JavaMainAppResource(Some(resource)) => Seq(resource)
case PythonMainAppResource(resource) => Seq(resource)
case RMainAppResource(resource) => Seq(resource)
case _ => Seq()
}

val javaOptions = "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=0.0.0.0:5005"
val driverContainer = Container()
.withName("spark-driver")
.withImagePullPolicy("IfNotPresent")
.withImage("spark:testing")
.withImage(conf.get("spark.kubernetes.container.image"))
.withEnv(envVars)
.withCommand(Seq("/opt/entrypoint.sh"))
.withArgs(
Seq(
"driver",
"--verbose",
"--class",
conf.get("spark.app.name"),
clientArguments.mainClass,
"--master",
"armada://armada-server.armada.svc.cluster.local:50051",
"submit"
)
"--conf",
s"spark.executor.instances=${conf.get("spark.executor.instances")}",
"--conf",
s"spark.kubernetes.container.image=${conf.get("spark.kubernetes.container.image")}",
"--conf",
"spark.driver.port=7078",
"--conf",
s"spark.driver.extraJavaOptions=$javaOptions"

) ++ primaryResource ++ clientArguments.driverArgs
)
.withResources( // FIXME: What are reasonable requests/limits for spark drivers?
ResourceRequirements(
@@ -338,16 +359,16 @@ private[spark] class ArmadaClientApplication extends SparkApplication {
val driverJob = api.submit
.JobSubmitRequestItem()
.withPriority(0)
.withNamespace("personal-anonymous")
.withNamespace("default")
.withPodSpec(podSpec)

// FIXME: Plumb config for queue, job-set-id
val jobSubmitResponse = armadaClient.SubmitJobs("test", "spark-test-1", Seq(driverJob))
val jobSubmitResponse = armadaClient.submitJobs("test", "driver", Seq(driverJob))

log(s"Job Submit Response $jobSubmitResponse")
for (respItem <- jobSubmitResponse.jobResponseItems) {
log(s"JobID: ${respItem.jobId} Error: ${respItem.error} ")
val error = if (respItem.error == "") "None" else respItem.error
log(s"JobID: ${respItem.jobId} Error: ${error}")
}
jobSubmitResponse.jobResponseItems(0).jobId
jobSubmitResponse.jobResponseItems.head.jobId
}
}
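The queue and job-set id passed to submitJobs are still hard-coded ("test" / "driver"); the FIXME above could be resolved along these lines. The spark.armada.* key names are assumptions, not defined anywhere in this change:

// Hypothetical plumbing for queue / job-set-id; conf keys are placeholders.
val queue    = conf.get("spark.armada.queue", "test")
val jobSetId = conf.get("spark.armada.jobSetId", "driver")
val jobSubmitResponse = armadaClient.submitJobs(queue, jobSetId, Seq(driverJob))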
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.armada.submit

import org.apache.spark.annotation.{DeveloperApi, Since, Stable}

/**
* :: DeveloperApi ::
*
* All traits and classes in this file are used by the K8s module and the Spark K8s operator.
*/

@Stable
@DeveloperApi
@Since("2.3.0")
sealed trait MainAppResource

@Stable
@DeveloperApi
@Since("2.4.0")
sealed trait NonJVMResource

@Stable
@DeveloperApi
@Since("3.0.0")
case class JavaMainAppResource(primaryResource: Option[String])
extends MainAppResource

@Stable
@DeveloperApi
@Since("2.4.0")
case class PythonMainAppResource(primaryResource: String)
extends MainAppResource with NonJVMResource

@Stable
@DeveloperApi
@Since("2.4.0")
case class RMainAppResource(primaryResource: String)
extends MainAppResource with NonJVMResource
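These resource types are what SparkSubmit's Armada branch and submitDriverJob match on; a minimal sketch of the mapping from resource type to runner main class, mirroring those branches (the helper itself is hypothetical, not part of this change):

// Hypothetical helper, mirroring the match logic elsewhere in this PR.
def runnerClass(resource: MainAppResource, mainClass: String): String = resource match {
  case PythonMainAppResource(_) => "org.apache.spark.deploy.PythonRunner"
  case RMainAppResource(_)      => "org.apache.spark.deploy.RRunner"
  case JavaMainAppResource(_)   => mainClass
}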
@@ -29,6 +29,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.rpc.{RpcAddress, RpcCallContext}
import org.apache.spark.scheduler.{ExecutorDecommission, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
import org.apache.spark.scheduler.cluster.SchedulerBackendUtils.getInitialTargetExecutorNumber


// TODO: Implement for Armada
@@ -49,7 +50,7 @@ private[spark] class ArmadaClusterSchedulerBackend(
}


private def submitJob(): Unit = {
private def submitJob(executorId: Int): Unit = {

val urlArray = masterURL.split(":")
// Remove leading "/"'s
@@ -63,7 +64,7 @@ private[spark] class ArmadaClusterSchedulerBackend(
val source = EnvVarSource().withFieldRef(ObjectFieldSelector()
.withApiVersion("v1").withFieldPath("status.podIP"))
val envVars = Seq(
EnvVar().withName("SPARK_EXECUTOR_ID").withValue("1"),
EnvVar().withName("SPARK_EXECUTOR_ID").withValue(executorId.toString),
EnvVar().withName("SPARK_RESOURCE_PROFILE_ID").withValue("0"),
EnvVar().withName("SPARK_EXECUTOR_POD_NAME").withValue("test-pod-name"),
EnvVar().withName("SPARK_APPLICATION_ID").withValue("test_spark_app_id"),
@@ -75,7 +76,7 @@ private[spark] class ArmadaClusterSchedulerBackend(
val executorContainer = Container()
.withName("spark-executor")
.withImagePullPolicy("IfNotPresent")
.withImage("spark:testing")
.withImage(conf.get("spark.kubernetes.container.image"))
.withEnv(envVars)
.withCommand(Seq("/opt/entrypoint.sh"))
.withArgs(
@@ -107,8 +108,8 @@ private[spark] class ArmadaClusterSchedulerBackend(
.withNamespace("default")
.withPodSpec(podSpec)

val client = new ArmadaClient(ArmadaClient.GetChannel(host, port))
val jobSubmitResponse = client.SubmitJobs("test", "executor", Seq(testJob))
val client = ArmadaClient(host, port)
val jobSubmitResponse = client.submitJobs("test", "executor", Seq(testJob))

logInfo(s"Driver Job Submit Response")
for (respItem <- jobSubmitResponse.jobResponseItems) {
@@ -117,7 +118,8 @@ private[spark] class ArmadaClusterSchedulerBackend(
}
}
override def start(): Unit = {
submitJob()
val numberOfExecutors = getInitialTargetExecutorNumber(conf)
1 to numberOfExecutors foreach {j: Int => submitJob(j)}
}

override def stop(): Unit = {}
@@ -143,7 +145,7 @@ private[spark] class ArmadaClusterSchedulerBackend(
}

private class ArmadaDriverEndpoint extends DriverEndpoint {
protected val execIDRequester = new HashMap[RpcAddress, String]
private val execIDRequester = new HashMap[RpcAddress, String]

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] =
super.receiveAndReply(context)
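For reference, the new start() loop above written with an explicit range; behaviorally identical, just a slightly more conventional form:

override def start(): Unit = {
  val numberOfExecutors = getInitialTargetExecutorNumber(conf)
  (1 to numberOfExecutors).foreach(submitJob) // executor IDs 1..n, one Armada job each
}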