Unfail repository tests #251
base: master
```diff
@@ -1,13 +1,13 @@
 # -*- coding: utf-8 -*-
 import unittest
 
-import pytest
+from py4j.protocol import Py4JError
 from pyspark.sql import Row
 
-from pydeequ.analyzers import *
-from pydeequ.checks import *
-from pydeequ.repository import *
-from pydeequ.verification import *
+from pydeequ.analyzers import AnalyzerContext, AnalysisRunner, ApproxCountDistinct
+from pydeequ.checks import Check, CheckLevel
+from pydeequ.repository import FileSystemMetricsRepository, InMemoryMetricsRepository, ResultKey
+from pydeequ.verification import VerificationResult, VerificationSuite
 from tests.conftest import setup_pyspark
```
```diff
@@ -18,7 +18,9 @@ def setUpClass(cls):
         cls.AnalysisRunner = AnalysisRunner(cls.spark)
         cls.VerificationSuite = VerificationSuite(cls.spark)
         cls.sc = cls.spark.sparkContext
-        cls.df = cls.sc.parallelize([Row(a="foo", b=1, c=5), Row(a="bar", b=2, c=6), Row(a="baz", b=3, c=None)]).toDF()
+        cls.df = cls.sc.parallelize(
+            [Row(a="foo", b=1, c=5), Row(a="bar", b=2, c=6), Row(a="baz", b=3, c=None)]
+        ).toDF()
 
     @classmethod
     def tearDownClass(cls):
```
```diff
@@ -121,12 +123,16 @@ def test_verifications_FSmetrep(self):
         )
 
         # TEST: Check JSON for tags
-        result_metrep_json = repository.load().before(ResultKey.current_milli_time()).getSuccessMetricsAsJson()
+        result_metrep_json = (
+            repository.load().before(ResultKey.current_milli_time()).getSuccessMetricsAsJson()
+        )
 
         print(result_metrep_json[0]["tag"], key_tags["tag"])
         self.assertEqual(result_metrep_json[0]["tag"], key_tags["tag"])
 
-        result_metrep = repository.load().before(ResultKey.current_milli_time()).getSuccessMetricsAsDataFrame()
+        result_metrep = (
+            repository.load().before(ResultKey.current_milli_time()).getSuccessMetricsAsDataFrame()
+        )
 
         df = VerificationResult.checkResultsAsDataFrame(self.spark, result)
         print(df.collect())
```
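For readers skimming the diff: these tests exercise the metrics-repository round trip. A run is tagged with a `ResultKey`, persisted through `useRepository()` and `saveOrAppendResult()`, and read back through the `load()` query builder. A condensed sketch of that flow, using only calls that appear in this diff and assuming a SparkSession `spark` and a DataFrame `df` are already set up (both names are illustrative):

```python
from pydeequ.analyzers import AnalysisRunner, ApproxCountDistinct
from pydeequ.repository import FileSystemMetricsRepository, ResultKey

# Write path: persist the metrics of one analysis run under a tagged key.
metrics_file = FileSystemMetricsRepository.helper_metrics_file(spark, "metrics.json")
repository = FileSystemMetricsRepository(spark, metrics_file)
result_key = ResultKey(spark, ResultKey.current_milli_time(), {"tag": "example run"})

result = (
    AnalysisRunner(spark)
    .onData(df)
    .addAnalyzer(ApproxCountDistinct("b"))
    .useRepository(repository)
    .saveOrAppendResult(result_key)
    .run()
)

# Read path: load() opens the query builder; before() filters by timestamp.
metrics_json = (
    repository.load().before(ResultKey.current_milli_time()).getSuccessMetricsAsJson()
)
```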
```diff
@@ -146,7 +152,9 @@ def test_verifications_FSmetrep_noTags_noFile(self):
         )
 
         # TEST: Check DF parity
-        result_metrep = repository.load().before(ResultKey.current_milli_time()).getSuccessMetricsAsDataFrame()
+        result_metrep = (
+            repository.load().before(ResultKey.current_milli_time()).getSuccessMetricsAsDataFrame()
+        )
 
         df = VerificationResult.checkResultsAsDataFrame(self.spark, result)
         print(df.collect())
```
```diff
@@ -243,12 +251,16 @@ def test_verifications_IMmetrep(self):
         )
 
         # TEST: Check JSON for tags
-        result_metrep_json = repository.load().before(ResultKey.current_milli_time()).getSuccessMetricsAsJson()
+        result_metrep_json = (
+            repository.load().before(ResultKey.current_milli_time()).getSuccessMetricsAsJson()
+        )
 
         print(result_metrep_json[0]["tag"], key_tags["tag"])
         self.assertEqual(result_metrep_json[0]["tag"], key_tags["tag"])
 
-        result_metrep = repository.load().before(ResultKey.current_milli_time()).getSuccessMetricsAsDataFrame()
+        result_metrep = (
+            repository.load().before(ResultKey.current_milli_time()).getSuccessMetricsAsDataFrame()
+        )
 
         df = VerificationResult.checkResultsAsDataFrame(self.spark, result)
         print(df.collect())
```
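The in-memory tests follow the same write/read flow as the file-backed sketch above; only the repository construction differs. A one-line sketch, again assuming a `spark` session:

```python
from pydeequ.repository import InMemoryMetricsRepository

# Same flow as the file-backed example, but metrics live only in memory.
repository = InMemoryMetricsRepository(spark)
```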
```diff
@@ -267,37 +279,43 @@ def test_verifications_IMmetrep_noTags_noFile(self):
         )
 
         # TEST: Check DF parity
-        result_metrep = repository.load().before(ResultKey.current_milli_time()).getSuccessMetricsAsDataFrame()
+        result_metrep = (
+            repository.load().before(ResultKey.current_milli_time()).getSuccessMetricsAsDataFrame()
+        )
 
         df = VerificationResult.checkResultsAsDataFrame(self.spark, result)
         print(df.collect())
         print(result_metrep.collect())
 
-    @pytest.mark.xfail(reason="@unittest.expectedFailure")
     def test_fail_no_useRepository(self):
-        """This test should fail because it doesn't call useRepository() before saveOrAppendResult()"""
+        """This run fails because it doesn't call useRepository() before saveOrAppendResult()."""
         metrics_file = FileSystemMetricsRepository.helper_metrics_file(self.spark, "metrics.json")
         print(f"metrics filepath: {metrics_file}")
 
         key_tags = {"tag": "FS metrep analyzers -- FAIL"}
         resultKey = ResultKey(self.spark, ResultKey.current_milli_time(), key_tags)
 
         # MISSING useRepository()
-        result = (
-            self.AnalysisRunner.onData(self.df)
-            .addAnalyzer(ApproxCountDistinct("b"))
-            .saveOrAppendResult(resultKey)
-            .run()
-        )
+        with self.assertRaises(Py4JError) as err:
+            _ = (
+                self.AnalysisRunner.onData(self.df)
+                .addAnalyzer(ApproxCountDistinct("b"))
+                .saveOrAppendResult(resultKey)
+                .run()
+            )
+
+        self.assertIn(
+            "Method saveOrAppendResult([class com.amazon.deequ.repository.ResultKey]) does not exist",
+            str(err.exception),
+        )
 
-    @pytest.mark.xfail(reason="@unittest.expectedFailure")
     def test_fail_no_load(self):
-        """This test should fail because we do not load() for the repository reading"""
+        """This run fails because we do not load() for the repository reading."""
         metrics_file = FileSystemMetricsRepository.helper_metrics_file(self.spark, "metrics.json")
         print(f"metrics filepath: {metrics_file}")
         repository = FileSystemMetricsRepository(self.spark, metrics_file)
         key_tags = {"tag": "FS metrep analyzers"}
         resultKey = ResultKey(self.spark, ResultKey.current_milli_time(), key_tags)
-        result = (
+        _ = (
             self.AnalysisRunner.onData(self.df)
             .addAnalyzer(ApproxCountDistinct("b"))
             .useRepository(repository)
```
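This hunk is the substance of the PR title: instead of decorating the tests with `@pytest.mark.xfail`, which passes as long as *anything* goes wrong, the rewritten tests assert the precise exception and message. `assertRaises` used as a context manager captures the exception object, so the message can be inspected afterwards. A minimal, self-contained illustration of the pattern (a toy example unrelated to pydeequ):

```python
import unittest


class ExactFailureExample(unittest.TestCase):
    def test_specific_exception(self):
        # The context manager records the raised exception on `err.exception`...
        with self.assertRaises(ValueError) as err:
            int("not a number")
        # ...so the test can pin down the message, as the Py4JError check above does.
        self.assertIn("invalid literal", str(err.exception))
```

In the pydeequ test itself, skipping `useRepository()` leaves the underlying JVM builder without a `saveOrAppendResult` method to dispatch to, and py4j reports such missing-method calls as a `Py4JError`, hence the asserted "does not exist" message.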
@@ -306,8 +324,13 @@ def test_fail_no_load(self): | |
| ) | ||
|
|
||
| # MISSING: repository.load() | ||
| result_metrep_json = ( | ||
| repository.before(ResultKey.current_milli_time()) | ||
| .forAnalyzers([ApproxCountDistinct("b")]) | ||
| .getSuccessMetricsAsJson() | ||
| with self.assertRaises(AttributeError) as err: | ||
| _ = ( | ||
| repository.before(ResultKey.current_milli_time()) | ||
| .forAnalyzers([ApproxCountDistinct("b")]) | ||
| .getSuccessMetricsAsJson() | ||
| ) | ||
|
|
||
| self.assertEqual( | ||
| "'FileSystemMetricsRepository' object has no attribute 'RepositoryLoader'", str(err.exception) | ||
| ) | ||
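The `AttributeError` asserted here suggests that `before()` reads a `RepositoryLoader` attribute that only `load()` creates; that is an inference from the error message, not a confirmed reading of pydeequ internals. A toy model of that stateful-builder behavior:

```python
class RepositorySketch:
    """Toy stand-in for the repository: load() sets up the loader that before() needs."""

    def load(self):
        self.RepositoryLoader = object()  # placeholder for the real JVM-backed loader
        return self

    def before(self, timestamp):
        # Raises AttributeError when load() was skipped, mirroring the test above.
        _ = self.RepositoryLoader
        return self


repo = RepositorySketch()
try:
    repo.before(0)
except AttributeError as exc:
    print(exc)  # 'RepositorySketch' object has no attribute 'RepositoryLoader'
```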
nit: `from py4j.protocol import Py4JError` is imported but `pytest` was removed. However, `Py4JError` is used in the test, so this is fine. Just noting that the `pytest` removal is correct, since `pytest.mark.xfail` is no longer used.