Source code for skchange.costs._empirical_distribution_cost

import numpy as np
from numpy.typing import ArrayLike

from ..costs.base import BaseCost
from ..utils.numba import njit, numba_available
from ..utils.numba.general import compute_finite_difference_derivatives
from ..utils.numba.stats import col_cumsum
from ..utils.validation.parameters import check_larger_than


def make_approximate_mle_edf_cost_quantile_points(
    X: np.ndarray, num_quantiles: int
) -> tuple[np.ndarray, np.ndarray]:
    """
    Compute transformed quantile points for approximating the EDF integral.

    Parameters
    ----------
    X : np.ndarray
        Input data array, shape (n_samples, n_features).
    num_quantiles : int
        Number of quantiles to use for approximation.

    Returns
    -------
    quantile_points : np.ndarray
        Quantile points for each feature, shape (num_quantiles, n_features).
    quantile_values : np.ndarray
        Quantile values for each feature, shape (num_quantiles, n_features).
    """
    n_samples = X.shape[0]

    integrated_edf_scaling = -np.log(2 * n_samples - 1)
    quantiles_range = np.arange(1, num_quantiles + 1)
    integration_quantiles: np.ndarray = 1.0 / (
        1
        + np.exp(
            integrated_edf_scaling * ((2 * quantiles_range - 1) / num_quantiles - 1)
        )
    )

    quantile_points = np.quantile(X, integration_quantiles, axis=0)

    # Tile out the quantile values to match the number of columns in X:
    quantile_values = np.tile(integration_quantiles.reshape(-1, 1), (1, X.shape[1]))

    return quantile_points, quantile_values


@njit
def make_cumulative_edf_cache(
    xs: np.ndarray, quantile_points: np.ndarray
) -> np.ndarray:
    """
    Create a cache for cumulative empirical distribution function (EDF) values.

    Parameters
    ----------
    xs : np.ndarray
        Input data array, shape (n_samples,).
    quantile_points : np.ndarray
        Quantile values at which to compute the cumulative counts.
        shape: (num_quantiles,).

    Returns
    -------
    cumulative_edf_quantiles : np.ndarray
        2D array of cumulative counts for each quantile.
        Shape: (n_samples + 1, num_quantiles).
    """
    lte_quantile_point_mask = (xs[:, None] < quantile_points[None, :]).astype(
        np.float64
    )

    # Add 0.5 to the count for samples equal to the quantile values:
    lte_quantile_point_mask += 0.5 * (xs[:, None] == quantile_points[None, :])

    cumulative_edf_quantiles = col_cumsum(lte_quantile_point_mask, init_zero=True)

    return cumulative_edf_quantiles


@njit
def make_fixed_cdf_cost_quantile_weights(
    fixed_quantiles: np.ndarray,
    quantile_points: np.ndarray,
) -> np.ndarray:
    """
    Compute quantile integration weights for fixed CDF cost.

    Parameters
    ----------
    fixed_quantiles : np.ndarray
        Quantiles at which to compute the EDF, shape (num_quantiles,).
    quantile_points : np.ndarray
        Pre-image of the fixed quantiles, shape (num_quantiles,).

    Returns
    -------
    quantile_integration_weights : np.ndarray
        Quantile integration weights for the fixed CDF cost, shape (num_quantiles,).
    """
    if len(fixed_quantiles) < 3:
        raise ValueError("At least three fixed quantile values are required.")

    reciprocal_fixed_cdf_weights = 1.0 / (fixed_quantiles * (1.0 - fixed_quantiles))
    fixed_quantile_derivatives = compute_finite_difference_derivatives(
        ts=quantile_points,
        ys=fixed_quantiles,
    )
    quantile_integration_weights = (
        reciprocal_fixed_cdf_weights * fixed_quantile_derivatives
    )

    return quantile_integration_weights


