cvx_storage/wal/
mod.rs

1//! Write-Ahead Log (WAL) for crash-safe ingestion.
2//!
3//! Implements an append-only, CRC32-validated log with segment rotation
4//! and recovery protocol per the Storage Layout spec §3.
5//!
6//! # Architecture
7//!
8//! ```text
9//! wal/
10//! ├── segment-000000000000.wal   (64MB max, append-only)
11//! ├── segment-000000000001.wal
12//! └── wal.meta                   (committed state)
13//! ```
14//!
15//! ## Entry lifecycle
16//!
17//! 1. Caller appends entry → WAL writes header + payload + CRC32
18//! 2. After downstream store + index confirm, caller calls `commit()`
19//! 3. On crash before commit, recovery replays uncommitted entries
20//!
21//! ## Recovery protocol
22//!
23//! 1. Read `wal.meta` for committed sequence number
24//! 2. Scan all segments for entries after committed sequence
25//! 3. Validate CRC32 — truncate at first invalid entry
26//! 4. Return uncommitted valid entries for replay
27
28use std::fs::{self, File, OpenOptions};
29use std::io::{BufReader, BufWriter, Read, Write};
30use std::path::{Path, PathBuf};
31
32use cvx_core::error::StorageError;
33
/// Four-byte signature at the start of every CVX WAL segment file.
const WAL_MAGIC: [u8; 4] = [b'C', b'V', b'X', b'W'];

/// On-disk format version written into each segment header and `wal.meta`.
const WAL_VERSION: u16 = 1;

/// Fixed segment header length: magic (4) + version (2) + segment id (8)
/// + creation time (8) + reserved (10).
const SEGMENT_HEADER_SIZE: usize = 32;

/// Fixed entry header length preceding each payload: length (4) + sequence (8)
/// + type (1) + flags (1) + reserved (2) + CRC32 (4) + padding (4).
const ENTRY_HEADER_SIZE: usize = 24;

