1use std::fs::{self, File, OpenOptions};
29use std::io::{BufReader, BufWriter, Read, Write};
30use std::path::{Path, PathBuf};
31
32use cvx_core::error::StorageError;
33
34const WAL_MAGIC: [u8; 4] = *b"CVXW";
36
37const WAL_VERSION: u16 = 1;
39
40const SEGMENT_HEADER_SIZE: usize = 32;
42
43const ENTRY_HEADER_SIZE: usize = 24;
45
46const DEFAULT_MAX_SEGMENT_SIZE: u64 = 64 * 1024 * 1024;
48
49#[derive(Debug, Clone, Copy, PartialEq, Eq)]
51#[repr(u8)]
52pub enum EntryType {
53 Insert = 0,
55 Delete = 1,
57 Update = 2,
59 Checkpoint = 3,
61}
62
63impl TryFrom<u8> for EntryType {
64 type Error = StorageError;
65 fn try_from(value: u8) -> Result<Self, Self::Error> {
66 match value {
67 0 => Ok(Self::Insert),
68 1 => Ok(Self::Delete),
69 2 => Ok(Self::Update),
70 3 => Ok(Self::Checkpoint),
71 _ => Err(StorageError::WalCorrupted { offset: 0 }),
72 }
73 }
74}
75
76#[derive(Debug, Clone)]
78pub struct WalEntry {
79 pub sequence: u64,
81 pub entry_type: EntryType,
83 pub flags: u8,
85 pub payload: Vec<u8>,
87}
88
89#[derive(Debug, Clone)]
91pub struct WalConfig {
92 pub max_segment_size: u64,
94 pub sync_on_write: bool,
96}
97
98impl Default for WalConfig {
99 fn default() -> Self {
100 Self {
101 max_segment_size: DEFAULT_MAX_SEGMENT_SIZE,
102 sync_on_write: false,
103 }
104 }
105}
106
107#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
109struct WalMeta {
110 format_version: u16,
111 head_segment: u64,
112 head_offset: u64,
113 committed_sequence: u64,
114 last_sequence: u64,
115}
116
117impl Default for WalMeta {
118 fn default() -> Self {
119 Self {
120 format_version: WAL_VERSION,
121 head_segment: 0,
122 head_offset: SEGMENT_HEADER_SIZE as u64,
123 committed_sequence: 0,
124 last_sequence: 0,
125 }
126 }
127}
128
129pub struct Wal {
131 dir: PathBuf,
132 config: WalConfig,
133 meta: WalMeta,
134 current_writer: Option<BufWriter<File>>,
135 current_segment_size: u64,
136}
137
138impl Wal {
139 pub fn open(dir: &Path, config: WalConfig) -> Result<Self, StorageError> {
141 fs::create_dir_all(dir)?;
142
143 let meta = Self::load_or_create_meta(dir)?;
144
145 let mut wal = Self {
146 dir: dir.to_path_buf(),
147 config,
148 meta,
149 current_writer: None,
150 current_segment_size: 0,
151 };
152
153 wal.open_current_segment()?;
154 Ok(wal)
155 }
156
157 fn load_or_create_meta(dir: &Path) -> Result<WalMeta, StorageError> {
158 let meta_path = dir.join("wal.meta");
159 if meta_path.exists() {
160 let content = fs::read_to_string(&meta_path)?;
161 let meta: WalMeta = serde_json::from_str(&content)
162 .map_err(|_| StorageError::WalCorrupted { offset: 0 })?;
163 Ok(meta)
164 } else {
165 let meta = WalMeta::default();
166 let content = serde_json::to_string_pretty(&meta)
167 .map_err(|_| StorageError::WalCorrupted { offset: 0 })?;
168 fs::write(&meta_path, &content)?;
169 Ok(meta)
170 }
171 }
172
173 fn segment_path(&self, segment_id: u64) -> PathBuf {
174 self.dir.join(format!("segment-{segment_id:012}.wal"))
175 }
176
177 fn open_current_segment(&mut self) -> Result<(), StorageError> {
178 let path = self.segment_path(self.meta.head_segment);
179
180 if path.exists() {
181 let file = OpenOptions::new().append(true).open(&path)?;
182 let size = file.metadata()?.len();
183 self.current_segment_size = size;
184 self.current_writer = Some(BufWriter::new(file));
185 } else {
186 let file = File::create(&path)?;
187 let mut writer = BufWriter::new(file);
188 Self::write_segment_header(&mut writer, self.meta.head_segment)?;
189 writer.flush()?;
190 self.current_segment_size = SEGMENT_HEADER_SIZE as u64;
191 self.current_writer = Some(writer);
192 }
193
194 Ok(())
195 }
196
197 fn write_segment_header(
198 writer: &mut BufWriter<File>,
199 segment_id: u64,
200 ) -> Result<(), StorageError> {
201 let mut header = [0u8; SEGMENT_HEADER_SIZE];
202 header[0..4].copy_from_slice(&WAL_MAGIC);
203 header[4..6].copy_from_slice(&WAL_VERSION.to_le_bytes());
204 header[6..14].copy_from_slice(&segment_id.to_le_bytes());
205 let now = std::time::SystemTime::now()
207 .duration_since(std::time::UNIX_EPOCH)
208 .unwrap_or_default()
209 .as_micros() as i64;
210 header[14..22].copy_from_slice(&now.to_le_bytes());
211 writer.write_all(&header)?;
213 Ok(())
214 }
215
216 fn rotate_segment(&mut self) -> Result<(), StorageError> {
217 if let Some(ref mut writer) = self.current_writer {
219 writer.flush()?;
220 }
221 self.current_writer = None;
222
223 self.meta.head_segment += 1;
224 self.meta.head_offset = SEGMENT_HEADER_SIZE as u64;
225 self.persist_meta()?;
226 self.open_current_segment()?;
227
228 Ok(())
229 }
230
231 pub fn append(
233 &mut self,
234 entry_type: EntryType,
235 flags: u8,
236 payload: &[u8],
237 ) -> Result<u64, StorageError> {
238 self.meta.last_sequence += 1;
239 let sequence = self.meta.last_sequence;
240
241 let entry_length = (ENTRY_HEADER_SIZE + payload.len()) as u32;
242
243 if self.current_segment_size + entry_length as u64 > self.config.max_segment_size {
245 self.rotate_segment()?;
246 }
247
248 let crc = crc32_hash(payload);
249
250 let writer = self.current_writer.as_mut().unwrap();
252 writer.write_all(&entry_length.to_le_bytes())?; writer.write_all(&sequence.to_le_bytes())?; writer.write_all(&[entry_type as u8])?; writer.write_all(&[flags])?; writer.write_all(&[0u8; 2])?; writer.write_all(&crc.to_le_bytes())?; writer.write_all(&[0u8; 4])?; writer.write_all(payload)?;
263
264 if self.config.sync_on_write {
265 writer.flush()?;
266 writer.get_ref().sync_all()?;
267 }
268
269 self.current_segment_size += entry_length as u64;
270 self.meta.head_offset = self.current_segment_size;
271
272 Ok(sequence)
273 }
274
275 pub fn commit(&mut self, sequence: u64) -> Result<(), StorageError> {
281 self.meta.committed_sequence = sequence;
282 if let Some(ref mut writer) = self.current_writer {
284 writer.flush()?;
285 writer.get_ref().sync_all()?;
286 }
287 self.persist_meta()?;
289 let dir = std::fs::File::open(&self.dir)?;
291 dir.sync_all()?;
292 Ok(())
293 }
294
295 pub fn committed_sequence(&self) -> u64 {
297 self.meta.committed_sequence
298 }
299
300 pub fn last_sequence(&self) -> u64 {
302 self.meta.last_sequence
303 }
304
305 pub fn flush(&mut self) -> Result<(), StorageError> {
307 if let Some(ref mut writer) = self.current_writer {
308 writer.flush()?;
309 }
310 Ok(())
311 }
312
313 fn persist_meta(&self) -> Result<(), StorageError> {
314 let meta_path = self.dir.join("wal.meta");
315 let content = serde_json::to_string_pretty(&self.meta)
316 .map_err(|_| StorageError::WalCorrupted { offset: 0 })?;
317 let tmp_path = self.dir.join("wal.meta.tmp");
319 fs::write(&tmp_path, &content)?;
320 fs::rename(&tmp_path, &meta_path)?;
321 Ok(())
322 }
323
324 pub fn recover(&mut self) -> Result<Vec<WalEntry>, StorageError> {
330 self.flush()?;
331 self.current_writer = None;
333
334 let committed = self.meta.committed_sequence;
335 let mut uncommitted = Vec::new();
336
337 let mut segments = self.list_segments()?;
339 segments.sort();
340
341 for seg_id in segments {
342 let entries = self.read_segment(seg_id, committed)?;
343 uncommitted.extend(entries);
344 }
345
346 self.open_current_segment()?;
348
349 Ok(uncommitted)
350 }
351
352 fn list_segments(&self) -> Result<Vec<u64>, StorageError> {
353 let mut segments = Vec::new();
354 for entry in fs::read_dir(&self.dir)? {
355 let entry = entry?;
356 let name = entry.file_name();
357 let name = name.to_string_lossy();
358 if let Some(rest) = name.strip_prefix("segment-") {
359 if let Some(num_str) = rest.strip_suffix(".wal") {
360 if let Ok(id) = num_str.parse::<u64>() {
361 segments.push(id);
362 }
363 }
364 }
365 }
366 Ok(segments)
367 }
368
369 fn read_segment(
370 &self,
371 segment_id: u64,
372 committed_seq: u64,
373 ) -> Result<Vec<WalEntry>, StorageError> {
374 let path = self.segment_path(segment_id);
375 let file = File::open(&path)?;
376 let file_len = file.metadata()?.len();
377 let mut reader = BufReader::new(file);
378
379 let mut header = [0u8; SEGMENT_HEADER_SIZE];
381 if reader.read_exact(&mut header).is_err() {
382 return Ok(Vec::new()); }
384 if header[0..4] != WAL_MAGIC {
385 return Err(StorageError::WalCorrupted { offset: 0 });
386 }
387
388 let mut entries = Vec::new();
389 let mut offset = SEGMENT_HEADER_SIZE as u64;
390
391 while offset + ENTRY_HEADER_SIZE as u64 <= file_len {
392 let mut entry_header = [0u8; ENTRY_HEADER_SIZE];
394 if reader.read_exact(&mut entry_header).is_err() {
395 break; }
397
398 let entry_length = u32::from_le_bytes(entry_header[0..4].try_into().unwrap());
399 let sequence = u64::from_le_bytes(entry_header[4..12].try_into().unwrap());
400 let entry_type_byte = entry_header[12];
401 let flags = entry_header[13];
402 let stored_crc = u32::from_le_bytes(entry_header[16..20].try_into().unwrap());
404 let payload_len = entry_length as usize - ENTRY_HEADER_SIZE;
407 if payload_len > self.config.max_segment_size as usize {
408 break; }
410
411 let mut payload = vec![0u8; payload_len];
412 if reader.read_exact(&mut payload).is_err() {
413 break; }
415
416 let computed_crc = crc32_hash(&payload);
418 if computed_crc != stored_crc {
419 break;
421 }
422
423 offset += entry_length as u64;
424
425 if sequence > committed_seq {
427 let entry_type = EntryType::try_from(entry_type_byte)?;
428 entries.push(WalEntry {
429 sequence,
430 entry_type,
431 flags,
432 payload,
433 });
434 }
435 }
436
437 Ok(entries)
438 }
439
440 pub fn truncate_uncommitted(&mut self) -> Result<(), StorageError> {
445 self.meta.last_sequence = self.meta.committed_sequence;
446 self.persist_meta()?;
447 Ok(())
448 }
449}
450
451fn crc32_hash(data: &[u8]) -> u32 {
453 let mut crc: u32 = 0xFFFF_FFFF;
456 for &byte in data {
457 crc ^= byte as u32;
458 for _ in 0..8 {
459 if crc & 1 != 0 {
460 crc = (crc >> 1) ^ 0xEDB8_8320;
461 } else {
462 crc >>= 1;
463 }
464 }
465 }
466 !crc
467}
468
469pub fn encode_insert_payload(entity_id: u64, timestamp: i64, vector: &[f32]) -> Vec<u8> {
471 let dim = vector.len() as u16;
472 let mut buf = Vec::with_capacity(8 + 8 + 2 + vector.len() * 4);
473 buf.extend_from_slice(&entity_id.to_le_bytes());
474 buf.extend_from_slice(×tamp.to_le_bytes());
475 buf.extend_from_slice(&dim.to_le_bytes());
476 for &v in vector {
477 buf.extend_from_slice(&v.to_le_bytes());
478 }
479 buf.extend_from_slice(&0u32.to_le_bytes());
481 buf
482}
483
484pub fn decode_insert_payload(payload: &[u8]) -> Result<(u64, i64, Vec<f32>), StorageError> {
486 if payload.len() < 18 {
487 return Err(StorageError::WalCorrupted { offset: 0 });
488 }
489 let entity_id = u64::from_le_bytes(payload[0..8].try_into().unwrap());
490 let timestamp = i64::from_le_bytes(payload[8..16].try_into().unwrap());
491 let dim = u16::from_le_bytes(payload[16..18].try_into().unwrap()) as usize;
492
493 let vector_start = 18;
494 let vector_end = vector_start + dim * 4;
495 if payload.len() < vector_end {
496 return Err(StorageError::WalCorrupted { offset: 16 });
497 }
498
499 let vector: Vec<f32> = (0..dim)
500 .map(|i| {
501 let offset = vector_start + i * 4;
502 f32::from_le_bytes(payload[offset..offset + 4].try_into().unwrap())
503 })
504 .collect();
505
506 Ok((entity_id, timestamp, vector))
507}
508
509pub fn encode_delete_payload(entity_id: u64, timestamp: i64) -> Vec<u8> {
511 let mut buf = Vec::with_capacity(16);
512 buf.extend_from_slice(&entity_id.to_le_bytes());
513 buf.extend_from_slice(×tamp.to_le_bytes());
514 buf
515}
516
517pub fn decode_delete_payload(payload: &[u8]) -> Result<(u64, i64), StorageError> {
519 if payload.len() < 16 {
520 return Err(StorageError::WalCorrupted { offset: 0 });
521 }
522 let entity_id = u64::from_le_bytes(payload[0..8].try_into().unwrap());
523 let timestamp = i64::from_le_bytes(payload[8..16].try_into().unwrap());
524 Ok((entity_id, timestamp))
525}
526
527#[cfg(test)]
528mod tests {
529 use super::*;
530
531 fn tmp_wal_dir() -> tempfile::TempDir {
532 tempfile::tempdir().unwrap()
533 }
534
535 #[test]
538 fn create_and_append() {
539 let dir = tmp_wal_dir();
540 let mut wal = Wal::open(dir.path(), WalConfig::default()).unwrap();
541
542 let payload = encode_insert_payload(42, 1000, &[0.1, 0.2, 0.3]);
543 let seq = wal.append(EntryType::Insert, 0, &payload).unwrap();
544 assert_eq!(seq, 1);
545
546 let seq2 = wal.append(EntryType::Insert, 0, &payload).unwrap();
547 assert_eq!(seq2, 2);
548 }
549
550 #[test]
551 fn commit_updates_sequence() {
552 let dir = tmp_wal_dir();
553 let mut wal = Wal::open(dir.path(), WalConfig::default()).unwrap();
554
555 let payload = encode_insert_payload(1, 100, &[1.0]);
556 let seq = wal.append(EntryType::Insert, 0, &payload).unwrap();
557 wal.commit(seq).unwrap();
558 assert_eq!(wal.committed_sequence(), seq);
559 }
560
561 #[test]
564 fn recover_uncommitted_entries() {
565 let dir = tmp_wal_dir();
566
567 {
569 let mut wal = Wal::open(dir.path(), WalConfig::default()).unwrap();
570 for i in 0..5u64 {
571 let payload = encode_insert_payload(i, (i * 1000) as i64, &[i as f32]);
572 wal.append(EntryType::Insert, 0, &payload).unwrap();
573 }
574 wal.commit(3).unwrap();
575 }
576
577 {
579 let mut wal = Wal::open(dir.path(), WalConfig::default()).unwrap();
580 let uncommitted = wal.recover().unwrap();
581 assert_eq!(uncommitted.len(), 2); assert_eq!(uncommitted[0].sequence, 4);
583 assert_eq!(uncommitted[1].sequence, 5);
584
585 let (eid, ts, _vec) = decode_insert_payload(&uncommitted[0].payload).unwrap();
587 assert_eq!(eid, 3);
588 assert_eq!(ts, 3000);
589 }
590 }
591
592 #[test]
593 fn recover_all_committed_returns_empty() {
594 let dir = tmp_wal_dir();
595
596 {
597 let mut wal = Wal::open(dir.path(), WalConfig::default()).unwrap();
598 for i in 0..3u64 {
599 let payload = encode_insert_payload(i, 0, &[0.0]);
600 wal.append(EntryType::Insert, 0, &payload).unwrap();
601 }
602 wal.commit(3).unwrap();
603 }
604
605 {
606 let mut wal = Wal::open(dir.path(), WalConfig::default()).unwrap();
607 let uncommitted = wal.recover().unwrap();
608 assert!(uncommitted.is_empty());
609 }
610 }
611
612 #[test]
613 fn recover_truncated_entry_is_skipped() {
614 let dir = tmp_wal_dir();
615
616 {
618 let mut wal = Wal::open(
619 dir.path(),
620 WalConfig {
621 sync_on_write: true,
622 ..Default::default()
623 },
624 )
625 .unwrap();
626 let payload = encode_insert_payload(1, 1000, &[1.0, 2.0, 3.0]);
627 wal.append(EntryType::Insert, 0, &payload).unwrap();
628 let payload2 = encode_insert_payload(2, 2000, &[4.0, 5.0, 6.0]);
629 wal.append(EntryType::Insert, 0, &payload2).unwrap();
630 wal.flush().unwrap();
631 }
632
633 {
635 let seg_path = dir.path().join("segment-000000000000.wal");
636 let file = OpenOptions::new().write(true).open(&seg_path).unwrap();
637 let current_len = file.metadata().unwrap().len();
638 file.set_len(current_len - 5).unwrap(); }
640
641 {
643 let mut wal = Wal::open(dir.path(), WalConfig::default()).unwrap();
644 let entries = wal.recover().unwrap();
645 assert_eq!(entries.len(), 1); assert_eq!(entries[0].sequence, 1);
647 }
648 }
649
650 #[test]
651 fn recover_corrupted_crc_stops() {
652 let dir = tmp_wal_dir();
653
654 {
655 let mut wal = Wal::open(
656 dir.path(),
657 WalConfig {
658 sync_on_write: true,
659 ..Default::default()
660 },
661 )
662 .unwrap();
663 for i in 0..3u64 {
664 let payload = encode_insert_payload(i, 0, &[i as f32; 4]);
665 wal.append(EntryType::Insert, 0, &payload).unwrap();
666 }
667 wal.flush().unwrap();
668 }
669
670 {
672 let seg_path = dir.path().join("segment-000000000000.wal");
673 let mut data = fs::read(&seg_path).unwrap();
674 let second_entry_payload_offset = SEGMENT_HEADER_SIZE + 62 + ENTRY_HEADER_SIZE;
677 if second_entry_payload_offset < data.len() {
678 data[second_entry_payload_offset] ^= 0xFF; }
680 fs::write(&seg_path, &data).unwrap();
681 }
682
683 {
684 let mut wal = Wal::open(dir.path(), WalConfig::default()).unwrap();
685 let entries = wal.recover().unwrap();
686 assert_eq!(entries.len(), 1);
688 }
689 }
690
691 #[test]
694 fn segment_rotation_on_size_limit() {
695 let dir = tmp_wal_dir();
696 let config = WalConfig {
697 max_segment_size: 256, sync_on_write: false,
699 };
700 let mut wal = Wal::open(dir.path(), config).unwrap();
701
702 for i in 0..20u64 {
704 let payload = encode_insert_payload(i, 0, &[0.0; 8]);
705 wal.append(EntryType::Insert, 0, &payload).unwrap();
706 }
707 wal.flush().unwrap();
708
709 let segments = wal.list_segments().unwrap();
711 assert!(
712 segments.len() > 1,
713 "expected multiple segments, got {}",
714 segments.len()
715 );
716 }
717
718 #[test]
719 fn recovery_across_segments() {
720 let dir = tmp_wal_dir();
721 let config = WalConfig {
722 max_segment_size: 256,
723 sync_on_write: true,
724 };
725
726 let total_entries = 20u64;
727 let commit_at = 10u64;
728
729 {
730 let mut wal = Wal::open(dir.path(), config.clone()).unwrap();
731 for i in 0..total_entries {
732 let payload = encode_insert_payload(i, (i * 100) as i64, &[i as f32]);
733 wal.append(EntryType::Insert, 0, &payload).unwrap();
734 }
735 wal.commit(commit_at).unwrap();
736 }
737
738 {
739 let mut wal = Wal::open(dir.path(), config).unwrap();
740 let uncommitted = wal.recover().unwrap();
741 assert_eq!(uncommitted.len(), (total_entries - commit_at) as usize);
742 for entry in &uncommitted {
744 assert!(entry.sequence > commit_at);
745 }
746 }
747 }
748
749 #[test]
752 fn insert_payload_roundtrip() {
753 let vector = vec![0.1, 0.2, 0.3, 0.4];
754 let payload = encode_insert_payload(42, -5000, &vector);
755 let (eid, ts, vec) = decode_insert_payload(&payload).unwrap();
756 assert_eq!(eid, 42);
757 assert_eq!(ts, -5000);
758 assert_eq!(vec, vector);
759 }
760
761 #[test]
762 fn delete_payload_roundtrip() {
763 let payload = encode_delete_payload(99, 12345);
764 let (eid, ts) = decode_delete_payload(&payload).unwrap();
765 assert_eq!(eid, 99);
766 assert_eq!(ts, 12345);
767 }
768
769 #[test]
770 fn insert_payload_d768_roundtrip() {
771 let vector: Vec<f32> = (0..768).map(|i| i as f32 * 0.001).collect();
772 let payload = encode_insert_payload(1, 1_000_000, &vector);
773 let (eid, ts, vec) = decode_insert_payload(&payload).unwrap();
774 assert_eq!(eid, 1);
775 assert_eq!(ts, 1_000_000);
776 assert_eq!(vec.len(), 768);
777 for (a, b) in vec.iter().zip(vector.iter()) {
778 assert!((a - b).abs() < f32::EPSILON);
779 }
780 }
781
782 #[test]
785 fn crc32_deterministic() {
786 let data = b"hello world";
787 let crc1 = crc32_hash(data);
788 let crc2 = crc32_hash(data);
789 assert_eq!(crc1, crc2);
790 }
791
792 #[test]
793 fn crc32_different_data() {
794 let crc1 = crc32_hash(b"hello");
795 let crc2 = crc32_hash(b"world");
796 assert_ne!(crc1, crc2);
797 }
798
799 #[test]
802 fn write_100k_and_recover() {
803 let dir = tmp_wal_dir();
804 let config = WalConfig {
805 max_segment_size: DEFAULT_MAX_SEGMENT_SIZE,
806 sync_on_write: false,
807 };
808
809 let n = 100_000u64;
810 let commit_at = 99_990u64;
811
812 {
813 let mut wal = Wal::open(dir.path(), config.clone()).unwrap();
814 for i in 0..n {
815 let payload = encode_insert_payload(i % 1000, (i * 10) as i64, &[i as f32; 4]);
816 wal.append(EntryType::Insert, 0, &payload).unwrap();
817 }
818 wal.commit(commit_at).unwrap();
819 }
820
821 {
822 let mut wal = Wal::open(dir.path(), config).unwrap();
823 let uncommitted = wal.recover().unwrap();
824 assert_eq!(uncommitted.len(), (n - commit_at) as usize);
825 for entry in &uncommitted {
827 assert!(entry.sequence > commit_at);
828 }
829 }
830 }
831
832 #[test]
835 fn reopen_and_continue() {
836 let dir = tmp_wal_dir();
837 let config = WalConfig::default();
838
839 {
840 let mut wal = Wal::open(dir.path(), config.clone()).unwrap();
841 let payload = encode_insert_payload(1, 100, &[1.0]);
842 wal.append(EntryType::Insert, 0, &payload).unwrap();
843 wal.commit(1).unwrap();
844 }
845
846 {
847 let mut wal = Wal::open(dir.path(), config).unwrap();
848 assert_eq!(wal.committed_sequence(), 1);
849 let payload = encode_insert_payload(2, 200, &[2.0]);
850 let seq = wal.append(EntryType::Insert, 0, &payload).unwrap();
851 assert_eq!(seq, 2);
852 wal.commit(2).unwrap();
853 }
854 }
855
856 #[test]
859 fn delete_entry_type() {
860 let dir = tmp_wal_dir();
861 let mut wal = Wal::open(dir.path(), WalConfig::default()).unwrap();
862
863 let payload = encode_delete_payload(42, 1000);
864 let seq = wal.append(EntryType::Delete, 0, &payload).unwrap();
865 assert_eq!(seq, 1);
866
867 let entries = wal.recover().unwrap();
868 assert_eq!(entries.len(), 1);
869 assert_eq!(entries[0].entry_type, EntryType::Delete);
870 }
871
872 #[test]
873 fn keyframe_flag() {
874 let dir = tmp_wal_dir();
875 let mut wal = Wal::open(dir.path(), WalConfig::default()).unwrap();
876
877 let payload = encode_insert_payload(1, 100, &[1.0]);
878 wal.append(EntryType::Insert, 0x01, &payload).unwrap(); let entries = wal.recover().unwrap();
881 assert_eq!(entries[0].flags, 0x01);
882 }
883}