// cvx_storage/warm/mod.rs

//! Warm storage tier: file-based partitioned storage.
//!
//! Stores temporal points in postcard-serialized files, partitioned by entity_id.
//! Designed for data that's accessed less frequently than hot tier but still
//! needs reasonable read performance.
//!
//! ## Directory Layout
//!
//! ```text
//! warm/
//! ├── entity_0000000000000042/
//! │   ├── space_0000_chunk_000000.warm
//! │   └── space_0000_chunk_000001.warm
//! └── manifest.json
//! ```
//!
//! Each chunk file contains a length-prefixed sequence of postcard-serialized
//! `TemporalPoint` records.
19use std::collections::BTreeMap;
20use std::fs;
21use std::path::{Path, PathBuf};
22
23use cvx_core::StorageBackend;
24use cvx_core::error::StorageError;
25use cvx_core::types::TemporalPoint;
26use parking_lot::RwLock;
27
/// Maximum number of points per chunk file.
///
/// This is a soft target: `write_batch` appends an entire per-entity batch to
/// the current chunk, so one large batch can push a chunk past this bound.
const DEFAULT_CHUNK_SIZE: usize = 10_000;
30
/// Per-chunk metadata for zone map pruning (RFC-002-08).
///
/// Stores min/max timestamps so range queries can skip non-overlapping chunks
/// without deserializing their contents. See Lamb et al. (VLDB 2012).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct ChunkMeta {
    /// Chunk file name relative to the entity directory,
    /// e.g. `space_0000_chunk_000000.warm`.
    filename: String,
    /// Smallest timestamp of any point written to this chunk (zone lower bound).
    min_timestamp: i64,
    /// Largest timestamp of any point written to this chunk (zone upper bound).
    max_timestamp: i64,
    /// Number of records appended to this chunk file.
    point_count: u32,
}
42
/// Zone map manifest: per-entity-space chunk metadata.
/// Keys are serialized as `"entity_id:space_id"` strings for JSON compatibility.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
struct ZoneManifest {
    /// Map from `"entity_id:space_id"` key to the metadata of every chunk
    /// written for that pair, in write order.
    chunks: BTreeMap<String, Vec<ChunkMeta>>,
}
49
50impl ZoneManifest {
51    fn key(entity_id: u64, space_id: u32) -> String {
52        format!("{entity_id}:{space_id}")
53    }
54}
55
/// Warm storage backend using partitioned files.
///
/// One subdirectory per entity, holding length-prefixed postcard chunk files
/// per space, plus a JSON zone-map manifest (`manifest.json`) at the root.
pub struct WarmStore {
    /// Root directory of this warm tier.
    dir: PathBuf,
    /// In-memory index: (entity_id, space_id) → sorted timestamps in this tier.
    /// Lists are kept sorted and deduped so `contains` can binary-search them.
    index: RwLock<BTreeMap<(u64, u32), Vec<i64>>>,
    /// Zone map manifest for chunk-level temporal pruning.
    zone_map: RwLock<ZoneManifest>,
    /// Target points per chunk file; set to `DEFAULT_CHUNK_SIZE` by `open`.
    chunk_size: usize,
}
65
66impl WarmStore {
67    /// Open or create a warm store at the given directory.
68    pub fn open(dir: &Path) -> Result<Self, StorageError> {
69        fs::create_dir_all(dir)?;
70        let zone_map = Self::load_zone_manifest(dir)?;
71        let store = Self {
72            dir: dir.to_path_buf(),
73            index: RwLock::new(BTreeMap::new()),
74            zone_map: RwLock::new(zone_map),
75            chunk_size: DEFAULT_CHUNK_SIZE,
76        };
77        store.load_index()?;
78        Ok(store)
79    }
80
81    fn load_zone_manifest(dir: &Path) -> Result<ZoneManifest, StorageError> {
82        let manifest_path = dir.join("manifest.json");
83        if manifest_path.exists() {
84            let content = fs::read_to_string(&manifest_path)?;
85            serde_json::from_str(&content).map_err(|_| StorageError::WalCorrupted { offset: 0 })
86        } else {
87            Ok(ZoneManifest::default())
88        }
89    }
90
91    fn persist_zone_manifest(&self) -> Result<(), StorageError> {
92        let manifest = self.zone_map.read();
93        let content = serde_json::to_string_pretty(&*manifest)
94            .map_err(|_| StorageError::WalCorrupted { offset: 0 })?;
95        let tmp = self.dir.join("manifest.json.tmp");
96        fs::write(&tmp, &content)?;
97        fs::rename(&tmp, self.dir.join("manifest.json"))?;
98        Ok(())
99    }
100
    /// Write a batch of points to warm storage.
    ///
    /// Points are grouped by (entity_id, space_id) and appended to the
    /// current chunk file of each group. After all groups are written, the
    /// in-memory index is re-sorted/deduped and the zone-map manifest is
    /// persisted once.
    pub fn write_batch(&self, space_id: u32, points: &[TemporalPoint]) -> Result<(), StorageError> {
        // Group by entity_id
        let mut by_entity: BTreeMap<u64, Vec<&TemporalPoint>> = BTreeMap::new();
        for p in points {
            by_entity.entry(p.entity_id()).or_default().push(p);
        }

        // Index write lock is held for the whole batch, serializing writers.
        let mut index = self.index.write();

        for (entity_id, entity_points) in &by_entity {
            let entity_dir = self.entity_dir(*entity_id);
            fs::create_dir_all(&entity_dir)?;

            // Determine chunk number from how many timestamps are already
            // indexed for this (entity, space).
            // NOTE(review): the count is over deduped timestamps, and the
            // whole per-entity batch is appended to this single chunk — so a
            // batch larger than `chunk_size` can overflow the chunk file.
            let existing_count = index
                .get(&(*entity_id, space_id))
                .map(|v| v.len())
                .unwrap_or(0);
            let chunk_num = existing_count / self.chunk_size;

            let chunk_path =
                entity_dir.join(format!("space_{space_id:04}_chunk_{chunk_num:06}.warm"));

            // Serialize and append: read the existing chunk bytes (if any)
            // and extend them in memory before a single rewrite below.
            let mut data = if chunk_path.exists() {
                fs::read(&chunk_path)?
            } else {
                Vec::new()
            };

            for p in entity_points {
                // Record framing: [u32 LE length][postcard payload].
                let encoded = postcard::to_allocvec(p)
                    .map_err(|_| StorageError::WalCorrupted { offset: 0 })?;
                let len = encoded.len() as u32;
                data.extend_from_slice(&len.to_le_bytes());
                data.extend_from_slice(&encoded);

                // Update index
                index
                    .entry((*entity_id, space_id))
                    .or_default()
                    .push(p.timestamp());
            }

            // Rewrite the whole chunk file with the appended records.
            fs::write(&chunk_path, &data)?;

            // Update zone map metadata
            let chunk_filename = format!("space_{space_id:04}_chunk_{chunk_num:06}.warm");
            let timestamps: Vec<i64> = entity_points.iter().map(|p| p.timestamp()).collect();
            let min_ts = timestamps.iter().copied().min().unwrap_or(0);
            let max_ts = timestamps.iter().copied().max().unwrap_or(0);

            // Zone-map write lock is scoped to this loop iteration, so it is
            // released before `persist_zone_manifest` takes a read lock below.
            let mut zone = self.zone_map.write();
            let zone_key = ZoneManifest::key(*entity_id, space_id);
            let chunk_list = zone.chunks.entry(zone_key).or_default();
            // Update existing chunk meta or add new one
            if let Some(meta) = chunk_list.iter_mut().find(|m| m.filename == chunk_filename) {
                // Widen the existing zone bounds to cover this batch.
                meta.min_timestamp = meta.min_timestamp.min(min_ts);
                meta.max_timestamp = meta.max_timestamp.max(max_ts);
                meta.point_count += entity_points.len() as u32;
            } else {
                chunk_list.push(ChunkMeta {
                    filename: chunk_filename,
                    min_timestamp: min_ts,
                    max_timestamp: max_ts,
                    point_count: entity_points.len() as u32,
                });
            }
        }

        // Sort timestamps in index (keeps `contains`'s binary search valid).
        for ts_list in index.values_mut() {
            ts_list.sort_unstable();
            ts_list.dedup();
        }

        // Persist zone manifest
        self.persist_zone_manifest()?;

        Ok(())
    }
185
186    /// Read all points for an entity+space from chunk files.
187    fn read_entity_chunks(
188        &self,
189        entity_id: u64,
190        space_id: u32,
191    ) -> Result<Vec<TemporalPoint>, StorageError> {
192        let entity_dir = self.entity_dir(entity_id);
193        if !entity_dir.exists() {
194            return Ok(Vec::new());
195        }
196
197        let prefix = format!("space_{space_id:04}_chunk_");
198        let mut points = Vec::new();
199
200        for entry in fs::read_dir(&entity_dir)? {
201            let entry = entry?;
202            let name = entry.file_name();
203            let name = name.to_string_lossy();
204            if !name.starts_with(&prefix) || !name.ends_with(".warm") {
205                continue;
206            }
207
208            let data = fs::read(entry.path())?;
209            let mut offset = 0;
210            while offset + 4 <= data.len() {
211                let len = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as usize;
212                offset += 4;
213                if offset + len > data.len() {
214                    break;
215                }
216                let point: TemporalPoint = postcard::from_bytes(&data[offset..offset + len])
217                    .map_err(|_| StorageError::WalCorrupted { offset: 0 })?;
218                points.push(point);
219                offset += len;
220            }
221        }
222
223        points.sort_by_key(|p| p.timestamp());
224        Ok(points)
225    }
226
227    /// Read entity chunks filtered by zone map — only opens chunks whose
228    /// [min_ts, max_ts] overlaps with [start, end]. Falls back to full
229    /// read if no zone map data exists.
230    fn read_entity_chunks_filtered(
231        &self,
232        entity_id: u64,
233        space_id: u32,
234        start: i64,
235        end: i64,
236    ) -> Result<Vec<TemporalPoint>, StorageError> {
237        let entity_dir = self.entity_dir(entity_id);
238        if !entity_dir.exists() {
239            return Ok(Vec::new());
240        }
241
242        let zone = self.zone_map.read();
243        let zone_key = ZoneManifest::key(entity_id, space_id);
244        let chunk_metas = zone.chunks.get(&zone_key);
245
246        // If we have zone map data, only read overlapping chunks
247        if let Some(metas) = chunk_metas {
248            let mut points = Vec::new();
249            for meta in metas {
250                // Zone map pruning: skip if chunk's range doesn't overlap query range
251                if meta.max_timestamp < start || meta.min_timestamp > end {
252                    continue;
253                }
254
255                let chunk_path = entity_dir.join(&meta.filename);
256                if !chunk_path.exists() {
257                    continue;
258                }
259
260                let data = fs::read(&chunk_path)?;
261                let mut offset = 0;
262                while offset + 4 <= data.len() {
263                    let len =
264                        u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as usize;
265                    offset += 4;
266                    if offset + len > data.len() {
267                        break;
268                    }
269                    let point: TemporalPoint = postcard::from_bytes(&data[offset..offset + len])
270                        .map_err(|_| StorageError::WalCorrupted { offset: 0 })?;
271                    points.push(point);
272                    offset += len;
273                }
274            }
275            points.sort_by_key(|p| p.timestamp());
276            Ok(points)
277        } else {
278            // No zone map — fall back to full read
279            self.read_entity_chunks(entity_id, space_id)
280        }
281    }
282
283    fn entity_dir(&self, entity_id: u64) -> PathBuf {
284        self.dir.join(format!("entity_{entity_id:016}"))
285    }
286
    /// Rebuild the in-memory timestamp index by scanning every chunk file
    /// under the store directory.
    ///
    /// Best-effort recovery: directory entries that don't match the
    /// `entity_…`/`space_…_chunk_…​.warm` naming scheme are skipped, as are
    /// individual records that fail to decode.
    fn load_index(&self) -> Result<(), StorageError> {
        // Scan existing files to rebuild index
        if !self.dir.exists() {
            return Ok(());
        }

        let mut index = self.index.write();
        for entry in fs::read_dir(&self.dir)? {
            let entry = entry?;
            if !entry.file_type()?.is_dir() {
                continue;
            }
            let name = entry.file_name();
            let name = name.to_string_lossy();
            let Some(eid_str) = name.strip_prefix("entity_") else {
                continue;
            };
            let Ok(entity_id) = eid_str.parse::<u64>() else {
                continue;
            };

            for file in fs::read_dir(entry.path())? {
                let file = file?;
                let fname = file.file_name();
                let fname = fname.to_string_lossy();
                if !fname.ends_with(".warm") {
                    continue;
                }
                // Extract space_id from filename: space_XXXX_chunk_YYYYYY.warm
                // NOTE(review): only the first 4 chars after "space_" are
                // parsed, but `{space_id:04}` produces 5+ digits for ids
                // >= 10000 — confirm the valid space_id range.
                if let Some(rest) = fname.strip_prefix("space_") {
                    if let Some(space_str) = rest.get(..4) {
                        if let Ok(space_id) = space_str.parse::<u32>() {
                            // Read timestamps from file
                            // (record framing: [u32 LE length][postcard payload])
                            let data = fs::read(file.path())?;
                            let mut offset = 0;
                            while offset + 4 <= data.len() {
                                let len = u32::from_le_bytes(
                                    data[offset..offset + 4].try_into().unwrap(),
                                ) as usize;
                                offset += 4;
                                if offset + len > data.len() {
                                    // Truncated trailing record — stop this file.
                                    break;
                                }
                                // Undecodable records are skipped (no error);
                                // the offset still advances past them.
                                if let Ok(point) = postcard::from_bytes::<TemporalPoint>(
                                    &data[offset..offset + len],
                                ) {
                                    index
                                        .entry((entity_id, space_id))
                                        .or_default()
                                        .push(point.timestamp());
                                }
                                offset += len;
                            }
                        }
                    }
                }
            }
        }

        // Keep each timestamp list sorted and deduped so `contains` can
        // binary-search it.
        for ts_list in index.values_mut() {
            ts_list.sort_unstable();
            ts_list.dedup();
        }

        Ok(())
    }
353
354    /// Check if a point exists in warm storage.
355    pub fn contains(&self, entity_id: u64, space_id: u32, timestamp: i64) -> bool {
356        let index = self.index.read();
357        index
358            .get(&(entity_id, space_id))
359            .is_some_and(|ts| ts.binary_search(&timestamp).is_ok())
360    }
361
362    /// Number of points tracked in the index.
363    pub fn len(&self) -> usize {
364        self.index.read().values().map(|v| v.len()).sum()
365    }
366
367    /// Whether the store is empty.
368    pub fn is_empty(&self) -> bool {
369        self.index.read().is_empty()
370    }
371}
372
373impl StorageBackend for WarmStore {
374    fn get(
375        &self,
376        entity_id: u64,
377        space_id: u32,
378        timestamp: i64,
379    ) -> Result<Option<TemporalPoint>, StorageError> {
380        if !self.contains(entity_id, space_id, timestamp) {
381            return Ok(None);
382        }
383        let points = self.read_entity_chunks(entity_id, space_id)?;
384        Ok(points.into_iter().find(|p| p.timestamp() == timestamp))
385    }
386
387    fn put(&self, space_id: u32, point: &TemporalPoint) -> Result<(), StorageError> {
388        self.write_batch(space_id, &[point.clone()])
389    }
390
391    fn range(
392        &self,
393        entity_id: u64,
394        space_id: u32,
395        start: i64,
396        end: i64,
397    ) -> Result<Vec<TemporalPoint>, StorageError> {
398        // Use zone map to skip non-overlapping chunks (RFC-002-08)
399        let points = self.read_entity_chunks_filtered(entity_id, space_id, start, end)?;
400        Ok(points
401            .into_iter()
402            .filter(|p| p.timestamp() >= start && p.timestamp() <= end)
403            .collect())
404    }
405
406    fn delete(&self, entity_id: u64, space_id: u32, timestamp: i64) -> Result<(), StorageError> {
407        // Remove from index only (lazy deletion)
408        let mut index = self.index.write();
409        if let Some(ts_list) = index.get_mut(&(entity_id, space_id)) {
410            ts_list.retain(|&t| t != timestamp);
411        }
412        Ok(())
413    }
414}
415
#[cfg(test)]
mod tests {
    use super::*;

