CircularBinarySegmentation

class CircularBinarySegmentation(transient_score: BaseIntervalScorer | None = None, penalty: ArrayLike | float | None = None, penalty_scale: float = 2.0, agg: str = 'sum', min_subinterval_length: int = 5, max_interval_length: int | None = None, growth_factor: float = 1.8)

Circular binary segmentation algorithm for multiple segment anomaly detection.

Binary-segmentation-type changepoint detection algorithms recursively split the data into two segments and test whether the two segments differ. Circular binary segmentation [1] is a variant in which the statistical test (the transient score) compares the data within an inner interval against the surrounding data in an enclosing outer interval. In other words, the null hypothesis within each outer interval is that the data is stationary, while the alternative hypothesis is that the outer interval contains a segment anomaly.

Each detected segment anomaly [start, end) corresponds to a pair of epidemic changepoints in the statistical literature [2]: the regime transitions in (at start) and out (at end) of a transient segment that returns to the surrounding baseline. CBS is therefore an epidemic changepoint detector, in contrast to standard (single-shift) changepoint methods such as PELT or SeededBinarySegmentation.
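The inner-versus-surrounding comparison can be illustrated with a brute-force sketch in plain NumPy. It assumes a squared-error (mean-shift) score, defined here as the cost saved by fitting separate means to the inner interval and to the surrounding data; the helper names l2_cost and best_inner_interval are hypothetical, and this is not necessarily how L2TransientScore or the detector is implemented.

```python
import numpy as np


def l2_cost(x):
    """Sum of squared deviations from the segment mean (0 for empty input)."""
    return float(np.sum((x - x.mean()) ** 2)) if x.size else 0.0


def best_inner_interval(x, min_len=1):
    """Brute-force search for the inner [start, end) with the largest score.

    Hypothetical score: cost of one common mean for the outer interval minus
    the costs of separate means for the inner and the surrounding data.
    """
    n = x.size
    baseline_cost = l2_cost(x)
    best = (-np.inf, None, None)
    for start in range(0, n - min_len + 1):
        for end in range(start + min_len, n + 1):
            surrounding = np.concatenate([x[:start], x[end:]])
            if surrounding.size < min_len:
                continue  # not enough baseline data to compare against
            score = baseline_cost - l2_cost(x[start:end]) - l2_cost(surrounding)
            if score > best[0]:
                best = (score, start, end)
    return best


# Noise-free outer interval with a transient level shift on [40, 50).
x = np.zeros(90)
x[40:50] = 10.0
score, start, end = best_inner_interval(x, min_len=5)
```

On this noise-free input the search recovers the shifted segment exactly, because only the split at [40, 50) leaves zero residual cost on both sides.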

The penalty, penalty_scale, and agg parameters are ignored when transient_score is an inherently penalised scorer (tag penalised=True); in that case the scorer owns aggregation and penalisation.

Parameters:
transient_score : BaseIntervalScorer or None, default=None

Transient score to use in the algorithm. Must be an instance of BaseIntervalScorer with score_type="transient_score". If None, defaults to L2TransientScore().

Standard usage is to pass an unpenalised score and set penalty separately. If the score already has tag penalised=True, it owns its own aggregation and penalty, and penalty / penalty_scale / agg are ignored.

penalty : float, array-like of shape (n_features,) or None, default=None

Penalty subtracted from the aggregated feature-wise score. A candidate segment anomaly is accepted only when the penalised score is positive.

  • float: scalar penalty subtracted from the aggregated score (see agg).

  • array-like of length n_features, non-decreasing: element i is the penalty for i+1 features jointly affected; the detector picks the k largest feature scores maximising sum(top_k) - penalty[k-1]. Only consistent with the default agg="sum".

  • None: defaults to transient_score.get_default_penalty() at fit time.

Ignored when transient_score is already penalised.
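The array-valued penalty rule described above, choosing the k largest feature scores that maximise sum(top_k) - penalty[k-1], can be sketched in a few lines of NumPy. The function name top_k_penalised_score is hypothetical and the numbers are made up for illustration; the detector's internal implementation may differ.

```python
import numpy as np


def top_k_penalised_score(feature_scores, penalty):
    """Return (best penalised score, best k) for one candidate interval.

    feature_scores: raw per-feature scores, shape (n_features,).
    penalty: non-decreasing array, penalty[i] applies when i + 1 features
    are assumed to be jointly affected.
    """
    feature_scores = np.asarray(feature_scores, dtype=float)
    penalty = np.asarray(penalty, dtype=float)
    sorted_desc = np.sort(feature_scores)[::-1]   # largest scores first
    penalised = np.cumsum(sorted_desc) - penalty  # sum(top_k) - penalty[k-1]
    k = int(np.argmax(penalised)) + 1
    return float(penalised[k - 1]), k


value, k = top_k_penalised_score([5.0, 1.0, 3.0], [2.0, 3.5, 6.0])
```

Here the cumulative top-k sums are 5, 8 and 9, so the penalised values are 3, 4.5 and 3, and the rule selects k = 2.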

penalty_scale : float, default=2.0

Multiplicative factor applied to the effective penalty (whether penalty is user-provided or the default). Must be positive. Ignored when transient_score is already penalised. The default is larger than 1 because CBS evaluates the score over a very large number of candidate (outer, inner) interval pairs, so a stricter penalty is needed to keep the family-wise false-positive rate low.

agg : {"sum", "max"}, default="sum"

How feature-wise raw scores are aggregated into a single score per interval before subtracting a scalar penalty:

  • "sum": sum(scores, axis=1) - penalty (dense change assumption).

  • "max": max(scores, axis=1) - penalty (a single, unknown feature changes).

Ignored when transient_score is already penalised, since the scorer then owns aggregation. Must be "sum" when penalty is array-valued (array penalties imply top-k aggregation).
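As a small illustration of the two scalar-penalty aggregation modes, the sketch below applies both formulas to a made-up score matrix with one row per candidate interval and one column per feature. The values and the penalty are illustrative only.

```python
import numpy as np

# Hypothetical raw feature-wise scores: rows are candidate intervals.
scores = np.array([
    [4.0, 0.5, 0.5],  # the change is concentrated in a single feature
    [2.0, 2.0, 2.0],  # the change is spread evenly over all features
])
penalty = 3.0

sum_penalised = scores.sum(axis=1) - penalty  # agg="sum"
max_penalised = scores.max(axis=1) - penalty  # agg="max"

# A candidate is accepted only when its penalised score is positive.
accepted_sum = sum_penalised > 0
accepted_max = max_penalised > 0
```

With the same penalty, the dense second row is accepted under "sum" but rejected under "max", which reflects the different change assumptions behind the two modes.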

min_subinterval_length : int, default=5

Minimum length of an inner (anomalous) segment. The total length of the surrounding (left + right) baseline must also be at least this value. The effective minimum used is max(min_subinterval_length, transient_score.min_size).
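The two length constraints can be made concrete by enumerating the inner intervals they allow inside a single outer interval. This is a minimal sketch with a hypothetical helper name; it assumes the inner interval may touch an edge of the outer interval as long as the combined left and right baseline is long enough, which is how the constraint is worded above.

```python
def candidate_inner_intervals(outer_start, outer_end, min_len):
    """List inner [start, end) candidates inside the outer [outer_start, outer_end).

    Keeps candidates whose inner length and whose total surrounding
    (left + right) length are both at least min_len.
    """
    candidates = []
    for start in range(outer_start, outer_end):
        for end in range(start + min_len, outer_end + 1):
            surrounding_len = (start - outer_start) + (outer_end - end)
            if surrounding_len >= min_len:
                candidates.append((start, end))
    return candidates


# An outer interval of length 2 * min_len leaves only inner length == min_len.
tight = candidate_inner_intervals(0, 10, min_len=5)
# Anything shorter admits no valid candidate at all.
too_short = candidate_inner_intervals(0, 9, min_len=5)
```

This also shows why an outer interval shorter than 2 * min_subinterval_length cannot contain any valid candidate.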

max_interval_length : int or None, default=None

Maximum length of an outer interval to evaluate. Must be at least 2 * min_subinterval_length. If None, defaults to min(200, n_samples) after fitting.

growth_factor : float, default=1.8

Growth factor for the seeded outer intervals. Larger values produce fewer, less-overlapping intervals (faster but coarser); smaller values produce more, more-overlapping intervals (slower but finer). Must be in (1, 2].
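To see how the growth factor trades resolution for speed, the sketch below builds a generic family of geometrically growing intervals. The construction (lengths multiplied by growth_factor at each level, consecutive intervals shifted by half their length) and the function name seeded_intervals are assumptions made for illustration only; the exact scheme used by the detector is not specified here.

```python
import math


def seeded_intervals(n_samples, min_length, max_length, growth_factor):
    """Hypothetical seeded-interval construction with geometric length growth."""
    cap = min(max_length, n_samples)
    intervals = set()
    length = float(min_length)
    while True:
        length_int = min(math.ceil(length), cap)
        step = max(1, length_int // 2)  # assumed: roughly 50% overlap per level
        for start in range(0, n_samples - length_int + 1, step):
            intervals.add((start, start + length_int))
        if length_int >= cap:
            break
        length *= growth_factor
    return sorted(intervals)


coarse = seeded_intervals(100, min_length=10, max_length=100, growth_factor=2.0)
fine = seeded_intervals(100, min_length=10, max_length=100, growth_factor=1.2)
```

Under these assumptions the smaller growth factor yields many more interval lengths, and therefore more outer intervals to score.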

Attributes:
transient_score_ : BaseIntervalScorer

Fitted transient score.

penalty_ : float, np.ndarray or None

Effective penalty actually used at detection time. None when transient_score is inherently penalised.

min_subinterval_length_ : int

Effective minimum inner-segment length used.

max_interval_length_ : int

Effective maximum outer-interval length used.

Methods

fit(X[, y])

Fit the transient score to training data.

fit_predict(X[, y])

Fit to data, then predict per-sample segment labels.

get_metadata_routing()

Get metadata routing of this object.

get_params([deep])

Get parameters for this estimator.

predict(X)

Detect changepoints, returning per-sample segment labels.

predict_all(X)

Run circular binary segmentation and return all outputs in a single pass.

predict_changepoints(X)

Return sorted anomaly boundary indices.

predict_scores(X[, return_index])

Return the per-outer-interval scoring objective evaluated on X.

predict_segment_anomalies(X)

Detect anomalies as [start, end) intervals.

set_params(**params)

Set the parameters of this estimator.

Notes

Using costs to generate transient scores via CostTransientScore is significantly slower than using transient scores implemented directly, since the precomputation for the surrounding-baseline cost must be redone for each candidate inner interval.

References

[1]

Olshen, A. B., Venkatraman, E. S., Lucito, R., & Wigler, M. (2004). Circular binary segmentation for the analysis of array-based DNA copy number data. Biostatistics, 5(4), 557-572.

[2]

Levin, B. & Kline, J. (1985). The CUSUM test of homogeneity with an application to spontaneous abortion epidemiology. Statistics in Medicine, 4(4), 469-488. (Coined the term “epidemic changepoint” for a transient regime change followed by a return to baseline.)

Examples

>>> import numpy as np
>>> from skchange.new_api.detectors import CircularBinarySegmentation
>>> rng = np.random.default_rng(2)
>>> X = np.concatenate([
...     rng.normal(0, 1, (40, 1)),
...     rng.normal(10, 1, (10, 1)),
...     rng.normal(0, 1, (40, 1)),
... ])
>>> detector = CircularBinarySegmentation()
>>> detector.fit(X).predict_segment_anomalies(X)
array([[40, 50]])

fit(X: ArrayLike, y: ArrayLike | None = None) -> Self

Fit the transient score to training data.

Parameters:
X : ArrayLike of shape (n_samples, n_features)

Training time series data.

y : ArrayLike | None, default=None

Ignored.

Returns:
self : CircularBinarySegmentation

Fitted detector.

predict_all(X: ArrayLike) -> dict

Run circular binary segmentation and return all outputs in a single pass.

Parameters:
X : ArrayLike of shape (n_samples, n_features)

Time series to analyse.

Returns:
result : dict with keys:

"segment_anomalies" : np.ndarray of shape (n_anomalies, 2)

Each row is [start, end) of a detected segment anomaly, sorted by start.

"changepoints" : np.ndarray of shape (n_changepoints,)

Sorted unique inner boundary indices of detected anomalies.

"interval_starts" : np.ndarray

Start indices of the seeded outer intervals evaluated.

"interval_ends" : np.ndarray

End indices of the seeded outer intervals evaluated.

"interval_max_scores" : np.ndarray

Maximum (aggregated, penalised) score within each outer interval.

"interval_argmax_inner_starts" : np.ndarray

Best inner-interval start per outer interval.

"interval_argmax_inner_ends" : np.ndarray

Best inner-interval end per outer interval.

predict_segment_anomalies(X: ArrayLike) -> ndarray

Detect anomalies as [start, end) intervals.

Parameters:
X : ArrayLike of shape (n_samples, n_features)

Time series to analyse for anomalies.

Returns:
anomalies : np.ndarray of shape (n_anomalies, 2)

Each row is [start, end) of a detected anomaly, sorted by start.

predict_changepoints(X: ArrayLike) -> ndarray

Return sorted anomaly boundary indices.

Each anomaly interval [start, end) contributes two changepoints (start and end) at the regime transitions in and out of the anomalous segment.

Parameters:
X : ArrayLike of shape (n_samples, n_features)

Time series to analyse.

Returns:
changepoints : np.ndarray of shape (n_changepoints,)

Sorted unique inner boundary indices of detected anomalies. Empty array if no anomalies are detected.
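The relationship between anomaly intervals and changepoints can be sketched with NumPy alone. The example assumes that every start and end index counts as a boundary; whether boundaries that coincide with the first or last sample are dropped is not stated here, so the sketch does not model that.

```python
import numpy as np

# Hypothetical detector output: rows are [start, end) anomaly intervals.
anomalies = np.array([
    [10, 20],
    [20, 30],  # shares the boundary 20 with the previous anomaly
    [40, 50],
])

# np.unique flattens the array, removes duplicates, and sorts the result.
changepoints = np.unique(anomalies)
```

Because adjacent anomalies share a boundary, three anomalies yield five changepoints here rather than six.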

predict_scores(X: ArrayLike, return_index: bool = False) -> ndarray | tuple[ndarray, dict[str, ndarray]]

Return the per-outer-interval scoring objective evaluated on X.

For each seeded outer interval, returns the maximum aggregated, penalised transient score over candidate inner [start, end) sub-intervals. This is what predict_segment_anomalies reduces over via the greedy selection step, without the selection itself.
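A greedy selection step of the kind mentioned above might look like the following sketch: repeatedly accept the best remaining candidate with a positive score, then discard every candidate that overlaps it. The function name greedy_select and the overlap rule on inner intervals are assumptions for illustration; the detector's actual selection logic may differ in its details.

```python
import numpy as np


def greedy_select(scores, inner_starts, inner_ends):
    """Greedily pick non-overlapping [start, end) candidates by score.

    Hypothetical sketch: one candidate per outer interval, identified by its
    best inner interval and penalised score.
    """
    scores = np.asarray(scores, dtype=float).copy()
    inner_starts = np.asarray(inner_starts)
    inner_ends = np.asarray(inner_ends)
    selected = []
    while scores.size and np.max(scores) > 0:
        best = int(np.argmax(scores))
        start, end = int(inner_starts[best]), int(inner_ends[best])
        selected.append((start, end))
        # Remove every candidate overlapping the accepted one (itself included).
        overlaps = (inner_starts < end) & (inner_ends > start)
        scores[overlaps] = -np.inf
    return sorted(selected)


picked = greedy_select(
    scores=[5.0, 3.0, 4.0, -1.0],
    inner_starts=[40, 42, 70, 10],
    inner_ends=[50, 55, 80, 15],
)
```

In this toy input the candidate [42, 55) is dropped because it overlaps the higher-scoring [40, 50), and [10, 15) is never accepted because its penalised score is not positive.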

For penalty calibration, use the free function skchange.new_api.tuning.unpenalised_scores, which fits a clone of this detector with the penalty parameter set to zero and returns the resulting unpenalised scores.

Parameters:
X : ArrayLike of shape (n_samples, n_features)

Time series to evaluate.

return_index : bool, default=False

If True, also return a dict locating each score on the time axis. See the Returns section for the keys.

Returns:
scores : np.ndarray of shape (n_outer,)

Aggregated, penalised transient-score values, one per seeded outer interval, at the best inner sub-interval within that outer interval. Returned alone when return_index=False.

index : dict, optional

Only returned when return_index=True. Contains:

  • "starts" : np.ndarray of shape (n_outer,) Start indices of the seeded outer intervals.

  • "ends" : np.ndarray of shape (n_outer,) End indices of the seeded outer intervals.

  • "argmax_inner_starts" : np.ndarray of shape (n_outer,) Inner-interval start of the maximising candidate per outer interval.

  • "argmax_inner_ends" : np.ndarray of shape (n_outer,) Inner-interval end of the maximising candidate per outer interval.

fit_predict(X, y: ArrayLike | None = None, **fit_params) -> ndarray

Fit to data, then predict per-sample segment labels.

Equivalent to calling fit(X, y).predict(X).

Parameters:
X : ArrayLike of shape (n_samples, n_features)

Time series data.

y : None

Ignored. Exists for sklearn API compatibility.

**fit_params : dict

Additional parameters passed to fit().

Returns:
labels : np.ndarray of shape (n_samples,)

Dense integer segment labels, one per sample.

get_metadata_routing()

Get metadata routing of this object.

See the User Guide for how the routing mechanism works.

Returns:
routing : MetadataRequest

A MetadataRequest encapsulating routing information.

get_params(deep=True)

Get parameters for this estimator.

Parameters:
deep : bool, default=True

If True, will return the parameters for this estimator and contained subobjects that are estimators.

Returns:
params : dict

Parameter names mapped to their values.

predict(X: ArrayLike) -> ndarray

Detect changepoints, returning per-sample segment labels.

Default implementation calls predict_changepoints() and converts the changepoint indices to an array with segment labels per sample. Subclasses may override this directly when the algorithm natively produces labels.

Parameters:
X : ArrayLike of shape (n_samples, n_features)

Time series data.

Returns:
labels : np.ndarray of shape (n_samples,)

Dense integer segment labels, one per sample. Segment 0 is the first segment, segment 1 the next, and so on.
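The conversion from changepoint indices to dense segment labels can be sketched with np.searchsorted. This is one straightforward way to do it, assuming each changepoint marks the first sample of a new segment; it is not necessarily the library's own conversion routine.

```python
import numpy as np

n_samples = 90
changepoints = np.array([40, 50])  # e.g. boundaries of one anomaly [40, 50)

# For each sample index, count how many changepoints are at or before it.
labels = np.searchsorted(changepoints, np.arange(n_samples), side="right")
```

Samples 0 to 39 receive label 0, samples 40 to 49 label 1, and samples 50 to 89 label 2, so an anomaly appears as its own segment between two baseline segments.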

Examples

>>> labels = detector.fit(X_train).predict(X_test)
>>> labels.shape
(n_samples,)

set_params(**params)

Set the parameters of this estimator.

The method works on simple estimators as well as on nested objects (such as Pipeline). The latter have parameters of the form <component>__<parameter> so that it’s possible to update each component of a nested object.

Parameters:
**params : dict

Estimator parameters.

Returns:
self : estimator instance

Estimator instance.