From e677a49bfe4ead749f56ab412d5fdeafb1278f92 Mon Sep 17 00:00:00 2001 From: George Jahad Date: Mon, 10 Feb 2025 15:26:29 -0800 Subject: [PATCH 01/12] initial --- .../org/apache/spark/deploy/SparkSubmit.scala | 9 ++++---- .../armada/ArmadaClientApplication.scala | 21 +++++++++++++------ .../armada/ArmadaClusterManagerBackend.scala | 2 +- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 43db943a05fd..34c62ccb48d9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -339,8 +339,8 @@ private[spark] class SparkSubmit extends Logging { val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT val isKubernetesClusterModeDriver = isKubernetesClient && sparkConf.getBoolean("spark.kubernetes.submitInDriver", false) - // TODO: does client/cluster mode matter here? - val isArmada = clusterManager == ARMADA + val isArmadaCluster = clusterManager == ARMADA && deployMode == CLUSTER + // TODO: Support armada & client? val isCustomClasspathInClusterModeDisallowed = !sparkConf.get(ALLOW_CUSTOM_CLASSPATH_BY_PROXY_USER_IN_CLUSTER_MODE) && args.proxyUser != null && @@ -878,10 +878,9 @@ private[spark] class SparkSubmit extends Logging { } } - if (isArmada) { - // FIXME: Make sure we populate what we need here! + if (isArmadaCluster) { childMainClass = ARMADA_CLUSTER_SUBMIT_CLASS - childArgs ++= Array("--class", args.mainClass) + // TODO: Setup childArgs } // Load any properties specified through --conf and the default properties file diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala index bfc6387ada84..93219c8f2102 100644 --- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala +++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala @@ -30,6 +30,7 @@ import io.fabric8.kubernetes.client.Watcher.Action */ import _root_.io.armadaproject.armada.ArmadaClient import k8s.io.api.core.v1.generated.{Container, EnvVar, PodSpec, ResourceRequirements} +import k8s.io.api.core.v1.generated.{EnvVarSource, ObjectFieldSelector} import k8s.io.apimachinery.pkg.api.resource.generated.Quantity import org.apache.spark.SparkConf @@ -241,14 +242,14 @@ private[spark] class ArmadaClientApplication extends SparkApplication { } override def start(args: Array[String], conf: SparkConf): Unit = { - log("ArmadaClientApplication.start() called!") + log("ArmadaClientApplication.start() called! arm4") run(conf) } private def run(sparkConf: SparkConf): Unit = { val (host, port) = ArmadaUtils.parseMasterUrl(sparkConf.get("spark.master")) log(s"host is $host, port is $port") - var armadaClient = new ArmadaClient(ArmadaClient.GetChannel(host, port)) + var armadaClient = ArmadaClient(host, port) if (armadaClient.SubmitHealth().isServing) { log("Submit health good!") } else { @@ -297,9 +298,12 @@ private[spark] class ArmadaClientApplication extends SparkApplication { } private def submitDriverJob(armadaClient: ArmadaClient, conf: SparkConf): String = { + val source = EnvVarSource().withFieldRef(ObjectFieldSelector() + .withApiVersion("v1").withFieldPath("status.podIP")) val envVars = Seq( - EnvVar().withName("SPARK_DRIVER_BIND_ADDRESS").withValue("0.0.0.0:1234") + new EnvVar().withName("SPARK_DRIVER_BIND_ADDRESS").withValueFrom(source) ) + val javaOptions = "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=0.0.0.0:5005" val driverContainer = Container() .withName("spark-driver") .withImagePullPolicy("IfNotPresent") @@ -311,10 +315,15 @@ private[spark] class ArmadaClientApplication extends SparkApplication { "driver", "--verbose", "--class", - conf.get("spark.app.name"), + "org.apache.spark.examples.SparkPi", "--master", "armada://armada-server.armada.svc.cluster.local:50051", - "submit" + "--conf", + "spark.driver.port=7078", + "--conf", + s"spark.driver.extraJavaOptions=$javaOptions", + "local:///opt/spark/examples/jars/spark-examples.jar", + "100" ) ) .withResources( // FIXME: What are reasonable requests/limits for spark drivers? @@ -338,7 +347,7 @@ private[spark] class ArmadaClientApplication extends SparkApplication { val driverJob = api.submit .JobSubmitRequestItem() .withPriority(0) - .withNamespace("personal-anonymous") + .withNamespace("default") .withPodSpec(podSpec) // FIXME: Plumb config for queue, job-set-id diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala index 6d3f32ab364d..87cb6ddaac96 100644 --- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala +++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala @@ -107,7 +107,7 @@ private[spark] class ArmadaClusterSchedulerBackend( .withNamespace("default") .withPodSpec(podSpec) - val client = new ArmadaClient(ArmadaClient.GetChannel(host, port)) + val client = ArmadaClient(host, port) val jobSubmitResponse = client.SubmitJobs("test", "executor", Seq(testJob)) logInfo(s"Driver Job Submit Response") From 09ec0da20f3b15d10dfd8dbcd6f9eca69851a834 Mon Sep 17 00:00:00 2001 From: George Jahad Date: Mon, 10 Feb 2025 17:13:59 -0800 Subject: [PATCH 02/12] working --- .../org/apache/spark/deploy/SparkSubmit.scala | 2 +- .../armada/ArmadaClientApplication.scala | 22 ++++---- .../spark/deploy/armada/MainAppResource.scala | 54 +++++++++++++++++++ 3 files changed, 66 insertions(+), 12 deletions(-) create mode 100644 resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/MainAppResource.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 34c62ccb48d9..62fe4bd65ac5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -880,7 +880,7 @@ private[spark] class SparkSubmit extends Logging { if (isArmadaCluster) { childMainClass = ARMADA_CLUSTER_SUBMIT_CLASS - // TODO: Setup childArgs + childArgs ++= Array("--main-class", args.mainClass) } // Load any properties specified through --conf and the default properties file diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala index 93219c8f2102..f84766e84188 100644 --- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala +++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala @@ -16,8 +16,9 @@ */ package org.apache.spark.deploy.armada.submit -/* import scala.collection.mutable + +/* import scala.jdk.CollectionConverters._ import scala.util.control.Breaks._ import scala.util.control.NonFatal @@ -53,7 +54,6 @@ import org.apache.spark.util.Utils * @param mainClass the main class of the application to run * @param driverArgs arguments to the driver */ -/* private[spark] case class ClientArguments( mainAppResource: MainAppResource, mainClass: String, @@ -95,7 +95,6 @@ private[spark] object ClientArguments { proxyUser) } } -*/ /** * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a @@ -242,11 +241,12 @@ private[spark] class ArmadaClientApplication extends SparkApplication { } override def start(args: Array[String], conf: SparkConf): Unit = { - log("ArmadaClientApplication.start() called! arm4") - run(conf) + log("ArmadaClientApplication.start() called! arm5") + val parsedArguments = ClientArguments.fromCommandLineArgs(args) + run(parsedArguments, conf) } - private def run(sparkConf: SparkConf): Unit = { + private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = { val (host, port) = ArmadaUtils.parseMasterUrl(sparkConf.get("spark.master")) log(s"host is $host, port is $port") var armadaClient = ArmadaClient(host, port) @@ -259,7 +259,7 @@ private[spark] class ArmadaClientApplication extends SparkApplication { // # FIXME: Need to check how this is launched whether to submit a job or // to turn into driver / cluster manager mode. - val jobId = submitDriverJob(armadaClient, sparkConf) + val jobId = submitDriverJob(armadaClient, clientArguments, sparkConf) log(s"Got job ID: $jobId") // For constructing the app ID, we can't use the Spark application name, as the app ID is going // to be added as a label to group resources belonging to the same application. Label values are @@ -297,7 +297,8 @@ private[spark] class ArmadaClientApplication extends SparkApplication { () } - private def submitDriverJob(armadaClient: ArmadaClient, conf: SparkConf): String = { + private def submitDriverJob(armadaClient: ArmadaClient, clientArguments: ClientArguments, + conf: SparkConf): String = { val source = EnvVarSource().withFieldRef(ObjectFieldSelector() .withApiVersion("v1").withFieldPath("status.podIP")) val envVars = Seq( @@ -315,15 +316,14 @@ private[spark] class ArmadaClientApplication extends SparkApplication { "driver", "--verbose", "--class", - "org.apache.spark.examples.SparkPi", + clientArguments.mainClass, "--master", "armada://armada-server.armada.svc.cluster.local:50051", "--conf", "spark.driver.port=7078", "--conf", s"spark.driver.extraJavaOptions=$javaOptions", - "local:///opt/spark/examples/jars/spark-examples.jar", - "100" + "local:///opt/spark/examples/jars/spark-examples.jar" ) ) .withResources( // FIXME: What are reasonable requests/limits for spark drivers? diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/MainAppResource.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/MainAppResource.scala new file mode 100644 index 000000000000..90cdb4fe6aee --- /dev/null +++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/MainAppResource.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.armada.submit + + +import org.apache.spark.annotation.{DeveloperApi, Since, Stable} + +/** + * :: DeveloperApi :: + * + * All traits and classes in this file are used by K8s module and Spark K8s operator. + */ + +@Stable +@DeveloperApi +@Since("2.3.0") +sealed trait MainAppResource + +@Stable +@DeveloperApi +@Since("2.4.0") +sealed trait NonJVMResource + +@Stable +@DeveloperApi +@Since("3.0.0") +case class JavaMainAppResource(primaryResource: Option[String]) + extends MainAppResource + +@Stable +@DeveloperApi +@Since("2.4.0") +case class PythonMainAppResource(primaryResource: String) + extends MainAppResource with NonJVMResource + +@Stable +@DeveloperApi +@Since("2.4.0") +case class RMainAppResource(primaryResource: String) + extends MainAppResource with NonJVMResource From 34b71e70645dfd01d9e4dc52fb8768c939461c09 Mon Sep 17 00:00:00 2001 From: George Jahad Date: Mon, 10 Feb 2025 20:08:34 -0800 Subject: [PATCH 03/12] working --- .../spark/deploy/armada/ArmadaClientApplication.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala index f84766e84188..04b9c49f72c4 100644 --- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala +++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala @@ -241,7 +241,7 @@ private[spark] class ArmadaClientApplication extends SparkApplication { } override def start(args: Array[String], conf: SparkConf): Unit = { - log("ArmadaClientApplication.start() called! arm5") + log("ArmadaClientApplication.start() called! arm6") val parsedArguments = ClientArguments.fromCommandLineArgs(args) run(parsedArguments, conf) } @@ -304,7 +304,7 @@ private[spark] class ArmadaClientApplication extends SparkApplication { val envVars = Seq( new EnvVar().withName("SPARK_DRIVER_BIND_ADDRESS").withValueFrom(source) ) - val javaOptions = "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=0.0.0.0:5005" + val javaOptions = "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=0.0.0.0:5005" val driverContainer = Container() .withName("spark-driver") .withImagePullPolicy("IfNotPresent") @@ -323,7 +323,8 @@ private[spark] class ArmadaClientApplication extends SparkApplication { "spark.driver.port=7078", "--conf", s"spark.driver.extraJavaOptions=$javaOptions", - "local:///opt/spark/examples/jars/spark-examples.jar" + "local:///opt/spark/examples/jars/spark-examples.jar", + "100" ) ) .withResources( // FIXME: What are reasonable requests/limits for spark drivers? From 09d954b07e80474d74efe98f6a5b249f47e39aea Mon Sep 17 00:00:00 2001 From: George Jahad Date: Tue, 11 Feb 2025 09:38:55 -0800 Subject: [PATCH 04/12] working --- .../org/apache/spark/deploy/SparkSubmit.scala | 21 ++++++++++++++++++- .../armada/ArmadaClientApplication.scala | 15 +++++++------ 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 62fe4bd65ac5..76c163ecba21 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -880,7 +880,26 @@ private[spark] class SparkSubmit extends Logging { if (isArmadaCluster) { childMainClass = ARMADA_CLUSTER_SUBMIT_CLASS - childArgs ++= Array("--main-class", args.mainClass) + if (args.primaryResource != SparkLauncher.NO_RESOURCE) { + if (args.isPython) { + childArgs ++= Array("--primary-py-file", args.primaryResource) + childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner") + } else if (args.isR) { + childArgs ++= Array("--primary-r-file", args.primaryResource) + childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner") + } + else { + childArgs ++= Array("--primary-java-resource", args.primaryResource) + childArgs ++= Array("--main-class", args.mainClass) + } + } else { + childArgs ++= Array("--main-class", args.mainClass) + } + if (args.childArgs != null) { + args.childArgs.foreach { arg => + childArgs += "--arg" += arg + } + } } // Load any properties specified through --conf and the default properties file diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala index 04b9c49f72c4..a1400d773fb3 100644 --- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala +++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala @@ -91,7 +91,7 @@ private[spark] object ClientArguments { ClientArguments( mainAppResource, mainClass.get, - driverArgs.toArray, + driverArgs.toArray[String], proxyUser) } } @@ -241,7 +241,7 @@ private[spark] class ArmadaClientApplication extends SparkApplication { } override def start(args: Array[String], conf: SparkConf): Unit = { - log("ArmadaClientApplication.start() called! arm6") + log("ArmadaClientApplication.start() called! arm8") val parsedArguments = ClientArguments.fromCommandLineArgs(args) run(parsedArguments, conf) } @@ -304,7 +304,11 @@ private[spark] class ArmadaClientApplication extends SparkApplication { val envVars = Seq( new EnvVar().withName("SPARK_DRIVER_BIND_ADDRESS").withValueFrom(source) ) - val javaOptions = "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=0.0.0.0:5005" + val primaryResource = clientArguments.mainAppResource match { + case JavaMainAppResource(Some(resource)) => Seq(resource) + case _ => Seq() + } + val javaOptions = "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=0.0.0.0:5005" val driverContainer = Container() .withName("spark-driver") .withImagePullPolicy("IfNotPresent") @@ -323,9 +327,8 @@ private[spark] class ArmadaClientApplication extends SparkApplication { "spark.driver.port=7078", "--conf", s"spark.driver.extraJavaOptions=$javaOptions", - "local:///opt/spark/examples/jars/spark-examples.jar", - "100" - ) + + ) ++ primaryResource ++ Seq("100") ) .withResources( // FIXME: What are reasonable requests/limits for spark drivers? ResourceRequirements( From 17449ed06a9c9ff3b290907c3cb2ee084cad506f Mon Sep 17 00:00:00 2001 From: George Jahad Date: Tue, 11 Feb 2025 10:03:53 -0800 Subject: [PATCH 05/12] working --- .../deploy/armada/ArmadaClientApplication.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala index a1400d773fb3..5da2dffe7149 100644 --- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala +++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala @@ -91,7 +91,7 @@ private[spark] object ClientArguments { ClientArguments( mainAppResource, mainClass.get, - driverArgs.toArray[String], + driverArgs.toArray, proxyUser) } } @@ -241,7 +241,7 @@ private[spark] class ArmadaClientApplication extends SparkApplication { } override def start(args: Array[String], conf: SparkConf): Unit = { - log("ArmadaClientApplication.start() called! arm8") + log("ArmadaClientApplication.start() called! arm11") val parsedArguments = ClientArguments.fromCommandLineArgs(args) run(parsedArguments, conf) } @@ -304,11 +304,15 @@ private[spark] class ArmadaClientApplication extends SparkApplication { val envVars = Seq( new EnvVar().withName("SPARK_DRIVER_BIND_ADDRESS").withValueFrom(source) ) + val primaryResource = clientArguments.mainAppResource match { case JavaMainAppResource(Some(resource)) => Seq(resource) + case PythonMainAppResource(resource) => Seq(resource) + case RMainAppResource(resource) => Seq(resource) case _ => Seq() } - val javaOptions = "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=0.0.0.0:5005" + + val javaOptions = "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=0.0.0.0:5005" val driverContainer = Container() .withName("spark-driver") .withImagePullPolicy("IfNotPresent") @@ -328,7 +332,7 @@ private[spark] class ArmadaClientApplication extends SparkApplication { "--conf", s"spark.driver.extraJavaOptions=$javaOptions", - ) ++ primaryResource ++ Seq("100") + ) ++ primaryResource ++ clientArguments.driverArgs ) .withResources( // FIXME: What are reasonable requests/limits for spark drivers? ResourceRequirements( From 0b77e499c4c7a609dfa3b77b99ad5e1afd91c29e Mon Sep 17 00:00:00 2001 From: George Jahad Date: Tue, 11 Feb 2025 11:00:29 -0800 Subject: [PATCH 06/12] multiple executors --- .../scala/org/apache/spark/deploy/SparkSubmit.scala | 4 ++-- .../deploy/armada/ArmadaClientApplication.scala | 6 +++--- .../cluster/armada/ArmadaClusterManagerBackend.scala | 12 +++++++----- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 76c163ecba21..bbad8b6d7910 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -301,7 +301,7 @@ private[spark] class SparkSubmit extends Logging { printMessage(s"Armada selected as cluster manager.") if (!Utils.classIsLoadable(ARMADA_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) { error( - s"Could not load ARMADA class \"${ARMADA_CLUSTER_SUBMIT_CLASS}\". " + + s"Could not load ARMADA class \"$ARMADA_CLUSTER_SUBMIT_CLASS\". " + "This copy of Spark may not have been compiled with ARMADA support.") } } @@ -706,7 +706,7 @@ private[spark] class SparkSubmit extends Logging { mergeFn = Some(mergeFileLists(_, _))), // Other options - OptionAssigner(args.numExecutors, YARN | KUBERNETES, ALL_DEPLOY_MODES, + OptionAssigner(args.numExecutors, YARN | KUBERNETES | ARMADA, ALL_DEPLOY_MODES, confKey = EXECUTOR_INSTANCES.key), OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES, confKey = EXECUTOR_CORES.key), diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala index 5da2dffe7149..3fa405fa765b 100644 --- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala +++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala @@ -249,7 +249,7 @@ private[spark] class ArmadaClientApplication extends SparkApplication { private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = { val (host, port) = ArmadaUtils.parseMasterUrl(sparkConf.get("spark.master")) log(s"host is $host, port is $port") - var armadaClient = ArmadaClient(host, port) + val armadaClient = ArmadaClient(host, port) if (armadaClient.SubmitHealth().isServing) { log("Submit health good!") } else { @@ -359,12 +359,12 @@ private[spark] class ArmadaClientApplication extends SparkApplication { .withPodSpec(podSpec) // FIXME: Plumb config for queue, job-set-id - val jobSubmitResponse = armadaClient.SubmitJobs("test", "spark-test-1", Seq(driverJob)) + val jobSubmitResponse = armadaClient.SubmitJobs("test", "driver", Seq(driverJob)) log(s"Job Submit Response $jobSubmitResponse") for (respItem <- jobSubmitResponse.jobResponseItems) { log(s"JobID: ${respItem.jobId} Error: ${respItem.error} ") } - jobSubmitResponse.jobResponseItems(0).jobId + jobSubmitResponse.jobResponseItems.head.jobId } } diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala index 87cb6ddaac96..e5f755d7945e 100644 --- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala +++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala @@ -29,6 +29,7 @@ import org.apache.spark.SparkContext import org.apache.spark.rpc.{RpcAddress, RpcCallContext} import org.apache.spark.scheduler.{ExecutorDecommission, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils} +import org.apache.spark.scheduler.cluster.SchedulerBackendUtils.getInitialTargetExecutorNumber // TODO: Implement for Armada @@ -49,7 +50,7 @@ private[spark] class ArmadaClusterSchedulerBackend( } - private def submitJob(): Unit = { + private def submitJob(executorId: Int): Unit = { val urlArray = masterURL.split(":") // Remove leading "/"'s @@ -63,7 +64,7 @@ private[spark] class ArmadaClusterSchedulerBackend( val source = EnvVarSource().withFieldRef(ObjectFieldSelector() .withApiVersion("v1").withFieldPath("status.podIP")) val envVars = Seq( - EnvVar().withName("SPARK_EXECUTOR_ID").withValue("1"), + EnvVar().withName("SPARK_EXECUTOR_ID").withValue(executorId.toString), EnvVar().withName("SPARK_RESOURCE_PROFILE_ID").withValue("0"), EnvVar().withName("SPARK_EXECUTOR_POD_NAME").withValue("test-pod-name"), EnvVar().withName("SPARK_APPLICATION_ID").withValue("test_spark_app_id"), @@ -110,14 +111,15 @@ private[spark] class ArmadaClusterSchedulerBackend( val client = ArmadaClient(host, port) val jobSubmitResponse = client.SubmitJobs("test", "executor", Seq(testJob)) - logInfo(s"Driver Job Submit Response") + logInfo(s"Driver Job Submit Response: arm10") for (respItem <- jobSubmitResponse.jobResponseItems) { logInfo(s"JobID: ${respItem.jobId} Error: ${respItem.error} ") } } override def start(): Unit = { - submitJob() + val numberOfExecutors = getInitialTargetExecutorNumber(conf) + 1 to numberOfExecutors foreach {j: Int => submitJob(j)} } override def stop(): Unit = {} @@ -143,7 +145,7 @@ private[spark] class ArmadaClusterSchedulerBackend( } private class ArmadaDriverEndpoint extends DriverEndpoint { - protected val execIDRequester = new HashMap[RpcAddress, String] + private val execIDRequester = new HashMap[RpcAddress, String] override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = super.receiveAndReply(context) From cc3047618eec0868c726e7de3d6664964bfb5dbc Mon Sep 17 00:00:00 2001 From: George Jahad Date: Tue, 11 Feb 2025 11:16:52 -0800 Subject: [PATCH 07/12] lint --- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 +- .../spark/deploy/armada/ArmadaClientApplication.scala | 2 +- .../io/armadaproject/spark/deploy/armada/MainAppResource.scala | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index bbad8b6d7910..cf76c0e5b9f1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -301,7 +301,7 @@ private[spark] class SparkSubmit extends Logging { printMessage(s"Armada selected as cluster manager.") if (!Utils.classIsLoadable(ARMADA_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) { error( - s"Could not load ARMADA class \"$ARMADA_CLUSTER_SUBMIT_CLASS\". " + + s"Could not load ARMADA class \"${ARMADA_CLUSTER_SUBMIT_CLASS}\". " + "This copy of Spark may not have been compiled with ARMADA support.") } } diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala index 3fa405fa765b..898d33dcdc1d 100644 --- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala +++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala @@ -330,7 +330,7 @@ private[spark] class ArmadaClientApplication extends SparkApplication { "--conf", "spark.driver.port=7078", "--conf", - s"spark.driver.extraJavaOptions=$javaOptions", + s"spark.driver.extraJavaOptions=$javaOptions" ) ++ primaryResource ++ clientArguments.driverArgs ) diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/MainAppResource.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/MainAppResource.scala index 90cdb4fe6aee..610b3c149bd1 100644 --- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/MainAppResource.scala +++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/MainAppResource.scala @@ -16,7 +16,6 @@ */ package org.apache.spark.deploy.armada.submit - import org.apache.spark.annotation.{DeveloperApi, Since, Stable} /** From fca221800b8974c2ec8799f440d37144dbf08b45 Mon Sep 17 00:00:00 2001 From: George Jahad Date: Tue, 11 Feb 2025 11:29:26 -0800 Subject: [PATCH 08/12] added submit script --- examples/submitSparkPi.sh | 4 ++++ .../spark/deploy/armada/ArmadaClientApplication.scala | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) create mode 100755 examples/submitSparkPi.sh diff --git a/examples/submitSparkPi.sh b/examples/submitSparkPi.sh new file mode 100755 index 000000000000..f4769a75feb6 --- /dev/null +++ b/examples/submitSparkPi.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +# run spark pi app on armada +bin/spark-submit --master armada://localhost:30002 --deploy-mode cluster --name spark-pi --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=2 --conf spark.kubernetes.container.image=spark:testing local:///opt/spark/examples/jars/spark-examples.jar 100 diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala index 898d33dcdc1d..7a301f9d2bba 100644 --- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala +++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala @@ -361,9 +361,9 @@ private[spark] class ArmadaClientApplication extends SparkApplication { // FIXME: Plumb config for queue, job-set-id val jobSubmitResponse = armadaClient.SubmitJobs("test", "driver", Seq(driverJob)) - log(s"Job Submit Response $jobSubmitResponse") for (respItem <- jobSubmitResponse.jobResponseItems) { - log(s"JobID: ${respItem.jobId} Error: ${respItem.error} ") + val error = if (respItem.error == "") "None" else respItem.error + log(s"JobID: ${respItem.jobId} Error: ${error}") } jobSubmitResponse.jobResponseItems.head.jobId } From 77ac7e11c82c2f79740d06bbdbfebb4106d29a84 Mon Sep 17 00:00:00 2001 From: George Jahad Date: Wed, 12 Feb 2025 12:52:53 -0800 Subject: [PATCH 09/12] fixed executor instances --- .../spark/deploy/armada/ArmadaClientApplication.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala index 7a301f9d2bba..a99779c645ba 100644 --- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala +++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala @@ -328,6 +328,8 @@ private[spark] class ArmadaClientApplication extends SparkApplication { "--master", "armada://armada-server.armada.svc.cluster.local:50051", "--conf", + s"spark.executor.instances=${conf.get("spark.executor.instances")}", + "--conf", "spark.driver.port=7078", "--conf", s"spark.driver.extraJavaOptions=$javaOptions" From 2212223071389bf071401d7405d029de38801650 Mon Sep 17 00:00:00 2001 From: George Jahad Date: Thu, 13 Feb 2025 09:34:19 -0800 Subject: [PATCH 10/12] cleanup log messages --- .../spark/deploy/armada/ArmadaClientApplication.scala | 2 +- .../scheduler/cluster/armada/ArmadaClusterManagerBackend.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala index a99779c645ba..b27fa46441cc 100644 --- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala +++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala @@ -241,7 +241,7 @@ private[spark] class ArmadaClientApplication extends SparkApplication { } override def start(args: Array[String], conf: SparkConf): Unit = { - log("ArmadaClientApplication.start() called! arm11") + log("ArmadaClientApplication.start() called!") val parsedArguments = ClientArguments.fromCommandLineArgs(args) run(parsedArguments, conf) } diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala index e5f755d7945e..121caec0c398 100644 --- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala +++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala @@ -111,7 +111,7 @@ private[spark] class ArmadaClusterSchedulerBackend( val client = ArmadaClient(host, port) val jobSubmitResponse = client.SubmitJobs("test", "executor", Seq(testJob)) - logInfo(s"Driver Job Submit Response: arm10") + logInfo(s"Driver Job Submit Response") for (respItem <- jobSubmitResponse.jobResponseItems) { logInfo(s"JobID: ${respItem.jobId} Error: ${respItem.error} ") From 48a1e94b1c14ff81fa4721dc81f26b145260b2dc Mon Sep 17 00:00:00 2001 From: George Jahad Date: Thu, 13 Feb 2025 11:02:49 -0800 Subject: [PATCH 11/12] support image config --- .../spark/deploy/armada/ArmadaClientApplication.scala | 4 +++- .../cluster/armada/ArmadaClusterManagerBackend.scala | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala index b27fa46441cc..0db195cc39ef 100644 --- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala +++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala @@ -316,7 +316,7 @@ private[spark] class ArmadaClientApplication extends SparkApplication { val driverContainer = Container() .withName("spark-driver") .withImagePullPolicy("IfNotPresent") - .withImage("spark:testing") + .withImage(conf.get("spark.kubernetes.container.image")) .withEnv(envVars) .withCommand(Seq("/opt/entrypoint.sh")) .withArgs( @@ -330,6 +330,8 @@ private[spark] class ArmadaClientApplication extends SparkApplication { "--conf", s"spark.executor.instances=${conf.get("spark.executor.instances")}", "--conf", + s"spark.kubernetes.container.image=${conf.get("spark.kubernetes.container.image")}", + "--conf", "spark.driver.port=7078", "--conf", s"spark.driver.extraJavaOptions=$javaOptions" diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala index 121caec0c398..cbccfa4d7392 100644 --- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala +++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala @@ -76,7 +76,7 @@ private[spark] class ArmadaClusterSchedulerBackend( val executorContainer = Container() .withName("spark-executor") .withImagePullPolicy("IfNotPresent") - .withImage("spark:testing") + .withImage(conf.get("spark.kubernetes.container.image")) .withEnv(envVars) .withCommand(Seq("/opt/entrypoint.sh")) .withArgs( From 8e90e55ba10d490a193d4290072b8f29f254e818 Mon Sep 17 00:00:00 2001 From: George Jahad Date: Wed, 19 Feb 2025 15:19:58 -0800 Subject: [PATCH 12/12] fixed for latest client --- .../spark/deploy/armada/ArmadaClientApplication.scala | 4 ++-- .../cluster/armada/ArmadaClusterManagerBackend.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala index 0db195cc39ef..afac03b3f598 100644 --- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala +++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala @@ -250,7 +250,7 @@ private[spark] class ArmadaClientApplication extends SparkApplication { val (host, port) = ArmadaUtils.parseMasterUrl(sparkConf.get("spark.master")) log(s"host is $host, port is $port") val armadaClient = ArmadaClient(host, port) - if (armadaClient.SubmitHealth().isServing) { + if (armadaClient.submitHealth().isServing) { log("Submit health good!") } else { log("Could not contact Armada!") @@ -363,7 +363,7 @@ private[spark] class ArmadaClientApplication extends SparkApplication { .withPodSpec(podSpec) // FIXME: Plumb config for queue, job-set-id - val jobSubmitResponse = armadaClient.SubmitJobs("test", "driver", Seq(driverJob)) + val jobSubmitResponse = armadaClient.submitJobs("test", "driver", Seq(driverJob)) for (respItem <- jobSubmitResponse.jobResponseItems) { val error = if (respItem.error == "") "None" else respItem.error diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala index cbccfa4d7392..2eda76104175 100644 --- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala +++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala @@ -109,7 +109,7 @@ private[spark] class ArmadaClusterSchedulerBackend( .withPodSpec(podSpec) val client = ArmadaClient(host, port) - val jobSubmitResponse = client.SubmitJobs("test", "executor", Seq(testJob)) + val jobSubmitResponse = client.submitJobs("test", "executor", Seq(testJob)) logInfo(s"Driver Job Submit Response") for (respItem <- jobSubmitResponse.jobResponseItems) {