/// Segment size that triggers rotation unless configured otherwise (64 MiB).
const DEFAULT_MAX_SEGMENT_SIZE: u64 = 1 << 26;
48
/// Kind of operation recorded by a WAL entry.
///
/// The discriminant is the single byte stored in the entry header.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum EntryType {
    /// A new temporal point is inserted.
    Insert = 0,
    /// An existing temporal point is removed.
    Delete = 1,
    /// An existing temporal point is modified.
    Update = 2,
    /// Checkpoint marker in the log.
    Checkpoint = 3,
}
62
63impl TryFrom<u8> for EntryType {
64    type Error = StorageError;
65    fn try_from(value: u8) -> Result<Self, Self::Error> {
66        match value {
67            0 => Ok(Self::Insert),
68            1 => Ok(Self::Delete),
69            2 => Ok(Self::Update),
70            3 => Ok(Self::Checkpoint),
71            _ => Err(StorageError::WalCorrupted { offset: 0 }),
72        }
73    }
74}
75
76/// A WAL entry ready for serialization.
77#[derive(Debug, Clone)]
78pub struct WalEntry {
79    /// Global monotonic sequence number.
80    pub sequence: u64,
81    /// Type of operation.
82    pub entry_type: EntryType,
83    /// Flags (bit 0: is_keyframe).
84    pub flags: u8,
85    /// Serialized payload.
86    pub payload: Vec<u8>,
87}
88
89/// WAL configuration.
90#[derive(Debug, Clone)]
91pub struct WalConfig {
92    /// Maximum segment file size in bytes.
93    pub max_segment_size: u64,
94    /// Whether to fsync after every write (durable but slower).
95    pub sync_on_write: bool,
96}
97
98impl Default for WalConfig {
99    fn default() -> Self {
100        Self {
101            max_segment_size: DEFAULT_MAX_SEGMENT_SIZE,
102            sync_on_write: false,
103        }
104    }
105}
106
107/// Persisted WAL metadata.
108#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
109struct WalMeta {
110    format_version: u16,
111    head_segment: u64,
112    head_offset: u64,
113    committed_sequence: u64,
114    last_sequence: u64,
115}
116
117impl Default for WalMeta {
118    fn default() -> Self {
119        Self {
120            format_version: WAL_VERSION,
121            head_segment: 0,
122            head_offset: SEGMENT_HEADER_SIZE as u64,
123            committed_sequence: 0,
124            last_sequence: 0,
125        }
126    }
127}
128
129/// Write-Ahead Log.
130pub struct Wal {
131    dir: PathBuf,
132    config: WalConfig,
133    meta: WalMeta,
134    current_writer: Option<BufWriter<File>>,
135    current_segment_size: u64,
136}
137
138impl Wal {
139    /// Open or create a WAL in the given directory.
140    pub fn open(dir: &Path, config: WalConfig) -> Result<Self, StorageError> {
141        fs::create_dir_all(dir)?;
142
143        let meta = Self::load_or_create_meta(dir)?;
144
145        let mut wal = Self {
146            dir: dir.to_path_buf(),
147            config,
148            meta,
149            current_writer: None,
150            current_segment_size: 0,
151        };
152
153        wal.open_current_segment()?;
154        Ok(wal)
155    }
156
157    fn load_or_create_meta(dir: &Path) -> Result<WalMeta, StorageError> {
158        let meta_path = dir.join("wal.meta");
159        if meta_path.exists() {
160            let content = fs::read_to_string(&meta_path)?;
161            let meta: WalMeta = serde_json::from_str(&content)
162                .map_err(|_| StorageError::WalCorrupted { offset: 0 })?;
163            Ok(meta)
164        } else {
165            let meta = WalMeta::default();
166            let content = serde_json::to_string_pretty(&meta)
167                .map_err(|_| StorageError::WalCorrupted { offset: 0 })?;
168            fs::write(&meta_path, &content)?;
169            Ok(meta)
170        }
171    }
172
173    fn segment_path(&self, segment_id: u64) -> PathBuf {
174        self.dir.join(format!("segment-{segment_id:012}.wal"))
175    }
176
177    fn open_current_segment(&mut self) -> Result<(), StorageError> {
178        let path = self.segment_path(self.meta.head_segment);
179
180        if path.exists() {
181            let file = OpenOptions::new().append(true).open(&path)?;
182            let size = file.metadata()?.len();
183            self.current_segment_size = size;
184            self.current_writer = Some(BufWriter::new(file));
185        } else {
186            let file = File::create(&path)?;
187            let mut writer = BufWriter::new(file);
188            Self::write_segment_header(&mut writer, self.meta.head_segment)?;
189            writer.flush()?;
190            self.current_segment_size = SEGMENT_HEADER_SIZE as u64;
191            self.current_writer = Some(writer);
192        }
193
194        Ok(())
195    }
196
197    fn write_segment_header(
198        writer: &mut BufWriter<File>,
199        segment_id: u64,
200    ) -> Result<(), StorageError> {
201        let mut header = [0u8; SEGMENT_HEADER_SIZE];
202        header[0..4].copy_from_slice(&WAL_MAGIC);
203        header[4..6].copy_from_slice(&WAL_VERSION.to_le_bytes());
204        header[6..14].copy_from_slice(&segment_id.to_le_bytes());
205        // CreatedAt: current time in microseconds (or 0 for simplicity)
206        let now = std::time::SystemTime::now()
207            .duration_since(std::time::UNIX_EPOCH)
208            .unwrap_or_default()
209            .as_micros() as i64;
210        header[14..22].copy_from_slice(&now.to_le_bytes());
211        // Reserved: [22..32] already zeroed
212        writer.write_all(&header)?;
213        Ok(())
214    }
215
216    fn rotate_segment(&mut self) -> Result<(), StorageError> {
217        // Flush and close current segment
218        if let Some(ref mut writer) = self.current_writer {
219            writer.flush()?;
220        }
221        self.current_writer = None;
222
223        self.meta.head_segment += 1;
224        self.meta.head_offset = SEGMENT_HEADER_SIZE as u64;
225        self.persist_meta()?;
226        self.open_current_segment()?;
227
228        Ok(())
229    }
230
231    /// Append an entry to the WAL. Returns the assigned sequence number.
232    pub fn append(
233        &mut self,
234        entry_type: EntryType,
235        flags: u8,
236        payload: &[u8],
237    ) -> Result<u64, StorageError> {
238        self.meta.last_sequence += 1;
239        let sequence = self.meta.last_sequence;
240
241        let entry_length = (ENTRY_HEADER_SIZE + payload.len()) as u32;
242
243        // Check if we need to rotate
244        if self.current_segment_size + entry_length as u64 > self.config.max_segment_size {
245            self.rotate_segment()?;
246        }
247
248        let crc = crc32_hash(payload);
249
250        // Write entry header
251        let writer = self.current_writer.as_mut().unwrap();
252        writer.write_all(&entry_length.to_le_bytes())?; // 4 bytes
253        writer.write_all(&sequence.to_le_bytes())?; // 8 bytes
254        writer.write_all(&[entry_type as u8])?; // 1 byte
255        writer.write_all(&[flags])?; // 1 byte
256        writer.write_all(&[0u8; 2])?; // 2 bytes reserved
257        writer.write_all(&crc.to_le_bytes())?; // 4 bytes
258        // Padding to 24 bytes
259        writer.write_all(&[0u8; 4])?; // 4 bytes padding
260
261        // Write payload
262        writer.write_all(payload)?;
263
264        if self.config.sync_on_write {
265            writer.flush()?;
266            writer.get_ref().sync_all()?;
267        }
268
269        self.current_segment_size += entry_length as u64;
270        self.meta.head_offset = self.current_segment_size;
271
272        Ok(sequence)
273    }
274
275    /// Mark all entries up to `sequence` as committed.
276    ///
277    /// Ensures durability by syncing segment data to disk before updating
278    /// metadata, then syncing the directory to make the rename durable.
279    /// See RFC-002-01 (Pillai et al., OSDI 2014).
280    pub fn commit(&mut self, sequence: u64) -> Result<(), StorageError> {
281        self.meta.committed_sequence = sequence;
282        // 1. Flush and fsync segment data to disk BEFORE updating metadata
283        if let Some(ref mut writer) = self.current_writer {
284            writer.flush()?;
285            writer.get_ref().sync_all()?;
286        }
287        // 2. Write metadata atomically (temp file + rename)
288        self.persist_meta()?;
289        // 3. Fsync directory to ensure rename is durable
290        let dir = std::fs::File::open(&self.dir)?;
291        dir.sync_all()?;
292        Ok(())
293    }
294
295    /// Get the last committed sequence number.
296    pub fn committed_sequence(&self) -> u64 {
297        self.meta.committed_sequence
298    }
299
300    /// Get the last appended sequence number.
301    pub fn last_sequence(&self) -> u64 {
302        self.meta.last_sequence
303    }
304
305    /// Flush buffered writes to disk.
306    pub fn flush(&mut self) -> Result<(), StorageError> {
307        if let Some(ref mut writer) = self.current_writer {
308            writer.flush()?;
309        }
310        Ok(())
311    }
312
313    fn persist_meta(&self) -> Result<(), StorageError> {
314        let meta_path = self.dir.join("wal.meta");
315        let content = serde_json::to_string_pretty(&self.meta)
316            .map_err(|_| StorageError::WalCorrupted { offset: 0 })?;
317        // Write atomically via temp file
318        let tmp_path = self.dir.join("wal.meta.tmp");
319        fs::write(&tmp_path, &content)?;
320        fs::rename(&tmp_path, &meta_path)?;
321        Ok(())
322    }
323
324    /// Recover uncommitted entries after a crash.
325    ///
326    /// Scans all segments for entries with sequence > committed_sequence.
327    /// Validates CRC32 for each entry. Stops at first corrupted entry.
328    /// Returns valid uncommitted entries for replay.
329    pub fn recover(&mut self) -> Result<Vec<WalEntry>, StorageError> {
330        self.flush()?;
331        // Close current writer so we can read the segments
332        self.current_writer = None;
333
334        let committed = self.meta.committed_sequence;
335        let mut uncommitted = Vec::new();
336
337        // Find all segment files
338        let mut segments = self.list_segments()?;
339        segments.sort();
340
341        for seg_id in segments {
342            let entries = self.read_segment(seg_id, committed)?;
343            uncommitted.extend(entries);
344        }
345
346        // Reopen current segment for further writes
347        self.open_current_segment()?;
348
349        Ok(uncommitted)
350    }
351
352    fn list_segments(&self) -> Result<Vec<u64>, StorageError> {
353        let mut segments = Vec::new();
354        for entry in fs::read_dir(&self.dir)? {
355            let entry = entry?;
356            let name = entry.file_name();
357            let name = name.to_string_lossy();
358            if let Some(rest) = name.strip_prefix("segment-") {
359                if let Some(num_str) = rest.strip_suffix(".wal") {
360                    if let Ok(id) = num_str.parse::<u64>() {
361                        segments.push(id);
362                    }
363                }
364            }
365        }
366        Ok(segments)
367    }
368
369    fn read_segment(
370        &self,
371        segment_id: u64,
372        committed_seq: u64,
373    ) -> Result<Vec<WalEntry>, StorageError> {
374        let path = self.segment_path(segment_id);
375        let file = File::open(&path)?;
376        let file_len = file.metadata()?.len();
377        let mut reader = BufReader::new(file);
378
379        // Validate and skip segment header
380        let mut header = [0u8; SEGMENT_HEADER_SIZE];
381        if reader.read_exact(&mut header).is_err() {
382            return Ok(Vec::new()); // Empty or too-small segment
383        }
384        if header[0..4] != WAL_MAGIC {
385            return Err(StorageError::WalCorrupted { offset: 0 });
386        }
387
388        let mut entries = Vec::new();
389        let mut offset = SEGMENT_HEADER_SIZE as u64;
390
391        while offset + ENTRY_HEADER_SIZE as u64 <= file_len {
392            // Read entry header
393            let mut entry_header = [0u8; ENTRY_HEADER_SIZE];
394            if reader.read_exact(&mut entry_header).is_err() {
395                break; // Partial header = truncated entry
396            }
397
398            let entry_length = u32::from_le_bytes(entry_header[0..4].try_into().unwrap());
399            let sequence = u64::from_le_bytes(entry_header[4..12].try_into().unwrap());
400            let entry_type_byte = entry_header[12];
401            let flags = entry_header[13];
402            // [14..16] reserved
403            let stored_crc = u32::from_le_bytes(entry_header[16..20].try_into().unwrap());
404            // [20..24] padding
405
406            let payload_len = entry_length as usize - ENTRY_HEADER_SIZE;
407            if payload_len > self.config.max_segment_size as usize {
408                break; // Corrupted entry length
409            }
410
411            let mut payload = vec![0u8; payload_len];
412            if reader.read_exact(&mut payload).is_err() {
413                break; // Truncated payload
414            }
415
416            // Validate CRC32
417            let computed_crc = crc32_hash(&payload);
418            if computed_crc != stored_crc {
419                // Corrupted entry — stop here per recovery protocol
420                break;
421            }
422
423            offset += entry_length as u64;
424
425            // Only return uncommitted entries
426            if sequence > committed_seq {
427                let entry_type = EntryType::try_from(entry_type_byte)?;
428                entries.push(WalEntry {
429                    sequence,
430                    entry_type,
431                    flags,
432                    payload,
433                });
434            }
435        }
436
437        Ok(entries)
438    }
439
440    /// Truncate the WAL at the current committed position.
441    ///
442    /// Removes all segments before the committed segment and truncates
443    /// the committed segment at the committed offset. Used after recovery.
444    pub fn truncate_uncommitted(&mut self) -> Result<(), StorageError> {
445        self.meta.last_sequence = self.meta.committed_sequence;
446        self.persist_meta()?;
447        Ok(())
448    }
449}
450
/// CRC-32 (IEEE, reflected polynomial `0xEDB88320`, init/xorout `0xFFFFFFFF`).
///
/// This is the zlib/PNG CRC32; e.g. `b"123456789"` hashes to `0xCBF43926`.
/// It is implemented here to avoid adding a dependency. A 256-entry table
/// built at compile time lets each input byte be processed in one step
/// instead of eight bit-steps. This matters because every appended payload
/// and every entry read during recovery is hashed.
fn crc32_hash(data: &[u8]) -> u32 {
    const POLY: u32 = 0xEDB8_8320;
    const TABLE: [u32; 256] = {
        let mut table = [0u32; 256];
        let mut i = 0;
        while i < 256 {
            let mut crc = i as u32;
            let mut bit = 0;
            while bit < 8 {
                crc = if crc & 1 != 0 { (crc >> 1) ^ POLY } else { crc >> 1 };
                bit += 1;
            }
            table[i] = crc;
            i += 1;
        }
        table
    };

    !data.iter().fold(0xFFFF_FFFF_u32, |crc, &byte| {
        TABLE[((crc ^ u32::from(byte)) & 0xFF) as usize] ^ (crc >> 8)
    })
}
468
/// Encode an insert payload.
///
/// Layout (all little-endian):
/// `entity_id: u64 | timestamp: i64 | dim: u16 | dim × f32 | metadata_len: u32`,
/// where `metadata_len` is currently always 0.
///
/// # Panics
/// Panics if `vector.len()` exceeds `u16::MAX`. The dimension field is 16
/// bits wide, and silently truncating it would produce a payload that decodes
/// to the wrong vector.
pub fn encode_insert_payload(entity_id: u64, timestamp: i64, vector: &[f32]) -> Vec<u8> {
    let dim = u16::try_from(vector.len())
        .expect("vector dimension exceeds u16::MAX (WAL dimension field is 16 bits)");
    // Fixed fields + vector + trailing metadata length, so the buffer never regrows.
    let mut buf = Vec::with_capacity(8 + 8 + 2 + vector.len() * 4 + 4);
    buf.extend_from_slice(&entity_id.to_le_bytes());
    buf.extend_from_slice(&timestamp.to_le_bytes());
    buf.extend_from_slice(&dim.to_le_bytes());
    for &v in vector {
        buf.extend_from_slice(&v.to_le_bytes());
    }
    // MetadataLen = 0 (no metadata for now)
    buf.extend_from_slice(&0u32.to_le_bytes());
    buf
}
483
484/// Decode an insert payload back into components.
485pub fn decode_insert_payload(payload: &[u8]) -> Result<(u64, i64, Vec<f32>), StorageError> {
486    if payload.len() < 18 {
487        return Err(StorageError::WalCorrupted { offset: 0 });
488    }
489    let entity_id = u64::from_le_bytes(payload[0..8].try_into().unwrap());
490    let timestamp = i64::from_le_bytes(payload[8..16].try_into().unwrap());
491    let dim = u16::from_le_bytes(payload[16..18].try_into().unwrap()) as usize;
492
493    let vector_start = 18;
494    let vector_end = vector_start + dim * 4;
495    if payload.len() < vector_end {
496        return Err(StorageError::WalCorrupted { offset: 16 });
497    }
498
499    let vector: Vec<f32> = (0..dim)
500        .map(|i| {
501            let offset = vector_start + i * 4;
502            f32::from_le_bytes(payload[offset..offset + 4].try_into().unwrap())
503        })
504        .collect();
505
506    Ok((entity_id, timestamp, vector))
507}
508
/// Encode a delete payload: 16 bytes, `entity_id: u64` then `timestamp: i64`,
/// both little-endian.
pub fn encode_delete_payload(entity_id: u64, timestamp: i64) -> Vec<u8> {
    [entity_id.to_le_bytes(), timestamp.to_le_bytes()].concat()
}
516
517/// Decode a delete payload.
518pub fn decode_delete_payload(payload: &[u8]) -> Result<(u64, i64), StorageError> {
519    if payload.len() < 16 {
520        return Err(StorageError::WalCorrupted { offset: 0 });
521    }
522    let entity_id = u64::from_le_bytes(payload[0..8].try_into().unwrap());
523    let timestamp = i64::from_le_bytes(payload[8..16].try_into().unwrap());
524    Ok((entity_id, timestamp))
525}
526
#[cfg(test)]
mod tests {
    use super::*;

