
V2 rewrite (beta): DuckDB Engine Support with Benchmark#255

Open
chenliu0831 wants to merge 11 commits into v2_rewrite from v2_engine

Conversation


@chenliu0831 (Contributor) commented Jan 20, 2026

Issue #, if available: #128

Description of changes:

Adds DuckDB as a lightweight, JVM-free backend for PyDeequ 2.0, installable as an optional dependency. The overall design is inspired by the DuckDQ project mentioned in #128 (most of the credit goes to that project). Stateful aggregation for streaming DQ monitoring (i.e. the MetricsRepository) is not implemented yet.

Other notable changes:

  • Restructured pyproject.toml to support optional dependencies: pip install pydeequ[duckdb] installs the DuckDB backend (no JVM required), and the core package now has minimal dependencies (numpy, pandas, protobuf).
  • Engine parity tests between the Spark and DuckDB engines. Some HLL/quantile differences exist because of algorithmic differences; more details are in Engines.md.
  • Benchmark tooling.
  • Comprehensive test suite.

See https://github.com/awslabs/python-deequ/blob/v2_engine/README.md and https://github.com/awslabs/python-deequ/blob/v2_engine/docs/architecture.md for more background.
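
For illustration, here is a rough usage sketch pieced together from the API surface referenced elsewhere in this PR; the import paths and the Check constructor below are assumptions, not the final v2 API.

```python
# Sketch only: import paths and the Check/CheckLevel API are assumptions
# based on snippets referenced in this PR, not the final v2 surface.
import pandas as pd

from pydeequ.engines.duckdb import DuckDBEngine        # assumed path
from pydeequ.v2.verification import VerificationSuite  # assumed path
from pydeequ.checks import Check, CheckLevel            # assumed path

df = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", None]})

engine = DuckDBEngine()  # JVM-free backend installed via pydeequ[duckdb]
check = Check(CheckLevel.Error, "basic checks").isComplete("id")

result = (
    VerificationSuite(engine)
    .onData(dataframe=df)            # or .onData(table="my_table")
    .addCheck(check)
    .run()
)
print(result)
```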

Benchmark

See https://github.com/awslabs/python-deequ/blob/v2_engine/BENCHMARK.md for more details.

(benchmark chart image)

chenliu0831 and others added 10 commits January 19, 2026 22:13
- All runners take engine in constructor, onData(table=, dataframe=) for data binding
- Spark protobuf builders become private (_Spark*RunBuilder)
- Refactor connect() to use isinstance() instead of string matching
- Fix ApproxQuantile alias collision and metric name mismatch
- Scale up test datasets for reliable HLL tolerance validation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Poetry 1.7.1 doesn't support PEP 621 [project] table format,
causing 'name' parse error. Replace with uv pip install.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Handle Spark Connect session type in connect() (separate class
  from pyspark.sql.SparkSession)
- Remove manual server start/stop from CI; conftest fixture handles it
- Accept NaN for non-numeric profile stats from Spark

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown

@github-actions bot left a comment



Generated by AI (model: us.anthropic.claude-opus-4-6-v1, prompt: 8c93b14f) — may not be fully accurate. Reply if this doesn't help.

Additional feedback:

tests/helpers/spark_server.py:42 — Same hardcoded developer-machine-specific paths as benchmark/config.py. The spark_home default /Volumes/workplace/deequ_rewrite/spark-3.5.0-bin-hadoop3 and deequ_jar default will fail for any other developer. Consider requiring these via environment variables with no fallback, or raising a clear error when not set.
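
A minimal sketch of the "no silent fallback" option; the helper and environment variable names are illustrative, not the actual code in this PR.

```python
import os


def _required_env(name: str) -> str:
    """Return a required environment variable or fail with a clear message."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(
            f"{name} must be set; there is no portable default for this path."
        )
    return value


# Illustrative: the real config fields and env var names may differ.
spark_home = _required_env("SPARK_HOME")
deequ_jar = _required_env("DEEQU_JAR")
```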

pydeequ/engines/operators/scan_operators.py:21 — SizeOperator does not inherit from ScanOperator; it inherits directly from the mixins. This means it doesn't have the column attribute or the abstract method contracts from ScanOperator. While it works because it implements the same interface manually, this breaks the type hierarchy. Consider either inheriting from ScanOperator (passing column='*') or documenting why it's intentionally different.

pydeequ/engines/operators/scan_operators.py:355 — ComplianceOperator also does not inherit from ScanOperator, same issue as SizeOperator. It inherits directly from the mixins, bypassing the base class contract.

pydeequ/engines/operators/scan_operators.py:393 — CorrelationOperator does not inherit from ScanOperator either. Same structural inconsistency.
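
A sketch of the suggested fix, shown for SizeOperator (the same pattern would apply to the other two). The ScanOperator signature here is assumed for illustration; the real base class in this PR may look different.

```python
from abc import ABC, abstractmethod


class ScanOperator(ABC):
    """Assumed shape of the base class: stores a column and defines the contract."""

    def __init__(self, column: str):
        self.column = column

    @abstractmethod
    def to_sql(self) -> str:
        ...


class SizeOperator(ScanOperator):
    def __init__(self):
        # Whole-table operator: pass '*' so the column attribute and the
        # abstract contract are inherited instead of re-implemented ad hoc.
        super().__init__(column="*")

    def to_sql(self) -> str:
        return "COUNT(*)"
```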

pydeequ/v2/profiles.py:100 — The EngineColumnProfilerRunBuilder has no withKLLProfiling() or setKLLParameters() methods, but the ColumnProfilerRunner docstring (line 86) shows .withKLLProfiling() as an example. Users following the docstring will get an AttributeError. Either add stub methods that raise NotImplementedError with a clear message, or update the docstring.
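
A possible stub, using the method names from the docstring example; whether these should raise or be implemented is an open question.

```python
class EngineColumnProfilerRunBuilder:
    # ... existing builder methods elided ...

    def withKLLProfiling(self) -> "EngineColumnProfilerRunBuilder":
        # Fail loudly instead of with an AttributeError.
        raise NotImplementedError(
            "KLL profiling is not supported by the engine-based profiler yet."
        )

    def setKLLParameters(self, params) -> "EngineColumnProfilerRunBuilder":
        raise NotImplementedError(
            "KLL parameters are not supported by the engine-based profiler yet."
        )
```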

pydeequ/v2/verification.py:97 — EngineVerificationRunBuilder has an addAnalyzer method, but run() only calls self._engine.run_checks(self._checks) — the self._analyzers list is collected but never used. Either remove addAnalyzer or pass the analyzers to the engine.
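
If the intent is to honour the analyzers, run() could forward them roughly like this. The compute_metrics call is the engine method shown in the duckdb.py thread below; the constructor shape and the merging of its results into the check results are assumptions.

```python
class EngineVerificationRunBuilder:
    def __init__(self, engine, checks=None, analyzers=None):
        self._engine = engine
        self._checks = checks or []
        self._analyzers = analyzers or []

    def run(self):
        # Sketch: forward collected analyzers instead of silently dropping them.
        results = self._engine.run_checks(self._checks)
        if self._analyzers:
            # compute_metrics(...) exists on the engine per the duckdb.py
            # thread below; merging its output here is an assumption.
            results.extend(self._engine.compute_metrics(self._analyzers))
        return results
```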

pydeequ/engines/suggestions/registry.py:82 — _register_default_rules() is called at module import time, populating the class-level _rules list. But _rules is defined as a class variable _rules: List[SuggestionRule] = []. If _register_default_rules() is called multiple times (e.g., during test reloads), rules will be duplicated. Add a guard or use clear() before registering.
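
A minimal guard sketch; the registry class name and the flag are illustrative stand-ins for the real module.

```python
from typing import List


class SuggestionRegistry:
    """Illustrative stand-in for the real registry class."""

    _rules: List["SuggestionRule"] = []
    _defaults_registered: bool = False

    @classmethod
    def _register_default_rules(cls) -> None:
        # Guard against duplicate registration on repeated imports/reloads.
        if cls._defaults_registered:
            return
        cls._rules.clear()
        # ... append the default SuggestionRule instances here ...
        cls._defaults_registered = True
```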

tests/engines/fixtures/datasets.py:62 — create_df_missing generates 1200 rows with att1 following a pattern of 2 non-null then 2 null (50% complete). The EXPECTED_VALUES dict on line 432 correctly says ("df_missing", "Completeness", "att1"): 0.5, but its inline comment says # 6/12, which is stale from an older 12-row version — the actual ratio is 600/1200. Update the comment.

Comment thread: benchmark/config.py
)
)
spark_home: str = field(
default_factory=lambda: os.environ.get(


Hardcoded default paths are developer-machine-specific and will fail for other contributors. spark_home defaults to /Volumes/workplace/deequ_rewrite/spark-3.5.0-bin-hadoop3 and deequ_jar defaults to /Volumes/workplace/deequ_rewrite/deequ/target/deequ_2.12-2.1.0b-spark-3.5.jar. These should either have no default (requiring explicit configuration) or use a more portable default like empty string with validation.

Failure = "Failure"


class CheckStatus(Enum):


ConstraintStatus defines duplicate enum members. SUCCESS = "Success" and Success = "Success" have the same value, which means Success becomes an alias for SUCCESS in Python enums. This is confusing and error-prone — code comparing status == ConstraintStatus.Failure will silently work because Failure is an alias for FAILURE. Pick one naming convention (e.g., SUCCESS/FAILURE) and remove the aliases.
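
For reference, a small snippet showing the aliasing behaviour Python applies when two enum members share a value:

```python
from enum import Enum


class ConstraintStatus(Enum):
    SUCCESS = "Success"
    Success = "Success"  # same value, so this becomes an alias of SUCCESS

print(ConstraintStatus.Success is ConstraintStatus.SUCCESS)  # True
print(list(ConstraintStatus))  # only the canonical member: [ConstraintStatus.SUCCESS]
```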

Warning = "Warning"
Error = "Error"




Same duplicate enum member issue as ConstraintStatus. CheckStatus defines both SUCCESS = "Success" and Success = "Success", etc. Pick one naming convention.

Comment thread: pydeequ/engines/duckdb.py
def compute_metrics(
self, analyzers: Sequence["_ConnectAnalyzer"]
) -> List[MetricResult]:
"""


The benchmark_duckdb_validation function in benchmark/experiments.py calls VerificationSuite(engine).onData(table="benchmark_data").addCheck(check).run(), but the DuckDBEngine returned by setup_duckdb_from_parquet uses table=f"read_parquet('{parquet_path}')" — a raw read_parquet() expression, not a registered table. Then get_schema() calls PRAGMA table_info('{self.table}') which will fail on read_parquet(...) expressions. The profiling path works around this by creating a VIEW, but the validation path does not.
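
One way to mirror the profiling path's workaround in plain DuckDB, sketched here with illustrative names and paths:

```python
import duckdb

con = duckdb.connect()
parquet_path = "benchmark_data.parquet"  # illustrative path

# Register a named view so downstream code can use a plain table reference
# (and PRAGMA table_info) instead of a raw read_parquet() expression.
con.execute(
    f"CREATE OR REPLACE VIEW benchmark_data AS "
    f"SELECT * FROM read_parquet('{parquet_path}')"
)
print(con.execute("PRAGMA table_info('benchmark_data')").fetchall())
```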


This module provides the abstract base classes that combine mixins
to create the foundation for concrete evaluator implementations.
"""


The BaseEvaluator.evaluate method (line 118) defaults to checking value == 1.0 when no assertion is provided. This exact float comparison is fragile — a completeness of 0.9999999999999998 due to floating-point arithmetic would fail. Use an epsilon comparison like abs(value - 1.0) < 1e-9.
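
For example, the default assertion could use math.isclose rather than exact equality (helper name is illustrative):

```python
import math


def _default_assertion(value: float) -> bool:
    # Tolerant replacement for `value == 1.0`; guards against floating-point
    # drift such as 0.9999999999999998 on an otherwise complete column.
    return math.isclose(value, 1.0, abs_tol=1e-9)


print(_default_assertion(0.9999999999999998))  # True
print(_default_assertion(0.95))                # False
```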

for where, group_evaluators in where_groups.items():
try:
group_results = self._execute_ratio_group(
table, execute_fn, group_evaluators, where


When where is set, the query rebuilds conditions inline with a list comprehension that calls evaluators[i].get_condition() again, duplicating the logic from the cases list built on line 218. This is both redundant and inconsistent — cases was built without the WHERE wrapper. The cases variable is unused when where is truthy.