@njit
def numba_fixed_cdf_cost_cached_edf(
    cumulative_edf_quantiles: np.ndarray,
    segment_starts: np.ndarray,
    segment_ends: np.ndarray,
    log_fixed_quantiles: np.ndarray,
    log_one_minus_fixed_quantiles: np.ndarray,
    quantile_weights: np.ndarray,
    out_segment_costs: np.ndarray | None = None,
) -> np.ndarray:
    """Compute the empirical distribution cost for segments using a reference CDF.

    Parameters
    ----------
    cumulative_edf_quantiles : np.ndarray
        Cumulative EDF values, shape (n_samples + 1, num_quantiles).
    segment_starts : np.ndarray
        Start indices of segments, shape (n_segments,).
    segment_ends : np.ndarray
        End indices of segments, shape (n_segments,).
    log_fixed_quantiles : np.ndarray
        Logarithm of fixed quantiles, shape (num_quantiles,).
    log_one_minus_fixed_quantiles : np.ndarray
        Logarithm of one minus fixed quantiles, shape (num_quantiles,).
    quantile_weights : np.ndarray
        Quantile integration weights, shape (num_quantiles,).
    out_segment_costs : np.ndarray or None, optional
        Output array for costs, shape (n_segments,). If None, a new array is created.

    Returns
    -------
    out_segment_costs : np.ndarray
        Array of empirical distribution costs for each segment, shape (n_segments,).
    """
    num_quantiles = cumulative_edf_quantiles.shape[1]
    segment_quantiles = np.zeros(num_quantiles, dtype=np.float64)
    one_minus_segment_quantiles = np.zeros(num_quantiles, dtype=np.float64)

    if out_segment_costs is None:
        out_segment_costs = np.zeros(len(segment_starts), dtype=np.float64)

    for i, (segment_start, segment_end) in enumerate(zip(segment_starts, segment_ends)):
        segment_length = segment_end - segment_start

        # Shifted by 1 to account for the row of zeros at the start:
        segment_quantiles[:] = cumulative_edf_quantiles[segment_end, :]
        segment_quantiles -= cumulative_edf_quantiles[segment_start, :]
        segment_quantiles /= float(segment_length)

        one_minus_segment_quantiles[:] = 1.0 - segment_quantiles[:]

        integrated_ll_at_fixed_cdf = segment_length * (
            np.sum(
                (
                    segment_quantiles * log_fixed_quantiles
                    + one_minus_segment_quantiles * log_one_minus_fixed_quantiles
                )
                * quantile_weights
            )
        )

        # The cost equals twice the negative integrated log-likelihood:
        out_segment_costs[i] = -2.0 * integrated_ll_at_fixed_cdf

    return out_segment_costs


@njit
def numpy_fixed_cdf_cost_cached_edf(
    cumulative_edf_quantiles: np.ndarray,
    segment_starts: np.ndarray,
    segment_ends: np.ndarray,
    log_fixed_quantiles: np.ndarray,
    log_one_minus_fixed_quantiles: np.ndarray,
    quantile_weights: np.ndarray,
    out_segment_costs: np.ndarray | None = None,
) -> np.ndarray:
    """Compute the empirical distribution cost for segments using a reference CDF.

    Parameters
    ----------
    cumulative_edf_quantiles : np.ndarray
        Cumulative EDF values, shape (n_samples + 1, num_quantiles).
    segment_starts : np.ndarray
        Start indices of segments, shape (n_segments,).
    segment_ends : np.ndarray
        End indices of segments, shape (n_segments,).
    log_fixed_quantiles : np.ndarray
        Logarithm of fixed quantiles, shape (num_quantiles,).
    log_one_minus_fixed_quantiles : np.ndarray
        Logarithm of one minus fixed quantiles, shape (num_quantiles,).
    quantile_weights : np.ndarray
        Quantile integration weights, shape (num_quantiles,).
    out_segment_costs : np.ndarray or None, optional
        Output array for costs, shape (n_segments,). If None, a new array is created.

    Returns
    -------
    out_segment_costs : np.ndarray
        Array of empirical distribution costs for each segment, shape (n_segments,).
    """
    if out_segment_costs is None:
        out_segment_costs = np.zeros(len(segment_starts), dtype=np.float64)

    segment_edfs_at_quantiles = (
        cumulative_edf_quantiles[segment_ends, :]
        - cumulative_edf_quantiles[segment_starts, :]
    ) / (segment_ends - segment_starts).reshape(-1, 1)
    one_minus_segment_edfs_at_quantiles = 1.0 - segment_edfs_at_quantiles

    integrated_lls_at_fixed_cdf = (segment_ends - segment_starts) * np.sum(
        (
            segment_edfs_at_quantiles * log_fixed_quantiles
            + one_minus_segment_edfs_at_quantiles * log_one_minus_fixed_quantiles
        )
        * quantile_weights[None, :],
        axis=1,
    )

    out_segment_costs[:] = -2.0 * integrated_lls_at_fixed_cdf

    return out_segment_costs


