cvx_ingest/delta/
mod.rs

1//! Delta encoding and decoding for vector compression.
2//!
3//! Instead of storing every embedding in full, CVX stores **keyframes**
4//! (full vectors) at regular intervals and **deltas** (sparse changes)
5//! between them. This is analogous to video compression (I-frames + P-frames).
6//!
7//! A delta is sparse: only dimensions that changed by more than ε are stored.
8//! For slowly-drifting embeddings, most dimensions don't change between updates,
9//! yielding 3-10x compression.
10//!
11//! # Example
12//!
13//! ```
14//! use cvx_ingest::delta::{DeltaEncoder, DeltaDecoder};
15//!
16//! let encoder = DeltaEncoder::new(10, 0.001); // keyframe every 10, threshold 0.001
17//!
18//! let v1 = vec![1.0, 2.0, 3.0];
19//! let v2 = vec![1.0, 2.1, 3.0]; // only dim 1 changed above ε
20//!
21//! let entry1 = encoder.encode(0, 1000, &v1, None);
22//! assert!(entry1.is_keyframe()); // first vector is always a keyframe
23//!
24//! let entry2 = encoder.encode(1, 2000, &v2, Some(&v1));
25//! assert!(!entry2.is_keyframe());
26//! assert_eq!(entry2.nnz(), 1); // only 1 dimension stored
27//!
28//! // Reconstruct v2 from keyframe + delta
29//! let reconstructed = DeltaDecoder::apply(&v1, &entry2);
30//! assert!((reconstructed[1] - 2.1).abs() < 1e-7);
31//! ```
32
33use cvx_core::DeltaEntry;
34
35/// Encodes vectors into keyframe + delta sequences.
36///
37/// Configuration:
38/// - `keyframe_interval`: store a full vector every K updates
39/// - `threshold`: minimum absolute change per dimension to include in delta (ε)
40pub struct DeltaEncoder {
41    keyframe_interval: u32,
42    threshold: f32,
43}
44
45impl DeltaEncoder {
46    /// Create a new delta encoder.
47    ///
48    /// - `keyframe_interval` (K): store a full keyframe every K updates.
49    ///   K=10 means every 10th vector is stored in full.
50    /// - `threshold` (ε): minimum per-dimension change to include in delta.
51    ///   ε=0.001 means changes smaller than 0.001 are discarded.
52    pub fn new(keyframe_interval: u32, threshold: f32) -> Self {
53        Self {
54            keyframe_interval,
55            threshold,
56        }
57    }
58
59    /// Encode a vector, producing either a keyframe or a delta entry.
60    ///
61    /// - `sequence_index`: the index of this vector in the entity's sequence (0, 1, 2, ...)
62    /// - `timestamp`: the timestamp for this vector
63    /// - `vector`: the full vector
64    /// - `previous`: the previous full vector (None for the first vector)
65    ///
66    /// Returns a `DeltaEntry`. If it's a keyframe, the full vector should be
67    /// stored separately; if it's a delta, only the sparse changes are stored.
68    pub fn encode(
69        &self,
70        sequence_index: u32,
71        timestamp: i64,
72        vector: &[f32],
73        previous: Option<&[f32]>,
74    ) -> DeltaEntry {
75        // First vector or every K-th vector is a keyframe
76        if previous.is_none() || sequence_index % self.keyframe_interval == 0 {
77            return DeltaEntry::keyframe(0, timestamp);
78        }
79
80        let prev = previous.unwrap();
81        assert_eq!(
82            vector.len(),
83            prev.len(),
84            "vector dimensions must match: {} vs {}",
85            vector.len(),
86            prev.len()
87        );
88
89        // Compute sparse delta: only store dimensions that changed by more than ε
90        let mut indices = Vec::new();
91        let mut values = Vec::new();
92
93        for (i, (&curr, &prev_val)) in vector.iter().zip(prev.iter()).enumerate() {
94            let delta = curr - prev_val;
95            if delta.abs() > self.threshold {
96                indices.push(i as u32);
97                values.push(delta);
98            }
99        }
100
101        DeltaEntry::delta(0, timestamp - 1, timestamp, indices, values)
102    }
103
104    /// The configured keyframe interval.
105    pub fn keyframe_interval(&self) -> u32 {
106        self.keyframe_interval
107    }
108
109    /// The configured threshold (ε).
110    pub fn threshold(&self) -> f32 {
111        self.threshold
112    }
113}
114
115/// Reconstructs full vectors from keyframes and delta chains.
116pub struct DeltaDecoder;
117
118impl DeltaDecoder {
119    /// Apply a delta entry to a base vector to reconstruct the full vector.
120    ///
121    /// - `base`: the previous full vector (typically a keyframe or already-reconstructed vector)
122    /// - `delta`: the delta entry to apply
123    ///
124    /// Returns the reconstructed full vector.
125    ///
126    /// # Panics
127    ///
128    /// Panics if any delta index is out of bounds for the base vector.
129    pub fn apply(base: &[f32], delta: &DeltaEntry) -> Vec<f32> {
130        let mut result = base.to_vec();
131        for (&idx, &val) in delta.indices().iter().zip(delta.values().iter()) {
132            result[idx as usize] += val;
133        }
134        result
135    }
136
137    /// Reconstruct a full vector from a keyframe and a chain of deltas.
138    ///
139    /// - `keyframe`: the base full vector
140    /// - `deltas`: sequence of delta entries to apply in order
141    ///
142    /// Returns the final reconstructed vector.
143    pub fn reconstruct(keyframe: &[f32], deltas: &[DeltaEntry]) -> Vec<f32> {
144        let mut current = keyframe.to_vec();
145        for delta in deltas {
146            if delta.is_keyframe() {
147                continue; // keyframes are stored separately, skip
148            }
149            current = Self::apply(&current, delta);
150        }
151        current
152    }
153}
154
155/// Compute the compression ratio: full_size / delta_size.
156///
157/// Returns how many times smaller the delta representation is.
158pub fn compression_ratio(dim: usize, deltas: &[DeltaEntry]) -> f64 {
159    let full_size = deltas.len() * dim * 4; // f32 = 4 bytes per dimension
160    let delta_size: usize = deltas
161        .iter()
162        .map(|d| {
163            if d.is_keyframe() {
164                dim * 4 // keyframes store full vector
165            } else {
166                d.nnz() * (4 + 4) // index (u32) + value (f32) per non-zero
167            }
168        })
169        .sum();
170    if delta_size == 0 {
171        return 0.0;
172    }
173    full_size as f64 / delta_size as f64
174}
175
#[cfg(test)]
mod tests {
    use super::*;

