1use std::collections::BTreeMap;
20use std::fs;
21use std::path::{Path, PathBuf};
22
23use cvx_core::StorageBackend;
24use cvx_core::error::StorageError;
25use cvx_core::types::TemporalPoint;
26use parking_lot::RwLock;
27
/// Nominal number of records per on-disk chunk file; chunk rollover numbering
/// is derived from this count.
const DEFAULT_CHUNK_SIZE: usize = 10_000;
30
/// Metadata for one `.warm` chunk file, kept in the zone manifest and used to
/// prune chunks that cannot intersect a queried time range.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct ChunkMeta {
    // Chunk file name within the entity directory.
    filename: String,
    // Smallest point timestamp written to this chunk.
    min_timestamp: i64,
    // Largest point timestamp written to this chunk.
    max_timestamp: i64,
    // Number of records appended to this chunk.
    point_count: u32,
}
42
/// Store-wide chunk manifest, persisted as `manifest.json` in the store root.
/// Maps an `"entity_id:space_id"` key (see [`ZoneManifest::key`]) to the
/// metadata of every chunk written for that pair.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
struct ZoneManifest {
    // Key format is produced by `ZoneManifest::key`; BTreeMap keeps the
    // serialized manifest deterministically ordered.
    chunks: BTreeMap<String, Vec<ChunkMeta>>,
}
49
50impl ZoneManifest {
51 fn key(entity_id: u64, space_id: u32) -> String {
52 format!("{entity_id}:{space_id}")
53 }
54}
55
/// File-backed "warm" tier store: points are grouped per entity into
/// length-prefixed postcard chunk files under `dir`, with an in-memory
/// timestamp index for fast membership checks and a JSON zone manifest for
/// range pruning.
pub struct WarmStore {
    // Store root; one `entity_<id>` subdirectory per entity.
    dir: PathBuf,
    // (entity_id, space_id) -> sorted, deduplicated timestamps. Rebuilt from
    // disk on open; serves `contains`/`len` without touching files.
    index: RwLock<BTreeMap<(u64, u32), Vec<i64>>>,
    // Per-(entity, space) chunk metadata, persisted as `manifest.json`.
    zone_map: RwLock<ZoneManifest>,
    // Nominal records per chunk file (see DEFAULT_CHUNK_SIZE).
    chunk_size: usize,
}
65
66impl WarmStore {
67 pub fn open(dir: &Path) -> Result<Self, StorageError> {
69 fs::create_dir_all(dir)?;
70 let zone_map = Self::load_zone_manifest(dir)?;
71 let store = Self {
72 dir: dir.to_path_buf(),
73 index: RwLock::new(BTreeMap::new()),
74 zone_map: RwLock::new(zone_map),
75 chunk_size: DEFAULT_CHUNK_SIZE,
76 };
77 store.load_index()?;
78 Ok(store)
79 }
80
81 fn load_zone_manifest(dir: &Path) -> Result<ZoneManifest, StorageError> {
82 let manifest_path = dir.join("manifest.json");
83 if manifest_path.exists() {
84 let content = fs::read_to_string(&manifest_path)?;
85 serde_json::from_str(&content).map_err(|_| StorageError::WalCorrupted { offset: 0 })
86 } else {
87 Ok(ZoneManifest::default())
88 }
89 }
90
91 fn persist_zone_manifest(&self) -> Result<(), StorageError> {
92 let manifest = self.zone_map.read();
93 let content = serde_json::to_string_pretty(&*manifest)
94 .map_err(|_| StorageError::WalCorrupted { offset: 0 })?;
95 let tmp = self.dir.join("manifest.json.tmp");
96 fs::write(&tmp, &content)?;
97 fs::rename(&tmp, self.dir.join("manifest.json"))?;
98 Ok(())
99 }
100
101 pub fn write_batch(&self, space_id: u32, points: &[TemporalPoint]) -> Result<(), StorageError> {
105 let mut by_entity: BTreeMap<u64, Vec<&TemporalPoint>> = BTreeMap::new();
107 for p in points {
108 by_entity.entry(p.entity_id()).or_default().push(p);
109 }
110
111 let mut index = self.index.write();
112
113 for (entity_id, entity_points) in &by_entity {
114 let entity_dir = self.entity_dir(*entity_id);
115 fs::create_dir_all(&entity_dir)?;
116
117 let existing_count = index
119 .get(&(*entity_id, space_id))
120 .map(|v| v.len())
121 .unwrap_or(0);
122 let chunk_num = existing_count / self.chunk_size;
123
124 let chunk_path =
125 entity_dir.join(format!("space_{space_id:04}_chunk_{chunk_num:06}.warm"));
126
127 let mut data = if chunk_path.exists() {
129 fs::read(&chunk_path)?
130 } else {
131 Vec::new()
132 };
133
134 for p in entity_points {
135 let encoded = postcard::to_allocvec(p)
136 .map_err(|_| StorageError::WalCorrupted { offset: 0 })?;
137 let len = encoded.len() as u32;
138 data.extend_from_slice(&len.to_le_bytes());
139 data.extend_from_slice(&encoded);
140
141 index
143 .entry((*entity_id, space_id))
144 .or_default()
145 .push(p.timestamp());
146 }
147
148 fs::write(&chunk_path, &data)?;
149
150 let chunk_filename = format!("space_{space_id:04}_chunk_{chunk_num:06}.warm");
152 let timestamps: Vec<i64> = entity_points.iter().map(|p| p.timestamp()).collect();
153 let min_ts = timestamps.iter().copied().min().unwrap_or(0);
154 let max_ts = timestamps.iter().copied().max().unwrap_or(0);
155
156 let mut zone = self.zone_map.write();
157 let zone_key = ZoneManifest::key(*entity_id, space_id);
158 let chunk_list = zone.chunks.entry(zone_key).or_default();
159 if let Some(meta) = chunk_list.iter_mut().find(|m| m.filename == chunk_filename) {
161 meta.min_timestamp = meta.min_timestamp.min(min_ts);
162 meta.max_timestamp = meta.max_timestamp.max(max_ts);
163 meta.point_count += entity_points.len() as u32;
164 } else {
165 chunk_list.push(ChunkMeta {
166 filename: chunk_filename,
167 min_timestamp: min_ts,
168 max_timestamp: max_ts,
169 point_count: entity_points.len() as u32,
170 });
171 }
172 }
173
174 for ts_list in index.values_mut() {
176 ts_list.sort_unstable();
177 ts_list.dedup();
178 }
179
180 self.persist_zone_manifest()?;
182
183 Ok(())
184 }
185
186 fn read_entity_chunks(
188 &self,
189 entity_id: u64,
190 space_id: u32,
191 ) -> Result<Vec<TemporalPoint>, StorageError> {
192 let entity_dir = self.entity_dir(entity_id);
193 if !entity_dir.exists() {
194 return Ok(Vec::new());
195 }
196
197 let prefix = format!("space_{space_id:04}_chunk_");
198 let mut points = Vec::new();
199
200 for entry in fs::read_dir(&entity_dir)? {
201 let entry = entry?;
202 let name = entry.file_name();
203 let name = name.to_string_lossy();
204 if !name.starts_with(&prefix) || !name.ends_with(".warm") {
205 continue;
206 }
207
208 let data = fs::read(entry.path())?;
209 let mut offset = 0;
210 while offset + 4 <= data.len() {
211 let len = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as usize;
212 offset += 4;
213 if offset + len > data.len() {
214 break;
215 }
216 let point: TemporalPoint = postcard::from_bytes(&data[offset..offset + len])
217 .map_err(|_| StorageError::WalCorrupted { offset: 0 })?;
218 points.push(point);
219 offset += len;
220 }
221 }
222
223 points.sort_by_key(|p| p.timestamp());
224 Ok(points)
225 }
226
227 fn read_entity_chunks_filtered(
231 &self,
232 entity_id: u64,
233 space_id: u32,
234 start: i64,
235 end: i64,
236 ) -> Result<Vec<TemporalPoint>, StorageError> {
237 let entity_dir = self.entity_dir(entity_id);
238 if !entity_dir.exists() {
239 return Ok(Vec::new());
240 }
241
242 let zone = self.zone_map.read();
243 let zone_key = ZoneManifest::key(entity_id, space_id);
244 let chunk_metas = zone.chunks.get(&zone_key);
245
246 if let Some(metas) = chunk_metas {
248 let mut points = Vec::new();
249 for meta in metas {
250 if meta.max_timestamp < start || meta.min_timestamp > end {
252 continue;
253 }
254
255 let chunk_path = entity_dir.join(&meta.filename);
256 if !chunk_path.exists() {
257 continue;
258 }
259
260 let data = fs::read(&chunk_path)?;
261 let mut offset = 0;
262 while offset + 4 <= data.len() {
263 let len =
264 u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as usize;
265 offset += 4;
266 if offset + len > data.len() {
267 break;
268 }
269 let point: TemporalPoint = postcard::from_bytes(&data[offset..offset + len])
270 .map_err(|_| StorageError::WalCorrupted { offset: 0 })?;
271 points.push(point);
272 offset += len;
273 }
274 }
275 points.sort_by_key(|p| p.timestamp());
276 Ok(points)
277 } else {
278 self.read_entity_chunks(entity_id, space_id)
280 }
281 }
282
283 fn entity_dir(&self, entity_id: u64) -> PathBuf {
284 self.dir.join(format!("entity_{entity_id:016}"))
285 }
286
287 fn load_index(&self) -> Result<(), StorageError> {
288 if !self.dir.exists() {
290 return Ok(());
291 }
292
293 let mut index = self.index.write();
294 for entry in fs::read_dir(&self.dir)? {
295 let entry = entry?;
296 if !entry.file_type()?.is_dir() {
297 continue;
298 }
299 let name = entry.file_name();
300 let name = name.to_string_lossy();
301 let Some(eid_str) = name.strip_prefix("entity_") else {
302 continue;
303 };
304 let Ok(entity_id) = eid_str.parse::<u64>() else {
305 continue;
306 };
307
308 for file in fs::read_dir(entry.path())? {
309 let file = file?;
310 let fname = file.file_name();
311 let fname = fname.to_string_lossy();
312 if !fname.ends_with(".warm") {
313 continue;
314 }
315 if let Some(rest) = fname.strip_prefix("space_") {
317 if let Some(space_str) = rest.get(..4) {
318 if let Ok(space_id) = space_str.parse::<u32>() {
319 let data = fs::read(file.path())?;
321 let mut offset = 0;
322 while offset + 4 <= data.len() {
323 let len = u32::from_le_bytes(
324 data[offset..offset + 4].try_into().unwrap(),
325 ) as usize;
326 offset += 4;
327 if offset + len > data.len() {
328 break;
329 }
330 if let Ok(point) = postcard::from_bytes::<TemporalPoint>(
331 &data[offset..offset + len],
332 ) {
333 index
334 .entry((entity_id, space_id))
335 .or_default()
336 .push(point.timestamp());
337 }
338 offset += len;
339 }
340 }
341 }
342 }
343 }
344 }
345
346 for ts_list in index.values_mut() {
347 ts_list.sort_unstable();
348 ts_list.dedup();
349 }
350
351 Ok(())
352 }
353
354 pub fn contains(&self, entity_id: u64, space_id: u32, timestamp: i64) -> bool {
356 let index = self.index.read();
357 index
358 .get(&(entity_id, space_id))
359 .is_some_and(|ts| ts.binary_search(×tamp).is_ok())
360 }
361
362 pub fn len(&self) -> usize {
364 self.index.read().values().map(|v| v.len()).sum()
365 }
366
367 pub fn is_empty(&self) -> bool {
369 self.index.read().is_empty()
370 }
371}
372
373impl StorageBackend for WarmStore {
374 fn get(
375 &self,
376 entity_id: u64,
377 space_id: u32,
378 timestamp: i64,
379 ) -> Result<Option<TemporalPoint>, StorageError> {
380 if !self.contains(entity_id, space_id, timestamp) {
381 return Ok(None);
382 }
383 let points = self.read_entity_chunks(entity_id, space_id)?;
384 Ok(points.into_iter().find(|p| p.timestamp() == timestamp))
385 }
386
387 fn put(&self, space_id: u32, point: &TemporalPoint) -> Result<(), StorageError> {
388 self.write_batch(space_id, &[point.clone()])
389 }
390
391 fn range(
392 &self,
393 entity_id: u64,
394 space_id: u32,
395 start: i64,
396 end: i64,
397 ) -> Result<Vec<TemporalPoint>, StorageError> {
398 let points = self.read_entity_chunks_filtered(entity_id, space_id, start, end)?;
400 Ok(points
401 .into_iter()
402 .filter(|p| p.timestamp() >= start && p.timestamp() <= end)
403 .collect())
404 }
405
406 fn delete(&self, entity_id: u64, space_id: u32, timestamp: i64) -> Result<(), StorageError> {
407 let mut index = self.index.write();
409 if let Some(ts_list) = index.get_mut(&(entity_id, space_id)) {
410 ts_list.retain(|&t| t != timestamp);
411 }
412 Ok(())
413 }
414}
415
#[cfg(test)]
mod tests {
    use super::*;

