1use cvx_core::types::{ChangePoint, CpdMethod};
15
16#[derive(Debug, Clone)]
18pub struct PeltConfig {
19 pub penalty: Option<f64>,
21 pub min_segment_len: usize,
23}
24
25impl Default for PeltConfig {
26 fn default() -> Self {
27 Self {
28 penalty: None,
29 min_segment_len: 2,
30 }
31 }
32}
33
34pub fn detect(
39 entity_id: u64,
40 trajectory: &[(i64, &[f32])],
41 config: &PeltConfig,
42) -> Vec<ChangePoint> {
43 let n = trajectory.len();
44 if n < 2 * config.min_segment_len {
45 return Vec::new();
46 }
47
48 let dim = trajectory[0].1.len();
49 let penalty = config
50 .penalty
51 .unwrap_or_else(|| dim as f64 * (n as f64).ln() / 2.0);
52
53 let cumsum = build_cumsum(trajectory);
55
56 let mut f = vec![f64::INFINITY; n + 1];
59 let mut last_cp: Vec<usize> = vec![0; n + 1];
60 f[0] = -penalty; let mut candidates: Vec<usize> = vec![0];
64
65 for t in config.min_segment_len..=n {
66 let mut best_cost = f64::INFINITY;
67 let mut best_cp = 0;
68
69 for &s in &candidates {
71 if t - s < config.min_segment_len {
72 continue;
73 }
74
75 let seg_cost = segment_cost(&cumsum, s, t, dim);
76 let total = f[s] + seg_cost + penalty;
77
78 if total < best_cost {
79 best_cost = total;
80 best_cp = s;
81 }
82 }
83
84 f[t] = best_cost;
85 last_cp[t] = best_cp;
86
87 let mut new_candidates = Vec::new();
91 for &s in &candidates {
92 if t - s < config.min_segment_len {
93 new_candidates.push(s);
94 continue;
95 }
96
97 let seg_cost = segment_cost(&cumsum, s, t, dim);
98 if f[s] + seg_cost <= f[t] + penalty {
99 new_candidates.push(s);
100 }
101 }
102 new_candidates.push(t);
103
104 if new_candidates.len() > n / 2 {
107 new_candidates
108 .sort_by(|&a, &b| f[a].partial_cmp(&f[b]).unwrap_or(std::cmp::Ordering::Equal));
109 new_candidates.truncate(n / 4);
110 }
111
112 candidates = new_candidates;
113 }
114
115 let mut cps = Vec::new();
117 let mut pos = n;
118 while pos > 0 {
119 let cp = last_cp[pos];
120 if cp > 0 {
121 cps.push(cp);
122 }
123 pos = cp;
124 }
125 cps.reverse();
126
127 cps.iter()
129 .map(|&idx| {
130 let ts = trajectory[idx].0;
131 let (before_mean, after_mean) = compute_segment_means(trajectory, idx);
132 let drift_vector: Vec<f32> = after_mean
133 .iter()
134 .zip(before_mean.iter())
135 .map(|(a, b)| a - b)
136 .collect();
137 let severity = drift_vector
138 .iter()
139 .map(|d| (*d as f64) * (*d as f64))
140 .sum::<f64>()
141 .sqrt();
142 let normalized_severity = 1.0 - (-severity).exp();
144
145 ChangePoint::new(
146 entity_id,
147 ts,
148 normalized_severity,
149 drift_vector,
150 CpdMethod::Pelt,
151 )
152 })
153 .collect()
154}
155
156struct CumulativeSums {
158 sum: Vec<Vec<f64>>,
160 sq: Vec<f64>,
162}
163
164fn build_cumsum(trajectory: &[(i64, &[f32])]) -> CumulativeSums {
165 let n = trajectory.len();
166 let dim = trajectory[0].1.len();
167
168 let mut sum = vec![vec![0.0f64; dim]; n + 1];
169 let mut sq = vec![0.0f64; n + 1];
170
171 for i in 0..n {
172 let v = trajectory[i].1;
173 for (d, &val) in v.iter().enumerate().take(dim) {
174 sum[i + 1][d] = sum[i][d] + val as f64;
175 }
176 sq[i + 1] = sq[i] + v.iter().map(|&x| (x as f64) * (x as f64)).sum::<f64>();
177 }
178
179 CumulativeSums { sum, sq }
180}
181
182fn segment_cost(cs: &CumulativeSums, start: usize, end: usize, dim: usize) -> f64 {
184 let n = (end - start) as f64;
185 if n <= 0.0 {
186 return 0.0;
187 }
188
189 let total_sq = cs.sq[end] - cs.sq[start];
191 let mut mean_sq = 0.0;
192 for d in 0..dim {
193 let s = cs.sum[end][d] - cs.sum[start][d];
194 mean_sq += s * s;
195 }
196
197 total_sq - mean_sq / n
198}
199
200fn compute_segment_means(trajectory: &[(i64, &[f32])], cp_idx: usize) -> (Vec<f32>, Vec<f32>) {
202 let dim = trajectory[0].1.len();
203
204 let before_mean = segment_mean(&trajectory[..cp_idx], dim);
205 let after_end = (cp_idx + 10).min(trajectory.len());
206 let after_mean = segment_mean(&trajectory[cp_idx..after_end], dim);
207
208 (before_mean, after_mean)
209}
210
211fn segment_mean(segment: &[(i64, &[f32])], dim: usize) -> Vec<f32> {
212 if segment.is_empty() {
213 return vec![0.0; dim];
214 }
215 let n = segment.len() as f64;
216 let mut mean = vec![0.0f64; dim];
217 for (_, v) in segment {
218 for d in 0..dim {
219 mean[d] += v[d] as f64;
220 }
221 }
222 mean.iter().map(|m| (m / n) as f32).collect()
223}
224
225#[cfg(test)]
226#[allow(clippy::needless_range_loop)]
227mod tests {
228 use super::*;
229
230 fn make_trajectory(points: &[(i64, Vec<f32>)]) -> Vec<(i64, &[f32])> {
231 points.iter().map(|(t, v)| (*t, v.as_slice())).collect()
232 }
233
234 #[test]
235 fn stationary_no_changepoints() {
236 let points: Vec<(i64, Vec<f32>)> = (0..100)
238 .map(|i| (i as i64 * 1000, vec![1.0, 2.0, 3.0]))
239 .collect();
240 let traj = make_trajectory(&points);
241
242 let cps = detect(1, &traj, &PeltConfig::default());
243 assert!(
244 cps.is_empty(),
245 "stationary data should have 0 changepoints, got {}",
246 cps.len()
247 );
248 }
249
250 #[test]
251 fn single_planted_change() {
252 let mut points: Vec<(i64, Vec<f32>)> = Vec::new();
254 for i in 0..50 {
255 points.push((i as i64 * 1000, vec![0.0, 0.0, 0.0]));
256 }
257 for i in 50..100 {
258 points.push((i as i64 * 1000, vec![10.0, 10.0, 10.0]));
259 }
260 let traj = make_trajectory(&points);
261
262 let cps = detect(1, &traj, &PeltConfig::default());
263 assert!(!cps.is_empty(), "should detect at least 1 changepoint");
264
265 let near_50 = cps.iter().any(|cp| (cp.timestamp() - 50000).abs() < 5000);
267 assert!(
268 near_50,
269 "changepoint should be near t=50000, got: {:?}",
270 cps.iter().map(|cp| cp.timestamp()).collect::<Vec<_>>()
271 );
272 }
273
274 #[test]
275 fn three_planted_changes() {
276 let means = [
278 vec![0.0, 0.0],
279 vec![5.0, 5.0],
280 vec![0.0, 10.0],
281 vec![10.0, 0.0],
282 ];
283 let mut points: Vec<(i64, Vec<f32>)> = Vec::new();
284 for seg in 0..4 {
285 for i in 0..25 {
286 let idx = seg * 25 + i;
287 points.push((idx as i64 * 1000, means[seg].clone()));
288 }
289 }
290 let traj = make_trajectory(&points);
291
292 let cps = detect(1, &traj, &PeltConfig::default());
293
294 let expected_ts = [25000, 50000, 75000];
296 let mut found = 0;
297 for &expected in &expected_ts {
298 if cps
299 .iter()
300 .any(|cp| (cp.timestamp() - expected).abs() < 5000)
301 {
302 found += 1;
303 }
304 }
305
306 let precision = found as f64 / cps.len().max(1) as f64;
307 let recall = found as f64 / 3.0;
308 let f1 = if precision + recall > 0.0 {
309 2.0 * precision * recall / (precision + recall)
310 } else {
311 0.0
312 };
313
314 assert!(
315 f1 >= 0.85,
316 "F1 = {f1:.2} (precision={precision:.2}, recall={recall:.2}), expected >= 0.85. \
317 Detected: {:?}",
318 cps.iter().map(|cp| cp.timestamp()).collect::<Vec<_>>()
319 );
320 }
321
322 #[test]
323 fn changepoints_have_severity() {
324 let mut points: Vec<(i64, Vec<f32>)> = Vec::new();
325 for i in 0..50 {
326 points.push((i as i64 * 1000, vec![0.0]));
327 }
328 for i in 50..100 {
329 points.push((i as i64 * 1000, vec![100.0])); }
331 let traj = make_trajectory(&points);
332
333 let cps = detect(1, &traj, &PeltConfig::default());
334 assert!(!cps.is_empty());
335 assert!(
337 cps[0].severity() > 0.5,
338 "severity should be high for large change, got {}",
339 cps[0].severity()
340 );
341 }
342
343 #[test]
344 fn changepoints_have_drift_vector() {
345 let mut points: Vec<(i64, Vec<f32>)> = Vec::new();
346 for i in 0..50 {
347 points.push((i as i64 * 1000, vec![0.0, 0.0]));
348 }
349 for i in 50..100 {
350 points.push((i as i64 * 1000, vec![5.0, -3.0]));
351 }
352 let traj = make_trajectory(&points);
353
354 let cps = detect(1, &traj, &PeltConfig::default());
355 assert!(!cps.is_empty());
356 let drift = cps[0].drift_vector();
357 assert_eq!(drift.len(), 2);
358 assert!((drift[0] - 5.0).abs() < 1.0, "drift[0] = {}", drift[0]);
360 assert!((drift[1] + 3.0).abs() < 1.0, "drift[1] = {}", drift[1]);
361 }
362
363 #[test]
364 fn short_trajectory_returns_empty() {
365 let points = vec![(0i64, vec![1.0]), (1000, vec![2.0])];
366 let traj = make_trajectory(&points);
367 let cps = detect(
368 1,
369 &traj,
370 &PeltConfig {
371 min_segment_len: 3,
372 ..Default::default()
373 },
374 );
375 assert!(cps.is_empty());
376 }
377
378 #[test]
379 fn custom_penalty() {
380 let mut points: Vec<(i64, Vec<f32>)> = Vec::new();
382 for i in 0..50 {
383 points.push((i as i64 * 1000, vec![0.0]));
384 }
385 for i in 50..100 {
386 points.push((i as i64 * 1000, vec![2.0]));
387 }
388 let traj = make_trajectory(&points);
389
390 let low_penalty = detect(
391 1,
392 &traj,
393 &PeltConfig {
394 penalty: Some(1.0),
395 ..Default::default()
396 },
397 );
398 let high_penalty = detect(
399 1,
400 &traj,
401 &PeltConfig {
402 penalty: Some(1000.0),
403 ..Default::default()
404 },
405 );
406
407 assert!(
408 low_penalty.len() >= high_penalty.len(),
409 "higher penalty should find fewer or equal changepoints"
410 );
411 }
412
413 #[test]
414 fn all_changepoints_are_pelt_method() {
415 let mut points: Vec<(i64, Vec<f32>)> = Vec::new();
416 for i in 0..50 {
417 points.push((i as i64 * 1000, vec![0.0]));
418 }
419 for i in 50..100 {
420 points.push((i as i64 * 1000, vec![10.0]));
421 }
422 let traj = make_trajectory(&points);
423
424 let cps = detect(1, &traj, &PeltConfig::default());
425 for cp in &cps {
426 assert_eq!(cp.method(), CpdMethod::Pelt);
427 }
428 }
429}