def numpy_approximate_mle_edf_cost_cached_edf(
    cumulative_edf_quantiles: np.ndarray,
    segment_starts: np.ndarray,
    segment_ends: np.ndarray,
    out_segment_costs: np.ndarray | None = None,
) -> np.ndarray:
    """Compute approximate empirical distribution cost from cumulative EDF quantiles.

    Parameters
    ----------
    cumulative_edf_quantiles : np.ndarray
        Cumulative EDF values, shape (n_samples + 1, num_quantiles).
    segment_starts : np.ndarray
        Start indices of segments, shape (n_segments,).
    segment_ends : np.ndarray
        End indices of segments, shape (n_segments,).
    out_segment_costs : np.ndarray or None, optional
        Output array for costs, shape (n_segments,). If None, a new array is created.

    Returns
    -------
    out_segment_costs : np.ndarray
        Array of empirical distribution costs for each segment, shape (n_segments,).
    """
    n_samples, num_quantiles = cumulative_edf_quantiles.shape

    if out_segment_costs is None:
        out_segment_costs = np.zeros(len(segment_starts), dtype=np.float64)

    # Constant scaling term from the approximation of the integrated log-likelihood:
    edf_integration_scale = -np.log(2 * n_samples - 1)

    segment_edfs_at_quantiles = (
        cumulative_edf_quantiles[segment_ends, :]
        - cumulative_edf_quantiles[segment_starts, :]
    ) / (segment_ends - segment_starts).reshape(-1, 1)

    # Clip quantiles to within (0, 1) to avoid log(0) issues:
    np.clip(segment_edfs_at_quantiles, 1e-10, 1 - 1e-10, segment_edfs_at_quantiles)
    one_minus_segment_edfs_at_quantiles = 1.0 - segment_edfs_at_quantiles

    segments_ll_at_mle = np.sum(
        segment_edfs_at_quantiles * np.log(segment_edfs_at_quantiles)
        + one_minus_segment_edfs_at_quantiles
        * np.log(one_minus_segment_edfs_at_quantiles),
        axis=1,
    )
    segments_ll_at_mle *= (-2.0 * edf_integration_scale / num_quantiles) * (
        segment_ends - segment_starts
    )
    out_segment_costs[:] = -2.0 * segments_ll_at_mle[:]

    return out_segment_costs