    /// Convenience constructor for a 3-dimensional test point.
    fn sample_point(entity_id: u64, timestamp: i64) -> TemporalPoint {
        TemporalPoint::new(entity_id, timestamp, vec![0.1, 0.2, 0.3])
    }

    #[test]
    fn write_and_read_batch() {
        let tmp = tempfile::tempdir().unwrap();
        let store = WarmStore::open(tmp.path()).unwrap();

        let batch: Vec<TemporalPoint> = (0..10).map(|i| sample_point(1, i * 1000)).collect();
        store.write_batch(0, &batch).unwrap();

        assert_eq!(store.len(), 10);
        assert!(store.contains(1, 0, 5000));
        assert!(!store.contains(1, 0, 99999));
    }

    #[test]
    fn get_specific_point() {
        let tmp = tempfile::tempdir().unwrap();
        let store = WarmStore::open(tmp.path()).unwrap();

        let point = sample_point(42, 1000);
        store.put(0, &point).unwrap();

        assert_eq!(store.get(42, 0, 1000).unwrap(), Some(point));
    }

    #[test]
    fn get_nonexistent_returns_none() {
        let tmp = tempfile::tempdir().unwrap();
        let store = WarmStore::open(tmp.path()).unwrap();
        assert_eq!(store.get(1, 0, 1000).unwrap(), None);
    }

