cvx_analytics/
pelt.rs

1//! PELT (Pruned Exact Linear Time) offline change point detection.
2//!
3//! Detects structural breaks in embedding trajectories by finding
4//! timestamps where the mean vector shifts significantly.
5//!
6//! # Algorithm
7//!
8//! 1. Define cost function as sum of squared deviations from segment mean.
9//! 2. Dynamic programming with pruning: maintain set of candidate change points.
10//! 3. Penalty controls sensitivity (BIC default: `dim * ln(n) / 2`).
11//!
12//! Complexity: O(N) amortized per Killick et al. (2012).
13
14use cvx_core::types::{ChangePoint, CpdMethod};
15
16/// PELT configuration.
17#[derive(Debug, Clone)]
18pub struct PeltConfig {
19    /// Penalty per change point. If `None`, uses BIC: `dim * ln(n) / 2`.
20    pub penalty: Option<f64>,
21    /// Minimum segment length (avoids overfitting on short runs).
22    pub min_segment_len: usize,
23}
24
25impl Default for PeltConfig {
26    fn default() -> Self {
27        Self {
28            penalty: None,
29            min_segment_len: 2,
30        }
31    }
32}
33
34/// Run PELT change point detection on a trajectory.
35///
36/// Input: `trajectory` of `(timestamp, vector)` pairs, sorted by timestamp.
37/// Returns detected `ChangePoint`s.
38pub fn detect(
39    entity_id: u64,
40    trajectory: &[(i64, &[f32])],
41    config: &PeltConfig,
42) -> Vec<ChangePoint> {
43    let n = trajectory.len();
44    if n < 2 * config.min_segment_len {
45        return Vec::new();
46    }
47
48    let dim = trajectory[0].1.len();
49    let penalty = config
50        .penalty
51        .unwrap_or_else(|| dim as f64 * (n as f64).ln() / 2.0);
52
53    // Precompute cumulative sums for O(1) segment cost
54    let cumsum = build_cumsum(trajectory);
55
56    // DP: f[t] = min cost to segment [0..t]
57    // last_cp[t] = last change point before t
58    let mut f = vec![f64::INFINITY; n + 1];
59    let mut last_cp: Vec<usize> = vec![0; n + 1];
60    f[0] = -penalty; // so that f[0] + penalty + cost(0,t) = cost(0,t) for first segment
61
62    // Candidate set for pruning
63    let mut candidates: Vec<usize> = vec![0];
64
65    for t in config.min_segment_len..=n {
66        let mut best_cost = f64::INFINITY;
67        let mut best_cp = 0;
68
69        // Phase 1: find optimal cost f[t] over all candidates
70        for &s in &candidates {
71            if t - s < config.min_segment_len {
72                continue;
73            }
74
75            let seg_cost = segment_cost(&cumsum, s, t, dim);
76            let total = f[s] + seg_cost + penalty;
77
78            if total < best_cost {
79                best_cost = total;
80                best_cp = s;
81            }
82        }
83
84        f[t] = best_cost;
85        last_cp[t] = best_cp;
86
87        // Phase 2: prune candidates using the now-known f[t]
88        // (RFC-002-06, Killick Theorem 3.1). Pruning AFTER f[t] is set
89        // avoids the f[t]==INFINITY fallback that caused O(N) candidate growth.
90        let mut new_candidates = Vec::new();
91        for &s in &candidates {
92            if t - s < config.min_segment_len {
93                new_candidates.push(s);
94                continue;
95            }
96
97            let seg_cost = segment_cost(&cumsum, s, t, dim);
98            if f[s] + seg_cost <= f[t] + penalty {
99                new_candidates.push(s);
100            }
101        }
102        new_candidates.push(t);
103
104        // Safety cap: prevent O(N²) worst case if pruning is ineffective.
105        // Keep candidates with lowest f[s] values.
106        if new_candidates.len() > n / 2 {
107            new_candidates
108                .sort_by(|&a, &b| f[a].partial_cmp(&f[b]).unwrap_or(std::cmp::Ordering::Equal));
109            new_candidates.truncate(n / 4);
110        }
111
112        candidates = new_candidates;
113    }
114
115    // Backtrack to find change points
116    let mut cps = Vec::new();
117    let mut pos = n;
118    while pos > 0 {
119        let cp = last_cp[pos];
120        if cp > 0 {
121            cps.push(cp);
122        }
123        pos = cp;
124    }
125    cps.reverse();
126
127    // Convert to ChangePoint structs
128    cps.iter()
129        .map(|&idx| {
130            let ts = trajectory[idx].0;
131            let (before_mean, after_mean) = compute_segment_means(trajectory, idx);
132            let drift_vector: Vec<f32> = after_mean
133                .iter()
134                .zip(before_mean.iter())
135                .map(|(a, b)| a - b)
136                .collect();
137            let severity = drift_vector
138                .iter()
139                .map(|d| (*d as f64) * (*d as f64))
140                .sum::<f64>()
141                .sqrt();
142            // Normalize severity to [0, 1] using sigmoid-like mapping
143            let normalized_severity = 1.0 - (-severity).exp();
144
145            ChangePoint::new(
146                entity_id,
147                ts,
148                normalized_severity,
149                drift_vector,
150                CpdMethod::Pelt,
151            )
152        })
153        .collect()
154}
155
156/// Cumulative sums for O(1) segment cost computation.
157struct CumulativeSums {
158    /// Cumulative sum of vectors: `sum[i]` = Σ_{j=0}^{i-1} v[j]
159    sum: Vec<Vec<f64>>,
160    /// Cumulative sum of squared norms: `sq[i]` = Σ_{j=0}^{i-1} ||v[j]||²
161    sq: Vec<f64>,
162}
163
164fn build_cumsum(trajectory: &[(i64, &[f32])]) -> CumulativeSums {
165    let n = trajectory.len();
166    let dim = trajectory[0].1.len();
167
168    let mut sum = vec![vec![0.0f64; dim]; n + 1];
169    let mut sq = vec![0.0f64; n + 1];
170
171    for i in 0..n {
172        let v = trajectory[i].1;
173        for (d, &val) in v.iter().enumerate().take(dim) {
174            sum[i + 1][d] = sum[i][d] + val as f64;
175        }
176        sq[i + 1] = sq[i] + v.iter().map(|&x| (x as f64) * (x as f64)).sum::<f64>();
177    }
178
179    CumulativeSums { sum, sq }
180}
181
182/// Cost of segment [start, end): sum of squared deviations from segment mean.
183fn segment_cost(cs: &CumulativeSums, start: usize, end: usize, dim: usize) -> f64 {
184    let n = (end - start) as f64;
185    if n <= 0.0 {
186        return 0.0;
187    }
188
189    // cost = Σ||x_i||² - (1/n)||Σx_i||²
190    let total_sq = cs.sq[end] - cs.sq[start];
191    let mut mean_sq = 0.0;
192    for d in 0..dim {
193        let s = cs.sum[end][d] - cs.sum[start][d];
194        mean_sq += s * s;
195    }
196
197    total_sq - mean_sq / n
198}
199
200/// Compute mean vectors before and after a change point.
201fn compute_segment_means(trajectory: &[(i64, &[f32])], cp_idx: usize) -> (Vec<f32>, Vec<f32>) {
202    let dim = trajectory[0].1.len();
203
204    let before_mean = segment_mean(&trajectory[..cp_idx], dim);
205    let after_end = (cp_idx + 10).min(trajectory.len());
206    let after_mean = segment_mean(&trajectory[cp_idx..after_end], dim);
207
208    (before_mean, after_mean)
209}
210
211fn segment_mean(segment: &[(i64, &[f32])], dim: usize) -> Vec<f32> {
212    if segment.is_empty() {
213        return vec![0.0; dim];
214    }
215    let n = segment.len() as f64;
216    let mut mean = vec![0.0f64; dim];
217    for (_, v) in segment {
218        for d in 0..dim {
219            mean[d] += v[d] as f64;
220        }
221    }
222    mean.iter().map(|m| (m / n) as f32).collect()
223}
224
225#[cfg(test)]
226#[allow(clippy::needless_range_loop)]
227mod tests {
228    use super::*;
229
230    fn make_trajectory(points: &[(i64, Vec<f32>)]) -> Vec<(i64, &[f32])> {
231        points.iter().map(|(t, v)| (*t, v.as_slice())).collect()
232    }
233
234    #[test]
235    fn stationary_no_changepoints() {
236        // Constant trajectory — should detect 0 change points
237        let points: Vec<(i64, Vec<f32>)> = (0..100)
238            .map(|i| (i as i64 * 1000, vec![1.0, 2.0, 3.0]))
239            .collect();
240        let traj = make_trajectory(&points);
241
242        let cps = detect(1, &traj, &PeltConfig::default());
243        assert!(
244            cps.is_empty(),
245            "stationary data should have 0 changepoints, got {}",
246            cps.len()
247        );
248    }
249
250    #[test]
251    fn single_planted_change() {
252        // Trajectory with one clear change at index 50
253        let mut points: Vec<(i64, Vec<f32>)> = Vec::new();
254        for i in 0..50 {
255            points.push((i as i64 * 1000, vec![0.0, 0.0, 0.0]));
256        }
257        for i in 50..100 {
258            points.push((i as i64 * 1000, vec![10.0, 10.0, 10.0]));
259        }
260        let traj = make_trajectory(&points);
261
262        let cps = detect(1, &traj, &PeltConfig::default());
263        assert!(!cps.is_empty(), "should detect at least 1 changepoint");
264
265        // Check that a changepoint is near index 50 (ts = 50000)
266        let near_50 = cps.iter().any(|cp| (cp.timestamp() - 50000).abs() < 5000);
267        assert!(
268            near_50,
269            "changepoint should be near t=50000, got: {:?}",
270            cps.iter().map(|cp| cp.timestamp()).collect::<Vec<_>>()
271        );
272    }
273
274    #[test]
275    fn three_planted_changes() {
276        // Segments: [0,25), [25,50), [50,75), [75,100) with different means
277        let means = [
278            vec![0.0, 0.0],
279            vec![5.0, 5.0],
280            vec![0.0, 10.0],
281            vec![10.0, 0.0],
282        ];
283        let mut points: Vec<(i64, Vec<f32>)> = Vec::new();
284        for seg in 0..4 {
285            for i in 0..25 {
286                let idx = seg * 25 + i;
287                points.push((idx as i64 * 1000, means[seg].clone()));
288            }
289        }
290        let traj = make_trajectory(&points);
291
292        let cps = detect(1, &traj, &PeltConfig::default());
293
294        // Should detect 3 change points (at 25, 50, 75)
295        let expected_ts = [25000, 50000, 75000];
296        let mut found = 0;
297        for &expected in &expected_ts {
298            if cps
299                .iter()
300                .any(|cp| (cp.timestamp() - expected).abs() < 5000)
301            {
302                found += 1;
303            }
304        }
305
306        let precision = found as f64 / cps.len().max(1) as f64;
307        let recall = found as f64 / 3.0;
308        let f1 = if precision + recall > 0.0 {
309            2.0 * precision * recall / (precision + recall)
310        } else {
311            0.0
312        };
313
314        assert!(
315            f1 >= 0.85,
316            "F1 = {f1:.2} (precision={precision:.2}, recall={recall:.2}), expected >= 0.85. \
317             Detected: {:?}",
318            cps.iter().map(|cp| cp.timestamp()).collect::<Vec<_>>()
319        );
320    }
321
322    #[test]
323    fn changepoints_have_severity() {
324        let mut points: Vec<(i64, Vec<f32>)> = Vec::new();
325        for i in 0..50 {
326            points.push((i as i64 * 1000, vec![0.0]));
327        }
328        for i in 50..100 {
329            points.push((i as i64 * 1000, vec![100.0])); // large change
330        }
331        let traj = make_trajectory(&points);
332
333        let cps = detect(1, &traj, &PeltConfig::default());
334        assert!(!cps.is_empty());
335        // Large change → high severity
336        assert!(
337            cps[0].severity() > 0.5,
338            "severity should be high for large change, got {}",
339            cps[0].severity()
340        );
341    }
342
343    #[test]
344    fn changepoints_have_drift_vector() {
345        let mut points: Vec<(i64, Vec<f32>)> = Vec::new();
346        for i in 0..50 {
347            points.push((i as i64 * 1000, vec![0.0, 0.0]));
348        }
349        for i in 50..100 {
350            points.push((i as i64 * 1000, vec![5.0, -3.0]));
351        }
352        let traj = make_trajectory(&points);
353
354        let cps = detect(1, &traj, &PeltConfig::default());
355        assert!(!cps.is_empty());
356        let drift = cps[0].drift_vector();
357        assert_eq!(drift.len(), 2);
358        // Drift should be approximately [5.0, -3.0]
359        assert!((drift[0] - 5.0).abs() < 1.0, "drift[0] = {}", drift[0]);
360        assert!((drift[1] + 3.0).abs() < 1.0, "drift[1] = {}", drift[1]);
361    }
362
363    #[test]
364    fn short_trajectory_returns_empty() {
365        let points = vec![(0i64, vec![1.0]), (1000, vec![2.0])];
366        let traj = make_trajectory(&points);
367        let cps = detect(
368            1,
369            &traj,
370            &PeltConfig {
371                min_segment_len: 3,
372                ..Default::default()
373            },
374        );
375        assert!(cps.is_empty());
376    }
377
378    #[test]
379    fn custom_penalty() {
380        // Higher penalty → fewer changepoints
381        let mut points: Vec<(i64, Vec<f32>)> = Vec::new();
382        for i in 0..50 {
383            points.push((i as i64 * 1000, vec![0.0]));
384        }
385        for i in 50..100 {
386            points.push((i as i64 * 1000, vec![2.0]));
387        }
388        let traj = make_trajectory(&points);
389
390        let low_penalty = detect(
391            1,
392            &traj,
393            &PeltConfig {
394                penalty: Some(1.0),
395                ..Default::default()
396            },
397        );
398        let high_penalty = detect(
399            1,
400            &traj,
401            &PeltConfig {
402                penalty: Some(1000.0),
403                ..Default::default()
404            },
405        );
406
407        assert!(
408            low_penalty.len() >= high_penalty.len(),
409            "higher penalty should find fewer or equal changepoints"
410        );
411    }
412
413    #[test]
414    fn all_changepoints_are_pelt_method() {
415        let mut points: Vec<(i64, Vec<f32>)> = Vec::new();
416        for i in 0..50 {
417            points.push((i as i64 * 1000, vec![0.0]));
418        }
419        for i in 50..100 {
420            points.push((i as i64 * 1000, vec![10.0]));
421        }
422        let traj = make_trajectory(&points);
423
424        let cps = detect(1, &traj, &PeltConfig::default());
425        for cp in &cps {
426            assert_eq!(cp.method(), CpdMethod::Pelt);
427        }
428    }
429}