Source code for pyesmda._utils

"""
Implement the ES-MDA algorithms.

@author: acollet
"""

from functools import lru_cache, wraps
from typing import List

import covmats
import numpy as np
import numpy.typing as npt
import scipy as sp

NDArrayFloat = npt.NDArray[np.float64]


def np_cache(*args, **kwargs):
    """
    LRU cache implementation for functions whose FIRST parameter is a numpy array.

    Examples
    --------
    >>> array = np.array([[1, 2, 3], [4, 5, 6]])
    >>> @np_cache(maxsize=256)
    ... def multiply(array, factor):
    ...     print("Calculating...")
    ...     return factor*array
    >>> multiply(array, 2)
    Calculating...
    array([[ 2,  4,  6],
           [ 8, 10, 12]])
    >>> multiply(array, 2)
    array([[ 2,  4,  6],
           [ 8, 10, 12]])
    >>> multiply.cache_info()
    CacheInfo(hits=1, misses=1, maxsize=256, currsize=1)

    """

    def decorator(function):
        @wraps(function)
        def wrapper(np_array, *args, **kwargs):
            hashable_array = array_to_tuple(np_array)
            return cached_wrapper(hashable_array, *args, **kwargs)

        @lru_cache(*args, **kwargs)
        def cached_wrapper(hashable_array, *args, **kwargs):
            array = np.array(hashable_array)
            return function(array, *args, **kwargs)

        def array_to_tuple(np_array):
            """Iterates recursively."""
            try:
                return tuple(array_to_tuple(_) for _ in np_array)
            except TypeError:
                return np_array

        # copy lru_cache attributes over too
        wrapper.cache_info = cached_wrapper.cache_info  # ty:ignore[unresolved-attribute]
        wrapper.cache_clear = cached_wrapper.cache_clear  # ty:ignore[unresolved-attribute]

        return wrapper

    return decorator


