
V2 rewrite (beta): Support Spark Connect #254

Open
chenliu0831 wants to merge 10 commits into master from v2_rewrite

Conversation

Contributor

chenliu0831 commented Jan 13, 2026

Issue #, if available:

Description of changes:

This PR introduces PyDeequ 2.0 beta, a major release that replaces the Py4J-based architecture with Spark Connect for client-server communication.

The Deequ-side change will be opened separately; the proto file here is copied for review purposes. For ease of testing, I created a pre-release https://github.com/awslabs/python-deequ/releases/tag/v2.0.0b1 to host the jars/wheels.

Motivation

The legacy PyDeequ relied on Py4J to bridge Python and the JVM, which had several limitations:

  • Required a local Spark session with JVM access
  • Python lambdas couldn't be serialized for remote execution
  • Tight coupling between the Python client and the JVM made debugging difficult

Spark Connect (introduced in Spark 3.4) provides a clean gRPC-based protocol that solves these issues.

Code Changes

  • New pydeequ/v2/ module with Spark Connect implementation:

    • checks.py - Check and constraint builders
    • analyzers.py - Analyzer classes
    • predicates.py - Serializable predicates (eq, gte, between, etc.)
    • verification.py - VerificationSuite and AnalysisRunner
    • proto/ - Protobuf definitions and generated code
  • New test suite in tests/v2/:

    • test_unit.py - Unit tests (no Spark required)
    • test_analyzers.py - Analyzer integration tests
    • test_checks.py - Check constraint tests
    • test_e2e_spark_connect.py - End-to-end tests
  • Updated documentation:

    • Merged README with 2.0 quick start guide
    • Added architecture diagram
    • Migration guide from 1.x to 2.0

API Changes

# Before (1.x)
from pydeequ.checks import Check, CheckLevel
check.hasSize(lambda x: x == 3)

# After (2.0)
from pydeequ.v2.checks import Check, CheckLevel
from pydeequ.v2.predicates import eq
check.hasSize(eq(3))
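The predicates shown above are data, not closures, which is what makes them serializable over Spark Connect. A minimal sketch of the idea (the dataclass/dict encoding here is an assumption for illustration; the actual implementation encodes predicates as protobuf messages):

```python
from dataclasses import dataclass
from typing import Any, Dict

# Sketch: a predicate captured as plain data rather than a Python lambda,
# so it can be encoded and shipped to the server over gRPC. The factory
# names `eq` and `between` follow the PR description; the dict encoding
# is illustrative, not the real proto schema.
@dataclass(frozen=True)
class Predicate:
    op: str
    args: Dict[str, Any]

def eq(value: float) -> Predicate:
    return Predicate("eq", {"value": value})

def between(low: float, high: float) -> Predicate:
    return Predicate("between", {"low": low, "high": high})
```

With this shape, `check.hasSize(eq(3))` sends a structured description of the predicate to the server, where Deequ reconstructs the actual check, instead of attempting to pickle a closure as the 1.x lambda API would require.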

Testing

For more details, see https://github.com/awslabs/python-deequ/blob/v2_rewrite/README.md.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Comment thread pydeequ/v2/verification.py Outdated
plan = _create_deequ_plan(extension)

# Use DataFrame.withPlan to properly create the DataFrame
return ConnectDataFrame.withPlan(plan, session=self._spark)

Feel free to ignore!

There is a breaking change between Spark 3.5.x and 4.0.x.
In GraphFrames we use code like this:

def _dataframe_from_plan(plan: LogicalPlan, session: SparkSession) -> DataFrame:
    if hasattr(DataFrame, "withPlan"):
        # Spark 3
        return DataFrame.withPlan(plan, session)

    # Spark 4
    return DataFrame(plan, session)

I would recommend switching to this approach to avoid pain during the Spark 4.x migration.

Contributor Author


Thanks for the callout - addressed in 69a5ed9


@github-actions github-actions Bot left a comment



Generated by AI (model: us.anthropic.claude-opus-4-6-v1, prompt: 8c93b14f) — may not be fully accurate. Reply if this doesn't help.

Comment thread pydeequ/__init__.py
return AnalysisRunner
elif name == "Check":
from pydeequ.checks import Check
return Check

The __getattr__ function handles "PyDeequSession" inconsistently: the tuple check at line 47 includes "PyDeequSession", but the inner if/elif chain has no branch for it, so execution falls through the block without returning and only reaches the separate if name == "PyDeequSession" check at line 64. Remove "PyDeequSession" from the tuple on line 47 to avoid this confusing fall-through, since the name is already handled at line 64.

Comment thread pydeequ/__init__.py

def createCheck(self, level: CheckLevel, description: str, constraints=None):
return Check(self._spark_session, level, description, constraints)
if name in ("AnalysisRunner", "Check", "CheckLevel", "ColumnProfilerRunner",

"DEEQU_MAVEN_COORD" is listed in the tuple and handled by an elif name == "DEEQU_MAVEN_COORD" branch, which works. "PyDeequSession", however, is in the tuple but has no corresponding elif branch inside the block, so execution falls through the entire if name in (...) block without returning and only the separate if name == "PyDeequSession" check at line 64 catches it. This is confusing and fragile: either add the elif for PyDeequSession inside the block, or remove it from the tuple.

Comment thread tests/conftest.py
def master(self, master):
# Ignored - we always use Spark Connect
return self


The function get_spark_connect_session() is called on line 72 but is never defined in this file. This will raise a NameError at runtime.

# Pass None as child - this is a leaf node
super().__init__(child=None)
self._extension = ext


LogicalPlan subclass calls self._create_proto_relation() but this method does not exist on the base LogicalPlan class in PySpark 3.5. The correct approach is to construct a spark_proto.Relation() directly: rel = spark_proto.Relation(). This will cause an AttributeError at runtime.

Comment thread pyproject.toml

[tool.poetry.extras]
pyspark = ["pyspark"]


The [tool.poetry.extras] section is empty. This is a leftover from removing the pyspark extra. Either remove the section entirely or keep it with meaningful content to avoid confusion.

Comment thread pyproject.toml
safety = "^1.10.3"
[tool.poetry.group.dev.dependencies]
pytest = "^8.0.0"
pytest-cov = "^4.1.0"

Pinning pyspark to exactly 3.5.0 as a hard dependency (not optional) means users cannot use any other PySpark version. The 1.x API allowed a range. Consider making this more flexible (e.g., >=3.5.0,<4.0) or making it optional, especially since v2 users need pyspark[connect] but v1 users may need different versions.

@@ -1,3 +1,5 @@
name: PyDeequ V2 Tests

The CI workflow was renamed to PyDeequ V2 Tests and all legacy v1 test matrix jobs (Spark 3.1, 3.2, 3.3, 3.5 with different PySpark versions) were removed. This means there is no CI coverage for the legacy pydeequ (v1) code that is still shipped in this package. Users relying on from pydeequ.checks import Check will have no test coverage.

Comment thread pydeequ/v2/profiles.py
if self._low_cardinality_threshold > 0:
msg.low_cardinality_histogram_threshold = self._low_cardinality_threshold

# Set KLL profiling

Accessing self._df._plan and self._spark._client are internal/private APIs of PySpark's Spark Connect implementation. These may break across PySpark patch versions. Consider documenting this risk or adding a try/except with a helpful error message.

Comment thread pydeequ/v2/suggestions.py
- description: Human-readable description
- suggesting_rule: Rule that generated this suggestion
- code_for_constraint: Python code snippet for the constraint


The ValueError for an invalid testset_ratio is raised in useTrainTestSplitWithTestsetRatio (line 258), and test_train_test_invalid_ratio wraps the entire chain including .run(), so the test still catches it. The strict check 0.0 < ratio < 1.0 rejects both boundary values, and the test's inputs 0.0 and 1.5 are both correctly rejected. No issue here.
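The strict-bounds check under discussion reduces to this shape (the method name follows the thread; it is written as a standalone function here for illustration):

```python
def use_train_test_split_with_testset_ratio(ratio: float) -> float:
    # Strict bounds: 0.0 and 1.0 are rejected along with anything outside
    # (0, 1), matching the `0.0 < ratio < 1.0` check described above.
    if not (0.0 < ratio < 1.0):
        raise ValueError(f"testset_ratio must be in (0, 1), got {ratio}")
    return ratio
```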

Comment thread pydeequ/v2/checks.py
hint: Optional[str] = None,
) -> "Check":
"""
Check that all values are in the allowed set.

The where method modifies the last constraint in-place, but if _constraints is empty, it silently does nothing. This could lead to confusing behavior if a user calls .where(...) before adding any constraints. Consider raising a ValueError when _constraints is empty.
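The suggested guard is small; a stripped-down sketch of the builder shape (class reduced to the relevant parts, internal attribute names assumed):

```python
class Check:
    # Sketch of the fluent builder discussed above, showing only the
    # empty-constraints guard in where(). Constraint storage is simplified
    # to dicts for illustration.
    def __init__(self):
        self._constraints = []

    def hasSize(self, predicate):
        self._constraints.append({"type": "size", "predicate": predicate})
        return self

    def where(self, filter_expr: str):
        if not self._constraints:
            raise ValueError(
                "where() must follow a constraint; no constraint has been added yet"
            )
        # Attach the filter to the most recently added constraint.
        self._constraints[-1]["filter"] = filter_expr
        return self
```

Raising here turns a silent no-op into an immediate, debuggable error at the call site.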