    /// Convenience constructor: a 3-dimensional point for `entity_id` at `timestamp`.
    fn sample_point(entity_id: u64, timestamp: i64) -> TemporalPoint {
        TemporalPoint::new(entity_id, timestamp, vec![0.1, 0.2, 0.3])
    }

    /// A batch write populates the index; `contains` answers exact membership.
    #[test]
    fn write_and_read_batch() {
        let dir = tempfile::tempdir().unwrap();
        let store = WarmStore::open(dir.path()).unwrap();

        let points: Vec<TemporalPoint> = (0..10).map(|i| sample_point(1, i * 1000)).collect();
        store.write_batch(0, &points).unwrap();

        assert_eq!(store.len(), 10);
        assert!(store.contains(1, 0, 5000));
        assert!(!store.contains(1, 0, 99999));
    }

    /// `put` followed by `get` round-trips a single point.
    #[test]
    fn get_specific_point() {
        let dir = tempfile::tempdir().unwrap();
        let store = WarmStore::open(dir.path()).unwrap();

        let p = sample_point(42, 1000);
        store.put(0, &p).unwrap();

        let result = store.get(42, 0, 1000).unwrap();
        assert_eq!(result, Some(p));
    }

    /// `get` on an empty store returns `Ok(None)`, not an error.
    #[test]
    fn get_nonexistent_returns_none() {
        let dir = tempfile::tempdir().unwrap();
        let store = WarmStore::open(dir.path()).unwrap();
        assert_eq!(store.get(1, 0, 1000).unwrap(), None);
    }

