diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 404c101..dad266b 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -28,6 +28,14 @@ jobs: run: | pip install -e .[test] + - name: Formatting and type checking + if: matrix.python-version == '3.11' + run: | + pip install mypy black isort + isort --check-only . + black --check . + mypy --strict . + - name: Run tests run: | pytest -v --cov=bayes_kit test/ diff --git a/bayes_kit/__init__.py b/bayes_kit/__init__.py index ad13f79..ac85c59 100644 --- a/bayes_kit/__init__.py +++ b/bayes_kit/__init__.py @@ -1,15 +1,14 @@ # functions from .autocorr import autocorr -from .ess import ess, ess_imse, ess_ipse -from .iat import iat, iat_imse, iat_ipse -from .rhat import rhat # classes from .ensemble import Stretcher +from .ess import ess, ess_imse, ess_ipse from .hmc import HMCDiag +from .iat import iat, iat_imse, iat_ipse from .mala import MALA from .metropolis import Metropolis, MetropolisHastings -from .ensemble import Stretcher +from .rhat import rhat from .smc import TemperedLikelihoodSMC __all__ = [ @@ -18,4 +17,11 @@ "Metropolis", "MetropolisHastings", "TemperedLikelihoodSMC", + "ess", + "ess_imse", + "ess_ipse", + "iat", + "iat_imse", + "iat_ipse", + "rhat", ] diff --git a/bayes_kit/autocorr.py b/bayes_kit/autocorr.py index 7ca7cc6..5ce8751 100644 --- a/bayes_kit/autocorr.py +++ b/bayes_kit/autocorr.py @@ -1,14 +1,13 @@ import numpy as np -import numpy.typing as npt -FloatType = np.float64 -VectorType = npt.NDArray[FloatType] +from .typing import ChainType, VectorType -def autocorr(chain: VectorType) -> VectorType: + +def autocorr(chain: ChainType) -> VectorType: """Return sample autocorrelations at all lags from 0 to the length of the sequence minus 1 for the specified sequence. The returned vector will thus be the same size as the input vector. - + Algorithmically, this function calls NumPy's fast Fourier transform and inverse fast Fourier transforms. 
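For context on the FFT approach this docstring refers to, here is a minimal sketch of the standard trick; the function name `fft_autocorr` and the exact normalization are illustrative, not necessarily the library's implementation:

```python
import numpy as np

def fft_autocorr(chain):
    # sketch of the FFT trick described in the docstring above
    chain = np.asarray(chain, dtype=np.float64)
    n = len(chain)
    # pad to a power of two >= 2n - 1 so the FFT's circular convolution
    # behaves like a linear one (no wrap-around)
    size = 2 ** int(np.ceil(np.log2(2 * n - 1)))
    centered = chain - chain.mean()
    freq = np.fft.fft(centered, size)
    acov = np.fft.ifft(freq * np.conj(freq)).real[:n] / n
    return acov / acov[0]  # lag-0 autocorrelation is 1 by construction
```

For white noise, `fft_autocorr(np.random.normal(size=1000))` is approximately 1 at lag 0 and near 0 elsewhere, which is what `test_autocorr_independent` below checks.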
@@ -23,6 +22,7 @@ def autocorr(chain: VectorType) -> VectorType: """ if len(chain) < 2: raise ValueError(f"autocorr requires len(chain) >= 2, but {len(chain)=}") + chain = np.asarray(chain) size = 2 ** np.ceil(np.log2(2 * len(chain) - 1)).astype("int") var = np.var(chain) ndata = chain - np.mean(chain) diff --git a/bayes_kit/ensemble.py b/bayes_kit/ensemble.py index 5e2e3a9..59f3755 100644 --- a/bayes_kit/ensemble.py +++ b/bayes_kit/ensemble.py @@ -1,8 +1,9 @@ from typing import Callable, Optional, Tuple -from numpy.typing import NDArray + import numpy as np +from numpy.typing import NDArray -from .model_types import LogDensityModel +from .typing import LogDensityModel class Stretcher: diff --git a/bayes_kit/ess.py b/bayes_kit/ess.py index dfb2cf0..45d73c3 100644 --- a/bayes_kit/ess.py +++ b/bayes_kit/ess.py @@ -1,11 +1,5 @@ -import numpy as np -import numpy.typing as npt -import bayes_kit.autocorr as autocorr -from bayes_kit.iat import iat, iat_ipse, iat_imse - -FloatType = np.float64 -IntType = np.int64 -VectorType = npt.NDArray[FloatType] +from .iat import iat, iat_imse, iat_ipse +from .typing import FloatType, VectorType def ess_ipse(chain: VectorType) -> FloatType: diff --git a/bayes_kit/hmc.py b/bayes_kit/hmc.py index d1c591d..2fe6f74 100644 --- a/bayes_kit/hmc.py +++ b/bayes_kit/hmc.py @@ -1,10 +1,8 @@ -from typing import Iterator, Optional, Union -from numpy.typing import NDArray -import numpy as np +from typing import Iterator, Optional -from .model_types import GradModel +import numpy as np -Draw = tuple[NDArray[np.float64], float] +from .typing import DrawAndLogP, GradModel, Seed, VectorType class HMCDiag: @@ -13,9 +11,9 @@ def __init__( model: GradModel, stepsize: float, steps: int, - metric_diag: Optional[NDArray[np.float64]] = None, - init: Optional[NDArray[np.float64]] = None, - seed: Union[None, int, np.random.BitGenerator, np.random.Generator] = None, + metric_diag: Optional[VectorType] = None, + init: Optional[VectorType] = None, + seed: Optional[Seed] = None, ): self._model = model self._dim = self._model.dims() @@ -29,19 +27,19 @@ def __init__( else self._rng.normal(size=self._dim) ) - def __iter__(self) -> Iterator[Draw]: + def __iter__(self) -> Iterator[DrawAndLogP]: return self - def __next__(self) -> Draw: + def __next__(self) -> DrawAndLogP: return self.sample() - def joint_logp(self, theta: NDArray[np.float64], rho: NDArray[np.float64]) -> float: + def joint_logp(self, theta: VectorType, rho: VectorType) -> float: adj: float = 0.5 * np.dot(rho, self._metric * rho) return self._model.log_density(theta) - adj def leapfrog( - self, theta: NDArray[np.float64], rho: NDArray[np.float64] - ) -> tuple[NDArray[np.float64], NDArray[np.float64]]: + self, theta: VectorType, rho: VectorType + ) -> tuple[VectorType, VectorType]: # Initialize rho_mid by going backwards half a step so that the first full-step inside the loop brings rho_mid # up to +1/2 steps. Note that if self._steps == 0, the loop is skipped and the -0.5 and +0.5 steps cancel. 
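To make the half-step bookkeeping in this comment concrete, here is a standalone paraphrase of the integrator; the loop body is reconstructed from the comment and the surrounding context, so treat it as a sketch rather than the verbatim method:

```python
import numpy as np

def leapfrog_sketch(theta, rho, stepsize, steps, grad_fn, metric):
    # grad_fn(theta) stands in for self._model.log_density_gradient(theta)[1];
    # metric stands in for self._metric (a diagonal matrix stored as a vector)
    grad = grad_fn(theta)
    # start half a step behind so the first full step lands at +1/2
    rho_mid = rho - 0.5 * stepsize * np.multiply(metric, grad)
    for _ in range(steps):
        rho_mid = rho_mid + stepsize * np.multiply(metric, grad)  # +1/2, +3/2, ...
        theta = theta + stepsize * rho_mid
        grad = grad_fn(theta)
    # a final half step brings the momentum back onto integer time
    return theta, rho_mid + 0.5 * stepsize * np.multiply(metric, grad)
```

With `steps == 0` the two half steps cancel and `(theta, rho)` comes back unchanged, matching the comment.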
lp, grad = self._model.log_density_gradient(theta) @@ -54,7 +52,7 @@ def leapfrog( rho = rho_mid + 0.5 * self._stepsize * np.multiply(self._metric, grad) return (theta, rho) - def sample(self) -> Draw: + def sample(self) -> DrawAndLogP: rho = self._rng.normal(size=self._dim) logp = self.joint_logp(self._theta, rho) theta_prop, rho_prop = self.leapfrog(self._theta, rho) diff --git a/bayes_kit/iat.py b/bayes_kit/iat.py index ea42a9e..2dfe0e2 100644 --- a/bayes_kit/iat.py +++ b/bayes_kit/iat.py @@ -1,13 +1,10 @@ -import numpy as np -import numpy.typing as npt -import bayes_kit.autocorr as autocorr +from typing import Sequence, Union -FloatType = np.float64 -IntType = np.int64 -VectorType = npt.NDArray[FloatType] +from .autocorr import autocorr +from .typing import FloatType, VectorType -def _end_pos_pairs(acor: VectorType) -> IntType: +def _end_pos_pairs(acor: Union[Sequence[float], VectorType]) -> int: """ Return the index 1 past the last positive pair of autocorrelations starting on an even index. The sequence `acor` should contain @@ -91,7 +88,8 @@ def iat_ipse(chain: VectorType) -> FloatType: raise ValueError(f"ess requires len(chains) >= 4, but {len(chain)=}") acor = autocorr(chain) n = _end_pos_pairs(acor) - return 2 * acor[0:n].sum() - 1 + iat: FloatType = 2 * acor[0:n].sum() - 1 + return iat def iat_imse(chain: VectorType) -> FloatType: @@ -126,7 +124,7 @@ def iat_imse(chain: VectorType) -> FloatType: raise ValueError(f"iat requires len(chains) >=4, but {len(chain) = }") acor = autocorr(chain) n = _end_pos_pairs(acor) - prev_min = acor[0] + acor[1] + prev_min: FloatType = acor[0] + acor[1] acor_sum = prev_min i = 2 while i + 1 < n: diff --git a/bayes_kit/mala.py b/bayes_kit/mala.py index 8a00fb8..6fd09be 100644 --- a/bayes_kit/mala.py +++ b/bayes_kit/mala.py @@ -1,11 +1,9 @@ from typing import Iterator, Optional, Union -from numpy.typing import NDArray + import numpy as np from .metropolis import metropolis_hastings_accept_test -from .model_types import GradModel - -Draw = tuple[NDArray[np.float64], float] +from .typing import DrawAndLogP, GradModel, Seed, VectorType # Note: MALA is an instance of a Metropolis-Hastings algorithm, but we do not # implement it here as a subclass in order to cache gradient calls. 
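The note above is easier to follow with a full MALA transition written out. This sketch recomputes the current state's gradient on every call, which is exactly the repeated work the `MALA` class avoids by caching `_log_p_grad_theta` between samples; all names here are illustrative:

```python
import numpy as np

def mala_step(theta, log_density_gradient, epsilon, rng):
    # one MALA transition; log_density_gradient returns (log p, grad log p)
    lp, grad = log_density_gradient(theta)
    noise = rng.normal(size=theta.shape)
    # Langevin proposal: drift along the gradient plus Gaussian noise
    theta_prop = theta + epsilon * grad + np.sqrt(2 * epsilon) * noise
    lp_prop, grad_prop = log_density_gradient(theta_prop)

    def proposal_lp(to, frm, grad_frm):
        # log N(to | frm + epsilon * grad_frm, 2 * epsilon * I), up to a constant
        return -np.sum((to - frm - epsilon * grad_frm) ** 2) / (4 * epsilon)

    # Metropolis-Hastings correction for the asymmetric Langevin proposal
    log_alpha = (lp_prop + proposal_lp(theta, theta_prop, grad_prop)
                 - lp - proposal_lp(theta_prop, theta, grad))
    if np.log(rng.uniform()) < log_alpha:
        return theta_prop, lp_prop
    return theta, lp
```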
@@ -18,8 +16,8 @@ def __init__( self, model: GradModel, epsilon: float, - init: Optional[NDArray[np.float64]] = None, - seed: Union[None, int, np.random.BitGenerator, np.random.Generator] = None, + init: Optional[VectorType] = None, + seed: Optional[Seed] = None, ): self._model = model self._epsilon = epsilon @@ -33,13 +31,13 @@ def __init__( self._log_p_theta, logpgrad = self._model.log_density_gradient(self._theta) self._log_p_grad_theta = np.asanyarray(logpgrad) - def __iter__(self) -> Iterator[Draw]: + def __iter__(self) -> Iterator[DrawAndLogP]: return self - def __next__(self) -> Draw: + def __next__(self) -> DrawAndLogP: return self.sample() - def sample(self) -> Draw: + def sample(self) -> DrawAndLogP: theta_prop = ( self._theta + self._epsilon * self._log_p_grad_theta @@ -69,9 +67,9 @@ def sample(self) -> Draw: def _proposal_log_density( self, - theta_prime: NDArray[np.float64], - theta: NDArray[np.float64], - grad_theta: NDArray[np.float64], + theta_prime: VectorType, + theta: VectorType, + grad_theta: VectorType, ) -> float: """ Conditional log probability of the proposed parameters diff --git a/bayes_kit/metropolis.py b/bayes_kit/metropolis.py index 3f5ebbc..b5a36d4 100644 --- a/bayes_kit/metropolis.py +++ b/bayes_kit/metropolis.py @@ -1,15 +1,12 @@ -from typing import Callable, Iterator, Optional, Union -from numpy.typing import NDArray, ArrayLike +from typing import Callable, Iterator, Optional + import numpy as np +from numpy.typing import ArrayLike -from .model_types import LogDensityModel +from .typing import DrawAndLogP, LogDensityModel, Seed, VectorType -# TODO: Add to global type definitions -Vector = NDArray[np.float64] -Draw = tuple[Vector, float] # s.b. (TO_STATE, FROM_STATE) -> log_prob (float) -TransitionLPFn = Callable[[Vector, Vector], float] -Seed = Union[int, np.random.BitGenerator, np.random.Generator] +TransitionLPFn = Callable[[VectorType, VectorType], float] def metropolis_accept_test( @@ -83,10 +80,10 @@ class MetropolisHastings: def __init__( self, model: LogDensityModel, - proposal_fn: Callable[[Vector], ArrayLike], + proposal_fn: Callable[[VectorType], ArrayLike], transition_lp_fn: TransitionLPFn, *, - init: Optional[Vector] = None, + init: Optional[VectorType] = None, seed: Optional[Seed] = None, ): self._model = model @@ -101,32 +98,32 @@ def __init__( ) self._log_p_theta = self._model.log_density(self._theta) - def __iter__(self) -> Iterator[Draw]: + def __iter__(self) -> Iterator[DrawAndLogP]: return self - def __next__(self) -> Draw: + def __next__(self) -> DrawAndLogP: return self.sample() - def sample(self) -> Draw: + def sample(self) -> DrawAndLogP: proposal, lp_proposal = self._propose() accepted = self._accept_test(lp_proposal, proposal) if accepted: self._update_theta(proposal, lp_proposal) return (self._theta, self._log_p_theta) - def _propose(self) -> Draw: + def _propose(self) -> DrawAndLogP: untyped_proposed_theta = np.asanyarray( self._proposal_fn(self._theta), dtype=np.float64 ) - proposed_theta: NDArray[np.float64] = untyped_proposed_theta + proposed_theta: VectorType = untyped_proposed_theta lp_proposed_theta = self._model.log_density(proposed_theta) return (proposed_theta, lp_proposed_theta) - def _update_theta(self, theta: Vector, log_p_theta: float) -> None: + def _update_theta(self, theta: VectorType, log_p_theta: float) -> None: self._theta = theta self._log_p_theta = log_p_theta - def _accept_test(self, lp_proposal: float, proposal: Vector) -> bool: + def _accept_test(self, lp_proposal: float, proposal: VectorType) -> bool: 
lp_forward_transition = self._transition_lp_fn(proposal, self._theta) lp_reverse_transition = self._transition_lp_fn(self._theta, proposal) return metropolis_hastings_accept_test( @@ -142,9 +139,9 @@ class Metropolis(MetropolisHastings): def __init__( self, model: LogDensityModel, - proposal_fn: Callable[[Vector], ArrayLike], + proposal_fn: Callable[[VectorType], ArrayLike], *, - init: Optional[Vector] = None, + init: Optional[VectorType] = None, seed: Optional[Seed] = None, ): # This transition function will never be used--it isn't needed for Metropolis, @@ -154,5 +151,5 @@ def __init__( super().__init__(model, proposal_fn, dummy_transition_fn, init=init, seed=seed) # 'proposal' isn't used, but we need signature consistency to override the parent method - def _accept_test(self, lp_proposal: float, proposal: Vector) -> bool: + def _accept_test(self, lp_proposal: float, proposal: VectorType) -> bool: return metropolis_accept_test(lp_proposal, self._log_p_theta, self._rng) diff --git a/bayes_kit/model_types.py b/bayes_kit/model_types.py deleted file mode 100644 index aad4a31..0000000 --- a/bayes_kit/model_types.py +++ /dev/null @@ -1,35 +0,0 @@ -from typing import Protocol, Tuple -from numpy.typing import ArrayLike, NDArray -import numpy as np - - -class LogDensityModel(Protocol): - def dims(self) -> int: - """number of parameters""" - ... # pragma: no cover - - def log_density(self, params_unc: NDArray[np.float64]) -> float: - """unnormalized log density""" - ... # pragma: no cover - - -class GradModel(LogDensityModel, Protocol): - def log_density_gradient( - self, params_unc: NDArray[np.float64] - ) -> Tuple[float, ArrayLike]: - ... # pragma: no cover - - -class HessianModel(GradModel, Protocol): - def log_density_hessian( - self, params_unc: NDArray[np.float64] - ) -> Tuple[float, ArrayLike, ArrayLike]: - ... # pragma: no cover - - -class LogPriorLikelihoodModel(LogDensityModel, Protocol): - def log_prior(self, params_unc: NDArray[np.float64]) -> float: - ... # pragma: no cover - - def log_likelihood(self, params_unc: NDArray[np.float64]) -> float: - ... # pragma: no cover diff --git a/bayes_kit/rhat.py b/bayes_kit/rhat.py index a2908b1..81671eb 100644 --- a/bayes_kit/rhat.py +++ b/bayes_kit/rhat.py @@ -1,13 +1,12 @@ +from typing import List, Sequence + import numpy as np -from typing import Union, Sequence -from numpy.typing import NDArray import scipy as sp -FloatType = np.float64 -VectorType = NDArray[FloatType] -SeqType = Union[Sequence[float], NDArray[np.float64]] +from .typing import ChainType, FloatType, VectorType + -def split_chains(chains: list[SeqType]) -> list[SeqType]: +def split_chains(chains: Sequence[ChainType]) -> List[ChainType]: """Return a list of the input chains split in half. The result will be a list twice as long as the input. For odd sized chains, the first half will be one element longer. For example, @@ -24,7 +23,8 @@ def split_chains(chains: list[SeqType]) -> list[SeqType]: """ return [arr for chain in chains for arr in np.array_split(chain, 2)] -def rank_chains(chains: list[SeqType]) -> list[SeqType]: + +def rank_chains(chains: Sequence[ChainType]) -> List[VectorType]: """ Returns a copy of the included Markov chains with all values transformed to ranks. Ranks are ascending and start at 1. @@ -47,9 +47,9 @@ def rank_chains(chains: list[SeqType]) -> list[SeqType]: List of chains with values replaced by transformed ranks. 
""" if len(chains) == 0: - return chains + return chains # type: ignore flattened = np.concatenate(chains) - ranks = flattened.argsort().argsort() + 1 + ranks = (flattened.argsort().argsort() + 1).astype(np.float64) reshaped_arrays = [] current_index = 0 for array in chains: @@ -58,7 +58,8 @@ def rank_chains(chains: list[SeqType]) -> list[SeqType]: current_index += size return reshaped_arrays -def rank_normalize_chains(chains: list[SeqType]) -> list[SeqType]: + +def rank_normalize_chains(chains: Sequence[ChainType]) -> List[List[float]]: """Return the rank-normalized version of the input chains. Rank normalization maps the ranks to the range (0, 1) and then returns @@ -70,11 +71,11 @@ def rank_normalize_chains(chains: list[SeqType]) -> list[SeqType]: ``` inv_Phi((rank[i][j] - 3/8) / (size(chains) - 1/4), ``` - where + where * `inv_Phi` is the inverse cumulative distribution function for the standard normal distribution, * `rank[i][j] = rank_chains(chains)[i][j]` is the rank of element - `i` in chain `j`, and + `i` in chain `j`, and * `size(chains)` is the total number of elements in the chains. For a specification of ranking, see :func:`rank_chains`. @@ -101,10 +102,13 @@ def rank_normalize_chains(chains: list[SeqType]) -> list[SeqType]: S = sum([len(chain) for chain in chains]) result = [] for chain_i in rank_chains(chains): - result.append([sp.stats.norm.ppf((rank_ij - 0.325) / (S - 0.25)) for rank_ij in chain_i]) + result.append( + [sp.stats.norm.ppf((rank_ij - 0.325) / (S - 0.25)) for rank_ij in chain_i] + ) return result -def rhat(chains: list[SeqType]) -> FloatType: + +def rhat(chains: Sequence[ChainType]) -> FloatType: """Return the potential scale reduction factor (R-hat) for a list of Markov chains. @@ -166,7 +170,8 @@ def rhat(chains: list[SeqType]) -> FloatType: ) return r_hat -def split_rhat(chains: list[SeqType]) -> FloatType: + +def split_rhat(chains: Sequence[ChainType]) -> FloatType: """Return the potential scale reduction factor (R-hat) for a list of Markov chains consisting of each of the input chains split in half. @@ -196,7 +201,8 @@ def split_rhat(chains: list[SeqType]) -> FloatType: """ return rhat(split_chains(chains)) -def rank_normalized_rhat(chains: list[SeqType]) -> FloatType: + +def rank_normalized_rhat(chains: Sequence[ChainType]) -> FloatType: """Return the rank-normalized R-hat for the specified chains. 
Rank normalized r-hat replaces each value in the chains with its rank, applies a shifted inverse standard normal cdf, and diff --git a/bayes_kit/smc.py b/bayes_kit/smc.py index fb02f40..bf9cb8d 100644 --- a/bayes_kit/smc.py +++ b/bayes_kit/smc.py @@ -1,11 +1,12 @@ from typing import Callable, Iterator + import numpy as np -from numpy.typing import ArrayLike, NDArray -from bayes_kit.model_types import LogPriorLikelihoodModel +from numpy.typing import ArrayLike + +from .typing import LogPriorLikelihoodModel, VectorType -Vector = NDArray[np.float64] -DensityFunction = Callable[[Vector], float] -Kernel = Callable[[Vector, DensityFunction], ArrayLike] +DensityFunction = Callable[[VectorType], float] +Kernel = Callable[[VectorType, DensityFunction], ArrayLike] class TemperedLikelihoodSMC: @@ -17,7 +18,6 @@ def __init__( sample_initial: Callable[[int], ArrayLike], kernel: Kernel, ) -> None: - self.M = M self.N = N self.thetas = np.array(list(map(sample_initial, range(M)))) @@ -26,13 +26,13 @@ def __init__( self._model = model self.kernel = kernel - def log_prior(self, theta: Vector) -> float: + def log_prior(self, theta: VectorType) -> float: return self._model.log_prior(theta) - def log_likelihood(self, theta: Vector) -> float: + def log_likelihood(self, theta: VectorType) -> float: return self._model.log_likelihood(theta) - def __iter__(self) -> Iterator[Vector]: + def __iter__(self) -> Iterator[VectorType]: self.run() return iter(self.thetas) @@ -44,10 +44,10 @@ def time(self, n: int) -> float: return n / self.N def transition(self, n: int) -> None: - def lpminus1(theta: Vector) -> float: + def lpminus1(theta: VectorType) -> float: return self.log_likelihood(theta) * self.time(n - 1) + self.log_prior(theta) - def lp(theta: Vector) -> float: + def lp(theta: VectorType) -> float: return self.log_likelihood(theta) * self.time(n) + self.log_prior(theta) # note: try to do this in parallel @@ -62,8 +62,8 @@ def lp(theta: Vector) -> float: # TODO(bward): factor out def importance_resample( - thetas: Vector, lpminus1: DensityFunction, lp: DensityFunction -) -> Vector: + thetas: VectorType, lpminus1: DensityFunction, lp: DensityFunction +) -> VectorType: weights = np.exp( np.apply_along_axis(lp, axis=1, arr=thetas) - np.apply_along_axis(lpminus1, axis=1, arr=thetas) @@ -77,10 +77,10 @@ def importance_resample( # TODO(bward): factor out/reuse Metropolis sampler def metropolis_kernel(scale: float) -> Kernel: - def proposal_rng(theta: Vector) -> Vector: + def proposal_rng(theta: VectorType) -> VectorType: return np.random.normal(loc=theta, scale=scale) - def metropolis(theta: Vector, lp: DensityFunction) -> Vector: + def metropolis(theta: VectorType, lp: DensityFunction) -> VectorType: theta_star = proposal_rng(theta) if np.log(np.random.uniform()) < lp(theta_star) - lp(theta): return theta_star diff --git a/bayes_kit/typing.py b/bayes_kit/typing.py new file mode 100644 index 0000000..3f761fb --- /dev/null +++ b/bayes_kit/typing.py @@ -0,0 +1,42 @@ +from typing import Protocol, Sequence, Tuple, Union + +import numpy as np +from numpy.typing import ArrayLike, NDArray + +# Generic types used in most modules +FloatType = np.float64 +IntType = np.int64 +VectorType = NDArray[FloatType] +DrawAndLogP = tuple[VectorType, float] +Seed = Union[int, np.random.BitGenerator, np.random.Generator] +ChainType = Union[Sequence[float], VectorType] + + +class LogDensityModel(Protocol): + def dims(self) -> int: + """number of parameters""" + ... 
# pragma: no cover + + def log_density(self, params_unc: VectorType) -> float: + """unnormalized log density""" + ... # pragma: no cover + + +class GradModel(LogDensityModel, Protocol): + def log_density_gradient(self, params_unc: VectorType) -> Tuple[float, ArrayLike]: + ... # pragma: no cover + + +class HessianModel(GradModel, Protocol): + def log_density_hessian( + self, params_unc: VectorType + ) -> Tuple[float, ArrayLike, ArrayLike]: + ... # pragma: no cover + + +class LogPriorLikelihoodModel(LogDensityModel, Protocol): + def log_prior(self, params_unc: VectorType) -> float: + ... # pragma: no cover + + def log_likelihood(self, params_unc: VectorType) -> float: + ... # pragma: no cover diff --git a/pyproject.toml b/pyproject.toml index cb828bb..054bdca 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,3 +9,6 @@ module = [ 'scipy.special' ] ignore_missing_imports = true + +[tool.isort] +profile = "black" diff --git a/test/models/binomial.py b/test/models/binomial.py index 3374ec8..0810b61 100644 --- a/test/models/binomial.py +++ b/test/models/binomial.py @@ -1,8 +1,11 @@ -from typing import Union -from scipy import stats as sst -from scipy.special import logit, expit as inv_logit, log1p +from typing import Optional + import numpy as np -import numpy.typing as npt +from scipy import stats as sst +from scipy.special import expit as inv_logit +from scipy.special import log1p, logit + +from bayes_kit.typing import Seed, VectorType class Binomial: @@ -18,7 +21,7 @@ def __init__( beta: float, x: int, N: int, - seed: Union[None, int, np.random.BitGenerator, np.random.Generator] = None, + seed: Optional[Seed] = None, ) -> None: self.alpha = alpha self.beta = beta @@ -30,7 +33,7 @@ def __init__( def dims(self) -> int: return 1 - def log_density(self, params_unc: npt.NDArray[np.float64]) -> float: + def log_density(self, params_unc: VectorType) -> float: """ This model's joint density is factored as the product of the prior density and the likelihood for testing with @@ -40,22 +43,20 @@ def log_density(self, params_unc: npt.NDArray[np.float64]) -> float: """ return self.log_likelihood(params_unc) + self.log_prior(params_unc) - def log_prior(self, params_unc: npt.NDArray[np.float64]) -> float: + def log_prior(self, params_unc: VectorType) -> float: theta: float = inv_logit(params_unc[0]) jac_adjust: float = np.log(theta) + log1p(-theta) prior: float = sst.beta.logpdf(theta, self.alpha, self.beta) return prior + jac_adjust - def log_likelihood(self, params_unc: npt.NDArray[np.float64]) -> float: + def log_likelihood(self, params_unc: VectorType) -> float: theta = inv_logit(params_unc[0]) return sst.binom.logpmf(self.x, self.N, theta) # type: ignore # scipy is not typed - def initial_state(self, _: int) -> npt.NDArray[np.float64]: + def initial_state(self, _: int) -> VectorType: return logit(self._rng.beta(self.alpha, self.beta, size=1)) # type: ignore # scipy is not typed - def log_density_gradient( - self, params_unc: npt.NDArray[np.float64] - ) -> tuple[float, npt.NDArray[np.float64]]: + def log_density_gradient(self, params_unc: VectorType) -> tuple[float, VectorType]: # use finite diffs for now epsilon = 0.000001 @@ -63,9 +64,7 @@ def log_density_gradient( lp_plus_e = self.log_density(params_unc + epsilon) return lp, np.array([(lp - lp_plus_e) / epsilon]) - def constrain_draws( - self, draws: npt.NDArray[np.float64] - ) -> npt.NDArray[np.float64]: + def constrain_draws(self, draws: VectorType) -> VectorType: return inv_logit(draws) # type: ignore # scipy is not typed def 
posterior_mean(self) -> float: diff --git a/test/models/skew_normal.py b/test/models/skew_normal.py index a3934c1..c093372 100644 --- a/test/models/skew_normal.py +++ b/test/models/skew_normal.py @@ -1,16 +1,16 @@ -import numpy.typing as npt +from typing import Optional, Union + import numpy as np from scipy import stats as sst -from typing import Optional, Union -from numpy.typing import NDArray -from numpy import float64 + +from bayes_kit.typing import VectorType class SkewNormal: def __init__( self, a: float = 4, - loc: Optional[Union[float, NDArray[float64]]] = None, + loc: Optional[Union[float, VectorType]] = None, finite_difference_epsilon: float = 0.000001, ) -> None: self.a = a @@ -20,12 +20,10 @@ def __init__( def dims(self) -> int: return 1 - def log_density(self, params_unc: npt.NDArray[np.float64]) -> float: + def log_density(self, params_unc: VectorType) -> float: return sst.skewnorm.logpdf(params_unc, self.a, loc=self._loc)[0] # type: ignore # scipy is not typed - def log_density_gradient( - self, params_unc: npt.NDArray[np.float64] - ) -> tuple[float, npt.NDArray[np.float64]]: + def log_density_gradient(self, params_unc: VectorType) -> tuple[float, VectorType]: lp = self.log_density(params_unc) lp_plus_epsilon = self.log_density(params_unc + self._epsilon) return lp, np.array([(lp - lp_plus_epsilon) / self._epsilon]) diff --git a/test/models/std_normal.py b/test/models/std_normal.py index 155ecf1..87a881a 100644 --- a/test/models/std_normal.py +++ b/test/models/std_normal.py @@ -1,18 +1,15 @@ -import numpy.typing as npt -import numpy as np +from bayes_kit.typing import VectorType class StdNormal: def dims(self) -> int: return 1 - def log_density(self, params_unc: npt.NDArray[np.float64]) -> float: + def log_density(self, params_unc: VectorType) -> float: theta: float = params_unc[0] return -0.5 * theta * theta - def log_density_gradient( - self, params_unc: npt.NDArray[np.float64] - ) -> tuple[float, npt.NDArray[np.float64]]: + def log_density_gradient(self, params_unc: VectorType) -> tuple[float, VectorType]: return -0.5 * params_unc[0] * params_unc[0], -params_unc def posterior_mean(self) -> float: diff --git a/test/test_autocorr.py b/test/test_autocorr.py index e633808..a3c2771 100644 --- a/test/test_autocorr.py +++ b/test/test_autocorr.py @@ -1,46 +1,47 @@ import numpy as np -import bayes_kit as bk -from bayes_kit.autocorr import autocorr import pytest as pt -def test_autocorr_fixed(): +from bayes_kit.autocorr import autocorr +from bayes_kit.typing import VectorType + +from .test_iat import sample_ar1 + + +def test_autocorr_fixed() -> None: y = np.asarray([1, 0, 0, 0]) ac = autocorr(y) ac_expected = np.asarray([1.000, -0.083, -0.167, -0.250]) - np.testing.assert_allclose(ac_expected, ac, atol=.001, rtol=0.001) + np.testing.assert_allclose(ac_expected, ac, atol=0.001, rtol=0.001) -def sample_ar1(rho, N): - z = np.random.normal(size = N) - for n in range(1, N): - z[n] += rho * z[n - 1] - return z - -def autocorr_ar1(rho, N): +def autocorr_ar1(rho: float, N: int) -> VectorType: ac = np.zeros(N) ac[0] = 1 for n in range(1, N): - ac[n] = ac[n - 1] * rho; + ac[n] = ac[n - 1] * rho return ac -def test_autocorr_ar1(): + +def test_autocorr_ar1() -> None: N = 3000 y = sample_ar1(0.5, N) ac = autocorr(y) ac_expected = autocorr_ar1(0.5, N) np.testing.assert_allclose(ac_expected[0:20], ac[0:20], atol=0.1, rtol=0.1) -def test_autocorr_independent(): + +def test_autocorr_independent() -> None: N = 3000 - y = np.random.normal(size = N) + y = np.random.normal(size=N) ac = autocorr(y) 
ac_expected = np.zeros(N) ac_expected[0] = 1 np.testing.assert_allclose(ac_expected[0:20], ac[0:20], atol=0.1, rtol=0.1) - -def test_autocorr_exceptions(): + + +def test_autocorr_exceptions() -> None: with pt.raises(ValueError): - bk.autocorr([]) + autocorr([]) with pt.raises(ValueError): - bk.autocorr([1.1]) + autocorr([1.1]) - bk.autocorr([1.1, 1.2]) + autocorr([1.1, 1.2]) diff --git a/test/test_equivalencies.py b/test/test_equivalencies.py index 3ba45db..131cd54 100644 --- a/test/test_equivalencies.py +++ b/test/test_equivalencies.py @@ -1,11 +1,13 @@ """Test different algorithms with known equivalencies against each other""" -from test.models.std_normal import StdNormal from test.models.skew_normal import SkewNormal -from bayes_kit import MALA, HMCDiag, Metropolis, MetropolisHastings +from test.models.std_normal import StdNormal + import numpy as np import scipy.stats as sst +from bayes_kit import MALA, HMCDiag, Metropolis, MetropolisHastings + def test_hmc_mala_agreement() -> None: """HMC with 1 step is equivalent to MALA""" diff --git a/test/test_ess.py b/test/test_ess.py index e78690e..044143d 100644 --- a/test/test_ess.py +++ b/test/test_ess.py @@ -1,12 +1,17 @@ import numpy as np -import bayes_kit as bk import pytest as pt -from .test_iat import sample_ar1, integrated_autocorr_time_ar1 -def expected_ess_ar1(rho, N): +import bayes_kit as bk +from bayes_kit.typing import VectorType + +from .test_iat import integrated_autocorr_time_ar1, sample_ar1 + + +def expected_ess_ar1(rho: float, N: int) -> float: return N / integrated_autocorr_time_ar1(rho) -def run_ess_test_ar1(rho, N): + +def run_ess_test_ar1(rho: float, N: int) -> None: v = sample_ar1(rho, N) E_ess = expected_ess_ar1(rho, N) @@ -20,13 +25,13 @@ np.testing.assert_allclose(E_ess, hat_ess3, atol=N, rtol=0.1) -def test_ess_ar1(): +def test_ess_ar1() -> None: # includes correlated (rho > 0), uncorrelated (rho = 0), and anti-correlated tests (rho < 0) for rho in np.arange(-0.5, 0.5, step=0.2): run_ess_test_ar1(rho, 20_000) -def test_ess_independent(): +def test_ess_independent() -> None: N = 10_000 y = np.random.normal(size=N) hat_ess = bk.ess(y) @@ -34,7 +39,7 @@ np.testing.assert_allclose(E_ess, hat_ess, rtol=0.1) -def test_ess_exceptions(): +def test_ess_exceptions() -> None: for n in range(4): v = sample_ar1(0.5, n) with pt.raises(ValueError): @@ -43,4 +48,3 @@ bk.ess_imse(v) with pt.raises(ValueError): bk.ess_ipse(v) - diff --git a/test/test_hmc.py b/test/test_hmc.py index 7c7cd0f..9859fe5 100644 --- a/test/test_hmc.py +++ b/test/test_hmc.py @@ -1,35 +1,38 @@ +import functools from test.models.binomial import Binomial from test.models.std_normal import StdNormal -from bayes_kit.hmc import HMCDiag +from typing import Any + import numpy as np -import functools import pytest +from bayes_kit.hmc import HMCDiag + -def _call_counter(f): +def _call_counter(f: Any) -> Any: @functools.wraps(f) - def wrapper(*args, **kwargs): - wrapper.calls += 1 + def wrapper(*args: Any, **kwargs: Any) -> Any: + wrapper.calls += 1 # type: ignore return f(*args, **kwargs) - wrapper.calls = 0 + wrapper.calls = 0 # type: ignore return wrapper @pytest.mark.parametrize("steps", [0, 1, 10]) -def test_hmc_leapfrog_num_evals(steps) -> None: +def test_hmc_leapfrog_num_evals(steps: int) -> None: # Expect that HMC.leapfrog calls log_density_gradient only once per step model = StdNormal() - model.log_density = _call_counter(model.log_density) - model.log_density_gradient =
_call_counter(model.log_density_gradient) + model.log_density = _call_counter(model.log_density) # type: ignore + model.log_density_gradient = _call_counter(model.log_density_gradient) # type: ignore hmc = HMCDiag(model, steps=steps, stepsize=0.25) _ = hmc.sample() # Expect one call to log_density before leapfrog and one after - assert model.log_density.calls == 2 + assert model.log_density.calls == 2 # type: ignore # Expect one call to log_density_gradient per leapfrog step, plus one for initial/final half step - assert model.log_density_gradient.calls == hmc._steps + 1 + assert model.log_density_gradient.calls == hmc._steps + 1 # type: ignore def test_hmc_diag_std_normal() -> None: diff --git a/test/test_iat.py b/test/test_iat.py index 4a2b890..189d6b0 100644 --- a/test/test_iat.py +++ b/test/test_iat.py @@ -1,19 +1,23 @@ +from typing import List + import numpy as np -import bayes_kit as bk import pytest as pt + +import bayes_kit as bk from bayes_kit.iat import _end_pos_pairs +from bayes_kit.typing import VectorType -def sample_ar1(rho, N): +def sample_ar1(rho: float, N: int) -> VectorType: z = np.random.normal(size=N) for n in range(1, N): z[n] += rho * z[n - 1] return z -def integrated_autocorr_time_ar1(rho): - last_sum = 0 - sum = 1 +def integrated_autocorr_time_ar1(rho: float) -> float: + last_sum = 0.0 + sum = 1.0 t = 1 while sum != last_sum: last_sum = sum @@ -22,7 +26,7 @@ def integrated_autocorr_time_ar1(rho): return sum -def run_iat_ar1_test(rho, N): +def run_iat_ar1_test(rho: float, N: int) -> None: v = sample_ar1(rho, N) E_iat = integrated_autocorr_time_ar1(rho) @@ -36,7 +40,7 @@ def run_iat_ar1_test(rho, N): np.testing.assert_allclose(E_iat, hat_iat3, rtol=0.1) -def test_iat_independent(): +def test_iat_independent() -> None: E_iat = 1.0 N = 20_000 y = np.random.normal(size=N) @@ -44,13 +48,13 @@ def test_iat_independent(): np.testing.assert_allclose(E_iat, hat_iat, rtol=0.1) -def test_iat_ar1(): +def test_iat_ar1() -> None: # tests correlated (rho > 0) and anti-correlated (rho < 0) cases for rho in np.arange(-0.5, 0.5, step=0.2): run_iat_ar1_test(rho, 20_000) -def test_iat_exceptions(): +def test_iat_exceptions() -> None: for n in range(4): v = sample_ar1(0.5, n) with pt.raises(ValueError): @@ -61,11 +65,11 @@ def test_iat_exceptions(): bk.iat_ipse(v) -def pair_start_tester(chain, expected_pos): +def pair_start_tester(chain: List[float], expected_pos: int) -> None: np.testing.assert_equal(expected_pos, _end_pos_pairs(chain)) -def test_end_pos_pairs(): +def test_end_pos_pairs() -> None: pair_start_tester([], 0) pair_start_tester([1], 0) pair_start_tester([1, -0.5], 2) diff --git a/test/test_mala.py b/test/test_mala.py index d75e0ba..9b7d562 100644 --- a/test/test_mala.py +++ b/test/test_mala.py @@ -1,8 +1,10 @@ from test.models.binomial import Binomial from test.models.std_normal import StdNormal -from bayes_kit.mala import MALA + import numpy as np +from bayes_kit.mala import MALA + def test_mala_std_normal() -> None: # init with draw from posterior diff --git a/test/test_metropolis.py b/test/test_metropolis.py index 9dad3d3..031d42c 100644 --- a/test/test_metropolis.py +++ b/test/test_metropolis.py @@ -1,18 +1,19 @@ -from typing import Callable -from test.models.std_normal import StdNormal from test.models.binomial import Binomial from test.models.skew_normal import SkewNormal +from test.models.std_normal import StdNormal +from typing import Callable +from unittest.mock import Mock + +import numpy as np +import scipy.stats as sst + from bayes_kit.metropolis import ( 
Metropolis, MetropolisHastings, metropolis_accept_test, metropolis_hastings_accept_test, ) -from unittest.mock import Mock - -import numpy as np -import scipy.stats as sst -from numpy.typing import NDArray +from bayes_kit.typing import VectorType def test_metropolis_accept_test_accepts_more_likely_proposal() -> None: @@ -50,7 +51,9 @@ def test_metropolis_accept_test_accepts_when_proposal_above_uniform_draw() -> No assert metropolis_accept_test(lp_proposal, lp_current, mock_random) -def test_metropolis_hastings_accept_test_accepts_more_likely_proposal_given_transition_ratio() -> None: +def test_metropolis_hastings_accept_test_accepts_more_likely_proposal_given_transition_ratio() -> ( + None +): mock_random = Mock() mock_random.uniform = Mock(return_value=0.5) pr_proposal = 0.4 @@ -81,7 +84,9 @@ def test_metropolis_hastings_accept_test_accepts_more_likely_proposal_given_tran ) -def test_metropolis_hastings_accept_test_reduces_to_metropolis_given_equal_transition_likelihoods() -> None: +def test_metropolis_hastings_accept_test_reduces_to_metropolis_given_equal_transition_likelihoods() -> ( + None +): M = 1000 mock_uniform = Mock() for _ in range(M): @@ -142,7 +147,9 @@ def test_metropolis_hastings_skew_normal() -> None: # a test that has tight tolerance on a known value in a tolerable runtime seems most likely to # actually inform us if any bugs are introduced in the code. gen = np.random.default_rng(seed=1701) - init: NDArray[np.float64] = sst.skewnorm.rvs(skewness_a, size=[1], random_state=gen) + init: VectorType = sst.skewnorm.rvs(skewness_a, size=[1], random_state=gen).astype( + np.float64 + ) p_fn = lambda theta: np.array( [sst.skewnorm.rvs(skewness_a, loc=theta, random_state=gen)] ) @@ -175,12 +182,12 @@ def test_metropolis_reproducible() -> None: # reinitialize proposal generator for each model proposal_generator = np.random.default_rng(seed=12345) - proposal_fn = lambda theta: proposal_generator.normal(loc=theta, scale=4) # type: ignore + proposal_fn = lambda theta: proposal_generator.normal(loc=theta, scale=4) metropolis_2 = Metropolis(model, proposal_fn=proposal_fn, init=init, seed=1848) draws_2 = np.array([metropolis_2.sample()[0] for _ in range(M)]) proposal_generator = np.random.default_rng(seed=12345) - proposal_fn = lambda theta: proposal_generator.normal(loc=theta, scale=4) # type: ignore + proposal_fn = lambda theta: proposal_generator.normal(loc=theta, scale=4) metropolis_3 = Metropolis(model, proposal_fn=proposal_fn, init=init, seed=1912) draws_3 = np.array([metropolis_3.sample()[0] for _ in range(M)]) @@ -194,7 +201,7 @@ def test_metropolis_reproducible() -> None: # This winds up being more stable than rewriting the lambda directly in the test fn body. 
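For reference while reading these tests, the rule they exercise can be written in a few lines; this is a sketch with assumed argument names, not the library function itself:

```python
import numpy as np

def mh_accept_sketch(lp_proposal, lp_current, lp_forward, lp_reverse, rng):
    # accept with probability min(1, p(prop) q(cur|prop) / (p(cur) q(prop|cur))),
    # computed on the log scale; lp_forward = log q(prop|cur), lp_reverse = log q(cur|prop)
    log_ratio = (lp_proposal + lp_reverse) - (lp_current + lp_forward)
    return np.log(rng.uniform()) < log_ratio
```

When `lp_forward == lp_reverse` the transition terms cancel and this reduces to the plain Metropolis test, which is what the reduction test below verifies against mocked uniform draws.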
def skewnorm_proposal_fn_factory( skewness: float, gen: np.random.Generator -) -> Callable[[np.typing.NDArray[np.float64]], np.typing.ArrayLike]: +) -> Callable[[VectorType], np.typing.ArrayLike]: fn = lambda t: np.array([sst.skewnorm.rvs(skewness, loc=t, random_state=gen)]) return fn @@ -204,7 +211,9 @@ def test_metropolis_hastings_reproducible() -> None: model = SkewNormal(a=skewness_a) M = 50 g = np.random.default_rng(seed=123) - init: NDArray[np.float64] = sst.skewnorm.rvs(skewness_a, size=[1], random_state=g) + init: VectorType = sst.skewnorm.rvs(skewness_a, size=[1], random_state=g).astype( + np.float64 + ) transition_lp_fn = lambda o, g: sst.skewnorm.logpdf(o, skewness_a, loc=g)[0] proposal_generator = np.random.default_rng(seed=12345) @@ -268,7 +277,7 @@ def test_metropolis_hastings_next_trajectory_matches_calling_sample() -> None: draws_1 = np.array([mh1.sample()[0] for _ in range(M)]) proposal_generator = np.random.default_rng(seed=123) - p_fn = lambda theta: proposal_generator.normal(loc=theta, scale=4) # type: ignore + p_fn = lambda theta: proposal_generator.normal(loc=theta, scale=4) mh2 = MetropolisHastings( model, proposal_fn=p_fn, transition_lp_fn=x_fn, init=init, seed=996 ) diff --git a/test/test_rhat.py b/test/test_rhat.py index 19dad68..155b1ef 100644 --- a/test/test_rhat.py +++ b/test/test_rhat.py @@ -1,18 +1,25 @@ +from typing import Sequence + import numpy as np +import pytest as pt import scipy as sp + from bayes_kit.rhat import ( - rhat, - split_chains, - split_rhat, rank_chains, rank_normalize_chains, rank_normalized_rhat, + rhat, + split_chains, + split_rhat, ) -import pytest as pt -def rhat_expected(chains): +Chains = Sequence[Sequence[float]] + + +def rhat_expected(chains_in: Chains) -> float: # uses brute force definition from BDA3 - psij_bar = [np.mean(c) for c in chains] + chains = [np.asarray(c) for c in chains_in] + psij_bar = np.array([np.mean(c) for c in chains]) psi_bar = np.mean(psij_bar) M = len(chains) N = len(chains[0]) @@ -20,10 +27,11 @@ def rhat_expected(chains): s_sq = [1 / (N - 1) * sum((chains[m] - psij_bar[m]) ** 2) for m in range(M)] W = 1 / M * sum(s_sq) var_plus = (N - 1) / N * W + 1 / N * B - rhat = np.sqrt(var_plus / W) + rhat: float = np.sqrt(var_plus / W) return rhat -def test_rhat(): + +def test_rhat() -> None: # test API implementation vs. 
brute-force implementation rhat_expected chain1 = [1.01, 1.05, 0.98, 0.90, 1.23] chain2 = [0.99, 1.00, 1.01, 1.15, 0.83] @@ -36,25 +44,31 @@ def test_rhat(): chains = [chain1, chain2, chain3, chain4] np.testing.assert_allclose(rhat_expected(chains), rhat(chains), atol=0.1, rtol=0.2) -def test_rhat_ragged(): + +def test_rhat_ragged() -> None: chain1 = [1.01, 1.05, 0.98, 0.90, 1.23] chain2 = [0.99, 1.00, 1.01, 1.15, 0.83, 0.95] chains = [chain1, chain2] - rhat_est = rhat(chains) np.testing.assert_allclose(rhat_expected(chains), rhat(chains), atol=0.1, rtol=0.2) -def rhat_throws(chains): + +def rhat_throws(chains: Chains) -> None: with pt.raises(ValueError): rhat(chains) - -def test_rhat_at_least_two_chains(): + + +def test_rhat_at_least_two_chains() -> None: rhat_throws([]) - rhat_throws([[1.01, 1.2, 1.3, 1.4]],) + rhat_throws( + [[1.01, 1.2, 1.3, 1.4]], + ) -def test_rhat_at_least_two_elements_per_chain(): + +def test_rhat_at_least_two_elements_per_chain() -> None: rhat_throws([[1, 2, 3], [4], [5, 6, 7, 8, 9]]) -def test_split_chains(): + +def test_split_chains() -> None: np.testing.assert_equal([], split_chains([])) np.testing.assert_equal([[1], []], split_chains([[1]])) np.testing.assert_equal([[1], [2]], split_chains([[1, 2]])) @@ -64,7 +78,8 @@ def test_split_chains(): [[1, 2], [3], [4, 5], [6, 7]], split_chains([[1, 2, 3], [4, 5, 6, 7]]) ) -def test_split_rhat(): + +def test_split_rhat() -> None: # split_rhat should return same result as rhat on split chains np.testing.assert_allclose(rhat([[1, 2], [3, 4]]), split_rhat([[1, 2, 3, 4]])) np.testing.assert_allclose( @@ -75,40 +90,51 @@ def test_split_rhat(): split_rhat([[1, -2, 3, 4, 5, 6], [7, 8, 9, 12]]), ) -def split_rhat_throws(chains): + +def split_rhat_throws(chains: Chains) -> None: with pt.raises(ValueError): split_rhat(chains) - -def test_split_rhat_at_least_one_chain(): + + +def test_split_rhat_at_least_one_chain() -> None: split_rhat_throws([]) -def test_split_rhat_at_least_four_elements_per_chain(): + +def test_split_rhat_at_least_four_elements_per_chain() -> None: split_rhat_throws([[1, 2, 3]]) split_rhat_throws([[1, 2, 3, 4], [1, 2, 3]]) split_rhat_throws([[1, 2, 3], [1, 2, 3, 4]]) -def rank_chains_equal(ranks, chains): + +def rank_chains_equal(ranks: Chains, chains: Chains) -> None: np.testing.assert_equal(ranks, rank_chains(chains)) - -def test_rank_chains(): + + +def test_rank_chains() -> None: rank_chains_equal([], []) rank_chains_equal([[1]], [[2.3]]) rank_chains_equal([[1, 2]], [[2.3, 4.9]]) rank_chains_equal([[2, 3, 1]], [[3.9, 5.2, 2.1]]) rank_chains_equal([[2], [1]], [[4.2], [1.9]]) - rank_chains_equal([[2, 3], [1, 4]], [[4.2, 5.7], [1.9, 12.2]]) + rank_chains_equal( + [[2, 3], [1, 4]], + [[4.2, 5.7], [1.9, 12.2]], + ) rank_chains_equal( [[2, 3], [5, 4], [1, 6]], - [[4.2, 5.7], [7.2, 6.1], [-12.9, 107]] + [[4.2, 5.7], [7.2, 6.1], [-12.9, 107]], ) -def rank_norm(r, S): - return sp.stats.norm.ppf((r - 0.325) / (S - 0.25)) -def rank_normalize_chains_close(ranks, chains): +def rank_norm(r: float, S: float) -> float: + return sp.stats.norm.ppf((r - 0.325) / (S - 0.25)) # type: ignore + + +def rank_normalize_chains_close(ranks: Chains, chains: Chains) -> None: np.testing.assert_equal(ranks, rank_normalize_chains(chains)) - -def test_rank_normalize_chains(): + + +def test_rank_normalize_chains() -> None: rank_normalize_chains_close([], []) rn11 = rank_norm(1, 1) @@ -121,44 +147,40 @@ def test_rank_normalize_chains(): rn13 = rank_norm(1, 3) rn23 = rank_norm(2, 3) rn33 = rank_norm(3, 3) - rank_normalize_chains_close( - 
[[rn23, rn33, rn13]], [[3.5, 5.9, 1.0]] - ) + rank_normalize_chains_close([[rn23, rn33, rn13]], [[3.5, 5.9, 1.0]]) rn14 = rank_norm(1, 4) rn24 = rank_norm(2, 4) rn34 = rank_norm(3, 4) rn44 = rank_norm(4, 4) - rank_normalize_chains_close( - [[rn34, rn24], [rn14, rn44]], [[3.9, 3.1], [2.2, 5.9]] - ) + rank_normalize_chains_close([[rn34, rn24], [rn14, rn44]], [[3.9, 3.1], [2.2, 5.9]]) + -def rank_normalized_rhat_throws(chains): +def rank_normalized_rhat_throws(chains: Chains) -> None: with pt.raises(ValueError): rank_normalized_rhat(chains) -def test_rank_normalized_rhat_at_least_one_chain(): + +def test_rank_normalized_rhat_at_least_one_chain() -> None: rank_normalized_rhat_throws([]) -def test_rank_normalized_rhat_at_least_four_elemenets_per_chain(): + +def test_rank_normalized_rhat_at_least_four_elemenets_per_chain() -> None: rank_normalized_rhat_throws([[1.01, 1.2, 1.3]]) rank_normalized_rhat_throws([[1, 2, 3], [4]]) -def rank_normalized_rhat_close(ranks, chains): - np.testing.assert_allclose( - split_rhat(ranks), rank_normalized_rhat(chains) - ) -def test_rank_normalized_rhat(): +def rank_normalized_rhat_close(ranks: Chains, chains: Chains) -> None: + np.testing.assert_allclose(split_rhat(ranks), rank_normalized_rhat(chains)) + + +def test_rank_normalized_rhat() -> None: # expect rank-normalized-rhat to be equivalent to split_rhat on ranks rn14 = rank_norm(1, 4) rn24 = rank_norm(2, 4) rn34 = rank_norm(3, 4) rn44 = rank_norm(4, 4) - rank_normalized_rhat_close( - [[rn14, rn44, rn34, rn24]], - [[1.8, 10.9, 6.3, 5.1]] - ) + rank_normalized_rhat_close([[rn14, rn44, rn34, rn24]], [[1.8, 10.9, 6.3, 5.1]]) rn18 = rank_norm(1, 8) rn28 = rank_norm(2, 8) @@ -170,5 +192,5 @@ def test_rank_normalized_rhat(): rn88 = rank_norm(8, 8) rank_normalized_rhat_close( [[rn28, rn38, rn78, rn88], [rn18, rn48, rn68, rn58]], - [[2, 3, 7, 8], [1, 4, 6, 5]] + [[2, 3, 7, 8], [1, 4, 6, 5]], ) diff --git a/test/test_tempered_smc.py b/test/test_tempered_smc.py index 4efb1f9..ae79587 100644 --- a/test/test_tempered_smc.py +++ b/test/test_tempered_smc.py @@ -1,7 +1,9 @@ from test.models.binomial import Binomial -from bayes_kit.smc import TemperedLikelihoodSMC, metropolis_kernel + import numpy as np +from bayes_kit.smc import TemperedLikelihoodSMC, metropolis_kernel + def test_rwm_smc_binom() -> None: model = Binomial(alpha=2, beta=3, x=5, N=15) diff --git a/test/test_theta_initialization.py b/test/test_theta_initialization.py index 5da7411..cec3ebe 100644 --- a/test/test_theta_initialization.py +++ b/test/test_theta_initialization.py @@ -1,21 +1,22 @@ -import numpy as np -from numpy.typing import NDArray +from typing import Any, List from unittest.mock import Mock +import numpy as np + from bayes_kit.hmc import HMCDiag from bayes_kit.mala import MALA from bayes_kit.metropolis import Metropolis, MetropolisHastings -from bayes_kit.model_types import HessianModel +from bayes_kit.typing import VectorType -def assert_not_empty_array(a: NDArray[np.float64]): +def assert_not_empty_array(a: VectorType) -> None: with np.testing.assert_raises(AssertionError): np.testing.assert_array_equal(a, np.array([])) -def make_models(init: NDArray[np.float64], dims: int = 1): +def make_models(init: VectorType, dims: int = 1) -> List[Any]: # For the purposes of this (and future) tests, we only care about the model's dimensions. 
- mock_model: HessianModel = Mock() + mock_model: Any = Mock() mock_model.dims = Mock(return_value=dims) mock_model.log_density_gradient = Mock(return_value=(0.5, (0,))) mock_model.log_density_hessian = Mock(return_value=(0, 5, (0,), (0,)))
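Because the protocols in the new `bayes_kit.typing` module are structural, any object with matching method signatures conforms, which is why a bare `Mock()` works here. A minimal hand-written conformance example (the `StdNormal2D` class is hypothetical):

```python
import numpy as np
from bayes_kit.typing import GradModel, VectorType

class StdNormal2D:
    # satisfies GradModel structurally: no subclassing of the Protocol needed
    def dims(self) -> int:
        return 2

    def log_density(self, params_unc: VectorType) -> float:
        return float(-0.5 * params_unc @ params_unc)

    def log_density_gradient(self, params_unc: VectorType) -> tuple[float, VectorType]:
        return self.log_density(params_unc), -params_unc

model: GradModel = StdNormal2D()  # mypy --strict should accept this assignment
```

This is also why the workflow's new `mypy --strict` step can type-check caller code against these protocols without any registration or inheritance.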