highlandcows_isam/index/
mod.rs

//! B-tree index over the `.idx` file.
//!
//! ## Page wire format
//!
//! **Leaf page** (page_type = 0):
//! ```text
//! [page_type:    u8      ] = 0
//! [num_entries:  u16 LE  ]
//! [next_leaf_id: u32 LE  ]   0 = end of linked list
//! for each entry:
//!   [key_len:    u16 LE  ]
//!   [key:        bytes   ]   bincode-encoded key
//!   [data_offset:u64 LE  ]   byte offset in .idb
//!   [data_len:   u32 LE  ]   byte length of record in .idb
//! ```
//!
//! **Internal page** (page_type = 1):
//! ```text
//! [page_type:  u8      ] = 1
//! [num_keys:   u16 LE  ]
//! [child_0:    u32 LE  ]
//! for each key i:
//!   [key_len:  u16 LE  ]
//!   [key:      bytes   ]
//!   [child_(i+1): u32 LE]
//! ```
27pub mod pager;
28
29use std::path::Path;
30
31use serde::de::DeserializeOwned;
32use serde::Serialize;
33
34use crate::error::{IsamError, IsamResult};
35use crate::store::RecordRef;
36use pager::{
37    Pager, PAGE_SIZE, PAGE_TYPE_INTERNAL, PAGE_TYPE_LEAF,
38};
39
40// ───────────────────────────────────────────────────────────────────────── //
41//  In-memory representations of decoded pages
42// ───────────────────────────────────────────────────────────────────────── //
43
/// One decoded leaf slot: a serialized key plus the location of its record.
#[derive(Debug, Clone)]
struct LeafEntry {
    /// Key exactly as stored on the page (the serialized form of `K`).
    key_bytes: Vec<u8>,
    /// Byte offset of the record (written to the page as a little-endian `u64`).
    data_offset: u64,
    /// Byte length of the record (written to the page as a little-endian `u32`).
    data_len: u32,
}
50
51#[derive(Debug, Clone)]
52struct LeafPage {
53    next_leaf_id: u32,
54    entries: Vec<LeafEntry>,
55}
56
/// In-memory form of an internal (routing) page.
#[derive(Debug, Clone)]
struct InternalPage {
    /// Child page ids. `children[i]` sits to the left of `key_bytes[i]`, so
    /// this vector is always one element longer than `key_bytes`.
    children: Vec<u32>,
    /// Serialized separator keys, in page order.
    key_bytes: Vec<Vec<u8>>,
}
64
65// ───────────────────────────────────────────────────────────────────────── //
66//  Encode / decode helpers
67// ───────────────────────────────────────────────────────────────────────── //
68
69fn encode_leaf(page: &LeafPage) -> Vec<u8> {
70    let mut buf = vec![0u8; PAGE_SIZE];
71    let mut pos = 0;
72
73    buf[pos] = PAGE_TYPE_LEAF;
74    pos += 1;
75
76    let n = page.entries.len() as u16;
77    buf[pos..pos + 2].copy_from_slice(&n.to_le_bytes());
78    pos += 2;
79
80    buf[pos..pos + 4].copy_from_slice(&page.next_leaf_id.to_le_bytes());
81    pos += 4;
82
83    for e in &page.entries {
84        let klen = e.key_bytes.len() as u16;
85        buf[pos..pos + 2].copy_from_slice(&klen.to_le_bytes());
86        pos += 2;
87        buf[pos..pos + e.key_bytes.len()].copy_from_slice(&e.key_bytes);
88        pos += e.key_bytes.len();
89        buf[pos..pos + 8].copy_from_slice(&e.data_offset.to_le_bytes());
90        pos += 8;
91        buf[pos..pos + 4].copy_from_slice(&e.data_len.to_le_bytes());
92        pos += 4;
93    }
94
95    buf
96}
97
98fn decode_leaf(data: &[u8]) -> IsamResult<LeafPage> {
99    if data[0] != PAGE_TYPE_LEAF {
100        return Err(IsamError::CorruptIndex("expected leaf page".into()));
101    }
102    let num_entries = u16::from_le_bytes(data[1..3].try_into().unwrap()) as usize;
103    let next_leaf_id = u32::from_le_bytes(data[3..7].try_into().unwrap());
104
105    let mut pos = 7;
106    let mut entries = Vec::with_capacity(num_entries);
107
108    for _ in 0..num_entries {
109        let klen = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
110        pos += 2;
111        let key_bytes = data[pos..pos + klen].to_vec();
112        pos += klen;
113        let data_offset = u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
114        pos += 8;
115        let data_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap());
116        pos += 4;
117        entries.push(LeafEntry {
118            key_bytes,
119            data_offset,
120            data_len,
121        });
122    }
123
124    Ok(LeafPage {
125        next_leaf_id,
126        entries,
127    })
128}
129
130fn encode_internal(page: &InternalPage) -> Vec<u8> {
131    let mut buf = vec![0u8; PAGE_SIZE];
132    let mut pos = 0;
133
134    buf[pos] = PAGE_TYPE_INTERNAL;
135    pos += 1;
136
137    let n = page.key_bytes.len() as u16;
138    buf[pos..pos + 2].copy_from_slice(&n.to_le_bytes());
139    pos += 2;
140
141    // Write child_0 first.
142    buf[pos..pos + 4].copy_from_slice(&page.children[0].to_le_bytes());
143    pos += 4;
144
145    for (i, kb) in page.key_bytes.iter().enumerate() {
146        let klen = kb.len() as u16;
147        buf[pos..pos + 2].copy_from_slice(&klen.to_le_bytes());
148        pos += 2;
149        buf[pos..pos + kb.len()].copy_from_slice(kb);
150        pos += kb.len();
151        buf[pos..pos + 4].copy_from_slice(&page.children[i + 1].to_le_bytes());
152        pos += 4;
153    }
154
155    buf
156}
157
158fn decode_internal(data: &[u8]) -> IsamResult<InternalPage> {
159    if data[0] != PAGE_TYPE_INTERNAL {
160        return Err(IsamError::CorruptIndex("expected internal page".into()));
161    }
162    let num_keys = u16::from_le_bytes(data[1..3].try_into().unwrap()) as usize;
163    let mut pos = 3;
164
165    let child_0 = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap());
166    pos += 4;
167
168    let mut children = vec![child_0];
169    let mut key_bytes = Vec::with_capacity(num_keys);
170
171    for _ in 0..num_keys {
172        let klen = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
173        pos += 2;
174        let kb = data[pos..pos + klen].to_vec();
175        pos += klen;
176        let child = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap());
177        pos += 4;
178        key_bytes.push(kb);
179        children.push(child);
180    }
181
182    Ok(InternalPage { children, key_bytes })
183}
184
185// ───────────────────────────────────────────────────────────────────────── //
186//  Capacity helpers
187// ───────────────────────────────────────────────────────────────────────── //
188
/// Number of page bytes consumed by one leaf entry whose key is `key_len`
/// bytes long.
fn leaf_entry_size(key_len: usize) -> usize {
    const KEY_LEN_FIELD: usize = 2; // u16 length prefix
    const DATA_OFFSET_FIELD: usize = 8; // u64 record offset
    const DATA_LEN_FIELD: usize = 4; // u32 record length
    KEY_LEN_FIELD + key_len + DATA_OFFSET_FIELD + DATA_LEN_FIELD
}
193
/// Number of page bytes consumed by one separator key of `key_len` bytes
/// together with the child id stored to its right.
fn internal_key_size(key_len: usize) -> usize {
    const KEY_LEN_FIELD: usize = 2; // u16 length prefix
    const CHILD_ID_FIELD: usize = 4; // u32 page id
    KEY_LEN_FIELD + key_len + CHILD_ID_FIELD
}
198
/// Bytes used by a leaf page before its first entry:
/// page type (`u8`) + entry count (`u16`) + next-leaf id (`u32`).
const LEAF_HEADER: usize =
    std::mem::size_of::<u8>() + std::mem::size_of::<u16>() + std::mem::size_of::<u32>();
