PELT

class PELT(cost: BaseCost | None = None, penalty: float | None = None, min_segment_length: int | None = None, prune: bool = True, split_cost: float = 0.0, pruning_margin: float = 0.0, step_size: int = 1)

Pruned exact linear time (PELT) changepoint detection.

Implements the PELT algorithm [1] for changepoint detection. This method solves the penalized optimal partitioning problem, with pruning of the admissible starts set applied to improve performance.

The min_segment_length parameter sets a minimum segment length for the partitions considered when detecting changepoints. When the minimum segment length is greater than one, deferred pruning of the admissible starts [2] is used to ensure exact solutions.

Additionally, the step_size parameter coarsens the search space for changepoints, allowing for faster detection at the cost of changepoint location granularity.
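As a rough illustration of the recursion described above, here is a minimal NumPy sketch of penalised optimal partitioning with PELT-style pruning. It is not skchange's implementation. It assumes univariate data, an L2 (squared-error) cost, a minimum segment length of one, a step size of one, and zero split cost and pruning margin. The names l2_cost and pelt_sketch are hypothetical.

```python
import numpy as np


def l2_cost(cumsum, cumsum_sq, start, end):
    """Sum of squared deviations from the mean over x[start:end]."""
    n = end - start
    s = cumsum[end] - cumsum[start]
    sq = cumsum_sq[end] - cumsum_sq[start]
    return sq - s * s / n


def pelt_sketch(x, penalty, prune=True):
    """Illustrative PELT for 1-D data (assumed L2 cost, min segment length 1)."""
    x = np.asarray(x, dtype=float)
    n = len(x)
    cumsum = np.concatenate([[0.0], np.cumsum(x)])
    cumsum_sq = np.concatenate([[0.0], np.cumsum(x * x)])

    # opt[e] is the optimal penalised cost of x[:e]. Setting opt[0] = -penalty
    # means the first segment is not charged for a changepoint.
    opt = np.full(n + 1, np.inf)
    opt[0] = -penalty
    prev = np.zeros(n + 1, dtype=int)
    starts = [0]  # admissible segment starts

    for end in range(1, n + 1):
        cand = np.array(
            [opt[s] + l2_cost(cumsum, cumsum_sq, s, end) for s in starts]
        )
        best = int(np.argmin(cand))
        opt[end] = cand[best] + penalty
        prev[end] = starts[best]
        if prune:
            # Keep a start only if it could still be optimal at a later end.
            starts = [s for s, c in zip(starts, cand) if c <= opt[end]]
        starts.append(end)

    # Backtrack from the end of the series to recover the changepoints.
    changepoints = []
    end = n
    while prev[end] > 0:
        end = prev[end]
        changepoints.append(end)
    return np.array(sorted(changepoints), dtype=int)


rng = np.random.default_rng(0)
x = np.concatenate([rng.normal(0, 1, 100), rng.normal(10, 1, 100)])
cps = pelt_sketch(x, penalty=50.0)
```

With a mean shift this large, the sketch should report a single changepoint at index 100, and passing prune=False should give the same answer while evaluating every candidate start.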

Parameters:
cost : BaseCost or None, default=None

Cost to use for the changepoint detection. Must be a BaseCost instance with score_type='cost'. If None, defaults to L2Cost().

penalty : float or None, default=None

Penalty incurred for each added changepoint. Must be non-negative. If None, defaults to cost_.get_default_penalty() after fitting (BIC-based penalty). To tune the penalty, search over this parameter directly (e.g. GridSearchCV over a log-spaced grid).

min_segment_length : int or None, default=None

Minimum number of samples in a segment. Must be at least cost.min_size. If None, defaults to 2 * cost.min_size after fitting. The 2x factor provides a finite-sample safety floor that prevents spurious short segments from scale-estimating costs (e.g. Gaussian, Laplace).

prune : bool, default=True

If False, skip the pruning step, which reverts the algorithm to plain optimal partitioning. Can be useful for debugging and testing.

split_cost : float, default=0.0

The cost of splitting a segment, to ensure that cost(X[t:p]) + cost(X[p:(s+1)]) + split_cost <= cost(X[t:(s+1)]), for all possible splits, 0 <= t < p < s <= len(X) - 1. By default set to 0.0, which is sufficient for log likelihood cost functions to satisfy the above inequality.
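The inequality can be checked numerically for a given cost. The sketch below does so by brute force for a squared-error (L2) cost with split_cost = 0.0. The l2_cost helper is a hypothetical stand-in, not a skchange class, and the L2 cost is assumed here as an example of a Gaussian log-likelihood-type cost.

```python
import numpy as np


def l2_cost(x):
    """Sum of squared deviations from the segment mean."""
    x = np.asarray(x, dtype=float)
    return float(np.sum((x - x.mean()) ** 2))


rng = np.random.default_rng(0)
X = rng.normal(size=30)
split_cost = 0.0

# Largest value of (left + right + split_cost - whole) over all splits with
# 0 <= t < p < s <= len(X) - 1. A positive value would violate the inequality.
worst = max(
    l2_cost(X[t:p]) + l2_cost(X[p:s + 1]) + split_cost - l2_cost(X[t:s + 1])
    for t in range(len(X))
    for p in range(t + 1, len(X))
    for s in range(p + 1, len(X))
)
```

For this cost, worst should be non-positive up to floating-point rounding, since fitting two means never gives a larger residual sum of squares than fitting one.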

pruning_margin : float, default=0.0

The pruning margin to use. By default set to zero. A positive margin reduces pruning of the admissible starts set. Can be useful if the cost function is imprecise, e.g. based on solving an optimization problem with a large tolerance.

step_size : int, default=1

Only indices that are multiples of step_size from the start are considered as potential changepoints. Implicitly ensures that min_segment_length >= step_size, but it is an error to specify min_segment_length greater than step_size.
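To make the coarsening concrete, the sketch below lists the candidate changepoint indices for a short series. It assumes that "multiples of step_size from the start" means the interior indices step_size, 2 * step_size, and so on. How the library treats a final partial segment is not modelled here.

```python
import numpy as np

n_samples = 23
step_size = 5

# Assumption: candidates are the interior multiples of step_size.
candidates = np.arange(step_size, n_samples, step_size)

# Gaps between the series start and consecutive candidates.
gaps = np.diff(np.concatenate([[0], candidates]))
```

Every gap between consecutive candidates equals step_size, which is why segments bounded by candidates cannot be shorter than step_size.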

Attributes:
cost_ : BaseCost

Fitted cost scorer.

penalty_ : float

Penalty value used (either user-specified or default from cost_).

Methods

fit(X[, y])

Fit the cost to training data.

fit_predict(X[, y])

Fit to data, then predict per-sample segment labels.

get_metadata_routing()

Get metadata routing of this object.

get_params([deep])

Get parameters for this estimator.

predict(X)

Detect changepoints, returning per-sample segment labels.

predict_all(X)

Run PELT and return all outputs in a single pass.

predict_changepoints(X)

Detect changepoints in a time series.

predict_scores(X[, return_index])

Return the unpenalised cost at every interval the PELT DP evaluated.

set_params(**params)

Set the parameters of this estimator.

References

[1]

Killick, R., Fearnhead, P., & Eckley, I. A. (2012). Optimal detection of changepoints with a linear computational cost. Journal of the American Statistical Association, 107(500), 1590-1598.

[2]

Bakka, Kristin Benedicte (2018). Changepoint model selection in Gaussian data by maximization of approximate Bayes Factors with the Pruned Exact Linear Time algorithm. Master’s thesis, Norwegian University of Science and Technology (NTNU). URL: https://ntnuopen.ntnu.no/ntnu-xmlui/handle/11250/2558597.

Examples

>>> import numpy as np
>>> from skchange.new_api.detectors import PELT
>>> rng = np.random.default_rng(2)
>>> X = np.concatenate([rng.normal(0, 1, (100, 1)),
...                     rng.normal(10, 1, (100, 1))])
>>> detector = PELT()
>>> detector.fit(X).predict_changepoints(X)
array([100])

fit(X: ArrayLike, y: ArrayLike | None = None) -> Self

Fit the cost to training data.

Parameters:
X : ArrayLike of shape (n_samples, n_features)

Training time series data.

y : ArrayLike | None, default=None

Ignored.

Returns:
self : PELT

Fitted detector.

predict_all(X: ArrayLike) -> dict

Run PELT and return all outputs in a single pass.

Parameters:
X : ArrayLike of shape (n_samples, n_features)

Time series to analyse for changepoints.

Returns:
result : dict with keys:

"changepoints" : np.ndarray of shape (n_changepoints,)

Sorted integer indices of detected changepoints.

"cumulative_optimal_costs" : np.ndarray of shape (n_samples,)

Cumulative optimal costs.

"previous_changepoints" : np.ndarray of shape (n_samples,)

For each sample, the start of the optimal segment ending there.

"pruning_fraction" : float

Fraction of candidate starts pruned vs. optimal partitioning.

"interval_starts" : np.ndarray

Start indices of all (start, end) intervals the DP evaluated.

"interval_ends" : np.ndarray

End indices of all intervals the DP evaluated.

"interval_costs" : np.ndarray

Unpenalised (feature-summed) cost at each evaluated interval.
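The "previous_changepoints" array encodes the optimal segmentation as back-pointers. The sketch below shows one way to walk it back into changepoint indices. It assumes that entry i holds the start index of the optimal segment whose last sample is i, as the description above suggests. The backtrack helper and the hand-built array are hypothetical and not part of the library.

```python
import numpy as np


def backtrack(previous_changepoints):
    """Recover changepoints by walking segment starts back from the last sample."""
    prev = np.asarray(previous_changepoints, dtype=int)
    changepoints = []
    end = len(prev) - 1  # inclusive end of the final segment
    while end >= 0 and prev[end] > 0:
        start = prev[end]
        changepoints.append(start)
        end = start - 1  # the preceding segment ends just before this start
    return np.array(changepoints[::-1], dtype=int)


# Hand-built back-pointers for segments [0, 3), [3, 7), [7, 10).
prev = np.array([0, 0, 0, 3, 3, 3, 3, 7, 7, 7])
changepoints = backtrack(prev)
```

For this hand-built array the walk visits segment starts 7 and then 3, so the recovered changepoints are 3 and 7 in sorted order.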

predict_changepoints(X: ArrayLike) -> ndarray

Detect changepoints in a time series.

Parameters:
X : ArrayLike of shape (n_samples, n_features)

Time series to analyse for changepoints.

Returns:
changepoints : np.ndarray of shape (n_changepoints,)

Sorted integer indices of detected changepoints.

predict_scores(X: ArrayLike, return_index: bool = False) -> ndarray | tuple[ndarray, dict[str, ndarray]]

Return the unpenalised cost at every interval the PELT DP evaluated.

Runs PELT on X with the fitted state and returns the feature-summed cost value at every (start, end) interval the dynamic programme actually evaluated. With pruning enabled, the set of evaluated intervals depends on the current penalty_; set prune=False (or run via skchange.new_api.tuning.unpenalised_scores with the penalty zeroed) to obtain the full optimal-partitioning grid.

Parameters:
X : ArrayLike of shape (n_samples, n_features)

Time series to evaluate.

return_index : bool, default=False

If True, also return a dict locating each score on the time axis. See the Returns section for the keys.

Returns:
scores : np.ndarray of shape (n_intervals,)

Unpenalised cost values, one per evaluated interval. Returned alone when return_index=False.

index : dict, optional

Only returned when return_index=True. Contains:

  • "starts" : np.ndarray of shape (n_intervals,) Start indices of the evaluated intervals.

  • "ends" : np.ndarray of shape (n_intervals,) End indices of the evaluated intervals.

fit_predict(X, y: ArrayLike | None = None, **fit_params) -> ndarray

Fit to data, then predict per-sample segment labels.

Equivalent to calling fit(X, y).predict(X).

Parameters:
X : ArrayLike of shape (n_samples, n_features)

Time series data.

y : None

Ignored. Exists for sklearn API compatibility.

**fit_params : dict

Additional parameters passed to fit().

Returns:
labels : np.ndarray of shape (n_samples,)

Dense integer segment labels, one per sample.

get_metadata_routing()

Get metadata routing of this object.

See the User Guide for details on how the routing mechanism works.

Returns:
routing : MetadataRequest

A MetadataRequest encapsulating routing information.

get_params(deep=True)

Get parameters for this estimator.

Parameters:
deep : bool, default=True

If True, will return the parameters for this estimator and contained subobjects that are estimators.

Returns:
params : dict

Parameter names mapped to their values.

predict(X: ArrayLike) -> ndarray

Detect changepoints, returning per-sample segment labels.

Default implementation calls predict_changepoints() and converts the changepoint indices to an array with segment labels per sample. Subclasses may override this directly when the algorithm natively produces labels.

Parameters:
X : ArrayLike of shape (n_samples, n_features)

Time series data.

Returns:
labels : np.ndarray of shape (n_samples,)

Dense integer segment labels, one per sample. Segment 0 is the first segment, segment 1 the next, and so on.

Examples

>>> # detector, X_train and X_test are assumed to be defined by the caller
>>> labels = detector.fit(X_train).predict(X_test)
>>> labels.shape == (X_test.shape[0],)
True
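
The conversion from changepoint indices to dense labels can be sketched in plain NumPy. This is only an illustration of the idea, not the library's code. It assumes each changepoint is the first index of a new segment, and the helper name changepoints_to_labels is hypothetical.

```python
import numpy as np


def changepoints_to_labels(changepoints, n_samples):
    """Label each sample with the number of changepoints at or before it."""
    cps = np.asarray(changepoints, dtype=int)
    return np.searchsorted(cps, np.arange(n_samples), side="right")


labels = changepoints_to_labels([3, 7], n_samples=10)
```

Here samples 0 to 2 get label 0, samples 3 to 6 get label 1, and samples 7 to 9 get label 2. An empty changepoint array yields all zeros.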

set_params(**params)

Set the parameters of this estimator.

The method works on simple estimators as well as on nested objects (such as Pipeline). The latter have parameters of the form <component>__<parameter> so that it’s possible to update each component of a nested object.

Parameters:
**params : dict

Estimator parameters.

Returns:
self : estimator instance

Estimator instance.