@njit(fastmath=True)
def numba_approximate_mle_edf_cost_cached_edf(
    cumulative_edf_quantiles: np.ndarray,
    segment_starts: np.ndarray,
    segment_ends: np.ndarray,
    out_segment_costs: np.ndarray | None = None,
) -> np.ndarray:
    """Compute approximate empirical distribution cost from cumulative EDF quantiles.

    Parameters
    ----------
    cumulative_edf_quantiles : np.ndarray
        Cumulative EDF values, shape (n_samples + 1, num_quantiles).
    segment_starts : np.ndarray
        Start indices of segments, shape (n_segments,).
    segment_ends : np.ndarray
        End indices of segments, shape (n_segments,).
    out_segment_costs : np.ndarray or None, optional
        Output array for costs, shape (n_segments,). If None, a new array is created.

    Returns
    -------
    out_segment_costs : np.ndarray
        Array of empirical distribution costs for each segment, shape (n_segments,).
    """
    n_samples, num_quantiles = cumulative_edf_quantiles.shape
    segment_edf_at_quantiles = np.zeros(num_quantiles, dtype=np.float64)

    if out_segment_costs is None:
        out_segment_costs = np.zeros(len(segment_starts), dtype=np.float64)

    # Constant scaling term from the approximation of the integrated log-likelihood:
    edf_integration_scale = -np.log(2 * n_samples - 1)

    # Iterate over each segment defined by the starts and ends:
    for i, (segment_start, segment_end) in enumerate(zip(segment_starts, segment_ends)):
        segment_length = segment_end - segment_start

        # Shifted by 1 to account for the row of zeros at the start:
        segment_edf_at_quantiles[:] = cumulative_edf_quantiles[segment_end, :]
        segment_edf_at_quantiles[:] -= cumulative_edf_quantiles[segment_start, :]
        segment_edf_at_quantiles[:] /= float(segment_length)

        ### Begin computing integrated log-likelihood for the segment ###
        segment_ll_at_mle = 0.0
        for quantile in segment_edf_at_quantiles:
            if quantile > 1.0e-10:
                segment_ll_at_mle += quantile * np.log(quantile)
            one_minus_quantile = 1.0 - quantile
            if one_minus_quantile > 1.0e-10:
                segment_ll_at_mle += one_minus_quantile * np.log(one_minus_quantile)

        segment_ll_at_mle *= (
            -2.0 * edf_integration_scale / num_quantiles
        ) * segment_length
        ### Done computing integrated log-likelihood for the segment ###

        # The cost equals twice the negative integrated log-likelihood:
        out_segment_costs[i] = (-2.0) * segment_ll_at_mle

    return out_segment_costs


