cvx_storage/
tiered.rs

//! Tiered storage: routes reads across the hot → warm tiers transparently.
//!
//! Point lookups check the hot tier first, then fall through to warm storage;
//! range queries merge the results of both tiers.
//! Writes always go to the hot tier.
5
6use cvx_core::StorageBackend;
7use cvx_core::error::StorageError;
8use cvx_core::types::TemporalPoint;
9
10use crate::memory::InMemoryStore;
11use crate::warm::WarmStore;
12
13/// Composite storage that reads from hot first, then warm.
14pub struct TieredStorage {
15    /// Hot tier (fast, in-memory or RocksDB).
16    hot: InMemoryStore,
17    /// Warm tier (file-based, partitioned).
18    warm: WarmStore,
19}
20
21impl TieredStorage {
22    /// Create a new tiered storage with the given hot and warm stores.
23    pub fn new(hot: InMemoryStore, warm: WarmStore) -> Self {
24        Self { hot, warm }
25    }
26
27    /// Access the hot tier directly.
28    pub fn hot(&self) -> &InMemoryStore {
29        &self.hot
30    }
31
32    /// Access the warm tier directly.
33    pub fn warm(&self) -> &WarmStore {
34        &self.warm
35    }
36
37    /// Migrate points from hot to warm tier.
38    ///
39    /// Moves all points for the given entity+space with timestamp ≤ `cutoff`
40    /// from hot to warm storage.
41    pub fn compact(
42        &self,
43        entity_id: u64,
44        space_id: u32,
45        cutoff: i64,
46    ) -> Result<usize, StorageError> {
47        // Read from hot
48        let points = self.hot.range(entity_id, space_id, i64::MIN, cutoff)?;
49        if points.is_empty() {
50            return Ok(0);
51        }
52
53        let count = points.len();
54
55        // Write to warm
56        self.warm.write_batch(space_id, &points)?;
57
58        // Delete from hot
59        for p in &points {
60            self.hot.delete(entity_id, space_id, p.timestamp())?;
61        }
62
63        Ok(count)
64    }
65}
66
67impl StorageBackend for TieredStorage {
68    fn get(
69        &self,
70        entity_id: u64,
71        space_id: u32,
72        timestamp: i64,
73    ) -> Result<Option<TemporalPoint>, StorageError> {
74        // Try hot first
75        if let Some(point) = self.hot.get(entity_id, space_id, timestamp)? {
76            return Ok(Some(point));
77        }
78        // Fall through to warm
79        self.warm.get(entity_id, space_id, timestamp)
80    }
81
82    fn put(&self, space_id: u32, point: &TemporalPoint) -> Result<(), StorageError> {
83        // Always write to hot
84        self.hot.put(space_id, point)
85    }
86
87    fn range(
88        &self,
89        entity_id: u64,
90        space_id: u32,
91        start: i64,
92        end: i64,
93    ) -> Result<Vec<TemporalPoint>, StorageError> {
94        let mut hot_results = self.hot.range(entity_id, space_id, start, end)?;
95        let warm_results = self.warm.range(entity_id, space_id, start, end)?;
96
97        // Merge and deduplicate by timestamp
98        hot_results.extend(warm_results);
99        hot_results.sort_by_key(|p| p.timestamp());
100        hot_results.dedup_by_key(|p| p.timestamp());
101
102        Ok(hot_results)
103    }
104
105    fn delete(&self, entity_id: u64, space_id: u32, timestamp: i64) -> Result<(), StorageError> {
106        self.hot.delete(entity_id, space_id, timestamp)?;
107        self.warm.delete(entity_id, space_id, timestamp)?;
108        Ok(())
109    }
110}
111
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a point for `entity_id` at `timestamp` carrying a fixed three-element vector.
    fn sample_point(entity_id: u64, timestamp: i64) -> TemporalPoint {
        TemporalPoint::new(entity_id, timestamp, vec![0.1, 0.2, 0.3])
    }

    /// Create an empty tiered store whose warm tier lives under a fresh temp directory.
    ///
    /// The `TempDir` handle is handed back so each test can keep it alive
    /// for as long as it uses the store.
    fn make_tiered() -> (TieredStorage, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let hot = InMemoryStore::new();
        let warm_path = dir.path().join("warm");
        let warm = WarmStore::open(&warm_path).unwrap();
        (TieredStorage::new(hot, warm), dir)
    }

    #[test]
    fn write_to_hot_read_from_hot() {
        let (store, _dir) = make_tiered();
        let point = sample_point(1, 1000);
        store.put(0, &point).unwrap();

        assert_eq!(store.get(1, 0, 1000).unwrap(), Some(point));
    }

    #[test]
    fn compact_moves_to_warm() {
        let (store, _dir) = make_tiered();

        // Ten points at 0, 1000, ..., 9000, all written through the hot tier.
        for ts in (0..10).map(|i| i * 1000) {
            store.put(0, &sample_point(1, ts)).unwrap();
        }
        assert_eq!(store.hot().len(), 10);

        // Cutoff is inclusive: 0, 1000, 2000, 3000, 4000 and 5000 all migrate.
        let moved = store.compact(1, 0, 5000).unwrap();
        assert_eq!(moved, 6);

        // The remaining four stay hot; the six migrated ones are now warm.
        assert_eq!(store.hot().len(), 4);
        assert_eq!(store.warm().len(), 6);
    }

    #[test]
    fn get_finds_in_warm_after_compaction() {
        let (store, _dir) = make_tiered();

        store.put(0, &sample_point(1, 1000)).unwrap();
        store.compact(1, 0, 1000).unwrap();

        // Hot is empty now, so a successful lookup must have come from warm.
        assert_eq!(store.hot().len(), 0);
        let looked_up = store.get(1, 0, 1000).unwrap();
        assert!(looked_up.is_some(), "should find point in warm tier");
    }

    #[test]
    fn range_merges_hot_and_warm() {
        let (store, _dir) = make_tiered();

        // Ten points; the first five (0..=4000) are pushed down to warm.
        for ts in (0..10).map(|i| i * 1000) {
            store.put(0, &sample_point(1, ts)).unwrap();
        }
        store.compact(1, 0, 4000).unwrap();

        // A range covering everything must see both tiers.
        let merged = store.range(1, 0, 0, 9000).unwrap();
        assert_eq!(merged.len(), 10);

        // Timestamps must be strictly increasing across the tier boundary.
        assert!(merged
            .windows(2)
            .all(|pair| pair[0].timestamp() < pair[1].timestamp()));
    }

    #[test]
    fn range_deduplicates() {
        let (store, _dir) = make_tiered();

        let point = sample_point(1, 1000);
        store.put(0, &point).unwrap();
        // Write the same point straight into warm, as a partial compaction would leave it.
        store.warm().put(0, &point).unwrap();

        let merged = store.range(1, 0, 0, 2000).unwrap();
        assert_eq!(merged.len(), 1, "should deduplicate across tiers");
    }

    #[test]
    fn compact_empty_is_noop() {
        let (store, _dir) = make_tiered();
        assert_eq!(store.compact(999, 0, i64::MAX).unwrap(), 0);
    }

    #[test]
    fn compact_and_query_large() {
        let (store, _dir) = make_tiered();

        // 1000 points spaced 100 apart, each with a distinct 4-element vector.
        for i in 0..1000 {
            let point = TemporalPoint::new(1, i * 100, vec![i as f32; 4]);
            store.put(0, &point).unwrap();
        }

        // Push the first 500 (timestamps 0..=49_900) down to warm.
        let moved = store.compact(1, 0, 499 * 100).unwrap();
        assert_eq!(moved, 500);

        // Nothing is lost: the merged view still has every point.
        let everything = store.range(1, 0, 0, 100_000).unwrap();
        assert_eq!(everything.len(), 1000);

        // The two tiers split the data evenly.
        assert_eq!(store.hot().len(), 500);
        assert_eq!(store.warm().len(), 500);
    }
}