highlandcows_isam/
store.rs

//! DataStore — manages the append-only `.idb` data file.
//!
//! ## Record format (per record, sequentially appended)
//! ```text
//! [status:   u8       ]   0 = alive, 1 = tombstone
//! [key_len:  u32 LE   ]
//! [val_len:  u32 LE   ]
//! [key:      key_len bytes]   bincode-encoded key
//! [val:      val_len bytes]   bincode-encoded value (0 bytes for tombstones)
//! ```
//!
//! Because records are only ever appended (never overwritten), the file
//! grows monotonically.  Stale or deleted records are reclaimed by
//! `IsamFile::compact()`.
15use std::fs::{File, OpenOptions};
16use std::io::{Read, Seek, SeekFrom, Write};
17use std::path::Path;
18
19use serde::de::DeserializeOwned;
20use serde::Serialize;
21
22use crate::error::IsamResult;
23
/// Status byte written as the first byte of a live record.
pub const STATUS_ALIVE: u8 = 0;
/// Status byte written as the first byte of a tombstone (deletion marker) record.
pub const STATUS_TOMBSTONE: u8 = 1;
26
/// Location of one encoded record inside the `.idb` file.
///
/// Returned by `DataStore::append` so the caller can record the record's
/// location in the B-tree index. The type is a small plain value: it is
/// `Copy`, and it can be compared and hashed so that callers can check
/// whether two references point at the same record.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RecordRef {
    /// Byte offset of the *start* of this record in the `.idb` file.
    pub offset: u64,
    /// Total byte length of the encoded record (header + key + value).
    pub len: u32,
}
36
/// Handle to the `.idb` data file.
///
/// Owns the open [`File`]; all record reads and writes go through the
/// methods implemented on this type.
#[derive(Debug)]
pub struct DataStore {
    /// Underlying `.idb` file handle.
    file: File,
}
40
41impl DataStore {
42    /// Create a new, empty `.idb` file, truncating any existing one.
43    pub fn create(path: &Path) -> IsamResult<Self> {
44        let file = OpenOptions::new()
45            .read(true)
46            .write(true)
47            .create(true)
48            .truncate(true)
49            .open(path)?;
50        Ok(Self { file })
51    }
52
53    /// Open an existing `.idb` file for reading and appending.
54    pub fn open(path: &Path) -> IsamResult<Self> {
55        let file = OpenOptions::new().read(true).write(true).open(path)?;
56        Ok(Self { file })
57    }
58
59    /// Append a live record to the file.
60    ///
61    /// Returns a `RecordRef` containing the byte offset and length so the
62    /// B-tree index can locate this record later.
63    pub fn append<K, V>(&mut self, key: &K, value: &V) -> IsamResult<RecordRef>
64    where
65        K: Serialize,
66        V: Serialize,
67    {
68        // bincode::serialize converts a value to a Vec<u8>.
69        // The `?` operator propagates any error upward automatically.
70        let key_bytes = bincode::serialize(key)?;
71        let val_bytes = bincode::serialize(value)?;
72
73        // Seek to the end so we always append.
74        // `seek` returns the new absolute position — that's our offset.
75        let offset = self.file.seek(SeekFrom::End(0))?;
76
77        let key_len = key_bytes.len() as u32;
78        let val_len = val_bytes.len() as u32;
79
80        // Write the 9-byte header: status + key_len + val_len.
81        // `to_le_bytes()` converts an integer to little-endian byte array.
82        self.file.write_all(&[STATUS_ALIVE])?;
83        self.file.write_all(&key_len.to_le_bytes())?;
84        self.file.write_all(&val_len.to_le_bytes())?;
85        self.file.write_all(&key_bytes)?;
86        self.file.write_all(&val_bytes)?;
87
88        // Total record size = 1 (status) + 4 (key_len) + 4 (val_len) + key + val
89        let len = 1 + 4 + 4 + key_len + val_len;
90        Ok(RecordRef { offset, len })
91    }
92
93    /// Append a tombstone record for `key`.
94    ///
95    /// The value portion is zero bytes; the B-tree entry will be removed
96    /// separately, so tombstones in the data file are only needed for
97    /// compaction safety.
98    pub fn append_tombstone<K>(&mut self, key: &K) -> IsamResult<()>
99    where
100        K: Serialize,
101    {
102        let key_bytes = bincode::serialize(key)?;
103        self.file.seek(SeekFrom::End(0))?;
104
105        let key_len = key_bytes.len() as u32;
106        let val_len: u32 = 0;
107
108        self.file.write_all(&[STATUS_TOMBSTONE])?;
109        self.file.write_all(&key_len.to_le_bytes())?;
110        self.file.write_all(&val_len.to_le_bytes())?;
111        self.file.write_all(&key_bytes)?;
112        Ok(())
113    }
114
115    /// Read and deserialize the *value* portion of the record at `rec`.
116    ///
117    /// `&mut self` because `Seek` requires mutability on the file handle.
118    pub fn read_value<V>(&mut self, rec: RecordRef) -> IsamResult<V>
119    where
120        V: DeserializeOwned,
121    {
122        // Jump to the record start.
123        self.file.seek(SeekFrom::Start(rec.offset))?;
124
125        // Read header bytes.
126        let mut header = [0u8; 9];
127        self.file.read_exact(&mut header)?;
128
129        let _status = header[0];
130        let key_len = u32::from_le_bytes(header[1..5].try_into().unwrap()) as usize;
131        let val_len = u32::from_le_bytes(header[5..9].try_into().unwrap()) as usize;
132
133        // Skip over the key bytes.
134        self.file.seek(SeekFrom::Current(key_len as i64))?;
135
136        // Read the value bytes and deserialize.
137        let mut val_buf = vec![0u8; val_len];
138        self.file.read_exact(&mut val_buf)?;
139        let value: V = bincode::deserialize(&val_buf)?;
140        Ok(value)
141    }
142
143    /// Read the raw bytes of a record (for use during compaction).
144    ///
145    /// Returns `(status, key_bytes, val_bytes)`.
146    pub fn read_record_raw(&mut self, offset: u64) -> IsamResult<(u8, Vec<u8>, Vec<u8>)> {
147        self.file.seek(SeekFrom::Start(offset))?;
148
149        let mut header = [0u8; 9];
150        self.file.read_exact(&mut header)?;
151
152        let status = header[0];
153        let key_len = u32::from_le_bytes(header[1..5].try_into().unwrap()) as usize;
154        let val_len = u32::from_le_bytes(header[5..9].try_into().unwrap()) as usize;
155
156        let mut key_buf = vec![0u8; key_len];
157        self.file.read_exact(&mut key_buf)?;
158
159        let mut val_buf = vec![0u8; val_len];
160        self.file.read_exact(&mut val_buf)?;
161
162        Ok((status, key_buf, val_buf))
163    }
164
165    /// Write a raw pre-encoded record directly (used during compaction to
166    /// copy records without re-serializing).
167    pub fn write_raw_record(
168        &mut self,
169        status: u8,
170        key_bytes: &[u8],
171        val_bytes: &[u8],
172    ) -> IsamResult<RecordRef> {
173        let offset = self.file.seek(SeekFrom::End(0))?;
174
175        let key_len = key_bytes.len() as u32;
176        let val_len = val_bytes.len() as u32;
177
178        self.file.write_all(&[status])?;
179        self.file.write_all(&key_len.to_le_bytes())?;
180        self.file.write_all(&val_len.to_le_bytes())?;
181        self.file.write_all(key_bytes)?;
182        self.file.write_all(val_bytes)?;
183
184        let len = 1 + 4 + 4 + key_len + val_len;
185        Ok(RecordRef { offset, len })
186    }
187
188    /// Flush OS buffers to disk.
189    pub fn flush(&mut self) -> IsamResult<()> {
190        self.file.flush()?;
191        Ok(())
192    }
193
194    /// Flush OS buffers and call `fsync` to ensure durability.
195    pub fn fsync(&mut self) -> IsamResult<()> {
196        self.file.flush()?;
197        self.file.sync_all()?;
198        Ok(())
199    }
200}