diff --git a/.github/workflows/integration-tests.yaml b/.github/workflows/integration-tests.yaml
index 754f26f58..968937e0c 100644
--- a/.github/workflows/integration-tests.yaml
+++ b/.github/workflows/integration-tests.yaml
@@ -29,18 +29,15 @@ jobs:
strategy:
matrix:
maven_profile:
- - "-Pscala-2.11 -Pspark2 -DskipRTests"
- "-Pscala-2.12 -Pspark3"
jdk_path:
- "/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java"
- include:
- - maven_profile: "-Pscala-2.12 -Pspark3"
- jdk_path: "/usr/lib/jvm/java-17-openjdk-amd64/bin/java"
+ - "/usr/lib/jvm/java-17-openjdk-amd64/bin/java"
steps:
- -
+ -
name: Checkout
uses: actions/checkout@v3
- -
+ -
name: Cache local Maven repository
uses: actions/cache@v3
with:
@@ -51,14 +48,12 @@ jobs:
restore-keys: |
${{ runner.os }}-maven-
-
- name: Set Python 3 as default for Spark 3 builds
- if: ${{ contains(matrix.maven_profile, 'spark3') }}
- # This can be removed once support for Python 2 and Spark 2 is removed and the default python executable is python3
+ name: Set Python 3 as default
run: pyenv global 3 && echo "PYSPARK_PYTHON=$(which python3)" >> "$GITHUB_ENV"
-
name: Set JDK version
run: update-alternatives --set java ${{ matrix.jdk_path }}
- -
+ -
name: Build with Maven
run: mvn -Pthriftserver ${{ matrix.maven_profile }} -DskipTests -Dmaven.javadoc.skip=true -B -V -e verify
-
diff --git a/.github/workflows/unit-tests.yaml b/.github/workflows/unit-tests.yaml
index f946fd97c..ccd22a0ec 100644
--- a/.github/workflows/unit-tests.yaml
+++ b/.github/workflows/unit-tests.yaml
@@ -27,19 +27,15 @@ jobs:
strategy:
matrix:
maven_profile:
- - "-Pscala-2.11 -Pspark2"
- - "-Pscala-2.12 -Pspark2"
- "-Pscala-2.12 -Pspark3"
jdk_path:
- "/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java"
- include:
- - maven_profile: "-Pscala-2.12 -Pspark3"
- jdk_path: "/usr/lib/jvm/java-17-openjdk-amd64/bin/java"
+ - "/usr/lib/jvm/java-17-openjdk-amd64/bin/java"
steps:
- -
+ -
name: Checkout
uses: actions/checkout@v3
- -
+ -
name: Cache local Maven repository
uses: actions/cache@v3
with:
diff --git a/.travis.yml b/.travis.yml
index d661e30b5..2ea3a96b6 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -21,25 +21,12 @@ language: scala
matrix:
include:
- - name: "Spark 2.4 Unit Tests (Scala 2.11)"
- scala: 2.11.12
- env: MVN_FLAG='-Pscala-2.11 -Pspark2 -Pthriftserver -DskipITs'
- - name: "Spark 2.4 ITs (Scala 2.11)"
- scala: 2.11.12
- env: MVN_FLAG='-Pscala-2.11 -Pspark2 -Pthriftserver -DskipTests'
- - name: "Spark 2.4 Unit Tests (Scala 2.12)"
- scala: 2.12.10
- env: MVN_FLAG='-Pscala-2.12 -Pspark2 -Pthriftserver -DskipITs'
- - name: "Spark 2.4 ITs (Scala 2.12)"
- scala: 2.12.10
- env: MVN_FLAG='-Pscala-2.12 -Pspark2 -Pthriftserver -DskipTests'
-# No scala 2.11.x build for spark3
- - name: "Spark 3.0 Unit Tests (Scala 2.12)"
+ - name: "Spark 3.x Unit Tests (Scala 2.12)"
scala: 2.12.10
env: MVN_FLAG='-Pscala-2.12 -Pspark3 -Pthriftserver -DskipITs'
- - name: "Spark 3.0 ITs (Scala 2.12)"
+ - name: "Spark 3.x ITs (Scala 2.12)"
scala: 2.12.10
- env: MVN_FLAG='-Pscala-2.12 -Pspark3 -Pthriftserver -DskipITs'
+ env: MVN_FLAG='-Pscala-2.12 -Pspark3 -Pthriftserver -DskipTests'
jdk:
- openjdk8
diff --git a/README.md b/README.md
index b653569f4..5fbe4f468 100644
--- a/README.md
+++ b/README.md
@@ -57,7 +57,7 @@ Required python packages for building Livy:
To run Livy, you will also need a Spark installation. You can get Spark releases at
https://spark.apache.org/downloads.html.
-Livy requires Spark 2.4+. You can switch to a different version of Spark by setting the
+Livy requires Spark 3.0+. You can switch to a different version of Spark by setting the
``SPARK_HOME`` environment variable in the Livy server process, without needing to rebuild Livy.
@@ -82,7 +82,7 @@ docker run --rm -it -v $(pwd):/workspace -v $HOME/.m2:/root/.m2 livy-ci mvn pack
> **Note**: The `docker run` command maps the maven repository to your host machine's maven cache so subsequent runs will not need to download dependencies.
-By default Livy is built against Apache Spark 2.4.5, but the version of Spark used when running
+By default Livy is built against Apache Spark 3.3.4, but the version of Spark used when running
Livy does not need to match the version used to build Livy. Livy internally handles the differences
between different Spark versions.
@@ -93,8 +93,6 @@ version of Spark without needing to rebuild.
| Flag | Purpose |
|--------------|--------------------------------------------------------------------|
-| -Phadoop2 | Choose Hadoop2 based build dependencies (default configuration) |
-| -Pspark2 | Choose Spark 2.x based build dependencies (default configuration) |
-| -Pspark3 | Choose Spark 3.x based build dependencies |
-| -Pscala-2.11 | Choose Scala 2.11 based build dependencies (default configuration) |
-| -Pscala-2.12 | Choose scala 2.12 based build dependencies |
+| -Phadoop2 | Choose Hadoop2 based build dependencies |
+| -Pspark3 | Choose Spark 3.x based build dependencies (default configuration) |
+| -Pscala-2.12 | Choose Scala 2.12 based build dependencies (default configuration) |
diff --git a/core/scala-2.11/pom.xml b/core/scala-2.11/pom.xml
deleted file mode 100644
index 950d5b261..000000000
--- a/core/scala-2.11/pom.xml
+++ /dev/null
@@ -1,48 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <groupId>org.apache.livy</groupId>
-  <artifactId>livy-core_2.11</artifactId>
-  <version>0.10.0-incubating-SNAPSHOT</version>
-  <packaging>jar</packaging>
-
-  <parent>
-    <groupId>org.apache.livy</groupId>
-    <artifactId>livy-core-parent</artifactId>
-    <version>0.10.0-incubating-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-jar-plugin</artifactId>
-        <executions>
-          <execution>
-            <goals>
-              <goal>test-jar</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-</project>
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index 055a01b03..a803b3573 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -246,35 +246,6 @@
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>build-helper-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>parse-spark-version</id>
-            <phase>process-test-sources</phase>
-            <goals>
-              <goal>parse-version</goal>
-            </goals>
-            <configuration>
-              <propertyPrefix>spark</propertyPrefix>
-              <versionString>${spark.version}</versionString>
-            </configuration>
-          </execution>
-          <execution>
-            <id>add-spark-version-specific-test</id>
-            <phase>process-test-sources</phase>
-            <goals>
-              <goal>add-test-source</goal>
-            </goals>
-            <configuration>
-              <sources>
-                <source>${project.basedir}/src/test/spark${spark.majorVersion}/scala</source>
-              </sources>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
org.apache.maven.plugins
diff --git a/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala b/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala
index e5d77cab4..ece0fad5f 100644
--- a/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala
+++ b/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala
@@ -87,10 +87,7 @@ class InteractiveIT extends BaseIntegrationTestSuite {
s.run("from pyspark.sql.types import Row").verifyResult("")
s.run("x = [Row(age=1, name=u'a'), Row(age=2, name=u'b'), Row(age=3, name=u'c')]")
.verifyResult("")
- // Check if we're running with Spark2.
- if (s.run("spark").result().isLeft) {
- s.run("sqlContext.sparkSession").verifyResult(".*pyspark\\.sql\\.session\\.SparkSession.*")
- }
+ s.run("spark").verifyResult(".*pyspark\\.sql\\.session\\.SparkSession.*")
s.run("%table x").verifyResult(".*headers.*type.*name.*data.*")
s.run("abcde").verifyError(ename = "NameError", evalue = "name 'abcde' is not defined")
s.run("raise KeyError('foo')").verifyError(ename = "KeyError", evalue = "'foo'")
diff --git a/integration-test/src/test/spark2/scala/Spark2JobApiIT.scala b/integration-test/src/test/spark2/scala/Spark2JobApiIT.scala
deleted file mode 100644
index 38230f822..000000000
--- a/integration-test/src/test/spark2/scala/Spark2JobApiIT.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.livy.test
-
-import java.io.File
-import java.net.URI
-import java.util.concurrent.{TimeUnit, Future => JFuture}
-import javax.servlet.http.HttpServletResponse
-
-import scala.util.Properties
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.http.client.methods.HttpGet
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.livy._
-import org.apache.livy.client.common.HttpMessages._
-import org.apache.livy.sessions.SessionKindModule
-import org.apache.livy.test.framework.BaseIntegrationTestSuite
-import org.apache.livy.test.jobs.spark2._
-import org.apache.livy.utils.LivySparkUtils
-
-class Spark2JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll with Logging {
-
- private var client: LivyClient = _
- private var sessionId: Int = _
- private val mapper = new ObjectMapper()
- .registerModule(DefaultScalaModule)
- .registerModule(new SessionKindModule())
-
- override def afterAll(): Unit = {
- super.afterAll()
-
- if (client != null) {
- client.stop(true)
- }
-
- livyClient.connectSession(sessionId).stop()
- }
-
- scalaTest("create a new session and upload test jar") {
- val prevSessionCount = sessionList().total
- val tempClient = createClient(livyEndpoint)
-
- try {
- // Figure out the session ID by poking at the REST endpoint. We should probably expose this
- // in the Java API.
- val list = sessionList()
- assert(list.total === prevSessionCount + 1)
- val tempSessionId = list.sessions(0).id
-
- livyClient.connectSession(tempSessionId).verifySessionIdle()
- waitFor(tempClient.uploadJar(new File(testLib)))
-
- client = tempClient
- sessionId = tempSessionId
- } finally {
- if (client == null) {
- try {
- if (tempClient != null) {
- tempClient.stop(true)
- }
- } catch {
- case e: Exception => warn("Error stopping client.", e)
- }
- }
- }
- }
-
- scalaTest("run spark2 job") {
- assume(client != null, "Client not active.")
- val result = waitFor(client.submit(new SparkSessionTest()))
- assert(result === 3)
- }
-
- scalaTest("run spark2 dataset job") {
- assume(client != null, "Client not active.")
- val result = waitFor(client.submit(new DatasetTest()))
- assert(result === 2)
- }
-
- private def waitFor[T](future: JFuture[T]): T = {
- future.get(60, TimeUnit.SECONDS)
- }
-
- private def sessionList(): SessionList = {
- val httpGet = new HttpGet(s"$livyEndpoint/sessions/")
- val r = livyClient.httpClient.execute(httpGet)
- val statusCode = r.getStatusLine().getStatusCode()
- val responseBody = r.getEntity().getContent
- val sessionList = mapper.readValue(responseBody, classOf[SessionList])
- r.close()
-
- assert(statusCode == HttpServletResponse.SC_OK)
- sessionList
- }
-
- private def createClient(uri: String): LivyClient = {
- new LivyClientBuilder().setURI(new URI(uri)).build()
- }
-
- protected def scalaTest(desc: String)(testFn: => Unit): Unit = {
- test(desc) {
- val livyConf = new LivyConf()
- val (sparkVersion, scalaVersion) = LivySparkUtils.sparkSubmitVersion(livyConf)
- val formattedSparkVersion = LivySparkUtils.formatSparkVersion(sparkVersion)
- val versionString =
- LivySparkUtils.sparkScalaVersion(formattedSparkVersion, scalaVersion, livyConf)
-
- assume(versionString == LivySparkUtils.formatScalaVersion(Properties.versionNumberString),
- s"Scala test can only be run with ${Properties.versionString}")
- testFn
- }
- }
-}
diff --git a/pom.xml b/pom.xml
index 68c71b540..715de69e5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,9 +82,8 @@
compile
1.7.36
1.2.26
- <spark.scala-2.11.version>2.4.5</spark.scala-2.11.version>
- <spark.scala-2.12.version>2.4.5</spark.scala-2.12.version>
- <spark.version>${spark.scala-2.11.version}</spark.version>
+ <spark.scala-2.12.version>3.3.4</spark.scala-2.12.version>
+ <spark.version>${spark.scala-2.12.version}</spark.version>
5.6.0
3.0.0
1.15
@@ -126,13 +125,10 @@
${execution.root}/dev/spark
- <hadoop.major-minor.version>2.7</hadoop.major-minor.version>
+ <hadoop.major-minor.version>2</hadoop.major-minor.version>
2.7.3
-
- <scala.binary.version>2.11</scala.binary.version>
- <scala.version>2.11.12</scala.version>
-
- 2.4.5
+ <scala.binary.version>2.12</scala.binary.version>
+ <scala.version>2.12.18</scala.version>
1.8
0.10.9
3.5.3
@@ -147,8 +143,6 @@
false
-
- <skipPySpark2Tests>false</skipPySpark2Tests>
false
@@ -981,7 +975,6 @@
false
${project.version}
${skipRTests}
- <skipPySpark2Tests>${skipPySpark2Tests}</skipPySpark2Tests>
${skipPySpark3Tests}
${test.redirectToFile}
@@ -1011,7 +1004,6 @@
false
${project.version}
${skipRTests}
- <skipPySpark2Tests>${skipPySpark2Tests}</skipPySpark2Tests>
${skipPySpark3Tests}
${extraJavaTestArgs}
@@ -1326,13 +1318,6 @@
2.7.3
-    <profile>
-      <id>scala-2.11</id>
-      <properties>
-        <scala.binary.version>2.11</scala.binary.version>
-        <scala.version>2.11.12</scala.version>
-      </properties>
-    </profile>
scala-2.12
@@ -1349,19 +1334,6 @@
thriftserver/client
-
- spark2
-
- 2.4.5
- 1.8
- 0.10.9
- 3.5.3
- spark-${spark.version}-bin-hadoop${hadoop.major-minor.version}
-
- https://archive.apache.org/dist/spark/spark-${spark.version}/${spark.bin.name}.tgz
-
-
-
spark3
@@ -1379,7 +1351,6 @@
https://archive.apache.org/dist/spark/spark-${spark.version}/${spark.bin.name}.tgz
- <skipPySpark2Tests>true</skipPySpark2Tests>
diff --git a/repl/scala-2.11/pom.xml b/repl/scala-2.11/pom.xml
deleted file mode 100644
index 0b740f744..000000000
--- a/repl/scala-2.11/pom.xml
+++ /dev/null
@@ -1,34 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <groupId>org.apache.livy</groupId>
-  <artifactId>livy-repl_2.11</artifactId>
-  <version>0.10.0-incubating-SNAPSHOT</version>
-  <packaging>jar</packaging>
-
-  <parent>
-    <groupId>org.apache.livy</groupId>
-    <artifactId>livy-repl-parent</artifactId>
-    <version>0.10.0-incubating-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-</project>
diff --git a/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala b/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
deleted file mode 100644
index 48dca17b3..000000000
--- a/repl/scala-2.11/src/main/scala/org/apache/livy/repl/SparkInterpreter.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.livy.repl
-
-import java.io.File
-import java.net.{URL, URLClassLoader}
-import java.nio.file.{Files, Paths}
-
-import scala.tools.nsc.Settings
-import scala.tools.nsc.interpreter.Completion.ScalaCompleter
-import scala.tools.nsc.interpreter.IMain
-import scala.tools.nsc.interpreter.JLineCompletion
-import scala.tools.nsc.interpreter.JPrintWriter
-import scala.tools.nsc.interpreter.Results.Result
-
-import org.apache.spark.SparkConf
-import org.apache.spark.repl.SparkILoop
-
-/**
- * This represents a Scala 2.11 Spark interpreter. It is not thread safe.
- */
-class SparkInterpreter(protected override val conf: SparkConf) extends AbstractSparkInterpreter {
-
- private var sparkILoop: SparkILoop = _
-
- override def start(): Unit = {
- require(sparkILoop == null)
-
- val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
- val outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
- outputDir.deleteOnExit()
- conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
-
- val settings = new Settings()
- settings.processArguments(List("-Yrepl-class-based",
- "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true)
- settings.usejavacp.value = true
- settings.embeddedDefaults(Thread.currentThread().getContextClassLoader())
-
- sparkILoop = new SparkILoop(None, new JPrintWriter(outputStream, true))
- sparkILoop.settings = settings
- sparkILoop.createInterpreter()
- sparkILoop.initializeSynchronous()
-
- restoreContextClassLoader {
- sparkILoop.setContextClassLoader()
-
- var classLoader = Thread.currentThread().getContextClassLoader
- while (classLoader != null) {
- if (classLoader.getClass.getCanonicalName ==
- "org.apache.spark.util.MutableURLClassLoader") {
- val extraJarPath = classLoader.asInstanceOf[URLClassLoader].getURLs()
- // Check if the file exists. Otherwise an exception will be thrown.
- .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile }
- // Livy rsc and repl are also in the extra jars list. Filter them out.
- .filterNot { u => Paths.get(u.toURI).getFileName.toString.startsWith("livy-") }
- // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it.
- .filterNot { u =>
- Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect")
- }
-
- extraJarPath.foreach { p => debug(s"Adding $p to Scala interpreter's class path...") }
- sparkILoop.addUrlsToClassPath(extraJarPath: _*)
- classLoader = null
- } else {
- classLoader = classLoader.getParent
- }
- }
-
- postStart()
- }
- }
-
- override def close(): Unit = synchronized {
- super.close()
-
- if (sparkILoop != null) {
- sparkILoop.closeInterpreter()
- sparkILoop = null
- }
- }
-
- override def addJar(jar: String): Unit = {
- sparkILoop.addUrlsToClassPath(new URL(jar))
- }
-
- override protected def isStarted(): Boolean = {
- sparkILoop != null
- }
-
- override protected def interpret(code: String): Result = {
- sparkILoop.interpret(code)
- }
-
- override protected def completeCandidates(code: String, cursor: Int) : Array[String] = {
- val completer : ScalaCompleter = {
- try {
- val cls = Class.forName("scala.tools.nsc.interpreter.PresentationCompilerCompleter")
- cls.getDeclaredConstructor(classOf[IMain]).newInstance(sparkILoop.intp)
- .asInstanceOf[ScalaCompleter]
- } catch {
- case e : ClassNotFoundException => new JLineCompletion(sparkILoop.intp).completer
- }
- }
- completer.complete(code, cursor).candidates.toArray
- }
-
- override protected def valueOfTerm(name: String): Option[Any] = {
- // IMain#valueOfTerm will always return None, so use other way instead.
- Option(sparkILoop.lastRequest.lineRep.call("$result"))
- }
-
- override protected def bind(name: String,
- tpe: String,
- value: Object,
- modifier: List[String]): Unit = {
- sparkILoop.beQuietDuring {
- sparkILoop.bind(name, tpe, value, modifier)
- }
- }
-}
diff --git a/repl/scala-2.11/src/test/scala/org/apache/livy/repl/SparkInterpreterSpec.scala b/repl/scala-2.11/src/test/scala/org/apache/livy/repl/SparkInterpreterSpec.scala
deleted file mode 100644
index d92203473..000000000
--- a/repl/scala-2.11/src/test/scala/org/apache/livy/repl/SparkInterpreterSpec.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.livy.repl
-
-import org.scalatest._
-
-import org.apache.livy.LivyBaseUnitTestSuite
-
-class SparkInterpreterSpec extends FunSpec with Matchers with LivyBaseUnitTestSuite {
- describe("SparkInterpreter") {
- val interpreter = new SparkInterpreter(null)
-
- it("should parse Scala compile error.") {
- // Regression test for LIVY-.
- val error =
- """:27: error: type mismatch;
- | found : Int
- | required: String
- | sc.setJobGroup(groupName, groupName, true)
- | ^
- |<console>:27: error: type mismatch;
- | found : Int
- | required: String
- | sc.setJobGroup(groupName, groupName, true)
- | ^
- |""".stripMargin
-
- val parsedError = AbstractSparkInterpreter.KEEP_NEWLINE_REGEX.split(error)
-
- val expectedTraceback = parsedError.tail
-
- val (ename, traceback) = interpreter.parseError(error)
- ename shouldBe "<console>:27: error: type mismatch;"
- traceback shouldBe expectedTraceback
- }
-
- it("should parse Scala runtime error.") {
- val error =
- """java.lang.RuntimeException: message
- | ... 48 elided
- |
- |Tailing message""".stripMargin
-
- val parsedError = AbstractSparkInterpreter.KEEP_NEWLINE_REGEX.split(error)
-
- val expectedTraceback = parsedError.tail
-
- val (ename, traceback) = interpreter.parseError(error)
- ename shouldBe "java.lang.RuntimeException: message"
- traceback shouldBe expectedTraceback
- }
- }
-}
diff --git a/repl/src/main/scala/org/apache/livy/repl/SparkRInterpreter.scala b/repl/src/main/scala/org/apache/livy/repl/SparkRInterpreter.scala
index c74c8c80d..407762623 100644
--- a/repl/src/main/scala/org/apache/livy/repl/SparkRInterpreter.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/SparkRInterpreter.scala
@@ -201,10 +201,8 @@ class SparkRInterpreter(
sendRequest("""assign(".sc", SparkR:::callJStatic("org.apache.livy.repl.SparkRInterpreter", "getSparkContext"), envir = SparkR:::.sparkREnv)""")
sendRequest("""assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)""")
- if (sparkMajorVersion >= 2) {
- sendRequest("""assign(".sparkRsession", SparkR:::callJStatic("org.apache.livy.repl.SparkRInterpreter", "getSparkSession"), envir = SparkR:::.sparkREnv)""")
- sendRequest("""assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)""")
- }
+ sendRequest("""assign(".sparkRsession", SparkR:::callJStatic("org.apache.livy.repl.SparkRInterpreter", "getSparkSession"), envir = SparkR:::.sparkREnv)""")
+ sendRequest("""assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)""")
sendRequest("""assign(".sqlc", SparkR:::callJStatic("org.apache.livy.repl.SparkRInterpreter", "getSQLContext"), envir = SparkR:::.sparkREnv)""")
sendRequest("""assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)""")
diff --git a/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala b/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
index bce8fd460..e2d63e8c4 100644
--- a/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
+++ b/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
@@ -282,11 +282,6 @@ class Python2InterpreterSpec extends PythonBaseInterpreterSpec {
implicit val formats = DefaultFormats
- override protected def withFixture(test: NoArgTest): Outcome = {
- assume(!sys.props.getOrElse("skipPySpark2Tests", "false").toBoolean, "Skipping PySpark2 tests.")
- test()
- }
-
override def createInterpreter(): Interpreter = {
val sparkConf = new SparkConf()
PythonInterpreter(sparkConf, new SparkEntries(sparkConf))
diff --git a/repl/src/test/scala/org/apache/livy/repl/PythonSessionSpec.scala b/repl/src/test/scala/org/apache/livy/repl/PythonSessionSpec.scala
index 486dffef1..b54be11a6 100644
--- a/repl/src/test/scala/org/apache/livy/repl/PythonSessionSpec.scala
+++ b/repl/src/test/scala/org/apache/livy/repl/PythonSessionSpec.scala
@@ -170,13 +170,7 @@ abstract class PythonSessionSpec extends BaseSessionSpec(PySpark) {
}
}
-class Python2SessionSpec extends PythonSessionSpec {
-
- override protected def withFixture(test: NoArgTest): Outcome = {
- assume(!sys.props.getOrElse("skipPySpark2Tests", "false").toBoolean, "Skipping PySpark2 tests.")
- test()
- }
-}
+class Python2SessionSpec extends PythonSessionSpec
class Python3SessionSpec extends PythonSessionSpec with BeforeAndAfterAll {
diff --git a/scala-api/scala-2.11/pom.xml b/scala-api/scala-2.11/pom.xml
deleted file mode 100644
index 4c021fc37..000000000
--- a/scala-api/scala-2.11/pom.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <groupId>org.apache.livy</groupId>
-  <artifactId>livy-scala-api_2.11</artifactId>
-  <version>0.10.0-incubating-SNAPSHOT</version>
-  <packaging>jar</packaging>
-
-  <parent>
-    <groupId>org.apache.livy</groupId>
-    <artifactId>livy-scala-api-parent</artifactId>
-    <version>0.10.0-incubating-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-</project>
diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index 4250794dc..0667b718c 100644
--- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -246,16 +246,15 @@ object InteractiveSession extends Logging {
} else {
val sparkHome = livyConf.sparkHome().get
val libdir = sparkMajorVersion match {
- case 2 | 3 =>
+ case 3 =>
if (new File(sparkHome, "RELEASE").isFile) {
new File(sparkHome, "jars")
- } else if (new File(sparkHome, "assembly/target/scala-2.11/jars").isDirectory) {
- new File(sparkHome, "assembly/target/scala-2.11/jars")
} else {
new File(sparkHome, "assembly/target/scala-2.12/jars")
}
case v =>
- throw new RuntimeException(s"Unsupported Spark major version: $sparkMajorVersion")
+ throw new RuntimeException(
+ s"Unsupported Spark major version: $sparkMajorVersion (minimum 3.0 required)")
}
val jars = if (!libdir.isDirectory) {
Seq.empty[String]
diff --git a/server/src/main/scala/org/apache/livy/utils/LivySparkUtils.scala b/server/src/main/scala/org/apache/livy/utils/LivySparkUtils.scala
index c1b4320e9..6bd8c3807 100644
--- a/server/src/main/scala/org/apache/livy/utils/LivySparkUtils.scala
+++ b/server/src/main/scala/org/apache/livy/utils/LivySparkUtils.scala
@@ -41,17 +41,11 @@ object LivySparkUtils extends Logging {
// Spark 3.1 + Scala 2.12
(3, 1) -> "2.12",
// Spark 3.0 + Scala 2.12
- (3, 0) -> "2.12",
- // Spark 2.4 + Scala 2.11
- (2, 4) -> "2.11",
- // Spark 2.3 + Scala 2.11
- (2, 3) -> "2.11",
- // Spark 2.2 + Scala 2.11
- (2, 2) -> "2.11"
+ (3, 0) -> "2.12"
)
- // Supported Spark version
- private val MIN_VERSION = (2, 2)
+ // Supported Spark version (Spark 2.x support has been removed)
+ private val MIN_VERSION = (3, 0)
private val MAX_VERSION = (3, 6)
private val sparkVersionRegex = """version (.*)""".r.unanchored
diff --git a/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala b/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala
index ea62d0cc2..24b612ccd 100644
--- a/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala
+++ b/server/src/test/scala/org/apache/livy/utils/LivySparkUtilsSuite.scala
@@ -44,10 +44,10 @@ class LivySparkUtilsSuite extends FunSuite with Matchers with LivyBaseUnitTestSu
}
test("should recognize supported Spark versions") {
- testSparkVersion("2.2.0")
- testSparkVersion("2.3.0")
- testSparkVersion("2.4.0")
testSparkVersion("3.0.0")
+ testSparkVersion("3.1.0")
+ testSparkVersion("3.2.0")
+ testSparkVersion("3.5.0")
}
test("should complain about unsupported Spark versions") {
@@ -59,6 +59,9 @@ class LivySparkUtilsSuite extends FunSuite with Matchers with LivyBaseUnitTestSu
intercept[IllegalArgumentException] { testSparkVersion("1.6.0") }
intercept[IllegalArgumentException] { testSparkVersion("2.0.1") }
intercept[IllegalArgumentException] { testSparkVersion("2.1.3") }
+ intercept[IllegalArgumentException] { testSparkVersion("2.2.0") }
+ intercept[IllegalArgumentException] { testSparkVersion("2.3.0") }
+ intercept[IllegalArgumentException] { testSparkVersion("2.4.5") }
}
test("should fail on bad version") {
@@ -85,29 +88,28 @@ class LivySparkUtilsSuite extends FunSuite with Matchers with LivyBaseUnitTestSu
}
test("defaultSparkScalaVersion() should return default Scala version") {
- defaultSparkScalaVersion(formatSparkVersion("2.2.1")) shouldBe "2.11"
- defaultSparkScalaVersion(formatSparkVersion("2.3.0")) shouldBe "2.11"
- defaultSparkScalaVersion(formatSparkVersion("2.4.0")) shouldBe "2.11"
defaultSparkScalaVersion(formatSparkVersion("3.0.0")) shouldBe "2.12"
+ defaultSparkScalaVersion(formatSparkVersion("3.1.0")) shouldBe "2.12"
+ defaultSparkScalaVersion(formatSparkVersion("3.5.0")) shouldBe "2.12"
}
test("sparkScalaVersion() should use spark-submit detected Scala version.") {
- sparkScalaVersion(formatSparkVersion("2.0.1"), Some("2.10"), livyConf) shouldBe "2.10"
- sparkScalaVersion(formatSparkVersion("1.6.0"), Some("2.11"), livyConf) shouldBe "2.11"
+ sparkScalaVersion(formatSparkVersion("3.0.0"), Some("2.12"), livyConf) shouldBe "2.12"
+ sparkScalaVersion(formatSparkVersion("3.1.0"), Some("2.12"), livyConf) shouldBe "2.12"
}
test("sparkScalaVersion() should throw if configured and detected Scala version mismatch.") {
intercept[IllegalArgumentException] {
- sparkScalaVersion(formatSparkVersion("2.0.1"), Some("2.11"), livyConf210)
+ sparkScalaVersion(formatSparkVersion("3.0.0"), Some("2.11"), livyConf210)
}
intercept[IllegalArgumentException] {
- sparkScalaVersion(formatSparkVersion("1.6.1"), Some("2.10"), livyConf211)
+ sparkScalaVersion(formatSparkVersion("3.1.0"), Some("2.10"), livyConf211)
}
}
test("sparkScalaVersion() should use default Spark Scala version.") {
- sparkScalaVersion(formatSparkVersion("2.2.0"), None, livyConf) shouldBe "2.11"
- sparkScalaVersion(formatSparkVersion("2.3.1"), None, livyConf) shouldBe "2.11"
sparkScalaVersion(formatSparkVersion("3.0.0"), None, livyConf) shouldBe "2.12"
+ sparkScalaVersion(formatSparkVersion("3.1.0"), None, livyConf) shouldBe "2.12"
+ sparkScalaVersion(formatSparkVersion("3.5.0"), None, livyConf) shouldBe "2.12"
}
}
diff --git a/test-lib/pom.xml b/test-lib/pom.xml
index 1f35e0828..5d4241898 100644
--- a/test-lib/pom.xml
+++ b/test-lib/pom.xml
@@ -85,36 +85,6 @@
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>build-helper-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>parse-spark-version</id>
-            <phase>process-sources</phase>
-            <goals>
-              <goal>parse-version</goal>
-            </goals>
-            <configuration>
-              <propertyPrefix>spark</propertyPrefix>
-              <versionString>${spark.version}</versionString>
-            </configuration>
-          </execution>
-          <execution>
-            <id>add-spark2-source-code</id>
-            <phase>process-sources</phase>
-            <goals>
-              <goal>add-source</goal>
-            </goals>
-            <configuration>
-              <sources>
-                <source>${project.basedir}/src/main/spark${spark.majorVersion}/scala</source>
-                <source>${project.basedir}/src/main/spark${spark.majorVersion}/java</source>
-              </sources>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
diff --git a/test-lib/src/main/spark2/java/org/apache/livy/test/jobs/spark2/DatasetTest.java b/test-lib/src/main/spark2/java/org/apache/livy/test/jobs/spark2/DatasetTest.java
deleted file mode 100644
index 92940c51f..000000000
--- a/test-lib/src/main/spark2/java/org/apache/livy/test/jobs/spark2/DatasetTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.livy.test.jobs.spark2;
-
-import java.util.Arrays;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.FilterFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-
-import org.apache.livy.Job;
-import org.apache.livy.JobContext;
-
-public class DatasetTest implements Job<Long> {
-
- @Override
- public Long call(JobContext jc) throws Exception {
- SparkSession spark = jc.sparkSession();
-
- JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
- JavaRDD<Row> rdd = sc.parallelize(Arrays.asList(1, 2, 3)).map(
- new Function<Integer, Row>() {
- public Row call(Integer integer) throws Exception {
- return RowFactory.create(integer);
- }
- });
- StructType schema = DataTypes.createStructType(new StructField[] {
- DataTypes.createStructField("value", DataTypes.IntegerType, false)
- });
-
- Dataset<Row> ds = spark.createDataFrame(rdd, schema);
-
- return ds.filter(new FilterFunction<Row>() {
- @Override
- public boolean call(Row row) throws Exception {
- return row.getInt(0) >= 2;
- }
- }).count();
- }
-}
diff --git a/test-lib/src/main/spark2/java/org/apache/livy/test/jobs/spark2/SparkSessionTest.java b/test-lib/src/main/spark2/java/org/apache/livy/test/jobs/spark2/SparkSessionTest.java
deleted file mode 100644
index d04d5e577..000000000
--- a/test-lib/src/main/spark2/java/org/apache/livy/test/jobs/spark2/SparkSessionTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.livy.test.jobs.spark2;
-
-import java.util.Arrays;
-
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.SparkSession;
-
-import org.apache.livy.Job;
-import org.apache.livy.JobContext;
-
-public class SparkSessionTest implements Job<Long> {
-
- @Override
- public Long call(JobContext jc) throws Exception {
- // Make sure SparkSession and SparkContext is callable
- SparkSession session = jc.sparkSession();
-
- JavaSparkContext sc = new JavaSparkContext(session.sparkContext());
- return sc.parallelize(Arrays.asList(1, 2, 3)).count();
- }
-}
diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
index 03e10c35a..a3b0af7ba 100644
--- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
+++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
@@ -74,7 +74,7 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
val supportUseDatabase: Boolean = {
val sparkVersion = server.livyConf.get(LivyConf.LIVY_SPARK_VERSION)
val (sparkMajorVersion, _) = LivySparkUtils.formatSparkVersion(sparkVersion)
- sparkMajorVersion > 1 || server.livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT)
+ sparkMajorVersion >= 3 || server.livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT)
}
// Configs from Hive
diff --git a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
index cc28b02e6..dd688a64a 100644
--- a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
+++ b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
@@ -29,7 +29,7 @@ import org.apache.livy.LivyConf
trait CommonThriftTests {
def hiveSupportEnabled(sparkMajorVersion: Int, livyConf: LivyConf): Boolean = {
- sparkMajorVersion > 1 || livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT)
+ sparkMajorVersion >= 3 || livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT)
}
def dataTypesTest(statement: Statement, mapSupported: Boolean): Unit = {