cvx_analytics/
multiscale.rs

1//! Multi-scale temporal analysis: resampling and per-scale drift.
2//!
3//! Enables analysis at different temporal resolutions (hourly → daily → weekly)
4//! and cross-scale drift comparison.
5
6use cvx_core::error::AnalyticsError;
7
8// ─── Resampling ─────────────────────────────────────────────────────
9
10/// Resampling strategy for temporal downsampling.
11#[derive(Debug, Clone, Copy)]
12pub enum ResampleMethod {
13    /// Use the last value in each bucket (sample-and-hold).
14    LastValue,
15    /// Linear interpolation at bucket midpoints.
16    Linear,
17}
18
19/// Resample a trajectory to a coarser temporal resolution.
20///
21/// Groups points into buckets of `bucket_width` microseconds,
22/// then applies the chosen resampling method.
23///
24/// Returns `(bucket_timestamp, vector)` pairs.
25pub fn resample(
26    trajectory: &[(i64, &[f32])],
27    bucket_width: i64,
28    method: ResampleMethod,
29) -> Result<Vec<(i64, Vec<f32>)>, AnalyticsError> {
30    if trajectory.is_empty() {
31        return Ok(Vec::new());
32    }
33    if bucket_width <= 0 {
34        return Err(AnalyticsError::InsufficientData { needed: 1, have: 0 });
35    }
36
37    let t_min = trajectory[0].0;
38    let t_max = trajectory.last().unwrap().0;
39
40    let mut result = Vec::new();
41    let mut bucket_start = t_min;
42
43    while bucket_start <= t_max {
44        let bucket_end = bucket_start + bucket_width;
45        let bucket_mid = bucket_start + bucket_width / 2;
46
47        match method {
48            ResampleMethod::LastValue => {
49                // Find the last point in this bucket
50                let last_in_bucket = trajectory
51                    .iter()
52                    .rev()
53                    .find(|(t, _)| *t >= bucket_start && *t < bucket_end);
54
55                if let Some((_, v)) = last_in_bucket {
56                    result.push((bucket_mid, v.to_vec()));
57                }
58            }
59            ResampleMethod::Linear => {
60                // Interpolate at bucket midpoint
61                if let Some(vec) = interpolate_at(trajectory, bucket_mid) {
62                    result.push((bucket_mid, vec));
63                }
64            }
65        }
66
67        bucket_start = bucket_end;
68    }
69
70    Ok(result)
71}
72
73/// Linear interpolation at a specific timestamp.
74fn interpolate_at(trajectory: &[(i64, &[f32])], target: i64) -> Option<Vec<f32>> {
75    if trajectory.is_empty() {
76        return None;
77    }
78
79    // Find bracketing points
80    let idx = trajectory.partition_point(|(t, _)| *t <= target);
81
82    if idx == 0 {
83        // Before first point — extrapolate with first value
84        return Some(trajectory[0].1.to_vec());
85    }
86    if idx >= trajectory.len() {
87        // After last point — extrapolate with last value
88        return Some(trajectory.last().unwrap().1.to_vec());
89    }
90
91    let (t0, v0) = &trajectory[idx - 1];
92    let (t1, v1) = &trajectory[idx];
93
94    if t1 == t0 {
95        return Some(v0.to_vec());
96    }
97
98    let alpha = (target - t0) as f64 / (t1 - t0) as f64;
99    let dim = v0.len();
100    let interpolated: Vec<f32> = (0..dim)
101        .map(|d| (v0[d] as f64 * (1.0 - alpha) + v1[d] as f64 * alpha) as f32)
102        .collect();
103
104    Some(interpolated)
105}
106
107// ─── Per-scale drift analysis ───────────────────────────────────────
108
109/// Per-scale drift: L2 displacement at a given temporal resolution.
110///
111/// Returns `(bucket_timestamp, drift_magnitude)` pairs.
112pub fn scale_drift(
113    trajectory: &[(i64, &[f32])],
114    bucket_width: i64,
115) -> Result<Vec<(i64, f32)>, AnalyticsError> {
116    let resampled = resample(trajectory, bucket_width, ResampleMethod::LastValue)?;
117
118    if resampled.len() < 2 {
119        return Ok(Vec::new());
120    }
121
122    let drifts: Vec<(i64, f32)> = resampled
123        .windows(2)
124        .map(|w| {
125            let drift: f32 = w[0]
126                .1
127                .iter()
128                .zip(w[1].1.iter())
129                .map(|(a, b)| (a - b) * (a - b))
130                .sum::<f32>()
131                .sqrt();
132            (w[1].0, drift)
133        })
134        .collect();
135
136    Ok(drifts)
137}
138
139// ─── Behavioral alignment ───────────────────────────────────────────
140
141/// Behavioral alignment: Pearson correlation between two drift series.
142///
143/// Returns a value in `[-1, 1]`:
144/// - `1.0`: perfectly correlated drift
145/// - `0.0`: uncorrelated
146/// - `-1.0`: perfectly anti-correlated
147pub fn behavioral_alignment(drift_a: &[f32], drift_b: &[f32]) -> f32 {
148    let n = drift_a.len().min(drift_b.len());
149    if n < 2 {
150        return 0.0;
151    }
152
153    let a = &drift_a[..n];
154    let b = &drift_b[..n];
155
156    let mean_a: f64 = a.iter().map(|&x| x as f64).sum::<f64>() / n as f64;
157    let mean_b: f64 = b.iter().map(|&x| x as f64).sum::<f64>() / n as f64;
158
159    let mut cov = 0.0f64;
160    let mut var_a = 0.0f64;
161    let mut var_b = 0.0f64;
162
163    for i in 0..n {
164        let da = a[i] as f64 - mean_a;
165        let db = b[i] as f64 - mean_b;
166        cov += da * db;
167        var_a += da * da;
168        var_b += db * db;
169    }
170
171    let denom = (var_a * var_b).sqrt();
172    if denom == 0.0 {
173        return 0.0;
174    }
175
176    (cov / denom) as f32
177}
178
179#[cfg(test)]
180mod tests {
181    use super::*;
182
183    fn make_trajectory(points: &[(i64, Vec<f32>)]) -> Vec<(i64, &[f32])> {
184        points.iter().map(|(t, v)| (*t, v.as_slice())).collect()
185    }
186
187    // ─── Resampling ─────────────────────────────────────────────────
188
189    #[test]
190    fn resample_hourly_to_daily() {
191        // 24 hourly points → should produce ~1 daily bucket
192        let hour = 3_600_000_000i64; // 1 hour in µs
193        let day = 24 * hour;
194        let points: Vec<(i64, Vec<f32>)> =
195            (0..24).map(|i| (i as i64 * hour, vec![i as f32])).collect();
196        let traj = make_trajectory(&points);
197
198        let resampled = resample(&traj, day, ResampleMethod::LastValue).unwrap();
199        assert_eq!(resampled.len(), 1); // one daily bucket
200    }
201
202    #[test]
203    fn resample_preserves_count_same_resolution() {
204        // 10 points at 1000µs intervals, bucket width = 1000µs → 10 buckets
205        let points: Vec<(i64, Vec<f32>)> =
206            (0..10).map(|i| (i as i64 * 1000, vec![i as f32])).collect();
207        let traj = make_trajectory(&points);
208
209        let resampled = resample(&traj, 1000, ResampleMethod::LastValue).unwrap();
210        assert_eq!(resampled.len(), 10);
211    }
212
213    #[test]
214    fn resample_linear_interpolation() {
215        // Points spanning [0, 999] → two 500µs buckets: [0,500), [500,1000)
216        let points = vec![(0i64, vec![0.0f32, 0.0]), (999, vec![10.0, 20.0])];
217        let traj = make_trajectory(&points);
218
219        let resampled = resample(&traj, 500, ResampleMethod::Linear).unwrap();
220        assert_eq!(resampled.len(), 2);
221        // First bucket midpoint at 250: ~25% of [0,999]
222        assert!((resampled[0].1[0] - 2.5).abs() < 0.5);
223        assert!((resampled[0].1[1] - 5.0).abs() < 1.0);
224    }
225
226    #[test]
227    fn resample_empty() {
228        let traj: Vec<(i64, &[f32])> = Vec::new();
229        let result = resample(&traj, 1000, ResampleMethod::LastValue).unwrap();
230        assert!(result.is_empty());
231    }
232
233    // ─── Scale drift ────────────────────────────────────────────────
234
235    #[test]
236    fn scale_drift_stationary() {
237        let points: Vec<(i64, Vec<f32>)> =
238            (0..10).map(|i| (i as i64 * 1000, vec![1.0, 2.0])).collect();
239        let traj = make_trajectory(&points);
240
241        let drifts = scale_drift(&traj, 1000).unwrap();
242        for (_, d) in &drifts {
243            assert!(*d < 1e-6, "stationary entity should have zero drift");
244        }
245    }
246
247    #[test]
248    fn scale_drift_linear() {
249        let points: Vec<(i64, Vec<f32>)> =
250            (0..10).map(|i| (i as i64 * 1000, vec![i as f32])).collect();
251        let traj = make_trajectory(&points);
252
253        let drifts = scale_drift(&traj, 1000).unwrap();
254        // Each step should have drift = 1.0
255        for (_, d) in &drifts {
256            assert!((*d - 1.0).abs() < 1e-6);
257        }
258    }
259
260    // ─── Behavioral alignment ───────────────────────────────────────
261
262    #[test]
263    fn alignment_identical_series() {
264        let a = vec![1.0, 2.0, 3.0, 4.0, 5.0];
265        let b = vec![1.0, 2.0, 3.0, 4.0, 5.0];
266        let corr = behavioral_alignment(&a, &b);
267        assert!(
268            (corr - 1.0).abs() < 1e-5,
269            "identical series should have corr ≈ 1.0"
270        );
271    }
272
273    #[test]
274    fn alignment_scaled_series() {
275        let a = vec![1.0, 2.0, 3.0, 4.0, 5.0];
276        let b: Vec<f32> = a.iter().map(|x| x * 3.0).collect();
277        let corr = behavioral_alignment(&a, &b);
278        assert!(
279            (corr - 1.0).abs() < 1e-5,
280            "scaled series should have corr ≈ 1.0"
281        );
282    }
283
284    #[test]
285    fn alignment_anticorrelated() {
286        let a = vec![1.0, 2.0, 3.0, 4.0, 5.0];
287        let b: Vec<f32> = a.iter().map(|x| -x).collect();
288        let corr = behavioral_alignment(&a, &b);
289        assert!(
290            (corr + 1.0).abs() < 1e-5,
291            "anti-correlated should be ≈ -1.0"
292        );
293    }
294
295    #[test]
296    fn alignment_constant_is_zero() {
297        let a = vec![1.0, 1.0, 1.0, 1.0];
298        let b = vec![1.0, 2.0, 3.0, 4.0];
299        let corr = behavioral_alignment(&a, &b);
300        assert!(corr.abs() < 1e-5, "constant vs trend should be ≈ 0.0");
301    }
302}