1use crate::calculus::drift_magnitude_l2;
12use cvx_core::error::AnalyticsError;
13
14#[derive(Debug, Clone)]
18pub struct TemporalJoinResult {
19 pub start: i64,
21 pub end: i64,
23 pub mean_distance: f32,
25 pub min_distance: f32,
27 pub points_a: usize,
29 pub points_b: usize,
31}
32
33#[derive(Debug, Clone)]
35pub struct GroupJoinResult {
36 pub start: i64,
38 pub end: i64,
40 pub n_converged: usize,
42 pub converged_entities: Vec<u64>,
44 pub mean_distance: f32,
46}
47
48pub fn temporal_join(
67 traj_a: &[(i64, &[f32])],
68 traj_b: &[(i64, &[f32])],
69 epsilon: f32,
70 window_us: i64,
71) -> Result<Vec<TemporalJoinResult>, AnalyticsError> {
72 if traj_a.is_empty() || traj_b.is_empty() {
73 return Err(AnalyticsError::InsufficientData { needed: 1, have: 0 });
74 }
75
76 let global_start = traj_a[0].0.min(traj_b[0].0);
78 let global_end = traj_a.last().unwrap().0.max(traj_b.last().unwrap().0);
79
80 if global_end - global_start < window_us {
81 let (min_dist, mean_dist, n_a, n_b) =
83 window_distances(traj_a, traj_b, global_start, global_end);
84 return if min_dist <= epsilon && n_a > 0 && n_b > 0 {
85 Ok(vec![TemporalJoinResult {
86 start: global_start,
87 end: global_end,
88 mean_distance: mean_dist,
89 min_distance: min_dist,
90 points_a: n_a,
91 points_b: n_b,
92 }])
93 } else {
94 Ok(vec![])
95 };
96 }
97
98 let step = (window_us / 2).max(1);
100 let mut results: Vec<TemporalJoinResult> = Vec::new();
101 let mut current_start: Option<i64> = None;
102 let mut acc_min_dist = f32::MAX;
103 let mut acc_sum_dist = 0.0f32;
104 let mut acc_count = 0usize;
105 let mut acc_points_a = 0usize;
106 let mut acc_points_b = 0usize;
107
108 let mut t = global_start;
109 while t <= global_end {
110 let w_end = (t + window_us).min(global_end);
111 let (min_dist, mean_dist, n_a, n_b) = window_distances(traj_a, traj_b, t, w_end);
112
113 let is_close = min_dist <= epsilon && n_a > 0 && n_b > 0;
114
115 if is_close {
116 if current_start.is_none() {
117 current_start = Some(t);
118 acc_min_dist = f32::MAX;
119 acc_sum_dist = 0.0;
120 acc_count = 0;
121 acc_points_a = 0;
122 acc_points_b = 0;
123 }
124 acc_min_dist = acc_min_dist.min(min_dist);
125 acc_sum_dist += mean_dist;
126 acc_count += 1;
127 acc_points_a = acc_points_a.max(n_a);
128 acc_points_b = acc_points_b.max(n_b);
129 } else if let Some(start) = current_start.take() {
130 results.push(TemporalJoinResult {
131 start,
132 end: (t - step + window_us).min(global_end),
133 mean_distance: acc_sum_dist / acc_count as f32,
134 min_distance: acc_min_dist,
135 points_a: acc_points_a,
136 points_b: acc_points_b,
137 });
138 }
139
140 t += step;
141 }
142
143 if let Some(start) = current_start {
145 results.push(TemporalJoinResult {
146 start,
147 end: global_end,
148 mean_distance: if acc_count > 0 {
149 acc_sum_dist / acc_count as f32
150 } else {
151 0.0
152 },
153 min_distance: acc_min_dist,
154 points_a: acc_points_a,
155 points_b: acc_points_b,
156 });
157 }
158
159 Ok(results)
160}
161
162#[allow(clippy::type_complexity)]
181pub fn group_temporal_join(
182 trajectories: &[(u64, &[(i64, &[f32])])],
183 epsilon: f32,
184 min_entities: usize,
185 window_us: i64,
186) -> Result<Vec<GroupJoinResult>, AnalyticsError> {
187 if trajectories.len() < min_entities {
188 return Err(AnalyticsError::InsufficientData {
189 needed: min_entities,
190 have: trajectories.len(),
191 });
192 }
193
194 let global_start = trajectories
196 .iter()
197 .filter_map(|(_, t)| t.first().map(|(ts, _)| *ts))
198 .min()
199 .unwrap_or(0);
200 let global_end = trajectories
201 .iter()
202 .filter_map(|(_, t)| t.last().map(|(ts, _)| *ts))
203 .max()
204 .unwrap_or(0);
205
206 if global_end <= global_start {
207 return Ok(vec![]);
208 }
209
210 let step = (window_us / 2).max(1);
211 let mut results: Vec<GroupJoinResult> = Vec::new();
212
213 let mut t = global_start;
214 while t <= global_end {
215 let w_end = (t + window_us).min(global_end);
216
217 let mut entity_reps: Vec<(u64, Vec<f32>)> = Vec::new();
220
221 for &(eid, traj) in trajectories {
222 let points_in_window: Vec<&[f32]> = traj
223 .iter()
224 .filter(|(ts, _)| *ts >= t && *ts <= w_end)
225 .map(|(_, v)| *v)
226 .collect();
227
228 if points_in_window.is_empty() {
229 continue;
230 }
231
232 let dim = points_in_window[0].len();
234 let n = points_in_window.len() as f32;
235 let mut centroid = vec![0.0f32; dim];
236 for v in &points_in_window {
237 for (i, &val) in v.iter().enumerate() {
238 centroid[i] += val;
239 }
240 }
241 for val in &mut centroid {
242 *val /= n;
243 }
244
245 entity_reps.push((eid, centroid));
246 }
247
248 if entity_reps.len() >= min_entities {
249 let converged = find_largest_epsilon_cluster(&entity_reps, epsilon);
252
253 if converged.len() >= min_entities {
254 let mean_dist = mean_pairwise_distance(&entity_reps, &converged);
256
257 results.push(GroupJoinResult {
258 start: t,
259 end: w_end,
260 n_converged: converged.len(),
261 converged_entities: converged,
262 mean_distance: mean_dist,
263 });
264 }
265 }
266
267 t += step;
268 }
269
270 merge_group_results(&mut results, step);
272
273 Ok(results)
274}
275
276fn window_distances(
280 traj_a: &[(i64, &[f32])],
281 traj_b: &[(i64, &[f32])],
282 start: i64,
283 end: i64,
284) -> (f32, f32, usize, usize) {
285 let a_in_window: Vec<&[f32]> = traj_a
286 .iter()
287 .filter(|(ts, _)| *ts >= start && *ts <= end)
288 .map(|(_, v)| *v)
289 .collect();
290 let b_in_window: Vec<&[f32]> = traj_b
291 .iter()
292 .filter(|(ts, _)| *ts >= start && *ts <= end)
293 .map(|(_, v)| *v)
294 .collect();
295
296 if a_in_window.is_empty() || b_in_window.is_empty() {
297 return (f32::MAX, f32::MAX, a_in_window.len(), b_in_window.len());
298 }
299
300 let mut min_dist = f32::MAX;
301 let mut sum_dist = 0.0f32;
302 let mut count = 0;
303
304 for a in &a_in_window {
305 for b in &b_in_window {
306 let d = drift_magnitude_l2(a, b);
307 min_dist = min_dist.min(d);
308 sum_dist += d;
309 count += 1;
310 }
311 }
312
313 let mean_dist = if count > 0 {
314 sum_dist / count as f32
315 } else {
316 f32::MAX
317 };
318
319 (min_dist, mean_dist, a_in_window.len(), b_in_window.len())
320}
321
322fn find_largest_epsilon_cluster(entity_reps: &[(u64, Vec<f32>)], epsilon: f32) -> Vec<u64> {
327 let n = entity_reps.len();
328 let mut best: Vec<u64> = Vec::new();
329
330 for i in 0..n {
331 let mut cluster: Vec<usize> = vec![i];
332
333 for j in 0..n {
334 if i == j {
335 continue;
336 }
337 let all_close = cluster
339 .iter()
340 .all(|&c| drift_magnitude_l2(&entity_reps[c].1, &entity_reps[j].1) <= epsilon);
341 if all_close {
342 cluster.push(j);
343 }
344 }
345
346 if cluster.len() > best.len() {
347 best = cluster.into_iter().map(|idx| entity_reps[idx].0).collect();
348 }
349 }
350
351 best
352}
353
354fn mean_pairwise_distance(entity_reps: &[(u64, Vec<f32>)], ids: &[u64]) -> f32 {
356 let vecs: Vec<&Vec<f32>> = ids
357 .iter()
358 .filter_map(|id| {
359 entity_reps
360 .iter()
361 .find(|(eid, _)| eid == id)
362 .map(|(_, v)| v)
363 })
364 .collect();
365
366 if vecs.len() < 2 {
367 return 0.0;
368 }
369
370 let mut sum = 0.0f32;
371 let mut count = 0;
372 for i in 0..vecs.len() {
373 for j in (i + 1)..vecs.len() {
374 sum += drift_magnitude_l2(vecs[i], vecs[j]);
375 count += 1;
376 }
377 }
378
379 if count > 0 { sum / count as f32 } else { 0.0 }
380}
381
382fn merge_group_results(results: &mut Vec<GroupJoinResult>, step: i64) {
384 if results.len() < 2 {
385 return;
386 }
387
388 let mut merged: Vec<GroupJoinResult> = Vec::new();
389 let mut current = results[0].clone();
390
391 for r in &results[1..] {
392 if r.start <= current.end + step {
394 current.end = r.end;
396 current.n_converged = current.n_converged.max(r.n_converged);
397 current.mean_distance = (current.mean_distance + r.mean_distance) / 2.0;
398 for eid in &r.converged_entities {
400 if !current.converged_entities.contains(eid) {
401 current.converged_entities.push(*eid);
402 }
403 }
404 } else {
405 merged.push(current);
406 current = r.clone();
407 }
408 }
409 merged.push(current);
410
411 *results = merged;
412}
413
414#[cfg(test)]
417#[allow(
418 clippy::type_complexity,
419 clippy::needless_range_loop,
420 clippy::useless_vec
421)]
422mod tests {
423 use super::*;
424
425 fn as_refs(points: &[(i64, Vec<f32>)]) -> Vec<(i64, &[f32])> {
426 points.iter().map(|(t, v)| (*t, v.as_slice())).collect()
427 }
428
429 #[test]
432 fn join_empty_trajectories() {
433 let a: Vec<(i64, &[f32])> = vec![];
434 let b_owned = vec![(100i64, vec![1.0f32])];
435 let b = as_refs(&b_owned);
436 assert!(temporal_join(&a, &b, 1.0, 1000).is_err());
437 }
438
439 #[test]
440 fn join_no_overlap() {
441 let a_owned = vec![
443 (1000i64, vec![0.0f32, 0.0]),
444 (2000, vec![0.0, 0.0]),
445 (3000, vec![0.0, 0.0]),
446 ];
447 let b_owned = vec![
448 (1000i64, vec![100.0f32, 100.0]),
449 (2000, vec![100.0, 100.0]),
450 (3000, vec![100.0, 100.0]),
451 ];
452 let a = as_refs(&a_owned);
453 let b = as_refs(&b_owned);
454
455 let results = temporal_join(&a, &b, 0.5, 2000).unwrap();
456 assert!(
457 results.is_empty(),
458 "distant entities should produce no join"
459 );
460 }
461
462 #[test]
463 fn join_full_convergence() {
464 let a_owned = vec![
466 (1000i64, vec![1.0f32, 1.0]),
467 (2000, vec![1.0, 1.0]),
468 (3000, vec![1.0, 1.0]),
469 ];
470 let b_owned = vec![
471 (1000i64, vec![1.0f32, 1.0]),
472 (2000, vec![1.0, 1.0]),
473 (3000, vec![1.0, 1.0]),
474 ];
475 let a = as_refs(&a_owned);
476 let b = as_refs(&b_owned);
477
478 let results = temporal_join(&a, &b, 0.5, 2000).unwrap();
479 assert!(
480 !results.is_empty(),
481 "identical trajectories should produce at least one join"
482 );
483 assert!(results[0].min_distance < 1e-6);
484 }
485
486 #[test]
487 fn join_partial_convergence() {
488 let a_owned = vec![
490 (1000i64, vec![0.0f32, 0.0]),
491 (2000, vec![0.0, 0.0]),
492 (3000, vec![0.2, 0.0]),
493 (4000, vec![0.8, 0.0]),
494 (5000, vec![1.0, 0.0]), (6000, vec![1.0, 0.0]), (7000, vec![0.5, 0.0]),
497 (8000, vec![0.0, 0.0]),
498 ];
499 let b_owned = vec![
500 (1000i64, vec![1.0f32, 0.0]),
501 (2000, vec![1.0, 0.0]),
502 (3000, vec![1.0, 0.0]),
503 (4000, vec![1.0, 0.0]),
504 (5000, vec![1.0, 0.0]),
505 (6000, vec![1.0, 0.0]),
506 (7000, vec![1.0, 0.0]),
507 (8000, vec![1.0, 0.0]),
508 ];
509 let a = as_refs(&a_owned);
510 let b = as_refs(&b_owned);
511
512 let results = temporal_join(&a, &b, 0.15, 2000).unwrap();
513 assert!(
514 !results.is_empty(),
515 "should detect convergence around t=5000-6000"
516 );
517
518 let convergence = &results[0];
520 assert!(
521 convergence.start >= 3000 && convergence.start <= 5000,
522 "convergence should start around t=4000-5000, got {}",
523 convergence.start
524 );
525 }
526
527 #[test]
528 fn join_respects_epsilon() {
529 let a_owned = vec![(1000i64, vec![0.0f32, 0.0])];
530 let b_owned = vec![(1000i64, vec![0.5, 0.0])];
531 let a = as_refs(&a_owned);
532 let b = as_refs(&b_owned);
533
534 let results = temporal_join(&a, &b, 0.4, 2000).unwrap();
536 assert!(results.is_empty());
537
538 let results = temporal_join(&a, &b, 0.6, 2000).unwrap();
540 assert!(!results.is_empty());
541 }
542
543 #[test]
544 fn join_single_window_covers_all() {
545 let a_owned = vec![(100i64, vec![1.0f32]), (200, vec![1.0])];
547 let b_owned = vec![(150i64, vec![1.0f32]), (250, vec![1.0])];
548 let a = as_refs(&a_owned);
549 let b = as_refs(&b_owned);
550
551 let results = temporal_join(&a, &b, 0.5, 1000).unwrap();
552 assert!(!results.is_empty());
553 assert_eq!(results.len(), 1);
554 }
555
556 #[test]
557 fn join_result_has_point_counts() {
558 let a_owned = vec![
559 (1000i64, vec![0.0f32]),
560 (2000, vec![0.0]),
561 (3000, vec![0.0]),
562 ];
563 let b_owned = vec![(1500i64, vec![0.0f32]), (2500, vec![0.0])];
564 let a = as_refs(&a_owned);
565 let b = as_refs(&b_owned);
566
567 let results = temporal_join(&a, &b, 1.0, 4000).unwrap();
568 assert!(!results.is_empty());
569 assert!(results[0].points_a > 0);
570 assert!(results[0].points_b > 0);
571 }
572
573 #[test]
576 fn group_join_insufficient_entities() {
577 let t1_owned = vec![(100i64, vec![1.0f32])];
578 let t1 = as_refs(&t1_owned);
579 let trajectories: Vec<(u64, &[(i64, &[f32])])> = vec![(1, &t1)];
580 let result = group_temporal_join(&trajectories, 0.5, 3, 1000);
581 assert!(result.is_err());
582 }
583
584 #[test]
585 fn group_join_all_converge() {
586 let mut owned: Vec<Vec<(i64, Vec<f32>)>> = Vec::new();
588 for _ in 0..4 {
589 owned.push(vec![
590 (1000i64, vec![1.0f32, 1.0]),
591 (2000, vec![1.0, 1.0]),
592 (3000, vec![1.0, 1.0]),
593 ]);
594 }
595 let refs: Vec<Vec<(i64, &[f32])>> = owned.iter().map(|t| as_refs(t)).collect();
596 let trajectories: Vec<(u64, &[(i64, &[f32])])> = refs
597 .iter()
598 .enumerate()
599 .map(|(i, t)| (i as u64, t.as_slice()))
600 .collect();
601
602 let results = group_temporal_join(&trajectories, 0.5, 3, 3000).unwrap();
603 assert!(
604 !results.is_empty(),
605 "all identical entities should converge"
606 );
607 assert!(results[0].n_converged >= 3);
608 }
609
610 #[test]
611 fn group_join_partial_subset() {
612 let owned = vec![
614 vec![(1000i64, vec![1.0f32, 1.0]), (2000, vec![1.0, 1.0])],
615 vec![(1000i64, vec![1.0f32, 1.1]), (2000, vec![1.0, 1.1])],
616 vec![(1000i64, vec![1.1f32, 1.0]), (2000, vec![1.1, 1.0])],
617 vec![(1000i64, vec![100.0f32, 100.0]), (2000, vec![100.0, 100.0])], ];
619 let refs: Vec<Vec<(i64, &[f32])>> = owned.iter().map(|t| as_refs(t)).collect();
620 let trajectories: Vec<(u64, &[(i64, &[f32])])> = refs
621 .iter()
622 .enumerate()
623 .map(|(i, t)| (i as u64, t.as_slice()))
624 .collect();
625
626 let results = group_temporal_join(&trajectories, 0.5, 3, 2000).unwrap();
627 assert!(!results.is_empty(), "3 close entities should converge");
628
629 let converged = &results[0].converged_entities;
631 assert!(
632 !converged.contains(&3),
633 "far entity should not be in cluster"
634 );
635 assert!(converged.len() >= 3, "at least 3 should converge");
636 }
637
638 #[test]
639 fn group_join_no_convergence() {
640 let owned = [
642 vec![(1000i64, vec![0.0f32, 0.0])],
643 vec![(1000i64, vec![10.0, 0.0])],
644 vec![(1000i64, vec![0.0, 10.0])],
645 ];
646 let refs: Vec<Vec<(i64, &[f32])>> = owned.iter().map(|t| as_refs(t)).collect();
647 let trajectories: Vec<(u64, &[(i64, &[f32])])> = refs
648 .iter()
649 .enumerate()
650 .map(|(i, t)| (i as u64, t.as_slice()))
651 .collect();
652
653 let results = group_temporal_join(&trajectories, 0.5, 2, 1000).unwrap();
654 assert!(results.is_empty(), "distant entities should not converge");
655 }
656
657 #[test]
660 fn window_distances_empty_window() {
661 let a_owned = vec![(1000i64, vec![0.0f32])];
662 let b_owned = vec![(5000i64, vec![0.0f32])];
663 let a = as_refs(&a_owned);
664 let b = as_refs(&b_owned);
665
666 let (min, _, n_a, n_b) = window_distances(&a, &b, 900, 1100);
668 assert_eq!(n_a, 1);
669 assert_eq!(n_b, 0);
670 assert_eq!(min, f32::MAX);
671 }
672
673 #[test]
674 fn find_cluster_all_close() {
675 let reps = vec![
676 (0u64, vec![0.0f32, 0.0]),
677 (1, vec![0.1, 0.0]),
678 (2, vec![0.0, 0.1]),
679 ];
680 let cluster = find_largest_epsilon_cluster(&reps, 0.5);
681 assert_eq!(cluster.len(), 3);
682 }
683
684 #[test]
685 fn find_cluster_with_outlier() {
686 let reps = vec![
687 (0u64, vec![0.0f32, 0.0]),
688 (1, vec![0.1, 0.0]),
689 (2, vec![0.0, 0.1]),
690 (3, vec![100.0, 100.0]),
691 ];
692 let cluster = find_largest_epsilon_cluster(&reps, 0.5);
693 assert_eq!(cluster.len(), 3);
694 assert!(!cluster.contains(&3));
695 }
696}