1use cvx_core::StorageBackend;
7use cvx_core::error::StorageError;
8use cvx_core::types::TemporalPoint;
9
10use crate::memory::InMemoryStore;
11use crate::warm::WarmStore;
12
13pub struct TieredStorage {
15 hot: InMemoryStore,
17 warm: WarmStore,
19}
20
21impl TieredStorage {
22 pub fn new(hot: InMemoryStore, warm: WarmStore) -> Self {
24 Self { hot, warm }
25 }
26
27 pub fn hot(&self) -> &InMemoryStore {
29 &self.hot
30 }
31
32 pub fn warm(&self) -> &WarmStore {
34 &self.warm
35 }
36
37 pub fn compact(
42 &self,
43 entity_id: u64,
44 space_id: u32,
45 cutoff: i64,
46 ) -> Result<usize, StorageError> {
47 let points = self.hot.range(entity_id, space_id, i64::MIN, cutoff)?;
49 if points.is_empty() {
50 return Ok(0);
51 }
52
53 let count = points.len();
54
55 self.warm.write_batch(space_id, &points)?;
57
58 for p in &points {
60 self.hot.delete(entity_id, space_id, p.timestamp())?;
61 }
62
63 Ok(count)
64 }
65}
66
67impl StorageBackend for TieredStorage {
68 fn get(
69 &self,
70 entity_id: u64,
71 space_id: u32,
72 timestamp: i64,
73 ) -> Result<Option<TemporalPoint>, StorageError> {
74 if let Some(point) = self.hot.get(entity_id, space_id, timestamp)? {
76 return Ok(Some(point));
77 }
78 self.warm.get(entity_id, space_id, timestamp)
80 }
81
82 fn put(&self, space_id: u32, point: &TemporalPoint) -> Result<(), StorageError> {
83 self.hot.put(space_id, point)
85 }
86
87 fn range(
88 &self,
89 entity_id: u64,
90 space_id: u32,
91 start: i64,
92 end: i64,
93 ) -> Result<Vec<TemporalPoint>, StorageError> {
94 let mut hot_results = self.hot.range(entity_id, space_id, start, end)?;
95 let warm_results = self.warm.range(entity_id, space_id, start, end)?;
96
97 hot_results.extend(warm_results);
99 hot_results.sort_by_key(|p| p.timestamp());
100 hot_results.dedup_by_key(|p| p.timestamp());
101
102 Ok(hot_results)
103 }
104
105 fn delete(&self, entity_id: u64, space_id: u32, timestamp: i64) -> Result<(), StorageError> {
106 self.hot.delete(entity_id, space_id, timestamp)?;
107 self.warm.delete(entity_id, space_id, timestamp)?;
108 Ok(())
109 }
110}
111
112#[cfg(test)]
113mod tests {
114 use super::*;
115
116 fn sample_point(entity_id: u64, timestamp: i64) -> TemporalPoint {
117 TemporalPoint::new(entity_id, timestamp, vec![0.1, 0.2, 0.3])
118 }
119
120 fn make_tiered() -> (TieredStorage, tempfile::TempDir) {
121 let dir = tempfile::tempdir().unwrap();
122 let hot = InMemoryStore::new();
123 let warm = WarmStore::open(&dir.path().join("warm")).unwrap();
124 (TieredStorage::new(hot, warm), dir)
125 }
126
127 #[test]
128 fn write_to_hot_read_from_hot() {
129 let (tiered, _dir) = make_tiered();
130 let p = sample_point(1, 1000);
131 tiered.put(0, &p).unwrap();
132
133 let result = tiered.get(1, 0, 1000).unwrap();
134 assert_eq!(result, Some(p));
135 }
136
137 #[test]
138 fn compact_moves_to_warm() {
139 let (tiered, _dir) = make_tiered();
140
141 for i in 0..10 {
143 tiered.put(0, &sample_point(1, i * 1000)).unwrap();
144 }
145 assert_eq!(tiered.hot().len(), 10);
146
147 let moved = tiered.compact(1, 0, 5000).unwrap();
149 assert_eq!(moved, 6); assert_eq!(tiered.hot().len(), 4);
153
154 assert_eq!(tiered.warm().len(), 6);
156 }
157
158 #[test]
159 fn get_finds_in_warm_after_compaction() {
160 let (tiered, _dir) = make_tiered();
161
162 tiered.put(0, &sample_point(1, 1000)).unwrap();
163 tiered.compact(1, 0, 1000).unwrap();
164
165 assert_eq!(tiered.hot().len(), 0);
167 let result = tiered.get(1, 0, 1000).unwrap();
168 assert!(result.is_some(), "should find point in warm tier");
169 }
170
171 #[test]
172 fn range_merges_hot_and_warm() {
173 let (tiered, _dir) = make_tiered();
174
175 for i in 0..10 {
177 tiered.put(0, &sample_point(1, i * 1000)).unwrap();
178 }
179 tiered.compact(1, 0, 4000).unwrap();
180
181 let results = tiered.range(1, 0, 0, 9000).unwrap();
183 assert_eq!(results.len(), 10);
184
185 for w in results.windows(2) {
187 assert!(w[0].timestamp() < w[1].timestamp());
188 }
189 }
190
191 #[test]
192 fn range_deduplicates() {
193 let (tiered, _dir) = make_tiered();
194
195 let p = sample_point(1, 1000);
196 tiered.put(0, &p).unwrap();
197 tiered.warm().put(0, &p).unwrap();
199
200 let results = tiered.range(1, 0, 0, 2000).unwrap();
201 assert_eq!(results.len(), 1, "should deduplicate across tiers");
202 }
203
204 #[test]
205 fn compact_empty_is_noop() {
206 let (tiered, _dir) = make_tiered();
207 let moved = tiered.compact(999, 0, i64::MAX).unwrap();
208 assert_eq!(moved, 0);
209 }
210
211 #[test]
212 fn compact_and_query_large() {
213 let (tiered, _dir) = make_tiered();
214
215 for i in 0..1000 {
217 tiered
218 .put(0, &TemporalPoint::new(1, i * 100, vec![i as f32; 4]))
219 .unwrap();
220 }
221
222 let moved = tiered.compact(1, 0, 499 * 100).unwrap();
224 assert_eq!(moved, 500);
225
226 let all = tiered.range(1, 0, 0, 100_000).unwrap();
228 assert_eq!(all.len(), 1000);
229
230 assert_eq!(tiered.hot().len(), 500);
232 assert_eq!(tiered.warm().len(), 500);
233 }
234}