    fn tmp_wal_dir() -> tempfile::TempDir {
        tempfile::tempdir().unwrap()
    }

    // ─── Basic operations ───────────────────────────────────────────────

    #[test]
    fn create_and_append() {
        let dir = tmp_wal_dir();
        let mut wal = Wal::open(dir.path(), WalConfig::default()).unwrap();

        let payload = encode_insert_payload(42, 1000, &[0.1, 0.2, 0.3]);
        let seq = wal.append(EntryType::Insert, 0, &payload).unwrap();
        assert_eq!(seq, 1);

        let seq2 = wal.append(EntryType::Insert, 0, &payload).unwrap();
        assert_eq!(seq2, 2);
    }

    #[test]
    fn commit_updates_sequence() {
        let dir = tmp_wal_dir();
        let mut wal = Wal::open(dir.path(), WalConfig::default()).unwrap();

        let payload = encode_insert_payload(1, 100, &[1.0]);
        let seq = wal.append(EntryType::Insert, 0, &payload).unwrap();
        wal.commit(seq).unwrap();
        assert_eq!(wal.committed_sequence(), seq);
    }

    // ─── Recovery ───────────────────────────────────────────────────────

    #[test]
    fn recover_uncommitted_entries() {
        let dir = tmp_wal_dir();

        // Write 5 entries, commit first 3
        {
            let mut wal = Wal::open(dir.path(), WalConfig::default()).unwrap();
            for i in 0..5u64 {
                let payload = encode_insert_payload(i, (i * 1000) as i64, &[i as f32]);
                wal.append(EntryType::Insert, 0, &payload).unwrap();
            }
            wal.commit(3).unwrap();
        }

        // Reopen and recover
        {
            let mut wal = Wal::open(dir.path(), WalConfig::default()).unwrap();
            let uncommitted = wal.recover().unwrap();
            assert_eq!(uncommitted.len(), 2); // entries 4 and 5
            assert_eq!(uncommitted[0].sequence, 4);
            assert_eq!(uncommitted[1].sequence, 5);

            // Verify payloads
            let (eid, ts, _vec) = decode_insert_payload(&uncommitted[0].payload).unwrap();
            assert_eq!(eid, 3);
            assert_eq!(ts, 3000);
        }
    }

