diff --git a/qiskit_machine_learning/optimizers/__init__.py b/qiskit_machine_learning/optimizers/__init__.py index df5653478..f5ccea049 100644 --- a/qiskit_machine_learning/optimizers/__init__.py +++ b/qiskit_machine_learning/optimizers/__init__.py @@ -122,6 +122,7 @@ """ from .adam_amsgrad import ADAM +from .nadam import NAdam from .aqgd import AQGD from .cg import CG from .cobyla import COBYLA @@ -157,6 +158,7 @@ "OptimizerResult", "Minimizer", "ADAM", + "NAdam", "AQGD", "CG", "COBYLA", diff --git a/qiskit_machine_learning/optimizers/nadam.py b/qiskit_machine_learning/optimizers/nadam.py new file mode 100644 index 000000000..12acfc005 --- /dev/null +++ b/qiskit_machine_learning/optimizers/nadam.py @@ -0,0 +1,148 @@ +# This code is part of a Qiskit project. +# +# (C) Copyright IBM 2019, 2025. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""NAdam Optimizer""" + +from __future__ import annotations +import os +import numpy as np +from typing import Callable +from .optimizer import Optimizer, OptimizerResult, POINT, OptimizerSupportLevel + + +class NAdam(Optimizer): + """NAdam optimizer (Nesterov-accelerated Adaptive Moment Estimation).""" + + def __init__( + self, + maxiter: int = 200, + tol: float = 1e-6, + lr: float = 0.001, + beta_1: float = 0.9, + beta_2: float = 0.999, + eps: float = 1e-8, + noise_factor: float = 1e-8, + callback: Callable | None = None, + snapshot_dir: str | None = None, + ) -> None: + """Initialize NAdam optimizer.""" + super().__init__() + self.maxiter = maxiter + self.tol = tol + self.lr = lr + self.beta_1 = beta_1 + self.beta_2 = beta_2 + self.eps = eps + self.noise_factor = noise_factor + self.callback = callback + self.snapshot_dir = snapshot_dir + + # Internal state + self._m = None + self._v = None + self._t = 0 + + def get_support_level(self): + """Return the support level for NAdam optimizer.""" + return { + "gradient": OptimizerSupportLevel.ignored, + "bounds": OptimizerSupportLevel.ignored, + "initial_point": OptimizerSupportLevel.required, + } + + @property + def settings(self): + """Return optimizer settings as a dictionary.""" + return { + "maxiter": self.maxiter, + "tol": self.tol, + "lr": self.lr, + "beta_1": self.beta_1, + "beta_2": self.beta_2, + "eps": self.eps, + "noise_factor": self.noise_factor, + "callback": self.callback, + "snapshot_dir": self.snapshot_dir, + } + + def minimize( + self, + fun: Callable[[POINT], float], + x0: POINT, + jac: Callable[[POINT], POINT] | None = None, + bounds: list[tuple[float, float]] | None = None, + ) -> OptimizerResult: + """Minimize the scalar function using NAdam.""" + + result = OptimizerResult() + x = np.array(x0, dtype=float) + self._m = np.zeros_like(x) + self._v = np.zeros_like(x) + self._t = 0 + + for i in range(self.maxiter): + self._t += 1 + + # Compute gradient numerically + grad = self.gradient_num_diff(x, fun, self.eps) + + # Add optional stochastic noise + grad += self.noise_factor * np.random.randn(*grad.shape) + + # NAdam update rule + m_hat = self.beta_1 * self._m + (1 - self.beta_1) * grad + v_hat = self.beta_2 * self._v + (1 - self.beta_2) * (grad ** 2) + + m_corr = m_hat / (1 - self.beta_1 ** self._t) + v_corr = v_hat / (1 - self.beta_2 ** self._t) + + # Nesterov momentum + x_update = self.lr * (self.beta_1 * m_corr + (1 - self.beta_1) * grad / (1 - self.beta_1 ** self._t)) / (np.sqrt(v_corr) + self.eps) + x -= x_update + + # Update state + self._m = m_hat + self._v = v_hat + + fval = fun(x) + + # Callback + if self.callback is not None: + self.callback(self._t, x, fval) + + # Save snapshot + if self.snapshot_dir is not None: + np.save(os.path.join(self.snapshot_dir, f"nadam_m_{i}.npy"), self._m) + np.save(os.path.join(self.snapshot_dir, f"nadam_v_{i}.npy"), self._v) + np.save(os.path.join(self.snapshot_dir, f"nadam_x_{i}.npy"), x) + + # Check convergence + if np.linalg.norm(x_update) < self.tol: + break + + result.x = x + result.fun = fun(x) + result.nfev = self._t + result.nit = self._t + + return result + + def load_params(self, snapshot_dir: str): + """Load optimizer state from snapshot files.""" + last_iter = max([int(f.split("_")[-1].split(".")[0]) + for f in os.listdir(snapshot_dir) if f.startswith("nadam_x_")], default=-1) + if last_iter >= 0: + self._m = np.load(os.path.join(snapshot_dir, f"nadam_m_{last_iter}.npy")) + self._v = np.load(os.path.join(snapshot_dir, f"nadam_v_{last_iter}.npy")) + # x can also be restored if needed + # x = np.load(os.path.join(snapshot_dir, f"nadam_x_{last_iter}.npy")) + self._t = last_iter + 1 diff --git a/test/optimizers/test_nadam.py b/test/optimizers/test_nadam.py new file mode 100644 index 000000000..03a7cdf55 --- /dev/null +++ b/test/optimizers/test_nadam.py @@ -0,0 +1,96 @@ +# This code is part of a Qiskit project. +# +# (C) Copyright IBM 2019, 2025. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Test of NAdam optimizer""" + +import unittest +import tempfile +import numpy as np +from ddt import ddt + +# Import the test case base class +from test.algorithms_test_case import QiskitAlgorithmsTestCase +from qiskit_machine_learning.optimizers import NAdam +from qiskit_machine_learning.utils import algorithm_globals + + +@ddt +class TestOptimizerNAdam(QiskitAlgorithmsTestCase): + """Test NAdam optimizer""" + + def setUp(self): + super().setUp() + algorithm_globals.random_seed = 42 + self.quadratic_objective = lambda x: x[0] ** 2 + x[1] ** 2 + self.initial_point = np.array([1.0, 1.0]) + + def test_optimizer_minimize(self): + """Test NAdam optimizer minimize method""" + nadam = NAdam(maxiter=200, tol=1e-6, lr=1e-1) + result = nadam.minimize(self.quadratic_objective, self.initial_point) + self.assertAlmostEqual(result.fun, 0.0, places=6) + self.assertTrue(np.allclose(result.x, np.zeros_like(self.initial_point), atol=1e-2)) + + def test_optimizer_with_noise(self): + """Test NAdam optimizer with noise factor""" + nadam = NAdam(maxiter=150, tol=1e-6, lr=1e-1, noise_factor=1e-2) + result = nadam.minimize(self.quadratic_objective, self.initial_point) + self.assertAlmostEqual(result.fun, 0.0, places=4) + self.assertTrue(np.allclose(result.x, np.zeros_like(self.initial_point), atol=1e-2)) + + def test_save_load_params(self): + """Test save and load optimizer parameters""" + with tempfile.TemporaryDirectory() as tmpdir: + nadam = NAdam(maxiter=100, tol=1e-6, lr=1e-1, snapshot_dir=tmpdir) + nadam.minimize(self.quadratic_objective, self.initial_point) + new_nadam = NAdam(snapshot_dir=tmpdir) + new_nadam.load_params(tmpdir) + + self.assertTrue(np.allclose(nadam._m, new_nadam._m)) + self.assertTrue(np.allclose(nadam._v, new_nadam._v)) + self.assertEqual(nadam._t, new_nadam._t) + + def test_settings(self): + """Test settings property""" + nadam = NAdam(maxiter=100, tol=1e-6, lr=1e-1) + settings = nadam.settings + self.assertEqual(settings["maxiter"], 100) + self.assertEqual(settings["tol"], 1e-6) + self.assertEqual(settings["lr"], 1e-1) + self.assertEqual(settings["beta_1"], 0.9) + self.assertEqual(settings["beta_2"], 0.999) + self.assertEqual(settings["eps"], 1e-8) + self.assertEqual(settings["noise_factor"], 1e-8) + # NAdam does not have amsgrad, so just check key safely + self.assertIsNone(settings.get("amsgrad")) + self.assertEqual(settings["snapshot_dir"], None) + + def test_callback(self): + """Test using the callback.""" + history = {"ite": [], "weights": [], "fvals": []} + + def callback(n_t, weight, fval): + history["ite"].append(n_t) + history["weights"].append(weight) + history["fvals"].append(fval) + + nadam = NAdam(maxiter=100, tol=1e-6, lr=1e-1, callback=callback) + nadam.minimize(self.quadratic_objective, self.initial_point) + + expected_types = [int, np.ndarray, float] + for i, (key, values) in enumerate(history.items()): + self.assertTrue(all(isinstance(value, expected_types[i]) for value in values)) + self.assertEqual(len(history[key]), 100) + + +if __name__ == "__main__": + unittest.main()