Source code for pyesmda._localization

"""
Implement some correlation functions.

@author: acollet
"""

import numbers
import warnings
from abc import ABC, abstractmethod
from typing import Callable, Optional, Sequence, Union

import numpy as np
from scipy.sparse import csr_matrix, spmatrix

from pyesmda._utils import NDArrayFloat, empirical_cross_covariance, get_anomaly_matrix


[docs] class LocalizationStrategy(ABC): """Abstract class for localization strategy."""
[docs] @abstractmethod def localize( self, X: NDArrayFloat, Y: NDArrayFloat, batch_slice: slice = slice(None) ) -> NDArrayFloat: """Apply the localization to the given covariance.""" ...
[docs] @abstractmethod def localize_multi_dot( self, X: NDArrayFloat, Y: NDArrayFloat, *args: NDArrayFloat, batch_slice: slice = slice(None), ) -> NDArrayFloat: """_summary_ Parameters ---------- X : NDArrayFloat _description_ Y : NDArrayFloat _description_ md_corr_mat : Optional[spmatrix], optional _description_, by default None Returns ------- NDArrayFloat _description_ """ ...
[docs] def check_localization_shape( self, expected_shape: Sequence[int], param_name: str ) -> None: """Check if the localization has the correct shape.""" del expected_shape, param_name # unused pass
[docs] class FixedLocalization(LocalizationStrategy): r""" Fixed localization strategy. Attributes ---------- correlation_matrix : Optional[Union[sp.sparse.sparray, NDArrayFloat]] Correlation matrix based on spatial and temporal distances between observations and :math:`\rho_{DD}`. It is used to localize the autocovariance matrix of predicted data by applying an elementwise multiplication by this matrix. Expected dimensions are (:math:`N_{\mathrm{obs}}`, :math:`N_{\mathrm{obs}}`). """
[docs] def __init__( self, correlation_matrix: Optional[Union[NDArrayFloat, spmatrix]] = None, ) -> None: r""" Initialize the instance. Parameters ---------- correlation_matrix : Optional[Union[NDArrayFloat, spmatrix]] Correlation matrix based on spatial/temporal distances between observations/parameters :math:`\rho_{DD}` or :math:`\rho_{MD}`. It is used to localize the empirical cross-covariance matrices by applying an elementwise multiplication by this matrix. Expected dimensions are (:math:`N_{\mathrm{obs}}`, :math:`N_{\mathrm{obs}}`) for :math:`\rho_{DD}` and (:math:`N_{m}`, :math:`N_{\mathrm{obs}}`) for :math:`\rho_{DD}`. It None, no localization is performed. The default is None. """ self.correlation_matrix = ( csr_matrix(correlation_matrix) if correlation_matrix is not None else None )
[docs] def check_localization_shape( self, expected_shape: Sequence[int], param_name: str ) -> None: """Check if""" if self.correlation_matrix is None: return if self.correlation_matrix.shape != tuple(expected_shape): raise ValueError( f"{param_name} must be a 2D matrix with " f"dimensions {tuple(expected_shape)}." )
[docs] def localize( self, X: NDArrayFloat, Y: NDArrayFloat, batch_slice: slice = slice(None) ) -> NDArrayFloat: """Apply the localization to the covariance matrix.""" cov_mat = empirical_cross_covariance(X, Y) if self.correlation_matrix is not None: return self.correlation_matrix[batch_slice, :].multiply(cov_mat).toarray() return cov_mat
[docs] def localize_multi_dot( self, X: NDArrayFloat, Y: NDArrayFloat, *args: NDArrayFloat, batch_slice: slice = slice(None), ) -> NDArrayFloat: """_summary_ Parameters ---------- X : NDArrayFloat _description_ Y : NDArrayFloat _description_ Returns ------- NDArrayFloat _description_ """ X_shift = get_anomaly_matrix(X) Y_shift = get_anomaly_matrix(Y) if self.correlation_matrix is not None: return np.linalg.multi_dot( [ self.correlation_matrix[batch_slice, :] .multiply(X_shift.dot(Y_shift.T)) .toarray(), *args, ] ) return np.linalg.multi_dot([X_shift, Y_shift.T, *args])
[docs] class NoLocalization(FixedLocalization): """Instance to use when no localization is to be applied."""
[docs] def __init__(self) -> None: """Initialize the instance.""" super().__init__()
def default_correlation_threshold(ensemble_size: int) -> float: """ Return a number that determines whether a correlation is significant. Default threshold taken from :cite:t:`luoContinuousHyperparameterOPtimization2022`_, Continuous Hyper-parameter OPtimization (CHOP) in an ensemble Kalman filter Section 2.3 - Localization in the CHOP problem Note ---- Original implementation from https://github.com/equinor/iterative_ensemble_smoother. Examples -------- >>> AdaptiveESMDA.correlation_threshold(0) 1.0 >>> AdaptiveESMDA.correlation_threshold(9) 1.0 >>> AdaptiveESMDA.correlation_threshold(16) 0.75 >>> AdaptiveESMDA.correlation_threshold(36) 0.5 >>> AdaptiveESMDA.correlation_threshold(100) 0.3 """ if ensemble_size == 0: raise ValueError("The ensemble size cannot be zero!") return float(min(1, max(0, 3 / np.sqrt(ensemble_size)))) def cov_to_corr( cov_XY: NDArrayFloat, stds_X: NDArrayFloat, stds_Y: NDArrayFloat, inplace: bool = True, ) -> NDArrayFloat: """ Convert a covariance matrix to a correlation matrix in-place. Note ---- Original implementation from https://github.com/equinor/iterative_ensemble_smoother. """ if not inplace: cov_XY = cov_XY.copy() # Divide each element of cov_XY by the corresponding standard deviations cov_XY /= stds_X[:, np.newaxis] cov_XY /= stds_Y[np.newaxis, :] # divide by zeros, # TODO: should not append normally -> stds is not supposed to be null cov_XY = np.nan_to_num(cov_XY, nan=0.0) # Perform checks and clip values to [-1, 1] eps = 1e-8 if not ((cov_XY.max() <= 1 + eps) and (cov_XY.min() >= -1 - eps)): warnings.warn( "Cross-correlation matrix has entries not in [-1, 1]." f"The min and max values are: {cov_XY.min()} and {cov_XY.max()}" ) return np.clip(cov_XY, a_min=-1, a_max=1, out=cov_XY) class CorrelationTransform(ABC): @abstractmethod def __call__(self, correlation_matrix: NDArrayFloat, ne: int) -> NDArrayFloat: """ Transform the given correlation matrix. Parameters ---------- correlation_matrix : NDArrayFloat Matrix to transform. ne : int Number of members in the ensemble. Returns ------- NDArrayFloat _description_ """ ... def make_correlation_threshold_callable( correlation_threshold: Union[Callable[[int], float], float, None], ) -> Callable: # Default value if correlation_threshold is None: return default_correlation_threshold # Create `correlation_threshold` if the argument is a float if correlation_threshold is not None: if callable(correlation_threshold): return correlation_threshold # Check the correlation threshold if ( isinstance(correlation_threshold, numbers.Real) and correlation_threshold >= 0 # ty:ignore[unsupported-operator] and correlation_threshold <= 1 ): def _correlation_threshold(ensemble_size: int) -> float: return correlation_threshold # ty:ignore[invalid-return-type] return _correlation_threshold raise TypeError("`correlation_threshold` must be a callable or a float in [0, 1]") class CorrelationThresholding(CorrelationTransform): """Apply a thresholding to the adaptive correlation matrix.""" __slots__ = ["correlation_threshold"] def __init__( self, correlation_threshold: Union[Callable[[int], float], float, None] = None ) -> None: """ Initialize the instance. TODO: add ref. Parameters ---------- correlation_threshold : Union[Callable[[int], float], float, None], optional Either a callable with signature f(ensemble_size) -> float, or a float in the range [0, 1]. Entries in the covariance matrix that are lower than the correlation threshold will be set to zero. If None, the default 3/sqrt(ensemble_size) is used. The default is None. """ self.correlation_threshold: Callable[[int], float] = ( make_correlation_threshold_callable(correlation_threshold) ) assert callable(self.correlation_threshold), ( "`correlation_threshold` should be callable" ) def __call__(self, correlation_matrix: NDArrayFloat, ne: int) -> NDArrayFloat: """Transform the correlation matrix.""" return np.where( np.abs(correlation_matrix) > self.correlation_threshold(correlation_matrix.shape[0]), correlation_matrix, 0.0, ) class CorrelationTempering(CorrelationTransform): """Apply a tempring to the adaptive correlation matrix.""" __slots__ = ["correlation_threshold"] def __init__( self, tempering_function: Callable[[NDArrayFloat, int], NDArrayFloat] ) -> None: """ Initialize the instance. TODO: add ref. Parameters ---------- correlation_threshold : Union[Callable[[int], float], float, None], optional Either a callable with signature f(ensemble_size) -> float, or a float in the range [0, 1]. Entries in the covariance matrix that are lower than the correlation threshold will be set to zero. If None, the default 3/sqrt(ensemble_size) is used. The default is None. """ assert callable(tempering_function), ( "`correlation_threshold` should be callable" ) # Add as an attribute self.tempering_function = tempering_function def __call__(self, correlation_matrix: NDArrayFloat, ne: int) -> NDArrayFloat: """Transform the correlation matrix.""" return self.tempering_function(correlation_matrix, ne) class CorrelationBasedLocalization(LocalizationStrategy): """Implement an adaptative correlation based localization strategy.""" __slots__ = ["correlation_threshold"] def __init__( self, transform: CorrelationTransform, ) -> None: """ Initialize the instance. TODO: add ref. """ self.transform: CorrelationTransform = transform def localize( self, X: NDArrayFloat, Y: NDArrayFloat, batch_slice: slice = slice(None) ) -> NDArrayFloat: """Apply the localization to C_SD.""" cov_mat = empirical_cross_covariance(X, Y) return np.multiply( cov_mat, self.transform( cov_to_corr( cov_mat, np.std(X, axis=1, ddof=1), np.std(Y, axis=1, ddof=1), inplace=False, ), X.shape[1], ), ) def localize_multi_dot( self, X: NDArrayFloat, Y: NDArrayFloat, *args: NDArrayFloat, batch_slice: slice = slice(None), ) -> NDArrayFloat: """_summary_ Parameters ---------- X : NDArrayFloat _description_ Y : NDArrayFloat _description_ *args : NDArrayFloat _description_ Returns ------- NDArrayFloat _description_ """ return np.linalg.multi_dot( [ self.localize(X, Y), *args, ] ) def _reversed_beta_cumulative(distances: NDArrayFloat, beta: float = 3) -> NDArrayFloat: r""" Transform the distances into weights between 0 and 1 with a beta function. .. math:: 1 - \dfrac{1}{1 + \left(\dfrac{d}{1 - d}\right)^{-\beta}} Parameters ---------- x : NDArrayFloat Input array. Ideally, values should be between 0. and 1.0. beta: float, optional Shape factor. Must be positive or null. The default is 3.0. Returns ------- NDArrayFloat. Array of same dimension as input array. """ if beta < 0.0: raise ValueError(f"Beta ({beta}) should be positive or null !") distances2 = distances.copy() distances2[distances == 0] = np.nan distances2[distances >= 1] = np.nan fact = np.where( np.isnan(distances2), 0.0, 1.0 / (1.0 + np.power((distances2 / (1 - distances2)), -beta)), ) fact[distances >= 1] = 1.0 return 1.0 - fact def gc_correlation_tempering(corr_mat: NDArrayFloat, ne: int) -> NDArrayFloat: r""" Apply the Gaspari-Cohn tempering to the correlation matrix. See section 2.3, Localization in the CHOP problem from :cite:t:`luoContinuousHyperparameterOPtimization2022`. .. math:: (TODO) 1 - \dfrac{1}{1 + \left(\dfrac{d}{s - d}\right)^{-\beta}} """ if ne <= 9: raise ValueError("Cannot use the Gaspari-Cohn tempering if Ne <= 9.") return distances_to_weights_fifth_order((1 - np.abs(corr_mat)) / (1 - 3 / ne)) def gc_correlation_tempering_positive(corr_mat: NDArrayFloat, ne: int) -> NDArrayFloat: r""" Apply the Gaspari-Cohn tempering to the correlation matrix. See section 2.3, Localization in the CHOP problem from :cite:t:`ContinuousHyperparameterOPtimization2022`. .. math:: (TODO) 1 - \dfrac{1}{1 + \left(\dfrac{d}{s - d}\right)^{-\beta}} """ out = gc_correlation_tempering(corr_mat, ne) out[corr_mat < 0] = 0 return out
[docs] def distances_to_weights_beta_cumulative( distances: NDArrayFloat, beta: float = 3, scaling_factor: float = 1.0 ) -> NDArrayFloat: r""" Transform the distances into weights between 0 and 1 with a beta function. .. math:: 1 - \dfrac{1}{1 + \left(\dfrac{d}{s - d}\right)^{-\beta}} Parameters ---------- distances : NDArrayFloat Input array of distances. beta: float, optional Shape factor. The smalest beta, the slower the variation, the higher beta the sharpest the transition (tends to a dirac function). Must be strictly positive. The default is 3. scaling_factor: float, optional The scaling factor. At 0, the function equals 1.0, at half the scaling factor, it equals 0.5, and at the scaling factor, is equals zero. The default is 1.0. Returns ------- NDArrayFloat. Array of same dimension as input array. """ if scaling_factor <= 0.0: raise ValueError( f"The scaling factor ({scaling_factor}) should be strictly positive !" ) return _reversed_beta_cumulative(distances / scaling_factor, beta=beta)
def _part1(d: Union[NDArrayFloat, float]) -> Union[NDArrayFloat, float]: return -1 / 4 * d**5.0 + 1 / 2 * d**4.0 + 5 / 8 * d**3.0 - 5 / 3 * d**2.0 + 1.0 def _part2(d: Union[NDArrayFloat, float]) -> Union[NDArrayFloat, float]: return ( 1 / 12 * d**5.0 - 1 / 2 * d**4.0 + 5 / 8 * d**3 + 5 / 3 * d**2.0 - 5.0 * d + 4.0 - 2 / 3 * (d ** (-1.0)) )
[docs] def distances_to_weights_fifth_order( distances: NDArrayFloat, scaling_factor: float = 1.0 ) -> NDArrayFloat: r""" Transform the distances into weights between 0 and 1 with a fifth order function. .. math:: f(z) = \begin{cases} 0 & z < 0 \\ \dfrac{-1}{4} z^{5} + \dfrac{1}{2} z^{4} + \dfrac{5}{8} z^{3} - \dfrac{5}{3} z^{2} + 1 & 0 \leq z \leq 1\\ \dfrac{1}{12} z^{5} - \dfrac{1}{2} z^{4} + \dfrac{5}{8} z^{3} + \dfrac{5}{3} z^{2} - 5z + 4 - \dfrac{2}{3} z^{-1} & 1 \leq z \leq 2\\ \end{cases} with :math:`z = \dfrac{d}{s}`, :math:`d` the distances, and :math:`s` the scaling factor. See :cite:p:`gaspariConstructionCorrelationFunctions1999`. Parameters ---------- distances : NDArrayFloat Input distances values. scaling_factor: float, optional Scaling factor. It is roughly the distance at which weights go under 0.25. The default is 1.0. Returns ------- NDArrayFloat. Array of same dimension as input array. """ if scaling_factor <= 0: raise ValueError( f"The scaling_factor ({scaling_factor}) should be strictly positive !" ) distances2 = distances.copy() / scaling_factor distances2[distances2 < 0] = np.nan distances2[distances2 >= 2.0] = np.nan return np.where( np.isnan(distances2), 0.0, np.where( distances2 >= 1.0, _part2(np.where(distances2 <= 0.0, np.nan, distances2)), _part1(distances2), ), )