    #[test]
    fn range_query() {
        let tmp = tempfile::tempdir().unwrap();
        let store = WarmStore::open(tmp.path()).unwrap();

        let batch: Vec<TemporalPoint> = (0..20).map(|i| sample_point(1, i * 100)).collect();
        store.write_batch(0, &batch).unwrap();

        // Inclusive bounds: hits are 500, 600, ..., 1500.
        let hits = store.range(1, 0, 500, 1500).unwrap();
        assert_eq!(hits.len(), 11);
        assert_eq!(hits[0].timestamp(), 500);
        assert_eq!(hits.last().unwrap().timestamp(), 1500);
    }

    #[test]
    fn cross_entity_isolation() {
        let tmp = tempfile::tempdir().unwrap();
        let store = WarmStore::open(tmp.path()).unwrap();

        store.put(0, &sample_point(1, 100)).unwrap();
        store.put(0, &sample_point(2, 100)).unwrap();

        // A scan over entity 1 must not see entity 2's data.
        let hits = store.range(1, 0, 0, i64::MAX).unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].entity_id(), 1);
    }

    #[test]
    fn data_survives_reopen() {
        let tmp = tempfile::tempdir().unwrap();

        {
            let store = WarmStore::open(tmp.path()).unwrap();
            let batch: Vec<TemporalPoint> = (0..5).map(|i| sample_point(1, i * 1000)).collect();
            store.write_batch(0, &batch).unwrap();
        }

        {
            let reopened = WarmStore::open(tmp.path()).unwrap();
            assert_eq!(reopened.len(), 5);
            assert!(reopened.get(1, 0, 3000).unwrap().is_some());
        }
    }

    #[test]
    fn zone_map_persists_and_prunes() {
        let tmp = tempfile::tempdir().unwrap();

        {
            let store = WarmStore::open(tmp.path()).unwrap();
            let early: Vec<TemporalPoint> = (0..10)
                .map(|i| TemporalPoint::new(1, i * 100, vec![0.1; 4]))
                .collect();
            store.write_batch(0, &early).unwrap();

            let late: Vec<TemporalPoint> = (0..10)
                .map(|i| TemporalPoint::new(1, 10_000 + i * 100, vec![0.2; 4]))
                .collect();
            store.write_batch(0, &late).unwrap();
        }

        {
            let reopened = WarmStore::open(tmp.path()).unwrap();
            assert_eq!(reopened.len(), 20);

            assert_eq!(reopened.range(1, 0, 0, 999).unwrap().len(), 10);
            assert_eq!(reopened.range(1, 0, 10_000, 11_000).unwrap().len(), 10);

            // The manifest must have been written to disk.
            assert!(tmp.path().join("manifest.json").exists());
        }
    }

    #[test]
    fn large_batch() {
        let tmp = tempfile::tempdir().unwrap();
        let store = WarmStore::open(tmp.path()).unwrap();

        // 10 entities x 100 points each.
        let batch: Vec<TemporalPoint> = (0..1000)
            .map(|i| TemporalPoint::new(i / 100, (i % 100) as i64 * 1000, vec![i as f32; 8]))
            .collect();
        store.write_batch(0, &batch).unwrap();

        assert_eq!(store.len(), 1000);
        assert_eq!(store.range(5, 0, 0, 100_000).unwrap().len(), 100);
    }
}