    /// Range bounds are inclusive on both ends and results come back sorted.
    #[test]
    fn range_query() {
        let dir = tempfile::tempdir().unwrap();
        let store = WarmStore::open(dir.path()).unwrap();

        let points: Vec<TemporalPoint> = (0..20).map(|i| sample_point(1, i * 100)).collect();
        store.write_batch(0, &points).unwrap();

        let results = store.range(1, 0, 500, 1500).unwrap();
        assert_eq!(results.len(), 11); // 500, 600, ..., 1500
        assert_eq!(results[0].timestamp(), 500);
        assert_eq!(results.last().unwrap().timestamp(), 1500);
    }

    /// Queries scoped to one entity never see another entity's points.
    #[test]
    fn cross_entity_isolation() {
        let dir = tempfile::tempdir().unwrap();
        let store = WarmStore::open(dir.path()).unwrap();

        store.put(0, &sample_point(1, 100)).unwrap();
        store.put(0, &sample_point(2, 100)).unwrap();

        let results = store.range(1, 0, 0, i64::MAX).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].entity_id(), 1);
    }

    /// Reopening rebuilds the index from on-disk chunk files (`load_index`).
    #[test]
    fn data_survives_reopen() {
        let dir = tempfile::tempdir().unwrap();

        {
            let store = WarmStore::open(dir.path()).unwrap();
            let points: Vec<TemporalPoint> = (0..5).map(|i| sample_point(1, i * 1000)).collect();
            store.write_batch(0, &points).unwrap();
        }

        {
            let store = WarmStore::open(dir.path()).unwrap();
            assert_eq!(store.len(), 5);
            let result = store.get(1, 0, 3000).unwrap();
            assert!(result.is_some());
        }
    }

    /// The zone-map manifest survives a reopen, and range queries remain
    /// correct when chunk-level pruning applies.
    #[test]
    fn zone_map_persists_and_prunes() {
        let dir = tempfile::tempdir().unwrap();

        // Write two batches with non-overlapping time ranges
        {
            let store = WarmStore::open(dir.path()).unwrap();
            let early: Vec<TemporalPoint> = (0..10)
                .map(|i| TemporalPoint::new(1, i * 100, vec![0.1; 4]))
                .collect();
            store.write_batch(0, &early).unwrap();

            let late: Vec<TemporalPoint> = (0..10)
                .map(|i| TemporalPoint::new(1, 10_000 + i * 100, vec![0.2; 4]))
                .collect();
            store.write_batch(0, &late).unwrap();
        }

        // Reopen — zone map should survive
        {
            let store = WarmStore::open(dir.path()).unwrap();
            assert_eq!(store.len(), 20);

            // Range query for early timestamps only
            let early_results = store.range(1, 0, 0, 999).unwrap();
            assert_eq!(early_results.len(), 10);

            // Range query for late timestamps only
            let late_results = store.range(1, 0, 10_000, 11_000).unwrap();
            assert_eq!(late_results.len(), 10);

            // Manifest file should exist
            assert!(dir.path().join("manifest.json").exists());
        }
    }

    /// 1000 points spread across 10 entities land in the right partitions.
    #[test]
    fn large_batch() {
        let dir = tempfile::tempdir().unwrap();
        let store = WarmStore::open(dir.path()).unwrap();

        let points: Vec<TemporalPoint> = (0..1000)
            .map(|i| TemporalPoint::new(i / 100, (i % 100) as i64 * 1000, vec![i as f32; 8]))
            .collect();
        store.write_batch(0, &points).unwrap();

        assert_eq!(store.len(), 1000);

        // Retrieve specific entity
        let results = store.range(5, 0, 0, 100_000).unwrap();
        assert_eq!(results.len(), 100); // entity 5 has 100 points
    }
}