142 changes: 142 additions & 0 deletions examples/benchmarks/custom_benchmark/README.md
<!--
SPDX-FileCopyrightText: 2026 Contributors to the OpenSTEF project <openstef@lfenergy.org>

SPDX-License-Identifier: MPL-2.0
-->

# Custom Benchmark Example

End-to-end examples for running and customizing OpenSTEF **BEAM** (Backtesting, Evaluation, Analysis, Metrics) benchmarks.

## What is BEAM?

BEAM replays historical data day by day, trains your model, makes forecasts, and scores them -- all without data leakage. It works with any model that implements the `BacktestForecasterMixin` interface.

## Files

| File | What it does |
|---|---|
| `example_baseline.py` | **Start here.** A minimal forecaster that predicts the median of recent history. Shows the `BacktestForecasterMixin` interface (`config`, `quantiles`, `fit`, `predict`). |
| `example_benchmark.py` | Defines a custom benchmark: target provider (where data lives), metrics, and pipeline assembly. Extends `SimpleTargetProvider` directly -- adapt this when you have your own data layout. |
| `run_liander2024_benchmark.py` | Runs the example baseline + GBLinear on the built-in **Liander 2024** dataset (auto-downloaded from HuggingFace). Good starting point if you just want to try things out. |
| `run_benchmark.py` | Same as above but uses the custom benchmark pipeline from `example_benchmark.py`. |
| `evaluate_existing_forecasts.py` | **Bring your own forecasts.** Points the pipeline at pre-existing prediction parquets and runs only evaluation + analysis (no backtesting). |
| `compare_liander2024_results.py` | Compare results from multiple runs on the **Liander 2024** dataset. Auto-detects which targets are available in all runs. |
| `compare_custom_results.py` | Compare results from multiple runs on the **custom** benchmark. Same auto-detection as above. |

## Quick Start

```bash
# 1. Clone the repo
git clone git@github.com:OpenSTEF/openstef.git -b "release/v4.0.0"
cd openstef

# 2. Install all packages (requires uv: https://docs.astral.sh/uv/)
uv sync --all-extras --all-groups --all-packages
```

### Run the Liander 2024 benchmark

Uses the built-in Liander 2024 dataset (auto-downloaded from HuggingFace). Runs the example baseline and GBLinear on all target categories.

```bash
uv run python -m examples.benchmarks.custom_benchmark.run_liander2024_benchmark
```

### Run the custom benchmark

Uses the custom target provider from `example_benchmark.py` with your own pipeline config. Runs on `solar_park` targets by default.

```bash
uv run python -m examples.benchmarks.custom_benchmark.run_benchmark
```

### Evaluate pre-existing forecasts (no backtesting)

If you already have predictions from your own model or external system, you can skip backtesting entirely. Place your forecast parquets in the expected directory layout and run only evaluation + analysis.

#### Required directory layout

```
benchmark_results/MyForecasts/
└── backtest/
└── <group_name>/ # e.g. "solar_park"
└── <target_name>/ # e.g. "Within 15 kilometers of Opmeer_normalized"
└── predictions.parquet
```

`group_name` and `target_name` must match the values from your targets YAML. You can list them:

```bash
uv run python -c "
from examples.benchmarks.custom_benchmark.example_benchmark import create_custom_benchmark_runner
for t in create_custom_benchmark_runner().target_provider.get_targets(['solar_park']):
print(t.group_name, '/', t.name)
"
```

#### Required parquet format

Each `predictions.parquet` must have:

| Column | Type | Description |
|---|---|---|
| *(index)* `timestamp` | `DatetimeIndex` | The time each prediction is valid for. 15-min intervals, tz-naive UTC. |
| `available_at` | `datetime64` | When the prediction was generated (enables D-1 / lead-time filtering). |
| `quantile_P05` | `float` | 5th percentile prediction. |
| `quantile_P50` | `float` | Median prediction (**required**). |
| `quantile_P95` | `float` | 95th percentile prediction. |
| ... | `float` | One column per quantile, named with `Quantile(x).format()`. |

Example rows:

```
timestamp (index) available_at quantile_P05 quantile_P50 quantile_P95
2023-01-15 12:00:00 2023-01-14 06:00:00 0.5 1.2 2.0
2023-01-15 12:15:00 2023-01-14 06:00:00 0.6 1.3 2.1
```

#### Run

```bash
uv run python -m examples.benchmarks.custom_benchmark.evaluate_existing_forecasts
```

See `evaluate_existing_forecasts.py` for the full script.

Results are written to `./benchmark_results/`. Each model gets its own subfolder with backtest predictions, evaluation scores, and analysis plots.

### Compare results across runs

After running at least two models, generate side-by-side comparison plots (global, per-group, per-target). The scripts automatically detect which targets are available in all runs.

```bash
# Compare on the Liander 2024 dataset
uv run python -m examples.benchmarks.custom_benchmark.compare_liander2024_results

# Compare on the custom benchmark
uv run python -m examples.benchmarks.custom_benchmark.compare_custom_results
```

Comparison output (HTML plots) is saved to `./benchmark_results_comparison/`.

## Creating Your Own

### 1. Write a forecaster

Copy `example_baseline.py` and implement two methods:

- **`fit(data)`** -- called periodically with recent history. Train your model here.
- **`predict(data)`** -- called every few hours. Return a `TimeSeriesDataset` with a `"load"` column and one column per quantile (e.g. `"quantile_P05"`, `"quantile_P50"`).

The `data` argument is a `RestrictedHorizonVersionedTimeSeries` -- it enforces no-lookahead by only exposing data available at `data.horizon`. Use `data.get_window(start, end, available_before)` to retrieve slices.
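Stripped of the mixin plumbing, the baseline's core logic is just "predict quantiles of recent history". A minimal sketch in plain pandas, with no OpenSTEF imports (the real class in `example_baseline.py` implements the full `BacktestForecasterMixin` interface and returns a `TimeSeriesDataset`; the helper names here are illustrative):

```python
import numpy as np
import pandas as pd

QUANTILES = [0.05, 0.5, 0.95]


def fit_median_baseline(history: pd.Series) -> dict[str, float]:
    """'Training' is just computing empirical quantiles of the recent load."""
    # round() before int() avoids float artifacts like int(0.95 * 100) == 94.
    return {
        f"quantile_P{int(round(q * 100)):02d}": float(np.quantile(history, q))
        for q in QUANTILES
    }


def predict_median_baseline(
    model: dict[str, float], index: pd.DatetimeIndex
) -> pd.DataFrame:
    """Emit one constant column per quantile for every requested timestamp."""
    return pd.DataFrame(model, index=index)
```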

### 2. Define a benchmark (optional)

Copy `example_benchmark.py` if you want to use **your own data**. The key class is `SimpleTargetProvider` -- override `_get_measurements_path_for_target()` and `_get_weather_path_for_target()` to point to your parquet files.

If you're fine with the Liander 2024 dataset, skip this step and use `create_liander2024_benchmark_runner()` directly.

### 3. Write a runner

Copy `run_benchmark.py`. Register your models as forecaster factories and call `pipeline.run()`.
48 changes: 48 additions & 0 deletions examples/benchmarks/custom_benchmark/compare_custom_results.py
"""Compare benchmark results from different runs on the custom benchmark.

Usage:
1. First run at least two models with run_benchmark.py
(e.g. ExampleBaseline and GBLinear).
2. Then run this script to generate side-by-side comparison plots.

Output is saved to ./benchmark_results_comparison/custom/.
"""

# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project <openstef@lfenergy.org>
#
# SPDX-License-Identifier: MPL-2.0

from pathlib import Path
from typing import cast

from examples.benchmarks.custom_benchmark.example_benchmark import ANALYSIS_CONFIG, create_custom_benchmark_runner
from openstef_beam.analysis.models import RunName
from openstef_beam.benchmarking import BenchmarkComparisonPipeline, LocalBenchmarkStorage
from openstef_beam.benchmarking.storage import BenchmarkStorage

# One storage per run — keys are human-readable labels shown in comparison plots.
run_storages: dict[RunName, BenchmarkStorage] = {
"ExampleBaseline": LocalBenchmarkStorage(base_path=Path("./benchmark_results/ExampleBaseline")),
"GBLinear": LocalBenchmarkStorage(base_path=Path("./benchmark_results/GBLinear")),
}

# Check that results exist.
for name, storage in run_storages.items():
base_path = cast(LocalBenchmarkStorage, storage).base_path
if not base_path.exists():
msg = f"Benchmark directory not found for '{name}': {base_path}. Run the benchmarks first."
raise FileNotFoundError(msg)

# Reuse the custom target provider.
OUTPUT_PATH = Path("./benchmark_results_comparison/custom")
target_provider = create_custom_benchmark_runner(
storage=LocalBenchmarkStorage(base_path=OUTPUT_PATH),
).target_provider

# Run the comparison — generates global, group, and per-target HTML plots.
comparison = BenchmarkComparisonPipeline(
analysis_config=ANALYSIS_CONFIG,
storage=LocalBenchmarkStorage(base_path=OUTPUT_PATH),
target_provider=target_provider,
)
comparison.run(run_data=run_storages, filter_args=["solar_park"])
49 changes: 49 additions & 0 deletions examples/benchmarks/custom_benchmark/compare_liander2024_results.py
"""Compare benchmark results from different runs on the Liander 2024 dataset.

Usage:
1. First run at least two models with run_liander2024_benchmark.py
(e.g. ExampleBaseline and GBLinear).
2. Then run this script to generate side-by-side comparison plots.

Output is saved to ./benchmark_results_comparison/liander2024/.
"""

# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project <openstef@lfenergy.org>
#
# SPDX-License-Identifier: MPL-2.0

from pathlib import Path
from typing import cast

from openstef_beam.analysis.models import RunName
from openstef_beam.benchmarking import BenchmarkComparisonPipeline, LocalBenchmarkStorage
from openstef_beam.benchmarking.benchmarks import create_liander2024_benchmark_runner
from openstef_beam.benchmarking.benchmarks.liander2024 import LIANDER2024_ANALYSIS_CONFIG
from openstef_beam.benchmarking.storage import BenchmarkStorage

# One storage per run — keys are human-readable labels shown in comparison plots.
run_storages: dict[RunName, BenchmarkStorage] = {
"ExampleBaseline": LocalBenchmarkStorage(base_path=Path("./benchmark_results/ExampleBaseline")),
"GBLinear": LocalBenchmarkStorage(base_path=Path("./benchmark_results/GBLinear")),
}

# Check that results exist.
for name, storage in run_storages.items():
base_path = cast(LocalBenchmarkStorage, storage).base_path
if not base_path.exists():
msg = f"Benchmark directory not found for '{name}': {base_path}. Run the benchmarks first."
raise FileNotFoundError(msg)

# Reuse the Liander 2024 target provider.
OUTPUT_PATH = Path("./benchmark_results_comparison/liander2024")
target_provider = create_liander2024_benchmark_runner(
storage=LocalBenchmarkStorage(base_path=OUTPUT_PATH),
).target_provider

# Run the comparison — generates global, group, and per-target HTML plots.
comparison = BenchmarkComparisonPipeline(
analysis_config=LIANDER2024_ANALYSIS_CONFIG,
storage=LocalBenchmarkStorage(base_path=OUTPUT_PATH),
target_provider=target_provider,
)
comparison.run(run_data=run_storages)
104 changes: 104 additions & 0 deletions examples/benchmarks/custom_benchmark/evaluate_existing_forecasts.py
"""Evaluate pre-existing forecasts without running backtesting.

If you already have forecast predictions (e.g. from your own model or an external
system), you can point the benchmark pipeline at them and run only the evaluation
and analysis steps.

How it works:
1. Place your prediction parquet files in the expected directory layout (see below).
2. Run this script — the pipeline detects existing backtest output and
automatically skips to evaluation + analysis.

Expected directory layout::

benchmark_results/MyForecasts/
└── backtest/
└── <group_name>/ # e.g. "solar_park"
└── <target_name>/ # e.g. "Within 15 kilometers of Opmeer_normalized"
└── predictions.parquet

Expected parquet format::

Index: pd.DatetimeIndex (name="timestamp", tz-naive UTC, 15-min intervals)
Columns:
- "available_at" (datetime) — when the prediction was generated
- "quantile_P05" (float) — 5th percentile prediction
- "quantile_P50" (float) — median prediction (REQUIRED)
- "quantile_P95" (float) — 95th percentile prediction
- ...one column per quantile, named with Quantile(x).format()

Example row::

timestamp (index) available_at quantile_P05 quantile_P50 quantile_P95
2023-01-15 12:00:00 2023-01-14 06:00:00 0.5 1.2 2.0

You can list the expected target names and group names by checking the targets.yaml
in your dataset, or by running::

runner = create_custom_benchmark_runner()
for t in runner.target_provider.get_targets(["solar_park"]):
print(t.group_name, t.name)

The pipeline still needs a "forecaster factory" to know which quantiles were used,
but fit() and predict() are never called. We use DummyForecaster for this.
"""

# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project <openstef@lfenergy.org>
#
# SPDX-License-Identifier: MPL-2.0

import logging
import multiprocessing
import os
from pathlib import Path

from examples.benchmarks.custom_benchmark.example_benchmark import create_custom_benchmark_runner
from openstef_beam.backtesting.backtest_forecaster import DummyForecaster
from openstef_beam.benchmarking import BenchmarkContext, BenchmarkTarget, LocalBenchmarkStorage
from openstef_core.types import Q

os.environ["OMP_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"

_logger = logging.getLogger(__name__)

logging.basicConfig(level=logging.INFO, format="[%(asctime)s][%(levelname)s] %(message)s")

# Path to the folder that contains the backtest/ directory with your parquets.
OUTPUT_PATH = Path("./benchmark_results/MyForecasts")
N_PROCESSES = multiprocessing.cpu_count()

# Quantiles your forecasts were generated for (must include 0.5 = median).
# Adjust this list to match whatever quantiles are in your parquet columns.
PREDICTION_QUANTILES = [Q(0.05), Q(0.1), Q(0.3), Q(0.5), Q(0.7), Q(0.9), Q(0.95)]


def stub_factory(_context: BenchmarkContext, _target: BenchmarkTarget) -> DummyForecaster:
"""Factory that returns a DummyForecaster (backtesting is skipped).

DummyForecaster provides quantile info to the pipeline but never runs
fit() or predict() since backtest output already exists on disk.

Returns:
DummyForecaster with the configured quantiles.
"""
return DummyForecaster(predict_quantiles=PREDICTION_QUANTILES)


if __name__ == "__main__":
# Point the storage at your results folder.
# The pipeline reads parquets from:
# OUTPUT_PATH / backtest / <group_name> / <target_name> / predictions.parquet
storage = LocalBenchmarkStorage(base_path=OUTPUT_PATH)

runner = create_custom_benchmark_runner(storage=storage)

# Run the pipeline — backtesting is auto-skipped for every target that
# already has a predictions.parquet on disk.
runner.run(
forecaster_factory=stub_factory,
run_name="my_forecasts",
n_processes=N_PROCESSES,
filter_args=["solar_park"],
)