CAPA

class CAPA(segment_saving: BaseIntervalScorer | None = None, point_saving: BaseIntervalScorer | None = None, segment_penalty: ArrayLike | float | None = None, point_penalty: ArrayLike | float | None = None, penalty_scale: float = 1.0, min_segment_length: int | None = None, max_segment_length: int | None = None, include_point_anomalies: bool = False)

Collective and Point Anomaly (CAPA) detection algorithm.

An efficient implementation of the CAPA family of algorithms for anomaly detection [1] [2]. Detects contiguous anomalous segments (collective anomalies) and isolated anomalous samples (point anomalies) via a dynamic programming formulation based on a penalised saving.

Standard usage is to pass unpenalised savings (or None) and configure the penalty via segment_penalty / point_penalty / penalty_scale / agg. Already-penalised scorers are also accepted; in that case the scorer owns its own penalty/aggregation and the corresponding detector parameters are ignored.
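The dynamic programming formulation described above can be illustrated with a minimal univariate sketch. It is not the library's implementation: it assumes the data are already standardised to a baseline mean of zero, uses a squared-error saving for both segments and points, and evaluates every candidate interval without pruning. The function name capa_sketch and its arguments are hypothetical.

```python
import numpy as np


def capa_sketch(x, segment_penalty, point_penalty, min_len=2, max_len=None):
    """Simplified univariate CAPA-style dynamic programme (no pruning)."""
    x = np.asarray(x, dtype=float)
    n = x.size
    if max_len is None:
        max_len = max(n // 2, min_len)
    # csum[e] - csum[s] equals sum(x[s:e]).
    csum = np.concatenate([[0.0], np.cumsum(x)])

    opt = np.zeros(n + 1)              # opt[m]: best penalised saving for x[:m]
    prev = np.zeros(n + 1, dtype=int)  # start of the last block of x[:m]
    kind = np.zeros(n + 1, dtype=int)  # 0 = normal, 1 = point, 2 = segment

    for m in range(1, n + 1):
        # Option 1: sample m-1 is normal.
        best, best_prev, best_kind = opt[m - 1], m - 1, 0
        # Option 2: sample m-1 is a point anomaly.
        cand = opt[m - 1] + x[m - 1] ** 2 - point_penalty
        if cand > best:
            best, best_prev, best_kind = cand, m - 1, 1
        # Option 3: a segment anomaly [s, m) ends at m.
        for s in range(max(0, m - max_len), m - min_len + 1):
            saving = (csum[m] - csum[s]) ** 2 / (m - s)
            cand = opt[s] + saving - segment_penalty
            if cand > best:
                best, best_prev, best_kind = cand, s, 2
        opt[m], prev[m], kind[m] = best, best_prev, best_kind

    # Backtrack from the end to recover the chosen anomalies.
    segments, points = [], []
    m = n
    while m > 0:
        start = int(prev[m])
        if kind[m] == 2:
            segments.append((start, m))
        elif kind[m] == 1:
            points.append(start)
        m = start
    segments = np.array(segments[::-1], dtype=int).reshape(-1, 2)
    points = np.array(points[::-1], dtype=int)
    return segments, points, opt[1:]


# Noise-free toy series: one shifted segment and one isolated spike.
x = np.zeros(50)
x[20:30] = 5.0
x[40] = 8.0
segments, points, cumulative = capa_sketch(x, segment_penalty=10.0, point_penalty=20.0)
print(segments)  # [[20 30]]
print(points)    # [40]
```

The sketch runs in O(n * max_len) time. The documented detector refers to pruning, which this sketch omits for clarity.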

Parameters:
segment_saving : BaseIntervalScorer or None, default=None

Saving for segment anomaly detection. Must be an instance of BaseIntervalScorer with score_type="saving". If None, defaults to L2Saving.

point_saving : BaseIntervalScorer or None, default=None

Saving for point anomaly detection. Must be an instance of BaseIntervalScorer with score_type="saving" and min_size == 1. If None, defaults to a clone of segment_saving when segment_saving.min_size == 1 (and segment_saving is not itself penalised), otherwise L1Saving.

segment_penalty : float, array-like of shape (n_features,) or None, default=None

Penalty subtracted from the aggregated segment saving; a candidate is accepted only when the result is positive.

  • float: scalar penalty (summed saving across features).

  • array-like of length n_features, non-decreasing: element i is the penalty for i+1 jointly affected features; CAPA picks the k largest feature savings maximising sum(top_k) - penalty[k-1] (handles sparse anomalies).

  • None: uses segment_saving.get_default_penalty().

Ignored when segment_saving is already penalised.
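The scalar and array penalty rules above can be sketched in a few lines of NumPy. This is an illustration of the documented selection rule only, assuming the per-feature savings for one candidate interval are already computed; penalised_saving is a hypothetical helper, not part of the library.

```python
import numpy as np


def penalised_saving(feature_savings, penalty):
    """Apply a scalar or per-feature-count penalty to per-feature savings.

    Returns (penalised value, selected feature indices or None).
    """
    savings = np.asarray(feature_savings, dtype=float)
    penalty = np.asarray(penalty, dtype=float)
    if penalty.ndim == 0:
        # Scalar penalty: sum over all features, no feature attribution.
        return float(savings.sum() - penalty), None
    # Array penalty: sort savings in decreasing order, then pick the k
    # maximising sum(top_k) - penalty[k-1].
    order = np.argsort(-savings, kind="stable")
    penalised = np.cumsum(savings[order]) - penalty
    k = int(np.argmax(penalised)) + 1
    return float(penalised[k - 1]), order[:k]


savings = [1.0, 30.0, 2.0, 25.0]
value, features = penalised_saving(savings, [10.0, 14.0, 18.0, 22.0])
print(value, features)  # 41.0 [1 3]
scalar_value, _ = penalised_saving(savings, 22.0)
print(scalar_value)     # 36.0
```

In this toy case only two features carry a large saving, so the array penalty selects k = 2 and scores higher than the scalar penalty that charges for all four features.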

point_penalty : float, array-like of shape (n_features,) or None, default=None

Same semantics as segment_penalty but for point anomalies. Defaults to 2 * linear_chi2_penalty(n_samples, n_features) — twice the segment default — to prioritise segment anomalies over isolated points. Ignored when point_saving is already penalised.

penalty_scale : float, default=1.0

Positive multiplier applied to both segment_penalty and point_penalty. A single tuning knob that preserves the shape of array penalties.
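As a small illustration of why a multiplicative scale preserves the shape of an array penalty, the sketch below multiplies a base penalty by penalty_scale. The helper effective_penalty is hypothetical and only mirrors the description above.

```python
import numpy as np


def effective_penalty(base_penalty, penalty_scale=1.0):
    """Scale a scalar or array penalty by a positive multiplier."""
    if penalty_scale <= 0:
        raise ValueError("penalty_scale must be positive")
    return penalty_scale * np.asarray(base_penalty, dtype=float)


base = np.array([10.0, 14.0, 18.0])
scaled = effective_penalty(base, penalty_scale=1.5)
print(scaled)         # [15. 21. 27.]
print(scaled / base)  # [1.5 1.5 1.5]
```

Every element is scaled by the same factor, so the relative cost of adding one more affected feature is unchanged.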

min_segment_length : int or None, default=None

Minimum segment anomaly length. Defaults to 2 * segment_saving.min_size — a finite-sample safety floor against spurious short segments from scale-estimating savings (e.g. Gaussian, Laplace). Must be at least segment_saving.min_size.

max_segment_length : int or None, default=None

Maximum number of samples in a segment anomaly. Defaults to n_samples // 2 when None, with a minimum of min_segment_length.
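The documented defaults for the two length bounds can be written out as follows. This is a sketch of the stated rules only; resolve_segment_lengths and its saving_min_size argument (standing in for segment_saving.min_size) are hypothetical names.

```python
def resolve_segment_lengths(n_samples, saving_min_size,
                            min_segment_length=None, max_segment_length=None):
    """Resolve None-valued length bounds following the documented defaults."""
    if min_segment_length is None:
        # Default: twice the minimum size required by the saving.
        min_segment_length = 2 * saving_min_size
    elif min_segment_length < saving_min_size:
        raise ValueError("min_segment_length must be at least the saving's min_size")
    if max_segment_length is None:
        # Default: half the series, but never below min_segment_length.
        max_segment_length = max(n_samples // 2, min_segment_length)
    return min_segment_length, max_segment_length


print(resolve_segment_lengths(220, 1))  # (2, 110)
print(resolve_segment_lengths(6, 2))    # (4, 4)
```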

include_point_anomalies : bool, default=False

If True, detected point anomalies are included alongside segment anomalies in the output of predict, predict_segment_anomalies, predict_changepoints, and predict_scores, where they are treated as single-sample intervals. Point anomalies are always available via predict_all regardless of this setting.

Attributes:
segment_saving_ : BaseIntervalScorer

Fitted segment saving scorer (the unpenalised scorer, or the user-supplied penalised scorer).

point_saving_ : BaseIntervalScorer

Fitted point saving scorer.

segment_penalty_ : float, np.ndarray or None

Effective segment penalty used at detection time (resolved base penalty multiplied by penalty_scale). None when segment_saving is inherently penalised.

point_penalty_ : float, np.ndarray or None

Effective point penalty. None when point_saving is inherently penalised.

Methods

fit(X[, y])

Fit both savings to training data.

fit_predict(X[, y])

Fit to data, then predict per-sample segment labels.

get_metadata_routing()

Get metadata routing of this object.

get_params([deep])

Get parameters for this estimator.

predict(X)

Detect anomalies, returning per-sample segment labels.

predict_all(X)

Detect anomalies, returning all outputs in a single pass.

predict_changepoints(X)

Return sorted anomaly boundary indices.

predict_scores(X[, return_index])

Return the penalised savings at every interval the CAPA DP evaluated.

predict_segment_anomalies(X)

Detect anomalies as [start, end) intervals.

set_params(**params)

Set the parameters of this estimator.

References

[1]

Fisch, A. T., Eckley, I. A., & Fearnhead, P. (2022). A linear time method for the detection of collective and point anomalies. Statistical Analysis and Data Mining: The ASA Data Science Journal, 15(4), 494-508.

[2]

Fisch, A. T., Eckley, I. A., & Fearnhead, P. (2022). Subset multivariate collective and point anomaly detection. Journal of Computational and Graphical Statistics, 31(2), 574-585.

Examples

>>> import numpy as np
>>> from skchange.new_api.detectors import CAPA
>>> rng = np.random.default_rng(2)
>>> X = np.concatenate([rng.normal(0, 1, (100, 1)),
...                     rng.normal(10, 1, (20, 1)),
...                     rng.normal(0, 1, (100, 1))])
>>> detector = CAPA()
>>> detector.fit(X).predict_segment_anomalies(X)
array([[100, 120]])

fit(X: ArrayLike, y: ArrayLike | None = None) -> Self

Fit both savings to training data.

Parameters:
X : ArrayLike of shape (n_samples, n_features)

Training time series data.

y : None

Ignored.

Returns:
self : CAPA

Fitted detector.

predict_all(X: ArrayLike) -> dict

Detect anomalies, returning all outputs in a single pass.

This is the primary computation method. All other predict_* methods derive their results from this one.

Parameters:
X : ArrayLike of shape (n_samples, n_features)

Time series to analyse for anomalies.

Returns:
result : dict with keys:
"segment_anomalies" : np.ndarray of shape (n_segment_anomalies, 2)

Each row is [start, end) of a contiguous segment anomaly.

"point_anomalies" : np.ndarray of shape (n_point_anomalies,)

Sorted sample indices of point anomalies.

"cumulative_optimal_savings" : np.ndarray of shape (n_samples,)

Cumulative optimal savings from the dynamic programme.

"segment_anomaly_features" : list of np.ndarray or None

One array per detected segment anomaly with the 0-based feature indices identified as changed, ordered from strongest to weakest evidence. None when segment_saving is penalised or aggregated, or when segment_penalty_ is scalar (i.e. no per-feature attribution is possible).

"point_anomaly_features" : list of np.ndarray or None

Same as above, but for point anomalies (driven by point_saving and point_penalty_).

"segment_savings", "segment_starts", "segment_ends" : np.ndarray

Penalised saving and [start, end) index for every segment interval the DP evaluated. With pruning, the evaluated set depends on segment_penalty.

"point_savings", "point_indices" : np.ndarray

Penalised saving and sample index t for every evaluated point interval [t, t+1).

predict_segment_anomalies(X: ArrayLike) -> ndarray

Detect anomalies as [start, end) intervals.

When include_point_anomalies=True, point anomalies are appended as single-sample intervals and the result is sorted by start index. Use predict_all to access segment and point anomalies separately.

Parameters:
X : ArrayLike of shape (n_samples, n_features)

Time series to analyse for anomalies.

Returns:
anomalies : np.ndarray of shape (n_anomalies, 2)

Each row is [start, end) of a detected anomaly, sorted by start.
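The merging behaviour described for include_point_anomalies=True can be sketched with plain NumPy, assuming the separate segment and point outputs have the shapes documented for predict_all. The helper merge_anomalies is hypothetical.

```python
import numpy as np


def merge_anomalies(segment_anomalies, point_anomalies):
    """Append point anomalies as [t, t+1) intervals and sort by start."""
    segments = np.asarray(segment_anomalies, dtype=int).reshape(-1, 2)
    points = np.asarray(point_anomalies, dtype=int).reshape(-1)
    point_intervals = np.column_stack([points, points + 1])
    merged = np.vstack([segments, point_intervals])
    return merged[np.argsort(merged[:, 0], kind="stable")]


merged = merge_anomalies([[100, 120]], [40, 150])
print(merged)
# [[ 40  41]
#  [100 120]
#  [150 151]]
```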

predict_changepoints(X: ArrayLike) -> ndarray

Return sorted anomaly boundary indices.

Each anomaly interval [start, end) contributes two changepoints: start (regime transitions to anomalous) and end (back to normal).

Parameters:
X : ArrayLike of shape (n_samples, n_features)

Time series to analyse.

Returns:
changepoints : np.ndarray of shape (n_changepoints,)

Sorted unique inner boundary indices of detected anomalies. When include_point_anomalies=True, point anomaly indices are also included. Use predict_all to access them separately.
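The conversion from anomaly intervals to changepoints can be sketched as below. The sketch assumes that "inner" means boundaries strictly between 0 and n_samples, which is an interpretation rather than something the text above states explicitly; intervals_to_changepoints is a hypothetical helper.

```python
import numpy as np


def intervals_to_changepoints(intervals, n_samples):
    """Collect sorted, unique interval boundaries inside (0, n_samples)."""
    intervals = np.asarray(intervals, dtype=int).reshape(-1, 2)
    # np.unique flattens the array, sorts it, and removes duplicates.
    boundaries = np.unique(intervals)
    # Assumption: boundaries at the very start or end of the series are dropped.
    return boundaries[(boundaries > 0) & (boundaries < n_samples)]


changepoints = intervals_to_changepoints([[0, 5], [100, 120], [120, 130]], n_samples=130)
print(changepoints)  # [  5 100 120]
```

Adjacent anomalies share a boundary, so the index 120 appears once even though it ends one interval and starts the next.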

predict_scores(X: ArrayLike, return_index: bool = False) -> ndarray | tuple[ndarray, dict[str, ndarray]]

Return the penalised savings at every interval the CAPA DP evaluated.

Concatenates the penalised savings for every [start, end) segment interval and, when include_point_anomalies=True, every single-sample point interval that the dynamic programme actually visited. With pruning enabled, the set of evaluated segment intervals depends on the current segment_penalty_; use skchange.new_api.tuning.unpenalised_scores (with the penalty zeroed) for an unpruned grid.

Parameters:
X : ArrayLike of shape (n_samples, n_features)

Time series to evaluate.

return_index : bool, default=False

If True, also return a dict locating each score on the time axis. See the Returns section for the keys.

Returns:
scores : np.ndarray of shape (n_evals,)

Penalised savings, segment intervals first then (if include_point_anomalies=True) point intervals. Returned alone when return_index=False.

index : dict, optional

Only returned when return_index=True. Contains:

  • "starts" : np.ndarray of shape (n_evals,). Start index of each evaluated interval. For point savings, the start equals the sample index t.

  • "ends" : np.ndarray of shape (n_evals,). End index of each evaluated interval. For point savings, the end equals t + 1.

predict(X: ArrayLike) -> ndarray

Detect anomalies, returning per-sample segment labels.

Parameters:
X : ArrayLike of shape (n_samples, n_features)

Time series to analyse.

Returns:
labels : np.ndarray of shape (n_samples,)

Integer labels: 0 for normal samples, 1, ..., K for each detected anomaly in chronological order. When include_point_anomalies=True, point anomalies are included as single-sample intervals and numbered together with segment anomalies.
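The labelling scheme above can be sketched as follows, assuming the anomalies are given as non-overlapping [start, end) intervals already sorted by start. The helper intervals_to_labels is hypothetical.

```python
import numpy as np


def intervals_to_labels(intervals, n_samples):
    """Label normal samples 0 and the k-th anomaly (chronologically) k."""
    labels = np.zeros(n_samples, dtype=int)
    for k, (start, end) in enumerate(intervals, start=1):
        labels[start:end] = k
    return labels


labels = intervals_to_labels([[2, 4], [6, 7]], n_samples=8)
print(labels)  # [0 0 1 1 0 0 2 0]
```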

fit_predict(X, y: ArrayLike | None = None, **fit_params) -> ndarray

Fit to data, then predict per-sample segment labels.

Equivalent to calling fit(X, y).predict(X).

Parameters:
X : ArrayLike of shape (n_samples, n_features)

Time series data.

y : None

Ignored. Exists for sklearn API compatibility.

**fit_params : dict

Additional parameters passed to fit().

Returns:
labels : np.ndarray of shape (n_samples,)

Dense integer segment labels, one per sample.

get_metadata_routing()

Get metadata routing of this object.

Please check the User Guide on how the routing mechanism works.

Returns:
routing : MetadataRequest

A MetadataRequest encapsulating routing information.

get_params(deep=True)

Get parameters for this estimator.

Parameters:
deep : bool, default=True

If True, will return the parameters for this estimator and contained subobjects that are estimators.

Returns:
params : dict

Parameter names mapped to their values.

set_params(**params)

Set the parameters of this estimator.

The method works on simple estimators as well as on nested objects (such as Pipeline). The latter have parameters of the form <component>__<parameter> so that it’s possible to update each component of a nested object.

Parameters:
**params : dict

Estimator parameters.

Returns:
self : estimator instance

Estimator instance.