CircularBinarySegmentation
- class CircularBinarySegmentation(transient_score: BaseIntervalScorer | None = None, penalty: ArrayLike | float | None = None, penalty_scale: float = 2.0, agg: str = 'sum', min_subinterval_length: int = 5, max_interval_length: int | None = None, growth_factor: float = 1.8)
Circular binary segmentation algorithm for multiple segment anomaly detection.
Binary-segmentation-type changepoint detection algorithms recursively split the data into two segments and test whether the two segments differ. Circular binary segmentation [1] is a variant in which the statistical test (the transient score) compares the behaviour of the data in an inner interval with the surrounding data in the enclosing outer interval. In other words, the null hypothesis within each outer interval is that the data is stationary, while the alternative hypothesis is that the outer interval contains a segment anomaly.
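The inner-versus-baseline comparison described above can be illustrated with a minimal pure-Python sketch. It assumes a squared-error mean-shift score on a univariate list and a brute-force search over inner intervals; the helper names (sse, mean_shift_transient_score, best_inner_interval) are hypothetical, and this is not necessarily how the library's scorers or search are implemented.

```python
def sse(values):
    """Sum of squared deviations from the mean (0.0 for an empty list)."""
    if not values:
        return 0.0
    mean = sum(values) / len(values)
    return sum((v - mean) ** 2 for v in values)


def mean_shift_transient_score(x, outer_start, inner_start, inner_end, outer_end):
    """Gain from fitting separate means to the inner interval and the baseline.

    Illustrative assumption: squared-error cost, univariate data.
    """
    outer = x[outer_start:outer_end]
    inner = x[inner_start:inner_end]
    # Baseline = data left and right of the inner interval, pooled together.
    baseline = x[outer_start:inner_start] + x[inner_end:outer_end]
    return sse(outer) - (sse(inner) + sse(baseline))


def best_inner_interval(x, outer_start, outer_end, min_length=5):
    """Brute-force search for the highest-scoring inner [start, end)."""
    best = (float("-inf"), None, None)
    outer_length = outer_end - outer_start
    for start in range(outer_start, outer_end - min_length + 1):
        for end in range(start + min_length, outer_end + 1):
            # The pooled baseline must also contain at least min_length points.
            if outer_length - (end - start) < min_length:
                continue
            score = mean_shift_transient_score(x, outer_start, start, end, outer_end)
            if score > best[0]:
                best = (score, start, end)
    return best


# Noise-free toy series: baseline 0, a level-10 segment on [40, 50).
x = [0.0] * 40 + [10.0] * 10 + [0.0] * 40
score, start, end = best_inner_interval(x, 0, len(x))
print((start, end))  # (40, 50)
```

A large positive score means the stationary null hypothesis explains the outer interval much worse than a baseline with one transient segment.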
Each detected segment anomaly [start, end) corresponds to a pair of epidemic changepoints in the statistical literature [2]: the regime transitions into (at start) and out of (at end) a transient segment that returns to the surrounding baseline. CBS is therefore an epidemic changepoint detector, in contrast to standard (single-shift) changepoint methods such as PELT or SeededBinarySegmentation.
The penalty, penalty_scale, and agg parameters are ignored when transient_score is an inherently penalised scorer (tag penalised=True); in that case the scorer owns aggregation and penalisation.
- Parameters:
- transient_score : BaseIntervalScorer or None, default=None
Transient score to use in the algorithm. Must be an instance of BaseIntervalScorer with score_type="transient_score". If None, defaults to L2TransientScore().
Standard usage is to pass an unpenalised score and set penalty separately. If the score already has tag penalised=True, it owns its own aggregation and penalty, and penalty / penalty_scale / agg are ignored.
- penalty : float, array-like of shape (n_features,) or None, default=None
Penalty subtracted from the aggregated feature-wise score. A candidate segment anomaly is accepted only when the penalised score is positive.
  - float: scalar penalty subtracted from the aggregated score (see agg).
  - array-like of length n_features, non-decreasing: element i is the penalty for i+1 features jointly affected; the detector picks the k largest feature scores maximising sum(top_k) - penalty[k-1]. Only consistent with the default agg="sum".
  - None: defaults to transient_score.get_default_penalty() at fit time.
Ignored when transient_score is already penalised.
- penalty_scale : float, default=2.0
Multiplicative factor applied to the effective penalty (whether penalty is user-provided or the default). Must be positive. Ignored when transient_score is already penalised. The default is larger than 1 because CBS evaluates the score over a very large number of candidate (outer, inner) interval pairs, so a stricter penalty is needed to keep the family-wise false-positive rate low.
- agg : {"sum", "max"}, default="sum"
How feature-wise raw scores are aggregated into a single score per interval before subtracting a scalar penalty:
  - "sum": sum(scores, axis=1) - penalty (dense change assumption).
  - "max": max(scores, axis=1) - penalty (a single, unknown feature changes).
Ignored when transient_score is already aggregated. Must be "sum" when penalty is array-valued (array penalties imply top-k aggregation).
- min_subinterval_length : int, default=5
Minimum length of an inner (anomalous) segment. The total length of the surrounding (left + right) baseline must also be at least this value. The effective minimum used is max(min_subinterval_length, transient_score.min_size).
- max_interval_length : int or None, default=None
Maximum length of an outer interval to evaluate. Must be at least 2 * min_subinterval_length. If None, defaults to min(200, n_samples) after fitting.
- growth_factor : float, default=1.8
Growth factor for the seeded outer intervals. Larger values produce fewer, less-overlapping intervals (faster but coarser); smaller values produce more, more-overlapping intervals (slower but finer). Must be in (1, 2].
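To make the effect of growth_factor concrete, here is a hedged sketch of one plausible way to seed outer intervals. The function seeded_intervals is hypothetical and the exact scheme (geometric growth of lengths, shift proportional to 1 - 1/growth_factor) is an assumption for illustration; the library's own interval generator may differ in its details.

```python
def seeded_intervals(n_samples, min_length, max_length, growth_factor=1.8):
    """Return sorted [start, end) outer intervals of geometrically growing length.

    Hypothetical scheme: lengths grow by growth_factor from min_length up to
    max_length, and consecutive intervals of one length are shifted by a
    fraction (1 - 1/growth_factor) of that length.
    """
    if not 1.0 < growth_factor <= 2.0:
        raise ValueError("growth_factor must be in (1, 2]")
    max_length = min(max_length, n_samples)
    if not 1 <= min_length <= max_length:
        raise ValueError("need 1 <= min_length <= max_length <= n_samples")

    intervals = set()
    length = float(min_length)
    while True:
        size = min(int(round(length)), max_length)
        # A larger growth factor gives a larger step, hence less overlap.
        step = max(1, int(round(size * (1.0 - 1.0 / growth_factor))))
        for start in range(0, n_samples - size + 1, step):
            intervals.add((start, start + size))
        # Make sure the tail of the series is covered at this length.
        intervals.add((n_samples - size, n_samples))
        if size >= max_length:
            break
        length *= growth_factor
    return sorted(intervals)


coarse = seeded_intervals(200, 10, 200, growth_factor=2.0)
fine = seeded_intervals(200, 10, 200, growth_factor=1.2)
print(len(coarse), len(fine))  # the finer grid contains more intervals
```

Under this scheme the number of candidate outer intervals, and therefore the runtime, grows quickly as growth_factor approaches 1.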
- Attributes:
- transient_score_ : BaseIntervalScorer
Fitted transient score.
- penalty_ : float, np.ndarray or None
Effective penalty actually used at detection time. None when transient_score is inherently penalised.
- min_subinterval_length_ : int
Effective minimum inner-segment length used.
- max_interval_length_ : int
Effective maximum outer-interval length used.
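The interplay of penalty, penalty_scale and agg described under Parameters can be sketched as follows for the feature-wise scores of a single candidate interval. The function penalised_score is hypothetical, and applying penalty_scale to every entry of an array penalty is an assumption; the sketch only mirrors the formulas stated above.

```python
def penalised_score(feature_scores, penalty, penalty_scale=2.0, agg="sum"):
    """Aggregate one interval's feature-wise scores and subtract the penalty."""
    if penalty_scale <= 0:
        raise ValueError("penalty_scale must be positive")

    if isinstance(penalty, (int, float)):
        # Scalar penalty: aggregate first, then subtract the scaled penalty.
        if agg == "sum":
            aggregated = sum(feature_scores)
        elif agg == "max":
            aggregated = max(feature_scores)
        else:
            raise ValueError("agg must be 'sum' or 'max'")
        return aggregated - penalty_scale * penalty

    # Array penalty: top-k aggregation, only meaningful with agg="sum".
    if agg != "sum":
        raise ValueError("array-valued penalties require agg='sum'")
    if len(penalty) != len(feature_scores):
        raise ValueError("penalty must have one entry per feature")
    ordered = sorted(feature_scores, reverse=True)
    best = float("-inf")
    running = 0.0
    for k, score in enumerate(ordered, start=1):
        running += score  # sum of the k largest feature scores
        best = max(best, running - penalty_scale * penalty[k - 1])
    return best


scores = [9.0, 1.0, 0.5]
print(penalised_score(scores, 2.0, agg="sum"))  # 6.5
print(penalised_score(scores, 2.0, agg="max"))  # 5.0
print(penalised_score(scores, [2.0, 4.0, 6.0], penalty_scale=1.0))  # 7.0
```

A candidate segment anomaly would be accepted only when the returned value is positive.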
Methods
fit(X[, y]): Fit the transient score to training data.
fit_predict(X[, y]): Fit to data, then predict per-sample segment labels.
get_metadata_routing(): Get metadata routing of this object.
get_params([deep]): Get parameters for this estimator.
predict(X): Detect changepoints, returning per-sample segment labels.
predict_all(X): Run circular binary segmentation and return all outputs in a single pass.
predict_changepoints(X): Return sorted anomaly boundary indices.
predict_scores(X[, return_index]): Return the per-outer-interval scoring objective evaluated on X.
predict_segment_anomalies(X): Detect anomalies as [start, end) intervals.
set_params(**params): Set the parameters of this estimator.
Notes
Using costs to generate transient scores via CostTransientScore is significantly slower than using transient scores implemented directly, since the surrounding-baseline cost has to be precomputed anew for each candidate inner interval.
References
[1] Olshen, A. B., Venkatraman, E. S., Lucito, R., & Wigler, M. (2004). Circular binary segmentation for the analysis of array-based DNA copy number data. Biostatistics, 5(4), 557-572.
[2] Levin, B. & Kline, J. (1985). The CUSUM test of homogeneity with an application to spontaneous abortion epidemiology. Statistics in Medicine, 4(4), 469-488. (Coined the term "epidemic changepoint" for a transient regime change followed by a return to baseline.)
Examples
>>> import numpy as np
>>> from skchange.new_api.detectors import CircularBinarySegmentation
>>> rng = np.random.default_rng(2)
>>> X = np.concatenate([
...     rng.normal(0, 1, (40, 1)),
...     rng.normal(10, 1, (10, 1)),
...     rng.normal(0, 1, (40, 1)),
... ])
>>> detector = CircularBinarySegmentation()
>>> detector.fit(X).predict_segment_anomalies(X)
array([[40, 50]])
- fit(X: ArrayLike, y: ArrayLike | None = None) -> Self
Fit the transient score to training data.
- Parameters:
- X : ArrayLike of shape (n_samples, n_features)
Training time series data.
- y : ArrayLike | None, default=None
Ignored.
- Returns:
- self : CircularBinarySegmentation
Fitted detector.
- predict_all(X: ArrayLike) -> dict
Run circular binary segmentation and return all outputs in a single pass.
- Parameters:
- X : ArrayLike of shape (n_samples, n_features)
Time series to analyse.
- Returns:
- result : dict with keys:
  - "segment_anomalies" : np.ndarray of shape (n_anomalies, 2)
    Each row is [start, end) of a detected segment anomaly, sorted by start.
  - "changepoints" : np.ndarray of shape (n_changepoints,)
    Sorted unique inner boundary indices of detected anomalies.
  - "interval_starts" : np.ndarray
    Start indices of the seeded outer intervals evaluated.
  - "interval_ends" : np.ndarray
    End indices of the seeded outer intervals evaluated.
  - "interval_max_scores" : np.ndarray
    Maximum (aggregated, penalised) score within each outer interval.
  - "interval_argmax_inner_starts" : np.ndarray
    Best inner-interval start per outer interval.
  - "interval_argmax_inner_ends" : np.ndarray
    Best inner-interval end per outer interval.
- predict_segment_anomalies(X: ArrayLike) -> ndarray
Detect anomalies as [start, end) intervals.
- Parameters:
- X : ArrayLike of shape (n_samples, n_features)
Time series to analyse for anomalies.
- Returns:
- anomalies : np.ndarray of shape (n_anomalies, 2)
Each row is [start, end) of a detected anomaly, sorted by start.
- predict_changepoints(X: ArrayLike) -> ndarray
Return sorted anomaly boundary indices.
Each anomaly interval [start, end) contributes two changepoints (start and end) at the regime transitions in and out of the anomalous segment.
- Parameters:
- X : ArrayLike of shape (n_samples, n_features)
Time series to analyse.
- Returns:
- changepoints : np.ndarray of shape (n_changepoints,)
Sorted unique inner boundary indices of detected anomalies. Empty array if no anomalies are detected.
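The mapping from anomaly intervals to changepoints described above amounts to collecting and de-duplicating the interval boundaries. A minimal sketch, using a hypothetical helper anomalies_to_changepoints on plain lists; how the library treats boundaries that coincide with the ends of the series is not covered here.

```python
def anomalies_to_changepoints(segment_anomalies):
    """Collect the start and end of every [start, end) anomaly, sorted and unique."""
    boundaries = set()
    for start, end in segment_anomalies:
        boundaries.add(start)
        boundaries.add(end)
    return sorted(boundaries)


print(anomalies_to_changepoints([[40, 50]]))            # [40, 50]
print(anomalies_to_changepoints([[10, 20], [20, 30]]))  # [10, 20, 30]
```

Adjacent anomalies that share a boundary contribute that boundary only once, which is why the number of changepoints can be smaller than twice the number of anomalies.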
- predict_scores(X: ArrayLike, return_index: bool = False) -> ndarray | tuple[ndarray, dict[str, ndarray]]
Return the per-outer-interval scoring objective evaluated on X.
For each seeded outer interval, returns the maximum aggregated, penalised transient score over candidate inner [start, end) sub-intervals. This is what predict_segment_anomalies reduces over via the greedy selection step, without the selection itself.
For penalty calibration, use the free function skchange.new_api.tuning.unpenalised_scores, which fits a clone of this detector with the penalty parameter set to zero and returns the resulting unpenalised scores.
- Parameters:
- X : ArrayLike of shape (n_samples, n_features)
Time series to evaluate.
- return_index : bool, default=False
If True, also return a dict locating each score on the time axis. See the Returns section for the keys.
- Returns:
- scores : np.ndarray of shape (n_outer,)
Aggregated, penalised transient-score values, one per seeded outer interval, at the best inner sub-interval within that outer interval. Returned alone when return_index=False.
- index : dict, optional
Only returned when return_index=True. Contains:
  - "starts" : np.ndarray of shape (n_outer,)
    Start indices of the seeded outer intervals.
  - "ends" : np.ndarray of shape (n_outer,)
    End indices of the seeded outer intervals.
  - "argmax_inner_starts" : np.ndarray of shape (n_outer,)
    Inner-interval start of the maximising candidate per outer interval.
  - "argmax_inner_ends" : np.ndarray of shape (n_outer,)
    Inner-interval end of the maximising candidate per outer interval.
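The relation between these per-outer-interval scores and the final anomalies can be illustrated with a simple greedy selection. This is a sketch under stated assumptions: the function greedy_select is hypothetical, and the rule used here (take candidates in decreasing score order, keep those with a positive score whose inner interval does not overlap an already accepted one) may differ from the library's actual selection step.

```python
def greedy_select(scores, inner_starts, inner_ends):
    """Pick non-overlapping inner intervals in order of decreasing score."""
    candidates = sorted(
        zip(scores, inner_starts, inner_ends),
        key=lambda candidate: candidate[0],
        reverse=True,
    )
    selected = []
    for score, start, end in candidates:
        if score <= 0:
            break  # remaining candidates do not beat the penalty
        # Half-open intervals [s, e) overlap when each starts before the other ends.
        overlaps = any(start < e and s < end for s, e in selected)
        if not overlaps:
            selected.append((start, end))
    return sorted(selected)


anomalies = greedy_select(
    scores=[5.0, 4.0, -1.0, 3.0],
    inner_starts=[40, 42, 0, 70],
    inner_ends=[50, 52, 5, 80],
)
print(anomalies)  # [(40, 50), (70, 80)]
```

In this toy input the candidate [42, 52) is dropped because it overlaps the higher-scoring [40, 50), and [0, 5) is dropped because its penalised score is not positive.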
- fit_predict(X, y: ArrayLike | None = None, **fit_params) -> ndarray
Fit to data, then predict per-sample segment labels.
Equivalent to calling fit(X, y).predict(X).
- Parameters:
- X : ArrayLike of shape (n_samples, n_features)
Time series data.
- y : None
Ignored. Exists for sklearn API compatibility.
- **fit_params : dict
Additional parameters passed to fit().
- Returns:
- labels : np.ndarray of shape (n_samples,)
Dense integer segment labels, one per sample.
- get_metadata_routing()
Get metadata routing of this object.
See the User Guide for how the routing mechanism works.
- Returns:
- routing : MetadataRequest
A MetadataRequest encapsulating routing information.
- get_params(deep=True)
Get parameters for this estimator.
- Parameters:
- deep : bool, default=True
If True, will return the parameters for this estimator and contained subobjects that are estimators.
- Returns:
- params : dict
Parameter names mapped to their values.
- predict(X: ArrayLike) -> ndarray
Detect changepoints, returning per-sample segment labels.
Default implementation calls predict_changepoints() and converts the changepoint indices to an array with segment labels per sample. Subclasses may override this directly when the algorithm natively produces labels.
- Parameters:
- X : ArrayLike of shape (n_samples, n_features)
Time series data.
- Returns:
- labels : np.ndarray of shape (n_samples,)
Dense integer segment labels, one per sample. Segment 0 is the first segment, segment 1 the next, and so on.
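The conversion from changepoint indices to dense labels mentioned above can be sketched in a few lines. The helper changepoints_to_labels is hypothetical, and the convention that a changepoint index is the first sample of the new segment is an assumption chosen to match the half-open [start, end) intervals used elsewhere on this page.

```python
def changepoints_to_labels(changepoints, n_samples):
    """Return one integer segment label per sample, starting at segment 0."""
    # Changepoints at 0 or beyond the last sample do not start a new segment.
    interior = sorted({cp for cp in changepoints if 0 < cp < n_samples})
    labels = []
    segment = 0
    position = 0
    for i in range(n_samples):
        if position < len(interior) and i == interior[position]:
            segment += 1  # sample i is the first sample of the next segment
            position += 1
        labels.append(segment)
    return labels


labels = changepoints_to_labels([40, 50], n_samples=90)
print(labels[39], labels[40], labels[49], labels[50])  # 0 1 1 2
```

With a single anomaly on [40, 50), the anomalous samples receive label 1 and the baseline before and after it receives labels 0 and 2 respectively.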
Examples
>>> labels = detector.fit(X_train).predict(X_test)
>>> labels.shape
(n_samples,)
- set_params(**params)
Set the parameters of this estimator.
The method works on simple estimators as well as on nested objects (such as Pipeline). The latter have parameters of the form <component>__<parameter> so that it is possible to update each component of a nested object.
- Parameters:
- **params : dict
Estimator parameters.
- Returns:
- self : estimator instance
Estimator instance.