Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions doc/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,11 +370,11 @@
# Intersphinx configuration: maps a project name to the base URL of its
# published documentation so cross-references can resolve against it.
# The second tuple element is the inventory location; None means the default
# inventory file served from the base URL is used.
# Each project is listed exactly once: repeated keys in a dict literal are
# silently overridden by the last occurrence, which hides stale URLs.
intersphinx_mapping = {
    "python": ("https://docs.python.org/3/", None),
    "iris": ("https://scitools-iris.readthedocs.io/en/stable/", None),
    "cartopy": ("https://cartopy.readthedocs.io/stable/", None),
    "cf_units": ("https://cf-units.readthedocs.io/en/stable/", None),
    "numpy": ("https://numpy.org/doc/stable/", None),
    "scipy": ("https://docs.scipy.org/doc/scipy/reference", None),
    "pandas": ("https://pandas.pydata.org/pandas-docs/dev/", None),
}

Expand Down
106 changes: 106 additions & 0 deletions improver/calibration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
"""

from collections import OrderedDict
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union

import iris
import joblib
import pandas as pd
from iris.cube import Cube, CubeList

from improver.metadata.probabilistic import (
Expand Down Expand Up @@ -51,6 +55,7 @@ def __init__(self):
("altitude", pa.float32()),
("time", pa.timestamp("s", "utc")),
("wmo_id", pa.string()),
("station_id", pa.string()),
("ob_value", pa.float32()),
]
)
Expand Down Expand Up @@ -263,6 +268,85 @@ def split_forecasts_and_bias_files(cubes: CubeList) -> Tuple[Cube, Optional[Cube
return forecast_cube, bias_cubes


def split_pickle_parquet_and_netcdf(
    files: List[Path],
) -> Tuple[Optional["CubeList"], Optional[List[Path]], Optional[object]]:
    """Split the input files into pickle, parquet, and netcdf files.

    Only a single pickle file is expected.

    Args:
        files:
            A list of input file paths which will be split into pickle,
            parquet, and netcdf files. Paths that do not exist are skipped.

    Returns:
        - A flattened cube list containing all the cubes contained within the
          provided paths to NetCDF files, or None if no cubes were loaded.
        - A list of paths to Parquet files (directories), or None if there
          were none.
        - A loaded pickle file, or None if no pickle file was provided.

    Raises:
        ValueError: If a file can be loaded neither by iris nor by joblib.
        ValueError: If multiple pickle files provided, as only one is ever
            expected.
    """
    cubes = iris.cube.CubeList()
    loaded_pickles = []
    parquets = []

    for file_path in files:
        if not file_path.exists():
            continue

        # Directories indicate we are working with parquet files.
        if file_path.is_dir():
            parquets.append(file_path)
            continue

        try:
            cube = iris.load(file_path)
            cubes.extend(cube)
        except ValueError:
            # A ValueError from iris is treated as "not a NetCDF file", so
            # fall back to loading the file as a pickle.
            # NOTE(security): joblib.load unpickles the file, which can
            # execute arbitrary code. Only pass files from trusted sources.
            try:
                loaded_pickles.append(joblib.load(file_path))
            except Exception as e:
                msg = f"Failed to load {file_path}: {e}"
                # Chain explicitly so the underlying failure is reported as
                # the direct cause of this error.
                raise ValueError(msg) from e

    if len(loaded_pickles) > 1:
        msg = "Multiple pickle inputs have been provided. Only one is expected."
        raise ValueError(msg)

    return (
        cubes if cubes else None,
        parquets if parquets else None,
        loaded_pickles[0] if loaded_pickles else None,
    )


def identify_parquet_type(parquet_paths: List[Path]):
    """Classify parquet directories as holding forecast or truth data.

    One parquet file found beneath each path is inspected. If its schema has a
    forecast_period column the path is taken to hold forecasts, otherwise it
    is taken to hold truths. Paths with no parquet files beneath them are
    ignored. If several paths fall in the same category, the last one wins.

    Args:
        parquet_paths:
            A list of paths to Parquet files.

    Returns:
        - The path to the Parquet file containing the historical forecasts,
          or None if no such path was identified.
        - The path to the Parquet file containing the truths, or None if no
          such path was identified.
    """
    import pyarrow.parquet as pq

    forecasts_location = None
    truths_location = None

    for candidate in parquet_paths:
        # Reading the schema of a single file is enough to classify the path.
        sample_file = next(candidate.glob("**/*.parquet"), None)
        if sample_file is None:
            continue

        try:
            pq.read_schema(sample_file).field("forecast_period")
        except KeyError:
            # No forecast_period column: treat as truth data.
            truths_location = candidate
        else:
            forecasts_location = candidate

    return forecasts_location, truths_location


def validity_time_check(forecast: Cube, validity_times: List[str]) -> bool:
"""Check the validity time of the forecast matches the accepted validity times
within the validity times list.
Expand Down Expand Up @@ -307,3 +391,25 @@ def add_warning_comment(forecast: Cube) -> Cube:
"however, no calibration has been applied."
)
return forecast


def get_training_period_cycles(
    cycletime: str, forecast_period: Union[int, str], training_length: int
):
    """Generate a list of forecast reference times for the training period.

    The most recent cycle returned is the cycletime moved back by one day plus
    the forecast period rounded down to whole days. Earlier cycles step back
    one day at a time from there.

    Args:
        cycletime: The time at which the forecast is issued in a format
            understood by pandas.Timestamp e.g. 20170109T0000Z.
        forecast_period: The forecast period in seconds.
        training_length: The number of days in the training period.

    Returns:
        A daily pandas date range containing training_length forecast
        reference times, ending at the most recent training cycle.
    """
    one_day = pd.Timedelta(1, unit="days")
    # Only the whole-day part of the lead time shifts the training window.
    whole_days_of_lead_time = pd.Timedelta(
        int(forecast_period), unit="seconds"
    ).floor("D")
    latest_cycle = pd.Timestamp(cycletime) - one_day - whole_days_of_lead_time

    return pd.date_range(
        end=latest_cycle,
        periods=int(training_length),
        freq="D",
    )
6 changes: 3 additions & 3 deletions improver/calibration/dataframe_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def _unique_check(df: DataFrame, column: str) -> None:
raise ValueError(msg)


def _quantile_check(df: DataFrame) -> None:
def quantile_check(df: DataFrame) -> None:
"""Check that the percentiles provided can be considered to be
quantiles with equal spacing spanning the percentile range.

Expand All @@ -142,7 +142,7 @@ def _quantile_check(df: DataFrame) -> None:

if not np.allclose(expected_percentiles, df["percentile"].unique()):
msg = (
"The forecast percentiles can not be considered as quantiles. "
"Forecast percentiles must be equally spaced. "
f"The forecast percentiles are {df['percentile'].unique()}."
"Based on the number of percentiles provided, the expected "
f"percentiles would be {expected_percentiles}."
Expand Down Expand Up @@ -447,7 +447,7 @@ def _prepare_dataframes(

# Check the percentiles can be considered to be equally spaced quantiles.
if representation_type == "percentile":
_quantile_check(forecast_df)
quantile_check(forecast_df)

# Remove forecast duplicates.
forecast_cols = [
Expand Down
Loading