Add Spark 4.0 support via deequ:2.0.14-spark-4.0 #259

sudsali merged 7 commits into awslabs:release/1.0.0-spark-4.0
Conversation
This is now ready for review; CI tests pass on my fork: https://github.com/m-aciek/python-deequ/actions/runs/24196839467
chenliu0831 left a comment
LGTM. I'm not sure if we would like to keep maintaining the Py4j approach though.
@m-aciek we need your commit to have verified signatures.
- Add "4.0" entry to SPARK_TO_DEEQU_COORD_MAPPING in configs.py - Widen pyspark optional dep bound to <5.0.0 in pyproject.toml - Replace scala.collection.JavaConversions (removed in Scala 2.13) with JavaConverters in scala_utils.py and profiles.py - Replace scala.collection.Seq.empty() (inaccessible via Py4J in Scala 2.13) with to_scala_seq(jvm, jvm.java.util.ArrayList()) in analyzers.py and checks.py - Add Spark 4.0.0 to CI matrix with Java 17; use include: style to pair each Spark version with its required Java version - Fix CI for Spark 4.0: - use Python 3.9 and version-marker pyspark dep - use pip install instead of poetry add - install pandas>=2.0.0 required by PySpark 4.0 - Fix empty Seq compatibility across Scala 2.12 and 2.13 Fixes awslabs#258
force-pushed from d80a2cd to 6aa90c3
@chenliu0831 Thank you for reviewing! I've set up the verification and squashed the commits.

The PR should be on a new branch, following the deequ release pattern, to make sure we don't force everyone to use Spark 4.0.

@sudsali Would you be able to create a new branch that I could use as the base for this PR?
```yaml
fail-fast: false
matrix:
  include:
    - PYSPARK_VERSION: "3.1.3"
```
Unresolved merge conflict markers (<<<<<<< spark-4-support, =======, >>>>>>> master) are present throughout this file. This will cause a YAML parse error and CI will not run. The conflict markers must be resolved before merging.
```diff
         Scala sequence
     """
-    return jvm.scala.collection.JavaConversions.iterableAsScalaIterable(iterable).toSeq()
+    return jvm.scala.collection.JavaConverters.iterableAsScalaIterableConverter(iterable).asScala().toSeq()
```
JavaConverters is itself deprecated since Scala 2.13 in favor of scala.jdk.CollectionConverters. While JavaConverters still exists in Scala 2.13, it may be removed in a future Scala version. Consider using scala.jdk.CollectionConverters for Spark 4+ (Scala 2.13) and keeping JavaConverters as a fallback for Spark 3.x (Scala 2.12), or at minimum document this as a known future risk.
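A rough sketch of the version-aware selection this comment suggests; the name `_get_converters` matches the helper added later in the PR, but the detection below (reading the Scala version string) is only one possible approach and an assumption, not the code that landed:

```python
def _get_converters(jvm):
    """Pick a collection-converter API matching the JVM's Scala version.

    scala.util.Properties.versionNumberString() returns e.g. "2.13.8";
    Scala's static forwarders should make it callable via Py4J (assumed).
    """
    scala_version = jvm.scala.util.Properties.versionNumberString()
    if scala_version.startswith("2.13"):
        # Java-friendly API introduced in Scala 2.13 (Spark 4+)
        return "jdk", jvm.scala.jdk.javaapi.CollectionConverters
    # Deprecated in 2.13 but the idiomatic choice on Scala 2.12 (Spark 3.x)
    return "legacy", jvm.scala.collection.JavaConverters
```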
```python
    return jvm.scala.collection.JavaConverters.iterableAsScalaIterableConverter(
        jvm.java.util.ArrayList()
    ).asScala().toList()
```
The docstring says "Uses JavaConverters.toList()" but the code actually calls .asScala().toList() on the result of iterableAsScalaIterableConverter. The docstring is misleading about which API is being used.
| """ | ||
| self._run_result = run | ||
| profile_map = self._jvm.scala.collection.JavaConversions.mapAsJavaMap(run.profiles()) # TODO from ScalaUtils | ||
| profile_map = self._jvm.scala.collection.JavaConverters.mapAsJavaMapConverter(run.profiles()).asJava() # TODO from ScalaUtils |
This is a direct inline call to JavaConverters.mapAsJavaMapConverter(...).asJava() instead of using the existing scala_map_to_java_map helper from scala_utils.py (which was updated in this same PR). Use scala_map_to_java_map(self._jvm, run.profiles()) for consistency and to avoid duplicating the conversion logic.
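The suggested change, sketched under the assumption that `scala_map_to_java_map(jvm, scala_map)` is the signature of the updated helper in `scala_utils.py`:

```python
# Suggested replacement in profiles.py (fragment; `self` is the surrounding
# profile-runner instance and `run` the Scala run result, as in the diff above):
from pydeequ.scala_utils import scala_map_to_java_map

profile_map = scala_map_to_java_map(self._jvm, run.profiles())
```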
@m-aciek Please address all the comments.
… (Spark 4+)

JavaConverters is deprecated in Scala 2.13 and may be removed in a future version. Detect the Scala version at runtime and prefer scala.jdk.javaapi.CollectionConverters (Spark 4+), falling back to scala.collection.JavaConverters (Spark 3.x). Also fix misleading docstring in empty_scala_seq.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…_to_java_map helper

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@sudsali It's done now.
```diff
         Scala sequence
     """
-    return jvm.scala.collection.JavaConversions.iterableAsScalaIterable(iterable).toSeq()
+    style, converters = _get_converters(jvm)
```
For Scala 2.12 (Spark 3.x), JavaConverters uses iterableAsScalaIterableConverter(...).asScala() pattern, but the original code used JavaConversions.iterableAsScalaIterable(...) which returns the Scala iterable directly. The JavaConverters equivalent should work, but note that JavaConversions was deprecated in 2.12 and removed in 2.13, while JavaConverters was deprecated in 2.13. This legacy branch is fine for 2.12 but the comment in _get_converters says scala.collection.JavaConverters (Scala 2.12, Spark 3.x) — this is accurate, just noting for completeness.
```diff
@@ -93,11 +128,17 @@ def to_scala_map(spark_session, d):
```
to_scala_map still uses spark_session._jvm.PythonUtils.toScalaMap(d) which relies on PythonUtils — a PySpark internal helper. In Spark 4.0, PythonUtils may have moved or changed. If this breaks on Spark 4, it should be updated to use the same _get_converters pattern. Was this path tested with Spark 4.0?
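A hedged sketch of what such a fallback could look like, reusing the `_get_converters` helper from this PR; the exception types and the exact behavior of a removed PythonUtils are assumptions:

```python
def to_scala_map(spark_session, d):
    """Convert a Python dict to a Scala Map, with a non-internal fallback."""
    jvm = spark_session._jvm
    try:
        # Fast path: PySpark-internal helper, present through Spark 3.x/4.0
        return jvm.PythonUtils.toScalaMap(d)
    except (TypeError, AttributeError):
        # If PythonUtils ever disappears, Py4J yields an unresolvable
        # JavaPackage and the call raises TypeError; rebuild the map by hand.
        java_map = jvm.java.util.HashMap()
        for key, value in d.items():
            java_map.put(key, value)  # relies on Py4J auto-conversion
        style, converters = _get_converters(jvm)
        if style == "jdk":
            return converters.asScala(java_map)  # scala.jdk.javaapi API
        return converters.mapAsScalaMapConverter(java_map).asScala()
```

Note the fallback yields a mutable Scala Map rather than the immutable one PythonUtils produces; whether deequ's entry points accept that is untested here.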
| - PYSPARK_VERSION: "3.5" | ||
| PYTHON_VERSION: "3.9" | ||
| JAVA_VERSION: "17" | ||
| - PYSPARK_VERSION: "4.0.0" |
The matrix entry uses PYSPARK_VERSION: "4.0.0" (three-part version) while all other entries use two-part versions like "3.5". In the test step, pip install pyspark==$SPARK_VERSION will work either way, but _extract_major_minor_versions in configs.py will extract "4.0" from "4.0.0", so the SPARK_VERSION env var works. However, this inconsistency is confusing — consider using "4.0" to match the other entries, or document why the full version is needed.
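For reference, a sketch of the major.minor extraction this comment refers to; the real `_extract_major_minor_versions` in `configs.py` may differ in detail:

```python
import re

def _extract_major_minor_versions(full_version: str) -> str:
    """Reduce a version string like "4.0.0" (or "3.5") to "major.minor"."""
    match = re.search(r"^(\d+)\.(\d+)", full_version)
    if match is None:
        raise ValueError(f"Unparseable Spark version: {full_version!r}")
    return f"{match.group(1)}.{match.group(2)}"

assert _extract_major_minor_versions("4.0.0") == "4.0"
assert _extract_major_minor_versions("3.5") == "3.5"
```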
| - PYSPARK_VERSION: "4.0.0" | ||
| PYTHON_VERSION: "3.9" | ||
| JAVA_VERSION: "17" | ||
| PANDAS_VERSION: ">=2.0.0" |
PANDAS_VERSION is only defined for the Spark 4.0 matrix entry. Other matrix entries won't have it set, so the if [ -n "$PANDAS_VERSION" ] guard in the run step handles that. However, the PANDAS_VERSION value is ">=2.0.0" which is a version specifier, not a version — the env var name is misleading. Consider renaming to PANDAS_VERSION_SPEC or similar for clarity.
```sh
pip install --upgrade pip
pip install poetry==1.7.1
poetry install
poetry run pip install pyspark==$SPARK_VERSION
```
The poetry run pip install pyspark==$SPARK_VERSION line uses SPARK_VERSION which for the 4.0 entry is "4.0.0". This works for pip (pyspark==4.0.0), but for Spark 3.x entries like "3.5", pip will install the latest 3.5.x patch. This is existing behavior but worth noting the inconsistency with the new entry.
…me PANDAS_VERSION_SPEC

- to_scala_map: add CollectionConverters fallback in case PythonUtils is removed in a future Spark/PySpark version
- base.yml: use "4.0" (two-part) for PYSPARK_VERSION to match other matrix entries
- base.yml: rename PANDAS_VERSION → PANDAS_VERSION_SPEC to reflect that the value is a version specifier (e.g. ">=2.0.0"), not a bare version number

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…orrectly

On Scala 2.12, jvm.scala.jdk.javaapi.CollectionConverters resolves to a JavaPackage placeholder rather than raising an exception, so the previous try/except always selected the jdk path and broke Spark 3.x. Verify usability by actually calling asScala() as a one-time probe.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
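A sketch of the probe this commit describes, combined with the per-JVM caching mentioned in the summary below; keying `lru_cache` on the Py4J JVM view is an assumed implementation detail:

```python
from functools import lru_cache

@lru_cache(maxsize=None)  # probe once per JVM view (assumed caching strategy)
def _get_converters(jvm):
    candidate = jvm.scala.jdk.javaapi.CollectionConverters
    try:
        # On Scala 2.12 `candidate` is an unresolvable JavaPackage, and
        # *calling* it raises TypeError; merely touching the attribute does
        # not fail, which is why a real invocation is used as the probe.
        candidate.asScala(jvm.java.util.ArrayList())
        return "jdk", candidate
    except TypeError:
        return "legacy", jvm.scala.collection.JavaConverters
```

Probing with a real call sidesteps Py4J's lazy package resolution, which is exactly the failure mode the commit message describes.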
Closes #258
Summary
"4.0": "com.amazon.deequ:deequ:2.0.14-spark-4.0"toSPARK_TO_DEEQU_COORD_MAPPINGinconfigs.py>=2.4.7,<3.4.0to>=2.4.7,<5.0.0inpyproject.tomlinclude:style so each Spark version carries its required Java version; normalize Spark 4 version string to"4.0"and renamePANDAS_VERSIONenv var toPANDAS_VERSION_SPECfor clarityscala.collection.JavaConversions(removed in Scala 2.13) with a version-aware converter layer inscala_utils.py: prefersscala.jdk.javaapi.CollectionConverters(Spark 4+ / Scala 2.13) and falls back toscala.collection.JavaConverters(Spark 3.x / Scala 2.12); detection is cached per JVM instancescala.collection.Seq.empty()(inaccessible via Py4J in Scala 2.13) with an empty Java list converted viato_scala_seqinanalyzers.pyandchecks.pyJavaConverterscall inprofiles.pywith the existingscala_map_to_java_maphelperPythonUtils.toScalaMapfallback into_scala_map: if the PySpark internal is ever removed in a future version, the function falls back toCollectionConverters/JavaConvertersRoot causes fixed
Spark 4 uses Scala 2.13, which introduced breaking changes affecting pydeequ:
- `scala.collection.JavaConversions` was removed; replaced by a version-aware layer using `scala.jdk.javaapi.CollectionConverters` (Scala 2.13) with a `scala.collection.JavaConverters` fallback (Scala 2.12)
- `scala.collection.Seq.empty()` is not accessible via Py4J reflection; replaced with `to_scala_seq(jvm, jvm.java.util.ArrayList())`

Detection note: on Scala 2.12, `jvm.scala.jdk.javaapi.CollectionConverters` does not raise an exception; Py4J resolves it to a `JavaPackage` placeholder. Version detection therefore probes by actually calling `asScala()` on an empty list; a `TypeError` from the unresolvable package triggers the fallback.

Test plan
- `pyspark==4.0` / Java 17
- `pyspark==3.5` / Java 17 (and earlier 3.x versions)

PR authored with assistance from Claude Code