From af2ad279dbcd390a8cde01d05ca918575646a6bb Mon Sep 17 00:00:00 2001 From: arnavb Date: Sun, 28 Sep 2025 08:20:29 +0000 Subject: [PATCH 1/8] update --- README.md | 8 ++--- dev/merge_livy_pr.py | 33 ++++++++++--------- examples/src/main/python/pi_app.py | 7 ++-- integration-test/src/test/resources/batch.py | 3 +- .../src/test/resources/test_python_api.py | 27 ++++++++------- python-api/src/main/python/livy/client.py | 4 ++- .../src/main/python/livy/job_context.py | 6 ++-- python-api/src/main/python/livy/job_handle.py | 2 ++ .../src/test/python/livy-tests/client_test.py | 1 + repl/src/main/resources/fake_shell.py | 23 ++++++++----- 10 files changed, 67 insertions(+), 47 deletions(-) diff --git a/README.md b/README.md index b653569f4..e1188f293 100644 --- a/README.md +++ b/README.md @@ -29,20 +29,20 @@ To build Livy, you will need: Debian/Ubuntu: * mvn (from ``maven`` package or maven3 tarball) * openjdk-8-jdk (or Oracle JDK 8) - * Python 2.7+ + * Python 3.x+ * R 3.x Redhat/CentOS: * mvn (from ``maven`` package or maven3 tarball) * java-1.8.0-openjdk (or Oracle JDK 8) - * Python 2.7+ + * Python 3.x+ * R 3.x MacOS: * Xcode command line tools * Oracle's JDK 1.8 * Maven (Homebrew) - * Python 2.7+ + * Python 3.x+ * R 3.x Required python packages for building Livy: @@ -96,5 +96,5 @@ version of Spark without needing to rebuild. | -Phadoop2 | Choose Hadoop2 based build dependencies (default configuration) | | -Pspark2 | Choose Spark 2.x based build dependencies (default configuration) | | -Pspark3 | Choose Spark 3.x based build dependencies | -| -Pscala-2.11 | Choose Scala 2.11 based build dependencies (default configuration) | +| -Pscala-2.11 | Choose Scala 2.11 based build dependencies (default configuration) | | -Pscala-2.12 | Choose scala 2.12 based build dependencies | diff --git a/dev/merge_livy_pr.py b/dev/merge_livy_pr.py index 85ce74170..1019d6cbf 100755 --- a/dev/merge_livy_pr.py +++ b/dev/merge_livy_pr.py @@ -34,6 +34,9 @@ # +from __future__ import print_function +from future import standard_library +standard_library.install_aliases() import json import os import re @@ -41,8 +44,8 @@ import sys if sys.version_info[0] < 3: - import urllib2 - from urllib2 import HTTPError + import urllib.request, urllib.error, urllib.parse + from urllib.error import HTTPError input_prompt_fn = raw_input else: import urllib.request as urllib2 @@ -82,10 +85,10 @@ def get_json(url): try: - request = urllib2.Request(url) + request = urllib.request.Request(url) if GITHUB_OAUTH_KEY: request.add_header('Authorization', 'token %s' % GITHUB_OAUTH_KEY) - return json.load(urllib2.urlopen(request)) + return json.load(urllib.request.urlopen(request)) except HTTPError as e: if "X-RateLimit-Remaining" in e.headers and e.headers["X-RateLimit-Remaining"] == '0': print("Exceeded the GitHub API rate limit; see the instructions in " + @@ -126,7 +129,7 @@ def clean_up(): branches = run_cmd("git branch").replace(" ", "").split("\n") - for branch in filter(lambda x: x.startswith(BRANCH_PREFIX), branches): + for branch in [x for x in branches if x.startswith(BRANCH_PREFIX)]: print("Deleting local branch %s" % branch) run_cmd("git branch -D %s" % branch) @@ -243,7 +246,7 @@ def fix_version_from_branch(branch, versions): return versions[0] else: branch_ver = branch.replace("branch-", "") - return filter(lambda x: x.name.startswith(branch_ver), versions)[-1] + return [x for x in versions if x.name.startswith(branch_ver)][-1] def resolve_jira_issue(merge_branches, comment, default_jira_id=""): @@ -275,11 +278,11 @@ def resolve_jira_issue(merge_branches, comment, default_jira_id=""): versions = asf_jira.project_versions("LIVY") versions = sorted(versions, key=lambda x: x.name, reverse=True) - versions = filter(lambda x: x.raw['released'] is False, versions) + versions = [x for x in versions if x.raw['released'] is False] # Consider only x.y.z versions - versions = filter(lambda x: re.match('\d+\.\d+\.\d+', x.name), versions) + versions = [x for x in versions if re.match('\d+\.\d+\.\d+', x.name)] - default_fix_versions = map(lambda x: fix_version_from_branch(x, versions).name, merge_branches) + default_fix_versions = [fix_version_from_branch(x, versions).name for x in merge_branches] for v in default_fix_versions: # Handles the case where we have forked a release branch but not yet made the release. # In this case, if the PR is committed to the master branch and the release branch, we @@ -289,7 +292,7 @@ def resolve_jira_issue(merge_branches, comment, default_jira_id=""): if patch == "0": previous = "%s.%s.%s" % (major, int(minor) - 1, 0) if previous in default_fix_versions: - default_fix_versions = filter(lambda x: x != v, default_fix_versions) + default_fix_versions = [x for x in default_fix_versions if x != v] default_fix_versions = ",".join(default_fix_versions) fix_versions = input_prompt_fn( @@ -299,12 +302,12 @@ def resolve_jira_issue(merge_branches, comment, default_jira_id=""): fix_versions = fix_versions.replace(" ", "").split(",") def get_version_json(version_str): - return filter(lambda v: v.name == version_str, versions)[0].raw + return [v for v in versions if v.name == version_str][0].raw - jira_fix_versions = map(lambda v: get_version_json(v), fix_versions) + jira_fix_versions = [get_version_json(v) for v in fix_versions] - resolve = filter(lambda a: a['name'] == "Resolve Issue", asf_jira.transitions(jira_id))[0] - resolution = filter(lambda r: r.raw['name'] == "Fixed", asf_jira.resolutions())[0] + resolve = [a for a in asf_jira.transitions(jira_id) if a['name'] == "Resolve Issue"][0] + resolution = [r for r in asf_jira.resolutions() if r.raw['name'] == "Fixed"][0] asf_jira.transition_issue( jira_id, resolve["id"], fixVersions=jira_fix_versions, comment=comment, resolution={'id': resolution.raw['id']}) @@ -379,7 +382,7 @@ def main(): original_head = get_current_ref() branches = get_json("%s/branches" % GITHUB_API_BASE) - branch_names = filter(lambda x: x.startswith("branch-"), [x['name'] for x in branches]) + branch_names = [x for x in [x['name'] for x in branches] if x.startswith("branch-")] # Assumes branch names can be sorted lexicographically latest_branch = sorted(branch_names, reverse=True)[0] diff --git a/examples/src/main/python/pi_app.py b/examples/src/main/python/pi_app.py index 2945d4b35..adb40e868 100644 --- a/examples/src/main/python/pi_app.py +++ b/examples/src/main/python/pi_app.py @@ -16,7 +16,10 @@ # from __future__ import print_function +from __future__ import division +from builtins import range +from past.utils import old_div import sys from random import random from operator import add @@ -48,8 +51,8 @@ def f(_): return 1 if x ** 2 + y ** 2 <= 1 else 0 def pi_job(context): - count = context.sc.parallelize(range(1, samples + 1), slices).map(f).reduce(add) - return 4.0 * count / samples + count = context.sc.parallelize(list(range(1, samples + 1)), slices).map(f).reduce(add) + return old_div(4.0 * count, samples) pi = client.submit(pi_job).result() diff --git a/integration-test/src/test/resources/batch.py b/integration-test/src/test/resources/batch.py index 56a53cef9..2f3cf1e5d 100644 --- a/integration-test/src/test/resources/batch.py +++ b/integration-test/src/test/resources/batch.py @@ -16,12 +16,13 @@ # limitations under the License. # +from builtins import range import sys from pyspark import SparkContext output = sys.argv[1] sc = SparkContext(appName="PySpark Test") try: - sc.parallelize(range(100), 10).map(lambda x: (x, x * 2)).saveAsTextFile(output) + sc.parallelize(list(range(100)), 10).map(lambda x: (x, x * 2)).saveAsTextFile(output) finally: sc.stop() diff --git a/integration-test/src/test/resources/test_python_api.py b/integration-test/src/test/resources/test_python_api.py index f89f85d85..7afb04eca 100644 --- a/integration-test/src/test/resources/test_python_api.py +++ b/integration-test/src/test/resources/test_python_api.py @@ -14,6 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from future import standard_library +standard_library.install_aliases() +from builtins import str import os import base64 import json @@ -21,13 +24,13 @@ try: from urllib.parse import urlparse except ImportError: - from urlparse import urlparse + from urllib.parse import urlparse import requests from requests_kerberos import HTTPKerberosAuth, REQUIRED, OPTIONAL import cloudpickle import pytest try: - import httplib + import http.client except ImportError: from http import HTTPStatus as httplib from flaky import flaky @@ -69,7 +72,7 @@ def process_job(job, expected_result, is_error_job=False): header = {'Content-Type': 'application/json', 'X-Requested-By': 'livy'} response = requests.request('POST', request_url, headers=header, data=base64_pickled_job_json, auth=request_auth, verify=ssl_cert) - assert response.status_code == httplib.CREATED + assert response.status_code == http.client.CREATED job_id = response.json()['id'] poll_time = 1 @@ -85,7 +88,7 @@ def process_job(job, expected_result, is_error_job=False): poll_time *= 2 assert poll_response.json()['id'] == job_id - assert poll_response.status_code == httplib.OK + assert poll_response.status_code == http.client.OK if not is_error_job: assert poll_response.json()['error'] is None result = poll_response.json()['result'] @@ -109,7 +112,7 @@ def stop_session(): request_url = livy_end_point + "/sessions/" + str(session_id) headers = {'X-Requested-By': 'livy'} response = requests.request('DELETE', request_url, headers=headers, auth=request_auth, verify=ssl_cert) - assert response.status_code == httplib.OK + assert response.status_code == http.client.OK def test_create_session(): @@ -121,7 +124,7 @@ def test_create_session(): json_data = json.dumps({'kind': 'pyspark', 'conf': {'livy.uri': uri.geturl()}}) response = requests.request('POST', request_url, headers=header, data=json_data, auth=request_auth, verify=ssl_cert) - assert response.status_code == httplib.CREATED + assert response.status_code == http.client.CREATED session_id = response.json()['id'] @@ -130,7 +133,7 @@ def test_wait_for_session_to_become_idle(): request_url = livy_end_point + "/sessions/" + str(session_id) header = {'X-Requested-By': 'livy'} response = requests.request('GET', request_url, headers=header, auth=request_auth, verify=ssl_cert) - assert response.status_code == httplib.OK + assert response.status_code == http.client.OK session_state = response.json()['state'] assert session_state == 'idle' @@ -160,7 +163,7 @@ def test_reconnect(): header = {'Content-Type': 'application/json', 'X-Requested-By': 'livy'} response = requests.request('POST', request_url, headers=header, auth=request_auth, verify=ssl_cert) - assert response.status_code == httplib.OK + assert response.status_code == http.client.OK assert session_id == response.json()['id'] @@ -171,7 +174,7 @@ def test_add_file(): header = {'Content-Type': 'application/json', 'X-Requested-By': 'livy'} response = requests.request('POST', request_url, headers=header, data=json_data, auth=request_auth, verify=ssl_cert) - assert response.status_code == httplib.OK + assert response.status_code == http.client.OK def add_file_job(context): from pyspark import SparkFiles @@ -190,7 +193,7 @@ def test_add_pyfile(): header = {'Content-Type': 'application/json', 'X-Requested-By': 'livy'} response_add_pyfile = requests.request('POST', request_url, headers=header, data=json_data, auth=request_auth, verify=ssl_cert) - assert response_add_pyfile.status_code == httplib.OK + assert response_add_pyfile.status_code == http.client.OK def add_pyfile_job(context): pyfile_module = __import__ (add_pyfile_name) @@ -207,7 +210,7 @@ def test_upload_file(): header = {'X-Requested-By': 'livy'} response = requests.request('POST', request_url, headers=header, files=files, auth=request_auth, verify=ssl_cert) - assert response.status_code == httplib.OK + assert response.status_code == http.client.OK def upload_file_job(context): from pyspark import SparkFiles @@ -226,7 +229,7 @@ def test_upload_pyfile(): files = {'file': upload_pyfile} header = {'X-Requested-By': 'livy'} response = requests.request('POST', request_url, headers=header, files=files, auth=request_auth, verify=ssl_cert) - assert response.status_code == httplib.OK + assert response.status_code == http.client.OK def upload_pyfile_job(context): pyfile_module = __import__ (upload_pyfile_name) diff --git a/python-api/src/main/python/livy/client.py b/python-api/src/main/python/livy/client.py index d9830f321..fb2848452 100644 --- a/python-api/src/main/python/livy/client.py +++ b/python-api/src/main/python/livy/client.py @@ -16,6 +16,8 @@ # from __future__ import absolute_import +from builtins import str +from builtins import object import base64 import cloudpickle import os @@ -357,7 +359,7 @@ def _delete_conf(self, key): self._config.remove_option(self._CONFIG_SECTION, key) def _set_multiple_conf(self, conf_dict): - for key, value in conf_dict.items(): + for key, value in list(conf_dict.items()): self._set_conf(key, value) def _load_config(self, load_defaults, conf_dict): diff --git a/python-api/src/main/python/livy/job_context.py b/python-api/src/main/python/livy/job_context.py index 832364999..537035864 100644 --- a/python-api/src/main/python/livy/job_context.py +++ b/python-api/src/main/python/livy/job_context.py @@ -14,10 +14,12 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from builtins import object from abc import ABCMeta, abstractproperty, abstractmethod +from future.utils import with_metaclass -class JobContext: +class JobContext(with_metaclass(ABCMeta, object)): """ An abstract class that holds runtime information about the job execution context. @@ -28,8 +30,6 @@ class JobContext: """ - __metaclass__ = ABCMeta - @abstractproperty def sc(self): """ diff --git a/python-api/src/main/python/livy/job_handle.py b/python-api/src/main/python/livy/job_handle.py index 278834c23..e70983c23 100644 --- a/python-api/src/main/python/livy/job_handle.py +++ b/python-api/src/main/python/livy/job_handle.py @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from builtins import str +from builtins import object import base64 import cloudpickle import sys diff --git a/python-api/src/test/python/livy-tests/client_test.py b/python-api/src/test/python/livy-tests/client_test.py index b6426ae10..7f235c404 100644 --- a/python-api/src/test/python/livy-tests/client_test.py +++ b/python-api/src/test/python/livy-tests/client_test.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from builtins import str import os import pytest import responses diff --git a/repl/src/main/resources/fake_shell.py b/repl/src/main/resources/fake_shell.py index 5472f533e..a0565f2a3 100644 --- a/repl/src/main/resources/fake_shell.py +++ b/repl/src/main/resources/fake_shell.py @@ -16,6 +16,11 @@ # from __future__ import print_function +from future import standard_library +standard_library.install_aliases() +from builtins import str +from builtins import range +from builtins import object import ast from collections import OrderedDict import datetime @@ -35,10 +40,10 @@ import textwrap if sys.version >= '3': - unicode = str + str = str else: - import cStringIO - import StringIO + import io + import io if sys.version_info > (3,8): from ast import Module @@ -85,8 +90,8 @@ def execute_reply_error(exc_type, exc_value, tb): break return execute_reply('error', { - 'ename': unicode(exc_type.__name__), - 'evalue': unicode(exc_value), + 'ename': str(exc_type.__name__), + 'evalue': str(exc_value), 'traceback': formatted_tb, }) @@ -209,7 +214,7 @@ def addPyFile(self, uri_path): def getLocalTmpDirPath(self): return os.path.join(job_context.get_local_tmp_dir_path(), '__livy__') - class Scala: + class Scala(object): extends = ['org.apache.livy.repl.PySparkJobProcessor'] @@ -422,8 +427,8 @@ def magic_table_convert_map(m): # python 2.x only if sys.version < '3': magic_table_types.update({ - long: lambda x: ('BIGINT_TYPE', x), - unicode: lambda x: ('STRING_TYPE', x.encode('utf-8')) + int: lambda x: ('BIGINT_TYPE', x), + str: lambda x: ('STRING_TYPE', x.encode('utf-8')) }) @@ -557,7 +562,7 @@ def main(): if sys.version >= '3': sys.stdin = io.StringIO() else: - sys.stdin = cStringIO.StringIO() + sys.stdin = io.StringIO() sys.stdout = UnicodeDecodingStringIO() sys.stderr = UnicodeDecodingStringIO() From 928d620fa1448654599b50ae938a0b430751130c Mon Sep 17 00:00:00 2001 From: arnavb Date: Sat, 15 Nov 2025 16:12:44 +0000 Subject: [PATCH 2/8] update --- dev/merge_livy_pr.py | 37 ++--------- examples/src/main/python/pi_app.py | 7 +-- integration-test/src/test/resources/batch.py | 1 - .../src/test/resources/test_python_api.py | 17 +---- python-api/src/main/python/livy/client.py | 13 ++-- .../src/main/python/livy/job_context.py | 6 +- python-api/src/main/python/livy/job_handle.py | 9 +-- .../src/test/python/livy-tests/client_test.py | 42 ++++--------- repl/src/main/resources/fake_shell.py | 62 ++----------------- 9 files changed, 32 insertions(+), 162 deletions(-) diff --git a/dev/merge_livy_pr.py b/dev/merge_livy_pr.py index 1019d6cbf..335607ff7 100755 --- a/dev/merge_livy_pr.py +++ b/dev/merge_livy_pr.py @@ -33,24 +33,15 @@ # usage: ./merge_livy_pr.py (see config env vars below) # - -from __future__ import print_function -from future import standard_library -standard_library.install_aliases() import json import os import re import subprocess import sys +import urllib.request +from urllib.error import HTTPError -if sys.version_info[0] < 3: - import urllib.request, urllib.error, urllib.parse - from urllib.error import HTTPError - input_prompt_fn = raw_input -else: - import urllib.request as urllib2 - from urllib.error import HTTPError - input_prompt_fn = input +input_prompt_fn = input try: import jira.client @@ -74,7 +65,6 @@ # https://github.com/settings/tokens. This script only requires the "public_repo" scope. GITHUB_OAUTH_KEY = os.environ.get("GITHUB_OAUTH_KEY") - GITHUB_BASE = "https://github.com/apache/incubator-livy/pull" GITHUB_API_BASE = "https://api.github.com/repos/apache/incubator-livy" JIRA_BASE = "https://issues.apache.org/jira/browse" @@ -82,7 +72,6 @@ # Prefix added to temporary branches BRANCH_PREFIX = "PR_TOOL" - def get_json(url): try: request = urllib.request.Request(url) @@ -98,31 +87,24 @@ def get_json(url): print("Unable to fetch URL, exiting: %s" % url) sys.exit(-1) - def fail(msg): print(msg) clean_up() sys.exit(-1) - def run_cmd(cmd): print(cmd) if isinstance(cmd, list): out_bytes = subprocess.check_output(cmd) else: out_bytes = subprocess.check_output(cmd.split(" ")) - if sys.version_info[0] > 2: - return out_bytes.decode() - else: - return out_bytes - + return out_bytes.decode() def continue_maybe(prompt): result = input_prompt_fn("\n%s (y/n): " % prompt) if result.lower() != "y": fail("Okay, exiting") - def clean_up(): print("Restoring head pointer to %s" % original_head) run_cmd("git checkout %s" % original_head) @@ -133,7 +115,6 @@ def clean_up(): print("Deleting local branch %s" % branch) run_cmd("git branch -D %s" % branch) - # merge the requested PR and return the merge hash def merge_pr(pr_num, target_ref, title, body, pr_repo_desc): pr_branch_name = "%s_MERGE_PR_%s" % (BRANCH_PREFIX, pr_num) @@ -204,7 +185,6 @@ def merge_pr(pr_num, target_ref, title, body, pr_repo_desc): print("Merge hash: %s" % merge_hash) return merge_hash - def cherry_pick(pr_num, merge_hash, default_branch): pick_ref = input_prompt_fn("Enter a branch name [%s]: " % default_branch) if pick_ref == "": @@ -239,7 +219,6 @@ def cherry_pick(pr_num, merge_hash, default_branch): print("Pick hash: %s" % pick_hash) return pick_ref - def fix_version_from_branch(branch, versions): # Note: Assumes this is a sorted (newest->oldest) list of un-released versions if branch == "master": @@ -248,7 +227,6 @@ def fix_version_from_branch(branch, versions): branch_ver = branch.replace("branch-", "") return [x for x in versions if x.name.startswith(branch_ver)][-1] - def resolve_jira_issue(merge_branches, comment, default_jira_id=""): asf_jira = jira.client.JIRA({'server': JIRA_API_BASE}, basic_auth=(JIRA_USERNAME, JIRA_PASSWORD)) @@ -280,7 +258,7 @@ def resolve_jira_issue(merge_branches, comment, default_jira_id=""): versions = sorted(versions, key=lambda x: x.name, reverse=True) versions = [x for x in versions if x.raw['released'] is False] # Consider only x.y.z versions - versions = [x for x in versions if re.match('\d+\.\d+\.\d+', x.name)] + versions = [x for x in versions if re.match(r'\d+\.\d+\.\d+', x.name)] default_fix_versions = [fix_version_from_branch(x, versions).name for x in merge_branches] for v in default_fix_versions: @@ -314,7 +292,6 @@ def get_version_json(version_str): print("Successfully resolved %s with fixVersions=%s!" % (jira_id, fix_versions)) - def resolve_jira_issues(title, merge_branches, comment): jira_ids = re.findall("LIVY-[0-9]{3,6}", title) @@ -323,7 +300,6 @@ def resolve_jira_issues(title, merge_branches, comment): for jira_id in jira_ids: resolve_jira_issue(merge_branches, comment, jira_id) - def standardize_jira_ref(text): """ Standardize the [LIVY-XXXXX] [MODULE] prefix @@ -365,7 +341,6 @@ def standardize_jira_ref(text): return clean_text - def get_current_ref(): ref = run_cmd("git rev-parse --abbrev-ref HEAD").strip() if ref == 'HEAD': @@ -374,7 +349,6 @@ def get_current_ref(): else: return ref - def main(): global original_head @@ -465,7 +439,6 @@ def main(): print("Could not find jira-python library. Run 'sudo pip install jira' to install.") print("Exiting without trying to close the associated JIRA.") - if __name__ == "__main__": import doctest (failure_count, test_count) = doctest.testmod() diff --git a/examples/src/main/python/pi_app.py b/examples/src/main/python/pi_app.py index adb40e868..c283476c6 100644 --- a/examples/src/main/python/pi_app.py +++ b/examples/src/main/python/pi_app.py @@ -15,11 +15,6 @@ # limitations under the License. # -from __future__ import print_function -from __future__ import division - -from builtins import range -from past.utils import old_div import sys from random import random from operator import add @@ -52,7 +47,7 @@ def f(_): def pi_job(context): count = context.sc.parallelize(list(range(1, samples + 1)), slices).map(f).reduce(add) - return old_div(4.0 * count, samples) + return 4.0 * count / samples pi = client.submit(pi_job).result() diff --git a/integration-test/src/test/resources/batch.py b/integration-test/src/test/resources/batch.py index 2f3cf1e5d..e371652ab 100644 --- a/integration-test/src/test/resources/batch.py +++ b/integration-test/src/test/resources/batch.py @@ -16,7 +16,6 @@ # limitations under the License. # -from builtins import range import sys from pyspark import SparkContext diff --git a/integration-test/src/test/resources/test_python_api.py b/integration-test/src/test/resources/test_python_api.py index 7afb04eca..3bc7b13cc 100644 --- a/integration-test/src/test/resources/test_python_api.py +++ b/integration-test/src/test/resources/test_python_api.py @@ -14,9 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from future import standard_library -standard_library.install_aliases() -from builtins import str + import os import base64 import json @@ -61,7 +59,6 @@ def after_all(request): request.addfinalizer(stop_session) - def process_job(job, expected_result, is_error_job=False): global job_id @@ -100,12 +97,10 @@ def process_job(job, expected_result, is_error_job=False): error = poll_response.json()['error'] assert expected_result in error - def delay_rerun(*args): time.sleep(10) return True - def stop_session(): global session_id @@ -114,7 +109,6 @@ def stop_session(): response = requests.request('DELETE', request_url, headers=headers, auth=request_auth, verify=ssl_cert) assert response.status_code == http.client.OK - def test_create_session(): global session_id @@ -127,7 +121,6 @@ def test_create_session(): assert response.status_code == http.client.CREATED session_id = response.json()['id'] - @flaky(max_runs=6, rerun_filter=delay_rerun) def test_wait_for_session_to_become_idle(): request_url = livy_end_point + "/sessions/" + str(session_id) @@ -138,7 +131,6 @@ def test_wait_for_session_to_become_idle(): assert session_state == 'idle' - def test_spark_job(): def simple_spark_job(context): elements = [10, 20, 30] @@ -147,7 +139,6 @@ def simple_spark_job(context): process_job(simple_spark_job, 3) - def test_error_job(): def error_job(context): return "hello" + 1 @@ -155,7 +146,6 @@ def error_job(context): process_job(error_job, "TypeError: ", True) - def test_reconnect(): global session_id @@ -166,7 +156,6 @@ def test_reconnect(): assert response.status_code == http.client.OK assert session_id == response.json()['id'] - def test_add_file(): add_file_name = os.path.basename(add_file_url) json_data = json.dumps({'uri': add_file_url}) @@ -184,7 +173,6 @@ def add_file_job(context): process_job(add_file_job, "hello from addfile") - def test_add_pyfile(): add_pyfile_name_with_ext = os.path.basename(add_pyfile_url) add_pyfile_name = add_pyfile_name_with_ext.rsplit('.', 1)[0] @@ -201,7 +189,6 @@ def add_pyfile_job(context): process_job(add_pyfile_job, "hello from addpyfile") - def test_upload_file(): upload_file = open(upload_file_url) upload_file_name = os.path.basename(upload_file.name) @@ -220,7 +207,6 @@ def upload_file_job(context): process_job(upload_file_job, "hello from uploadfile") - def test_upload_pyfile(): upload_pyfile = open(upload_pyfile_url) upload_pyfile_name_with_ext = os.path.basename(upload_pyfile.name) @@ -236,7 +222,6 @@ def upload_pyfile_job(context): return pyfile_module.test_upload_pyfile() process_job(upload_pyfile_job, "hello from uploadpyfile") - if __name__ == '__main__': value = pytest.main([os.path.dirname(__file__)]) if value != 0: diff --git a/python-api/src/main/python/livy/client.py b/python-api/src/main/python/livy/client.py index fb2848452..15dc307d8 100644 --- a/python-api/src/main/python/livy/client.py +++ b/python-api/src/main/python/livy/client.py @@ -14,10 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from __future__ import absolute_import -from builtins import str -from builtins import object import base64 import cloudpickle import os @@ -27,13 +24,12 @@ import traceback from configparser import ConfigParser from concurrent.futures import ThreadPoolExecutor -from future.moves.urllib.parse import ParseResult, urlparse -from io import open, StringIO +from urllib.parse import ParseResult, urlparse +from io import StringIO from requests_kerberos import HTTPKerberosAuth, REQUIRED from livy.job_handle import JobHandle - -class HttpClient(object): +class HttpClient: """A http based client for submitting Spark-based jobs to a Livy backend. Parameters @@ -427,8 +423,7 @@ def _add_or_upload_resource( return self._conn.send_request('POST', suffix_url, files=files, data=data, headers=headers).content - -class _LivyConnection(object): +class _LivyConnection: _SESSIONS_URI = '/sessions' # Timeout in seconds diff --git a/python-api/src/main/python/livy/job_context.py b/python-api/src/main/python/livy/job_context.py index 537035864..795cb6fa2 100644 --- a/python-api/src/main/python/livy/job_context.py +++ b/python-api/src/main/python/livy/job_context.py @@ -14,12 +14,10 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from builtins import object -from abc import ABCMeta, abstractproperty, abstractmethod -from future.utils import with_metaclass +from abc import ABCMeta, abstractproperty, abstractmethod -class JobContext(with_metaclass(ABCMeta, object)): +class JobContext(metaclass=ABCMeta): """ An abstract class that holds runtime information about the job execution context. diff --git a/python-api/src/main/python/livy/job_handle.py b/python-api/src/main/python/livy/job_handle.py index e70983c23..62d35f014 100644 --- a/python-api/src/main/python/livy/job_handle.py +++ b/python-api/src/main/python/livy/job_handle.py @@ -14,8 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from builtins import str -from builtins import object + import base64 import cloudpickle import sys @@ -32,7 +31,6 @@ SENT = 'SENT' QUEUED = 'QUEUED' - class JobHandle(Future): """A child class of concurrent.futures.Future. Allows for monitoring and @@ -219,10 +217,7 @@ def set_exception(self, exception): raise NotImplementedError("This operation is not supported.") def set_job_exception(self, exception, error_msg=None): - if sys.version >= '3': - super(JobHandle, self).set_exception(exception) - else: - super(JobHandle, self).set_exception_info(exception, error_msg) + super(JobHandle, self).set_exception(exception) class _RepeatedTimer(object): def __init__(self, interval, polling_job, executor): diff --git a/python-api/src/test/python/livy-tests/client_test.py b/python-api/src/test/python/livy-tests/client_test.py index 7f235c404..1b25eb987 100644 --- a/python-api/src/test/python/livy-tests/client_test.py +++ b/python-api/src/test/python/livy-tests/client_test.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from builtins import str + import os import pytest import responses @@ -28,12 +28,11 @@ session_id = 0 job_id = 1 # Make sure host name is lower case. See LIVY-582 -base_uri = 'http://{0}:{1}'.format(socket.gethostname().lower(), 8998) +base_uri = f'http://{socket.gethostname().lower()}:{8998}' client_test = None invoked_queued_callback = False invoked_running_callback = False - @responses.activate def mock_and_validate_create_new_session(defaults): global client_test @@ -41,8 +40,8 @@ def mock_and_validate_create_new_session(defaults): app_name = 'Test App' conf_dict = {'spark.app.name': app_name} json_data = { - u'kind': u'pyspark', u'log': [], u'proxyUser': None, - u'state': u'starting', u'owner': None, u'id': session_id + 'kind': 'pyspark', 'log': [], 'proxyUser': None, + 'state': 'starting', 'owner': None, 'id': session_id } responses.add(responses.POST, create_session_request_mock_uri, json=json_data, status=201, content_type='application/json') @@ -56,7 +55,6 @@ def mock_and_validate_create_new_session(defaults): assert client_test._config.get(client_test._CONFIG_SECTION, 'spark.config') == 'override' - def mock_submit_job_and_poll_result( job, job_state, @@ -69,13 +67,13 @@ def mock_submit_job_and_poll_result( + "/jobs/" + str(job_id) post_json_data = { - u'state': u'SENT', u'error': None, u'id': job_id, u'result': None + 'state': 'SENT', 'error': None, 'id': job_id, 'result': None } responses.add(responses.POST, submit_request_mock_uri, status=201, json=post_json_data, content_type='application/json') get_json_data = { - u'state': job_state, u'error': error, u'id': job_id, u'result': result + 'state': job_state, 'error': error, 'id': job_id, 'result': result } responses.add(responses.GET, poll_request_mock_uri, status=200, json=get_json_data, content_type='application/json') @@ -83,7 +81,6 @@ def mock_submit_job_and_poll_result( submit_job_future = client_test.submit(job) return submit_job_future - def mock_file_apis(job_command, job_func, job_func_arg): request_uri = base_uri + "/sessions/" + str(session_id) + \ "/" + job_command @@ -92,34 +89,29 @@ def mock_file_apis(job_command, job_func, job_func_arg): test_file_api_future = job_func(job_func_arg) return test_file_api_future - def simple_spark_job(context): elements = [10, 20, 30] sc = context.sc return sc.parallelize(elements, 2).count() - def failure_job(context): return "hello" + 1 - def test_create_new_session_without_default_config(): mock_and_validate_create_new_session(False) - def test_create_new_session_with_default_config(): os.environ["LIVY_CLIENT_CONF_DIR"] = \ os.path.dirname(os.path.abspath(__file__)) + "/resources" mock_and_validate_create_new_session(True) - def test_connect_to_existing_session(): reconnect_mock_request_uri = base_uri + "/sessions/" + str(session_id) + \ "/connect" reconnect_session_uri = base_uri + "/sessions/" + str(session_id) json_data = { - u'kind': u'pyspark', u'log': [], u'proxyUser': None, - u'state': u'starting', u'owner': None, u'id': session_id + 'kind': 'pyspark', 'log': [], 'proxyUser': None, + 'state': 'starting', 'owner': None, 'id': session_id } with responses.RequestsMock() as rsps: rsps.add(responses.POST, reconnect_mock_request_uri, json=json_data, @@ -133,18 +125,16 @@ def test_connect_to_existing_session(): client_reconnect._config.get(client_reconnect._CONFIG_SECTION, 'spark.app.name') == 'Test App' - def create_test_archive(ext): (fd, path) = tempfile.mkstemp(suffix=ext) os.close(fd) zipfile.ZipFile(path, mode='w').close() return path - @responses.activate def test_submit_job_verify_running_state(): submit_job_future = mock_submit_job_and_poll_result(simple_spark_job, - u'STARTED') + 'STARTED') lock = threading.Event() def handle_job_running_callback(f): @@ -155,11 +145,10 @@ def handle_job_running_callback(f): lock.wait(15) assert invoked_running_callback - @responses.activate def test_submit_job_verify_queued_state(): submit_job_future = mock_submit_job_and_poll_result(simple_spark_job, - u'QUEUED') + 'QUEUED') lock = threading.Event() def handle_job_queued_callback(f): @@ -170,24 +159,21 @@ def handle_job_queued_callback(f): lock.wait(15) assert invoked_queued_callback - @responses.activate def test_submit_job_verify_succeeded_state(): submit_job_future = mock_submit_job_and_poll_result(simple_spark_job, - u'SUCCEEDED', + 'SUCCEEDED', result='Z0FKVkZGc3hNREFzSURJd01Dd2dNekF3TENBME1EQmRjUUF1') result = submit_job_future.result(15) assert result == '[100, 200, 300, 400]' - @responses.activate def test_submit_job_verify_failed_state(): - submit_job_future = mock_submit_job_and_poll_result(failure_job, u'FAILED', + submit_job_future = mock_submit_job_and_poll_result(failure_job, 'FAILED', error='Error job') exception = submit_job_future.exception(15) assert isinstance(exception, Exception) - @responses.activate def test_add_file(): file_uri = "file://" + os.path.dirname(os.path.abspath(__file__)) + \ @@ -197,7 +183,6 @@ def test_add_file(): add_file_future.result(15) assert add_file_future.done() - @responses.activate def test_upload_file(): file_path = os.path.dirname(os.path.abspath(__file__)) + \ @@ -207,7 +192,6 @@ def test_upload_file(): upload_file_future.result(15) assert upload_file_future.done() - @responses.activate def test_add_pyfile(): file_uri = "file://" + os.path.dirname(os.path.abspath(__file__)) + \ @@ -217,7 +201,6 @@ def test_add_pyfile(): add_file_future.result(15) assert add_file_future.done() - @responses.activate def test_upload_pyfile(): file_path = create_test_archive('.zip') @@ -226,7 +209,6 @@ def test_upload_pyfile(): pyfile_future.result(15) assert pyfile_future.done() - @responses.activate def test_add_jar(): file_uri = 'file:' + create_test_archive('.jar') diff --git a/repl/src/main/resources/fake_shell.py b/repl/src/main/resources/fake_shell.py index a0565f2a3..2c67a5422 100644 --- a/repl/src/main/resources/fake_shell.py +++ b/repl/src/main/resources/fake_shell.py @@ -15,12 +15,6 @@ # limitations under the License. # -from __future__ import print_function -from future import standard_library -standard_library.install_aliases() -from builtins import str -from builtins import range -from builtins import object import ast from collections import OrderedDict import datetime @@ -39,12 +33,6 @@ import pickle import textwrap -if sys.version >= '3': - str = str -else: - import io - import io - if sys.version_info > (3,8): from ast import Module else : @@ -71,19 +59,14 @@ def execute_reply(status, content): ) } - def execute_reply_ok(data): return execute_reply('ok', { 'data': data, }) - def execute_reply_error(exc_type, exc_value, tb): LOG.error('execute_reply', exc_info=True) - if sys.version >= '3': - formatted_tb = traceback.format_exception(exc_type, exc_value, tb, chain=False) - else: - formatted_tb = traceback.format_exception(exc_type, exc_value, tb) + formatted_tb = traceback.format_exception(exc_type, exc_value, tb, chain=False) for i in range(len(formatted_tb)): if TOP_FRAME_REGEX.match(formatted_tb[i]): formatted_tb = formatted_tb[:1] + formatted_tb[i + 1:] @@ -95,7 +78,6 @@ def execute_reply_error(exc_type, exc_value, tb): 'traceback': formatted_tb, }) - def execute_reply_internal_error(message, exc_info=None): LOG.error('execute_reply_internal_error', exc_info=exc_info) return execute_reply('error', { @@ -104,7 +86,6 @@ def execute_reply_internal_error(message, exc_info=None): 'traceback': [], }) - class JobContextImpl(object): def __init__(self): self.lock = threading.Lock() @@ -190,14 +171,10 @@ def remove_shared_object(self, name): except: pass - class PySparkJobProcessorImpl(object): def processBypassJob(self, serialized_job): try: - if sys.version >= '3': - deserialized_job = pickle.loads(serialized_job, encoding="bytes") - else: - deserialized_job = pickle.loads(serialized_job) + deserialized_job = pickle.loads(serialized_job, encoding="bytes") result = deserialized_job(job_context) serialized_result = global_dict['cloudpickle'].dumps(result) response = bytearray(base64.b64encode(serialized_result)) @@ -217,12 +194,10 @@ def getLocalTmpDirPath(self): class Scala(object): extends = ['org.apache.livy.repl.PySparkJobProcessor'] - class ExecutionError(Exception): def __init__(self, exc_info): self.exc_info = exc_info - class NormalNode(object): def __init__(self, code): self.code = compile(code, '', 'exec', ast.PyCF_ONLY_AST, 1) @@ -245,11 +220,9 @@ def execute(self): # code and passing the error along. raise ExecutionError(sys.exc_info()) - class UnknownMagic(Exception): pass - class MagicNode(object): def __init__(self, line): parts = line[1:].split(' ', 1) @@ -269,7 +242,6 @@ def execute(self): return handler(*self.rest) - def parse_code_into_nodes(code): nodes = [] try: @@ -309,7 +281,6 @@ def parse_code_into_nodes(code): return nodes - def execute_request(content): try: code = content['code'] @@ -359,7 +330,6 @@ def execute_request(content): return execute_reply_ok(result) - def magic_table_convert(value): try: converter = magic_table_types[type(value)] @@ -368,7 +338,6 @@ def magic_table_convert(value): return converter(value) - def magic_table_convert_seq(items): last_item_type = None converted_items = [] @@ -385,7 +354,6 @@ def magic_table_convert_seq(items): return 'ARRAY_TYPE', converted_items - def magic_table_convert_map(m): last_key_type = None last_value_type = None @@ -409,7 +377,6 @@ def magic_table_convert_map(m): return 'MAP_TYPE', converted_items - magic_table_types = { type(None): lambda x: ('NULL_TYPE', x), bool: lambda x: ('BOOLEAN_TYPE', x), @@ -424,15 +391,6 @@ def magic_table_convert_map(m): dict: magic_table_convert_map, } -# python 2.x only -if sys.version < '3': - magic_table_types.update({ - int: lambda x: ('BIGINT_TYPE', x), - str: lambda x: ('STRING_TYPE', x.encode('utf-8')) - }) - - - def magic_table(name): try: value = global_dict[name] @@ -449,7 +407,7 @@ def magic_table(name): for row in value: cols = [] data.append(cols) - + if 'Row' == row.__class__.__name__: row = row.asDict() @@ -493,7 +451,6 @@ def magic_table(name): } } - def magic_json(name): try: value = global_dict[name] @@ -512,9 +469,7 @@ def magic_matplot(name): imgdata = io.BytesIO() fig.savefig(imgdata, format='png') imgdata.seek(0) - encode = base64.b64encode(imgdata.getvalue()) - if sys.version >= '3': - encode = encode.decode() + encode = base64.b64encode(imgdata.getvalue()).decode() except: exc_type, exc_value, tb = sys.exc_info() @@ -528,7 +483,6 @@ def magic_matplot(name): def shutdown_request(_content): sys.exit() - magic_router = { 'table': magic_table, 'json': magic_json, @@ -546,24 +500,18 @@ def write(self, s): s = s.decode("utf-8") super(UnicodeDecodingStringIO, self).write(s) - def clearOutputs(): sys.stdout.close() sys.stderr.close() sys.stdout = UnicodeDecodingStringIO() sys.stderr = UnicodeDecodingStringIO() - def main(): sys_stdin = sys.stdin sys_stdout = sys.stdout sys_stderr = sys.stderr - if sys.version >= '3': - sys.stdin = io.StringIO() - else: - sys.stdin = io.StringIO() - + sys.stdin = io.StringIO() sys.stdout = UnicodeDecodingStringIO() sys.stderr = UnicodeDecodingStringIO() From 7e211480666407d6eaee23f69087569107cecc8a Mon Sep 17 00:00:00 2001 From: arnavb Date: Sat, 15 Nov 2025 16:18:54 +0000 Subject: [PATCH 3/8] Fix merge conflict in README.md - remove Spark 2.x and Scala 2.11 profiles --- README.md | 8 -------- 1 file changed, 8 deletions(-) diff --git a/README.md b/README.md index 4ac6bb34b..150243f65 100644 --- a/README.md +++ b/README.md @@ -93,14 +93,6 @@ version of Spark without needing to rebuild. | Flag | Purpose | |--------------|--------------------------------------------------------------------| -<<<<<<< HEAD -| -Phadoop2 | Choose Hadoop2 based build dependencies (default configuration) | -| -Pspark2 | Choose Spark 2.x based build dependencies (default configuration) | -| -Pspark3 | Choose Spark 3.x based build dependencies | -| -Pscala-2.11 | Choose Scala 2.11 based build dependencies (default configuration) | -| -Pscala-2.12 | Choose scala 2.12 based build dependencies | -======= | -Phadoop2 | Choose Hadoop2 based build dependencies | | -Pspark3 | Choose Spark 3.x based build dependencies (default configuration) | | -Pscala-2.12 | Choose Scala 2.12 based build dependencies (default configuration) | ->>>>>>> master From 774c56542ce3b5bc5a85c80b8ebcafa613483a53 Mon Sep 17 00:00:00 2001 From: arnavb Date: Fri, 21 Nov 2025 05:45:23 +0000 Subject: [PATCH 4/8] lint --- python-api/src/main/python/livy/client.py | 2 ++ python-api/src/main/python/livy/job_context.py | 1 + python-api/src/main/python/livy/job_handle.py | 2 +- .../src/test/python/livy-tests/client_test.py | 18 ++++++++++++++++++ 4 files changed, 22 insertions(+), 1 deletion(-) diff --git a/python-api/src/main/python/livy/client.py b/python-api/src/main/python/livy/client.py index 15dc307d8..22b881329 100644 --- a/python-api/src/main/python/livy/client.py +++ b/python-api/src/main/python/livy/client.py @@ -29,6 +29,7 @@ from requests_kerberos import HTTPKerberosAuth, REQUIRED from livy.job_handle import JobHandle + class HttpClient: """A http based client for submitting Spark-based jobs to a Livy backend. @@ -423,6 +424,7 @@ def _add_or_upload_resource( return self._conn.send_request('POST', suffix_url, files=files, data=data, headers=headers).content + class _LivyConnection: _SESSIONS_URI = '/sessions' diff --git a/python-api/src/main/python/livy/job_context.py b/python-api/src/main/python/livy/job_context.py index 795cb6fa2..a36dbc2ab 100644 --- a/python-api/src/main/python/livy/job_context.py +++ b/python-api/src/main/python/livy/job_context.py @@ -17,6 +17,7 @@ from abc import ABCMeta, abstractproperty, abstractmethod + class JobContext(metaclass=ABCMeta): """ An abstract class that holds runtime information about the job execution diff --git a/python-api/src/main/python/livy/job_handle.py b/python-api/src/main/python/livy/job_handle.py index 62d35f014..fc4dcde20 100644 --- a/python-api/src/main/python/livy/job_handle.py +++ b/python-api/src/main/python/livy/job_handle.py @@ -17,7 +17,6 @@ import base64 import cloudpickle -import sys import threading import traceback from concurrent.futures import Future @@ -31,6 +30,7 @@ SENT = 'SENT' QUEUED = 'QUEUED' + class JobHandle(Future): """A child class of concurrent.futures.Future. Allows for monitoring and diff --git a/python-api/src/test/python/livy-tests/client_test.py b/python-api/src/test/python/livy-tests/client_test.py index 1b25eb987..efa3d446f 100644 --- a/python-api/src/test/python/livy-tests/client_test.py +++ b/python-api/src/test/python/livy-tests/client_test.py @@ -33,6 +33,7 @@ invoked_queued_callback = False invoked_running_callback = False + @responses.activate def mock_and_validate_create_new_session(defaults): global client_test @@ -55,6 +56,7 @@ def mock_and_validate_create_new_session(defaults): assert client_test._config.get(client_test._CONFIG_SECTION, 'spark.config') == 'override' + def mock_submit_job_and_poll_result( job, job_state, @@ -81,6 +83,7 @@ def mock_submit_job_and_poll_result( submit_job_future = client_test.submit(job) return submit_job_future + def mock_file_apis(job_command, job_func, job_func_arg): request_uri = base_uri + "/sessions/" + str(session_id) + \ "/" + job_command @@ -89,22 +92,27 @@ def mock_file_apis(job_command, job_func, job_func_arg): test_file_api_future = job_func(job_func_arg) return test_file_api_future + def simple_spark_job(context): elements = [10, 20, 30] sc = context.sc return sc.parallelize(elements, 2).count() + def failure_job(context): return "hello" + 1 + def test_create_new_session_without_default_config(): mock_and_validate_create_new_session(False) + def test_create_new_session_with_default_config(): os.environ["LIVY_CLIENT_CONF_DIR"] = \ os.path.dirname(os.path.abspath(__file__)) + "/resources" mock_and_validate_create_new_session(True) + def test_connect_to_existing_session(): reconnect_mock_request_uri = base_uri + "/sessions/" + str(session_id) + \ "/connect" @@ -125,12 +133,14 @@ def test_connect_to_existing_session(): client_reconnect._config.get(client_reconnect._CONFIG_SECTION, 'spark.app.name') == 'Test App' + def create_test_archive(ext): (fd, path) = tempfile.mkstemp(suffix=ext) os.close(fd) zipfile.ZipFile(path, mode='w').close() return path + @responses.activate def test_submit_job_verify_running_state(): submit_job_future = mock_submit_job_and_poll_result(simple_spark_job, @@ -145,6 +155,7 @@ def handle_job_running_callback(f): lock.wait(15) assert invoked_running_callback + @responses.activate def test_submit_job_verify_queued_state(): submit_job_future = mock_submit_job_and_poll_result(simple_spark_job, @@ -159,6 +170,7 @@ def handle_job_queued_callback(f): lock.wait(15) assert invoked_queued_callback + @responses.activate def test_submit_job_verify_succeeded_state(): submit_job_future = mock_submit_job_and_poll_result(simple_spark_job, @@ -167,6 +179,7 @@ def test_submit_job_verify_succeeded_state(): result = submit_job_future.result(15) assert result == '[100, 200, 300, 400]' + @responses.activate def test_submit_job_verify_failed_state(): submit_job_future = mock_submit_job_and_poll_result(failure_job, 'FAILED', @@ -174,6 +187,7 @@ def test_submit_job_verify_failed_state(): exception = submit_job_future.exception(15) assert isinstance(exception, Exception) + @responses.activate def test_add_file(): file_uri = "file://" + os.path.dirname(os.path.abspath(__file__)) + \ @@ -183,6 +197,7 @@ def test_add_file(): add_file_future.result(15) assert add_file_future.done() + @responses.activate def test_upload_file(): file_path = os.path.dirname(os.path.abspath(__file__)) + \ @@ -192,6 +207,7 @@ def test_upload_file(): upload_file_future.result(15) assert upload_file_future.done() + @responses.activate def test_add_pyfile(): file_uri = "file://" + os.path.dirname(os.path.abspath(__file__)) + \ @@ -201,6 +217,7 @@ def test_add_pyfile(): add_file_future.result(15) assert add_file_future.done() + @responses.activate def test_upload_pyfile(): file_path = create_test_archive('.zip') @@ -209,6 +226,7 @@ def test_upload_pyfile(): pyfile_future.result(15) assert pyfile_future.done() + @responses.activate def test_add_jar(): file_uri = 'file:' + create_test_archive('.jar') From 570cb50735c2adb75ae36656bb79bd09efdca41b Mon Sep 17 00:00:00 2001 From: arnavb Date: Sun, 23 Nov 2025 10:49:45 +0000 Subject: [PATCH 5/8] update --- .../src/test/scala/org/apache/livy/test/JobApiIT.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-test/src/test/scala/org/apache/livy/test/JobApiIT.scala b/integration-test/src/test/scala/org/apache/livy/test/JobApiIT.scala index 92c3ea242..d62906aeb 100644 --- a/integration-test/src/test/scala/org/apache/livy/test/JobApiIT.scala +++ b/integration-test/src/test/scala/org/apache/livy/test/JobApiIT.scala @@ -255,7 +255,7 @@ class JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll with Logg val testDir = Files.createTempDirectory(tmpDir.toPath(), "python-tests-").toFile() val testFile = createPyTestsForPythonAPI(testDir) - val builder = new ProcessBuilder(Seq("python", testFile.getAbsolutePath()).asJava) + val builder = new ProcessBuilder(Seq("python3", testFile.getAbsolutePath()).asJava) builder.directory(testDir) val env = builder.environment() From e74b01f6cb836e9653c44e4cc9bca894da78f171 Mon Sep 17 00:00:00 2001 From: arnavb Date: Sun, 23 Nov 2025 11:51:02 +0000 Subject: [PATCH 6/8] update --- dev/docker/livy-dev-base/Dockerfile | 28 +++------------------------- python-api/setup.py | 6 ++---- 2 files changed, 5 insertions(+), 29 deletions(-) diff --git a/dev/docker/livy-dev-base/Dockerfile b/dev/docker/livy-dev-base/Dockerfile index 07711a5f5..5686e61d1 100644 --- a/dev/docker/livy-dev-base/Dockerfile +++ b/dev/docker/livy-dev-base/Dockerfile @@ -70,32 +70,11 @@ RUN git clone https://github.com/pyenv/pyenv.git $HOME/pyenv ENV PYENV_ROOT=$HOME/pyenv ENV PATH="$HOME/pyenv/shims:$HOME/pyenv/bin:$HOME/bin:$PATH" -RUN pyenv install -v 2.7.18 && \ - pyenv install -v 3.9.21 && \ - pyenv global 2.7.18 3.9.21 && \ +RUN pyenv install -v 3.9.21 && \ + pyenv global 3.9.21 && \ pyenv rehash -# Add build dependencies for python2 -# - First we upgrade pip because that makes a lot of things better -# - Then we remove the provided version of setuptools and install a different version -# - Then we install additional dependencies -RUN python2 -m pip install -U "pip < 21.0" && \ - apt-get remove -y python-setuptools && \ - python2 -m pip install "setuptools < 36" && \ - python2 -m pip install \ - cloudpickle \ - codecov \ - flake8 \ - flaky \ - "future>=0.15.2" \ - "futures>=3.0.5" \ - pytest \ - pytest-runner \ - requests-kerberos \ - "requests >= 2.10.0" \ - "responses >= 0.5.1" - -# Now do the same for python3 +# Install build dependencies for python3 RUN python3 -m pip install -U pip && pip3 install \ cloudpickle \ codecov \ @@ -112,4 +91,3 @@ RUN pyenv rehash RUN apt remove -y openjdk-11-jre-headless WORKDIR /workspace - diff --git a/python-api/setup.py b/python-api/setup.py index 59bec11fe..511592f26 100644 --- a/python-api/setup.py +++ b/python-api/setup.py @@ -23,15 +23,13 @@ 'Development Status :: 1 - Planning', 'Intended Audience :: Developers', 'Operating System :: OS Independent', - 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.9', 'Topic :: Software Development :: Libraries :: Python Modules', ] requirements = [ 'cloudpickle>=0.2.1', - 'configparser>=3.5.0', - 'future>=0.15.2', - 'mock~=3.0.5', 'requests>=2.10.0', 'responses>=0.5.1', 'requests-kerberos>=0.11.0', From c4d5e11eca4b3f59052146c51ac64a4d4dd64365 Mon Sep 17 00:00:00 2001 From: arnavb Date: Sun, 23 Nov 2025 11:53:44 +0000 Subject: [PATCH 7/8] update --- integration-test/src/test/resources/test_python_api.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/integration-test/src/test/resources/test_python_api.py b/integration-test/src/test/resources/test_python_api.py index 3bc7b13cc..bef67bd04 100644 --- a/integration-test/src/test/resources/test_python_api.py +++ b/integration-test/src/test/resources/test_python_api.py @@ -19,18 +19,12 @@ import base64 import json import time -try: - from urllib.parse import urlparse -except ImportError: - from urllib.parse import urlparse +from urllib.parse import urlparse import requests from requests_kerberos import HTTPKerberosAuth, REQUIRED, OPTIONAL import cloudpickle import pytest -try: - import http.client -except ImportError: - from http import HTTPStatus as httplib +import http.client from flaky import flaky global session_id, job_id From 57deae53f708187d4df52b952e4693c0c3b795b6 Mon Sep 17 00:00:00 2001 From: arnavb Date: Tue, 2 Dec 2025 07:34:18 +0000 Subject: [PATCH 8/8] update --- .../src/main/scala/org/apache/livy/repl/PythonInterpreter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala b/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala index 58b7147a5..40a25c556 100644 --- a/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala +++ b/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala @@ -49,7 +49,7 @@ object PythonInterpreter extends Logging { val pythonExec = conf.getOption("spark.pyspark.python") .orElse(sys.env.get("PYSPARK_PYTHON")) .orElse(sys.props.get("pyspark.python")) // This java property is only used for internal UT. - .getOrElse("python") + .getOrElse("python3") val secretKey = Utils.createSecret(256) val gatewayServer = createGatewayServer(sparkEntries, secretKey)