diff --git a/doc/source/conf.py b/doc/source/conf.py index a7a54b7e2d..15972d5552 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -370,11 +370,11 @@ # Example configuration for intersphinx: refer to the Python standard library. intersphinx_mapping = { "python": ("https://docs.python.org/3/", None), - "iris": ("https://scitools-iris.readthedocs.io/en/latest/", None), - "cartopy": ("https://scitools.org.uk/cartopy/docs/latest/", None), + "iris": ("https://scitools-iris.readthedocs.io/en/stable/", None), + "cartopy": ("https://cartopy.readthedocs.io/stable/", None), "cf_units": ("https://cf-units.readthedocs.io/en/stable/", None), "numpy": ("https://numpy.org/doc/stable/", None), - "scipy": ("https://docs.scipy.org/doc/scipy-1.6.2/reference/", None), + "scipy": ("https://docs.scipy.org/doc/scipy/reference", None), "pandas": ("https://pandas.pydata.org/pandas-docs/dev/", None), } diff --git a/improver/calibration/__init__.py b/improver/calibration/__init__.py index 08e432adcc..75e3f99706 100644 --- a/improver/calibration/__init__.py +++ b/improver/calibration/__init__.py @@ -7,8 +7,12 @@ """ from collections import OrderedDict +from pathlib import Path from typing import Dict, List, Optional, Tuple, Union +import iris +import joblib +import pandas as pd from iris.cube import Cube, CubeList from improver.metadata.probabilistic import ( @@ -51,6 +55,7 @@ def __init__(self): ("altitude", pa.float32()), ("time", pa.timestamp("s", "utc")), ("wmo_id", pa.string()), + ("station_id", pa.string()), ("ob_value", pa.float32()), ] ) @@ -263,6 +268,85 @@ def split_forecasts_and_bias_files(cubes: CubeList) -> Tuple[Cube, Optional[Cube return forecast_cube, bias_cubes +def split_pickle_parquet_and_netcdf(files): + """Split the input files into pickle, parquet, and netcdf files. + Only a single pickle file is expected. + + Args: + files: + A list of input file paths which will be split into pickle, + parquet, and netcdf files. + Returns: + - A flattened cube list containing all the cubes contained within the + provided paths to NetCDF files. + - A list of paths to Parquet files. + - A loaded pickle file. + Raises: + ValueError: If multiple pickle files provided, as only one is ever expected. + """ + cubes = iris.cube.CubeList() + loaded_pickles = [] + parquets = [] + + for file_path in files: + if not file_path.exists(): + continue + + # Directories indicate we are working with parquet files. + if file_path.is_dir(): + parquets.append(file_path) + continue + + try: + cube = iris.load(file_path) + cubes.extend(cube) + except ValueError: + try: + loaded_pickles.append(joblib.load(file_path)) + except Exception as e: + msg = f"Failed to load {file_path}: {e}" + raise ValueError(msg) + + if len(loaded_pickles) > 1: + msg = "Multiple pickle inputs have been provided. Only one is expected." + raise ValueError(msg) + + return ( + cubes if cubes else None, + parquets if parquets else None, + loaded_pickles[0] if loaded_pickles else None, + ) + + +def identify_parquet_type(parquet_paths: List[Path]): + """Determine whether the provided parquet paths contain forecast or truth data. + This is done by checking the columns within the parquet files for the presence + of a forecast_period column which is only present for forecast data. + Args: + parquet_paths: + A list of paths to Parquet files. + Returns: + - The path to the Parquet file containing the historical forecasts. + - The path to the Parquet file containing the truths. + """ + import pyarrow.parquet as pq + + forecast_table_path = None + truth_table_path = None + for file_path in parquet_paths: + try: + example_file_path = next(file_path.glob("**/*.parquet")) + except StopIteration: + continue + try: + pq.read_schema(example_file_path).field("forecast_period") + forecast_table_path = file_path + except KeyError: + truth_table_path = file_path + + return forecast_table_path, truth_table_path + + def validity_time_check(forecast: Cube, validity_times: List[str]) -> bool: """Check the validity time of the forecast matches the accepted validity times within the validity times list. @@ -307,3 +391,25 @@ def add_warning_comment(forecast: Cube) -> Cube: "however, no calibration has been applied." ) return forecast + + +def get_training_period_cycles( + cycletime: str, forecast_period: Union[int, str], training_length: int +): + """Generate a list of forecast reference times for the training period. + + Args: + cycletime: The time at which the forecast is issued in a format understood by + pandas.Timestamp e.g. 20170109T0000Z. + forecast_period: The forecast period in seconds. + training_length: The number of days in the training period. + """ + forecast_period_td = pd.Timedelta(int(forecast_period), unit="seconds") + + return pd.date_range( + end=pd.Timestamp(cycletime) + - pd.Timedelta(1, unit="days") + - forecast_period_td.floor("D"), + periods=int(training_length), + freq="D", + ) diff --git a/improver/calibration/dataframe_utilities.py b/improver/calibration/dataframe_utilities.py index 857442390b..766f5aea75 100644 --- a/improver/calibration/dataframe_utilities.py +++ b/improver/calibration/dataframe_utilities.py @@ -128,7 +128,7 @@ def _unique_check(df: DataFrame, column: str) -> None: raise ValueError(msg) -def _quantile_check(df: DataFrame) -> None: +def quantile_check(df: DataFrame) -> None: """Check that the percentiles provided can be considered to be quantiles with equal spacing spanning the percentile range. @@ -142,7 +142,7 @@ def _quantile_check(df: DataFrame) -> None: if not np.allclose(expected_percentiles, df["percentile"].unique()): msg = ( - "The forecast percentiles can not be considered as quantiles. " + "Forecast percentiles must be equally spaced. " f"The forecast percentiles are {df['percentile'].unique()}." "Based on the number of percentiles provided, the expected " f"percentiles would be {expected_percentiles}." @@ -447,7 +447,7 @@ def _prepare_dataframes( # Check the percentiles can be considered to be equally space quantiles. if representation_type == "percentile": - _quantile_check(forecast_df) + quantile_check(forecast_df) # Remove forecast duplicates. forecast_cols = [ diff --git a/improver/calibration/load_and_apply_quantile_regression_random_forest.py b/improver/calibration/load_and_apply_quantile_regression_random_forest.py index 62f35a0465..ee3f0d3f29 100644 --- a/improver/calibration/load_and_apply_quantile_regression_random_forest.py +++ b/improver/calibration/load_and_apply_quantile_regression_random_forest.py @@ -6,11 +6,9 @@ """Script to load and apply the trained Quantile Regression Random Forest (QRF) model.""" -import pathlib from typing import Optional import iris -import joblib import numpy as np import pandas as pd from iris.cube import Cube, CubeList @@ -22,9 +20,6 @@ ApplyQuantileRegressionRandomForests, quantile_forest_package_available, ) -from improver.ensemble_copula_coupling.ensemble_copula_coupling import ( - RebadgePercentilesAsRealizations, -) from improver.ensemble_copula_coupling.utilities import choose_set_of_percentiles from improver.utilities.cube_checker import assert_spatial_coords_match @@ -39,15 +34,15 @@ class RandomForestQuantileRegressor: iris.FUTURE.pandas_ndim = True -class LoadAndApplyQRF(PostProcessingPlugin): - """Load and apply the trained Quantile Regression Random Forest (QRF) model.""" +class PrepareAndApplyQRF(PostProcessingPlugin): + """Prepare the input forecast for application of a trained Quantile Regression + Random Forest (QRF) model and apply the QRF model.""" def __init__( self, feature_config: dict[str, list[str]], target_cf_name: str, - transformation: Optional[str] = None, - pre_transform_addition: Optional[float] = None, + unique_site_id_keys: list[str] = ["wmo_id"], ): """Initialise the plugin. @@ -73,54 +68,36 @@ def __init__( A string containing the CF name of diagnostic to be calibrated. This will be used to separate it from the rest of the dynamic predictors, if present. - transformation (str): - Transformation to be applied to the data before fitting. - pre_transform_addition (float): - Value to be added before transformation. - + unique_site_id_keys (list): + The names of the coordinates that uniquely identify each site, + e.g. "wmo_id" or "latitude,longitude". """ - self.feature_config = feature_config self.target_cf_name = target_cf_name - self.transformation = transformation - self.pre_transform_addition = pre_transform_addition + self.unique_site_id_keys = unique_site_id_keys self.quantile_forest_installed = quantile_forest_package_available() def _get_inputs( - self, file_paths: list[pathlib.Path] - ) -> tuple[CubeList, Cube, RandomForestQuantileRegressor]: - """Get inputs from disk and separate the model and the features. + self, + cube_inputs: iris.cube.CubeList, + qrf_model: Optional[RandomForestQuantileRegressor] = None, + ) -> tuple[CubeList, Cube]: + """Split the forecast to be calibrated from the other features. Handle + the case where the qrf_model is not provided. In this case, the uncalibrated + forecast is returned with a warning comment added. Args: - file_paths: List of paths to the trained QRF model and the forecast to be - calibrated and the features, as required. + cube_inputs: List of cubes containing the features and the forecast to be + calibrated. Returns: - CubeList of the features cubes, the forecast cube, and the - trained QRF model. + CubeList of the features cubes and the forecast cube. Raises: - ValueError: If no QRF model is found in the provided file paths. - ValueError: If no features are found in the provided file paths. - ValueError: If the number of inputs does not match the number of file paths. + ValueError: If not target forecast is provided. + ValueError: If the number of cubes provided does not match the number of + features expected. """ - cube_inputs = iris.cube.CubeList([]) - qrf_model = None - - for file_path in file_paths: - try: - cube = iris.load_cube(file_path) - cube_inputs.append(cube) - except ValueError: - qrf_model = joblib.load(file_path) - - if not cube_inputs: - msg = ( - "No features found in the provided file paths. " - "At least one feature must be provided." - ) - raise ValueError(msg) - # Extract all additional cubes which are associated with a feature in the # feature_config. forecast_constraint = iris.Constraint(name=self.target_cf_name) @@ -130,15 +107,16 @@ def _get_inputs( (forecast_cube,) = forecast_cube else: msg = ( - "No target forecast provided. An input file representing the target " + "No target forecast provided. An input cube representing the target " "must be provided, even if the target will not be used as a feature. " f"The target is '{self.target_cf_name}'." ) raise ValueError(msg) if not qrf_model: + # If no model is provided, return the input forecast with a warning. forecast_cube = add_warning_comment(forecast_cube) - return None, forecast_cube, None + return None, forecast_cube if len(cube_inputs) != len(self.feature_config.keys()): msg = ( @@ -149,60 +127,30 @@ def _get_inputs( ) raise ValueError(msg) - if not qrf_model: - # The specified model doesn't exist and the forecast will not be calibrated - return forecast_cube - # If target diagnostic not a feature in the training then remove. if self.target_cf_name not in self.feature_config.keys(): cube_inputs.remove(forecast_cube) - return cube_inputs, forecast_cube, qrf_model + return cube_inputs, forecast_cube @staticmethod - def _compute_percentiles(forecast_cube: Cube, coord: str) -> list[float]: - """Compute the percentiles from the forecast cube. + def _compute_quantile_list(forecast_cube: Cube, coord: str) -> list[float]: + """Compute the list of quantiles e.g. 0.25, 0.5, 0.75 that will be produced + by using the forecast cube. Args: forecast_cube: Forecast to be calibrated. coord: Coordinate name. The length of the coordinate will be used to - determine the number of percentiles to compute. + determine the number of quantiles to compute. Returns: - List of percentiles computed from the forecast cube. + List of quantiles (e.g. 0.25, 0.5, 0.75) computed from the forecast cube. """ n_percentiles = len(forecast_cube.coord(coord).points) - percentiles = ( - np.array(choose_set_of_percentiles(n_percentiles)) / 100 - ).tolist() - return percentiles - - @staticmethod - def _percentiles_to_realizations(cube_inputs: CubeList) -> CubeList: - """Convert percentiles to realizations. The input forecasts are expected to - be percentiles but these percentiles are rebadged as realizations. + quantiles = (np.array(choose_set_of_percentiles(n_percentiles)) / 100).tolist() + return quantiles - Args: - cube_inputs: - List of cubes containing the features and the forecast to be calibrated. - Some may be percentiles. - Returns: - cube_inputs: - List of cubes with percentiles rebadged as realizations, - where appropriate - """ - - # Ensure there is a realization dimension on all cubes. This assumes a - # percentile dimension is present. - realization_cube_inputs = iris.cube.CubeList([]) - for feature_cube in cube_inputs: - if feature_cube.coords("percentile"): - feature_cube = RebadgePercentilesAsRealizations()(feature_cube) - realization_cube_inputs.append(feature_cube) - return realization_cube_inputs - - @staticmethod - def _cube_to_dataframe(cube_inputs: CubeList) -> pd.DataFrame: + def _cube_to_dataframe(self, cube_inputs: CubeList) -> pd.DataFrame: """Convert cube inputs to a pandas DataFrame. Args: @@ -215,15 +163,16 @@ def _cube_to_dataframe(cube_inputs: CubeList) -> pd.DataFrame: # Convert the first cube to a DataFrame. df = as_data_frame(cube_inputs[0], add_aux_coords=True).reset_index() + possible_columns = [ + *self.unique_site_id_keys, + "time", + "forecast_reference_time", + "forecast_period", + ] + # Iteratively convert remaining cubes to DataFrame and merge. for cube in cube_inputs[1:]: temporary_df = as_data_frame(cube, add_aux_coords=True).reset_index() - possible_columns = [ - "wmo_id", - "time", - "forecast_reference_time", - "forecast_period", - ] merge_columns = [ col for col in possible_columns if col in temporary_df.columns ] @@ -239,7 +188,10 @@ def _cube_to_dataframe(cube_inputs: CubeList) -> pd.DataFrame: def process( self, - file_paths: list[pathlib.Path], + cube_inputs: CubeList, + qrf_descriptors: Optional[ + tuple[RandomForestQuantileRegressor, str, float] + ] = None, ) -> Cube: """Load and applying the trained Quantile Regression Random Forest (QRF) model. The model is applied to the forecast supplied to calibrate the forecast. @@ -247,38 +199,48 @@ def process( input forecast is returned unchanged. Args: - file_paths (cli.inputpaths): - A list of input paths containing: - - The path to a QRF trained model in pickle file format to be used - for calibration. - - The path to a NetCDF file containing the forecast to be calibrated. - - Optionally, paths to NetCDF files containing additional predictors. + cube_inputs: List of cubes containing the features and the forecast to be + calibrated. + qrf_descriptors: The trained QRF model to be applied to the forecast + and the transformation and pre-transform addition applied during + training. The descriptors expected are a tuple of: + (qrf_model, transformation, pre_transform_addition). Returns: iris.cube.Cube: The calibrated forecast cube. """ - cube_inputs, forecast_cube, qrf_model = self._get_inputs(file_paths) - if not self.quantile_forest_installed: - return forecast_cube - if not qrf_model: + if qrf_descriptors is None: + # If no descriptors are provided, return the input forecast with a warning. + # Descriptors expected: (qrf_model, transformation, pre_transform_addition) + qrf_descriptors = (None, None, 0) + qrf_model, transformation, pre_transform_addition = qrf_descriptors + + cube_inputs, forecast_cube = self._get_inputs(cube_inputs, qrf_model=qrf_model) + + if cube_inputs: + assert_spatial_coords_match(cube_inputs) + + if not self.quantile_forest_installed or not qrf_model: return forecast_cube template_forecast_cube = forecast_cube.copy() - if forecast_cube.coords("percentile"): - percentiles = self._compute_percentiles(forecast_cube.copy(), "percentile") - elif forecast_cube.coords("realization"): - percentiles = self._compute_percentiles(forecast_cube.copy(), "realization") + if forecast_cube.coords("realization"): + quantile_list = self._compute_quantile_list( + forecast_cube.copy(), "realization" + ) + elif forecast_cube.coords("percentile"): + quantile_list = (forecast_cube.coord("percentile").points / 100.0).tolist() - assert_spatial_coords_match(cube_inputs) df = self._cube_to_dataframe(cube_inputs) calibrated_forecast = ApplyQuantileRegressionRandomForests( target_name=self.target_cf_name, feature_config=self.feature_config, - quantiles=percentiles, - transformation=self.transformation, - pre_transform_addition=self.pre_transform_addition, + quantiles=quantile_list, + transformation=transformation, + pre_transform_addition=pre_transform_addition, + unique_site_id_keys=self.unique_site_id_keys, )(qrf_model, df) calibrated_forecast_cube = template_forecast_cube.copy( data=np.broadcast_to(calibrated_forecast.T, template_forecast_cube.shape) diff --git a/improver/calibration/load_and_train_quantile_regression_random_forest.py b/improver/calibration/load_and_train_quantile_regression_random_forest.py index 421ca110cb..3ac13fbabc 100644 --- a/improver/calibration/load_and_train_quantile_regression_random_forest.py +++ b/improver/calibration/load_and_train_quantile_regression_random_forest.py @@ -7,7 +7,7 @@ import pathlib from pathlib import Path -from typing import Optional +from typing import Optional, Union import iris import numpy as np @@ -15,110 +15,101 @@ from iris.pandas import as_data_frame from improver import PostProcessingPlugin -from improver.calibration import CalibrationSchemas +from improver.calibration import ( + CalibrationSchemas, + get_training_period_cycles, + identify_parquet_type, + split_pickle_parquet_and_netcdf, +) from improver.calibration.quantile_regression_random_forest import ( TrainQuantileRegressionRandomForests, quantile_forest_package_available, ) -from improver.utilities.load import load_cube + +try: + from quantile_forest import RandomForestQuantileRegressor +except ModuleNotFoundError: + # Define empty class to avoid type hint errors. + class RandomForestQuantileRegressor: + pass + iris.FUTURE.pandas_ndim = True -class LoadAndTrainQRF(PostProcessingPlugin): - """Plugin to load and train a Quantile Regression Random Forest (QRF) model.""" +class LoadForTrainQRF(PostProcessingPlugin): + """Plugin to load input files for training a Quantile Regression Random Forest + (QRF) model.""" def __init__( self, feature_config: dict[str, list[str]], - target_diagnostic_name: str, + parquet_diagnostic_names: str, target_cf_name: str, forecast_periods: str, cycletime: str, training_length: int, experiment: Optional[str] = None, - n_estimators: int = 100, - max_depth: Optional[int] = None, - max_samples: Optional[float] = None, - random_state: Optional[int] = None, - transformation: Optional[str] = None, - pre_transform_addition: float = 0, + unique_site_id_keys: Union[list[str], str] = "wmo_id", ): - """Initialise the LoadAndTrainQRF plugin.""" + """Initialise the LoadForTrainQRF plugin. + + Args: + feature_config: Feature configuration defining the features to be used for + Quantile Regression Random Forests. + parquet_diagnostic_names (str): + A string containing the diagnostic name that will be used for filtering + the target diagnostic from the forecast and truth DataFrames read in + from the parquet files. This could be different from the CF name e.g. + 'temperature_at_screen_level'. + target_cf_name: A string containing the CF name of the forecast to be + calibrated e.g. air_temperature. + forecast_periods: Range of forecast periods to be calibrated in hours in + the form: "start:end:interval" e.g. "6:18:6" or a single forecast period + e.g. "6". + cycletime: The time at which the forecast is valid in the form: + YYYYMMDDTHHMMZ. + training_length: The number of days of training data to use. + experiment: The name of the experiment (step) that calibration is applied to. + unique_site_id_key: The names of the coordinates that uniquely identify + each site, e.g. "wmo_id" or "latitude,longitude". + """ + self.quantile_forest_installed = quantile_forest_package_available() self.feature_config = feature_config - self.target_diagnostic_name = target_diagnostic_name + self.parquet_diagnostic_names = parquet_diagnostic_names self.target_cf_name = target_cf_name self.forecast_periods = forecast_periods self.cycletime = cycletime self.training_length = training_length self.experiment = experiment - self.n_estimators = n_estimators - self.max_depth = max_depth - self.max_samples = max_samples - self.random_state = random_state - self.transformation = transformation - self.pre_transform_addition = pre_transform_addition - self.quantile_forest_installed = quantile_forest_package_available() + if isinstance(unique_site_id_keys, str): + unique_site_id_keys = [unique_site_id_keys] + self.unique_site_id_keys = unique_site_id_keys - def _split_cubes_and_parquet_files( - self, file_paths: list[pathlib.Path | str] - ) -> tuple[Optional[pathlib.Path], Optional[pathlib.Path], iris.cube.CubeList]: - """Split the input file paths into cubes and parquet files. - - Args: - file_paths: List of file paths. + def _parse_forecast_periods(self) -> list[int]: + """Parse the forecast periods argument to produce a list of forecast periods + in seconds. Returns: - Tuple containing the items below if found: - - Path to the forecast parquet file. - - Path to the truth parquet file. - - List of cubes loaded from the NetCDF files. - + List of forecast periods in seconds. Raises: - ValueError: If the number of cubes loaded does not match the number of - features expected. + ValueError: If the forecast_periods argument is not a single integer or + a range in the form 'start:end:interval'. """ - import pyarrow.parquet as pq - - forecast_table_path = None - truth_table_path = None - cube_inputs = iris.cube.CubeList([]) - - # file extraction loop: - for file_path in file_paths: - if not Path(file_path).exists(): - # This will occur when the filepath does not exist. In this case, - # return None. - return None, None, None - elif Path(file_path).is_dir(): - try: - example_file_path = next(Path(file_path).glob("**/*.parquet")) - except StopIteration: - # If no parquet files are found, return None. - return None, None, None - try: - pq.read_schema(example_file_path).field("forecast_period") - forecast_table_path = file_path - except KeyError: - truth_table_path = file_path - else: - cube = load_cube(str(file_path)) - cube_inputs.append(cube) - - if len(self.feature_config.keys()) not in [ - len(cube_inputs), - len(cube_inputs) + 1, - ]: - msg = ( - "The number of cubes loaded does not match the number of features " - "expected. These can mismatch if some features are coming from the " - "historic forecast table. The number of cubes loaded was: " - f"{len(cube_inputs)}. The number of features expected was: " - f"{len(self.feature_config.keys())}." - ) - raise ValueError(msg) - - return forecast_table_path, truth_table_path, cube_inputs + if ":" in self.forecast_periods: + forecast_periods = list(range(*map(int, self.forecast_periods.split(":")))) + forecast_periods = [fp * 3600 for fp in forecast_periods] + else: + try: + forecast_periods = [int(self.forecast_periods) * 3600] + except ValueError: + msg = ( + "The forecast_periods argument must be a single integer or " + "a range in the form 'start:end:interval'. The forecast period" + f"provided was: {self.forecast_periods}." + ) + raise ValueError(msg) + return forecast_periods def _read_parquet_files( self, @@ -127,7 +118,7 @@ def _read_parquet_files( forecast_periods: list[int], ) -> tuple[pd.DataFrame, pd.DataFrame]: """Read the forecast and truth data from parquet files. - + self.quantile_forest_installed = quantile_forest_package_available() Args: forecast_table_path: Path to the forecast parquet file. truth_table_path: Path to the truth parquet file. @@ -150,22 +141,16 @@ def _read_parquet_files( cycletimes = [] for forecast_period in forecast_periods: # Load forecasts from parquet file filtering by diagnostic and blend_time. - forecast_period_td = pd.Timedelta(int(forecast_period), unit="seconds") - cycletimes.extend( - pd.date_range( - end=pd.Timestamp(self.cycletime) - - pd.Timedelta(1, unit="days") - - forecast_period_td.floor("D"), - periods=int(self.training_length), - freq="D", + get_training_period_cycles( + self.cycletime, forecast_period, self.training_length ) ) cycletimes = list(set(cycletimes)) filters = [ [ - ("diagnostic", "==", self.target_diagnostic_name), + ("diagnostic", "in", self.parquet_diagnostic_names), ("blend_time", "in", cycletimes), ("experiment", "==", self.experiment), ] @@ -195,19 +180,53 @@ def _read_parquet_files( forecast_df = forecast_df[ forecast_df["forecast_period"].isin(np.array(forecast_periods) * 1e9) - ] + ].reset_index(drop=True) + # Convert df columns from ns to pandas timestamp object. for column in ["time", "forecast_reference_time", "blend_time"]: forecast_df[column] = pd.to_datetime( forecast_df[column], unit="ns", utc=True ) + forecast_df[column] = forecast_df[column].astype("datetime64[ns, UTC]") for column in ["forecast_period", "period"]: forecast_df[column] = pd.to_timedelta(forecast_df[column], unit="ns") + forecast_df[column] = forecast_df[column].astype("timedelta64[ns]") - forecast_df = forecast_df.rename(columns={"forecast": self.target_cf_name}) + # Convert additional features from rows to columns. + representation = ( + "percentile" if "percentile" in forecast_df.columns else "realization" + ) + base_df = forecast_df[ + forecast_df["diagnostic"] == self.parquet_diagnostic_names[0] + ] + for parquet_diagnostic_name in self.parquet_diagnostic_names[1:]: + additional_df = forecast_df[ + forecast_df["diagnostic"] == parquet_diagnostic_name + ] + base_df = pd.merge( + base_df, + additional_df[ + [ + *self.unique_site_id_keys, + "forecast_reference_time", + "forecast_period", + representation, + "forecast", + ] + ].rename(columns={"forecast": parquet_diagnostic_name}), + on=[ + *self.unique_site_id_keys, + "forecast_reference_time", + "forecast_period", + representation, + ], + how="left", + ) + forecast_df = base_df + forecast_df.rename(columns={"forecast": self.target_cf_name}, inplace=True) # Load truths from parquet file filtering by diagnostic. - filters = [[("diagnostic", "==", self.target_diagnostic_name)]] + filters = [[("diagnostic", "==", self.parquet_diagnostic_names[0])]] truth_df = pd.read_parquet( truth_table_path, filters=filters, @@ -216,6 +235,7 @@ def _read_parquet_files( ) truth_df["time"] = pd.to_datetime(truth_df["time"], unit="ns", utc=True) + truth_df["time"] = truth_df["time"].astype("datetime64[ns, UTC]") if truth_df.empty: msg = ( @@ -225,6 +245,107 @@ def _read_parquet_files( raise IOError(msg) return forecast_df, truth_df + def process( + self, + file_paths: list[pathlib.Path | str], + ) -> Optional[tuple[iris.cube.CubeList, pathlib.Path | str, pathlib.Path | str]]: + """Load input files for training a Quantile Regression Random Forest (QRF) + model. Two sources of input data must be provided: historical forecasts and + historical truth data (to use in calibration). + + Args: + file_paths (cli.inputpaths): + A list of input paths containing: + - The path to a Parquet file containing the truths to be used + for calibration. The expected columns within the + Parquet file are: ob_value, time, wmo_id, diagnostic, latitude, + longitude and altitude. + - The path to a Parquet file containing the forecasts to be used + for calibration. + - Optionally, paths to NetCDF files containing additional predictors. + """ + if not self.quantile_forest_installed: + return None + cube_inputs, parquets, _ = split_pickle_parquet_and_netcdf(file_paths) + + # If there are no parquet files, return None. + if not parquets: + return None + + forecast_table_path, truth_table_path = identify_parquet_type(parquets) + + # If either the forecast or truth parquet files are missing, return None. + if not forecast_table_path or not truth_table_path: + return None + + forecast_periods = self._parse_forecast_periods() + forecast_df, truth_df = self._read_parquet_files( + forecast_table_path, truth_table_path, forecast_periods + ) + + if cube_inputs is None: + cube_inputs = iris.cube.CubeList() + missing_features = ( + set(self.feature_config.keys()) + - set(forecast_df.columns) + - set([c.name() for c in cube_inputs]) + ) + + if len(missing_features) > 0: + msg = ( + "The features requested in the feature_config are absent from " + "the forecast parquet file and the input cubes. The missing fields are: " + f"{missing_features}." + ) + raise ValueError(msg) + return forecast_df, truth_df, cube_inputs + + +class PrepareAndTrainQRF(PostProcessingPlugin): + """Plugin to prepare and train a Quantile Regression Random Forest (QRF) model.""" + + def __init__( + self, + feature_config: dict[str, list[str]], + target_cf_name: str, + n_estimators: int = 100, + max_depth: Optional[int] = None, + max_samples: Optional[float] = None, + random_state: Optional[int] = None, + transformation: Optional[str] = None, + pre_transform_addition: float = 0, + unique_site_id_keys: Union[list[str], str] = "wmo_id", + ): + """Initialise the PrepareAndTrainQRF plugin. + + Args: + feature_config: Feature configuration defining the features to be used for + Quantile Regression Random Forests. + target_cf_name: A string containing the CF name of the forecast to be + calibrated e.g. air_temperature. + n_estimators: The number of trees in the forest. + max_depth: The maximum depth of the trees. + max_samples: The maximum number of samples to draw from the total number of + samples to train each tree. + random_state: Seed used by the random number generator. + transformation: Transformation to be applied to the data before fitting. + pre_transform_addition: Value to be added before transformation. + unique_site_id_key: The names of the coordinates that uniquely identify + each site, e.g. "wmo_id" or "latitude,longitude". + """ + self.feature_config = feature_config + self.target_cf_name = target_cf_name + self.n_estimators = n_estimators + self.max_depth = max_depth + self.max_samples = max_samples + self.random_state = random_state + self.transformation = transformation + self.pre_transform_addition = pre_transform_addition + if isinstance(unique_site_id_keys, str): + unique_site_id_keys = [unique_site_id_keys] + self.unique_site_id_keys = unique_site_id_keys + self.quantile_forest_installed = quantile_forest_package_available() + @staticmethod def _check_matching_times( forecast_df: pd.DataFrame, truth_df: pd.DataFrame @@ -246,32 +367,42 @@ def _check_matching_times( ) ) - def _add_features_to_df( + def _add_static_features_from_cubes_to_df( self, forecast_df: pd.DataFrame, cube_inputs: iris.cube.CubeList ) -> pd.DataFrame: - """Add features to the forecast DataFrame based on the feature configuration. + """Add features to the forecast DataFrame from cubes based on the feature + configuration. Other features are expected to already be present in the + forecast DataFrame. Args: forecast_df: DataFrame containing the forecast data. cube_inputs: List of cubes containing additional features. Returns: - DataFrame with additional features added. + DataFrame with additional features added from the input cubes. """ + if cube_inputs is None or len(cube_inputs) == 0: + return forecast_df for feature_name, feature_list in self.feature_config.items(): for feature in feature_list: if feature == "static": # Use the cube's data directly as a feature. constr = iris.Constraint(name=feature_name) + # Static features can be provided either as a cube or as a column + # in the forecast DataFrame. + if not cube_inputs.extract(constr): + continue feature_cube = cube_inputs.extract_cube(constr) feature_df = as_data_frame(feature_cube, add_aux_coords=True) forecast_df = forecast_df.merge( - feature_df[["wmo_id", feature_name]], on=["wmo_id"], how="left" + feature_df[[*self.unique_site_id_keys, feature_name]], + on=[*self.unique_site_id_keys], + how="left", ) return forecast_df - @staticmethod def filter_bad_sites( + self, forecast_df: pd.DataFrame, truth_df: pd.DataFrame, ) -> tuple[pd.DataFrame, pd.DataFrame]: @@ -286,71 +417,49 @@ def filter_bad_sites( - DataFrame containing the forecast data with bad sites removed. - DataFrame containing the truth data with bad sites removed. """ - truth_df.dropna( - subset=["latitude", "longitude", "altitude", "ob_value"], inplace=True - ) + truth_df.dropna(subset=["ob_value"], inplace=True) - wmo_ids = set(forecast_df["wmo_id"]).intersection(set(truth_df["wmo_id"])) + if truth_df.empty: + msg = "Empty truth DataFrame after removing NaNs." + raise ValueError(msg) + + forecast_index = forecast_df.set_index([*self.unique_site_id_keys]).index + truth_index = truth_df.set_index([*self.unique_site_id_keys]).index + forecast_df = forecast_df[forecast_index.isin(truth_index)] + truth_df = truth_df[truth_index.isin(forecast_index)] - forecast_df = forecast_df[forecast_df["wmo_id"].isin(wmo_ids)] - truth_df = truth_df[truth_df["wmo_id"].isin(wmo_ids)] return forecast_df, truth_df def process( self, - file_paths: list[pathlib.Path | str], - model_output: str = None, - ) -> None: - """Load input files and training a Quantile Regression Random Forest (QRF) + forecast_df: pd.DataFrame, + truth_df: pd.DataFrame, + cube_inputs: Optional[iris.cube.CubeList] = None, + ) -> Optional[tuple[RandomForestQuantileRegressor, str, float]]: + """Load input files and train a Quantile Regression Random Forest (QRF) model. This model can be applied later to calibrate the forecast. Two sources of input data must be provided: historical forecasts and historical truth data (to use in calibration). The model is output as a pickle file. Args: - file_paths (cli.inputpaths): - A list of input paths containing: - - The path to a Parquet file containing the truths to be used - for calibration. The expected columns within the - Parquet file are: ob_value, time, wmo_id, diagnostic, latitude, - longitude and altitude. - - The path to a Parquet file containing the forecasts to be used - for calibration. - - Optionally, paths to NetCDF files containing additional predictors. - model_output (str): - Full path including model file name that will store the pickled model. + forecast_df: DataFrame containing the forecast data. + truth_df: DataFrame containing the truth data. + cube_inputs: List of cubes containing additional features. + Raises: + ValueError: If the number of cubes loaded does not match the number of + features expected. """ if not self.quantile_forest_installed: return None - forecast_table_path, truth_table_path, cube_inputs = ( - self._split_cubes_and_parquet_files(file_paths) - ) - - if not forecast_table_path or not truth_table_path: - return None - - if ":" in self.forecast_periods: - forecast_periods = list(range(*map(int, self.forecast_periods.split(":")))) - forecast_periods = [fp * 3600 for fp in forecast_periods] - else: - try: - forecast_periods = [int(self.forecast_periods) * 3600] - except ValueError: - msg = ( - "The forecast_periods argument must be a single integer or " - "a range in the form 'start:end:interval'. The forecast period" - f"provided was: {self.forecast_periods}." - ) - raise ValueError(msg) - forecast_df, truth_df = self._read_parquet_files( - forecast_table_path, truth_table_path, forecast_periods - ) intersecting_times = self._check_matching_times(forecast_df, truth_df) if len(intersecting_times) == 0: return None - forecast_df = self._add_features_to_df(forecast_df, cube_inputs) + forecast_df = self._add_static_features_from_cubes_to_df( + forecast_df, cube_inputs + ) forecast_df, truth_df = self.filter_bad_sites(forecast_df, truth_df) result = TrainQuantileRegressionRandomForests( @@ -362,6 +471,9 @@ def process( random_state=self.random_state, transformation=self.transformation, pre_transform_addition=self.pre_transform_addition, - model_output=model_output, + unique_site_id_keys=self.unique_site_id_keys, )(forecast_df, truth_df) - return result + + # Create a tuple that returns the model, transformation and + # pre_transform_addition to allow these to be saved together. + return (result, self.transformation, self.pre_transform_addition) diff --git a/improver/calibration/quantile_regression_random_forest.py b/improver/calibration/quantile_regression_random_forest.py index 48c10c041a..630331d2ba 100644 --- a/improver/calibration/quantile_regression_random_forest.py +++ b/improver/calibration/quantile_regression_random_forest.py @@ -4,12 +4,13 @@ # See LICENSE in the root of the repository for full licensing details. """Plugins to perform quantile regression using random forests.""" -from typing import Optional +from typing import Optional, Union import numpy as np import pandas as pd from improver import BasePlugin, PostProcessingPlugin +from improver.calibration.dataframe_utilities import quantile_check from improver.constants import DAYS_IN_YEAR, HOURS_IN_DAY try: @@ -33,38 +34,109 @@ def prep_feature( df: pd.DataFrame, variable_name: str, feature_name: str, + transformation: Optional[str] = None, + pre_transform_addition: np.float32 = 0, + unique_site_id_keys: Union[list[str], str] = "wmo_id", ) -> pd.DataFrame: """Prepare features that require computation from the input DataFrame. Options - available are mean and standard deviation of the input feature, the - day of year, sine of day of year, cosine of day of year, hour of day, - sine of hour of day and cosine of hour of day. When computing the mean or standard - deviation, these will be computed over either the percentile or realization column, - depending upon which is available. + available are mean, standard deviation, min, max, percentiles and a members above + and a members below count of the input feature, the day of year, + sine of day of year, cosine of day of year, hour of day, sine of hour of day + and cosine of hour of day. When computing the mean or standard deviation, + these will be computed over either the percentile or realization column, + depending upon which is available. When a percentile column is provided, the + expectation is that these percentiles are equally spaced between 0 and 100, so that + these percentiles can be treated as being equally likely. Args: df: Input DataFrame. variable_name: Name of the variable to be used for the computation. - feature_name: Feature to be computed. Options are "mean", "std", "day_of_year", - "day_of_year_sin", "day_of_year_cos", "hour_of_day", + feature_name: Feature to be computed. Options are "mean", "std", "min", "max", + "percentile_" where is the required percentile between 0 and + 100, "members_below_" where is the threshold value + to count the number of members below, "members_above_" where + is the threshold value to count the number of members above, + "day_of_year", "day_of_year_sin", "day_of_year_cos", "hour_of_day", "hour_of_day_sin" and "hour_of_day_cos". + transformation: Transformation to be applied to the data before fitting. This + is only used when computing members_below or members_above features. + pre_transform_addition: Value to be added before transformation. This is only + used when computing members_below or members_above features. + unique_site_id_keys: The names of the coordinates that uniquely identify + each site, e.g. "wmo_id" or "latitude,longitude". Returns: df: DataFrame with the computed feature added. """ - if feature_name in ["mean", "std"]: + if ( + feature_name in ["mean", "std", "min", "max"] + or feature_name.startswith("percentile_") + or feature_name.startswith("members_below") + or feature_name.startswith("members_above") + ): + if isinstance(unique_site_id_keys, str): + unique_site_id_keys = [unique_site_id_keys] representation_name = [ n for n in ["percentile", "realization"] if n in df.columns ][0] - groupby_cols = ["forecast_reference_time", "forecast_period", "wmo_id"] + groupby_cols = [ + "forecast_reference_time", + "forecast_period", + *unique_site_id_keys, + ] subset_cols = [*groupby_cols] + [ representation_name, variable_name, ] + + if representation_name == "percentile": + quantile_check(df) + # For a subset of the input DataFrame compute the mean or standard deviation # over the representation column, grouped by the groupby columns. if feature_name == "mean": subset_df = df[subset_cols].groupby(groupby_cols).mean() elif feature_name == "std": subset_df = df[subset_cols].groupby(groupby_cols).std() + elif feature_name == "min": + subset_df = df[subset_cols].groupby(groupby_cols).min() + elif feature_name == "max": + subset_df = df[subset_cols].groupby(groupby_cols).max() + elif feature_name.startswith("members_below"): + threshold = float(feature_name.split("_")[2]) + if transformation is not None: + threshold = getattr(np, transformation)( + np.array(threshold) + pre_transform_addition + ) + orig_dtype = df[variable_name].dtype + subset_df = ( + df[subset_cols] + .assign(below_threshold=lambda x: x[variable_name] < threshold) + .groupby(groupby_cols)["below_threshold"] + .sum() + ) + subset_df.rename(variable_name, inplace=True) + subset_df = subset_df.astype(orig_dtype) + # subset_df[variable_name].astype(orig_dtype) + elif feature_name.startswith("members_above"): + threshold = float(feature_name.split("_")[2]) + if transformation is not None: + threshold = getattr(np, transformation)( + np.array(threshold) + pre_transform_addition + ) + orig_dtype = df[variable_name].dtype + subset_df = ( + df[subset_cols] + .assign(above_threshold=lambda x: x[variable_name] > threshold) + .groupby(groupby_cols)["above_threshold"] + .sum() + ) + subset_df.rename(variable_name, inplace=True) + subset_df = subset_df.astype(orig_dtype) + elif feature_name.startswith("percentile_"): + perc = float(feature_name.split("_")[1]) + orig_dtype = df[variable_name].dtype + subset_df = df[subset_cols].groupby(groupby_cols).quantile(perc / 100.0) + subset_df[variable_name] = subset_df[variable_name].astype(orig_dtype) subset_df = subset_df.reset_index() # Rename the column to distinguish the computed feature from the original. @@ -134,7 +206,16 @@ def sanitise_forecast_dataframe( ] collapsed_features = [] for key, values in feature_config.items(): - collapsed_features.extend([key for v in values if v in ["mean", "std"]]) + collapsed_features.extend( + [ + key + for v in values + if v in ["mean", "std", "min", "max"] + or v.startswith("percentile_") + or v.startswith("members_below") + or v.startswith("members_above") + ] + ) collapsed_features = list(set(collapsed_features)) # Subset the dataframe by the first value of the representation column # and drop the representation column and any features where the original variable @@ -145,36 +226,59 @@ def sanitise_forecast_dataframe( return df -def get_required_column_names( - df: pd.DataFrame, feature_config: dict[str, list[str]] -) -> list[str]: - """Process the feature_config to return the expected column names that will be - used as features with the QRF. +def prep_features_from_config( + df: pd.DataFrame, + feature_config: dict[str, list[str]], + unique_site_id_keys: Union[list[str], str] = "wmo_id", +) -> tuple[pd.DataFrame, list[str]]: + """Process the feature_config to prepare the features as required and return the + expected column names that will be used as features with the QRF. Args: df: Input DataFrame. feature_config: Feature configuration defining the features to be used for QRF. + unique_site_id_keys: The names of the coordinates that uniquely identify + each site, e.g. "wmo_id" or "latitude,longitude". Returns: - List of expected column names that will be used as features with the QRF. + Processed DataFrame and a list of expected column names that will be used as + features with the QRF. Raises: - ValueError: If a feature expected in the feature_config is not present in - the DataFrame. + ValueError: If a variable expected in the feature_config is not present in + the DataFrame e.g. "surface temperature". + ValueError: If a feature expected for a specific variable in the feature_config + is not supported e.g. "interquartile_range". """ + if isinstance(unique_site_id_keys, str): + unique_site_id_keys = [unique_site_id_keys] + feature_column_names = [] for variable_name in feature_config.keys(): - for feature in feature_config[variable_name]: - if feature in ["mean", "std"]: - feature_column_names.append(f"{variable_name}_{feature}") - elif feature in ["static"]: + if variable_name not in df.columns: + msg = f"Feature '{variable_name}' is not present in the forecast DataFrame." + raise ValueError(msg) + for feature_name in feature_config[variable_name]: + df = prep_feature(df, variable_name, feature_name, unique_site_id_keys) + if ( + feature_name in ["mean", "std", "min", "max"] + or feature_name.startswith("percentile_") + or feature_name.startswith("members_below") + or feature_name.startswith("members_above") + ): + feature_column_names.append(f"{variable_name}_{feature_name}") + elif feature_name in ["static"]: feature_column_names.append(variable_name) + elif feature_name in df.columns: + # For example, latitude, longitude, altitude, day_of_year, which + # are not tied to a specific variable. + feature_column_names.append(feature_name) else: - feature_column_names.append(feature) - - if len(list(set(feature_column_names) - set(df.columns))) > 0: - msg = f"Feature '{feature}' is not supported." - raise ValueError(msg) + msg = ( + f"Feature '{feature_name}' for variable " + f"'{variable_name}' is not supported." + ) + raise ValueError(msg) - return feature_column_names + return df, feature_column_names def _check_valid_transformation(transformation: str): @@ -205,6 +309,7 @@ def __init__( random_state: Optional[int] = None, transformation: Optional[str] = None, pre_transform_addition: np.float32 = 0, + unique_site_id_keys: Union[list[str], str] = "wmo_id", **kwargs, ) -> None: """Initialise the plugin. @@ -243,6 +348,8 @@ def __init__( Transformation to be applied to the data before fitting. pre_transform_addition (float): Value to be added before transformation. + unique_site_id_keys: The names of the coordinates that uniquely identify + each site, e.g. "wmo_id" or "latitude,longitude". kwargs: Additional keyword arguments for the quantile regression model. @@ -257,6 +364,9 @@ def __init__( self.transformation = transformation _check_valid_transformation(self.transformation) self.pre_transform_addition = pre_transform_addition + if isinstance(unique_site_id_keys, str): + unique_site_id_keys = [unique_site_id_keys] + self.unique_site_id_keys = unique_site_id_keys self.kwargs = kwargs self.expected_coordinate_order = ["forecast_reference_time", "forecast_period"] @@ -287,7 +397,7 @@ def process( self, forecast_df: pd.DataFrame, truth_df: pd.DataFrame, - ) -> None: + ) -> RandomForestQuantileRegressor: """Train a quantile regression random forests model. Args: @@ -322,21 +432,14 @@ def process( truth_df["ob_value"] + self.pre_transform_addition ) - for variable_name in self.feature_config.keys(): - if variable_name not in forecast_df.columns: - msg = ( - f"Feature '{variable_name}' is not present in the " - "forecast DataFrame." - ) - raise ValueError(msg) - for feature_name in self.feature_config[variable_name]: - forecast_df = prep_feature(forecast_df, variable_name, feature_name) - - forecast_df = sanitise_forecast_dataframe(forecast_df, self.feature_config) - feature_column_names = get_required_column_names( - forecast_df, self.feature_config + forecast_df, feature_column_names = prep_features_from_config( + forecast_df, + self.feature_config, + unique_site_id_keys=self.unique_site_id_keys, ) - merge_columns = ["wmo_id", "time"] + forecast_df = sanitise_forecast_dataframe(forecast_df, self.feature_config) + + merge_columns = [*self.unique_site_id_keys, "time"] combined_df = forecast_df.merge( truth_df[merge_columns + ["ob_value"]], on=merge_columns, how="inner" ) @@ -357,6 +460,7 @@ def __init__( quantiles: list[np.float32], transformation: str = None, pre_transform_addition: np.float32 = 0, + unique_site_id_keys: list[str] = ["wmo_id"], ) -> None: """Initialise the plugin. @@ -385,9 +489,8 @@ def __init__( Transformation to be applied to the data before fitting. pre_transform_addition (float): Value to be added before transformation. - - Raises: - ValueError: If the transformation is not one of the supported types. + unique_site_id_keys: The names of the coordinates that uniquely identify + each site, e.g. "wmo_id" or "latitude,longitude". """ self.target_name = target_name @@ -396,6 +499,7 @@ def __init__( self.transformation = transformation _check_valid_transformation(self.transformation) self.pre_transform_addition = pre_transform_addition + self.unique_site_id_keys = unique_site_id_keys def _reverse_transformation(self, forecast: np.ndarray) -> np.ndarray: """Reverse the transformation applied to the data prior to fitting the QRF. @@ -445,21 +549,20 @@ def process( forecast_df[self.target_name] = getattr(np, self.transformation)( forecast_df[self.target_name] + self.pre_transform_addition ) + break - for feature_name in self.feature_config[variable_name]: - forecast_df = prep_feature(forecast_df, variable_name, feature_name) - - forecast_df = sanitise_forecast_dataframe(forecast_df, self.feature_config) - feature_column_names = get_required_column_names( - forecast_df, self.feature_config + forecast_df, feature_column_names = prep_features_from_config( + forecast_df, + self.feature_config, + unique_site_id_keys=self.unique_site_id_keys, ) + forecast_df = sanitise_forecast_dataframe(forecast_df, self.feature_config) feature_values = np.array(forecast_df[feature_column_names]) - calibrated_forecast = qrf_model.predict( feature_values, quantiles=self.quantiles ) + calibrated_forecast = self._reverse_transformation(calibrated_forecast) calibrated_forecast = np.float32(calibrated_forecast) - calibrated_forecast = self._reverse_transformation(calibrated_forecast) return calibrated_forecast diff --git a/improver/cli/__init__.py b/improver/cli/__init__.py index 7a6be5a5c6..7ee8ba5b67 100644 --- a/improver/cli/__init__.py +++ b/improver/cli/__init__.py @@ -348,17 +348,19 @@ def with_output( Result of calling `wrapped` or None if `output` is given. """ import joblib + from iris.cube import Cube, CubeList from improver.utilities.save import save_netcdf result = wrapped(*args, **kwargs) - if output and output.endswith(".nc"): + if output and (isinstance(result, Cube) or isinstance(result, CubeList)): save_netcdf(result, output, compression_level, least_significant_digit) if pass_through_output: return ObjectAsStr(result, output) return - elif output and output.endswith((".pickle", ".pkl")): + elif output and result: + # If output is set and result exists but is not a Cube, save it as a pickle file joblib.dump(result, output, compress=compression_level) return return result diff --git a/improver/cli/apply_quantile_regression_random_forest.py b/improver/cli/apply_quantile_regression_random_forest.py index c7a6fec8d4..cb0bd00407 100644 --- a/improver/cli/apply_quantile_regression_random_forest.py +++ b/improver/cli/apply_quantile_regression_random_forest.py @@ -14,8 +14,7 @@ def process( *file_paths: cli.inputpath, feature_config: cli.inputjson, target_cf_name: str, - transformation: str = None, - pre_transform_addition: float = 0, + unique_site_id_keys: cli.comma_separated_list = "wmo_id", ): """Applying the Quantile Regression Random Forest model. @@ -49,27 +48,27 @@ def process( } target_cf_name (str): A string containing the CF name of the forecast to be - calibrated. This will be used to separate it from the rest of the - feature cubes, if present. - transformation (str): - Transformation to be applied to the data before fitting. - pre_transform_addition (float): - Value to be added before transformation. + calibrated e.g. air_temperature. This will be used to separate it from + the rest of the feature cubes, if present. + The names of the coordinates that uniquely identify each site, + e.g. "wmo_id" or "latitude,longitude". + unique_site_id_keys (str): + The names of the coordinates that uniquely identify each site, + e.g. "wmo_id" or "latitude,longitude". Returns: iris.cube.Cube: The calibrated forecast cube. """ - + from improver.calibration import split_pickle_parquet_and_netcdf from improver.calibration.load_and_apply_quantile_regression_random_forest import ( - LoadAndApplyQRF, + PrepareAndApplyQRF, ) - result = LoadAndApplyQRF( + cubes, _, qrf_descriptors = split_pickle_parquet_and_netcdf(file_paths) + + result = PrepareAndApplyQRF( feature_config=feature_config, target_cf_name=target_cf_name, - transformation=transformation, - pre_transform_addition=pre_transform_addition, - )( - file_paths, - ) + unique_site_id_keys=unique_site_id_keys, + )(cubes, qrf_descriptors=qrf_descriptors) return result diff --git a/improver/cli/estimate_emos_coefficients_from_table.py b/improver/cli/estimate_emos_coefficients_from_table.py index 8e7342f795..c78ff9aa9f 100755 --- a/improver/cli/estimate_emos_coefficients_from_table.py +++ b/improver/cli/estimate_emos_coefficients_from_table.py @@ -120,6 +120,7 @@ def process( import pandas as pd from iris.cube import CubeList + from improver.calibration import get_training_period_cycles from improver.calibration.dataframe_utilities import ( forecast_and_truth_dataframes_to_cubes, ) @@ -128,14 +129,7 @@ def process( ) # Load forecasts from parquet file filtering by diagnostic and blend_time. - forecast_period_td = pd.Timedelta(int(forecast_period), unit="seconds") - cycletimes = pd.date_range( - end=pd.Timestamp(cycletime) - - pd.Timedelta(1, unit="days") - - forecast_period_td.floor("D"), - periods=int(training_length), - freq="D", - ) + cycletimes = get_training_period_cycles(cycletime, forecast_period, training_length) filters = [[("diagnostic", "==", diagnostic), ("blend_time", "in", cycletimes)]] forecast_df = pd.read_parquet(forecast, filters=filters) diff --git a/improver/cli/train_quantile_regression_random_forest.py b/improver/cli/train_quantile_regression_random_forest.py index eb87faf018..b69dd1f87e 100644 --- a/improver/cli/train_quantile_regression_random_forest.py +++ b/improver/cli/train_quantile_regression_random_forest.py @@ -13,7 +13,7 @@ def process( *file_paths: cli.inputpath, feature_config: cli.inputjson, - target_diagnostic_name: str, + parquet_diagnostic_names: cli.comma_separated_list, target_cf_name: str, forecast_periods: str, cycletime: str, @@ -25,6 +25,7 @@ def process( random_state: int = None, transformation: str = None, pre_transform_addition: float = 0, + unique_site_id_keys: cli.comma_separated_list = "wmo_id", ): """Training a model using Quantile Regression Random Forest. @@ -59,12 +60,14 @@ def process( "visibility_at_screen_level": ["mean", "std"] "distance_to_water": ["static"], } - target_diagnostic_name (str): - A string containing the diagnostic name of the forecast to be - calibrated. This will be used to filter the target forecast and truth - dataframes. + parquet_diagnostic_names (str): + A string containing the diagnostic names that will be used for filtering + the target diagnostic from the forecast and truth DataFrames read in + from the parquet files. This could be different from the CF name e.g. + 'temperature_at_screen_level'. target_cf_name (str): - A string containing the CF name of the forecast to be calibrated. + A string containing the CF name of the forecast to be calibrated + e.g. air_temperature. forecast_periods (str): Range of forecast periods to be calibrated in hours in the form: "start:end:interval" e.g. "6:18:6" or a single forecast period e.g. "6". @@ -81,40 +84,54 @@ def process( max_depth (int): Maximum depth of the tree. max_samples (float): - If an int, then it is the number of samples to draw to train - each tree. If a float, then it is the fraction of samples to draw - to train each tree. If None, then each tree contains the same - total number of samples as originally provided. + If an int, then it is the number of samples to draw from the total number + of samples available to train each tree. Note that a 'sample' refers to + each row within the DataFrames constructed where each row will differ + primarily based on the site, forecast period, forecast reference time and + realization or percentile. If a float, then it is the fraction of samples + to draw from the total number of samples available to train each tree. + If None, then each tree contains the same number of samples as the total + available. The trees will therefore only differ due to the use of + bootstrapping (i.e. sampling with replacement) when creating each tree. random_state (int): Random seed for reproducibility. transformation (str): Transformation to be applied to the data before fitting. pre_transform_addition (float): Value to be added before transformation. + unique_site_id_keys (str): + The names of the coordinates that uniquely identify each site, + e.g. "wmo_id" or "latitude,longitude". Returns: - None: - The function creates a pickle file. + A quantile regression random forest model with associated transformation and + pre-transformation addition that will be stored as a pickle file. """ from improver.calibration.load_and_train_quantile_regression_random_forest import ( - LoadAndTrainQRF, + LoadForTrainQRF, + PrepareAndTrainQRF, ) - result = LoadAndTrainQRF( + forecast_df, truth_df, cube_inputs = LoadForTrainQRF( experiment=experiment, feature_config=feature_config, - target_diagnostic_name=target_diagnostic_name, + parquet_diagnostic_names=parquet_diagnostic_names, target_cf_name=target_cf_name, forecast_periods=forecast_periods, cycletime=cycletime, training_length=training_length, + unique_site_id_keys=unique_site_id_keys, + )(file_paths) + result = PrepareAndTrainQRF( + feature_config=feature_config, + target_cf_name=target_cf_name, n_estimators=n_estimators, max_depth=max_depth, max_samples=max_samples, random_state=random_state, transformation=transformation, pre_transform_addition=pre_transform_addition, - )( - file_paths, - ) + unique_site_id_keys=unique_site_id_keys, + )(forecast_df, truth_df, cube_inputs) + return result diff --git a/improver/utilities/compare.py b/improver/utilities/compare.py index 598349c85e..ce48145ad3 100644 --- a/improver/utilities/compare.py +++ b/improver/utilities/compare.py @@ -408,12 +408,18 @@ def raise_reporter(message): difference_found = False try: - np.testing.assert_equal(output.n_features_in_, kgo.n_features_in_) - np.testing.assert_equal(output.n_outputs_, kgo.n_outputs_) - np.testing.assert_equal(output.max_depth, kgo.max_depth) - np.testing.assert_equal(output.n_estimators, kgo.n_estimators) - np.testing.assert_equal(output.random_state, kgo.random_state) - for output_estimator, kgo_estimator in zip(output.estimators_, kgo.estimators_): + model, transformation, pre_transform_addition = output + kgo_model, kgo_transformation, kgo_pre_transform_addition = kgo + np.testing.assert_equal(transformation, kgo_transformation) + np.testing.assert_equal(pre_transform_addition, kgo_pre_transform_addition) + np.testing.assert_equal(model.n_features_in_, kgo_model.n_features_in_) + np.testing.assert_equal(model.n_outputs_, kgo_model.n_outputs_) + np.testing.assert_equal(model.max_depth, kgo_model.max_depth) + np.testing.assert_equal(model.n_estimators, kgo_model.n_estimators) + np.testing.assert_equal(model.random_state, kgo_model.random_state) + for output_estimator, kgo_estimator in zip( + model.estimators_, kgo_model.estimators_ + ): np.testing.assert_allclose( output_estimator.tree_.value, kgo_estimator.tree_.value ) @@ -422,4 +428,27 @@ def raise_reporter(message): # call the reporter function outside the except block to avoid nested # exceptions if the reporter function is raising an exception if difference_found: - reporter("different pickled forest") + try: + msg = ( + "Different pickled forest. \nThe output is: " + f"n_features: {model.n_features_in_}, " + f"n_outputs: {model.n_outputs_}, " + f"max_depth: {model.max_depth}, " + f"n_estimators: {model.n_estimators}, " + f"random_state: {model.random_state}, " + f"transformation: {transformation}, " + f"pre_transform_addition: {pre_transform_addition}." + "\nThe KGO is: " + f"n_features: {kgo.n_features_in_}, " + f"n_outputs: {kgo.n_outputs_}, " + f"max_depth: {kgo.max_depth}, " + f"n_estimators: {kgo.n_estimators}, " + f"random_state: {kgo.random_state}, " + f"transformation: {kgo_transformation}, " + f"pre_transform_addition: {kgo_pre_transform_addition}." + ) + except (AssertionError, ValueError, UnboundLocalError): + msg = "Different pickled forest." + reporter(msg) + else: + reporter(msg) diff --git a/improver_tests/acceptance/SHA256SUMS b/improver_tests/acceptance/SHA256SUMS index ddae0a26d4..8fe8c48a99 100644 --- a/improver_tests/acceptance/SHA256SUMS +++ b/improver_tests/acceptance/SHA256SUMS @@ -86,10 +86,10 @@ ed8ab78a9f55b54bf0a49f191d3eb33daae30e0d05b9051275a70c0e697aac71 ./apply-night- 809a446327626d007ac288b8277520730863bc596e98116bca5c4afb0d531e96 ./apply-night-mask/uk_prob/valid_input.nc d2f7d8389b33cde359dd253def2aaf31afbc27557f389bf0f744a28b24e145dd ./apply-quantile-regression-random-forest/config.json 8db1b35bde734c16340a6e42454b9ac146ea32f7c012c17c5e64815069af41bf ./apply-quantile-regression-random-forest/input_forecast.nc -793f17191d2421ead8d546319c7da34143ef01e2db88ca6cc5faf46e92b4b1de ./apply-quantile-regression-random-forest/with_transformation_input.pickle -d8acadfe782e2c2562dfcc31ada8ad381b320fbdf3854d6b361c38308e6b8226 ./apply-quantile-regression-random-forest/with_transformation_kgo.nc -31eab8a83a75eb8cee4c0140749a4d4e9d9b98292c28b3c53ee27756c1f3a265 ./apply-quantile-regression-random-forest/without_transformation_input.pickle -65397673caf0f1205d18a85c08ce81bd3543a71c4101f40f4869a93ddd0cf0f8 ./apply-quantile-regression-random-forest/without_transformation_kgo.nc +0f74c2d6e3caf30c4f971da2dc757b1ae659062e31194e4da8e5266a1f23d2af ./apply-quantile-regression-random-forest/with_transformation_kgo.nc +88d5ea229b42789d92d2317099b90d90f89a57e037665091b49c5f19acfea14e ./apply-quantile-regression-random-forest/with_transformation_kgo.pickle +0f74c2d6e3caf30c4f971da2dc757b1ae659062e31194e4da8e5266a1f23d2af ./apply-quantile-regression-random-forest/without_transformation_kgo.nc +8e2dda09c26d33fd06d68c5be17a74b93b1d75d54c60ce48f27ea3d2735bece9 ./apply-quantile-regression-random-forest/without_transformation_kgo.pickle 74a40c508b68294652f5dde52d8fe4b93ea002bd4b72ca8d6cc11a83309f28f1 ./apply-rainforests-calibration/basic/kgo.nc b4678f178e0e1df0c242c61b269b71dd9a3d7aca64663e2e81e5bfeb71fe97da ./apply-rainforests-calibration/features/20200802T0000Z-PT0000H00M-clearsky_solar_radiation-PT24H.nc dce2d4a28cec494c94b154dc3a9203b46b7cef2ad54d04049663166f4b42ecf5 ./apply-rainforests-calibration/features/20200802T0000Z-PT0024H00M-cape-PT24H.nc @@ -1003,8 +1003,8 @@ e3d2315b24bf769bcfb137033950d7100c8795878635e0ec9491413a1349904a ./train-quanti 1aec562fc0c39c7695521482b889d335a3becb10a76b7eae965447cbd9faf051 ./train-quantile-regression-random-forest/spot_observation_tables/20250804/diagnostic=temperature_at_screen_level/0600Z_0.parquet e81c04f9f2d8dc14b8d8cd9bc9e8827ebd0da5ba016db48cf60ab738d327f968 ./train-quantile-regression-random-forest/spot_observation_tables/20250804/diagnostic=temperature_at_screen_level/1200Z_0.parquet 99e6656982672ea2d4cb9fa1dfb5731408dd41b2c9052b7e0df8d8004864241e ./train-quantile-regression-random-forest/spot_observation_tables/20250804/diagnostic=temperature_at_screen_level/1800Z_0.parquet -d634d639dda9a4a2789eed69bfdec59954ce63f51734a71f1d9a20c3958d2786 ./train-quantile-regression-random-forest/with_transformation_kgo.pickle -e3f9fcd7544028436e34415db38d3c5841a6e22abbc3fc635e6ff87236764fcc ./train-quantile-regression-random-forest/without_transformation_kgo.pickle +88d5ea229b42789d92d2317099b90d90f89a57e037665091b49c5f19acfea14e ./train-quantile-regression-random-forest/with_transformation_kgo.pickle +8e2dda09c26d33fd06d68c5be17a74b93b1d75d54c60ce48f27ea3d2735bece9 ./train-quantile-regression-random-forest/without_transformation_kgo.pickle e2cfb5e19ef5ebcfd73dc1c8504b66e9a55ad76b7b18cb79318659224079f234 ./uv-index/basic/20181210T0600Z-PT0000H00M-radiation_flux_in_uv_downward_at_surface.nc e48c3b07bd14214a1a10c0bbf1e0acdf54cfedf93b2c6d766f594ce83a45c2b8 ./uv-index/basic/kgo.nc 2226a5e95eb29664e21f8c0658101a53d65945a1450a60a19955563a2693d22d ./vertical-updraught/cape.nc diff --git a/improver_tests/acceptance/test_apply_quantile_regression_random_forest.py b/improver_tests/acceptance/test_apply_quantile_regression_random_forest.py index 5cd27f5f17..6f43606dc1 100644 --- a/improver_tests/acceptance/test_apply_quantile_regression_random_forest.py +++ b/improver_tests/acceptance/test_apply_quantile_regression_random_forest.py @@ -36,12 +36,5 @@ def test_basic(tmp_path, transformation): output_path, ] - if transformation == "with_transformation": - args += [ - "--transformation", - "log", - "--pre-transform-addition", - "0.1", - ] run_cli(args) acc.compare(output_path, kgo_path, atol=LOOSE_TOLERANCE) diff --git a/improver_tests/acceptance/test_train_quantile_regression_random_forest.py b/improver_tests/acceptance/test_train_quantile_regression_random_forest.py index 75073355c2..3fbb571ded 100644 --- a/improver_tests/acceptance/test_train_quantile_regression_random_forest.py +++ b/improver_tests/acceptance/test_train_quantile_regression_random_forest.py @@ -41,7 +41,7 @@ def test_basic( named_args = [ "--feature-config", config_path, - "--target-diagnostic-name", + "--parquet-diagnostic-names", "temperature_at_screen_level", "--target-cf-name", "air_temperature", diff --git a/improver_tests/calibration/dataframe_utilities/test_dataframe_utilities.py b/improver_tests/calibration/dataframe_utilities/test_dataframe_utilities.py index ad46fdba43..fb9bceff37 100644 --- a/improver_tests/calibration/dataframe_utilities/test_dataframe_utilities.py +++ b/improver_tests/calibration/dataframe_utilities/test_dataframe_utilities.py @@ -977,7 +977,7 @@ def test_not_quantiles(self): self.forecast_df = self.forecast_df.replace( {"percentile": self.percentiles[0]}, 10.0 ) - msg = "The forecast percentiles can not be considered as quantiles" + msg = "Forecast percentiles must be equally spaced." with self.assertRaisesRegex(ValueError, msg): forecast_and_truth_dataframes_to_cubes( self.forecast_df, diff --git a/improver_tests/calibration/quantile_regression_random_forests_calibration/test_load_and_apply_quantile_regression_random_forest.py b/improver_tests/calibration/quantile_regression_random_forests_calibration/test_load_and_apply_quantile_regression_random_forest.py index 9fbeaced5f..3f88336ed9 100644 --- a/improver_tests/calibration/quantile_regression_random_forests_calibration/test_load_and_apply_quantile_regression_random_forest.py +++ b/improver_tests/calibration/quantile_regression_random_forests_calibration/test_load_and_apply_quantile_regression_random_forest.py @@ -2,20 +2,19 @@ # # This file is part of 'IMPROVER' and is released under the BSD 3-Clause license. # See LICENSE in the root of the repository for full licensing details. -"""Unit tests for the LoadAndApplyQRF plugin.""" +"""Unit tests for the PrepareAndApplyQRF plugin.""" import numpy as np import pytest from iris.coords import AuxCoord -from iris.cube import Cube +from iris.cube import Cube, CubeList from improver.calibration.load_and_apply_quantile_regression_random_forest import ( - LoadAndApplyQRF, + PrepareAndApplyQRF, ) from improver.ensemble_copula_coupling.ensemble_copula_coupling import ( RebadgeRealizationsAsPercentiles, ) -from improver.utilities.save import save_netcdf from improver_tests.calibration.quantile_regression_random_forests_calibration.test_quantile_regression_random_forest import ( _create_ancil_file, _create_forecasts, @@ -44,45 +43,68 @@ def _add_day_of_training_period_to_cube(cube, day_of_training_period, secondary_ return cube +# fmt: off @pytest.mark.parametrize("percentile_input", [True, False]) @pytest.mark.parametrize( - "n_estimators,max_depth,random_state,compression,transformation,pre_transform_addition,extra_kwargs,include_static,quantiles,expected", + "site_id", [["wmo_id"], ["station_id"], ["latitude", "longitude", "altitude"]] +) +@pytest.mark.parametrize( + "n_estimators,max_depth,random_state,transformation,pre_transform_addition,extra_kwargs,include_dynamic,include_static,include_nans,include_latlon_nans,quantiles,expected", [ - (2, 2, 55, 5, None, 0, {}, False, [0.5], [4.1, 5.65]), # noqa Basic test case - (100, 2, 55, 5, None, 0, {}, False, [1 / 3, 2 / 3], [[4.1, 5.1], [5.1, 5.1]]), # noqa Multiple quantiles - (1, 1, 55, 5, None, 0, {}, False, [0.5], [4.1, 6.2]), # noqa Fewer estimators and reduced depth - (1, 1, 73, 5, None, 0, {}, False, [0.5], [4.2, 6.2]), # Different random state - (2, 2, 55, 5, "log", 10, {}, False, [0.5], [4.1, 5.64]), # Log transformation - (2, 2, 55, 5, "log10", 10, {}, False, [0.5], [4.1, 5.64]), # noqa Log10 transformation - (2, 2, 55, 5, "sqrt", 10, {}, False, [0.5], [4.1, 5.64]), # noqa Square root transformation - (2, 2, 55, 5, "cbrt", 10, {}, False, [0.5], [4.1, 5.64]), # noqa Cube root transformation - (2, 2, 55, 5, None, 0, {"max_samples_leaf": 0.5}, False, [0.5], [4.1, 6.2]), # noqa # Different criterion - (2, 5, 55, 5, None, 0, {}, True, [0.5], [4.1, 5.65]), # noqa Include an additional static feature + (2, 2, 55, None, 0, {}, False, False, False, False, [0.5], [4.1, 5.65]), # Basic test case + (100, 2, 55, None, 0, {}, False, False, False, False, [1 / 3, 2 / 3], [[4.1, 5.1], [5.1, 5.1]]), # Multiple quantiles + (1, 1, 55, None, 0, {}, False, False, False, False, [0.5], [4.1, 6.2]), # Fewer estimators and reduced depth + (1, 1, 73, None, 0, {}, False, False, False, False, [0.5], [4.2, 6.2]), # Different random state + (2, 2, 55, "log", 10, {}, False, False, False, False, [0.5], [4.1, 5.64]), # Log transformation + (2, 2, 55, "log10", 10, {}, False, False, False, False, [0.5], [4.1, 5.64]), # Log10 transformation + (2, 2, 55, "sqrt", 10, {}, False, False, False, False, [0.5], [4.1, 5.64]), # Square root transformation + (2, 2, 55, "cbrt", 10, {}, False, False, False, False, [0.5], [4.1, 5.64]), # Cube root transformation + (2, 2, 55, None, 0, {"max_samples_leaf": 0.5}, False, False, False, False, [0.5], [4.1, 6.2]), # Different criterion + (2, 5, 55, None, 0, {}, True, False, False, False, [0.5], [4.1, 4.6]), # Include an additional dynamic feature + (2, 5, 55, None, 0, {}, False, True, False, False, [0.5], [4.1, 5.65]), # Include an additional static feature + (2, 5, 55, None, 0, {}, True, True, False, False, [0.5], [4.1, 4.6]), # Include an additional dynamic and static feature + (2, 2, 55, None, 0, {}, False, False, True, False, [0.5], [4.1, 5.65]), # NaNs in input data + (2, 2, 55, None, 0, {}, False, False, False, True, [0.5], [4.1, 5.65]), # NaNs in lat/lon + (2, 2, 55, None, 0, {}, True, False, False, True, [0.5], [4.1, 4.6]), # NaNs in lat/lon and dynamic feature + (2, 2, 55, None, 0, {}, False, True, False, True, [0.5], [4.1, 5.65]), # NaNs in lat/lon and static feature + (2, 2, 55, None, 0, {}, True, True, False, True, [0.5], [4.1, 4.6]), # NaNs in lat/lon, dynamic and static feature + (2, 2, 55, None, 0, {}, True, True, True, True, [0.5], [4.6, 4.6]), # NaNs in lat/lon, dynamic and static feature and input data ], ) -def test_load_and_apply_qrf( - tmp_path, +# fmt: on +def test_prepare_and_apply_qrf( percentile_input, + site_id, n_estimators, max_depth, random_state, - compression, transformation, pre_transform_addition, extra_kwargs, + include_dynamic, include_static, + include_nans, + include_latlon_nans, quantiles, expected, ): - """Test the LoadAndApplyQRF plugin.""" + """Test the PrepareAndApplyQRF plugin.""" feature_config = {"wind_speed_at_10m": ["mean", "std", "latitude", "longitude"]} - model_output = _run_train_qrf( + if include_dynamic: + feature_config["air_temperature"] = ["mean", "std"] + if include_static: + feature_config["distance_to_water"] = ["static"] + + unique_site_id_key = site_id[0] + if site_id == ["latitude", "longitude", "altitude"]: + unique_site_id_key = "station_id" + + qrf_model = _run_train_qrf( feature_config, n_estimators, max_depth, random_state, - compression, transformation, pre_transform_addition, extra_kwargs, @@ -97,13 +119,14 @@ def test_load_and_apply_qrf( ], realization_data=[2, 6, 10], truth_data=[4.2, 6.2, 4.1, 5.1], - tmp_path=tmp_path, + site_id=unique_site_id_key, ) frt = "20170103T0000Z" vt = "20170103T1200Z" data = np.arange(6, (len(quantiles) * 6) + 1, 6) day_of_training_period = 2 + cube_inputs = CubeList() forecast_cube = _create_forecasts(frt, vt, data, return_cube=True) forecast_cube = _add_day_of_training_period_to_cube( @@ -113,28 +136,45 @@ def test_load_and_apply_qrf( if percentile_input: forecast_cube = RebadgeRealizationsAsPercentiles()(forecast_cube) - features_dir = tmp_path / "features" - features_dir.mkdir(parents=True) - forecast_filepath = str(features_dir / "forecast.nc") - save_netcdf(forecast_cube, forecast_filepath) - - file_paths = [model_output, forecast_filepath] + cube_inputs.append(forecast_cube) + if include_dynamic: + dynamic_cube = _create_forecasts( + frt, + vt, + data + 0.5, # Slightly different data to the target feature + return_cube=True, + ) + dynamic_cube.rename("air_temperature") + dynamic_cube = _add_day_of_training_period_to_cube( + dynamic_cube, day_of_training_period, "forecast_reference_time" + ) + cube_inputs.append(dynamic_cube) if include_static: ancil_cube = _create_ancil_file(return_cube=True) - ancil_filepath = features_dir / "ancil.nc" - save_netcdf(ancil_cube, ancil_filepath) - file_paths.append(str(ancil_filepath)) + cube_inputs.append(ancil_cube) + + if include_nans: + # Add some NaNs to the input data to check that they are handled + for cube in cube_inputs: + if cube.name() == "distance_to_water": + cube.data[0] = np.nan + else: + cube.data[0, 0] = np.nan - plugin = LoadAndApplyQRF( + if include_latlon_nans: + # Add some NaNs to the latitude and longitude to check that they are handled + for cube in cube_inputs: + cube.coord("latitude").points[1] = np.nan + cube.coord("longitude").points[1] = np.nan + + result = PrepareAndApplyQRF( feature_config, "wind_speed_at_10m", - transformation=transformation, - pre_transform_addition=pre_transform_addition, - ) - result = plugin.process(file_paths=file_paths) - assert isinstance(result, Cube) + unique_site_id_keys=site_id, + )(cube_inputs, (qrf_model, transformation, pre_transform_addition)) + assert isinstance(result, Cube) assert result.data.shape == (len(quantiles), 2) assert np.allclose(result.data, expected, rtol=1e-2) @@ -165,28 +205,25 @@ def test_load_and_apply_qrf( ], ) def test_unexpected( - tmp_path, exception, ): - """Test LoadAndApplyQRF plugin behaviour in atypical situations.""" + """Test PrepareAndApplyQRF plugin behaviour in atypical situations.""" feature_config = {"wind_speed_at_10m": ["mean", "std", "latitude", "longitude"]} n_estimators = 2 max_depth = 2 random_state = 55 - compression = 5 transformation = None pre_transform_addition = 0 extra_kwargs = {} include_static = False quantiles = [0.5] - model_output = _run_train_qrf( + qrf_model = _run_train_qrf( feature_config, n_estimators, max_depth, random_state, - compression, transformation, pre_transform_addition, extra_kwargs, @@ -201,7 +238,6 @@ def test_unexpected( ], realization_data=[2, 6, 10], truth_data=[4.2, 6.2, 4.1, 5.1], - tmp_path=tmp_path, ) frt = "20170103T0000Z" @@ -212,61 +248,55 @@ def test_unexpected( forecast_cube = _add_day_of_training_period_to_cube( forecast_cube, day_of_training_period, "forecast_reference_time" ) - + cube_inputs = CubeList([forecast_cube]) ancil_cube = _create_ancil_file(return_cube=True) - ancil_filepath = tmp_path / "ancil.nc" - save_netcdf(ancil_cube, ancil_filepath) - - features_dir = tmp_path / "features" - features_dir.mkdir(parents=True) - forecast_filepath = str(features_dir / "forecast.nc") - save_netcdf(forecast_cube, forecast_filepath) + cube_inputs.append(ancil_cube) - plugin = LoadAndApplyQRF( + plugin = PrepareAndApplyQRF( feature_config, "wind_speed_at_10m", - transformation=transformation, - pre_transform_addition=pre_transform_addition, ) if exception == "no_model_output": - file_paths = [forecast_filepath] - result = plugin.process(file_paths=file_paths) + result = plugin(cube_inputs, qrf_descriptors=None) assert isinstance(result, Cube) assert result.name() == "wind_speed_at_10m" assert result.units == "m s-1" assert result.data.shape == forecast_cube.data.shape assert np.allclose(result.data, forecast_cube.data) elif exception == "no_features": - file_paths = [model_output] - with pytest.raises(ValueError, match="No features found"): - plugin.process(file_paths=file_paths) + qrf_descriptors = (qrf_model, transformation, pre_transform_addition) + with pytest.raises(ValueError, match="No target forecast provided."): + plugin(CubeList(), qrf_descriptors=qrf_descriptors) elif exception == "missing_target_feature": - file_paths = [model_output, ancil_filepath] + qrf_descriptors = (qrf_model, transformation, pre_transform_addition) with pytest.raises(ValueError, match="No target forecast provided."): - plugin.process(file_paths=file_paths) + plugin( + CubeList([ancil_cube]), + qrf_descriptors=qrf_descriptors, + ) elif exception == "missing_static_feature": + qrf_descriptors = (qrf_model, transformation, pre_transform_addition) feature_config = { "wind_speed_at_10m": ["mean", "std"], "distance_to_water": ["static"], } plugin.feature_config = feature_config - file_paths = [model_output, forecast_filepath] with pytest.raises(ValueError, match="The number of cubes loaded."): - plugin.process(file_paths=file_paths) + plugin(CubeList([forecast_cube]), qrf_descriptors=qrf_descriptors) elif exception == "missing_dynamic_feature": + qrf_descriptors = (qrf_model, transformation, pre_transform_addition) feature_config = { "wind_speed_at_10m": ["mean", "std"], "air_temperature": ["mean", "std"], } plugin.feature_config = feature_config - file_paths = [model_output, forecast_filepath] with pytest.raises(ValueError, match="The number of cubes loaded."): - plugin.process(file_paths=file_paths) + plugin(CubeList([forecast_cube]), qrf_descriptors=qrf_descriptors) elif exception == "no_quantile_forest_package": + qrf_descriptors = (qrf_model, transformation, pre_transform_addition) plugin.quantile_forest_installed = False - file_paths = [model_output, forecast_filepath] - result = plugin.process(file_paths=file_paths) + result = plugin(CubeList([forecast_cube]), qrf_descriptors=qrf_descriptors) assert isinstance(result, Cube) assert result.name() == "wind_speed_at_10m" assert result.units == "m s-1" diff --git a/improver_tests/calibration/quantile_regression_random_forests_calibration/test_load_and_train_quantile_regression_random_forest.py b/improver_tests/calibration/quantile_regression_random_forests_calibration/test_load_and_train_quantile_regression_random_forest.py index 396a0f6bbe..b345dee47e 100644 --- a/improver_tests/calibration/quantile_regression_random_forests_calibration/test_load_and_train_quantile_regression_random_forest.py +++ b/improver_tests/calibration/quantile_regression_random_forests_calibration/test_load_and_train_quantile_regression_random_forest.py @@ -2,7 +2,7 @@ # # This file is part of 'IMPROVER' and is released under the BSD 3-Clause license. # See LICENSE in the root of the repository for full licensing details. -"""Unit tests for the LoadAndTrainQRF plugin.""" +"""Unit tests for the LoadForTrainQRF and PrepareAndTrain QRF plugins.""" import iris import numpy as np @@ -10,7 +10,8 @@ import pytest from improver.calibration.load_and_train_quantile_regression_random_forest import ( - LoadAndTrainQRF, + LoadForTrainQRF, + PrepareAndTrainQRF, ) from improver.synthetic_data.set_up_test_cubes import set_up_spot_variable_cube @@ -20,28 +21,34 @@ ALTITUDE = [10, 20] LATITUDE = [50, 60] LONGITUDE = [0, 10] -WMO_ID = ["03001", "03002", "03003", "03004", "03005"] +SITE_ID = ["03001", "03002", "03003", "03004", "03005"] def _create_multi_site_forecast_parquet_file(tmp_path, representation="percentile"): - """Create a parquet file with multi-site forecast data.""" + """Create a parquet file with multi-site forecast data. + + Args: + tmp_path: Temporary path to save the parquet file. + representation: The type of ensemble representation to use. Options are + "percentile" or "realization". + """ data_dict = { - "percentile": np.repeat(50, 5), - "forecast": [281, 272, 287, 280, 290], - "altitude": [10, 83, 56, 23, 2], - "blend_time": [pd.Timestamp("2017-01-02 00:00:00")] * 5, + "percentile": np.repeat(50.0, 5), + "forecast": [281.0, 272.0, 287.0, 280.0, 290.0], + "altitude": [10.0, 83.0, 56.0, 23.0, 2.0], + "blend_time": [pd.Timestamp("2017-01-02 00:00:00", tz="utc")] * 5, "forecast_period": [6 * 3.6e12] * 5, - "forecast_reference_time": [pd.Timestamp("2017-01-02 00:00:00")] * 5, + "forecast_reference_time": [pd.Timestamp("2017-01-02 00:00:00", tz="utc")] * 5, "latitude": [60.1, 59.9, 59.7, 58, 57], - "longitude": [1, 2, -1, -2, -3], - "time": [pd.Timestamp("2017-01-02 06:00:00")] * 5, + "longitude": [1.0, 2.0, -1.0, -2.0, -3.0], + "time": [pd.Timestamp("2017-01-02 06:00:00", tz="utc")] * 5, "wmo_id": ["03001", "03002", "03003", "03004", "03005"], "station_id": ["03001", "03002", "03003", "03004", "03005"], "cf_name": ["air_temperature"] * 5, "units": ["K"] * 5, "experiment": ["latestblend"] * 5, - "period": [pd.NaT] * 5, + "period": [pd.NA] * 5, "height": [1.5] * 5, "diagnostic": ["temperature_at_screen_level"] * 5, } @@ -58,15 +65,22 @@ def _create_multi_site_forecast_parquet_file(tmp_path, representation="percentil wind_speed_dict["cf_name"] = "wind_speed" wind_speed_dict["units"] = "m s-1" wind_speed_dict["diagnostic"] = "wind_speed_at_10m" + wind_dir_dict = data_dict.copy() + wind_dir_dict["forecast"] = [90, 100, 110, 120, 130] + wind_dir_dict["cf_name"] = "wind_direction" + wind_dir_dict["units"] = "degrees" + wind_dir_dict["diagnostic"] = "wind_from_direction" data_df = pd.DataFrame(data_dict) wind_speed_df = pd.DataFrame(wind_speed_dict) - joined_df = pd.concat([data_df, wind_speed_df], ignore_index=True) + wind_dir_df = pd.DataFrame(wind_dir_dict) + joined_df = pd.concat([data_df, wind_speed_df, wind_dir_df], ignore_index=True) output_dir = tmp_path / "forecast_parquet_files" output_dir.mkdir(parents=True, exist_ok=True) - output_path = str(output_dir / "forecast.parquet") + output_path = output_dir / "forecast.parquet" joined_df.to_parquet(output_path, index=False, engine="pyarrow") - return output_dir, data_dict["wmo_id"] + + return output_dir, joined_df, data_dict["wmo_id"] def _create_multi_percentile_forecast_parquet_file(tmp_path, representation=None): @@ -74,47 +88,59 @@ def _create_multi_percentile_forecast_parquet_file(tmp_path, representation=None data_dict = { "percentile": [16 + 2 / 3, 33 + 1 / 3, 50, 66 + 2 / 3, 83 + 1 / 3], - "forecast": [272, 274, 275, 277, 280], - "altitude": [10, 10, 10, 10, 10], - "blend_time": [pd.Timestamp("2017-01-02 00:00:00")] * 5, + "forecast": [272.0, 274.0, 275.0, 277.0, 280.0], + "altitude": [10.0, 10.0, 10.0, 10.0, 10.0], + "blend_time": [pd.Timestamp("2017-01-02 00:00:00", tz="utc")] * 5, "forecast_period": [6 * 3.6e12] * 5, - "forecast_reference_time": [pd.Timestamp("2017-01-02 00:00:00")] * 5, + "forecast_reference_time": [pd.Timestamp("2017-01-02 00:00:00", tz="utc")] * 5, "latitude": [60.1, 60.1, 60.1, 60.1, 60.1], "longitude": [1, 1, 1, 1, 1], - "time": [pd.Timestamp("2017-01-02 06:00:00")] * 5, + "time": [pd.Timestamp("2017-01-02 06:00:00", tz="utc")] * 5, "wmo_id": ["03001", "03001", "03001", "03001", "03001"], "station_id": ["03001", "03001", "03001", "03001", "03001"], "cf_name": ["air_temperature"] * 5, "units": ["K"] * 5, "experiment": ["latestblend"] * 5, - "period": [pd.NaT] * 5, + "period": [pd.NA] * 5, "height": [1.5] * 5, "diagnostic": ["temperature_at_screen_level"] * 5, } + if representation == "realization": + data_dict["realization"] = list(range(len(data_dict["percentile"]))) + data_dict.pop("percentile") + elif representation == "kittens": + data_dict["kittens"] = list(range(len(data_dict["percentile"]))) + data_dict.pop("percentile") # Add wind speed to demonstrate filtering. wind_speed_dict = data_dict.copy() wind_speed_dict["forecast"] = [6, 10, 11, 12, 15] wind_speed_dict["cf_name"] = "wind_speed" wind_speed_dict["units"] = "m s-1" wind_speed_dict["diagnostic"] = "wind_speed_at_10m" + wind_dir_dict = data_dict.copy() + wind_dir_dict["forecast"] = [90, 100, 110, 120, 130] + wind_dir_dict["cf_name"] = "wind_direction" + wind_dir_dict["units"] = "degrees" + wind_dir_dict["diagnostic"] = "wind_from_direction" data_df = pd.DataFrame(data_dict) wind_speed_df = pd.DataFrame(wind_speed_dict) - joined_df = pd.concat([data_df, wind_speed_df], ignore_index=True) + wind_dir_df = pd.DataFrame(wind_dir_dict) + joined_df = pd.concat([data_df, wind_speed_df, wind_dir_df], ignore_index=True) output_dir = tmp_path / "forecast_parquet_files" output_dir.mkdir(parents=True, exist_ok=True) - output_path = str(output_dir / "forecast.parquet") + output_path = output_dir / "forecast.parquet" joined_df.to_parquet(output_path, index=False, engine="pyarrow") - return output_dir, data_dict["wmo_id"] + return output_dir, joined_df, data_dict["wmo_id"] def _create_multi_forecast_period_forecast_parquet_file(tmp_path, representation=None): """Create a parquet file with multi-forecast period forecast data.""" data_dict = { - "percentile": [50, 50, 50, 50], - "forecast": [277, 270, 280, 269], - "altitude": [10, 83, 10, 83], + "percentile": [50.0, 50.0, 50.0, 50.0], + "forecast": [277.0, 270.0, 280.0, 269.0], + "altitude": [10.0, 83.0, 10.0, 83.0], "blend_time": [pd.Timestamp("2017-01-02 00:00:00")] * 4, "forecast_period": np.repeat( [ @@ -135,25 +161,37 @@ def _create_multi_forecast_period_forecast_parquet_file(tmp_path, representation "cf_name": ["air_temperature"] * 4, "units": ["K"] * 4, "experiment": ["latestblend"] * 4, - "period": [pd.NaT] * 4, + "period": [pd.NA] * 4, "height": [1.5] * 4, "diagnostic": ["temperature_at_screen_level"] * 4, } + if representation == "realization": + data_dict["realization"] = [0, 1, 0, 1] + data_dict.pop("percentile") + elif representation == "kittens": + data_dict["kittens"] = [0, 1, 0, 1] + data_dict.pop("percentile") # Add wind speed to demonstrate filtering. wind_speed_dict = data_dict.copy() wind_speed_dict["forecast"] = [6, 16, 12, 15] wind_speed_dict["cf_name"] = "wind_speed" wind_speed_dict["units"] = "m s-1" wind_speed_dict["diagnostic"] = "wind_speed_at_10m" + wind_dir_dict = data_dict.copy() + wind_dir_dict["forecast"] = [180, 190, 200, 210] + wind_dir_dict["cf_name"] = "wind_from_direction" + wind_dir_dict["units"] = "degrees" + wind_dir_dict["diagnostic"] = "wind_direction" data_df = pd.DataFrame(data_dict) wind_speed_df = pd.DataFrame(wind_speed_dict) - joined_df = pd.concat([data_df, wind_speed_df], ignore_index=True) + wind_dir_df = pd.DataFrame(wind_dir_dict) + joined_df = pd.concat([data_df, wind_speed_df, wind_dir_df], ignore_index=True) output_dir = tmp_path / "forecast_parquet_files" output_dir.mkdir(parents=True, exist_ok=True) - output_path = str(output_dir / "forecast.parquet") + output_path = output_dir / "forecast.parquet" joined_df.to_parquet(output_path, index=False, engine="pyarrow") - return output_dir, data_dict["wmo_id"] + return output_dir, joined_df, data_dict["wmo_id"] def _create_multi_site_truth_parquet_file(tmp_path): @@ -161,11 +199,12 @@ def _create_multi_site_truth_parquet_file(tmp_path): data_dict = { "diagnostic": ["temperature_at_screen_level"] * 5, "latitude": [60.1, 59.9, 59.7, 58, 57], - "longitude": [1, 2, -1, -2, -3], - "altitude": [10, 83, 56, 23, 2], + "longitude": [1.0, 2.0, -1.0, -2.0, -3.0], + "altitude": [10.0, 83.0, 56.0, 23.0, 2.0], "time": [pd.Timestamp("2017-01-02 06:00:00")] * 5, "wmo_id": ["03001", "03002", "03003", "03004", "03005"], - "ob_value": [276, 270, 289, 290, 301], + "station_id": ["03001", "03002", "03003", "03004", "03005"], + "ob_value": [276.0, 270.0, 289.0, 290.0, 301.0], } wind_speed_dict = data_dict.copy() wind_speed_dict["ob_value"] = [3, 22, 24, 11, 9] @@ -176,9 +215,9 @@ def _create_multi_site_truth_parquet_file(tmp_path): output_dir = tmp_path / "truth_parquet_files" output_dir.mkdir(parents=True, exist_ok=True) - output_path = str(output_dir / "truth.parquet") + output_path = output_dir / "truth.parquet" joined_df.to_parquet(output_path, index=False, engine="pyarrow") - return output_dir + return output_dir, joined_df def _create_multi_percentile_truth_parquet_file(tmp_path): @@ -186,11 +225,12 @@ def _create_multi_percentile_truth_parquet_file(tmp_path): data_dict = { "diagnostic": ["temperature_at_screen_level"], "latitude": [60.1], - "longitude": [1], - "altitude": [10], + "longitude": [1.0], + "altitude": [10.0], "time": [pd.Timestamp("2017-01-02 06:00:00")], "wmo_id": ["03001"], - "ob_value": [276], + "station_id": ["03001"], + "ob_value": [276.0], } wind_speed_dict = data_dict.copy() wind_speed_dict["ob_value"] = [9] @@ -201,9 +241,9 @@ def _create_multi_percentile_truth_parquet_file(tmp_path): output_dir = tmp_path / "truth_parquet_files" output_dir.mkdir(parents=True, exist_ok=True) - output_path = str(output_dir / "truth.parquet") + output_path = output_dir / "truth.parquet" joined_df.to_parquet(output_path, index=False, engine="pyarrow") - return output_dir + return output_dir, joined_df def _create_multi_forecast_period_truth_parquet_file(tmp_path): @@ -211,17 +251,18 @@ def _create_multi_forecast_period_truth_parquet_file(tmp_path): data_dict = { "diagnostic": ["temperature_at_screen_level"] * 4, "latitude": [60.1, 59.9, 60.1, 59.9], - "longitude": [1, 2, 1, 2], - "altitude": [10, 83, 10, 83], + "longitude": [1.0, 2.0, 1.0, 2.0], + "altitude": [10.0, 83.0, 10.0, 83.0], "time": np.repeat( [pd.Timestamp("2017-01-02 06:00:00"), pd.Timestamp("2017-01-02 12:00:00")], 2, ), "wmo_id": ["03001", "03002", "03001", "03002"], + "station_id": ["03001", "03002", "03001", "03002"], "ob_value": [280, 273, 284, 275], } wind_speed_dict = data_dict.copy() - wind_speed_dict["ob_value"] = [2, 11, 10, 14] + wind_speed_dict["ob_value"] = [2.0, 11.0, 10.0, 14.0] wind_speed_dict["diagnostic"] = "wind_speed_at_10m" data_df = pd.DataFrame(data_dict) wind_speed_df = pd.DataFrame(wind_speed_dict) @@ -229,24 +270,25 @@ def _create_multi_forecast_period_truth_parquet_file(tmp_path): output_dir = tmp_path / "truth_parquet_files" output_dir.mkdir(parents=True, exist_ok=True) - output_path = str(output_dir / "truth.parquet") + output_path = output_dir / "truth.parquet" joined_df.to_parquet(output_path, index=False, engine="pyarrow") - return output_dir + return output_dir, joined_df -def _create_multi_site_truth_parquet_file_alt(tmp_path): +def _create_multi_site_truth_parquet_file_alt(tmp_path, site_id="wmo_id"): """Create a parquet file with multi-site truth data for wind speed.""" data_dict = { "diagnostic": ["wind_speed_at_10m"] * 5, "latitude": [60.1, 59.9, 59.7, 58, 57], - "longitude": [1, 2, -1, -2, -3], - "altitude": [10, 83, 56, 23, 2], + "longitude": [1.0, 2.0, -1.0, -2.0, -3.0], + "altitude": [10.0, 83.0, 56.0, 23.0, 2.0], "time": [pd.Timestamp("2017-01-02 06:00:00")] * 5, "wmo_id": ["03001", "03002", "03003", "03004", "03005"], - "ob_value": [10, 25, 4, 3, 11], + "station_id": ["03001", "03002", "03003", "03004", "03005"], + "ob_value": [10.0, 25.0, 4.0, 3.0, 11.0], } wind_speed_dict = data_dict.copy() - wind_speed_dict["ob_value"] = [3, 22, 24, 11, 9] + wind_speed_dict["ob_value"] = [3.0, 22.0, 24.0, 11.0, 9.0] wind_speed_dict["diagnostic"] = "wind_speed_at_10m" data_df = pd.DataFrame(data_dict) wind_speed_df = pd.DataFrame(wind_speed_dict) @@ -254,21 +296,23 @@ def _create_multi_site_truth_parquet_file_alt(tmp_path): output_dir = tmp_path / "truth_parquet_files" output_dir.mkdir(parents=True, exist_ok=True) - output_path = str(output_dir / "truth.parquet") + output_path = output_dir / "truth.parquet" joined_df.to_parquet(output_path, index=False, engine="pyarrow") - return output_dir + return output_dir, joined_df -def _create_ancil_file(tmp_path, wmo_ids): +def _create_ancil_file(tmp_path, site_ids): """Create an ancillary file for testing. Returns: An ancillary cube with a single value. """ - data = np.array(range(len(wmo_ids)), dtype=np.float32) + data = np.array(range(len(site_ids)), dtype=np.float32) template_cube = set_up_spot_variable_cube( data, - wmo_ids=wmo_ids, + wmo_ids=site_ids, + unique_site_id=site_ids, + unique_site_id_key="station_id", name="distance_to_water", units="m", ) @@ -277,159 +321,194 @@ def _create_ancil_file(tmp_path, wmo_ids): cube.remove_coord(coord) output_dir = tmp_path / "ancil_files" output_dir.mkdir(parents=True) - output_path = str(output_dir / "ancil.nc") - iris.save(cube, output_path) - return output_path + output_path = output_dir / "ancil.nc" + iris.save(cube, str(output_path)) + return output_path, cube + + +def filter_forecast_periods(forecast_df, forecast_periods): + """Filter the forecast DataFrame to only include the requested forecast periods.""" + if ":" in forecast_periods: + forecast_periods = [ + fp * 3600 for fp in range(*map(int, forecast_periods.split(":"))) + ] + else: + forecast_periods = [int(forecast_periods) * 3600] + return forecast_df[ + forecast_df["forecast_period"].isin(np.array(forecast_periods) * 1e9) + ].reset_index(drop=True) +def amend_expected_forecast_df( + forecast_df, forecast_periods, parquet_diagnostic_names, representation, site_id +): + forecast_df = filter_forecast_periods(forecast_df, forecast_periods) + for column in ["time", "forecast_reference_time", "blend_time"]: + forecast_df[column] = pd.to_datetime(forecast_df[column], unit="ns", utc=True) + for column in ["forecast_period", "period"]: + forecast_df[column] = pd.to_timedelta(forecast_df[column], unit="ns") + + base_df = forecast_df[forecast_df["diagnostic"] == parquet_diagnostic_names[0]] + for parquet_diagnostic_name in parquet_diagnostic_names[1:]: + additional_df = forecast_df[ + forecast_df["diagnostic"] == parquet_diagnostic_name + ] + base_df = pd.merge( + base_df, + additional_df[ + [ + *site_id, + "forecast_reference_time", + "forecast_period", + representation, + "forecast", + ] + ].rename(columns={"forecast": parquet_diagnostic_name}), + on=[ + *site_id, + "forecast_reference_time", + "forecast_period", + representation, + ], + how="left", + ) + forecast_df = base_df + + forecast_df.rename(columns={"forecast": "air_temperature"}, inplace=True) + return forecast_df + + +def amend_expected_truth_df(truth_df, parquet_diagnostic_name): + truth_df = truth_df[truth_df["diagnostic"] == parquet_diagnostic_name] + truth_df["time"] = pd.to_datetime(truth_df["time"], unit="ns", utc=True) + return truth_df + + +@pytest.mark.parametrize("representation", ["percentile", "realization"]) +@pytest.mark.parametrize("include_dynamic", [True, False]) +@pytest.mark.parametrize("include_static", [True, False]) +@pytest.mark.parametrize("include_noncube_static", [True, False]) +@pytest.mark.parametrize("remove_target", [True, False]) @pytest.mark.parametrize( - "forecast_creation,truth_creation,forecast_periods,include_static,remove_target,representation,expected", + "site_id", ["wmo_id", "station_id", ["wmo_id"], ["latitude", "longitude"]] +) +@pytest.mark.parametrize( + "forecast_creation,truth_creation,forecast_periods", [ ( _create_multi_site_forecast_parquet_file, _create_multi_site_truth_parquet_file, "6:18:6", - False, - False, - "percentile", - 5.6, - ), - ( - _create_multi_site_forecast_parquet_file, - _create_multi_site_truth_parquet_file, - "6:18:6", - True, - False, - "percentile", - 5.64, - ), - ( - _create_multi_percentile_forecast_parquet_file, - _create_multi_percentile_truth_parquet_file, - "6:18:6", - False, - False, - "percentile", - 5.62, ), ( _create_multi_percentile_forecast_parquet_file, _create_multi_percentile_truth_parquet_file, "6:18:6", - True, - False, - "percentile", - 5.62, ), ( _create_multi_forecast_period_forecast_parquet_file, _create_multi_forecast_period_truth_parquet_file, "6:18:6", - False, - False, - "percentile", - 5.61, - ), - ( - _create_multi_forecast_period_forecast_parquet_file, - _create_multi_forecast_period_truth_parquet_file, - "6:18:6", - True, - True, # Remove target feature - "percentile", - 5.62, - ), - ( - _create_multi_site_forecast_parquet_file, - _create_multi_site_truth_parquet_file, - "6:18:6", - False, - False, - "realization", # Provide realization input - 5.62, ), ( _create_multi_forecast_period_forecast_parquet_file, _create_multi_forecast_period_truth_parquet_file, "12", - False, - False, - "percentile", - 5.64, ), ], ) -def test_load_and_train_qrf( +def test_load_for_qrf( tmp_path, + representation, + include_dynamic, + include_static, + include_noncube_static, + remove_target, + site_id, forecast_creation, truth_creation, forecast_periods, - include_static, - remove_target, - representation, - expected, ): - """Test the LoadAndTrainQRF plugin.""" + """Test the LoadForTrainQRF plugin.""" feature_config = {"air_temperature": ["mean", "std", "altitude"]} - n_estimators = 2 - max_depth = 5 - random_state = 46 + parquet_diagnostic_names = ["temperature_at_screen_level"] + + if isinstance(site_id, str): + site_id = [site_id] + + forecast_path, base_expected_forecast_df, site_ids = forecast_creation( + tmp_path, representation + ) + truth_path, expected_truth_df = truth_creation(tmp_path) - forecast_path, wmo_ids = forecast_creation(tmp_path, representation) - truth_path = truth_creation(tmp_path) file_paths = [forecast_path, truth_path] + if include_dynamic: + feature_config["wind_speed_at_10m"] = ["mean", "std"] + parquet_diagnostic_names.append("wind_speed_at_10m") + if include_static: - ancil_path = _create_ancil_file(tmp_path, sorted(list(set(wmo_ids)))) + ancil_path, expected_cube = _create_ancil_file( + tmp_path, sorted(list(set(site_ids))) + ) file_paths.append(ancil_path) feature_config["distance_to_water"] = ["static"] + if include_noncube_static: + feature_config["wind_from_direction"] = ["static"] + parquet_diagnostic_names.append("wind_from_direction") + if remove_target: feature_config.pop("air_temperature") - # Create an instance of LoadAndTrainQRF with the required parameters - plugin = LoadAndTrainQRF( + expected_forecast_df = amend_expected_forecast_df( + base_expected_forecast_df.copy(), + forecast_periods, + parquet_diagnostic_names, + representation, + site_id, + ) + expected_truth_df = amend_expected_truth_df( + expected_truth_df, "temperature_at_screen_level" + ) + + # Create an instance of LoadForTrainQRF with the required parameters + plugin = LoadForTrainQRF( experiment="latestblend", feature_config=feature_config, - target_diagnostic_name="temperature_at_screen_level", + parquet_diagnostic_names=parquet_diagnostic_names, target_cf_name="air_temperature", forecast_periods=forecast_periods, cycletime="20170103T0000Z", training_length=2, - n_estimators=n_estimators, - max_depth=max_depth, - random_state=random_state, - transformation="log", - pre_transform_addition=1, + unique_site_id_keys=site_id, ) - qrf_model = plugin(file_paths) - - assert qrf_model.n_estimators == n_estimators - assert qrf_model.max_depth == max_depth - assert qrf_model.random_state == random_state - - if remove_target: - current_forecast = [] - else: - current_forecast = [5.64, 3, 55] + forecast_df, truth_df, cube_inputs = plugin(file_paths) - if include_static: - current_forecast.append(2.5) + assert isinstance(forecast_df, pd.DataFrame) + assert isinstance(truth_df, pd.DataFrame) - result = qrf_model.predict( - np.expand_dims(np.array(current_forecast), 0), quantiles=[0.5] + pd.testing.assert_frame_equal( + forecast_df, + expected_forecast_df, + check_dtype=False, + check_datetimelike_compat=True, ) - np.testing.assert_almost_equal(result, expected, decimal=2) + pd.testing.assert_frame_equal( + truth_df, expected_truth_df, check_dtype=False, check_datetimelike_compat=True + ) + if include_static: + assert isinstance(cube_inputs, iris.cube.CubeList) + assert len(cube_inputs) == 1 + assert cube_inputs[0].name() == "distance_to_water" + np.testing.assert_almost_equal(cube_inputs[0].data, expected_cube.data) -@pytest.mark.parametrize("make_files", [(False, True)]) -def test_load_and_train_qrf_no_paths(tmp_path, make_files): - """Test the LoadAndTrainQRF plugin when the no valid file paths are provided. +@pytest.mark.parametrize("make_files", [False, True]) +def test_load_for_qrf_no_paths(tmp_path, make_files): + """Test the LoadForTrainQRF plugin when the no valid file paths are provided. Either the paths do not exist, or the paths exist but the directories are empty.""" feature_config = {"air_temperature": ["mean", "std", "altitude"]} - n_estimators = 2 - max_depth = 5 - random_state = 46 file_paths = [ tmp_path / "partition" / "forecast_table/", @@ -439,19 +518,14 @@ def test_load_and_train_qrf_no_paths(tmp_path, make_files): for file_path in file_paths: (tmp_path / file_path).mkdir(parents=True, exist_ok=True) - plugin = LoadAndTrainQRF( + plugin = LoadForTrainQRF( experiment="latestblend", feature_config=feature_config, - target_diagnostic_name="temperature_at_screen_level", + parquet_diagnostic_names=["temperature_at_screen_level"], target_cf_name="air_temperature", forecast_periods="6:12:6", cycletime="20170102T0000Z", training_length=2, - n_estimators=n_estimators, - max_depth=max_depth, - random_state=random_state, - transformation="log", - pre_transform_addition=1, ) result = plugin(file_paths) # Expecting None since no valid paths are provided @@ -459,42 +533,63 @@ def test_load_and_train_qrf_no_paths(tmp_path, make_files): @pytest.mark.parametrize( - "cycletime,forecast_periods", + "forecast_creation,truth_creation,cycletime,forecast_periods", [ - ("20200102T0000Z", "6:12:6"), - ("20170102T0000Z", "30:36:6"), + ( + _create_multi_site_forecast_parquet_file, + _create_multi_site_truth_parquet_file, + "20200102T0000Z", + "6:12:6", + ), + ( + _create_multi_site_forecast_parquet_file, + _create_multi_site_truth_parquet_file, + "20170102T0000Z", + "30:36:6", + ), ], ) -def test_load_and_train_qrf_mismatches(tmp_path, cycletime, forecast_periods): - """Test the LoadAndTrainQRF plugin when the cycletime or forecast_periods +def test_load_for_qrf_mismatches( + tmp_path, + forecast_creation, + truth_creation, + cycletime, + forecast_periods, +): + """Test the LoadForTrainQRF plugin when the cycletime or forecast_periods requested are not present in the provided files.""" feature_config = {"air_temperature": ["mean", "std", "altitude"]} - n_estimators = 2 - max_depth = 5 - random_state = 46 + representation = "percentile" + + forecast_path, expected_forecast_df, _ = forecast_creation(tmp_path, representation) + expected_forecast_df = amend_expected_forecast_df( + expected_forecast_df, + forecast_periods, + ["temperature_at_screen_level"], + "percentile", + "wmo_id", + ) - file_paths = [ - tmp_path / "partition" / "forecast_table/", - tmp_path / "partition" / "truth_table/", - ] + truth_path, expected_truth_df = truth_creation(tmp_path) + expected_truth_df = amend_expected_truth_df( + expected_truth_df, "temperature_at_screen_level" + ) + + file_paths = [forecast_path, truth_path] - plugin = LoadAndTrainQRF( + plugin = LoadForTrainQRF( experiment="latestblend", feature_config=feature_config, - target_diagnostic_name="temperature_at_screen_level", + parquet_diagnostic_names=["temperature_at_screen_level"], target_cf_name="air_temperature", forecast_periods=forecast_periods, cycletime=cycletime, training_length=2, - n_estimators=n_estimators, - max_depth=max_depth, - random_state=random_state, - transformation="log", - pre_transform_addition=1, ) - result = plugin(file_paths) - # Expecting None since no valid paths are provided - assert result is None + forecast_df, _, _ = plugin(file_paths) + # Expecting an empty DataFrame since the cycletime or forecast_periods + # requested are not present in the provided file. + assert forecast_df.empty @pytest.mark.parametrize( @@ -552,30 +647,22 @@ def test_unexpected( forecast_periods, representation, ): - """Test LoadAndTrainQRF plugin behaviour in atypical situations.""" + """Test LoadForTrainQRF plugin behaviour in atypical situations.""" feature_config = {"air_temperature": ["mean", "std", "altitude"]} - n_estimators = 2 - max_depth = 5 - random_state = 46 - forecast_path, _ = forecast_creation(tmp_path, representation) - truth_path = truth_creation(tmp_path) + forecast_path, _, _ = forecast_creation(tmp_path, representation) + truth_path, _ = truth_creation(tmp_path) file_paths = [forecast_path, truth_path] - # Create an instance of LoadAndTrainQRF with the required parameters - plugin = LoadAndTrainQRF( + # Create an instance of LoadForTrainQRF with the required parameters + plugin = LoadForTrainQRF( experiment="latestblend", feature_config=feature_config, - target_diagnostic_name="temperature_at_screen_level", + parquet_diagnostic_names=["temperature_at_screen_level"], target_cf_name="air_temperature", forecast_periods=forecast_periods, cycletime="20170103T0000Z", training_length=2, - n_estimators=n_estimators, - max_depth=max_depth, - random_state=random_state, - transformation="log", - pre_transform_addition=1, ) if exception == "non_matching_truth": @@ -587,7 +674,7 @@ def test_unexpected( "distance_to_water": ["static"], } plugin.feature_config = feature_config - with pytest.raises(ValueError, match="The number of cubes loaded."): + with pytest.raises(ValueError, match="The features requested in the"): plugin.process(file_paths=file_paths) elif exception == "missing_dynamic_feature": feature_config = { @@ -595,7 +682,7 @@ def test_unexpected( "air_temperature": ["mean", "std"], } plugin.feature_config = feature_config - with pytest.raises(ValueError, match="The number of cubes loaded."): + with pytest.raises(ValueError, match="The features requested in the"): plugin.process(file_paths=file_paths) elif exception == "no_percentile_realization": with pytest.raises(ValueError, match="The forecast parquet file"): @@ -609,3 +696,155 @@ def test_unexpected( assert result is None else: raise ValueError(f"Unknown exception type: {exception}") + + +@pytest.mark.parametrize("representation", ["percentile", "realization"]) +@pytest.mark.parametrize("include_dynamic", [True, False]) +@pytest.mark.parametrize("include_static", [True, False]) +@pytest.mark.parametrize("include_noncube_static", [True, False]) +@pytest.mark.parametrize("remove_target", [True, False]) +@pytest.mark.parametrize("include_nans", [True, False]) +@pytest.mark.parametrize("include_latlon_nans", [True, False]) +@pytest.mark.parametrize( + "site_id", ["wmo_id", "station_id", ["wmo_id"], ["latitude", "longitude"]] +) +@pytest.mark.parametrize( + "forecast_creation,truth_creation,forecast_periods", + [ + ( + _create_multi_site_forecast_parquet_file, + _create_multi_site_truth_parquet_file, + "6:18:6", + ), + ( + _create_multi_percentile_forecast_parquet_file, + _create_multi_percentile_truth_parquet_file, + "6:18:6", + ), + ( + _create_multi_forecast_period_forecast_parquet_file, + _create_multi_forecast_period_truth_parquet_file, + "6:18:6", + ), + ( + _create_multi_forecast_period_forecast_parquet_file, + _create_multi_forecast_period_truth_parquet_file, + "12", + ), + ], +) +def test_prepare_and_train_qrf( + tmp_path, + representation, + include_dynamic, + include_static, + include_noncube_static, + remove_target, + include_nans, + include_latlon_nans, + site_id, + forecast_creation, + truth_creation, + forecast_periods, +): + """Test the PrepareAndTrainQRF plugin.""" + feature_config = {"air_temperature": ["mean", "std", "altitude"]} + n_estimators = 2 + max_depth = 5 + random_state = 46 + target_cf_name = "air_temperature" + + if isinstance(site_id, str): + site_id = [site_id] + + _, forecast_df, site_ids = forecast_creation(tmp_path, representation) + forecast_df = amend_expected_forecast_df( + forecast_df, + forecast_periods, + ["temperature_at_screen_level"], + representation, + site_id, + ) + _, truth_df = truth_creation(tmp_path) + + truth_df = amend_expected_truth_df(truth_df, "temperature_at_screen_level") + + if include_dynamic: + forecast_df["wind_speed_at_10m"] = [10.0, 20.0, 15.0, 12.0, 11.0][ + : len(forecast_df) + ] + feature_config["wind_speed_at_10m"] = ["mean", "std"] + + if include_static: + _, ancil_cube = _create_ancil_file(tmp_path, sorted(list(set(site_ids)))) + feature_config["distance_to_water"] = ["static"] + + if include_noncube_static: + forecast_df["wind_from_direction"] = [90.0, 100.0, 110.0, 120.0, 130.0][ + : len(forecast_df) + ] + feature_config["wind_from_direction"] = ["static"] + + if remove_target: + feature_config.pop("air_temperature") + + if include_nans: + # Insert a NaN will result in this row being dropped. + truth_df.loc[0, "ob_value"] = pd.NA + + if include_latlon_nans: + # As latitude is not a feature, this NaN should be ignored. + truth_df.loc[1, "latitude"] = pd.NA + + if feature_config == {}: + pytest.skip("No features to train on") + + # Create an instance of PrepareAndTrainQRF with the required parameters + plugin = PrepareAndTrainQRF( + feature_config=feature_config, + target_cf_name=target_cf_name, + n_estimators=n_estimators, + max_depth=max_depth, + random_state=random_state, + transformation="log", + pre_transform_addition=1, + unique_site_id_keys=site_id, + ) + if truth_df["ob_value"].isna().all(): + with pytest.raises(ValueError, match="Empty truth DataFrame"): + plugin(forecast_df, truth_df) + return + elif include_static: + qrf_model, transformation, pre_transform_addition = plugin( + forecast_df, truth_df, iris.cube.CubeList([ancil_cube]) + ) + else: + qrf_model, transformation, pre_transform_addition = plugin( + forecast_df, truth_df + ) + + assert qrf_model.n_estimators == n_estimators + assert qrf_model.max_depth == max_depth + assert qrf_model.random_state == random_state + assert transformation == "log" + assert pre_transform_addition == 1 + + current_forecast = [5.64, 3, 55] + + if remove_target: + current_forecast = [] + + if include_dynamic: + current_forecast.extend([7, 1]) + + if include_noncube_static: + current_forecast.append(100) + + if include_static: + current_forecast.append(2.5) + + result = qrf_model.predict( + np.expand_dims(np.array(current_forecast), 0), quantiles=[0.5] + ) + expected = 5.6 + np.testing.assert_almost_equal(result, expected, decimal=1) diff --git a/improver_tests/calibration/quantile_regression_random_forests_calibration/test_quantile_regression_random_forest.py b/improver_tests/calibration/quantile_regression_random_forests_calibration/test_quantile_regression_random_forest.py index f48d89cc3b..d303f6a854 100644 --- a/improver_tests/calibration/quantile_regression_random_forests_calibration/test_quantile_regression_random_forest.py +++ b/improver_tests/calibration/quantile_regression_random_forests_calibration/test_quantile_regression_random_forest.py @@ -20,8 +20,8 @@ ApplyQuantileRegressionRandomForests, TrainQuantileRegressionRandomForests, _check_valid_transformation, - get_required_column_names, prep_feature, + prep_features_from_config, quantile_forest_package_available, sanitise_forecast_dataframe, ) @@ -42,6 +42,7 @@ def _create_forecasts( forecast_reference_time: str, validity_time: str, data: list[int], + representation: str = "realization", return_cube: bool = False, ) -> Cube | pd.DataFrame: """Create site forecast cube with realizations. @@ -62,12 +63,20 @@ def _create_forecasts( name="wind_speed_at_10m", units="m s-1", wmo_ids=WMO_ID, + unique_site_id=WMO_ID, + unique_site_id_key="station_id", latitudes=np.array([50, 60], np.float32), longitudes=np.array([0, 10], np.float32), altitudes=np.array([10, 20], np.float32), time=dt.strptime(validity_time, DT_FORMAT), frt=dt.strptime(forecast_reference_time, DT_FORMAT), ) + if representation == "percentile": + increment = (1 / (len(cube.coord("realization").points) + 1)) * 100 + cube.coord("realization").rename("percentile") + percentiles = np.array(np.arange(increment, 100, increment), dtype=np.float32) + cube.coord("percentile").points = percentiles + if return_cube: return cube df = as_data_frame( @@ -115,6 +124,8 @@ def _create_ancil_file(return_cube=False): template_cube = set_up_spot_variable_cube( data, wmo_ids=WMO_ID, + unique_site_id=WMO_ID, + unique_site_id_key="station_id", latitudes=np.array([50, 60], np.float32), longitudes=np.array([0, 10], np.float32), altitudes=np.array([10, 20], np.float32), @@ -142,7 +153,6 @@ def _run_train_qrf( n_estimators, max_depth, random_state, - compression, transformation, pre_transform_addition, extra_kwargs, @@ -162,6 +172,8 @@ def _run_train_qrf( realization_data=[2, 6, 10], truth_data=[4.2, 3.8, 5.8, 6, 7, 7.3, 9.1, 9.5], tmp_path=None, + compression=5, + site_id="wmo_id", ): realization_data = np.array(realization_data, dtype=np.float32) forecast_dfs = [] @@ -188,7 +200,7 @@ def _run_train_qrf( if include_static: ancil_df = _create_ancil_file() forecast_df = forecast_df.merge( - ancil_df[["wmo_id", "distance_to_water"]], on=["wmo_id"], how="left" + ancil_df[[site_id, "distance_to_water"]], on=[site_id], how="left" ) feature_config["distance_to_water"] = ["static"] if "air_temperature" in feature_config.keys(): @@ -204,6 +216,7 @@ def _run_train_qrf( random_state=random_state, transformation=transformation, pre_transform_addition=pre_transform_addition, + unique_site_id_keys=site_id, **extra_kwargs, ) result = plugin.process(forecast_df, truth_df) @@ -226,27 +239,33 @@ def test_quantile_forest_package_available(): assert result == expected +@pytest.mark.parametrize("representation", ["percentile", "realization"]) @pytest.mark.parametrize( - "feature,expected,expected_dtype", + "feature_name,expected,expected_dtype", [ - ("mean", np.tile(np.array([6, 8], dtype=np.float32), 3), np.float32), + ("mean", np.tile([6, 8], 3).astype(np.float32), np.float32), ("std", np.repeat(4, 6).astype(np.float32), np.float32), - ("latitude", np.tile(np.array([50, 60], dtype=np.float32), 3), np.float32), - ("longitude", np.tile(np.array([0, 10], dtype=np.float32), 3), np.float32), - ("altitude", np.tile(np.array([10, 20], dtype=np.float32), 3), np.float32), + ("min", np.tile([2, 4], 3).astype(np.float32), np.float32), + ("max", np.tile([10, 12], 3).astype(np.float32), np.float32), + ("percentile_50", np.tile([6, 8], 3).astype(np.float32), np.float32), + ("members_below_5", np.repeat(1, 6).astype(np.float32), np.float32), + ("members_above_5", np.repeat(2, 6).astype(np.float32), np.float32), + ("latitude", np.tile([50, 60], 3).astype(np.float32), np.float32), + ("longitude", np.tile([0, 10], 3).astype(np.float32), np.float32), + ("altitude", np.tile([10, 20], 3).astype(np.float32), np.float32), ( "day_of_year", - np.tile(np.array([1, 1], dtype=np.int32), 3), + np.tile([1, 1], 3).astype(np.int32), np.int32, ), ( "day_of_year_sin", - np.tile(np.array([0.01716633, 0.01716633], dtype=np.float32), 3), + np.tile([0.01716633, 0.01716633], 3).astype(np.float32), np.float32, ), ( "day_of_year_cos", - np.tile(np.array([0.99985266, 0.99985266], dtype=np.float32), 3), + np.tile([0.99985266, 0.99985266], 3).astype(np.float32), np.float32, ), ("hour_of_day", np.repeat(12, 6).astype(np.int32), np.int32), @@ -254,20 +273,24 @@ def test_quantile_forest_package_available(): ("hour_of_day_cos", np.repeat(-1, 6).astype(np.float32), np.float32), ("forecast_period", np.repeat(43200, 6).astype(np.int32), np.int32), ("day_of_training_period", np.repeat(0, 6).astype(np.int32), np.int32), - ("static", np.tile(np.array([2, 3], dtype=np.float32), 3), np.float32), + ("static", np.tile([2, 3], 3).astype(np.float32), np.float32), ], ) -def test_prep_feature_single_time(feature, expected, expected_dtype): +def test_prep_feature_single_time( + representation, feature_name, expected, expected_dtype +): """Test the prep_feature function for a single time.""" - feature_name = "wind_speed_at_10m" + variable_name = "wind_speed_at_10m" forecast_reference_time = "20170101T0000Z" validity_time = "20170101T1200Z" data = np.array([2, 6, 10]) - forecast_df = _create_forecasts(forecast_reference_time, validity_time, data) + forecast_df = _create_forecasts( + forecast_reference_time, validity_time, data, representation + ) forecast_df = _add_day_of_training_period(forecast_df) - if feature == "static": + if feature_name == "static": feature_df = _create_ancil_file() forecast_df = forecast_df.merge( feature_df[["wmo_id", "distance_to_water"]], on=["wmo_id"], how="left" @@ -275,12 +298,20 @@ def test_prep_feature_single_time(feature, expected, expected_dtype): else: forecast_df = forecast_df.copy() - result = prep_feature(forecast_df, feature_name, feature) + result = prep_feature(forecast_df, variable_name, feature_name) - if feature in ["mean", "std"]: - assert result.shape == (6, 12) - feature_name_modified = f"{feature_name}_{feature}" - elif feature in [ + if feature_name in [ + "mean", + "std", + "min", + "max", + "percentile_50", + "members_below_5", + "members_above_5", + ]: + assert result.shape == (6, 13) + variable_name_modified = f"{variable_name}_{feature_name}" + elif feature_name in [ "day_of_year", "day_of_year_sin", "day_of_year_cos", @@ -288,24 +319,53 @@ def test_prep_feature_single_time(feature, expected, expected_dtype): "hour_of_day_sin", "hour_of_day_cos", ]: - assert result.shape == (6, 12) - feature_name_modified = feature - elif feature in ["static"]: - assert result.shape == (6, 12) - feature_name_modified = "distance_to_water" + assert result.shape == (6, 13) + variable_name_modified = feature_name + elif feature_name in ["static"]: + assert result.shape == (6, 13) + variable_name_modified = "distance_to_water" else: - assert result.shape == (6, 11) - feature_name_modified = feature + assert result.shape == (6, 12) + variable_name_modified = feature_name - assert result[feature_name_modified].dtype == expected_dtype - np.testing.assert_allclose(result[feature_name_modified], expected, atol=1e-6) + assert result[variable_name_modified].dtype == expected_dtype + np.testing.assert_allclose(result[variable_name_modified], expected, atol=1e-6) + + +@pytest.mark.parametrize("scenario", ["uneven_percentiles1", "uneven_percentiles2"]) +def test_prep_feature_invalid_percentiles(scenario): + """Test that an error is raised if invalid percentiles are provided.""" + variable_name = "wind_speed_at_10m" + + forecast_reference_time = "20170101T0000Z" + validity_time = "20170101T1200Z" + data = np.array([2, 6, 10]) + forecast_df = _create_forecasts( + forecast_reference_time, validity_time, data, representation="percentile" + ) + forecast_df = _add_day_of_training_period(forecast_df) + + if scenario == "uneven_percentiles1": + forecast_df.replace(to_replace={25: 10, 50: 20, 75: 20}, inplace=True) + elif scenario == "uneven_percentiles2": + forecast_df.replace(to_replace={25: 10, 75: 90}, inplace=True) + + with pytest.raises( + ValueError, match="Forecast percentiles must be equally spaced." + ): + prep_feature(forecast_df, variable_name, "mean") @pytest.mark.parametrize( - "feature,expected,expected_dtype", + "feature_name,expected,expected_dtype", [ ("mean", np.tile([6, 8], 18).astype(np.float32), np.float32), ("std", np.repeat(4, 36).astype(np.float32), np.float32), + ("min", np.tile([2, 4], 18).astype(np.float32), np.float32), + ("max", np.tile(np.array([10, 12], dtype=np.float32), 18), np.float32), + ("percentile_50", np.tile(np.array([6, 8], dtype=np.float32), 18), np.float32), + ("members_below_5", np.repeat(1, 36).astype(np.float32), np.float32), + ("members_above_5", np.repeat(2, 36).astype(np.float32), np.float32), ("latitude", np.tile([50, 60], 18).astype(np.float32), np.float32), ("longitude", np.tile([0, 10], 18).astype(np.float32), np.float32), ("altitude", np.tile([10, 20], 18).astype(np.float32), np.float32), @@ -344,7 +404,7 @@ def test_prep_feature_single_time(feature, expected, expected_dtype): ("static", np.tile([2, 3], 18).astype(np.float32), np.float32), ], ) -def test_prep_feature_more_times(feature, expected, expected_dtype): +def test_prep_feature_more_times(feature_name, expected, expected_dtype): """Test the prep_feature function for multiple times.""" forecast_reference_times = [ "20170101T0000Z", @@ -365,7 +425,7 @@ def test_prep_feature_more_times(feature, expected, expected_dtype): data = np.array([2, 6, 10]) - feature_name = "wind_speed_at_10m" + variable_name = "wind_speed_at_10m" data = np.array([2, 6, 10]) @@ -375,7 +435,7 @@ def test_prep_feature_more_times(feature, expected, expected_dtype): forecast_df = pd.concat(forecast_dfs) forecast_df = _add_day_of_training_period(forecast_df) - if feature == "static": + if feature_name == "static": feature_df = _create_ancil_file() forecast_df = forecast_df.merge( feature_df[["wmo_id", "distance_to_water"]], on=["wmo_id"], how="left" @@ -384,12 +444,20 @@ def test_prep_feature_more_times(feature, expected, expected_dtype): else: forecast_df = forecast_df.copy() - result = prep_feature(forecast_df, feature_name, feature) + result = prep_feature(forecast_df, variable_name, feature_name) - if feature in ["mean", "std"]: - assert result.shape == (36, 12) - feature_name_modified = f"{feature_name}_{feature}" - elif feature in [ + if feature_name in [ + "mean", + "std", + "min", + "max", + "percentile_50", + "members_below_5", + "members_above_5", + ]: + assert result.shape == (36, 13) + variable_name_modified = f"{variable_name}_{feature_name}" + elif feature_name in [ "day_of_year", "day_of_year_sin", "day_of_year_cos", @@ -397,17 +465,17 @@ def test_prep_feature_more_times(feature, expected, expected_dtype): "hour_of_day_sin", "hour_of_day_cos", ]: - assert result.shape == (36, 12) - feature_name_modified = feature - elif feature in ["static"]: - assert result.shape == (36, 12) - feature_name_modified = "distance_to_water" + assert result.shape == (36, 13) + variable_name_modified = feature_name + elif feature_name in ["static"]: + assert result.shape == (36, 13) + variable_name_modified = "distance_to_water" else: - assert result.shape == (36, 11) - feature_name_modified = feature + assert result.shape == (36, 12) + variable_name_modified = feature_name - assert result[feature_name_modified].dtype == expected_dtype - np.testing.assert_allclose(result[feature_name_modified], expected, atol=1e-6) + assert result[variable_name_modified].dtype == expected_dtype + np.testing.assert_allclose(result[variable_name_modified], expected, atol=1e-6) @pytest.mark.parametrize( @@ -465,24 +533,25 @@ def test_sanitise_forecast_dataframe(feature_config): "distance_to_water": ["static"], }, {"wind_speed_at_10m": ["latitude", "longitude", "height"]}, + {"wind_speed_at_10m": ["mean", "std"], "surface_temperature": ["mean"]}, ], ) -def test_get_required_column_names(feature_config): - """Test the get_required_column_names function.""" +def test_prep_features_from_config(feature_config): + """Test the prep_features_from_config function.""" data_dict = { "wmo_id": np.tile(WMO_ID, 3), "latitude": np.tile(LATITUDE, 3), "longitude": np.tile(LONGITUDE, 3), "altitude": np.tile(ALTITUDE, 3), - "wind_speed_at_10m_mean": np.repeat(5, 6), - "wind_speed_at_10m_std": np.repeat(1, 6), "wind_speed_at_10m": np.tile([4, 6], 3), "realization": np.tile([1, 2], 3), "distance_to_water": np.tile([2.0, 3.0], 3), + "blend_time": pd.Timestamp("2017-01-02 00:00:00", tz="utc"), + "forecast_period": 6 * 3.6e12, + "forecast_reference_time": pd.Timestamp("2017-01-02 00:00:00", tz="utc"), } df = pd.DataFrame(data_dict) - # expected = feature_config["wind_speed_at_10m"].copy() expected = list(itertools.chain.from_iterable(feature_config.values())) if "mean" in expected: expected = ["wind_speed_at_10m_mean" if e == "mean" else e for e in expected] @@ -492,11 +561,25 @@ def test_get_required_column_names(feature_config): expected = ["distance_to_water" if e == "static" else e for e in expected] if "height" in expected: - with pytest.raises(ValueError, match="Feature 'height' is not supported."): - get_required_column_names(df, feature_config) + with pytest.raises( + ValueError, + match=( + "Feature 'height' for variable 'wind_speed_at_10m' is not supported." + ), + ): + prep_features_from_config(df, feature_config) + elif "surface_temperature" in feature_config.keys(): + with pytest.raises( + ValueError, + match=( + "Feature 'surface_temperature' is not present " + "in the forecast DataFrame." + ), + ): + prep_features_from_config(df, feature_config) else: - result = get_required_column_names(df, feature_config) - assert result == expected + _, result_names = prep_features_from_config(df, feature_config) + assert result_names == expected @pytest.mark.parametrize( @@ -516,24 +599,23 @@ def test_check_valid_transformation(transformation): @pytest.mark.parametrize( - "n_estimators,max_depth,random_state,compression,transformation,pre_transform_addition,extra_kwargs,include_static,expected", + "n_estimators,max_depth,random_state,transformation,pre_transform_addition,extra_kwargs,include_static,expected", [ - (2, 2, 55, 5, None, 0, {}, False, 4.2), # Basic test case - (2, 2, 54, 5, None, 0, {}, False, 4.15), # Different random state - (1, 1, 54, 5, None, 0, {}, False, 4.2), # Fewer estimators and reduced depth - (2, 2, 55, 5, "log", 10, {}, False, 2.65), # Log transformation - (2, 2, 55, 5, "log10", 10, {}, False, 1.15), # Log10 transformation - (2, 2, 55, 5, "sqrt", 10, {}, False, 3.76), # Square root transformation - (2, 2, 55, 5, "cbrt", 10, {}, False, 2.42), # Cube root transformation - (2, 2, 55, 5, None, 0, {"criterion": "absolute_error"}, False, 4.2), # noqa # Different criterion - (2, 5, 55, 5, None, 0, {}, True, 4.2), # Include static data + (2, 2, 55, None, 0, {}, False, 4.2), # Basic test case + (2, 2, 54, None, 0, {}, False, 4.15), # Different random state + (1, 1, 54, None, 0, {}, False, 4.2), # Fewer estimators and reduced depth + (2, 2, 55, "log", 10, {}, False, 2.65), # Log transformation + (2, 2, 55, "log10", 10, {}, False, 1.15), # Log10 transformation + (2, 2, 55, "sqrt", 10, {}, False, 3.76), # Square root transformation + (2, 2, 55, "cbrt", 10, {}, False, 2.42), # Cube root transformation + (2, 2, 55, None, 0, {"criterion": "absolute_error"}, False, 4.2), # noqa # Different criterion + (2, 5, 55, None, 0, {}, True, 4.2), # Include static data ], ) def test_train_qrf_single_lead_times( n_estimators, max_depth, random_state, - compression, transformation, pre_transform_addition, extra_kwargs, @@ -550,7 +632,6 @@ def test_train_qrf_single_lead_times( n_estimators, max_depth, random_state, - compression, transformation, pre_transform_addition, extra_kwargs, @@ -581,24 +662,23 @@ def test_train_qrf_single_lead_times( @pytest.mark.parametrize( - "n_estimators,max_depth,random_state,compression,transformation,pre_transform_addition,extra_kwargs,include_static,expected", + "n_estimators,max_depth,random_state,transformation,pre_transform_addition,extra_kwargs,include_static,expected", [ - (2, 2, 55, 5, None, 0, {}, False, 5.8), # Basic test case - (1, 1, 55, 5, None, 0, {}, False, 3.8), # Fewer estimators and reduced depth - (1, 1, 73, 5, None, 0, {}, False, 5.8), # Different random state - (2, 2, 55, 5, "log", 10, {}, False, 2.752), # Log transformation - (2, 2, 55, 5, "log10", 10, {}, False, 1.195), # Log10 transformation - (2, 2, 55, 5, "sqrt", 10, {}, False, 3.964), # Square root transformation - (2, 2, 55, 5, "cbrt", 10, {}, False, 2.504), # Cube root transformation - (2, 2, 55, 5, None, 0, {"criterion": "absolute_error"}, False, 5.8), # noqa # Different criterion - (1, 1, 73, 5, None, 0, {}, True, 3.8), # Include static data + (2, 2, 55, None, 0, {}, False, 5.8), # Basic test case + (1, 1, 55, None, 0, {}, False, 3.8), # Fewer estimators and reduced depth + (1, 1, 73, None, 0, {}, False, 5.8), # Different random state + (2, 2, 55, "log", 10, {}, False, 2.752), # Log transformation + (2, 2, 55, "log10", 10, {}, False, 1.195), # Log10 transformation + (2, 2, 55, "sqrt", 10, {}, False, 3.964), # Square root transformation + (2, 2, 55, "cbrt", 10, {}, False, 2.504), # Cube root transformation + (2, 2, 55, None, 0, {"criterion": "absolute_error"}, False, 5.8), # noqa # Different criterion + (1, 1, 73, None, 0, {}, True, 3.8), # Include static data ], ) def test_train_qrf_multiple_lead_times( n_estimators, max_depth, random_state, - compression, transformation, pre_transform_addition, extra_kwargs, @@ -615,7 +695,6 @@ def test_train_qrf_multiple_lead_times( n_estimators, max_depth, random_state, - compression, transformation, pre_transform_addition, extra_kwargs, @@ -636,21 +715,24 @@ def test_train_qrf_multiple_lead_times( @pytest.mark.parametrize( - "feature_config,data,include_static,expected", + "feature_config,data,include_static,site_id,expected", [ - ({"wind_speed_at_10m": ["mean"]}, [5], False, [5]), # One feature - ({"wind_speed_at_10m": ["latitude"]}, [61], False, [7.75]), # noqa Without the target - ({"wind_speed_at_10m": ["mean"]}, [5], True, [4]), # With static data + ({"wind_speed_at_10m": ["mean"]}, [5], False, "wmo_id", [5]), # One feature + ({"wind_speed_at_10m": ["mean"]}, [5], False, "station_id", [5]), # One feature + ({"wind_speed_at_10m": ["latitude"]}, [61], False, "wmo_id", [7.75]), # noqa Without the target + ({"wind_speed_at_10m": ["mean"]}, [5], True, "wmo_id", [4]), # With static data ( {"wind_speed_at_10m": ["mean"], "air_temperature": ["mean"]}, [5], False, + "wmo_id", [5], ), # Multiple dynamic features ( {"wind_speed_at_10m": ["mean"], "pressure_at_mean_sea_level": ["mean"]}, [5], False, + "wmo_id", "Feature 'pressure_at_mean_sea_level' is not present", ), # Multiple dynamic features ], @@ -659,6 +741,7 @@ def test_alternative_feature_configs( feature_config, data, include_static, + site_id, expected, ): """Test the TrainQuantileRegressionRandomForests plugin for a few different @@ -666,7 +749,6 @@ def test_alternative_feature_configs( n_estimators = 2 max_depth = 5 random_state = 55 - compression = 5 extra_kwargs = {} transformation = None pre_transform_addition = 0 @@ -678,7 +760,6 @@ def test_alternative_feature_configs( n_estimators, max_depth, random_state, - compression, transformation, pre_transform_addition, extra_kwargs, @@ -691,11 +772,11 @@ def test_alternative_feature_configs( n_estimators, max_depth, random_state, - compression, transformation, pre_transform_addition, extra_kwargs, include_static, + site_id=site_id, ) assert qrf_model.n_estimators == n_estimators @@ -756,7 +837,6 @@ def test_apply_qrf( n_estimators = 2 max_depth = 2 random_state = 55 - compression = 5 extra_kwargs = {} qrf_model = _run_train_qrf( @@ -764,7 +844,6 @@ def test_apply_qrf( n_estimators, max_depth, random_state, - compression, transformation, pre_transform_addition, extra_kwargs, diff --git a/improver_tests/calibration/test_init.py b/improver_tests/calibration/test_init.py index 49a7be7268..ef182a79b7 100644 --- a/improver_tests/calibration/test_init.py +++ b/improver_tests/calibration/test_init.py @@ -9,11 +9,13 @@ import iris import numpy as np +import pandas as pd import pytest from iris.cube import CubeList from improver.calibration import ( add_warning_comment, + get_training_period_cycles, split_forecasts_and_bias_files, split_forecasts_and_coeffs, split_forecasts_and_truth, @@ -613,5 +615,30 @@ def test_add_warning_to_comment(comment): assert result.attributes["comment"] == expected +@pytest.mark.parametrize( + "cycletime,forecast_period,training_length,expected", + [ + ("20171110T0000Z", 3600, 1, [pd.Timestamp(2017, 11, 9, 0, 0, tz="UTC")]), # noqa 1 hour + ("20171110T0000Z", 90000, 1, [pd.Timestamp(2017, 11, 8, 0, 0, tz="UTC")]), # noqa 25 hours + ("20171110T0000Z", 176400, 1, [pd.Timestamp(2017, 11, 7, 0, 0, tz="UTC")]), # noqa 49 hours + ( + "20171110T0000Z", + 3600, + 2, + [ + pd.Timestamp(2017, 11, 8, 0, 0, tz="UTC"), + pd.Timestamp(2017, 11, 9, 0, 0, tz="UTC"), + ], + ), # 1 hour, 2 days + ], +) +def test_get_training_period_cycles( + cycletime, forecast_period, training_length, expected +): + """Test that get_training_period_cycles returns the expected cycletimes.""" + result = get_training_period_cycles(cycletime, forecast_period, training_length) + assert list(result) == expected + + if __name__ == "__main__": unittest.main() diff --git a/improver_tests/cli/test_init.py b/improver_tests/cli/test_init.py index b8a8ea5bda..a2782687fc 100644 --- a/improver_tests/cli/test_init.py +++ b/improver_tests/cli/test_init.py @@ -45,6 +45,8 @@ def dummy_function(first, second=0, third=2): (iris.cube.Cube) """ + if isinstance(first, list): + return first[0] first = int(first) return first + first @@ -187,72 +189,55 @@ def test_without_output(self, m): self.assertEqual(result, 4) @patch("improver.utilities.save.save_netcdf") - def test_with_output(self, m): + def test_with_output_cube(self, m): """Tests that save_netcdf is called with object and string, default compression_level=1 and default least_significant_digit=None""" - # pylint disable is needed as it can't see the wrappers output kwarg. - result = wrapped_with_output.cli("argv[0]", "2", "--output=foo.nc") - m.assert_called_with(4, "foo.nc", 1, None) + save_object = Cube([0]) + result = wrapped_with_output.cli("argv[0]", [save_object], "--output=foo") + m.assert_called_with(save_object, "foo", 1, None) + self.assertEqual(result, None) + + @patch("joblib.dump") + @patch("builtins.open", unittest.mock.mock_open()) + def test_with_output_pickle(self, m): + """Tests that joblib.dump is called for a non-cube object""" + save_object = {"a": 1} + result = wrapped_with_output.cli("argv[0]", [save_object], "--output=foo") + m.assert_called_with(save_object, "foo", compress=1) self.assertEqual(result, None) @patch("improver.utilities.save.save_netcdf") def test_with_output_compression_level(self, m): """Tests save_netcdf, compression-level=9 and default least-significant-digit=None""" - # pylint disable is needed as it can't see the wrappers output kwarg. + save_object = Cube([0]) result = wrapped_with_output.cli( - "argv[0]", "2", "--output=foo.nc", "--compression-level=9" + "argv[0]", [save_object], "--output=foo", "--compression-level=9" ) - m.assert_called_with(4, "foo.nc", 9, None) + m.assert_called_with(save_object, "foo", 9, None) self.assertEqual(result, None) @patch("improver.utilities.save.save_netcdf") def test_with_output_no_compression(self, m): """Tests save_netcdf, compression-level=0 and default least-significant-digit=None""" - # pylint disable is needed as it can't see the wrappers output kwarg. + save_object = Cube([0]) result = wrapped_with_output.cli( - "argv[0]", "2", "--output=foo.nc", "--compression-level=0" + "argv[0]", [save_object], "--output=foo", "--compression-level=0" ) - m.assert_called_with(4, "foo.nc", 0, None) + m.assert_called_with(save_object, "foo", 0, None) self.assertEqual(result, None) @patch("improver.utilities.save.save_netcdf") def test_with_output_with_least_significant_figure(self, m): """Tests save_netcdf, compression-level=0 and least-significant-digit=2""" - # pylint disable is needed as it can't see the wrappers output kwarg. + save_object = Cube([0]) result = wrapped_with_output.cli( "argv[0]", - "2", - "--output=foo.nc", + [save_object], + "--output=foo", "--compression-level=0", "--least-significant-digit=2", ) - m.assert_called_with(4, "foo.nc", 0, 2) - self.assertEqual(result, None) - - @patch("joblib.dump") - def test_with_output_pickle(self, m): - """Tests that joblib.dump is called with object and string, default - compression_level=1 and default least_significant_digit=None""" - # pylint disable is needed as it can't see the wrappers output kwarg. - result = wrapped_with_output.cli( - "argv[0]", - "2", - "--output=foo.pickle", - ) - m.assert_called_with(4, "foo.pickle", compress=1) - self.assertEqual(result, None) - - @patch("joblib.dump") - def test_with_output_pkl(self, m): - """Tests that joblib.dump is called with object and string, default - compression_level=1 and default least_significant_digit=None""" - # pylint disable is needed as it can't see the wrappers output kwarg. - result = wrapped_with_output.cli( - "argv[0]", - "2", - "--output=foo.pkl", - ) - m.assert_called_with(4, "foo.pkl", compress=1) + m.assert_called_with(save_object, "foo", 0, 2) self.assertEqual(result, None)