cvx_analytics/
bocpd.rs

1//! Online change point detection via exponentially weighted statistics.
2//!
3//! A streaming detector that maintains an exponentially weighted mean and
4//! variance. When a new observation deviates significantly from the expected
5//! distribution (Mahalanobis-like distance exceeds threshold), a change
6//! point is emitted.
7//!
8//! This is a practical simplification of BOCPD that works well for
9//! embedding trajectories where the distribution shifts are typically large.
10
11use cvx_core::types::{ChangePoint, CpdMethod};
12
13/// Online detector configuration.
14#[derive(Debug, Clone)]
15pub struct BocpdConfig {
16    /// Smoothing factor for exponential moving average (0 < alpha < 1).
17    /// Lower values = more smoothing = less sensitive.
18    pub alpha: f64,
19    /// Number of standard deviations for change detection threshold.
20    pub threshold_sigmas: f64,
21    /// Minimum observations before detection starts (warm-up period).
22    pub min_observations: usize,
23    /// Cooldown: minimum observations between consecutive detections.
24    pub cooldown: usize,
25}
26
27impl Default for BocpdConfig {
28    fn default() -> Self {
29        Self {
30            alpha: 0.05,
31            threshold_sigmas: 3.0,
32            min_observations: 5,
33            cooldown: 5,
34        }
35    }
36}
37
38/// Online streaming change point detector.
39pub struct BocpdDetector {
40    config: BocpdConfig,
41    entity_id: u64,
42    /// Exponentially weighted mean per dimension.
43    ew_mean: Vec<f64>,
44    /// Exponentially weighted variance per dimension.
45    ew_var: Vec<f64>,
46    /// Previous mean (for drift vector computation).
47    prev_mean: Vec<f32>,
48    /// Number of observations processed.
49    count: usize,
50    /// Observations since last detection (cooldown counter).
51    since_last_detection: usize,
52    /// Dimensionality.
53    dim: Option<usize>,
54}
55
56impl BocpdDetector {
57    /// Create a new detector for the given entity.
58    pub fn new(entity_id: u64, config: BocpdConfig) -> Self {
59        Self {
60            config,
61            entity_id,
62            ew_mean: Vec::new(),
63            ew_var: Vec::new(),
64            prev_mean: Vec::new(),
65            count: 0,
66            since_last_detection: usize::MAX / 2,
67            dim: None,
68        }
69    }
70
71    /// Process a single observation.
72    ///
73    /// Returns `Some(ChangePoint)` if a change is detected.
74    pub fn observe(&mut self, timestamp: i64, vector: &[f32]) -> Option<ChangePoint> {
75        let dim = *self.dim.get_or_insert(vector.len());
76        assert_eq!(vector.len(), dim, "dimension mismatch");
77
78        self.count += 1;
79        self.since_last_detection += 1;
80
81        // Initialize on first observation
82        if self.ew_mean.is_empty() {
83            self.ew_mean = vector.iter().map(|&x| x as f64).collect();
84            self.ew_var = vec![1.0; dim]; // initial variance
85            self.prev_mean = vector.to_vec();
86            return None;
87        }
88
89        // Compute deviation from expected distribution
90        let deviation = self.mahalanobis_like(vector);
91
92        // Update exponential moving statistics
93        let alpha = self.config.alpha;
94        #[allow(clippy::needless_range_loop)]
95        for d in 0..dim {
96            let x = vector[d] as f64;
97            let diff = x - self.ew_mean[d];
98            self.ew_mean[d] += alpha * diff;
99            self.ew_var[d] = (1.0 - alpha) * (self.ew_var[d] + alpha * diff * diff);
100        }
101
102        // Check for change point
103        let is_change = self.count > self.config.min_observations
104            && self.since_last_detection >= self.config.cooldown
105            && deviation > self.config.threshold_sigmas;
106
107        if is_change {
108            let current_mean: Vec<f32> = self.ew_mean.iter().map(|&x| x as f32).collect();
109            let drift_vector: Vec<f32> = current_mean
110                .iter()
111                .zip(self.prev_mean.iter())
112                .map(|(a, b)| a - b)
113                .collect();
114
115            let severity = drift_vector
116                .iter()
117                .map(|d| (*d as f64) * (*d as f64))
118                .sum::<f64>()
119                .sqrt();
120            let normalized_severity = 1.0 - (-severity).exp();
121
122            self.prev_mean = current_mean;
123            self.since_last_detection = 0;
124
125            Some(ChangePoint::new(
126                self.entity_id,
127                timestamp,
128                normalized_severity,
129                drift_vector,
130                CpdMethod::Bocpd,
131            ))
132        } else {
133            None
134        }
135    }
136
137    /// Compute a Mahalanobis-like distance (average z-score across dimensions).
138    fn mahalanobis_like(&self, vector: &[f32]) -> f64 {
139        let dim = self.ew_mean.len();
140        let mut total_z2 = 0.0;
141
142        for (d, &v) in vector.iter().enumerate().take(dim) {
143            let x = v as f64;
144            let diff = x - self.ew_mean[d];
145            let var = self.ew_var[d].max(1e-10);
146            total_z2 += diff * diff / var;
147        }
148
149        (total_z2 / dim as f64).sqrt()
150    }
151
152    /// Current number of observations processed.
153    pub fn count(&self) -> usize {
154        self.count
155    }
156}
157
158#[cfg(test)]
159mod tests {
160    use super::*;
161
162    #[test]
163    fn stationary_no_false_positives() {
164        let config = BocpdConfig {
165            alpha: 0.05,
166            threshold_sigmas: 3.0,
167            min_observations: 10,
168            cooldown: 5,
169        };
170        let mut detector = BocpdDetector::new(1, config);
171
172        let mut false_positives = 0;
173        for i in 0..200 {
174            let v = vec![1.0, 2.0, 3.0];
175            if detector.observe(i as i64 * 1000, &v).is_some() {
176                false_positives += 1;
177            }
178        }
179
180        let fpr = false_positives as f64 / 200.0;
181        assert!(
182            fpr < 0.05,
183            "false positive rate = {fpr:.3}, expected < 0.05"
184        );
185    }
186
187    #[test]
188    fn detects_change_within_window() {
189        let config = BocpdConfig {
190            alpha: 0.1,
191            threshold_sigmas: 3.0,
192            min_observations: 5,
193            cooldown: 3,
194        };
195        let mut detector = BocpdDetector::new(1, config);
196
197        let change_at = 50;
198        let window = 10;
199
200        let mut detected_at = None;
201        for i in 0..100 {
202            let v = if i < change_at {
203                vec![0.0, 0.0, 0.0]
204            } else {
205                vec![10.0, 10.0, 10.0]
206            };
207
208            if let Some(cp) = detector.observe(i as i64 * 1000, &v) {
209                if detected_at.is_none() && i >= change_at {
210                    detected_at = Some(i);
211                    assert_eq!(cp.method(), CpdMethod::Bocpd);
212                }
213            }
214        }
215
216        let det = detected_at.expect("should detect the change");
217        assert!(
218            det <= change_at + window,
219            "detected at {det}, expected within {window} of {change_at}"
220        );
221    }
222
223    #[test]
224    fn changepoint_has_drift_vector() {
225        let config = BocpdConfig {
226            alpha: 0.1,
227            threshold_sigmas: 3.0,
228            min_observations: 5,
229            cooldown: 3,
230        };
231        let mut detector = BocpdDetector::new(1, config);
232
233        let mut cp_found = None;
234        for i in 0..100 {
235            let v = if i < 30 {
236                vec![0.0, 0.0]
237            } else {
238                vec![5.0, -3.0]
239            };
240            if let Some(cp) = detector.observe(i as i64 * 1000, &v) {
241                if cp_found.is_none() && i >= 30 {
242                    cp_found = Some(cp);
243                }
244            }
245        }
246
247        let cp = cp_found.expect("should detect change");
248        assert_eq!(cp.drift_vector().len(), 2);
249        assert!(cp.severity() > 0.0);
250    }
251
252    #[test]
253    fn multiple_changes() {
254        let config = BocpdConfig {
255            alpha: 0.1,
256            threshold_sigmas: 3.0,
257            min_observations: 5,
258            cooldown: 10,
259        };
260        let mut detector = BocpdDetector::new(1, config);
261
262        let mut change_detections = 0;
263        for i in 0..300 {
264            let v = if i < 100 {
265                vec![0.0]
266            } else if i < 200 {
267                vec![10.0]
268            } else {
269                vec![-5.0]
270            };
271            if detector.observe(i as i64 * 1000, &v).is_some() {
272                change_detections += 1;
273            }
274        }
275
276        assert!(
277            change_detections >= 2,
278            "should detect at least 2 changes, got {change_detections}"
279        );
280    }
281
282    #[test]
283    fn count_tracks_observations() {
284        let mut detector = BocpdDetector::new(1, BocpdConfig::default());
285        for i in 0..10 {
286            detector.observe(i * 1000, &[1.0]);
287        }
288        assert_eq!(detector.count(), 10);
289    }
290}