[docs] def get_ensemble_variance( ensemble: NDArrayFloat, ) -> NDArrayFloat: """ Get the given ensemble variance (diagonal terms of the covariance matrix). Parameters ---------- ensemble : NDArrayFloat Ensemble of realization with diemnsions (:math:`N_{m}, N_{e}`), $N_{m}$) and $N_{e}$ being the ensemble size and one member size respectively. Returns ------- NDArrayFloat The variance as a 1d array. Raises ------ ValueError If the ensemble is not a 2D matrix. """ # test ensemble size if len(ensemble.shape) != 2: raise ValueError("The ensemble must be a 2D matrix!") return np.sum( ((ensemble - np.mean(ensemble, axis=1, keepdims=True)) ** 2), axis=1 ) / (ensemble.shape[1] - 1.0)
[docs] @np_cache() def get_anomaly_matrix( ensemble: NDArrayFloat, ) -> NDArrayFloat: """ Return the zero-mean (i.e., centered) anomaly matrix of the ensemble. Parameters ---------- ensemble: NDArrayFloat Ensemble of realization with shape ($N_{m}$, $N_{e}$), $N_{e}$ and $N_{m}$ being the ensemble size and one member size respectively. Examples -------- >>> X = np.array([[-2.4, -0.3, 0.7, 0.2, 1.1], ... [-1.5, 0.4, -0.4, -0.9, 1. ], ... [-0.1, -0.4, -0. , -0.5, 1.1]]) >>> get_anomaly_matrix(X) array([[-0.75424723, -0.11785113, 0.87209836], [-0.14142136, 0.35355339, -0.21213203], [ 0.42426407, -0.35355339, -0.07071068], [ 0.42426407, -0.35355339, -0.07071068], [ 0.02357023, -0.04714045, 0.02357023]]) Return ------ The anomaly matrix with with shape ($N_{e}$, $N_{m}$). """ # Note: this is the same as # 1 / np.sqrt(ne - 1) * (a @ np.identity(ne) - a @ np.ones((ne, 1)) / ne) return (ensemble - np.mean(ensemble, axis=1, keepdims=True)) / np.sqrt( ensemble.shape[1] - 1 )
[docs] def empirical_covariance_upper(ensemble: NDArrayFloat) -> NDArrayFloat: """Compute the upper triangular part of the empirical covariance matrix X. The output shape (num_variables, num_observations). Parameter --------- ensemble: NDArrayFloat Ensemble of values. Examples -------- >>> X = np.array([[-2.4, -0.3, 0.7, 0.2, 1.1], ... [-1.5, 0.4, -0.4, -0.9, 1. ], ... [-0.1, -0.4, -0. , -0.5, 1.1]]) >>> empirical_covariance_upper(X.T) array([[1.873, 0.981, 0.371], [0. , 0.997, 0.392], [0. , 0. , 0.407]]) Naive computation: >>> approximate_covariance_matrix_from_ensembles(X.T, X.T) array([[1.873, 0.981, 0.371], [0.981, 0.997, 0.392], [0.371, 0.392, 0.407]]) """ # https://www.math.utah.edu/software/lapack/lapack-blas/dsyrk.html XXT: npt.NDArray[np.double] = sp.linalg.blas.dsyrk( alpha=1.0, a=get_anomaly_matrix(ensemble) ) return XXT
[docs] def approximate_covariance_matrix_from_ensembles( ensemble_1: NDArrayFloat, ensemble_2: NDArrayFloat ) -> NDArrayFloat: r""" Approximate the covariance matrix between two ensembles in the EnKF way. The covariance matrice :math:`C_{m1m2}` is approximated from the ensemble in the standard way of EnKF :cite:p:`evensenDataAssimilationEnsemble2007,aanonsenEnsembleKalmanFilter2009`: .. math:: C_{p1p2} = \frac{1}{N_{e} - 1} \sum_{j=1}^{N_{e}}\left(m1_{j} - \overline{m1}\right)\left(m2_{j} - \overline{m2} \right)^{T} Parameters ---------- ensemble_1 : NDArrayFloat First ensemble of realization with diemnsions (:math:`N_{m1}, N_{e}`). ensemble_2 : NDArrayFloat Second ensemble of realization with diemnsions (:math:`N_{m2}, N_{e}`). Returns ------- NDArrayFloat The two ensembles approximated covariance matrix. Examples -------- >>> X = np.array([[-2.4, -0.3, 0.7], ... [ 0.2, 1.1, -1.5]]) >>> Y = np.array([[ 0.4, -0.4, -0.9], ... [ 1. , -0.1, -0.4], ... [-0. , -0.5, 1.1], ... [-1.8, -1.1, 0.3]]) >>> approximate_covariance_matrix_from_ensembles(X.T, Y.T) array([[-1.035 , -1.15833333, 0.66 , 1.56333333], [ 0.465 , 0.36166667, -1.08 , -1.09666667]]) Verify against numpy.cov >>> np.cov(X, rowvar=True, ddof=1) array([[ 2.50333333, -0.99666667], [-0.99666667, 1.74333333]]) >>> approximate_covariance_matrix_from_ensembles(X.T, X.T) array([[ 2.50333333, -0.99666667], [-0.99666667, 1.74333333]]) Raises ------ ValueError _description_ """ # test ensemble size is_issue = False if ensemble_1.ndim != 2 or ensemble_2.ndim != 2: is_issue = True elif ensemble_1.shape[1] != ensemble_2.shape[1]: is_issue = True if is_issue: raise ValueError( "The ensemble should be 2D matrices with equal second dimension!" ) return get_anomaly_matrix(ensemble_1) @ get_anomaly_matrix(ensemble_2).T
# define an alias empirical_cross_covariance = approximate_covariance_matrix_from_ensembles
[docs] def ls_cost_function( pred: NDArrayFloat, obs: NDArrayFloat, cov_obs: covmats.CovarianceMatrix, ) -> NDArrayFloat: r""" Compute the normalized objective function for a given member :math:`j`. .. math:: O_{N_{d}, j} = \frac{1}{2N_{d}} \sum_{j=1}^{N_{e}}\left(d^{l}_{j} - {d_{obs}} \right)^{T}C_{D}^{-1}\left(d^{l}_{j} - {d_{obs}} \right) Parameters ---------- pred : NDArrayFloat Ensemble of prediction vector with shape (:math:`N_{\mathrm{obs}}, N_{e}`), or single vector with shape :math:`(N_{\mathrm{obs}},)`. obs : NDArrayFloat Vector of observed values. cov_obs Cholesky upper factorisation of the covariance matrix of observed data measurement errors with dimensions (:math:`N_{\mathrm{obs}}`, :math:`N_{\mathrm{obs}}`). Also denoted :math:`R`. Returns ------- NDArrayFloat The objective function for each ensemble realization. """ if not isinstance(cov_obs, covmats.CovarianceMatrix): n = np.shape(obs)[0] raise ValueError( "cov_obs must be an implementation of `covmats.CovarianceMatrix`" f" with dimensions ({n}, {n})." ) residuals: NDArrayFloat = (pred.T - obs).T # still has shape (N_obs, N_e) return 0.5 * np.sum(residuals * cov_obs.solve(residuals), axis=0)
[docs] def inflate_ensemble_around_its_mean( ensemble: NDArrayFloat, inflation_factor: float ) -> NDArrayFloat: r""" Inflate the given parameter ensemble around its mean. .. math:: m^{l+1}_{j} \leftarrow r^{l+1}\left(m^{l+1}_{j} - \frac{1}{N_{e}} \sum_{j}^{N_{e}}m^{l+1}_{j}\right) + \frac{1}{N_{e}}\sum_{j}^{N_{e}}m^{l+1}_{j} Parameters ---------- ensemble: NDArrayFloat Ensemble of realization with diemnsions (:math:`N_{m}, N_{e}`). Returns ------- NDArrayFloat The inflated ensemble. """ if not inflation_factor == 1.0: return inflation_factor * ( ensemble - np.mean(ensemble, axis=1, keepdims=True) ) + np.mean(ensemble, axis=1, keepdims=True) return ensemble
[docs] def check_nans_in_predictions(d_pred: NDArrayFloat, assimilation_step: int) -> None: """ Check and raise an exception if there is any NaNs in the input predictions array. Parameters ---------- d_pred : NDArrayFloat Input prediction vector(s). assimilation_step : int Assimilation step index. 0 means before the first assimilation. Raises ------ Exception Raised if NaNs are found. It indicates which ensemble members have incorrect predictions, and at which assimilaiton step. """ if not np.isnan(d_pred).any(): return # indices of members for which nan have been found error_indices: List[int] = sorted(set(np.where(np.isnan(d_pred))[1])) if assimilation_step == 0: msg: str = "with the initial ensemble predictions " else: msg = f" after assimilation step {assimilation_step}" raise Exception( f"Something went wrong {msg} -> NaN values" f" are found in predictions for members {[int(e) for e in error_indices]} !" )