diff --git a/pointblank/validate.py b/pointblank/validate.py
index 2a80d38af..861371d27 100644
--- a/pointblank/validate.py
+++ b/pointblank/validate.py
@@ -157,6 +157,7 @@
     "missing_vals_tbl",
     "get_action_metadata",
     "get_column_count",
+    "get_dataframe",
    "get_data_path",
     "get_row_count",
     "get_validation_summary",
@@ -17805,6 +17806,297 @@ def get_step_report(
 
         return step_report
 
+    def get_dataframe(self, tbl_type: Literal["polars", "pandas", "duckdb"] = "polars"):
+        """
+        Validation results as a dataframe
+
+        The `get_dataframe()` method returns a dataframe that represents the validation report. This dataframe provides a summary of the validation results, including the validation steps, the number of test units, the number of failing test units, and the fraction of failing test units. This can be particularly helpful for logging purposes and enables writing validation summaries to CSVs and other on-disk formats.
+
+        Parameters
+        ----------
+        tbl_type :
+            The output backend for the dataframe. The named options are `"polars"`, `"pandas"`, and `"duckdb"`. Default is "polars".
+
+        Supported DataFrame Types
+        -------------------------
+        The `tbl_type=` parameter can be set to one of the following:
+
+        - `"polars"`: A Polars DataFrame.
+        - `"pandas"`: A Pandas DataFrame.
+        - `"duckdb"`: An Ibis table for a DuckDB database. 
+ + Examples + -------- + + ```{python} + import pointblank as pb + + # Create a validation + validation = ( + pb.Validate(data=pb.load_dataset("small_table", tbl_type = "polars"), label="My validation") + .col_vals_gt(columns="d", value=100) + .col_vals_regex(columns="b", pattern=r"[0-9]-[a-z]{3}-[0-9]{3}") + .interrogate() + ) + + # Get a dataframe of the validation summary results + df_validation = validation.get_dataframe() + + ``` + + """ + # Raise an error if tbl_type is not one of the supported types + if tbl_type not in ["polars", "pandas", "duckdb"]: + raise ValueError( + f"The DataFrame type `{tbl_type}` is not valid. Choose one of the following:\n" + "- `polars`\n" + "- `pandas`\n" + "- `duckdb`" + ) + + # Grab the summary data from validation info helper function + report_original = _validation_info_as_dict(self.validation_info) + + # Pop the extracts off if present + if "extract" in report_original: + report_original.pop("extract") + + # Set a dictionary for converting column names for df + names_dict = { + "active": "active", + "i": "step_number", + "assertion_type": "step_description", + "column": "columns", + "values": "values", + "pre": "original_pre", + "segments": "original_segments", + "eval_error": "step_evaluated", + "n": "units", + "all_passed": "all_units_passed", + "n_passed": "pass_n", + "f_passed": "pass_pct", + "n_failed": "failed_n", + "f_failed": "failed_pct", + "warning": "warning", + "error": "error", + "critical": "critical", + "brief": "input_brief", + "autobrief": "autobrief", + } + + final_report = { + key: report_original[key] for key in names_dict.keys() if key in report_original + } + + # Check for polars, raise if not installed + if tbl_type == "polars": + if not _is_lib_present(lib_name="polars"): + raise ImportError( + "The Polars library is not installed but is required when specifying " + '`tbl_type="polars".' 
+ ) + + import polars as pl + + # Create the schema for the df + pl_schema = pl.Schema( + { + "active": pl.Boolean, + "i": pl.Int64, + "assertion_type": pl.String, + "column": pl.String, + "values": pl.Object, + "pre": pl.Object, + "segments": pl.String, + "eval_error": pl.Boolean, + "n": pl.Int64, + "all_passed": pl.Boolean, + "n_passed": pl.Int64, + "f_passed": pl.Float64, + "n_failed": pl.Int64, + "f_failed": pl.Float64, + "warning": pl.Boolean, + "error": pl.Boolean, + "critical": pl.Boolean, + "brief": pl.String, + "autobrief": pl.String, # Default brief if none found + } + ) + + df_validation_results = ( + pl.DataFrame(data=final_report, schema=pl_schema) + .rename(names_dict) + .with_columns( + brief=pl.coalesce(pl.col("input_brief"), pl.col("autobrief")), + preprocessed=pl.when(pl.col("original_pre").is_not_null()) + .then(pl.lit(True)) + .otherwise(pl.lit(False)), + segmented=pl.when(pl.col("original_segments").is_not_null()) + .then(pl.lit(True)) + .otherwise(pl.lit(False)), + # Extract pattern from values if it's a dict, otherwise keep as-is + values=pl.col("values").map_elements( + lambda x: x.get("pattern") if isinstance(x, dict) and "pattern" in x else x, + return_dtype=pl.Object, + ), + ) + .with_columns( + [ + pl.when(~pl.col("active")) + .then(pl.lit("-")) + .otherwise(pl.col(col)) + .alias(col) + for col in [ + "step_evaluated", + "units", + "all_units_passed", + "pass_n", + "pass_pct", + "failed_n", + "failed_pct", + "warning", + "error", + "critical", + ] + ] + ) + .drop(["input_brief", "autobrief", "original_pre", "original_segments"]) + ) + + return df_validation_results + + if tbl_type == "pandas": + if not _is_lib_present(lib_name="pandas"): + raise ImportError( + "The Pandas library is not installed but is required when specifying " + '`tbl_type="pandas".' 
+ ) + + import pandas as pd + + def transform_validation_results(df): + # Coalesce: use fillna for first occurrence + df = df.assign(brief=df["input_brief"].fillna(df["autobrief"])) + + # Boolean columns based on null checks + df = df.assign( + preprocessed=df["original_pre"].notna(), + segmented=df["original_segments"].notna(), + ) + + # Extract pattern from dict + df = df.assign( + values=df["values"].apply( + lambda x: x.get("pattern") if isinstance(x, dict) and "pattern" in x else x + ) + ) + + # Create conditional columns in a loop + conditional_cols = [ + "step_evaluated", + "units", + "all_units_passed", + "pass_n", + "pass_pct", + "failed_n", + "failed_pct", + "warning", + "error", + "critical", + ] + + for col in conditional_cols: + df[col] = df[col].where(df["active"], "-") + + # Drop columns + df = df.drop( + columns=["input_brief", "autobrief", "original_pre", "original_segments"] + ) + + return df + + df_validation_results = ( + pd.DataFrame(data=final_report) + .rename(columns=names_dict) + .pipe(transform_validation_results) + ) + + return df_validation_results + + if tbl_type == "duckdb": + if not _is_lib_present(lib_name="ibis"): + raise ImportError( + "The Ibis library is not installed but is required when specifying " + '`tbl_type="duckdb".' 
+ ) + + import ibis + import ibis.expr.datatypes as dt + + ibis_schema = { + "active": dt.Boolean(), + "i": dt.Int64(), + "assertion_type": dt.String(), + "column": dt.String(), + "values": dt.json(), + "pre": dt.json(), + "segments": dt.String(), + "eval_error": dt.Boolean(), + "n": dt.Int64(), + "all_passed": dt.Boolean(), + "n_passed": dt.Int64(), + "f_passed": dt.Float64(), + "n_failed": dt.Int64(), + "f_failed": dt.Float64(), + "warning": dt.Boolean(), + "error": dt.Boolean(), + "critical": dt.Boolean(), + "brief": dt.String(), + "autobrief": dt.String(), + } + + # Pulling out clean regex pattern if needed + final_report["values"] = [ + values.get("pattern") + if isinstance(values, dict) and "pattern" in values + else values + for values in final_report["values"] + ] + + report_table = ibis.memtable(final_report, schema=ibis_schema).rename( + {v: k for k, v in names_dict.items()} + ) + + conditional_cols = [ + "step_number", + "step_evaluated", + "units", + "all_units_passed", + "pass_n", + "pass_pct", + "failed_n", + "failed_pct", + "warning", + "error", + "critical", + ] + + df_validation_results = report_table.mutate( + brief=ibis.coalesce(report_table.input_brief, report_table.autobrief), + preprocessed=report_table.original_pre.notnull(), + segmented=report_table.original_segments.notnull(), + **{ + col: ibis.ifelse( + ~report_table.active, + ibis.literal("-"), + report_table[col].cast("string"), + ) + for col in conditional_cols + }, + ).drop("input_brief", "autobrief", "original_pre", "original_segments") + + return df_validation_results + def _add_validation(self, validation_info): """ Add a validation to the list of validations. 
diff --git a/tests/test_validate.py b/tests/test_validate.py
index f58c2f150..ab531247b 100644
--- a/tests/test_validate.py
+++ b/tests/test_validate.py
@@ -78,7 +78,7 @@ class StrEnum(str, Enum):
 
 ## If we specifically disable tests in pytest set the availability to False
 if os.environ.get("SKIP_PYSPARK_TESTS", "").lower() in ("true", "1", "yes"):