[docs] class EmpiricalDistributionCost(BaseCost): """ Empirical Distribution Cost. Computationally efficient approximate empirical distribution cost [1]_. Uses the integrated log-likelihood of the empirical cumulative distribution function (EDF) to evaluate the cost for each segment defined by the cuts, in a non-parametric way. Parameters ---------- param : any, optional (default=None) If None, the cost is evaluated on the empirical CDF estimate, i.e. the maximum likelihood estimate of the CDF. If not None, the interval empirical distributions are evaluated against the provided CDF. num_approximation_quantiles : int or None, optional (default=None) Number of quantiles to use for approximating the empirical distribution. If None, it will be set to `floor(4 * log(n_samples))` during fitting. References ---------- .. [1] Haynes, K., Fearnhead, P. & Eckley, I.A. A computationally efficient nonparametric approach for changepoint detection. Stat Comput 27, 1293-1305 (2017). https://doi.org/10.1007/s11222-016-9687-5 """ _tags = { "authors": ["johannvk"], "maintainers": "johannvk", "distribution_type": "None", "is_conditional": False, "is_aggregated": False, "supports_fixed_param": True, } def __init__( self, param: tuple[ArrayLike, ArrayLike] | None = None, num_approximation_quantiles: int | None = None, ): # Initialize the base class super().__init__(param) self.num_approximation_quantiles = num_approximation_quantiles check_larger_than( min_value=3, value=self.num_approximation_quantiles, name="num_approximation_quantiles", allow_none=True, ) self.num_quantiles_: int = 0 # Will be set during fitting self.quantile_points_: np.ndarray | None = None self.quantile_values_: np.ndarray | None = None # Cache for empirical distribution function self._edf_cache: list[np.ndarray] | None = None # Storage for fixed samples and quantiles: self._quantile_weights: np.ndarray | None = None self._one_minus_fixed_quantiles: np.ndarray | None = None self._log_fixed_quantiles: np.ndarray | None = None self._log_one_minus_fixed_quantiles: np.ndarray | None = None def _fit(self, X: np.ndarray, y=None): """ Fit the cost. Precomputes cumulative empirical distribution function if caching is enabled. Parameters ---------- X : np.ndarray Data to evaluate. Must be a 2D array. y : None, optional Ignored. Included for API consistency by convention. Returns ------- self : EmpiricalDistributionCost Fitted cost instance. """ self._param = self._check_param(self.param, X) n_samples, n_columns = X.shape if self._param is None: # Calculate number of quantiles to use if not provided: if self.num_approximation_quantiles is None: self.num_quantiles_ = int(np.ceil(4 * np.log(n_samples))) else: self.num_quantiles_ = self.num_approximation_quantiles self.quantile_points_, self.quantile_values_ = ( make_approximate_mle_edf_cost_quantile_points( X, num_quantiles=self.num_quantiles_ ) ) else: self.quantile_points_, self.quantile_values_ = self._param self.num_quantiles_ = self.quantile_values_.shape[0] self._log_fixed_quantiles = np.log(self.quantile_values_) self._log_one_minus_fixed_quantiles = np.log(1.0 - self.quantile_values_) # Store the quantile integration weights for each data column: self._quantile_weights = np.zeros(self.quantile_values_.shape) for col in range(n_columns): self._quantile_weights[:, col] = make_fixed_cdf_cost_quantile_weights( self.quantile_values_[:, col], self.quantile_points_[:, col], ) self._edf_cache = [ make_cumulative_edf_cache( self._X[:, col], quantile_points=self.quantile_points_[:, col] ) for col in range(n_columns) ] return self def _check_fixed_param(self, param: tuple[np.ndarray, np.ndarray], X: np.ndarray): """ Validate and standardize fixed samples and quantiles. Parameters ---------- param : tuple[np.ndarray, np.ndarray] Tuple of (fixed_samples, fixed_cdf_quantiles). X : np.ndarray Data to evaluate. Used to match shapes. Returns ------- tuple[np.ndarray, np.ndarray] Standardized fixed_samples and fixed_cdf_quantiles. """ fixed_samples, fixed_cdf_quantiles = param if not (fixed_samples.shape == fixed_cdf_quantiles.shape): raise ValueError("All samples must have a corresponding fixed quantile.") if fixed_cdf_quantiles.ndim > 2: raise ValueError( "Fixed samples and quantiles must be 1D or 2D arrays, " f"but got shapes {fixed_samples.shape} and {fixed_cdf_quantiles.shape}." ) # Standardize the shapes of fixed samples and quantiles: if fixed_samples.ndim == 1: fixed_samples = fixed_samples.reshape(-1, 1) if fixed_cdf_quantiles.ndim == 1: fixed_cdf_quantiles = fixed_cdf_quantiles.reshape(-1, 1) if not np.all(np.diff(fixed_samples, axis=0) > 0): raise ValueError("Fixed samples must be sorted, and strictly increasing.") if not np.all(np.diff(fixed_cdf_quantiles, axis=0) > 0): raise ValueError( "Fixed CDF quantiles must be sorted, and strictly increasing." ) if not ( np.all(0 <= fixed_cdf_quantiles) and np.all(fixed_cdf_quantiles <= 1.0) ): raise ValueError( "Fixed quantiles must be within the closed interval [0, 1]" ) # Clip fixed quantiles to avoid log(0) issues: fixed_cdf_quantiles = np.clip(fixed_cdf_quantiles, 1.0e-10, 1.0 - 1.0e-10) # Repeat the fixed samples and quantiles to match the number of columns in X: if fixed_samples.shape[1] == 1 and X.shape[1] > 1: fixed_samples = np.tile(fixed_samples, (1, X.shape[1])) fixed_cdf_quantiles = np.tile(fixed_cdf_quantiles, (1, X.shape[1])) return fixed_samples, fixed_cdf_quantiles def _evaluate_optim_param(self, starts: np.ndarray, ends: np.ndarray) -> np.ndarray: """ Evaluate the cost using empirical distribution. Parameters ---------- starts : np.ndarray Start indices of the intervals (inclusive). ends : np.ndarray End indices of the intervals (exclusive). Returns ------- costs : np.ndarray A 2D array of costs. One row per interval, one column per variable. """ n_intervals = len(starts) n_cols = self._X.shape[1] costs = np.zeros((n_intervals, n_cols)) if numba_available: for col in range(n_cols): numba_approximate_mle_edf_cost_cached_edf( self._edf_cache[col], segment_starts=starts, segment_ends=ends, out_segment_costs=costs[:, col], ) else: for col in range(n_cols): numpy_approximate_mle_edf_cost_cached_edf( self._edf_cache[col], segment_starts=starts, segment_ends=ends, out_segment_costs=costs[:, col], ) return costs def _evaluate_fixed_param(self, starts: np.ndarray, ends: np.ndarray): """ Evaluate the cost using fixed quantile parameters. Parameters ---------- starts : np.ndarray Start indices of the intervals (inclusive). ends : np.ndarray End indices of the intervals (exclusive). Returns ------- costs : np.ndarray A 2D array of costs. One row per interval, one column per variable. """ n_intervals = len(starts) n_cols = self._X.shape[1] costs = np.zeros((n_intervals, n_cols)) if numba_available: for col in range(n_cols): costs[:, col] = numba_fixed_cdf_cost_cached_edf( cumulative_edf_quantiles=self._edf_cache[col], segment_starts=starts, segment_ends=ends, log_fixed_quantiles=self._log_fixed_quantiles[:, col], log_one_minus_fixed_quantiles=self._log_one_minus_fixed_quantiles[ :, col ], quantile_weights=self._quantile_weights[:, col], out_segment_costs=costs[:, col], ) else: for col in range(n_cols): costs[:, col] = numpy_fixed_cdf_cost_cached_edf( cumulative_edf_quantiles=self._edf_cache[col], segment_starts=starts, segment_ends=ends, log_fixed_quantiles=self._log_fixed_quantiles[:, col], log_one_minus_fixed_quantiles=self._log_one_minus_fixed_quantiles[ :, col ], quantile_weights=self._quantile_weights[:, col], out_segment_costs=costs[:, col], ) return costs @property def min_size(self) -> int: """ Minimum size of the interval to evaluate. Returns ------- int The minimum valid size of an interval to evaluate. """ if self.is_fitted: return self.num_quantiles_ if self.num_approximation_quantiles is not None: return self.num_approximation_quantiles else: return 10
[docs] def get_model_size(self, p: int) -> int: """ Get the number of parameters in the cost function. Parameters ---------- p : int Number of variables in the data. Returns ------- int Number of parameters in the cost function. """ # For the EDF, the model size depends on the number of quantiles? if self.is_fitted: return 2 * self.num_quantiles_ elif self.num_approximation_quantiles is not None: return 2 * self.num_approximation_quantiles else: raise ValueError( "The cost is not fitted, and no number of quantiles was specified " "for the approximation of the ECDF at initialization." )
[docs] @classmethod def get_test_params(cls, parameter_set="default"): """ Return testing parameter settings for the estimator. Parameters ---------- parameter_set : str, default="default" Name of the set of test parameters to return. Returns ------- params : dict or list of dict Parameters to create testing instances of the class. """ # Fixed Standard Normal Quantiles: # fmt: off fixed_samples = np.array( [ -2.32634787, -1.95996398, -1.64485363, -1.03643339, -0.52440051, 0.0, 0.52440051, 1.03643339, 1.64485363, 1.95996398, 2.32634787 ] ) # fmt: on fixed_quantiles = np.array( [0.01, 0.025, 0.05, 0.15, 0.3, 0.5, 0.7, 0.85, 0.95, 0.975, 0.99] ) # Define two different parameter sets for testing params1 = {"param": None, "num_approximation_quantiles": 10} params2 = { "param": (fixed_samples, fixed_quantiles), "num_approximation_quantiles": len(fixed_quantiles), } return [params1, params2]