    #[test]
    fn recover_all_committed_returns_empty() {
        let dir = tmp_wal_dir();

        {
            let mut wal = Wal::open(dir.path(), WalConfig::default()).unwrap();
            for i in 0..3u64 {
                let payload = encode_insert_payload(i, 0, &[0.0]);
                wal.append(EntryType::Insert, 0, &payload).unwrap();
            }
            wal.commit(3).unwrap();
        }

        {
            let mut wal = Wal::open(dir.path(), WalConfig::default()).unwrap();
            let uncommitted = wal.recover().unwrap();
            assert!(uncommitted.is_empty());
        }
    }

    #[test]
    fn recover_truncated_entry_is_skipped() {
        let dir = tmp_wal_dir();

        // Write entries normally
        {
            let mut wal = Wal::open(
                dir.path(),
                WalConfig {
                    sync_on_write: true,
                    ..Default::default()
                },
            )
            .unwrap();
            let payload = encode_insert_payload(1, 1000, &[1.0, 2.0, 3.0]);
            wal.append(EntryType::Insert, 0, &payload).unwrap();
            let payload2 = encode_insert_payload(2, 2000, &[4.0, 5.0, 6.0]);
            wal.append(EntryType::Insert, 0, &payload2).unwrap();
            wal.flush().unwrap();
        }

        // Corrupt the file by truncating the last few bytes
        {
            let seg_path = dir.path().join("segment-000000000000.wal");
            let file = OpenOptions::new().write(true).open(&seg_path).unwrap();
            let current_len = file.metadata().unwrap().len();
            file.set_len(current_len - 5).unwrap(); // truncate last 5 bytes
        }

        // Recovery should return only the first valid entry
        {
            let mut wal = Wal::open(dir.path(), WalConfig::default()).unwrap();
            let entries = wal.recover().unwrap();
            assert_eq!(entries.len(), 1); // only first entry survived
            assert_eq!(entries[0].sequence, 1);
        }
    }

