Getting started#
This section will help you get started with Skchange by providing a variety of examples that demonstrate the library’s main capabilities.
Installation#
pip install skchange
To make full use of the library, install the optional Numba dependency. It speeds up the algorithms in Skchange, often by a factor of 10 to 100.
pip install skchange[numba]
Change detection#
Detect changes in the mean#
[1]:
import numpy as np
import plotly.io as pio
from skchange.new_api.utils.plotting import plot_detections, plot_segmentation
# Make "notebook" the default Plotly renderer used by the `.show()` calls below.
pio.renderers.default = "notebook"
[2]:
from skchange.new_api.datasets import generate_piecewise_normal_data
from skchange.new_api.detectors import SeededBinarySegmentation
from skchange.new_api.interval_scorers import CUSUM
# Simulate piecewise-normal data from six (mean, length) pairs. Assuming
# `lengths` are consecutive segment lengths, the true changepoints are the
# cumulative sums 30, 35, 50, 100 and 160.
X = generate_piecewise_normal_data(
means=[0, 10, 0, -3, 5, 1],
lengths=[30, 5, 15, 50, 60, 40],
seed=0,
)
# Seeded binary segmentation with the CUSUM scorer and a fixed penalty.
# NOTE(review): `min_subinterval_length=2` is presumably kept small because the
# shortest segment above is only 5 samples long — confirm against the docs.
detector = SeededBinarySegmentation(CUSUM(), min_subinterval_length=2, penalty=5)
# Fit and predict on the same data in one chained call.
changepoints = detector.fit(X).predict_changepoints(X)
plot_detections(X, changepoints=changepoints).show()
print(changepoints)
[ 30 35 50 100 160]
[3]:
# `detector` and `X` come from the previous cell, where the detector was fitted.
# NOTE(review): `predict` presumably returns one segment label per sample — confirm.
segment_labels = detector.predict(X)
plot_segmentation(X, labels=segment_labels).show()
Detect changes in a continuous piecewise linear trend#
[4]:
from skchange.new_api.datasets import generate_continuous_piecewise_linear_data
from skchange.new_api.interval_scorers import ContinuousLinearTrendScore
# Five linear pieces with the given slopes. Assuming `lengths` are consecutive
# segment lengths, the slope changes at 30, 50, 100 and 160.
X = generate_continuous_piecewise_linear_data(
slopes=[0, 1, -0.5, 0.5, 0.1],
lengths=[30, 20, 50, 60, 40],
seed=1,
)
# Same detector class as in the mean example, now scoring intervals with
# ContinuousLinearTrendScore and using `selection_method="narrowest"`.
detector = SeededBinarySegmentation(
ContinuousLinearTrendScore(),
penalty=20,
selection_method="narrowest",
)
changepoints = detector.fit(X).predict_changepoints(X)
plot_detections(X, changepoints=changepoints).show()
# The printed estimates (28, 49, 98, 160) are close to, but not all exactly at,
# the cumulative lengths listed above.
print(changepoints)
[ 28 49 98 160]
Detect sparse changes in a high-dimensional mean vector#
[5]:
from skchange.new_api.detectors import MovingWindow
from skchange.new_api.interval_scorers import ESACScore
# Three segments of length 50 over 100 variables, with segment means 0, 5 and 10.
# NOTE(review): `proportion_affected` presumably gives, per segment, the share of
# variables that receive that segment's mean (all, 10% and 1% here), and
# `randomise_affected_variables` presumably picks them at random — confirm
# against the generator's documentation.
X = generate_piecewise_normal_data(
means=[0, 5, 10],
lengths=50,
n_segments=3,
n_variables=100,
proportion_affected=[1.0, 0.1, 0.01],
randomise_affected_variables=True,
seed=3,
)
# Moving-window detector with the ESAC score, given four bandwidths at once.
detector = MovingWindow(ESACScore(), bandwidth=[10, 20, 30, 40])
changepoints = detector.fit(X).predict_changepoints(X)
plot_detections(X, changepoints=changepoints).show()
print(changepoints)
[ 50 100]
Detect changes in a linear regression model#
[6]:
from skchange.new_api.datasets import generate_piecewise_regression_data
from skchange.new_api.detectors import PELT
from skchange.new_api.interval_scorers import LinearRegressionCost
# generate_piecewise_regression_data returns features and target as separate
# arrays. The new-API LinearRegressionCost expects a single 2D input where one
# column is the response, identified by `response_col`.
X_features, X_target = generate_piecewise_regression_data(
    lengths=50, n_segments=3, n_features=1, n_targets=1, seed=2
)
# Stack the target after the features so that the response is the last column.
X = np.column_stack([X_features, X_target])
response_col = X.shape[1] - 1
detector = PELT(LinearRegressionCost(response_col=response_col), penalty=10)
# Only the changepoints are used below, so fit and predict them in one chained
# call, exactly as the other change detection examples do.
changepoints = detector.fit(X).predict_changepoints(X)
# Plot the feature (column 0) against the response column.
plot_segmentation(X, changepoints=changepoints, x_var=0, y_var=response_col).show()
print(changepoints)
[ 50 100]
Segment anomaly detection#
Detect segment anomalies in the mean#
[7]:
from skchange.new_api.detectors import CAPA
from skchange.new_api.interval_scorers import L2Saving
# Baseline mean 0 with three shifted segments (means 4, 10 and -3). Assuming
# `lengths` are consecutive segment lengths, those segments start at 100, 180
# and 235 and end at 130, 185 and 275, which matches the printed output.
X = generate_piecewise_normal_data(
means=[0, 4, 0, 10, 0, -3],
lengths=[100, 30, 50, 5, 50, 40],
seed=5,
)
# CAPA with the L2Saving scorer; every other setting is left at its default.
detector = CAPA(L2Saving())
anomalies = detector.fit(X).predict_segment_anomalies(X)
plot_detections(X, segment_anomalies=anomalies).show()
# Printed as one [start, end] row per detected anomaly.
print(anomalies)
[[100 130]
[180 185]
[235 275]]
Detect segment anomalies in multivariate data and identify the anomalous variables#
[8]:
# Sparse anomalies: only some variables are affected per segment.
# The list-valued means set one mean per variable (three variables): the second
# segment shifts only variable 0, the fourth shifts all three. Scalar 0 entries
# are the baseline segments.
X = generate_piecewise_normal_data(
means=[0, [8.0, 0.0, 0.0], 0, [2.0, 3.0, 5.0]],
lengths=[100, 20, 130, 50],
seed=1,
)
# Element i of the penalty array is the cost of i+1 jointly affected features.
# A non-decreasing array enables CAPA to identify sparse anomalies.
detector = CAPA(L2Saving(), segment_penalty=[15, 20, 25])
# `predict_all` returns a mapping; the keys used below hold the anomaly
# intervals and, per anomaly, the indices of the affected features.
result = detector.fit(X).predict_all(X)
plot_detections(
X,
segment_anomalies=result["segment_anomalies"],
affected_features=result["segment_anomaly_features"],
data_repr="subplot-line",
).show()
print("Anomaly intervals:")
print(result["segment_anomalies"])
print("Affected features per anomaly:")
print(result["segment_anomaly_features"])
Anomaly intervals:
[[100 120]
[250 300]]
Affected features per anomaly:
[array([0]), array([2, 1, 0])]
Detect segment anomalies in the covariance matrix#
[9]:
from skchange.new_api.interval_scorers import MultivariateGaussianSaving
# Baseline: two unit-variance variables with covariance 0.9.
baseline_cov = np.array([[1, 0.9], [0.9, 1]])
# Anomalous: identity matrix of the same size, i.e. the covariance drops to 0.
anomalous_cov = np.eye(baseline_cov.shape[0])
# Zero mean throughout; only the covariance differs, in the middle of three
# segments of length 100.
X = generate_piecewise_normal_data(
means=0,
variances=[baseline_cov, anomalous_cov, baseline_cov],
lengths=[100, 100, 100],
seed=8,
)
# A saving with a fixed baseline measures evidence that an interval deviates
# from the supplied baseline distribution.
saving = MultivariateGaussianSaving(
baseline_mean=np.zeros(2), baseline_cov=baseline_cov
)
detector = CAPA(saving, min_segment_length=10)
anomalies = detector.fit(X).predict_segment_anomalies(X)
# Plot variable 0 against variable 1 with the detected anomaly marked.
plot_segmentation(X, segment_anomalies=anomalies, x_var=0, y_var=1).show()
print(anomalies)
[[100 204]]
Penalty calibration#
Penalty curve#
Run a detector over a range of penalty values and compute a score or summary of the resulting segmentation for each value.
[10]:
import plotly.express as px
from skchange.new_api.tuning import penalty_curve
# Same means, lengths and seed as in the first mean change detection example.
means = [0, 10, 0, -3, 5, 1]
lengths = [30, 5, 15, 50, 60, 40]
n_true_changepoints = len(means) - 1
X = generate_piecewise_normal_data(means=means, lengths=lengths, seed=0)

# No explicit penalty is set here; the curve below varies `penalty_scale` instead.
detector = SeededBinarySegmentation(CUSUM(), min_subinterval_length=2)

# 100 logarithmically spaced scales from 0.1 to 10.
penalty_range = np.logspace(-1, 1, 100)
estimated_changepoints = penalty_curve(
    detector,
    X,
    penalty_name="penalty_scale",  # Scale the detector's default penalty.
    penalty_range=penalty_range,
    scoring="n_changepoints",
)

# Select the first (smallest) scale whose segmentation has no more changepoints
# than the true number.
not_too_many = estimated_changepoints <= n_true_changepoints
selected_penalty_scale = penalty_range[not_too_many][0]

fig = px.line(
    x=penalty_range,
    y=estimated_changepoints,
    markers=True,
    labels={"x": "penalty_scale", "y": "n_changepoints"},
    title=f"Penalty curve with selected penalty scale = {selected_penalty_scale:.2f}",
)
# Mark the selected scale on the curve.
fig.add_vline(x=selected_penalty_scale, line_dash="dash", line_color="red")
fig.show()
Empirical scores distribution#
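Get the empirical distribution of the scores produced by the detector’s internal interval scorer (cost, change score, saving or transient score) with the penalty parameter set to zero. Computed on data without changes, a high quantile of this distribution can then serve as a penalty that targets a desired false positive rate.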
[11]:
from skchange.new_api.tuning import unpenalised_scores
# Generate data without any changepoints, to represent the null distribution.
X = generate_piecewise_normal_data(means=0, lengths=1000, seed=43)
alpha = 0.01  # desired false positive rate

# `detector` is the SeededBinarySegmentation instance created in the penalty
# curve example above.
scores = unpenalised_scores(detector, X)
# The (1 - alpha) quantile of the null scores is taken as the penalty.
selected_penalty = np.quantile(scores, 1 - alpha)

fig = px.histogram(
    scores,
    nbins=100,
    labels={"value": "score"},
    title=f"Distribution of scores with selected penalty = {selected_penalty:.2f}",
)
# Mark the selected penalty on the histogram.
fig.add_vline(x=selected_penalty, line_dash="dash", line_color="red")
fig.show()