1use cvx_core::types::{ChangePoint, CpdMethod};
12
13#[derive(Debug, Clone)]
15pub struct BocpdConfig {
16 pub alpha: f64,
19 pub threshold_sigmas: f64,
21 pub min_observations: usize,
23 pub cooldown: usize,
25}
26
27impl Default for BocpdConfig {
28 fn default() -> Self {
29 Self {
30 alpha: 0.05,
31 threshold_sigmas: 3.0,
32 min_observations: 5,
33 cooldown: 5,
34 }
35 }
36}
37
38pub struct BocpdDetector {
40 config: BocpdConfig,
41 entity_id: u64,
42 ew_mean: Vec<f64>,
44 ew_var: Vec<f64>,
46 prev_mean: Vec<f32>,
48 count: usize,
50 since_last_detection: usize,
52 dim: Option<usize>,
54}
55
56impl BocpdDetector {
57 pub fn new(entity_id: u64, config: BocpdConfig) -> Self {
59 Self {
60 config,
61 entity_id,
62 ew_mean: Vec::new(),
63 ew_var: Vec::new(),
64 prev_mean: Vec::new(),
65 count: 0,
66 since_last_detection: usize::MAX / 2,
67 dim: None,
68 }
69 }
70
71 pub fn observe(&mut self, timestamp: i64, vector: &[f32]) -> Option<ChangePoint> {
75 let dim = *self.dim.get_or_insert(vector.len());
76 assert_eq!(vector.len(), dim, "dimension mismatch");
77
78 self.count += 1;
79 self.since_last_detection += 1;
80
81 if self.ew_mean.is_empty() {
83 self.ew_mean = vector.iter().map(|&x| x as f64).collect();
84 self.ew_var = vec![1.0; dim]; self.prev_mean = vector.to_vec();
86 return None;
87 }
88
89 let deviation = self.mahalanobis_like(vector);
91
92 let alpha = self.config.alpha;
94 #[allow(clippy::needless_range_loop)]
95 for d in 0..dim {
96 let x = vector[d] as f64;
97 let diff = x - self.ew_mean[d];
98 self.ew_mean[d] += alpha * diff;
99 self.ew_var[d] = (1.0 - alpha) * (self.ew_var[d] + alpha * diff * diff);
100 }
101
102 let is_change = self.count > self.config.min_observations
104 && self.since_last_detection >= self.config.cooldown
105 && deviation > self.config.threshold_sigmas;
106
107 if is_change {
108 let current_mean: Vec<f32> = self.ew_mean.iter().map(|&x| x as f32).collect();
109 let drift_vector: Vec<f32> = current_mean
110 .iter()
111 .zip(self.prev_mean.iter())
112 .map(|(a, b)| a - b)
113 .collect();
114
115 let severity = drift_vector
116 .iter()
117 .map(|d| (*d as f64) * (*d as f64))
118 .sum::<f64>()
119 .sqrt();
120 let normalized_severity = 1.0 - (-severity).exp();
121
122 self.prev_mean = current_mean;
123 self.since_last_detection = 0;
124
125 Some(ChangePoint::new(
126 self.entity_id,
127 timestamp,
128 normalized_severity,
129 drift_vector,
130 CpdMethod::Bocpd,
131 ))
132 } else {
133 None
134 }
135 }
136
137 fn mahalanobis_like(&self, vector: &[f32]) -> f64 {
139 let dim = self.ew_mean.len();
140 let mut total_z2 = 0.0;
141
142 for (d, &v) in vector.iter().enumerate().take(dim) {
143 let x = v as f64;
144 let diff = x - self.ew_mean[d];
145 let var = self.ew_var[d].max(1e-10);
146 total_z2 += diff * diff / var;
147 }
148
149 (total_z2 / dim as f64).sqrt()
150 }
151
152 pub fn count(&self) -> usize {
154 self.count
155 }
156}
157
158#[cfg(test)]
159mod tests {
160 use super::*;
161
162 #[test]
163 fn stationary_no_false_positives() {
164 let config = BocpdConfig {
165 alpha: 0.05,
166 threshold_sigmas: 3.0,
167 min_observations: 10,
168 cooldown: 5,
169 };
170 let mut detector = BocpdDetector::new(1, config);
171
172 let mut false_positives = 0;
173 for i in 0..200 {
174 let v = vec![1.0, 2.0, 3.0];
175 if detector.observe(i as i64 * 1000, &v).is_some() {
176 false_positives += 1;
177 }
178 }
179
180 let fpr = false_positives as f64 / 200.0;
181 assert!(
182 fpr < 0.05,
183 "false positive rate = {fpr:.3}, expected < 0.05"
184 );
185 }
186
187 #[test]
188 fn detects_change_within_window() {
189 let config = BocpdConfig {
190 alpha: 0.1,
191 threshold_sigmas: 3.0,
192 min_observations: 5,
193 cooldown: 3,
194 };
195 let mut detector = BocpdDetector::new(1, config);
196
197 let change_at = 50;
198 let window = 10;
199
200 let mut detected_at = None;
201 for i in 0..100 {
202 let v = if i < change_at {
203 vec![0.0, 0.0, 0.0]
204 } else {
205 vec![10.0, 10.0, 10.0]
206 };
207
208 if let Some(cp) = detector.observe(i as i64 * 1000, &v) {
209 if detected_at.is_none() && i >= change_at {
210 detected_at = Some(i);
211 assert_eq!(cp.method(), CpdMethod::Bocpd);
212 }
213 }
214 }
215
216 let det = detected_at.expect("should detect the change");
217 assert!(
218 det <= change_at + window,
219 "detected at {det}, expected within {window} of {change_at}"
220 );
221 }
222
223 #[test]
224 fn changepoint_has_drift_vector() {
225 let config = BocpdConfig {
226 alpha: 0.1,
227 threshold_sigmas: 3.0,
228 min_observations: 5,
229 cooldown: 3,
230 };
231 let mut detector = BocpdDetector::new(1, config);
232
233 let mut cp_found = None;
234 for i in 0..100 {
235 let v = if i < 30 {
236 vec![0.0, 0.0]
237 } else {
238 vec![5.0, -3.0]
239 };
240 if let Some(cp) = detector.observe(i as i64 * 1000, &v) {
241 if cp_found.is_none() && i >= 30 {
242 cp_found = Some(cp);
243 }
244 }
245 }
246
247 let cp = cp_found.expect("should detect change");
248 assert_eq!(cp.drift_vector().len(), 2);
249 assert!(cp.severity() > 0.0);
250 }
251
252 #[test]
253 fn multiple_changes() {
254 let config = BocpdConfig {
255 alpha: 0.1,
256 threshold_sigmas: 3.0,
257 min_observations: 5,
258 cooldown: 10,
259 };
260 let mut detector = BocpdDetector::new(1, config);
261
262 let mut change_detections = 0;
263 for i in 0..300 {
264 let v = if i < 100 {
265 vec![0.0]
266 } else if i < 200 {
267 vec![10.0]
268 } else {
269 vec![-5.0]
270 };
271 if detector.observe(i as i64 * 1000, &v).is_some() {
272 change_detections += 1;
273 }
274 }
275
276 assert!(
277 change_detections >= 2,
278 "should detect at least 2 changes, got {change_detections}"
279 );
280 }
281
282 #[test]
283 fn count_tracks_observations() {
284 let mut detector = BocpdDetector::new(1, BocpdConfig::default());
285 for i in 0..10 {
286 detector.observe(i * 1000, &[1.0]);
287 }
288 assert_eq!(detector.count(), 10);
289 }
290}