-    PYSPARK_AVAILABLE = False
+    PYSPARK_AVAILABLE = False
 SQLITE_AVAILABLE = True
 if os.environ.get("SKIP_SQLITE_TESTS", "").lower() in ("true", "1", "yes"):
     SQLITE_AVAILABLE = False
@@ -13899,6 +13899,47 @@ def test_get_step_report_schema_checks(schema) -> None:
     assert isinstance(validation.get_step_report(i=1), GT.GT)
 
 
+def test_get_dataframe_wrong_tbl_type_messaging():
+    tbl = pl.DataFrame({"name": ["Monica", "Erica", "Rita", "Tina"], "mambo_no": [2, 3, 4, 5]})
+
+    validation = Validate(data=tbl).col_vals_gt(columns="mambo_no", value=5).interrogate()
+
+    with pytest.raises(ValueError, match="The DataFrame type `polar` is not valid. Choose one of"):
+        validation.get_dataframe("polar")
+
+
+@pytest.mark.parametrize(
+    "library, tbl_type", [("Polars", "polars"), ("Pandas", "pandas"), ("Ibis", "duckdb")]
+)
+def test_get_dataframe_missing_libraries(library, tbl_type):
+
+    validation = Validate(data="small_table")
+
+    with patch("pointblank.validate._is_lib_present") as mock_is_lib:
+        mock_is_lib.return_value = False  # library not present
+
+        with pytest.raises(ImportError, match=f"The {library} library is not installed"):
+            validation.get_dataframe(tbl_type)
+
+
+def test_get_dataframe_returns_polars_df():
+    validation = Validate(data="small_table")
+    df_polars = validation.get_dataframe("polars")
+    assert isinstance(df_polars, pl.DataFrame)
+
+
+def test_get_dataframe_returns_pandas_df():
+    validation = Validate(data="small_table")
+    df_pandas = validation.get_dataframe("pandas")
+    assert isinstance(df_pandas, pd.DataFrame)
+
+
+def test_get_dataframe_returns_ibis_memtable():
+    validation = Validate(data="small_table")
+ 
df_ibis = validation.get_dataframe("duckdb") + assert isinstance(df_ibis, ibis.expr.types.relations.Table) + + def get_schema_info( data_tbl, schema, @@ -19317,6 +19358,7 @@ def test_col_vals_ge_timezone_datetime_duckdb() -> None: finally: conn.close() + os.unlink(temp_db_path) @pytest.mark.xfail(reason="Mixed timezone comparisons may not work correctly yet")