    #[test]
    fn recover_corrupted_crc_stops() {
        let dir = tmp_wal_dir();

        {
            let mut wal = Wal::open(
                dir.path(),
                WalConfig {
                    sync_on_write: true,
                    ..Default::default()
                },
            )
            .unwrap();
            for i in 0..3u64 {
                let payload = encode_insert_payload(i, 0, &[i as f32; 4]);
                wal.append(EntryType::Insert, 0, &payload).unwrap();
            }
            wal.flush().unwrap();
        }

        // Corrupt the payload of the second entry (flip a byte)
        {
            let seg_path = dir.path().join("segment-000000000000.wal");
            let mut data = fs::read(&seg_path).unwrap();
            // Second entry starts at: header(32) + first_entry_size
            // First entry: header(24) + payload(8+8+2+4*4+4 = 38) = 62
            let second_entry_payload_offset = SEGMENT_HEADER_SIZE + 62 + ENTRY_HEADER_SIZE;
            // Fail loudly instead of silently skipping the corruption step.
            assert!(
                second_entry_payload_offset < data.len(),
                "layout assumption broken: offset {} beyond segment of {} bytes",
                second_entry_payload_offset,
                data.len()
            );
            data[second_entry_payload_offset] ^= 0xFF; // flip bits
            fs::write(&seg_path, &data).unwrap();
        }

        {
            let mut wal = Wal::open(dir.path(), WalConfig::default()).unwrap();
            let entries = wal.recover().unwrap();
            // Should stop at the corrupted second entry, returning only the first
            assert_eq!(entries.len(), 1);
        }
    }