    /// The first vector (no predecessor) must be encoded as a keyframe.
    #[test]
    fn first_vector_is_keyframe() {
        let enc = DeltaEncoder::new(10, 0.001);
        let v = vec![1.0, 2.0, 3.0];
        let entry = enc.encode(0, 1000, &v, None);
        assert!(entry.is_keyframe());
    }

    /// With K=5, exactly the indices divisible by 5 are keyframes even when a
    /// predecessor is supplied.
    #[test]
    fn keyframe_every_k() {
        let enc = DeltaEncoder::new(5, 0.001);
        let v = vec![1.0; 10];
        for i in 0..20u32 {
            let entry = enc.encode(i, i as i64 * 1000, &v, Some(&v));
            if i % 5 == 0 {
                assert!(entry.is_keyframe(), "index {i} should be keyframe");
            } else {
                assert!(!entry.is_keyframe(), "index {i} should be delta");
            }
        }
    }

    /// Only dimensions whose change exceeds ε end up in the delta, in index order.
    #[test]
    fn delta_only_stores_changed_dims() {
        let enc = DeltaEncoder::new(100, 0.01);
        let v1 = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let v2 = vec![1.0, 2.05, 3.0, 4.0, 5.1]; // dims 1 and 4 changed

        let entry = enc.encode(1, 2000, &v2, Some(&v1));
        assert!(!entry.is_keyframe());
        assert_eq!(entry.nnz(), 2);
        assert_eq!(entry.indices(), &[1, 4]);
    }

    /// Changes at or below ε produce an empty (nnz == 0) delta, not a keyframe.
    #[test]
    fn small_changes_below_threshold_ignored() {
        let enc = DeltaEncoder::new(100, 0.01);
        let v1 = vec![1.0, 2.0, 3.0];
        let v2 = vec![1.005, 2.005, 3.005]; // all changes < ε=0.01

        let entry = enc.encode(1, 2000, &v2, Some(&v1));
        assert_eq!(entry.nnz(), 0); // nothing stored
    }

    /// `apply` adds each stored value at its index and leaves other dims alone.
    #[test]
    fn apply_reconstructs_correctly() {
        let base = vec![1.0, 2.0, 3.0, 4.0];
        let delta = DeltaEntry::delta(0, 0, 1, vec![1, 3], vec![0.5, -0.3]);

        let result = DeltaDecoder::apply(&base, &delta);
        assert!((result[0] - 1.0).abs() < 1e-7);
        assert!((result[1] - 2.5).abs() < 1e-7);
        assert!((result[2] - 3.0).abs() < 1e-7);
        assert!((result[3] - 3.7).abs() < 1e-7);
    }

