cvx_analytics/
temporal_join.rs

1//! Temporal join — find time windows where entities are semantically close.
2//!
3//! A temporal join finds periods where two (or more) entities converge in
4//! embedding space. Unlike point-level kNN, this captures sustained proximity.
5//!
6//! # Example
7//!
8//! Two users who sporadically post similar content won't trigger a join,
9//! but users who consistently converge over a week-long window will.
10
11use crate::calculus::drift_magnitude_l2;
12use cvx_core::error::AnalyticsError;
13
14// ─── Types ──────────────────────────────────────────────────────────
15
16/// A time window during which two entities were semantically close.
17#[derive(Debug, Clone)]
18pub struct TemporalJoinResult {
19    /// Start of the convergence window.
20    pub start: i64,
21    /// End of the convergence window.
22    pub end: i64,
23    /// Mean pairwise distance during the window.
24    pub mean_distance: f32,
25    /// Minimum pairwise distance during the window.
26    pub min_distance: f32,
27    /// Number of points from entity A in the window.
28    pub points_a: usize,
29    /// Number of points from entity B in the window.
30    pub points_b: usize,
31}
32
33/// A time window during which a subset of a group converged.
34#[derive(Debug, Clone)]
35pub struct GroupJoinResult {
36    /// Start of the convergence window.
37    pub start: i64,
38    /// End of the convergence window.
39    pub end: i64,
40    /// Number of entities within epsilon during this window.
41    pub n_converged: usize,
42    /// Entity IDs that converged.
43    pub converged_entities: Vec<u64>,
44    /// Mean pairwise distance among converged entities.
45    pub mean_distance: f32,
46}
47
48// ─── Core: pairwise temporal join ───────────────────────────────────
49
50/// Find time windows where entities A and B are within distance `epsilon`.
51///
52/// Uses a sliding window of size `window_us` microseconds over the merged
53/// timeline. Within each window, computes the minimum pairwise distance
54/// between A's and B's points.
55///
56/// # Arguments
57///
58/// * `traj_a` — Entity A's trajectory, sorted by timestamp
59/// * `traj_b` — Entity B's trajectory, sorted by timestamp
60/// * `epsilon` — Maximum distance threshold for convergence
61/// * `window_us` — Sliding window size in microseconds
62///
63/// # Errors
64///
65/// Returns [`AnalyticsError::InsufficientData`] if either trajectory is empty.
66pub fn temporal_join(
67    traj_a: &[(i64, &[f32])],
68    traj_b: &[(i64, &[f32])],
69    epsilon: f32,
70    window_us: i64,
71) -> Result<Vec<TemporalJoinResult>, AnalyticsError> {
72    if traj_a.is_empty() || traj_b.is_empty() {
73        return Err(AnalyticsError::InsufficientData { needed: 1, have: 0 });
74    }
75
76    // Determine the global time range
77    let global_start = traj_a[0].0.min(traj_b[0].0);
78    let global_end = traj_a.last().unwrap().0.max(traj_b.last().unwrap().0);
79
80    if global_end - global_start < window_us {
81        // Single window covers everything
82        let (min_dist, mean_dist, n_a, n_b) =
83            window_distances(traj_a, traj_b, global_start, global_end);
84        return if min_dist <= epsilon && n_a > 0 && n_b > 0 {
85            Ok(vec![TemporalJoinResult {
86                start: global_start,
87                end: global_end,
88                mean_distance: mean_dist,
89                min_distance: min_dist,
90                points_a: n_a,
91                points_b: n_b,
92            }])
93        } else {
94            Ok(vec![])
95        };
96    }
97
98    // Slide window with step = window_us / 2 (50% overlap for smoother detection)
99    let step = (window_us / 2).max(1);
100    let mut results: Vec<TemporalJoinResult> = Vec::new();
101    let mut current_start: Option<i64> = None;
102    let mut acc_min_dist = f32::MAX;
103    let mut acc_sum_dist = 0.0f32;
104    let mut acc_count = 0usize;
105    let mut acc_points_a = 0usize;
106    let mut acc_points_b = 0usize;
107
108    let mut t = global_start;
109    while t <= global_end {
110        let w_end = (t + window_us).min(global_end);
111        let (min_dist, mean_dist, n_a, n_b) = window_distances(traj_a, traj_b, t, w_end);
112
113        let is_close = min_dist <= epsilon && n_a > 0 && n_b > 0;
114
115        if is_close {
116            if current_start.is_none() {
117                current_start = Some(t);
118                acc_min_dist = f32::MAX;
119                acc_sum_dist = 0.0;
120                acc_count = 0;
121                acc_points_a = 0;
122                acc_points_b = 0;
123            }
124            acc_min_dist = acc_min_dist.min(min_dist);
125            acc_sum_dist += mean_dist;
126            acc_count += 1;
127            acc_points_a = acc_points_a.max(n_a);
128            acc_points_b = acc_points_b.max(n_b);
129        } else if let Some(start) = current_start.take() {
130            results.push(TemporalJoinResult {
131                start,
132                end: (t - step + window_us).min(global_end),
133                mean_distance: acc_sum_dist / acc_count as f32,
134                min_distance: acc_min_dist,
135                points_a: acc_points_a,
136                points_b: acc_points_b,
137            });
138        }
139
140        t += step;
141    }
142
143    // Close any open interval
144    if let Some(start) = current_start {
145        results.push(TemporalJoinResult {
146            start,
147            end: global_end,
148            mean_distance: if acc_count > 0 {
149                acc_sum_dist / acc_count as f32
150            } else {
151                0.0
152            },
153            min_distance: acc_min_dist,
154            points_a: acc_points_a,
155            points_b: acc_points_b,
156        });
157    }
158
159    Ok(results)
160}
161
162// ─── Core: group temporal join ──────────────────────────────────────
163
164/// Find time windows where at least `min_entities` from the group converge.
165///
166/// For each window, computes all pairwise distances and finds the largest
167/// subset within distance `epsilon` of each other.
168///
169/// # Arguments
170///
171/// * `trajectories` — `(entity_id, trajectory)` pairs
172/// * `epsilon` — Maximum distance threshold
173/// * `min_entities` — Minimum number of entities that must converge
174/// * `window_us` — Sliding window size in microseconds
175///
176/// # Errors
177///
178/// Returns [`AnalyticsError::InsufficientData`] if fewer than `min_entities`
179/// trajectories are provided.
180#[allow(clippy::type_complexity)]
181pub fn group_temporal_join(
182    trajectories: &[(u64, &[(i64, &[f32])])],
183    epsilon: f32,
184    min_entities: usize,
185    window_us: i64,
186) -> Result<Vec<GroupJoinResult>, AnalyticsError> {
187    if trajectories.len() < min_entities {
188        return Err(AnalyticsError::InsufficientData {
189            needed: min_entities,
190            have: trajectories.len(),
191        });
192    }
193
194    // Global time range
195    let global_start = trajectories
196        .iter()
197        .filter_map(|(_, t)| t.first().map(|(ts, _)| *ts))
198        .min()
199        .unwrap_or(0);
200    let global_end = trajectories
201        .iter()
202        .filter_map(|(_, t)| t.last().map(|(ts, _)| *ts))
203        .max()
204        .unwrap_or(0);
205
206    if global_end <= global_start {
207        return Ok(vec![]);
208    }
209
210    let step = (window_us / 2).max(1);
211    let mut results: Vec<GroupJoinResult> = Vec::new();
212
213    let mut t = global_start;
214    while t <= global_end {
215        let w_end = (t + window_us).min(global_end);
216
217        // For each entity, find their representative vector in this window
218        // (use the centroid of all points in window)
219        let mut entity_reps: Vec<(u64, Vec<f32>)> = Vec::new();
220
221        for &(eid, traj) in trajectories {
222            let points_in_window: Vec<&[f32]> = traj
223                .iter()
224                .filter(|(ts, _)| *ts >= t && *ts <= w_end)
225                .map(|(_, v)| *v)
226                .collect();
227
228            if points_in_window.is_empty() {
229                continue;
230            }
231
232            // Centroid of points in window
233            let dim = points_in_window[0].len();
234            let n = points_in_window.len() as f32;
235            let mut centroid = vec![0.0f32; dim];
236            for v in &points_in_window {
237                for (i, &val) in v.iter().enumerate() {
238                    centroid[i] += val;
239                }
240            }
241            for val in &mut centroid {
242                *val /= n;
243            }
244
245            entity_reps.push((eid, centroid));
246        }
247
248        if entity_reps.len() >= min_entities {
249            // Greedy clustering: find entities within epsilon of the first,
250            // then try each as seed and keep the largest cluster
251            let converged = find_largest_epsilon_cluster(&entity_reps, epsilon);
252
253            if converged.len() >= min_entities {
254                // Compute mean pairwise distance among converged
255                let mean_dist = mean_pairwise_distance(&entity_reps, &converged);
256
257                results.push(GroupJoinResult {
258                    start: t,
259                    end: w_end,
260                    n_converged: converged.len(),
261                    converged_entities: converged,
262                    mean_distance: mean_dist,
263                });
264            }
265        }
266
267        t += step;
268    }
269
270    // Merge consecutive overlapping windows
271    merge_group_results(&mut results, step);
272
273    Ok(results)
274}
275
276// ─── Helpers ────────────────────────────────────────────────────────
277
278/// Compute min and mean distances between A's and B's points in a time window.
279fn window_distances(
280    traj_a: &[(i64, &[f32])],
281    traj_b: &[(i64, &[f32])],
282    start: i64,
283    end: i64,
284) -> (f32, f32, usize, usize) {
285    let a_in_window: Vec<&[f32]> = traj_a
286        .iter()
287        .filter(|(ts, _)| *ts >= start && *ts <= end)
288        .map(|(_, v)| *v)
289        .collect();
290    let b_in_window: Vec<&[f32]> = traj_b
291        .iter()
292        .filter(|(ts, _)| *ts >= start && *ts <= end)
293        .map(|(_, v)| *v)
294        .collect();
295
296    if a_in_window.is_empty() || b_in_window.is_empty() {
297        return (f32::MAX, f32::MAX, a_in_window.len(), b_in_window.len());
298    }
299
300    let mut min_dist = f32::MAX;
301    let mut sum_dist = 0.0f32;
302    let mut count = 0;
303
304    for a in &a_in_window {
305        for b in &b_in_window {
306            let d = drift_magnitude_l2(a, b);
307            min_dist = min_dist.min(d);
308            sum_dist += d;
309            count += 1;
310        }
311    }
312
313    let mean_dist = if count > 0 {
314        sum_dist / count as f32
315    } else {
316        f32::MAX
317    };
318
319    (min_dist, mean_dist, a_in_window.len(), b_in_window.len())
320}
321
322/// Find the largest cluster of entities where all pairs are within epsilon.
323///
324/// Uses a greedy approach: for each entity as seed, collect all within epsilon,
325/// then keep the largest set.
326fn find_largest_epsilon_cluster(entity_reps: &[(u64, Vec<f32>)], epsilon: f32) -> Vec<u64> {
327    let n = entity_reps.len();
328    let mut best: Vec<u64> = Vec::new();
329
330    for i in 0..n {
331        let mut cluster: Vec<usize> = vec![i];
332
333        for j in 0..n {
334            if i == j {
335                continue;
336            }
337            // Check if j is within epsilon of ALL current cluster members
338            let all_close = cluster
339                .iter()
340                .all(|&c| drift_magnitude_l2(&entity_reps[c].1, &entity_reps[j].1) <= epsilon);
341            if all_close {
342                cluster.push(j);
343            }
344        }
345
346        if cluster.len() > best.len() {
347            best = cluster.into_iter().map(|idx| entity_reps[idx].0).collect();
348        }
349    }
350
351    best
352}
353
354/// Mean pairwise distance among a subset of entities.
355fn mean_pairwise_distance(entity_reps: &[(u64, Vec<f32>)], ids: &[u64]) -> f32 {
356    let vecs: Vec<&Vec<f32>> = ids
357        .iter()
358        .filter_map(|id| {
359            entity_reps
360                .iter()
361                .find(|(eid, _)| eid == id)
362                .map(|(_, v)| v)
363        })
364        .collect();
365
366    if vecs.len() < 2 {
367        return 0.0;
368    }
369
370    let mut sum = 0.0f32;
371    let mut count = 0;
372    for i in 0..vecs.len() {
373        for j in (i + 1)..vecs.len() {
374            sum += drift_magnitude_l2(vecs[i], vecs[j]);
375            count += 1;
376        }
377    }
378
379    if count > 0 { sum / count as f32 } else { 0.0 }
380}
381
382/// Merge consecutive group join results that overlap in time.
383fn merge_group_results(results: &mut Vec<GroupJoinResult>, step: i64) {
384    if results.len() < 2 {
385        return;
386    }
387
388    let mut merged: Vec<GroupJoinResult> = Vec::new();
389    let mut current = results[0].clone();
390
391    for r in &results[1..] {
392        // If this window overlaps with the current merged window
393        if r.start <= current.end + step {
394            // Extend the window
395            current.end = r.end;
396            current.n_converged = current.n_converged.max(r.n_converged);
397            current.mean_distance = (current.mean_distance + r.mean_distance) / 2.0;
398            // Union of converged entities
399            for eid in &r.converged_entities {
400                if !current.converged_entities.contains(eid) {
401                    current.converged_entities.push(*eid);
402                }
403            }
404        } else {
405            merged.push(current);
406            current = r.clone();
407        }
408    }
409    merged.push(current);
410
411    *results = merged;
412}
413
414// ─── Tests ──────────────────────────────────────────────────────────
415
416#[cfg(test)]
417#[allow(
418    clippy::type_complexity,
419    clippy::needless_range_loop,
420    clippy::useless_vec
421)]
422mod tests {
423    use super::*;
424
425    fn as_refs(points: &[(i64, Vec<f32>)]) -> Vec<(i64, &[f32])> {
426        points.iter().map(|(t, v)| (*t, v.as_slice())).collect()
427    }
428
429    // ─── temporal_join (pairwise) ───────────────────────────────
430
431    #[test]
432    fn join_empty_trajectories() {
433        let a: Vec<(i64, &[f32])> = vec![];
434        let b_owned = vec![(100i64, vec![1.0f32])];
435        let b = as_refs(&b_owned);
436        assert!(temporal_join(&a, &b, 1.0, 1000).is_err());
437    }
438
439    #[test]
440    fn join_no_overlap() {
441        // A and B are far apart in embedding space
442        let a_owned = vec![
443            (1000i64, vec![0.0f32, 0.0]),
444            (2000, vec![0.0, 0.0]),
445            (3000, vec![0.0, 0.0]),
446        ];
447        let b_owned = vec![
448            (1000i64, vec![100.0f32, 100.0]),
449            (2000, vec![100.0, 100.0]),
450            (3000, vec![100.0, 100.0]),
451        ];
452        let a = as_refs(&a_owned);
453        let b = as_refs(&b_owned);
454
455        let results = temporal_join(&a, &b, 0.5, 2000).unwrap();
456        assert!(
457            results.is_empty(),
458            "distant entities should produce no join"
459        );
460    }
461
462    #[test]
463    fn join_full_convergence() {
464        // A and B are at the same point for the entire duration
465        let a_owned = vec![
466            (1000i64, vec![1.0f32, 1.0]),
467            (2000, vec![1.0, 1.0]),
468            (3000, vec![1.0, 1.0]),
469        ];
470        let b_owned = vec![
471            (1000i64, vec![1.0f32, 1.0]),
472            (2000, vec![1.0, 1.0]),
473            (3000, vec![1.0, 1.0]),
474        ];
475        let a = as_refs(&a_owned);
476        let b = as_refs(&b_owned);
477
478        let results = temporal_join(&a, &b, 0.5, 2000).unwrap();
479        assert!(
480            !results.is_empty(),
481            "identical trajectories should produce at least one join"
482        );
483        assert!(results[0].min_distance < 1e-6);
484    }
485
486    #[test]
487    fn join_partial_convergence() {
488        // A starts far from B, converges around t=5000, then diverges
489        let a_owned = vec![
490            (1000i64, vec![0.0f32, 0.0]),
491            (2000, vec![0.0, 0.0]),
492            (3000, vec![0.2, 0.0]),
493            (4000, vec![0.8, 0.0]),
494            (5000, vec![1.0, 0.0]), // close to B
495            (6000, vec![1.0, 0.0]), // close to B
496            (7000, vec![0.5, 0.0]),
497            (8000, vec![0.0, 0.0]),
498        ];
499        let b_owned = vec![
500            (1000i64, vec![1.0f32, 0.0]),
501            (2000, vec![1.0, 0.0]),
502            (3000, vec![1.0, 0.0]),
503            (4000, vec![1.0, 0.0]),
504            (5000, vec![1.0, 0.0]),
505            (6000, vec![1.0, 0.0]),
506            (7000, vec![1.0, 0.0]),
507            (8000, vec![1.0, 0.0]),
508        ];
509        let a = as_refs(&a_owned);
510        let b = as_refs(&b_owned);
511
512        let results = temporal_join(&a, &b, 0.15, 2000).unwrap();
513        assert!(
514            !results.is_empty(),
515            "should detect convergence around t=5000-6000"
516        );
517
518        // The convergence should be in the t=4000-7000 region
519        let convergence = &results[0];
520        assert!(
521            convergence.start >= 3000 && convergence.start <= 5000,
522            "convergence should start around t=4000-5000, got {}",
523            convergence.start
524        );
525    }
526
527    #[test]
528    fn join_respects_epsilon() {
529        let a_owned = vec![(1000i64, vec![0.0f32, 0.0])];
530        let b_owned = vec![(1000i64, vec![0.5, 0.0])];
531        let a = as_refs(&a_owned);
532        let b = as_refs(&b_owned);
533
534        // Distance is 0.5, epsilon=0.4 should not match
535        let results = temporal_join(&a, &b, 0.4, 2000).unwrap();
536        assert!(results.is_empty());
537
538        // epsilon=0.6 should match
539        let results = temporal_join(&a, &b, 0.6, 2000).unwrap();
540        assert!(!results.is_empty());
541    }
542
543    #[test]
544    fn join_single_window_covers_all() {
545        // Window larger than total time range
546        let a_owned = vec![(100i64, vec![1.0f32]), (200, vec![1.0])];
547        let b_owned = vec![(150i64, vec![1.0f32]), (250, vec![1.0])];
548        let a = as_refs(&a_owned);
549        let b = as_refs(&b_owned);
550
551        let results = temporal_join(&a, &b, 0.5, 1000).unwrap();
552        assert!(!results.is_empty());
553        assert_eq!(results.len(), 1);
554    }
555
556    #[test]
557    fn join_result_has_point_counts() {
558        let a_owned = vec![
559            (1000i64, vec![0.0f32]),
560            (2000, vec![0.0]),
561            (3000, vec![0.0]),
562        ];
563        let b_owned = vec![(1500i64, vec![0.0f32]), (2500, vec![0.0])];
564        let a = as_refs(&a_owned);
565        let b = as_refs(&b_owned);
566
567        let results = temporal_join(&a, &b, 1.0, 4000).unwrap();
568        assert!(!results.is_empty());
569        assert!(results[0].points_a > 0);
570        assert!(results[0].points_b > 0);
571    }
572
573    // ─── group_temporal_join ────────────────────────────────────
574
575    #[test]
576    fn group_join_insufficient_entities() {
577        let t1_owned = vec![(100i64, vec![1.0f32])];
578        let t1 = as_refs(&t1_owned);
579        let trajectories: Vec<(u64, &[(i64, &[f32])])> = vec![(1, &t1)];
580        let result = group_temporal_join(&trajectories, 0.5, 3, 1000);
581        assert!(result.is_err());
582    }
583
584    #[test]
585    fn group_join_all_converge() {
586        // 4 entities all at the same point
587        let mut owned: Vec<Vec<(i64, Vec<f32>)>> = Vec::new();
588        for _ in 0..4 {
589            owned.push(vec![
590                (1000i64, vec![1.0f32, 1.0]),
591                (2000, vec![1.0, 1.0]),
592                (3000, vec![1.0, 1.0]),
593            ]);
594        }
595        let refs: Vec<Vec<(i64, &[f32])>> = owned.iter().map(|t| as_refs(t)).collect();
596        let trajectories: Vec<(u64, &[(i64, &[f32])])> = refs
597            .iter()
598            .enumerate()
599            .map(|(i, t)| (i as u64, t.as_slice()))
600            .collect();
601
602        let results = group_temporal_join(&trajectories, 0.5, 3, 3000).unwrap();
603        assert!(
604            !results.is_empty(),
605            "all identical entities should converge"
606        );
607        assert!(results[0].n_converged >= 3);
608    }
609
610    #[test]
611    fn group_join_partial_subset() {
612        // 3 close entities + 1 far entity
613        let owned = vec![
614            vec![(1000i64, vec![1.0f32, 1.0]), (2000, vec![1.0, 1.0])],
615            vec![(1000i64, vec![1.0f32, 1.1]), (2000, vec![1.0, 1.1])],
616            vec![(1000i64, vec![1.1f32, 1.0]), (2000, vec![1.1, 1.0])],
617            vec![(1000i64, vec![100.0f32, 100.0]), (2000, vec![100.0, 100.0])], // far
618        ];
619        let refs: Vec<Vec<(i64, &[f32])>> = owned.iter().map(|t| as_refs(t)).collect();
620        let trajectories: Vec<(u64, &[(i64, &[f32])])> = refs
621            .iter()
622            .enumerate()
623            .map(|(i, t)| (i as u64, t.as_slice()))
624            .collect();
625
626        let results = group_temporal_join(&trajectories, 0.5, 3, 2000).unwrap();
627        assert!(!results.is_empty(), "3 close entities should converge");
628
629        // Entity 3 (far) should NOT be in the converged set
630        let converged = &results[0].converged_entities;
631        assert!(
632            !converged.contains(&3),
633            "far entity should not be in cluster"
634        );
635        assert!(converged.len() >= 3, "at least 3 should converge");
636    }
637
638    #[test]
639    fn group_join_no_convergence() {
640        // All entities far apart
641        let owned = [
642            vec![(1000i64, vec![0.0f32, 0.0])],
643            vec![(1000i64, vec![10.0, 0.0])],
644            vec![(1000i64, vec![0.0, 10.0])],
645        ];
646        let refs: Vec<Vec<(i64, &[f32])>> = owned.iter().map(|t| as_refs(t)).collect();
647        let trajectories: Vec<(u64, &[(i64, &[f32])])> = refs
648            .iter()
649            .enumerate()
650            .map(|(i, t)| (i as u64, t.as_slice()))
651            .collect();
652
653        let results = group_temporal_join(&trajectories, 0.5, 2, 1000).unwrap();
654        assert!(results.is_empty(), "distant entities should not converge");
655    }
656
657    // ─── helpers ────────────────────────────────────────────────
658
659    #[test]
660    fn window_distances_empty_window() {
661        let a_owned = vec![(1000i64, vec![0.0f32])];
662        let b_owned = vec![(5000i64, vec![0.0f32])];
663        let a = as_refs(&a_owned);
664        let b = as_refs(&b_owned);
665
666        // Window that only contains A, not B
667        let (min, _, n_a, n_b) = window_distances(&a, &b, 900, 1100);
668        assert_eq!(n_a, 1);
669        assert_eq!(n_b, 0);
670        assert_eq!(min, f32::MAX);
671    }
672
673    #[test]
674    fn find_cluster_all_close() {
675        let reps = vec![
676            (0u64, vec![0.0f32, 0.0]),
677            (1, vec![0.1, 0.0]),
678            (2, vec![0.0, 0.1]),
679        ];
680        let cluster = find_largest_epsilon_cluster(&reps, 0.5);
681        assert_eq!(cluster.len(), 3);
682    }
683
684    #[test]
685    fn find_cluster_with_outlier() {
686        let reps = vec![
687            (0u64, vec![0.0f32, 0.0]),
688            (1, vec![0.1, 0.0]),
689            (2, vec![0.0, 0.1]),
690            (3, vec![100.0, 100.0]),
691        ];
692        let cluster = find_largest_epsilon_cluster(&reps, 0.5);
693        assert_eq!(cluster.len(), 3);
694        assert!(!cluster.contains(&3));
695    }
696}