    // ─── Segment rotation ───────────────────────────────────────────────

    #[test]
    fn segment_rotation_on_size_limit() {
        let dir = tmp_wal_dir();
        let config = WalConfig {
            max_segment_size: 256, // tiny segments for testing
            sync_on_write: false,
        };
        let mut wal = Wal::open(dir.path(), config).unwrap();

        // Write enough entries to trigger rotation
        for i in 0..20u64 {
            let payload = encode_insert_payload(i, 0, &[0.0; 8]);
            wal.append(EntryType::Insert, 0, &payload).unwrap();
        }
        wal.flush().unwrap();

        // Should have created multiple segment files
        let segments = wal.list_segments().unwrap();
        assert!(
            segments.len() > 1,
            "expected multiple segments, got {}",
            segments.len()
        );
    }

    #[test]
    fn recovery_across_segments() {
        let dir = tmp_wal_dir();
        let config = WalConfig {
            max_segment_size: 256,
            sync_on_write: true,
        };

        let total_entries = 20u64;
        let commit_at = 10u64;

        {
            let mut wal = Wal::open(dir.path(), config.clone()).unwrap();
            for i in 0..total_entries {
                let payload = encode_insert_payload(i, (i * 100) as i64, &[i as f32]);
                wal.append(EntryType::Insert, 0, &payload).unwrap();
            }
            wal.commit(commit_at).unwrap();
        }

        {
            let mut wal = Wal::open(dir.path(), config).unwrap();
            let uncommitted = wal.recover().unwrap();
            assert_eq!(uncommitted.len(), (total_entries - commit_at) as usize);
            // Verify all have sequence > commit_at
            for entry in &uncommitted {
                assert!(entry.sequence > commit_at);
            }
        }
    }

