V2 rewrite (beta): DuckDB Engine Support with Benchmark #255
chenliu0831 wants to merge 11 commits into v2_rewrite
Conversation
- All runners take engine in constructor; onData(table=, dataframe=) for data binding
- Spark protobuf builders become private (_Spark*RunBuilder)
- Refactor connect() to use isinstance() instead of string matching
- Fix ApproxQuantile alias collision and metric name mismatch
- Scale up test datasets for reliable HLL tolerance validation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Poetry 1.7.1 doesn't support the PEP 621 [project] table format, causing a 'name' parse error. Replace with `uv pip install`.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Handle Spark Connect session type in connect() (a separate class from pyspark.sql.SparkSession)
- Remove manual server start/stop from CI; the conftest fixture handles it
- Accept NaN for non-numeric profile stats from Spark

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Generated by AI (model: us.anthropic.claude-opus-4-6-v1, prompt: 8c93b14f) — may not be fully accurate. Reply if this doesn't help.
Additional feedback:
tests/helpers/spark_server.py:42 — Same hardcoded developer-machine-specific paths as benchmark/config.py. The spark_home default /Volumes/workplace/deequ_rewrite/spark-3.5.0-bin-hadoop3 and deequ_jar default will fail for any other developer. Consider requiring these via environment variables with no fallback, or raising a clear error when not set.
pydeequ/engines/operators/scan_operators.py:21 — SizeOperator does not inherit from ScanOperator — it inherits directly from the mixins. This means it doesn't have the column attribute or the abstract method contracts from ScanOperator. While it works because it implements the same interface manually, this breaks the type hierarchy. Consider either inheriting from ScanOperator (passing column='*') or documenting why it's intentionally different.
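One way to restore the hierarchy is a minimal sketch along these lines; the real `ScanOperator` contract is assumed here, and `to_sql` is a stand-in name for whatever abstract method the base class actually declares:

```python
from abc import ABC, abstractmethod


class ScanOperator(ABC):
    """Simplified stand-in for the real base class (assumed shape)."""

    def __init__(self, column: str):
        self.column = column

    @abstractmethod
    def to_sql(self) -> str: ...


class SizeOperator(ScanOperator):
    """Row count does not target a specific column, so pass a sentinel."""

    def __init__(self) -> None:
        super().__init__(column="*")

    def to_sql(self) -> str:
        return "COUNT(*)"
```

With this shape, `SizeOperator` keeps the `column` attribute and the abstract-method contract while still expressing that it is column-agnostic.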
pydeequ/engines/operators/scan_operators.py:355 — ComplianceOperator also does not inherit from ScanOperator, same issue as SizeOperator. It inherits directly from the mixins, bypassing the base class contract.
pydeequ/engines/operators/scan_operators.py:393 — CorrelationOperator does not inherit from ScanOperator either. Same structural inconsistency.
pydeequ/v2/profiles.py:100 — The EngineColumnProfilerRunBuilder has no withKLLProfiling() or setKLLParameters() methods, but the ColumnProfilerRunner docstring (line 86) shows .withKLLProfiling() as an example. Users following the docstring will get an AttributeError. Either add stub methods that raise NotImplementedError with a clear message, or update the docstring.
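If the docstring is kept, stubs like the following would fail loudly instead of surfacing as an `AttributeError` (the method names come from the docstring; the class body shown here is otherwise illustrative, not the actual builder):

```python
class EngineColumnProfilerRunBuilder:
    # ... existing builder methods elided ...

    def withKLLProfiling(self) -> "EngineColumnProfilerRunBuilder":
        # Explicit stub: give users a clear message rather than AttributeError.
        raise NotImplementedError(
            "KLL profiling is only available on the Spark engine; "
            "this engine does not support withKLLProfiling()."
        )

    def setKLLParameters(self, params) -> "EngineColumnProfilerRunBuilder":
        raise NotImplementedError(
            "KLL parameters are only supported on the Spark engine."
        )
```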
pydeequ/v2/verification.py:97 — EngineVerificationRunBuilder has an addAnalyzer method but run() only calls self._engine.run_checks(self._checks) — the self._analyzers list is collected but never used. Either remove addAnalyzer or pass analyzers to the engine.
pydeequ/engines/suggestions/registry.py:82 — _register_default_rules() is called at module import time, populating the class-level _rules list. But _rules is defined as a class variable _rules: List[SuggestionRule] = []. If _register_default_rules() is called multiple times (e.g., during test reloads), rules will be duplicated. Add a guard or use clear() before registering.
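A simple guard flag makes the registration idempotent (a sketch only; the placeholder rule and class shape here are illustrative, not the actual registry code):

```python
from typing import List


class SuggestionRegistry:
    _rules: List[object] = []
    _defaults_registered: bool = False  # guard against repeat registration

    @classmethod
    def _register_default_rules(cls) -> None:
        # Idempotent: safe under module reloads during tests.
        if cls._defaults_registered:
            return
        cls._rules.append("CompleteIfCompleteRule")  # placeholder rule
        cls._defaults_registered = True
```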
tests/engines/fixtures/datasets.py:62 — create_df_missing generates 1200 rows with att1 having a pattern of 2 non-null then 2 null (50% complete), but the EXPECTED_VALUES dict on line 432 says ("df_missing", "Completeness", "att1"): 0.5 and the comment says # 6/12. The comment 6/12 is stale from an older 12-row version — the actual ratio is 600/1200. Update the comment.
| ) | ||
| ) | ||
| spark_home: str = field( | ||
| default_factory=lambda: os.environ.get( |
Hardcoded default paths are developer-machine-specific and will fail for other contributors. spark_home defaults to /Volumes/workplace/deequ_rewrite/spark-3.5.0-bin-hadoop3 and deequ_jar defaults to /Volumes/workplace/deequ_rewrite/deequ/target/deequ_2.12-2.1.0b-spark-3.5.jar. These should either have no default (requiring explicit configuration) or use a more portable default like empty string with validation.
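For example, a small helper could require the variable explicitly (`require_env` and its message text are hypothetical, not existing project code):

```python
import os


def require_env(name: str) -> str:
    # Hypothetical helper: fail fast with an actionable message instead
    # of falling back to a developer-machine-specific path.
    value = os.environ.get(name, "")
    if not value:
        raise RuntimeError(
            f"{name} must be set, e.g. export {name}=/path/to/spark-home"
        )
    return value


# The config dataclass fields could then drop their hardcoded defaults:
# spark_home: str = field(default_factory=lambda: require_env("SPARK_HOME"))
```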
```python
    Failure = "Failure"


class CheckStatus(Enum):
```
ConstraintStatus defines duplicate enum members. SUCCESS = "Success" and Success = "Success" have the same value, which means Success becomes an alias for SUCCESS in Python enums. This is confusing and error-prone — code comparing status == ConstraintStatus.Failure will silently work because Failure is an alias for FAILURE. Pick one naming convention (e.g., SUCCESS/FAILURE) and remove the aliases.
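The aliasing behavior is easy to demonstrate with a minimal reproduction of the enum:

```python
from enum import Enum


class ConstraintStatus(Enum):
    SUCCESS = "Success"
    Success = "Success"   # duplicate value -> alias of SUCCESS, not a new member
    FAILURE = "Failure"
    Failure = "Failure"   # alias of FAILURE


# Aliases resolve to the canonical member, so mixed-convention comparisons
# silently work today, which is exactly what makes this error-prone:
assert ConstraintStatus.Success is ConstraintStatus.SUCCESS
assert len(list(ConstraintStatus)) == 2  # only SUCCESS and FAILURE iterate
```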
```python
    Warning = "Warning"
    Error = "Error"
```
Same duplicate enum member issue as ConstraintStatus. CheckStatus defines both SUCCESS = "Success" and Success = "Success", etc. Pick one naming convention.
```python
    def compute_metrics(
        self, analyzers: Sequence["_ConnectAnalyzer"]
    ) -> List[MetricResult]:
        """
```
The benchmark_duckdb_validation function in benchmark/experiments.py calls VerificationSuite(engine).onData(table="benchmark_data").addCheck(check).run(), but the DuckDBEngine returned by setup_duckdb_from_parquet uses table=f"read_parquet('{parquet_path}')" — a raw read_parquet() expression, not a registered table. Then get_schema() calls PRAGMA table_info('{self.table}') which will fail on read_parquet(...) expressions. The profiling path works around this by creating a VIEW, but the validation path does not.
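One possible fix is to normalize the table reference before introspection, mirroring the VIEW workaround already used on the profiling path (`normalize_table_ref`, its parenthesis heuristic, and the view name are a sketch, not existing code):

```python
def normalize_table_ref(execute_sql, table: str,
                        view_name: str = "_pydeequ_source") -> str:
    # Hypothetical helper: PRAGMA table_info() only accepts a table/view
    # name, so wrap raw expressions like read_parquet('...') in a VIEW
    # first. `execute_sql` stands in for the engine's SQL execution hook.
    is_expression = "(" in table  # crude heuristic for read_parquet(...), etc.
    if is_expression:
        execute_sql(
            f"CREATE OR REPLACE VIEW {view_name} AS SELECT * FROM {table}"
        )
        return view_name
    return table
```

Calling this once in the engine constructor (or at the top of `get_schema()`) would make both the profiling and validation paths work against `read_parquet(...)` sources.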
```python
    This module provides the abstract base classes that combine mixins
    to create the foundation for concrete evaluator implementations.
    """
```
The BaseEvaluator.evaluate method (line 118) defaults to checking value == 1.0 when no assertion is provided. This exact float comparison is fragile — a completeness of 0.9999999999999998 due to floating-point arithmetic would fail. Use an epsilon comparison like abs(value - 1.0) < 1e-9.
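A quick illustration of why the exact comparison is fragile (`passes_default_assertion` is a hypothetical replacement, not the current code):

```python
def passes_default_assertion(value: float, eps: float = 1e-9) -> bool:
    # Tolerant default check: treat values within eps of 1.0 as passing,
    # instead of the fragile exact comparison `value == 1.0`.
    return abs(value - 1.0) < eps


# Floating-point accumulation rarely lands exactly on 1.0:
completeness = sum([0.1] * 10)   # 0.9999999999999999, not 1.0
assert completeness != 1.0
assert passes_default_assertion(completeness)
```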
```python
        for where, group_evaluators in where_groups.items():
            try:
                group_results = self._execute_ratio_group(
                    table, execute_fn, group_evaluators, where
```
When where is set, the query rebuilds conditions inline with a list comprehension that calls evaluators[i].get_condition() again, duplicating the logic from the cases list built on line 218. This is both redundant and inconsistent — cases was built without the WHERE wrapper. The cases variable is unused when where is truthy.
Issue #, if available: #128
Description of changes:
Adds DuckDB as a lightweight, JVM-free backend for PyDeequ 2.0, with optional-dependency installation support. The overall design is inspired by the DuckDQ project mentioned in #128 (most of the credit goes to that project). Stateful aggregation for streaming DQ monitoring (i.e., the MetricsRepository) is not implemented yet.
Other notable changes:
- Updated `pyproject.toml` to support optional dependencies: `pip install pydeequ[duckdb]` installs the DuckDB backend (no JVM required). The core package now has minimal dependencies (numpy, pandas, protobuf).
- Added `Engines.md`.

See https://github.com/awslabs/python-deequ/blob/v2_engine/README.md and https://github.com/awslabs/python-deequ/blob/v2_engine/docs/architecture.md for more background.
Benchmark
See https://github.com/awslabs/python-deequ/blob/v2_engine/BENCHMARK.md for more details.