diff --git a/.github/workflows/integration-tests.yaml b/.github/workflows/integration-tests.yaml index 754f26f58..968937e0c 100644 --- a/.github/workflows/integration-tests.yaml +++ b/.github/workflows/integration-tests.yaml @@ -29,18 +29,15 @@ jobs: strategy: matrix: maven_profile: - - "-Pscala-2.11 -Pspark2 -DskipRTests" - "-Pscala-2.12 -Pspark3" jdk_path: - "/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java" - include: - - maven_profile: "-Pscala-2.12 -Pspark3" - jdk_path: "/usr/lib/jvm/java-17-openjdk-amd64/bin/java" + - "/usr/lib/jvm/java-17-openjdk-amd64/bin/java" steps: - - + - name: Checkout uses: actions/checkout@v3 - - + - name: Cache local Maven repository uses: actions/cache@v3 with: @@ -51,14 +48,12 @@ jobs: restore-keys: | ${{ runner.os }}-maven- - - name: Set Python 3 as default for Spark 3 builds - if: ${{ contains(matrix.maven_profile, 'spark3') }} - # This can be removed once support for Python 2 and Spark 2 is removed and the default python executable is python3 + name: Set Python 3 as default run: pyenv global 3 && echo "PYSPARK_PYTHON=$(which python3)" >> "$GITHUB_ENV" - name: Set JDK version run: update-alternatives --set java ${{ matrix.jdk_path }} - - + - name: Build with Maven run: mvn -Pthriftserver ${{ matrix.maven_profile }} -DskipTests -Dmaven.javadoc.skip=true -B -V -e verify - diff --git a/.github/workflows/unit-tests.yaml b/.github/workflows/unit-tests.yaml index f946fd97c..ccd22a0ec 100644 --- a/.github/workflows/unit-tests.yaml +++ b/.github/workflows/unit-tests.yaml @@ -27,19 +27,15 @@ jobs: strategy: matrix: maven_profile: - - "-Pscala-2.11 -Pspark2" - - "-Pscala-2.12 -Pspark2" - "-Pscala-2.12 -Pspark3" jdk_path: - "/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java" - include: - - maven_profile: "-Pscala-2.12 -Pspark3" - jdk_path: "/usr/lib/jvm/java-17-openjdk-amd64/bin/java" + - "/usr/lib/jvm/java-17-openjdk-amd64/bin/java" steps: - - + - name: Checkout uses: actions/checkout@v3 - - + - name: Cache local Maven repository uses: actions/cache@v3 with: diff --git a/.travis.yml b/.travis.yml index d661e30b5..2ea3a96b6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,25 +21,12 @@ language: scala matrix: include: - - name: "Spark 2.4 Unit Tests (Scala 2.11)" - scala: 2.11.12 - env: MVN_FLAG='-Pscala-2.11 -Pspark2 -Pthriftserver -DskipITs' - - name: "Spark 2.4 ITs (Scala 2.11)" - scala: 2.11.12 - env: MVN_FLAG='-Pscala-2.11 -Pspark2 -Pthriftserver -DskipTests' - - name: "Spark 2.4 Unit Tests (Scala 2.12)" - scala: 2.12.10 - env: MVN_FLAG='-Pscala-2.12 -Pspark2 -Pthriftserver -DskipITs' - - name: "Spark 2.4 ITs (Scala 2.12)" - scala: 2.12.10 - env: MVN_FLAG='-Pscala-2.12 -Pspark2 -Pthriftserver -DskipTests' -# No scala 2.11.x build for spark3 - - name: "Spark 3.0 Unit Tests (Scala 2.12)" + - name: "Spark 3.x Unit Tests (Scala 2.12)" scala: 2.12.10 env: MVN_FLAG='-Pscala-2.12 -Pspark3 -Pthriftserver -DskipITs' - - name: "Spark 3.0 ITs (Scala 2.12)" + - name: "Spark 3.x ITs (Scala 2.12)" scala: 2.12.10 - env: MVN_FLAG='-Pscala-2.12 -Pspark3 -Pthriftserver -DskipITs' + env: MVN_FLAG='-Pscala-2.12 -Pspark3 -Pthriftserver -DskipTests' jdk: - openjdk8 diff --git a/README.md b/README.md index b653569f4..5fbe4f468 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ Required python packages for building Livy: To run Livy, you will also need a Spark installation. You can get Spark releases at https://spark.apache.org/downloads.html. -Livy requires Spark 2.4+. You can switch to a different version of Spark by setting the +Livy requires Spark 3.0+. You can switch to a different version of Spark by setting the ``SPARK_HOME`` environment variable in the Livy server process, without needing to rebuild Livy. @@ -82,7 +82,7 @@ docker run --rm -it -v $(pwd):/workspace -v $HOME/.m2:/root/.m2 livy-ci mvn pack > **Note**: The `docker run` command maps the maven repository to your host machine's maven cache so subsequent runs will not need to download dependencies. -By default Livy is built against Apache Spark 2.4.5, but the version of Spark used when running +By default Livy is built against Apache Spark 3.3.4, but the version of Spark used when running Livy does not need to match the version used to build Livy. Livy internally handles the differences between different Spark versions. @@ -93,8 +93,6 @@ version of Spark without needing to rebuild. | Flag | Purpose | |--------------|--------------------------------------------------------------------| -| -Phadoop2 | Choose Hadoop2 based build dependencies (default configuration) | -| -Pspark2 | Choose Spark 2.x based build dependencies (default configuration) | -| -Pspark3 | Choose Spark 3.x based build dependencies | -| -Pscala-2.11 | Choose Scala 2.11 based build dependencies (default configuration) | -| -Pscala-2.12 | Choose scala 2.12 based build dependencies | +| -Phadoop2 | Choose Hadoop2 based build dependencies | +| -Pspark3 | Choose Spark 3.x based build dependencies (default configuration) | +| -Pscala-2.12 | Choose Scala 2.12 based build dependencies (default configuration) | diff --git a/core/scala-2.11/pom.xml b/core/scala-2.11/pom.xml deleted file mode 100644 index 950d5b261..000000000 --- a/core/scala-2.11/pom.xml +++ /dev/null @@ -1,48 +0,0 @@ - - - - 4.0.0 - org.apache.livy - livy-core_2.11 - 0.10.0-incubating-SNAPSHOT - jar - - - org.apache.livy - livy-core-parent - 0.10.0-incubating-SNAPSHOT - ../pom.xml - - - - - - org.apache.maven.plugins - maven-jar-plugin - - - - test-jar - - - - - - - - diff --git a/integration-test/pom.xml b/integration-test/pom.xml index 055a01b03..a803b3573 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -246,35 +246,6 @@ - - org.codehaus.mojo - build-helper-maven-plugin - - - parse-spark-version - process-test-sources - - parse-version - - - spark - ${spark.version} - - - - add-spark-version-specific-test - process-test-sources - - add-test-source - - - - ${project.basedir}/src/test/spark${spark.majorVersion}/scala - - - - - org.apache.maven.plugins diff --git a/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala b/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala index e5d77cab4..ece0fad5f 100644 --- a/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala +++ b/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala @@ -87,10 +87,7 @@ class InteractiveIT extends BaseIntegrationTestSuite { s.run("from pyspark.sql.types import Row").verifyResult("") s.run("x = [Row(age=1, name=u'a'), Row(age=2, name=u'b'), Row(age=3, name=u'c')]") .verifyResult("") - // Check if we're running with Spark2. - if (s.run("spark").result().isLeft) { - s.run("sqlContext.sparkSession").verifyResult(".*pyspark\\.sql\\.session\\.SparkSession.*") - } + s.run("spark").verifyResult(".*pyspark\\.sql\\.session\\.SparkSession.*") s.run("%table x").verifyResult(".*headers.*type.*name.*data.*") s.run("abcde").verifyError(ename = "NameError", evalue = "name 'abcde' is not defined") s.run("raise KeyError('foo')").verifyError(ename = "KeyError", evalue = "'foo'") diff --git a/integration-test/src/test/spark2/scala/Spark2JobApiIT.scala b/integration-test/src/test/spark2/scala/Spark2JobApiIT.scala deleted file mode 100644 index 38230f822..000000000 --- a/integration-test/src/test/spark2/scala/Spark2JobApiIT.scala +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.livy.test - -import java.io.File -import java.net.URI -import java.util.concurrent.{TimeUnit, Future => JFuture} -import javax.servlet.http.HttpServletResponse - -import scala.util.Properties - -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.scala.DefaultScalaModule -import org.apache.http.client.methods.HttpGet -import org.scalatest.BeforeAndAfterAll - -import org.apache.livy._ -import org.apache.livy.client.common.HttpMessages._ -import org.apache.livy.sessions.SessionKindModule -import org.apache.livy.test.framework.BaseIntegrationTestSuite -import org.apache.livy.test.jobs.spark2._ -import org.apache.livy.utils.LivySparkUtils - -class Spark2JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll with Logging { - - private var client: LivyClient = _ - private var sessionId: Int = _ - private val mapper = new ObjectMapper() - .registerModule(DefaultScalaModule) - .registerModule(new SessionKindModule()) - - override def afterAll(): Unit = { - super.afterAll() - - if (client != null) { - client.stop(true) - } - - livyClient.connectSession(sessionId).stop() - } - - scalaTest("create a new session and upload test jar") { - val prevSessionCount = sessionList().total - val tempClient = createClient(livyEndpoint) - - try { - // Figure out the session ID by poking at the REST endpoint. We should probably expose this - // in the Java API. - val list = sessionList() - assert(list.total === prevSessionCount + 1) - val tempSessionId = list.sessions(0).id - - livyClient.connectSession(tempSessionId).verifySessionIdle() - waitFor(tempClient.uploadJar(new File(testLib))) - - client = tempClient - sessionId = tempSessionId - } finally { - if (client == null) { - try { - if (tempClient != null) { - tempClient.stop(true) - } - } catch { - case e: Exception => warn("Error stopping client.", e) - } - } - } - } - - scalaTest("run spark2 job") { - assume(client != null, "Client not active.") - val result = waitFor(client.submit(new SparkSessionTest())) - assert(result === 3) - } - - scalaTest("run spark2 dataset job") { - assume(client != null, "Client not active.") - val result = waitFor(client.submit(new DatasetTest())) - assert(result === 2) - } - - private def waitFor[T](future: JFuture[T]): T = { - future.get(60, TimeUnit.SECONDS) - } - - private def sessionList(): SessionList = { - val httpGet = new HttpGet(s"$livyEndpoint/sessions/") - val r = livyClient.httpClient.execute(httpGet) - val statusCode = r.getStatusLine().getStatusCode() - val responseBody = r.getEntity().getContent - val sessionList = mapper.readValue(responseBody, classOf[SessionList]) - r.close() - - assert(statusCode == HttpServletResponse.SC_OK) - sessionList - } - - private def createClient(uri: String): LivyClient = { - new LivyClientBuilder().setURI(new URI(uri)).build() - } - - protected def scalaTest(desc: String)(testFn: => Unit): Unit = { - test(desc) { - val livyConf = new LivyConf() - val (sparkVersion, scalaVersion) = LivySparkUtils.sparkSubmitVersion(livyConf) - val formattedSparkVersion = LivySparkUtils.formatSparkVersion(sparkVersion) - val versionString = - LivySparkUtils.sparkScalaVersion(formattedSparkVersion, scalaVersion, livyConf) - - assume(versionString == LivySparkUtils.formatScalaVersion(Properties.versionNumberString), - s"Scala test can only be run with ${Properties.versionString}") - testFn - } - } -} diff --git a/pom.xml b/pom.xml index 68c71b540..715de69e5 100644 --- a/pom.xml +++ b/pom.xml @@ -82,9 +82,8 @@ compile 1.7.36 1.2.26 - 2.4.5 - 2.4.5 - ${spark.scala-2.11.version} + 3.3.4 + ${spark.scala-2.12.version} 5.6.0 3.0.0 1.15 @@ -126,13 +125,10 @@ ${execution.root}/dev/spark - 2.7 + 2 2.7.3 - - 2.11 - 2.11.12 - - 2.4.5 + 2.12 + 2.12.18 1.8 0.10.9 3.5.3 @@ -147,8 +143,6 @@ false - - false false @@ -981,7 +975,6 @@ false ${project.version} ${skipRTests} - ${skipPySpark2Tests} ${skipPySpark3Tests} ${test.redirectToFile} @@ -1011,7 +1004,6 @@ false ${project.version} ${skipRTests} - ${skipPySpark2Tests} ${skipPySpark3Tests} ${extraJavaTestArgs} @@ -1326,13 +1318,6 @@ 2.7.3 - - scala-2.11 - - 2.11 - 2.11.12 - - scala-2.12 @@ -1349,19 +1334,6 @@ thriftserver/client - - spark2 - - 2.4.5 - 1.8 - 0.10.9 - 3.5.3 - spark-${spark.version}-bin-hadoop${hadoop.major-minor.version} - - https://archive.apache.org/dist/spark/spark-${spark.version}/${spark.bin.name}.tgz - - - spark3 @@ -1379,7 +1351,6 @@ https://archive.apache.org/dist/spark/spark-${spark.version}/${spark.bin.name}.tgz - true diff --git a/repl/scala-2.11/pom.xml b/repl/scala-2.11/pom.xml deleted file mode 100644 index 0b740f744..000000000 --- a/repl/scala-2.11/pom.xml +++ /dev/null @@ -1,34 +0,0 @@ - - - - 4.0.0 - org.apache.livy - livy-repl_2.11 - 0.10.0-incubating-SNAPSHOT - jar - - - org.apache.livy - livy-repl-parent - 0.10.0-incubating-SNAPSHOT - ../pom.xml - - - diff --git a/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala b/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala deleted file mode 100644 index 48dca17b3..000000000 --- a/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.livy.repl - -import java.io.File -import java.net.{URL, URLClassLoader} -import java.nio.file.{Files, Paths} - -import scala.tools.nsc.Settings -import scala.tools.nsc.interpreter.Completion.ScalaCompleter -import scala.tools.nsc.interpreter.IMain -import scala.tools.nsc.interpreter.JLineCompletion -import scala.tools.nsc.interpreter.JPrintWriter -import scala.tools.nsc.interpreter.Results.Result - -import org.apache.spark.SparkConf -import org.apache.spark.repl.SparkILoop - -/** - * This represents a Scala 2.11 Spark interpreter. It is not thread safe. - */ -class SparkInterpreter(protected override val conf: SparkConf) extends AbstractSparkInterpreter { - - private var sparkILoop: SparkILoop = _ - - override def start(): Unit = { - require(sparkILoop == null) - - val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir")) - val outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile - outputDir.deleteOnExit() - conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath) - - val settings = new Settings() - settings.processArguments(List("-Yrepl-class-based", - "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true) - settings.usejavacp.value = true - settings.embeddedDefaults(Thread.currentThread().getContextClassLoader()) - - sparkILoop = new SparkILoop(None, new JPrintWriter(outputStream, true)) - sparkILoop.settings = settings - sparkILoop.createInterpreter() - sparkILoop.initializeSynchronous() - - restoreContextClassLoader { - sparkILoop.setContextClassLoader() - - var classLoader = Thread.currentThread().getContextClassLoader - while (classLoader != null) { - if (classLoader.getClass.getCanonicalName == - "org.apache.spark.util.MutableURLClassLoader") { - val extraJarPath = classLoader.asInstanceOf[URLClassLoader].getURLs() - // Check if the file exists. Otherwise an exception will be thrown. - .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile } - // Livy rsc and repl are also in the extra jars list. Filter them out. - .filterNot { u => Paths.get(u.toURI).getFileName.toString.startsWith("livy-") } - // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it. - .filterNot { u => - Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect") - } - - extraJarPath.foreach { p => debug(s"Adding $p to Scala interpreter's class path...") } - sparkILoop.addUrlsToClassPath(extraJarPath: _*) - classLoader = null - } else { - classLoader = classLoader.getParent - } - } - - postStart() - } - } - - override def close(): Unit = synchronized { - super.close() - - if (sparkILoop != null) { - sparkILoop.closeInterpreter() - sparkILoop = null - } - } - - override def addJar(jar: String): Unit = { - sparkILoop.addUrlsToClassPath(new URL(jar)) - } - - override protected def isStarted(): Boolean = { - sparkILoop != null - } - - override protected def interpret(code: String): Result = { - sparkILoop.interpret(code) - } - - override protected def completeCandidates(code: String, cursor: Int) : Array[String] = { - val completer : ScalaCompleter = { - try { - val cls = Class.forName("scala.tools.nsc.interpreter.PresentationCompilerCompleter") - cls.getDeclaredConstructor(classOf[IMain]).newInstance(sparkILoop.intp) - .asInstanceOf[ScalaCompleter] - } catch { - case e : ClassNotFoundException => new JLineCompletion(sparkILoop.intp).completer - } - } - completer.complete(code, cursor).candidates.toArray - } - - override protected def valueOfTerm(name: String): Option[Any] = { - // IMain#valueOfTerm will always return None, so use other way instead. - Option(sparkILoop.lastRequest.lineRep.call("$result")) - } - - override protected def bind(name: String, - tpe: String, - value: Object, - modifier: List[String]): Unit = { - sparkILoop.beQuietDuring { - sparkILoop.bind(name, tpe, value, modifier) - } - } -} diff --git a/repl/scala-2.11/src/test/scala/org/apache/livy/repl/SparkInterpreterSpec.scala b/repl/scala-2.11/src/test/scala/org/apache/livy/repl/SparkInterpreterSpec.scala deleted file mode 100644 index d92203473..000000000 --- a/repl/scala-2.11/src/test/scala/org/apache/livy/repl/SparkInterpreterSpec.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.livy.repl - -import org.scalatest._ - -import org.apache.livy.LivyBaseUnitTestSuite - -class SparkInterpreterSpec extends FunSpec with Matchers with LivyBaseUnitTestSuite { - describe("SparkInterpreter") { - val interpreter = new SparkInterpreter(null) - - it("should parse Scala compile error.") { - // Regression test for LIVY-. - val error = - """:27: error: type mismatch; - | found : Int - | required: String - | sc.setJobGroup(groupName, groupName, true) - | ^ - |:27: error: type mismatch; - | found : Int - | required: String - | sc.setJobGroup(groupName, groupName, true) - | ^ - |""".stripMargin - - val parsedError = AbstractSparkInterpreter.KEEP_NEWLINE_REGEX.split(error) - - val expectedTraceback = parsedError.tail - - val (ename, traceback) = interpreter.parseError(error) - ename shouldBe ":27: error: type mismatch;" - traceback shouldBe expectedTraceback - } - - it("should parse Scala runtime error.") { - val error = - """java.lang.RuntimeException: message - | ... 48 elided - | - |Tailing message""".stripMargin - - val parsedError = AbstractSparkInterpreter.KEEP_NEWLINE_REGEX.split(error) - - val expectedTraceback = parsedError.tail - - val (ename, traceback) = interpreter.parseError(error) - ename shouldBe "java.lang.RuntimeException: message" - traceback shouldBe expectedTraceback - } - } -} diff --git a/repl/src/main/scala/org/apache/livy/repl/SparkRInterpreter.scala b/repl/src/main/scala/org/apache/livy/repl/SparkRInterpreter.scala index c74c8c80d..407762623 100644 --- a/repl/src/main/scala/org/apache/livy/repl/SparkRInterpreter.scala +++ b/repl/src/main/scala/org/apache/livy/repl/SparkRInterpreter.scala @@ -201,10 +201,8 @@ class SparkRInterpreter( sendRequest("""assign(".sc", SparkR:::callJStatic("org.apache.livy.repl.SparkRInterpreter", "getSparkContext"), envir = SparkR:::.sparkREnv)""") sendRequest("""assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)""") - if (sparkMajorVersion >= 2) { - sendRequest("""assign(".sparkRsession", SparkR:::callJStatic("org.apache.livy.repl.SparkRInterpreter", "getSparkSession"), envir = SparkR:::.sparkREnv)""") - sendRequest("""assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)""") - } + sendRequest("""assign(".sparkRsession", SparkR:::callJStatic("org.apache.livy.repl.SparkRInterpreter", "getSparkSession"), envir = SparkR:::.sparkREnv)""") + sendRequest("""assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)""") sendRequest("""assign(".sqlc", SparkR:::callJStatic("org.apache.livy.repl.SparkRInterpreter", "getSQLContext"), envir = SparkR:::.sparkREnv)""") sendRequest("""assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)""") diff --git a/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala b/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala index bce8fd460..e2d63e8c4 100644 --- a/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala +++ b/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala @@ -282,11 +282,6 @@ class Python2InterpreterSpec extends PythonBaseInterpreterSpec { implicit val formats = DefaultFormats - override protected def withFixture(test: NoArgTest): Outcome = { - assume(!sys.props.getOrElse("skipPySpark2Tests", "false").toBoolean, "Skipping PySpark2 tests.") - test() - } - override def createInterpreter(): Interpreter = { val sparkConf = new SparkConf() PythonInterpreter(sparkConf, new SparkEntries(sparkConf)) diff --git a/repl/src/test/scala/org/apache/livy/repl/PythonSessionSpec.scala b/repl/src/test/scala/org/apache/livy/repl/PythonSessionSpec.scala index 486dffef1..b54be11a6 100644 --- a/repl/src/test/scala/org/apache/livy/repl/PythonSessionSpec.scala +++ b/repl/src/test/scala/org/apache/livy/repl/PythonSessionSpec.scala @@ -170,13 +170,7 @@ abstract class PythonSessionSpec extends BaseSessionSpec(PySpark) { } } -class Python2SessionSpec extends PythonSessionSpec { - - override protected def withFixture(test: NoArgTest): Outcome = { - assume(!sys.props.getOrElse("skipPySpark2Tests", "false").toBoolean, "Skipping PySpark2 tests.") - test() - } -} +class Python2SessionSpec extends PythonSessionSpec class Python3SessionSpec extends PythonSessionSpec with BeforeAndAfterAll { diff --git a/scala-api/scala-2.11/pom.xml b/scala-api/scala-2.11/pom.xml deleted file mode 100644 index 4c021fc37..000000000 --- a/scala-api/scala-2.11/pom.xml +++ /dev/null @@ -1,32 +0,0 @@ - - - - 4.0.0 - org.apache.livy - livy-scala-api_2.11 - 0.10.0-incubating-SNAPSHOT - jar - - - org.apache.livy - livy-scala-api-parent - 0.10.0-incubating-SNAPSHOT - ../pom.xml - - - diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index 4250794dc..0667b718c 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -246,16 +246,15 @@ object InteractiveSession extends Logging { } else { val sparkHome = livyConf.sparkHome().get val libdir = sparkMajorVersion match { - case 2 | 3 => + case 3 => if (new File(sparkHome, "RELEASE").isFile) { new File(sparkHome, "jars") - } else if (new File(sparkHome, "assembly/target/scala-2.11/jars").isDirectory) { - new File(sparkHome, "assembly/target/scala-2.11/jars") } else { new File(sparkHome, "assembly/target/scala-2.12/jars") } case v => - throw new RuntimeException(s"Unsupported Spark major version: $sparkMajorVersion") + throw new RuntimeException( + s"Unsupported Spark major version: $sparkMajorVersion (minimum 3.0 required)") } val jars = if (!libdir.isDirectory) { Seq.empty[String] diff --git a/server/src/main/scala/org/apache/livy/utils/LivySparkUtils.scala b/server/src/main/scala/org/apache/livy/utils/LivySparkUtils.scala index c1b4320e9..6bd8c3807 100644 --- a/server/src/main/scala/org/apache/livy/utils/LivySparkUtils.scala +++ b/server/src/main/scala/org/apache/livy/utils/LivySparkUtils.scala @@ -41,17 +41,11 @@ object LivySparkUtils extends Logging { // Spark 3.1 + Scala 2.12 (3, 1) -> "2.12", // Spark 3.0 + Scala 2.12 - (3, 0) -> "2.12", - // Spark 2.4 + Scala 2.11 - (2, 4) -> "2.11", - // Spark 2.3 + Scala 2.11 - (2, 3) -> "2.11", - // Spark 2.2 + Scala 2.11 - (2, 2) -> "2.11" + (3, 0) -> "2.12" ) - // Supported Spark version - private val MIN_VERSION = (2, 2) + // Supported Spark version (Spark 2.x support has been removed) + private val MIN_VERSION = (3, 0) private val MAX_VERSION = (3, 6) private val sparkVersionRegex = """version (.*)""".r.unanchored diff --git a/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala b/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala index ea62d0cc2..24b612ccd 100644 --- a/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala +++ b/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala @@ -44,10 +44,10 @@ class LivySparkUtilsSuite extends FunSuite with Matchers with LivyBaseUnitTestSu } test("should recognize supported Spark versions") { - testSparkVersion("2.2.0") - testSparkVersion("2.3.0") - testSparkVersion("2.4.0") testSparkVersion("3.0.0") + testSparkVersion("3.1.0") + testSparkVersion("3.2.0") + testSparkVersion("3.5.0") } test("should complain about unsupported Spark versions") { @@ -59,6 +59,9 @@ class LivySparkUtilsSuite extends FunSuite with Matchers with LivyBaseUnitTestSu intercept[IllegalArgumentException] { testSparkVersion("1.6.0") } intercept[IllegalArgumentException] { testSparkVersion("2.0.1") } intercept[IllegalArgumentException] { testSparkVersion("2.1.3") } + intercept[IllegalArgumentException] { testSparkVersion("2.2.0") } + intercept[IllegalArgumentException] { testSparkVersion("2.3.0") } + intercept[IllegalArgumentException] { testSparkVersion("2.4.5") } } test("should fail on bad version") { @@ -85,29 +88,28 @@ class LivySparkUtilsSuite extends FunSuite with Matchers with LivyBaseUnitTestSu } test("defaultSparkScalaVersion() should return default Scala version") { - defaultSparkScalaVersion(formatSparkVersion("2.2.1")) shouldBe "2.11" - defaultSparkScalaVersion(formatSparkVersion("2.3.0")) shouldBe "2.11" - defaultSparkScalaVersion(formatSparkVersion("2.4.0")) shouldBe "2.11" defaultSparkScalaVersion(formatSparkVersion("3.0.0")) shouldBe "2.12" + defaultSparkScalaVersion(formatSparkVersion("3.1.0")) shouldBe "2.12" + defaultSparkScalaVersion(formatSparkVersion("3.5.0")) shouldBe "2.12" } test("sparkScalaVersion() should use spark-submit detected Scala version.") { - sparkScalaVersion(formatSparkVersion("2.0.1"), Some("2.10"), livyConf) shouldBe "2.10" - sparkScalaVersion(formatSparkVersion("1.6.0"), Some("2.11"), livyConf) shouldBe "2.11" + sparkScalaVersion(formatSparkVersion("3.0.0"), Some("2.12"), livyConf) shouldBe "2.12" + sparkScalaVersion(formatSparkVersion("3.1.0"), Some("2.12"), livyConf) shouldBe "2.12" } test("sparkScalaVersion() should throw if configured and detected Scala version mismatch.") { intercept[IllegalArgumentException] { - sparkScalaVersion(formatSparkVersion("2.0.1"), Some("2.11"), livyConf210) + sparkScalaVersion(formatSparkVersion("3.0.0"), Some("2.11"), livyConf210) } intercept[IllegalArgumentException] { - sparkScalaVersion(formatSparkVersion("1.6.1"), Some("2.10"), livyConf211) + sparkScalaVersion(formatSparkVersion("3.1.0"), Some("2.10"), livyConf211) } } test("sparkScalaVersion() should use default Spark Scala version.") { - sparkScalaVersion(formatSparkVersion("2.2.0"), None, livyConf) shouldBe "2.11" - sparkScalaVersion(formatSparkVersion("2.3.1"), None, livyConf) shouldBe "2.11" sparkScalaVersion(formatSparkVersion("3.0.0"), None, livyConf) shouldBe "2.12" + sparkScalaVersion(formatSparkVersion("3.1.0"), None, livyConf) shouldBe "2.12" + sparkScalaVersion(formatSparkVersion("3.5.0"), None, livyConf) shouldBe "2.12" } } diff --git a/test-lib/pom.xml b/test-lib/pom.xml index 1f35e0828..5d4241898 100644 --- a/test-lib/pom.xml +++ b/test-lib/pom.xml @@ -85,36 +85,6 @@ - - org.codehaus.mojo - build-helper-maven-plugin - - - parse-spark-version - process-sources - - parse-version - - - spark - ${spark.version} - - - - add-spark2-source-code - process-sources - - add-source - - - - ${project.basedir}/src/main/spark${spark.majorVersion}/scala - ${project.basedir}/src/main/spark${spark.majorVersion}/java - - - - - diff --git a/test-lib/src/main/spark2/java/org/apache/livy/test/jobs/spark2/DatasetTest.java b/test-lib/src/main/spark2/java/org/apache/livy/test/jobs/spark2/DatasetTest.java deleted file mode 100644 index 92940c51f..000000000 --- a/test-lib/src/main/spark2/java/org/apache/livy/test/jobs/spark2/DatasetTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.livy.test.jobs.spark2; - -import java.util.Arrays; - -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.RowFactory; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; - -import org.apache.livy.Job; -import org.apache.livy.JobContext; - -public class DatasetTest implements Job { - - @Override - public Long call(JobContext jc) throws Exception { - SparkSession spark = jc.sparkSession(); - - JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3)).map( - new Function() { - public Row call(Integer integer) throws Exception { - return RowFactory.create(integer); - } - }); - StructType schema = DataTypes.createStructType(new StructField[] { - DataTypes.createStructField("value", DataTypes.IntegerType, false) - }); - - Dataset ds = spark.createDataFrame(rdd, schema); - - return ds.filter(new FilterFunction() { - @Override - public boolean call(Row row) throws Exception { - return row.getInt(0) >= 2; - } - }).count(); - } -} diff --git a/test-lib/src/main/spark2/java/org/apache/livy/test/jobs/spark2/SparkSessionTest.java b/test-lib/src/main/spark2/java/org/apache/livy/test/jobs/spark2/SparkSessionTest.java deleted file mode 100644 index d04d5e577..000000000 --- a/test-lib/src/main/spark2/java/org/apache/livy/test/jobs/spark2/SparkSessionTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.livy.test.jobs.spark2; - -import java.util.Arrays; - -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.SparkSession; - -import org.apache.livy.Job; -import org.apache.livy.JobContext; - -public class SparkSessionTest implements Job { - - @Override - public Long call(JobContext jc) throws Exception { - // Make sure SparkSession and SparkContext is callable - SparkSession session = jc.sparkSession(); - - JavaSparkContext sc = new JavaSparkContext(session.sparkContext()); - return sc.parallelize(Arrays.asList(1, 2, 3)).count(); - } -} diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala index 03e10c35a..a3b0af7ba 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala @@ -74,7 +74,7 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC val supportUseDatabase: Boolean = { val sparkVersion = server.livyConf.get(LivyConf.LIVY_SPARK_VERSION) val (sparkMajorVersion, _) = LivySparkUtils.formatSparkVersion(sparkVersion) - sparkMajorVersion > 1 || server.livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT) + sparkMajorVersion >= 3 || server.livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT) } // Configs from Hive diff --git a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala index cc28b02e6..dd688a64a 100644 --- a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala +++ b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala @@ -29,7 +29,7 @@ import org.apache.livy.LivyConf trait CommonThriftTests { def hiveSupportEnabled(sparkMajorVersion: Int, livyConf: LivyConf): Boolean = { - sparkMajorVersion > 1 || livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT) + sparkMajorVersion >= 3 || livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT) } def dataTypesTest(statement: Statement, mapSupported: Boolean): Unit = {