    // ─── Payload encoding/decoding ──────────────────────────────────────

    #[test]
    fn insert_payload_roundtrip() {
        let vector = vec![0.1, 0.2, 0.3, 0.4];
        let payload = encode_insert_payload(42, -5000, &vector);
        let (eid, ts, vec) = decode_insert_payload(&payload).unwrap();
        assert_eq!(eid, 42);
        assert_eq!(ts, -5000);
        assert_eq!(vec, vector);
    }

    #[test]
    fn delete_payload_roundtrip() {
        let payload = encode_delete_payload(99, 12345);
        let (eid, ts) = decode_delete_payload(&payload).unwrap();
        assert_eq!(eid, 99);
        assert_eq!(ts, 12345);
    }

    #[test]
    fn insert_payload_d768_roundtrip() {
        let vector: Vec<f32> = (0..768).map(|i| i as f32 * 0.001).collect();
        let payload = encode_insert_payload(1, 1_000_000, &vector);
        let (eid, ts, vec) = decode_insert_payload(&payload).unwrap();
        assert_eq!(eid, 1);
        assert_eq!(ts, 1_000_000);
        // Floats are stored as raw little-endian bits, so the roundtrip is exact.
        assert_eq!(vec, vector);
    }

    #[test]
    fn decode_insert_payload_rejects_short_input() {
        assert!(decode_insert_payload(&[0u8; 10]).is_err());
        let payload = encode_insert_payload(5, 50, &[1.0, 2.0, 3.0, 4.0]);
        // Cut into the vector bytes: prefix intact, vector incomplete.
        assert!(decode_insert_payload(&payload[..payload.len() - 5]).is_err());
    }