    /// `reconstruct` skips keyframe entries and applies deltas cumulatively.
    #[test]
    fn reconstruct_chain() {
        let keyframe = vec![1.0, 2.0, 3.0];
        let deltas = vec![
            DeltaEntry::keyframe(0, 0), // skipped
            DeltaEntry::delta(0, 0, 1, vec![0], vec![0.1]),
            DeltaEntry::delta(0, 1, 2, vec![1], vec![-0.2]),
            DeltaEntry::delta(0, 2, 3, vec![0, 2], vec![0.3, 0.5]),
        ];

        let result = DeltaDecoder::reconstruct(&keyframe, &deltas);
        assert!((result[0] - 1.4).abs() < 1e-6); // 1.0 + 0.1 + 0.3
        assert!((result[1] - 1.8).abs() < 1e-6); // 2.0 - 0.2
        assert!((result[2] - 3.5).abs() < 1e-6); // 3.0 + 0.5
    }

    /// Single-step round trip on a 768-dim vector with a near-zero ε: the
    /// decoded vector matches the target within 1e-7 on every dimension.
    #[test]
    fn encode_decode_roundtrip_precision() {
        let enc = DeltaEncoder::new(100, 1e-8); // very low threshold
        let dim = 768;
        let mut rng = 42u64; // simple PRNG for determinism
        // 64-bit linear congruential step; the top 31 bits (`>> 33`) are divided
        // by 2^31 - 1 to give a value in [0, 1].
        let mut pseudo_random = || {
            rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1);
            (rng >> 33) as f32 / (u32::MAX >> 1) as f32
        };

        let v1: Vec<f32> = (0..dim).map(|_| pseudo_random()).collect();
        // Slowly drifting: add small perturbations
        let v2: Vec<f32> = v1.iter().map(|&x| x + pseudo_random() * 0.001).collect();

        let entry = enc.encode(1, 1000, &v2, Some(&v1));
        let reconstructed = DeltaDecoder::apply(&v1, &entry);

        for (i, (&original, &recovered)) in v2.iter().zip(reconstructed.iter()).enumerate() {
            assert!(
                (original - recovered).abs() < 1e-7,
                "dim {i}: original={original}, recovered={recovered}"
            );
        }
    }

    /// 100 steps of slow drift with K=10 and ε=0.01 must compress at least 3x
    /// under `compression_ratio`'s size model.
    #[test]
    fn compression_ratio_slow_drift() {
        let enc = DeltaEncoder::new(10, 0.01);
        let dim = 768;
        let n_vectors = 100;

        let mut rng = 123u64;
        // Same LCG as above; the expected ratio depends on this exact call order.
        let mut pseudo_random = || {
            rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1);
            (rng >> 33) as f32 / (u32::MAX >> 1) as f32
        };

        let base: Vec<f32> = (0..dim).map(|_| pseudo_random()).collect();
        let mut prev = base.clone();
        let mut deltas = Vec::new();

        for i in 0..n_vectors {
            // Slow drift: ~5% of dimensions change per step
            let current: Vec<f32> = prev
                .iter()
                .map(|&x| {
                    if pseudo_random() < 0.05 {
                        x + pseudo_random() * 0.1
                    } else {
                        x + pseudo_random() * 0.001 // below threshold
                    }
                })
                .collect();
            // NOTE(review): passes the true previous vector (not a reconstruction);
            // this test measures size only, not reconstruction error.
            let entry = enc.encode(i as u32, i as i64 * 1000, &current, Some(&prev));
            deltas.push(entry);
            prev = current;
        }

        let ratio = compression_ratio(dim, &deltas);
        assert!(
            ratio >= 3.0,
            "compression ratio {ratio:.1}x, expected >= 3x for slow drift"
        );
    }
}
317
#[cfg(test)]
mod proptests {
    use super::*;
    use proptest::prelude::*;

    proptest! {
        /// Round-trip property: applying `encode(target, base)` on top of `base`
        /// recovers `target` within 1e-6 per dimension. With ε = 1e-8, any change
        /// the encoder drops is far below that tolerance.
        #[test]
        fn encode_decode_roundtrip(
            base in prop::collection::vec(-10.0f32..10.0, 32..=32),
            perturbation in prop::collection::vec(-0.1f32..0.1, 32..=32),
        ) {
            let mut target = base.clone();
            for (slot, &p) in target.iter_mut().zip(perturbation.iter()) {
                *slot += p;
            }

            let encoder = DeltaEncoder::new(100, 1e-8);
            let encoded = encoder.encode(1, 1000, &target, Some(&base));
            let recovered = DeltaDecoder::apply(&base, &encoded);

            let pairs = target.iter().copied().zip(recovered.iter().copied());
            for (i, (orig, rec)) in pairs.enumerate() {
                prop_assert!((orig - rec).abs() < 1e-6,
                    "dim {i}: orig={orig}, rec={rec}");
            }
        }
    }
}