/// Bytes used by an internal page before its first key:
/// page type (`u8`) + key count (`u16`) + leftmost child id (`u32`).
const INTERNAL_HEADER: usize =
    std::mem::size_of::<u8>() + std::mem::size_of::<u16>() + std::mem::size_of::<u32>();
203
204// ───────────────────────────────────────────────────────────────────────── //
205//  BTree
206// ───────────────────────────────────────────────────────────────────────── //
207
208/// On-disk B-tree index.
209///
210/// `K` is the key type; it must be serializable, deserializable, ordered,
211/// and cheap to clone.  These bounds appear once on the `impl` block so we
212/// don't have to repeat them on every method.
213pub struct BTree<K> {
214    pager: Pager,
215    // `PhantomData` tells the compiler that `BTree` logically "owns" values
216    // of type `K`, even though the struct field only stores bytes on disk.
217    // Without this, Rust would complain that `K` is unused.
218    _phantom: std::marker::PhantomData<K>,
219}
220
221impl<K> BTree<K>
222where
223    K: Serialize + DeserializeOwned + Ord + Clone,
224{
225    pub fn create(path: &Path) -> IsamResult<Self> {
226        Ok(Self {
227            pager: Pager::create(path)?,
228            _phantom: std::marker::PhantomData,
229        })
230    }
231
232    pub fn open(path: &Path) -> IsamResult<Self> {
233        Ok(Self {
234            pager: Pager::open(path)?,
235            _phantom: std::marker::PhantomData,
236        })
237    }
238
239    // ------------------------------------------------------------------ //
240    //  Public API
241    // ------------------------------------------------------------------ //
242
243    /// Look up `key` and return its `RecordRef` if found.
244    pub fn search(&mut self, key: &K) -> IsamResult<Option<RecordRef>> {
245        let key_bytes = bincode::serialize(key)?;
246        let root_id = self.pager.meta.root_page_id;
247        self.search_page(root_id, &key_bytes)
248    }
249
250    /// Insert `key → rec` into the tree.  Returns `DuplicateKey` if the
251    /// key already exists.
252    pub fn insert(&mut self, key: &K, rec: RecordRef) -> IsamResult<()> {
253        let key_bytes = bincode::serialize(key)?;
254        // search_down returns the path from root to the target leaf.
255        let root_id = self.pager.meta.root_page_id;
256        let path = self.find_leaf_path(root_id, &key_bytes)?;
257        let leaf_id = *path.last().unwrap();
258
259        let mut leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
260
261        // Check for duplicate.
262        if leaf.entries.iter().any(|e| e.key_bytes == key_bytes) {
263            return Err(IsamError::DuplicateKey);
264        }
265
266        // Insert in sorted position using K::Ord, not raw byte order.
267        let pos = leaf
268            .entries
269            .partition_point(|e| self.cmp_key_bytes(&e.key_bytes, &key_bytes) == std::cmp::Ordering::Less);
270        leaf.entries.insert(
271            pos,
272            LeafEntry {
273                key_bytes: key_bytes.clone(),
274                data_offset: rec.offset,
275                data_len: rec.len,
276            },
277        );
278
279        // Check if the page fits.
280        if self.leaf_fits(&leaf) {
281            self.pager.write_page(leaf_id, &encode_leaf(&leaf))?;
282        } else {
283            // Need to split — do a full insert+split pass from the root.
284            self.insert_with_splits(&key_bytes, rec, path)?;
285        }
286
287        self.pager.flush()
288    }
289
290    /// Delete `key` from the tree.  Returns `KeyNotFound` if absent.
291    pub fn delete(&mut self, key: &K) -> IsamResult<()> {
292        let key_bytes = bincode::serialize(key)?;
293        let root_id = self.pager.meta.root_page_id;
294        let path = self.find_leaf_path(root_id, &key_bytes)?;
295        let leaf_id = *path.last().unwrap();
296
297        let mut leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
298
299        let pos = leaf
300            .entries
301            .iter()
302            .position(|e| e.key_bytes == key_bytes)
303            .ok_or(IsamError::KeyNotFound)?;
304        leaf.entries.remove(pos);
305
306        self.pager.write_page(leaf_id, &encode_leaf(&leaf))?;
307
308        // Under-fill handling: borrow from sibling or merge.
309        // We walk back up the path attempting to rebalance.
310        if path.len() >= 2 {
311            let parent_id = path[path.len() - 2];
312            self.rebalance_after_delete(&path, leaf_id, parent_id)?;
313        }
314
315        self.pager.flush()
316    }
317
318    /// Update the `RecordRef` stored for `key`.
319    pub fn update(&mut self, key: &K, rec: RecordRef) -> IsamResult<()> {
320        let key_bytes = bincode::serialize(key)?;
321        let root_id = self.pager.meta.root_page_id;
322        let path = self.find_leaf_path(root_id, &key_bytes)?;
323        let leaf_id = *path.last().unwrap();
324
325        let mut leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
326
327        let entry = leaf
328            .entries
329            .iter_mut()
330            .find(|e| e.key_bytes == key_bytes)
331            .ok_or(IsamError::KeyNotFound)?;
332        entry.data_offset = rec.offset;
333        entry.data_len = rec.len;
334
335        self.pager.write_page(leaf_id, &encode_leaf(&leaf))?;
336        self.pager.flush()
337    }
338
339    /// Return the leaf page id where `key` would be found (or inserted).
340    ///
341    /// Unlike `search`, this never returns `None` — it always finds the
342    /// appropriate leaf even if the key is not present.  Used by range scans
343    /// to position the iterator at the correct starting leaf.
344    pub fn find_leaf_for_key(&mut self, key: &K) -> IsamResult<u32> {
345        let key_bytes = bincode::serialize(key)?;
346        let root_id = self.pager.meta.root_page_id;
347        let path = self.find_leaf_path(root_id, &key_bytes)?;
348        Ok(*path.last().unwrap())
349    }
350
351    /// Return the id of the first (leftmost) leaf page for sequential scan.
352    pub fn first_leaf_id(&mut self) -> IsamResult<u32> {
353        let root_id = self.pager.meta.root_page_id;
354        self.leftmost_leaf(root_id)
355    }
356
357    /// Return the smallest key in the tree, or `None` if the tree is empty.
358    pub fn min_key(&mut self) -> IsamResult<Option<K>> {
359        let root_id = self.pager.meta.root_page_id;
360        let leaf_id = self.leftmost_leaf(root_id)?;
361        let leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
362        match leaf.entries.first() {
363            None => Ok(None),
364            Some(e) => Ok(Some(bincode::deserialize(&e.key_bytes)?)),
365        }
366    }
367
368    /// Return the largest key in the tree, or `None` if the tree is empty.
369    pub fn max_key(&mut self) -> IsamResult<Option<K>> {
370        let root_id = self.pager.meta.root_page_id;
371        let leaf_id = self.rightmost_leaf(root_id)?;
372        let leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
373        match leaf.entries.last() {
374            None => Ok(None),
375            Some(e) => Ok(Some(bincode::deserialize(&e.key_bytes)?)),
376        }
377    }
378
379    /// Read leaf page `id` and return its entries and `next_leaf_id`.
380    pub fn read_leaf(&mut self, id: u32) -> IsamResult<(Vec<(K, RecordRef)>, u32)> {
381        let data = self.pager.read_page(id)?;
382        let leaf = decode_leaf(&data)?;
383        let mut out = Vec::with_capacity(leaf.entries.len());
384        for e in leaf.entries {
385            let key: K = bincode::deserialize(&e.key_bytes)?;
386            out.push((
387                key,
388                RecordRef {
389                    offset: e.data_offset,
390                    len: e.data_len,
391                },
392            ));
393        }
394        Ok((out, leaf.next_leaf_id))
395    }
396
397    pub fn key_schema_version(&self) -> u32 {
398        self.pager.meta.key_schema_version
399    }
400
401    pub fn val_schema_version(&self) -> u32 {
402        self.pager.meta.val_schema_version
403    }
404
405    pub fn set_schema_versions(&mut self, key_v: u32, val_v: u32) -> IsamResult<()> {
406        self.pager.meta.key_schema_version = key_v;
407        self.pager.meta.val_schema_version = val_v;
408        self.pager.flush_meta()
409    }
410
411    pub fn flush(&mut self) -> IsamResult<()> {
412        self.pager.flush()
413    }
414
415    pub fn fsync(&mut self) -> IsamResult<()> {
416        self.pager.fsync()
417    }
418
419    // ------------------------------------------------------------------ //
420    //  Private helpers — search
421    // ------------------------------------------------------------------ //
422
423    fn search_page(&mut self, page_id: u32, key_bytes: &[u8]) -> IsamResult<Option<RecordRef>> {
424        let data = self.pager.read_page(page_id)?;
425        match data[0] {
426            PAGE_TYPE_LEAF => {
427                let leaf = decode_leaf(&data)?;
428                for e in &leaf.entries {
429                    if e.key_bytes == key_bytes {
430                        return Ok(Some(RecordRef {
431                            offset: e.data_offset,
432                            len: e.data_len,
433                        }));
434                    }
435                }
436                Ok(None)
437            }
438            PAGE_TYPE_INTERNAL => {
439                let internal = decode_internal(&data)?;
440                let child_id = self.find_child(&internal, key_bytes);
441                self.search_page(child_id, key_bytes)
442            }
443            t => Err(IsamError::CorruptIndex(format!("unknown page type {t}"))),
444        }
445    }
446
447    /// Compare two serialized keys using K's `Ord` (not raw byte order).
448    ///
449    /// Bincode uses little-endian integers, so raw byte comparison diverges
450    /// from numeric order for values ≥ 256 in multi-byte types.  By
451    /// deserializing and using `K::cmp` we guarantee the B-tree is ordered
452    /// the same way the caller's `Ord` impl defines.
453    fn cmp_key_bytes(&self, a: &[u8], b: &[u8]) -> std::cmp::Ordering {
454        let ka: K = bincode::deserialize(a).expect("corrupt key bytes in index");
455        let kb: K = bincode::deserialize(b).expect("corrupt key bytes in index");
456        ka.cmp(&kb)
457    }
458
459    /// Binary-search the internal page's keys to pick the right child.
460    fn find_child(&self, page: &InternalPage, key_bytes: &[u8]) -> u32 {
461        // children[i] holds all keys < keys[i].
462        // We want the largest i such that keys[i-1] <= key_bytes,
463        // or child[0] if key_bytes < keys[0].
464        // Use K::Ord (via cmp_key_bytes) so the routing matches the insertion
465        // order — raw byte order diverges from K::Ord for multi-byte integers.
466        let pos = page
467            .key_bytes
468            .partition_point(|kb| !matches!(self.cmp_key_bytes(kb, key_bytes), std::cmp::Ordering::Greater));
469        page.children[pos]
470    }
471
472    /// Walk from `page_id` down to a leaf, collecting page ids along the way.
473    fn find_leaf_path(&mut self, page_id: u32, key_bytes: &[u8]) -> IsamResult<Vec<u32>> {
474        let mut path = vec![page_id];
475        let mut current = page_id;
476        loop {
477            let data = self.pager.read_page(current)?;
478            match data[0] {
479                PAGE_TYPE_LEAF => break,
480                PAGE_TYPE_INTERNAL => {
481                    let internal = decode_internal(&data)?;
482                    let child = self.find_child(&internal, key_bytes);
483                    path.push(child);
484                    current = child;
485                }
486                t => return Err(IsamError::CorruptIndex(format!("unknown page type {t}"))),
487            }
488        }
489        Ok(path)
490    }
491
492    fn leftmost_leaf(&mut self, page_id: u32) -> IsamResult<u32> {
493        let data = self.pager.read_page(page_id)?;
494        match data[0] {
495            PAGE_TYPE_LEAF => Ok(page_id),
496            PAGE_TYPE_INTERNAL => {
497                let internal = decode_internal(&data)?;
498                self.leftmost_leaf(internal.children[0])
499            }
500            t => Err(IsamError::CorruptIndex(format!("unknown page type {t}"))),
501        }
502    }
503
504    fn rightmost_leaf(&mut self, page_id: u32) -> IsamResult<u32> {
505        let data = self.pager.read_page(page_id)?;
506        match data[0] {
507            PAGE_TYPE_LEAF => Ok(page_id),
508            PAGE_TYPE_INTERNAL => {
509                let internal = decode_internal(&data)?;
510                self.rightmost_leaf(*internal.children.last().unwrap())
511            }
512            t => Err(IsamError::CorruptIndex(format!("unknown page type {t}"))),
513        }
514    }
515
516    // ------------------------------------------------------------------ //
517    //  Private helpers — capacity
518    // ------------------------------------------------------------------ //
519
520    fn leaf_byte_usage(leaf: &LeafPage) -> usize {
521        LEAF_HEADER
522            + leaf
523                .entries
524                .iter()
525                .map(|e| leaf_entry_size(e.key_bytes.len()))
526                .sum::<usize>()
527    }
528
529    fn leaf_fits(&self, leaf: &LeafPage) -> bool {
530        Self::leaf_byte_usage(leaf) <= PAGE_SIZE
531    }
532
533    fn internal_byte_usage(page: &InternalPage) -> usize {
534        INTERNAL_HEADER
535            + page
536                .key_bytes
537                .iter()
538                .map(|kb| internal_key_size(kb.len()))
539                .sum::<usize>()
540    }
541
542    fn internal_fits(page: &InternalPage) -> bool {
543        Self::internal_byte_usage(page) <= PAGE_SIZE
544    }
545
546    // ------------------------------------------------------------------ //
547    //  Private helpers — insert with splits
548    // ------------------------------------------------------------------ //
549
550    /// Full insert-and-split pass. Called when the simple insert overflows.
551    fn insert_with_splits(
552        &mut self,
553        key_bytes: &[u8],
554        rec: RecordRef,
555        path: Vec<u32>,
556    ) -> IsamResult<()> {
557        // Re-insert into the leaf (undo the speculative insert above by
558        // re-reading fresh state from disk and doing it properly).
559        let leaf_id = *path.last().unwrap();
560        let mut leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
561
562        // The entry may already be there from the speculative path — remove it
563        // if so, then re-insert cleanly.
564        leaf.entries.retain(|e| e.key_bytes != key_bytes);
565        let pos = leaf
566            .entries
567            .partition_point(|e| self.cmp_key_bytes(&e.key_bytes, key_bytes) == std::cmp::Ordering::Less);
568        leaf.entries.insert(
569            pos,
570            LeafEntry {
571                key_bytes: key_bytes.to_vec(),
572                data_offset: rec.offset,
573                data_len: rec.len,
574            },
575        );
576
577        // Split the leaf in half.
578        let mid = leaf.entries.len() / 2;
579        let right_entries = leaf.entries.split_off(mid);
580        let promote_key = right_entries[0].key_bytes.clone();
581
582        // Allocate a new right-leaf page.
583        let right_id = self.pager.alloc_page()?;
584
585        // Fix the linked list: new_right.next = old_leaf.next
586        let right_leaf = LeafPage {
587            next_leaf_id: leaf.next_leaf_id,
588            entries: right_entries,
589        };
590        leaf.next_leaf_id = right_id;
591
592        self.pager.write_page(leaf_id, &encode_leaf(&leaf))?;
593        self.pager.write_page(right_id, &encode_leaf(&right_leaf))?;
594
595        // Propagate the promoted key upward.
596        self.insert_into_parent(&path, path.len() - 1, promote_key, right_id)?;
597
598        Ok(())
599    }
600
601    /// Recursively insert a promoted key+right_child into the parent.
602    fn insert_into_parent(
603        &mut self,
604        path: &[u32],
605        child_path_idx: usize, // index of the child that was split
606        promote_key: Vec<u8>,
607        right_child_id: u32,
608    ) -> IsamResult<()> {
609        if child_path_idx == 0 {
610            // The root was split — create a new root.
611            let left_child_id = path[0];
612            let new_root = InternalPage {
613                children: vec![left_child_id, right_child_id],
614                key_bytes: vec![promote_key],
615            };
616            let new_root_id = self.pager.alloc_page()?;
617            self.pager
618                .write_page(new_root_id, &encode_internal(&new_root))?;
619            self.pager.meta.root_page_id = new_root_id;
620            self.pager.flush_meta()?;
621            return Ok(());
622        }
623
624        let parent_id = path[child_path_idx - 1];
625        let mut parent = decode_internal(&self.pager.read_page(parent_id)?)?;
626
627        // Find where in parent's children the left child sits.
628        let left_child_id = path[child_path_idx];
629        let child_pos = parent
630            .children
631            .iter()
632            .position(|&c| c == left_child_id)
633            .ok_or_else(|| IsamError::CorruptIndex("child not found in parent".into()))?;
634
635        parent.key_bytes.insert(child_pos, promote_key);
636        parent.children.insert(child_pos + 1, right_child_id);
637
638        if Self::internal_fits(&parent) {
639            self.pager
640                .write_page(parent_id, &encode_internal(&parent))?;
641        } else {
642            // Split the internal page.
643            let mid = parent.key_bytes.len() / 2;
644            let promote_up = parent.key_bytes[mid].clone();
645
646            let right_keys = parent.key_bytes.split_off(mid + 1);
647            parent.key_bytes.truncate(mid); // remove the promoted key itself
648
649            let right_children = parent.children.split_off(mid + 1);
650
651            let right_internal = InternalPage {
652                children: right_children,
653                key_bytes: right_keys,
654            };
655            let right_id = self.pager.alloc_page()?;
656            self.pager
657                .write_page(parent_id, &encode_internal(&parent))?;
658            self.pager
659                .write_page(right_id, &encode_internal(&right_internal))?;
660
661            self.insert_into_parent(path, child_path_idx - 1, promote_up, right_id)?;
662        }
663
664        Ok(())
665    }
666
667    // ------------------------------------------------------------------ //
668    //  Private helpers — delete rebalancing
669    // ------------------------------------------------------------------ //
670
671    fn rebalance_after_delete(
672        &mut self,
673        _path: &[u32],
674        leaf_id: u32,
675        parent_id: u32,
676    ) -> IsamResult<()> {
677        let leaf_data = self.pager.read_page(leaf_id)?;
678        let leaf = decode_leaf(&leaf_data)?;
679
680        // Minimum occupancy: a leaf should hold at least some entries.
681        // We use a simple heuristic: if the leaf is less than 1/4 full and
682        // there is a sibling we can steal from or merge with, do so.
683        // For simplicity we only merge (no borrow), which is correct but
684        // may cause tree height to drop faster than strictly necessary.
685        let min_bytes = PAGE_SIZE / 4;
686        if Self::leaf_byte_usage(&leaf) >= min_bytes {
687            return Ok(()); // still sufficiently full
688        }
689
690        let parent_data = self.pager.read_page(parent_id)?;
691        let mut parent = decode_internal(&parent_data)?;
692
693        // Find the index of leaf_id in parent.children.
694        let idx = match parent.children.iter().position(|&c| c == leaf_id) {
695            Some(i) => i,
696            None => return Ok(()), // shouldn't happen, bail safely
697        };
698
699        // Try to merge with the right sibling first, then left.
700        if idx + 1 < parent.children.len() {
701            let right_id = parent.children[idx + 1];
702            let right_data = self.pager.read_page(right_id)?;
703            let mut right = decode_leaf(&right_data)?;
704
705            // Merge: absorb right into leaf.
706            let mut merged = leaf.clone();
707            merged.entries.append(&mut right.entries);
708            merged.next_leaf_id = right.next_leaf_id;
709
710            if Self::leaf_byte_usage(&merged) <= PAGE_SIZE {
711                self.pager.write_page(leaf_id, &encode_leaf(&merged))?;
712                // Remove the separator key and right child from parent.
713                parent.key_bytes.remove(idx);
714                parent.children.remove(idx + 1);
715                self.pager
716                    .write_page(parent_id, &encode_internal(&parent))?;
717
718                // If parent is now the root and empty, shrink tree height.
719                if parent_id == self.pager.meta.root_page_id && parent.children.len() == 1 {
720                    self.pager.meta.root_page_id = parent.children[0];
721                    self.pager.flush_meta()?;
722                }
723                return Ok(());
724            }
725        }
726
727        if idx > 0 {
728            let left_id = parent.children[idx - 1];
729            let left_data = self.pager.read_page(left_id)?;
730            let left = decode_leaf(&left_data)?;
731
732            let mut merged = left.clone();
733            merged.entries.append(&mut leaf.entries.clone());
734            merged.next_leaf_id = leaf.next_leaf_id;
735
736            if Self::leaf_byte_usage(&merged) <= PAGE_SIZE {
737                self.pager.write_page(left_id, &encode_leaf(&merged))?;
738                parent.key_bytes.remove(idx - 1);
739                parent.children.remove(idx);
740                self.pager
741                    .write_page(parent_id, &encode_internal(&parent))?;
742
743                if parent_id == self.pager.meta.root_page_id && parent.children.len() == 1 {
744                    self.pager.meta.root_page_id = parent.children[0];
745                    self.pager.flush_meta()?;
746                }
747            }
748        }
749
750        Ok(())
751    }
752}