    #[test]
    fn decode_delete_payload_rejects_short_input() {
        assert!(decode_delete_payload(&[0u8; 15]).is_err());
        assert!(decode_delete_payload(&[]).is_err());
    }

    // ─── CRC32 ──────────────────────────────────────────────────────────

    #[test]
    fn crc32_deterministic() {
        let data = b"hello world";
        let crc1 = crc32_hash(data);
        let crc2 = crc32_hash(data);
        assert_eq!(crc1, crc2);
    }

    #[test]
    fn crc32_different_data() {
        let crc1 = crc32_hash(b"hello");
        let crc2 = crc32_hash(b"world");
        assert_ne!(crc1, crc2);
    }

    #[test]
    fn crc32_known_vectors() {
        // Standard CRC-32/IEEE check values; determinism alone would not catch
        // a wrong polynomial or missing final inversion.
        assert_eq!(crc32_hash(b""), 0);
        assert_eq!(crc32_hash(b"123456789"), 0xCBF4_3926);
        assert_eq!(
            crc32_hash(b"The quick brown fox jumps over the lazy dog"),
            0x414F_A339
        );
    }

    // ─── Write 100K entries and recover ─────────────────────────────────

    #[test]
    fn write_100k_and_recover() {
        let dir = tmp_wal_dir();
        let config = WalConfig {
            max_segment_size: DEFAULT_MAX_SEGMENT_SIZE,
            sync_on_write: false,
        };

        let n = 100_000u64;
        let commit_at = 99_990u64;

        {
            let mut wal = Wal::open(dir.path(), config.clone()).unwrap();
            for i in 0..n {
                let payload = encode_insert_payload(i % 1000, (i * 10) as i64, &[i as f32; 4]);
                wal.append(EntryType::Insert, 0, &payload).unwrap();
            }
            wal.commit(commit_at).unwrap();
        }

        {
            let mut wal = Wal::open(dir.path(), config).unwrap();
            let uncommitted = wal.recover().unwrap();
            assert_eq!(uncommitted.len(), (n - commit_at) as usize);
            // All committed entries should NOT be returned
            for entry in &uncommitted {
                assert!(entry.sequence > commit_at);
            }
        }
    }

    // ─── Reopen and continue writing ────────────────────────────────────

    #[test]
    fn reopen_and_continue() {
        let dir = tmp_wal_dir();
        let config = WalConfig::default();

        {
            let mut wal = Wal::open(dir.path(), config.clone()).unwrap();
            let payload = encode_insert_payload(1, 100, &[1.0]);
            wal.append(EntryType::Insert, 0, &payload).unwrap();
            wal.commit(1).unwrap();
        }

        {
            let mut wal = Wal::open(dir.path(), config).unwrap();
            assert_eq!(wal.committed_sequence(), 1);
            let payload = encode_insert_payload(2, 200, &[2.0]);
            let seq = wal.append(EntryType::Insert, 0, &payload).unwrap();
            assert_eq!(seq, 2);
            wal.commit(2).unwrap();
        }
    }

    // ─── Entry types ────────────────────────────────────────────────────

    #[test]
    fn delete_entry_type() {
        let dir = tmp_wal_dir();
        let mut wal = Wal::open(dir.path(), WalConfig::default()).unwrap();

        let payload = encode_delete_payload(42, 1000);
        let seq = wal.append(EntryType::Delete, 0, &payload).unwrap();
        assert_eq!(seq, 1);

        let entries = wal.recover().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].entry_type, EntryType::Delete);
    }

    #[test]
    fn keyframe_flag() {
        let dir = tmp_wal_dir();
        let mut wal = Wal::open(dir.path(), WalConfig::default()).unwrap();

        let payload = encode_insert_payload(1, 100, &[1.0]);
        wal.append(EntryType::Insert, 0x01, &payload).unwrap(); // keyframe flag

        let entries = wal.recover().unwrap();
        assert_eq!(entries[0].flags, 0x01);
    }
}