CAPA
- class CAPA(segment_saving: BaseIntervalScorer | None = None, point_saving: BaseIntervalScorer | None = None, segment_penalty: ArrayLike | float | None = None, point_penalty: ArrayLike | float | None = None, penalty_scale: float = 1.0, min_segment_length: int | None = None, max_segment_length: int | None = None, include_point_anomalies: bool = False)
Collective and Point Anomaly (CAPA) detection algorithm.
An efficient implementation of the CAPA family of algorithms for anomaly detection [1] [2]. Detects contiguous anomalous segments (collective anomalies) and isolated anomalous samples (point anomalies) via a dynamic programming formulation based on a penalised saving.
Standard usage is to pass unpenalised savings (or None) and configure the penalty via segment_penalty/point_penalty/penalty_scale/agg. Already-penalised scorers are also accepted; in that case the scorer owns its own penalty/aggregation and the corresponding detector parameters are ignored.
- Parameters:
- segment_saving : BaseIntervalScorer or None, default=None
Saving for segment anomaly detection. Must be an instance of BaseIntervalScorer with score_type="saving". If None, defaults to L2Saving.
- point_saving : BaseIntervalScorer or None, default=None
Saving for point anomaly detection. Must be an instance of BaseIntervalScorer with score_type="saving" and min_size == 1. If None, defaults to a clone of segment_saving when segment_saving.min_size == 1 (and segment_saving is not itself penalised), otherwise L1Saving.
- segment_penalty : float, array-like of shape (n_features,) or None, default=None
Penalty subtracted from the aggregated segment saving; a candidate is accepted only when the result is positive.
  - float: scalar penalty (summed saving across features).
  - array-like of length n_features, non-decreasing: element i is the penalty for i+1 jointly affected features; CAPA picks the k largest feature savings maximising sum(top_k) - penalty[k-1] (handles sparse anomalies).
  - None: uses segment_saving.get_default_penalty().
Ignored when segment_saving is already penalised.
- point_penalty : float, array-like of shape (n_features,) or None, default=None
Same semantics as segment_penalty but for point anomalies. Defaults to 2 * linear_chi2_penalty(n_samples, n_features), twice the segment default, to prioritise segment anomalies over isolated points. Ignored when point_saving is already penalised.
- penalty_scale : float, default=1.0
Positive multiplier applied to both segment_penalty and point_penalty. A single tuning knob that preserves the shape of array penalties.
- min_segment_length : int or None, default=None
Minimum segment anomaly length. Defaults to 2 * segment_saving.min_size, a finite-sample safety floor against spurious short segments from scale-estimating savings (e.g. Gaussian, Laplace). Must be at least segment_saving.min_size.
- max_segment_length : int or None, default=None
Maximum number of samples in a segment anomaly. Defaults to n_samples // 2 when None, with a minimum of min_segment_length.
- include_point_anomalies : bool, default=False
If True, detected point anomalies are included alongside segment anomalies in the output of predict, predict_segment_anomalies, predict_changepoints, and predict_scores, treated as single-sample intervals. Point anomalies are always available via predict_all regardless of this setting.
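The penalty semantics described for segment_penalty can be illustrated with a small NumPy sketch. The helper penalised_saving and the numbers are hypothetical and only mirror the description above (sum minus a scalar penalty, or the best sum(top_k) - penalty[k-1] for an array penalty); this is not skchange's implementation.

```python
import numpy as np


def penalised_saving(feature_savings, penalty):
    """Hypothetical helper: return (penalised saving, number of affected features)."""
    savings = np.asarray(feature_savings, dtype=float)
    if np.ndim(penalty) == 0:
        # Scalar penalty: sum the savings over all features, then subtract.
        return float(savings.sum() - penalty), savings.size
    penalty = np.asarray(penalty, dtype=float)
    # Array penalty: penalty[k - 1] is the cost of declaring k features affected.
    top_sums = np.cumsum(np.sort(savings)[::-1])  # sum of the k largest savings
    penalised = top_sums - penalty
    k = int(np.argmax(penalised)) + 1
    return float(penalised[k - 1]), k


# Made-up per-feature savings for one candidate interval.
feature_savings = [0.5, 12.0, 0.2, 9.0]

dense_value, dense_k = penalised_saving(feature_savings, 15.0)
sparse_value, sparse_k = penalised_saving(feature_savings, [6.0, 10.0, 14.0, 15.0])

# A candidate is accepted only when the penalised saving is positive.
print(dense_value > 0, dense_k)
print(sparse_value > 0, sparse_k)
```

With the array penalty, the two features carrying most of the saving are selected, which is the sparse-anomaly case the parameter description refers to.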
- Attributes:
- segment_saving_ : BaseIntervalScorer
Fitted segment saving scorer (the unpenalised scorer, or the user-supplied penalised scorer).
- point_saving_ : BaseIntervalScorer
Fitted point saving scorer.
- segment_penalty_ : float, np.ndarray or None
Effective segment penalty used at detection time (resolved base penalty multiplied by penalty_scale). None when segment_saving is inherently penalised.
- point_penalty_ : float, np.ndarray or None
Effective point penalty. None when point_saving is inherently penalised.
Methods
- fit(X[, y]): Fit both savings to training data.
- fit_predict(X[, y]): Fit to data, then predict per-sample segment labels.
- get_metadata_routing(): Get metadata routing of this object.
- get_params([deep]): Get parameters for this estimator.
- predict(X): Detect anomalies, returning per-sample segment labels.
- predict_all(X): Detect anomalies, returning all outputs in a single pass.
- predict_changepoints(X): Return sorted anomaly boundary indices.
- predict_scores(X[, return_index]): Return the penalised savings at every interval the CAPA DP evaluated.
- predict_segment_anomalies(X): Detect anomalies as [start, end) intervals.
- set_params(**params): Set the parameters of this estimator.
References
[1] Fisch, A. T., Eckley, I. A., & Fearnhead, P. (2022). A linear time method for the detection of collective and point anomalies. Statistical Analysis and Data Mining: The ASA Data Science Journal, 15(4), 494-508.
[2] Fisch, A. T., Eckley, I. A., & Fearnhead, P. (2022). Subset multivariate collective and point anomaly detection. Journal of Computational and Graphical Statistics, 31(2), 574-585.
Examples
>>> import numpy as np
>>> from skchange.new_api.detectors import CAPA
>>> rng = np.random.default_rng(2)
>>> X = np.concatenate([rng.normal(0, 1, (100, 1)),
...                     rng.normal(10, 1, (20, 1)),
...                     rng.normal(0, 1, (100, 1))])
>>> detector = CAPA()
>>> detector.fit(X).predict_segment_anomalies(X)
array([[100, 120]])
- fit(X: ArrayLike, y: ArrayLike | None = None) -> Self
Fit both savings to training data.
- Parameters:
- X : ArrayLike of shape (n_samples, n_features)
Training time series data.
- y : None
Ignored.
- Returns:
- self : CAPA
Fitted detector.
- predict_all(X: ArrayLike) -> dict
Detect anomalies, returning all outputs in a single pass.
This is the primary computation method. All other predict_* methods derive their results from this one.
- Parameters:
- X : ArrayLike of shape (n_samples, n_features)
Time series to analyse for anomalies.
- Returns:
- result : dict with keys:
  - "segment_anomalies" : np.ndarray of shape (n_segment_anomalies, 2)
    Each row is [start, end) of a contiguous segment anomaly.
  - "point_anomalies" : np.ndarray of shape (n_point_anomalies,)
    Sorted sample indices of point anomalies.
  - "cumulative_optimal_savings" : np.ndarray of shape (n_samples,)
    Cumulative optimal savings from the dynamic programme.
  - "segment_anomaly_features" : list of np.ndarray or None
    One array per detected segment anomaly with the 0-based feature indices identified as changed, ordered from strongest to weakest evidence. None when segment_saving is penalised or aggregated, or when segment_penalty_ is scalar (i.e. no per-feature attribution is possible).
  - "point_anomaly_features" : list of np.ndarray or None
    Same as above, but for point anomalies (driven by point_saving and point_penalty_).
  - "segment_savings", "segment_starts", "segment_ends" : np.ndarray
    Penalised saving and [start, end) index for every segment interval the DP evaluated. With pruning, the evaluated set depends on segment_penalty.
  - "point_savings", "point_indices" : np.ndarray
    Penalised saving and sample index t for every evaluated point interval [t, t+1).
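As a rough sketch of how such a result dict might be consumed, the snippet below builds a mock dict with a subset of the documented keys and reads from it. The values are made up for illustration and were not produced by CAPA; only the key names and shapes follow the description above.

```python
import numpy as np

# Mock result using documented key names; all values are illustrative only.
result = {
    "segment_anomalies": np.array([[100, 120], [300, 310]]),
    "point_anomalies": np.array([42, 250]),
    "segment_anomaly_features": [np.array([1, 0]), np.array([2])],
    "segment_savings": np.array([-3.0, 25.5, 4.0]),
    "segment_starts": np.array([90, 100, 300]),
    "segment_ends": np.array([100, 120, 310]),
}

# Pair each detected segment with its affected features (None if unavailable).
features = result["segment_anomaly_features"]
summary = []
for i, (start, end) in enumerate(result["segment_anomalies"]):
    affected = None if features is None else features[i].tolist()
    summary.append((int(start), int(end), affected))

# Locate the evaluated interval with the largest penalised saving.
best = int(np.argmax(result["segment_savings"]))
best_interval = (
    int(result["segment_starts"][best]),
    int(result["segment_ends"][best]),
)

print(summary)
print(best_interval)
```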
- predict_segment_anomalies(X: ArrayLike) -> ndarray
Detect anomalies as [start, end) intervals.
When include_point_anomalies=True, point anomalies are appended as single-sample intervals and the result is sorted by start index. Use predict_all to access segment and point anomalies separately.
- Parameters:
- X : ArrayLike of shape (n_samples, n_features)
Time series to analyse for anomalies.
- Returns:
- anomalies : np.ndarray of shape (n_anomalies, 2)
Each row is [start, end) of a detected anomaly, sorted by start.
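The merging behaviour described for include_point_anomalies=True can be sketched in plain NumPy. The helper merge_anomalies is hypothetical and only reproduces the documented output shape (point anomalies as [t, t+1) rows, sorted by start); it is not the library's code.

```python
import numpy as np


def merge_anomalies(segment_anomalies, point_anomalies):
    """Hypothetical helper: combine segments and points into sorted [start, end) rows."""
    segments = np.asarray(segment_anomalies, dtype=int).reshape(-1, 2)
    points = np.asarray(point_anomalies, dtype=int)
    # A point anomaly at index t becomes the single-sample interval [t, t + 1).
    point_intervals = np.column_stack([points, points + 1])
    merged = np.vstack([segments, point_intervals])
    return merged[np.argsort(merged[:, 0], kind="stable")]


merged = merge_anomalies([[100, 120], [300, 310]], [42, 250])
only_segments = merge_anomalies([[1, 4]], [])
print(merged)
```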
- predict_changepoints(X: ArrayLike) -> ndarray
Return sorted anomaly boundary indices.
Each anomaly interval [start, end) contributes two changepoints: start (regime transitions to anomalous) and end (back to normal).
- Parameters:
- X : ArrayLike of shape (n_samples, n_features)
Time series to analyse.
- Returns:
- changepoints : np.ndarray of shape (n_changepoints,)
Sorted unique inner boundary indices of detected anomalies. When include_point_anomalies=True, point anomaly indices are also included. Use predict_all to access them separately.
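A minimal sketch of turning anomaly intervals into changepoints follows. The helper intervals_to_changepoints is hypothetical, and it assumes that "inner" means boundaries strictly between 0 and n_samples; the source does not spell this out, so treat that filter as an interpretation.

```python
import numpy as np


def intervals_to_changepoints(intervals, n_samples):
    """Hypothetical helper: sorted unique boundaries of [start, end) intervals."""
    boundaries = np.unique(np.asarray(intervals, dtype=int).ravel())
    # Assumption: "inner" boundaries exclude the series edges 0 and n_samples.
    return boundaries[(boundaries > 0) & (boundaries < n_samples)]


changepoints = intervals_to_changepoints(
    [[0, 15], [100, 120], [120, 130]], n_samples=220
)
print(changepoints)
```

Adjacent intervals share a boundary, so the shared index appears only once in the output.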
- predict_scores(X: ArrayLike, return_index: bool = False) -> ndarray | tuple[ndarray, dict[str, ndarray]]
Return the penalised savings at every interval the CAPA DP evaluated.
Concatenates the penalised savings for every [start, end) segment interval and, when include_point_anomalies=True, every single-sample point interval that the dynamic programme actually visited. With pruning enabled, the set of evaluated segment intervals depends on the current segment_penalty_; use skchange.new_api.tuning.unpenalised_scores (with the penalty zeroed) for an unpruned grid.
- Parameters:
- X : ArrayLike of shape (n_samples, n_features)
Time series to evaluate.
- return_index : bool, default=False
If True, also return a dict locating each score on the time axis. See the Returns section for the keys.
- Returns:
- scores : np.ndarray of shape (n_evals,)
Penalised savings, segment intervals first then (if include_point_anomalies=True) point intervals. Returned alone when return_index=False.
- index : dict, optional
Only returned when return_index=True. Contains:
  - "starts" : np.ndarray of shape (n_evals,)
    Start index of each evaluated interval. For point savings the start equals the sample index t.
  - "ends" : np.ndarray of shape (n_evals,)
    End index of each evaluated interval. For point savings the end equals t + 1.
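One possible use of the index dict is to project interval scores back onto the time axis. The sketch below uses made-up scores and a mock index with the documented "starts" and "ends" keys, and takes, for each sample, the largest score among the intervals covering it. This projection is an illustrative choice, not something the library is stated to provide.

```python
import numpy as np

# Mock outputs shaped like (scores, index); the values are illustrative only.
scores = np.array([-2.0, 7.5, 1.0, 3.0])
index = {"starts": np.array([0, 3, 3, 6]), "ends": np.array([3, 6, 8, 7])}
n_samples = 8

# For each sample, keep the largest score among intervals that cover it.
per_sample = np.full(n_samples, -np.inf)
for score, start, end in zip(scores, index["starts"], index["ends"]):
    per_sample[start:end] = np.maximum(per_sample[start:end], score)

print(per_sample)
```

Samples not covered by any evaluated interval would remain at -inf in this sketch.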
- predict(X: ArrayLike) -> ndarray
Detect anomalies, returning per-sample segment labels.
- Parameters:
- X : ArrayLike of shape (n_samples, n_features)
Time series to analyse.
- Returns:
- labels : np.ndarray of shape (n_samples,)
Integer labels: 0 for normal samples, 1, ..., K for each detected anomaly in chronological order. When include_point_anomalies=True, point anomalies are included as single-sample intervals and numbered together with segment anomalies.
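The labelling convention can be sketched as a conversion from [start, end) intervals to per-sample labels. The helper intervals_to_labels is hypothetical and simply follows the description above (0 for normal samples, 1 to K in chronological order); it assumes non-overlapping intervals.

```python
import numpy as np


def intervals_to_labels(intervals, n_samples):
    """Hypothetical helper: label samples 0 (normal) or 1..K by anomaly order."""
    intervals = np.asarray(intervals, dtype=int).reshape(-1, 2)
    # Number the anomalies chronologically by start index.
    intervals = intervals[np.argsort(intervals[:, 0])]
    labels = np.zeros(n_samples, dtype=int)
    for k, (start, end) in enumerate(intervals, start=1):
        labels[start:end] = k
    return labels


labels = intervals_to_labels([[5, 8], [2, 3]], n_samples=10)
print(labels)
```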
- fit_predict(X, y: ArrayLike | None = None, **fit_params) -> ndarray
Fit to data, then predict per-sample segment labels.
Equivalent to calling fit(X, y).predict(X).
- Parameters:
- X : ArrayLike of shape (n_samples, n_features)
Time series data.
- y : None
Ignored. Exists for sklearn API compatibility.
- **fit_params : dict
Additional parameters passed to fit().
- Returns:
- labels : np.ndarray of shape (n_samples,)
Dense integer segment labels, one per sample.
- get_metadata_routing()
Get metadata routing of this object.
Please check the User Guide on how the routing mechanism works.
- Returns:
- routing : MetadataRequest
A MetadataRequest encapsulating routing information.
- get_params(deep=True)
Get parameters for this estimator.
- Parameters:
- deep : bool, default=True
If True, will return the parameters for this estimator and contained subobjects that are estimators.
- Returns:
- params : dict
Parameter names mapped to their values.
- set_params(**params)
Set the parameters of this estimator.
The method works on simple estimators as well as on nested objects (such as Pipeline). The latter have parameters of the form <component>__<parameter> so that it’s possible to update each component of a nested object.
- Parameters:
- **params : dict
Estimator parameters.
- Returns:
- self : estimator instance
Estimator instance.