Skip to main content

highlandcows_isam/index/
mod.rs

1/// B-tree index over the `.idx` file.
2///
3/// ## Page wire format
4///
5/// **Leaf page** (page_type = 0):
6/// ```text
7/// [page_type:    u8      ] = 0
8/// [num_entries:  u16 LE  ]
9/// [next_leaf_id: u32 LE  ]   0 = end of linked list
10/// for each entry:
11///   [key_len:    u16 LE  ]
12///   [key:        bytes   ]   bincode-encoded key
13///   [data_offset:u64 LE  ]   byte offset in .idb
14///   [data_len:   u32 LE  ]   byte length of record in .idb
15/// ```
16///
17/// **Internal page** (page_type = 1):
18/// ```text
19/// [page_type:  u8      ] = 1
20/// [num_keys:   u16 LE  ]
21/// [child_0:    u32 LE  ]
22/// for each key i:
23///   [key_len:  u16 LE  ]
24///   [key:      bytes   ]
25///   [child_(i+1): u32 LE]
26/// ```
27pub mod pager;
28
29use std::path::Path;
30
31use serde::de::DeserializeOwned;
32use serde::Serialize;
33
34use crate::error::{IsamError, IsamResult};
35use crate::store::RecordRef;
36use pager::{
37    Pager, PAGE_SIZE, PAGE_TYPE_INTERNAL, PAGE_TYPE_LEAF,
38};
39
40// ───────────────────────────────────────────────────────────────────────── //
41//  In-memory representations of decoded pages
42// ───────────────────────────────────────────────────────────────────────── //
43
44#[derive(Debug, Clone)]
45struct LeafEntry {
46    key_bytes: Vec<u8>,
47    data_offset: u64,
48    data_len: u32,
49}
50
51#[derive(Debug, Clone)]
52struct LeafPage {
53    next_leaf_id: u32,
54    entries: Vec<LeafEntry>,
55}
56
57#[derive(Debug, Clone)]
58struct InternalPage {
59    /// children[i] is the child to the left of keys[i].
60    /// children has one more element than keys.
61    children: Vec<u32>,
62    key_bytes: Vec<Vec<u8>>,
63}
64
65// ───────────────────────────────────────────────────────────────────────── //
66//  Encode / decode helpers
67// ───────────────────────────────────────────────────────────────────────── //
68
69fn encode_leaf(page: &LeafPage) -> Vec<u8> {
70    let mut buf = vec![0u8; PAGE_SIZE];
71    let mut pos = 0;
72
73    buf[pos] = PAGE_TYPE_LEAF;
74    pos += 1;
75
76    let n = page.entries.len() as u16;
77    buf[pos..pos + 2].copy_from_slice(&n.to_le_bytes());
78    pos += 2;
79
80    buf[pos..pos + 4].copy_from_slice(&page.next_leaf_id.to_le_bytes());
81    pos += 4;
82
83    for e in &page.entries {
84        let klen = e.key_bytes.len() as u16;
85        buf[pos..pos + 2].copy_from_slice(&klen.to_le_bytes());
86        pos += 2;
87        buf[pos..pos + e.key_bytes.len()].copy_from_slice(&e.key_bytes);
88        pos += e.key_bytes.len();
89        buf[pos..pos + 8].copy_from_slice(&e.data_offset.to_le_bytes());
90        pos += 8;
91        buf[pos..pos + 4].copy_from_slice(&e.data_len.to_le_bytes());
92        pos += 4;
93    }
94
95    buf
96}
97
98fn decode_leaf(data: &[u8]) -> IsamResult<LeafPage> {
99    if data[0] != PAGE_TYPE_LEAF {
100        return Err(IsamError::CorruptIndex("expected leaf page".into()));
101    }
102    let num_entries = u16::from_le_bytes(data[1..3].try_into().unwrap()) as usize;
103    let next_leaf_id = u32::from_le_bytes(data[3..7].try_into().unwrap());
104
105    let mut pos = 7;
106    let mut entries = Vec::with_capacity(num_entries);
107
108    for _ in 0..num_entries {
109        let klen = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
110        pos += 2;
111        let key_bytes = data[pos..pos + klen].to_vec();
112        pos += klen;
113        let data_offset = u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
114        pos += 8;
115        let data_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap());
116        pos += 4;
117        entries.push(LeafEntry {
118            key_bytes,
119            data_offset,
120            data_len,
121        });
122    }
123
124    Ok(LeafPage {
125        next_leaf_id,
126        entries,
127    })
128}
129
130fn encode_internal(page: &InternalPage) -> Vec<u8> {
131    let mut buf = vec![0u8; PAGE_SIZE];
132    let mut pos = 0;
133
134    buf[pos] = PAGE_TYPE_INTERNAL;
135    pos += 1;
136
137    let n = page.key_bytes.len() as u16;
138    buf[pos..pos + 2].copy_from_slice(&n.to_le_bytes());
139    pos += 2;
140
141    // Write child_0 first.
142    buf[pos..pos + 4].copy_from_slice(&page.children[0].to_le_bytes());
143    pos += 4;
144
145    for (i, kb) in page.key_bytes.iter().enumerate() {
146        let klen = kb.len() as u16;
147        buf[pos..pos + 2].copy_from_slice(&klen.to_le_bytes());
148        pos += 2;
149        buf[pos..pos + kb.len()].copy_from_slice(kb);
150        pos += kb.len();
151        buf[pos..pos + 4].copy_from_slice(&page.children[i + 1].to_le_bytes());
152        pos += 4;
153    }
154
155    buf
156}
157
158fn decode_internal(data: &[u8]) -> IsamResult<InternalPage> {
159    if data[0] != PAGE_TYPE_INTERNAL {
160        return Err(IsamError::CorruptIndex("expected internal page".into()));
161    }
162    let num_keys = u16::from_le_bytes(data[1..3].try_into().unwrap()) as usize;
163    let mut pos = 3;
164
165    let child_0 = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap());
166    pos += 4;
167
168    let mut children = vec![child_0];
169    let mut key_bytes = Vec::with_capacity(num_keys);
170
171    for _ in 0..num_keys {
172        let klen = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
173        pos += 2;
174        let kb = data[pos..pos + klen].to_vec();
175        pos += klen;
176        let child = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap());
177        pos += 4;
178        key_bytes.push(kb);
179        children.push(child);
180    }
181
182    Ok(InternalPage { children, key_bytes })
183}
184
185// ───────────────────────────────────────────────────────────────────────── //
186//  Capacity helpers
187// ───────────────────────────────────────────────────────────────────────── //
188
189/// Compute how many bytes a leaf entry occupies on the page.
190fn leaf_entry_size(key_len: usize) -> usize {
191    2 + key_len + 8 + 4 // key_len_field + key + data_offset + data_len
192}
193
194/// Compute how many bytes an internal key+right-child pair occupies.
195fn internal_key_size(key_len: usize) -> usize {
196    2 + key_len + 4 // key_len_field + key + child_id
197}
198
199/// Fixed overhead bytes for an empty leaf page.
200const LEAF_HEADER: usize = 1 + 2 + 4; // type + num_entries + next_leaf_id
201/// Fixed overhead bytes for an empty internal page.
202const INTERNAL_HEADER: usize = 1 + 2 + 4; // type + num_keys + child_0
203
204// ───────────────────────────────────────────────────────────────────────── //
205//  BTree
206// ───────────────────────────────────────────────────────────────────────── //
207
208/// On-disk B-tree index.
209///
210/// `K` is the key type; it must be serializable, deserializable, ordered,
211/// and cheap to clone.  These bounds appear once on the `impl` block so we
212/// don't have to repeat them on every method.
213pub struct BTree<K> {
214    pager: Pager,
215    // `PhantomData` tells the compiler that `BTree` logically "owns" values
216    // of type `K`, even though the struct field only stores bytes on disk.
217    // Without this, Rust would complain that `K` is unused.
218    _phantom: std::marker::PhantomData<K>,
219}
220
221impl<K> BTree<K>
222where
223    K: Serialize + DeserializeOwned + Ord + Clone,
224{
225    pub fn create(path: &Path) -> IsamResult<Self> {
226        Ok(Self {
227            pager: Pager::create(path)?,
228            _phantom: std::marker::PhantomData,
229        })
230    }
231
232    pub fn open(path: &Path) -> IsamResult<Self> {
233        Ok(Self {
234            pager: Pager::open(path)?,
235            _phantom: std::marker::PhantomData,
236        })
237    }
238
239    // ------------------------------------------------------------------ //
240    //  Public API
241    // ------------------------------------------------------------------ //
242
243    /// Look up `key` and return its `RecordRef` if found.
244    pub fn search(&mut self, key: &K) -> IsamResult<Option<RecordRef>> {
245        let key_bytes = bincode::serialize(key)?;
246        let root_id = self.pager.meta.root_page_id;
247        self.search_page(root_id, &key_bytes)
248    }
249
250    /// Insert `key → rec` into the tree.  Returns `DuplicateKey` if the
251    /// key already exists.
252    pub fn insert(&mut self, key: &K, rec: RecordRef) -> IsamResult<()> {
253        let key_bytes = bincode::serialize(key)?;
254        // search_down returns the path from root to the target leaf.
255        let root_id = self.pager.meta.root_page_id;
256        let path = self.find_leaf_path(root_id, &key_bytes)?;
257        let leaf_id = *path.last().unwrap();
258
259        let mut leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
260
261        // Check for duplicate.
262        if leaf.entries.iter().any(|e| e.key_bytes == key_bytes) {
263            return Err(IsamError::DuplicateKey);
264        }
265
266        // Insert in sorted position using K::Ord, not raw byte order.
267        let pos = leaf
268            .entries
269            .partition_point(|e| self.cmp_key_bytes(&e.key_bytes, &key_bytes) == std::cmp::Ordering::Less);
270        leaf.entries.insert(
271            pos,
272            LeafEntry {
273                key_bytes: key_bytes.clone(),
274                data_offset: rec.offset,
275                data_len: rec.len,
276            },
277        );
278
279        // Check if the page fits.
280        if self.leaf_fits(&leaf) {
281            self.pager.write_page(leaf_id, &encode_leaf(&leaf))?;
282        } else {
283            // Need to split — do a full insert+split pass from the root.
284            self.insert_with_splits(&key_bytes, rec, path)?;
285        }
286
287        self.pager.flush()
288    }
289
290    /// Delete `key` from the tree.  Returns `KeyNotFound` if absent.
291    pub fn delete(&mut self, key: &K) -> IsamResult<()> {
292        let key_bytes = bincode::serialize(key)?;
293        let root_id = self.pager.meta.root_page_id;
294        let path = self.find_leaf_path(root_id, &key_bytes)?;
295        let leaf_id = *path.last().unwrap();
296
297        let mut leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
298
299        let pos = leaf
300            .entries
301            .iter()
302            .position(|e| e.key_bytes == key_bytes)
303            .ok_or(IsamError::KeyNotFound)?;
304        leaf.entries.remove(pos);
305
306        self.pager.write_page(leaf_id, &encode_leaf(&leaf))?;
307
308        // Under-fill handling: borrow from sibling or merge.
309        // We walk back up the path attempting to rebalance.
310        if path.len() >= 2 {
311            let parent_id = path[path.len() - 2];
312            self.rebalance_after_delete(&path, leaf_id, parent_id)?;
313        }
314
315        self.pager.flush()
316    }
317
318    /// Update the `RecordRef` stored for `key`.
319    pub fn update(&mut self, key: &K, rec: RecordRef) -> IsamResult<()> {
320        let key_bytes = bincode::serialize(key)?;
321        let root_id = self.pager.meta.root_page_id;
322        let path = self.find_leaf_path(root_id, &key_bytes)?;
323        let leaf_id = *path.last().unwrap();
324
325        let mut leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
326
327        let entry = leaf
328            .entries
329            .iter_mut()
330            .find(|e| e.key_bytes == key_bytes)
331            .ok_or(IsamError::KeyNotFound)?;
332        entry.data_offset = rec.offset;
333        entry.data_len = rec.len;
334
335        self.pager.write_page(leaf_id, &encode_leaf(&leaf))?;
336        self.pager.flush()
337    }
338
339    /// Return the leaf page id where `key` would be found (or inserted).
340    ///
341    /// Unlike `search`, this never returns `None` — it always finds the
342    /// appropriate leaf even if the key is not present.  Used by range scans
343    /// to position the iterator at the correct starting leaf.
344    pub fn find_leaf_for_key(&mut self, key: &K) -> IsamResult<u32> {
345        let key_bytes = bincode::serialize(key)?;
346        let root_id = self.pager.meta.root_page_id;
347        let path = self.find_leaf_path(root_id, &key_bytes)?;
348        Ok(*path.last().unwrap())
349    }
350
351    /// Return the id of the first (leftmost) leaf page for sequential scan.
352    pub fn first_leaf_id(&mut self) -> IsamResult<u32> {
353        let root_id = self.pager.meta.root_page_id;
354        self.leftmost_leaf(root_id)
355    }
356
357    /// Return the smallest key in the tree, or `None` if the tree is empty.
358    pub fn min_key(&mut self) -> IsamResult<Option<K>> {
359        let root_id = self.pager.meta.root_page_id;
360        let leaf_id = self.leftmost_leaf(root_id)?;
361        let leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
362        match leaf.entries.first() {
363            None => Ok(None),
364            Some(e) => Ok(Some(bincode::deserialize(&e.key_bytes)?)),
365        }
366    }
367
368    /// Return the largest key in the tree, or `None` if the tree is empty.
369    pub fn max_key(&mut self) -> IsamResult<Option<K>> {
370        let root_id = self.pager.meta.root_page_id;
371        let leaf_id = self.rightmost_leaf(root_id)?;
372        let leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
373        match leaf.entries.last() {
374            None => Ok(None),
375            Some(e) => Ok(Some(bincode::deserialize(&e.key_bytes)?)),
376        }
377    }
378
379    /// Read leaf page `id` and return its entries and `next_leaf_id`.
380    pub fn read_leaf(&mut self, id: u32) -> IsamResult<(Vec<(K, RecordRef)>, u32)> {
381        let data = self.pager.read_page(id)?;
382        let leaf = decode_leaf(&data)?;
383        let mut out = Vec::with_capacity(leaf.entries.len());
384        for e in leaf.entries {
385            let key: K = bincode::deserialize(&e.key_bytes)?;
386            out.push((
387                key,
388                RecordRef {
389                    offset: e.data_offset,
390                    len: e.data_len,
391                },
392            ));
393        }
394        Ok((out, leaf.next_leaf_id))
395    }
396
397    pub fn key_schema_version(&self) -> u32 {
398        self.pager.meta.key_schema_version
399    }
400
401    pub fn val_schema_version(&self) -> u32 {
402        self.pager.meta.val_schema_version
403    }
404
405    pub fn set_schema_versions(&mut self, key_v: u32, val_v: u32) -> IsamResult<()> {
406        self.pager.meta.key_schema_version = key_v;
407        self.pager.meta.val_schema_version = val_v;
408        self.pager.flush_meta()
409    }
410
411    pub fn index_schema_version(&self) -> u32 {
412        self.pager.meta.index_schema_version
413    }
414
415    pub fn set_index_schema_version(&mut self, v: u32) -> IsamResult<()> {
416        self.pager.meta.index_schema_version = v;
417        self.pager.flush_meta()
418    }
419
420    pub fn flush(&mut self) -> IsamResult<()> {
421        self.pager.flush()
422    }
423
424    pub fn fsync(&mut self) -> IsamResult<()> {
425        self.pager.fsync()
426    }
427
428    // ------------------------------------------------------------------ //
429    //  Private helpers — search
430    // ------------------------------------------------------------------ //
431
432    fn search_page(&mut self, page_id: u32, key_bytes: &[u8]) -> IsamResult<Option<RecordRef>> {
433        let data = self.pager.read_page(page_id)?;
434        match data[0] {
435            PAGE_TYPE_LEAF => {
436                let leaf = decode_leaf(&data)?;
437                for e in &leaf.entries {
438                    if e.key_bytes == key_bytes {
439                        return Ok(Some(RecordRef {
440                            offset: e.data_offset,
441                            len: e.data_len,
442                        }));
443                    }
444                }
445                Ok(None)
446            }
447            PAGE_TYPE_INTERNAL => {
448                let internal = decode_internal(&data)?;
449                let child_id = self.find_child(&internal, key_bytes);
450                self.search_page(child_id, key_bytes)
451            }
452            t => Err(IsamError::CorruptIndex(format!("unknown page type {t}"))),
453        }
454    }
455
456    /// Compare two serialized keys using K's `Ord` (not raw byte order).
457    ///
458    /// Bincode uses little-endian integers, so raw byte comparison diverges
459    /// from numeric order for values ≥ 256 in multi-byte types.  By
460    /// deserializing and using `K::cmp` we guarantee the B-tree is ordered
461    /// the same way the caller's `Ord` impl defines.
462    fn cmp_key_bytes(&self, a: &[u8], b: &[u8]) -> std::cmp::Ordering {
463        let ka: K = bincode::deserialize(a).expect("corrupt key bytes in index");
464        let kb: K = bincode::deserialize(b).expect("corrupt key bytes in index");
465        ka.cmp(&kb)
466    }
467
468    /// Binary-search the internal page's keys to pick the right child.
469    fn find_child(&self, page: &InternalPage, key_bytes: &[u8]) -> u32 {
470        // children[i] holds all keys < keys[i].
471        // We want the largest i such that keys[i-1] <= key_bytes,
472        // or child[0] if key_bytes < keys[0].
473        // Use K::Ord (via cmp_key_bytes) so the routing matches the insertion
474        // order — raw byte order diverges from K::Ord for multi-byte integers.
475        let pos = page
476            .key_bytes
477            .partition_point(|kb| !matches!(self.cmp_key_bytes(kb, key_bytes), std::cmp::Ordering::Greater));
478        page.children[pos]
479    }
480
481    /// Walk from `page_id` down to a leaf, collecting page ids along the way.
482    fn find_leaf_path(&mut self, page_id: u32, key_bytes: &[u8]) -> IsamResult<Vec<u32>> {
483        let mut path = vec![page_id];
484        let mut current = page_id;
485        loop {
486            let data = self.pager.read_page(current)?;
487            match data[0] {
488                PAGE_TYPE_LEAF => break,
489                PAGE_TYPE_INTERNAL => {
490                    let internal = decode_internal(&data)?;
491                    let child = self.find_child(&internal, key_bytes);
492                    path.push(child);
493                    current = child;
494                }
495                t => return Err(IsamError::CorruptIndex(format!("unknown page type {t}"))),
496            }
497        }
498        Ok(path)
499    }
500
501    fn leftmost_leaf(&mut self, page_id: u32) -> IsamResult<u32> {
502        let data = self.pager.read_page(page_id)?;
503        match data[0] {
504            PAGE_TYPE_LEAF => Ok(page_id),
505            PAGE_TYPE_INTERNAL => {
506                let internal = decode_internal(&data)?;
507                self.leftmost_leaf(internal.children[0])
508            }
509            t => Err(IsamError::CorruptIndex(format!("unknown page type {t}"))),
510        }
511    }
512
513    fn rightmost_leaf(&mut self, page_id: u32) -> IsamResult<u32> {
514        let data = self.pager.read_page(page_id)?;
515        match data[0] {
516            PAGE_TYPE_LEAF => Ok(page_id),
517            PAGE_TYPE_INTERNAL => {
518                let internal = decode_internal(&data)?;
519                self.rightmost_leaf(*internal.children.last().unwrap())
520            }
521            t => Err(IsamError::CorruptIndex(format!("unknown page type {t}"))),
522        }
523    }
524
525    // ------------------------------------------------------------------ //
526    //  Private helpers — capacity
527    // ------------------------------------------------------------------ //
528
529    fn leaf_byte_usage(leaf: &LeafPage) -> usize {
530        LEAF_HEADER
531            + leaf
532                .entries
533                .iter()
534                .map(|e| leaf_entry_size(e.key_bytes.len()))
535                .sum::<usize>()
536    }
537
538    fn leaf_fits(&self, leaf: &LeafPage) -> bool {
539        Self::leaf_byte_usage(leaf) <= PAGE_SIZE
540    }
541
542    fn internal_byte_usage(page: &InternalPage) -> usize {
543        INTERNAL_HEADER
544            + page
545                .key_bytes
546                .iter()
547                .map(|kb| internal_key_size(kb.len()))
548                .sum::<usize>()
549    }
550
551    fn internal_fits(page: &InternalPage) -> bool {
552        Self::internal_byte_usage(page) <= PAGE_SIZE
553    }
554
555    // ------------------------------------------------------------------ //
556    //  Private helpers — insert with splits
557    // ------------------------------------------------------------------ //
558
559    /// Full insert-and-split pass. Called when the simple insert overflows.
560    fn insert_with_splits(
561        &mut self,
562        key_bytes: &[u8],
563        rec: RecordRef,
564        path: Vec<u32>,
565    ) -> IsamResult<()> {
566        // Re-insert into the leaf (undo the speculative insert above by
567        // re-reading fresh state from disk and doing it properly).
568        let leaf_id = *path.last().unwrap();
569        let mut leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
570
571        // The entry may already be there from the speculative path — remove it
572        // if so, then re-insert cleanly.
573        leaf.entries.retain(|e| e.key_bytes != key_bytes);
574        let pos = leaf
575            .entries
576            .partition_point(|e| self.cmp_key_bytes(&e.key_bytes, key_bytes) == std::cmp::Ordering::Less);
577        leaf.entries.insert(
578            pos,
579            LeafEntry {
580                key_bytes: key_bytes.to_vec(),
581                data_offset: rec.offset,
582                data_len: rec.len,
583            },
584        );
585
586        // Split the leaf in half.
587        let mid = leaf.entries.len() / 2;
588        let right_entries = leaf.entries.split_off(mid);
589        let promote_key = right_entries[0].key_bytes.clone();
590
591        // Allocate a new right-leaf page.
592        let right_id = self.pager.alloc_page()?;
593
594        // Fix the linked list: new_right.next = old_leaf.next
595        let right_leaf = LeafPage {
596            next_leaf_id: leaf.next_leaf_id,
597            entries: right_entries,
598        };
599        leaf.next_leaf_id = right_id;
600
601        self.pager.write_page(leaf_id, &encode_leaf(&leaf))?;
602        self.pager.write_page(right_id, &encode_leaf(&right_leaf))?;
603
604        // Propagate the promoted key upward.
605        self.insert_into_parent(&path, path.len() - 1, promote_key, right_id)?;
606
607        Ok(())
608    }
609
610    /// Recursively insert a promoted key+right_child into the parent.
611    fn insert_into_parent(
612        &mut self,
613        path: &[u32],
614        child_path_idx: usize, // index of the child that was split
615        promote_key: Vec<u8>,
616        right_child_id: u32,
617    ) -> IsamResult<()> {
618        if child_path_idx == 0 {
619            // The root was split — create a new root.
620            let left_child_id = path[0];
621            let new_root = InternalPage {
622                children: vec![left_child_id, right_child_id],
623                key_bytes: vec![promote_key],
624            };
625            let new_root_id = self.pager.alloc_page()?;
626            self.pager
627                .write_page(new_root_id, &encode_internal(&new_root))?;
628            self.pager.meta.root_page_id = new_root_id;
629            self.pager.flush_meta()?;
630            return Ok(());
631        }
632
633        let parent_id = path[child_path_idx - 1];
634        let mut parent = decode_internal(&self.pager.read_page(parent_id)?)?;
635
636        // Find where in parent's children the left child sits.
637        let left_child_id = path[child_path_idx];
638        let child_pos = parent
639            .children
640            .iter()
641            .position(|&c| c == left_child_id)
642            .ok_or_else(|| IsamError::CorruptIndex("child not found in parent".into()))?;
643
644        parent.key_bytes.insert(child_pos, promote_key);
645        parent.children.insert(child_pos + 1, right_child_id);
646
647        if Self::internal_fits(&parent) {
648            self.pager
649                .write_page(parent_id, &encode_internal(&parent))?;
650        } else {
651            // Split the internal page.
652            let mid = parent.key_bytes.len() / 2;
653            let promote_up = parent.key_bytes[mid].clone();
654
655            let right_keys = parent.key_bytes.split_off(mid + 1);
656            parent.key_bytes.truncate(mid); // remove the promoted key itself
657
658            let right_children = parent.children.split_off(mid + 1);
659
660            let right_internal = InternalPage {
661                children: right_children,
662                key_bytes: right_keys,
663            };
664            let right_id = self.pager.alloc_page()?;
665            self.pager
666                .write_page(parent_id, &encode_internal(&parent))?;
667            self.pager
668                .write_page(right_id, &encode_internal(&right_internal))?;
669
670            self.insert_into_parent(path, child_path_idx - 1, promote_up, right_id)?;
671        }
672
673        Ok(())
674    }
675
676    // ------------------------------------------------------------------ //
677    //  Private helpers — delete rebalancing
678    // ------------------------------------------------------------------ //
679
680    fn rebalance_after_delete(
681        &mut self,
682        _path: &[u32],
683        leaf_id: u32,
684        parent_id: u32,
685    ) -> IsamResult<()> {
686        let leaf_data = self.pager.read_page(leaf_id)?;
687        let leaf = decode_leaf(&leaf_data)?;
688
689        // Minimum occupancy: a leaf should hold at least some entries.
690        // We use a simple heuristic: if the leaf is less than 1/4 full and
691        // there is a sibling we can steal from or merge with, do so.
692        // For simplicity we only merge (no borrow), which is correct but
693        // may cause tree height to drop faster than strictly necessary.
694        let min_bytes = PAGE_SIZE / 4;
695        if Self::leaf_byte_usage(&leaf) >= min_bytes {
696            return Ok(()); // still sufficiently full
697        }
698
699        let parent_data = self.pager.read_page(parent_id)?;
700        let mut parent = decode_internal(&parent_data)?;
701
702        // Find the index of leaf_id in parent.children.
703        let idx = match parent.children.iter().position(|&c| c == leaf_id) {
704            Some(i) => i,
705            None => return Ok(()), // shouldn't happen, bail safely
706        };
707
708        // Try to merge with the right sibling first, then left.
709        if idx + 1 < parent.children.len() {
710            let right_id = parent.children[idx + 1];
711            let right_data = self.pager.read_page(right_id)?;
712            let mut right = decode_leaf(&right_data)?;
713
714            // Merge: absorb right into leaf.
715            let mut merged = leaf.clone();
716            merged.entries.append(&mut right.entries);
717            merged.next_leaf_id = right.next_leaf_id;
718
719            if Self::leaf_byte_usage(&merged) <= PAGE_SIZE {
720                self.pager.write_page(leaf_id, &encode_leaf(&merged))?;
721                // Remove the separator key and right child from parent.
722                parent.key_bytes.remove(idx);
723                parent.children.remove(idx + 1);
724                self.pager
725                    .write_page(parent_id, &encode_internal(&parent))?;
726
727                // If parent is now the root and empty, shrink tree height.
728                if parent_id == self.pager.meta.root_page_id && parent.children.len() == 1 {
729                    self.pager.meta.root_page_id = parent.children[0];
730                    self.pager.flush_meta()?;
731                }
732                return Ok(());
733            }
734        }
735
736        if idx > 0 {
737            let left_id = parent.children[idx - 1];
738            let left_data = self.pager.read_page(left_id)?;
739            let left = decode_leaf(&left_data)?;
740
741            let mut merged = left.clone();
742            merged.entries.append(&mut leaf.entries.clone());
743            merged.next_leaf_id = leaf.next_leaf_id;
744
745            if Self::leaf_byte_usage(&merged) <= PAGE_SIZE {
746                self.pager.write_page(left_id, &encode_leaf(&merged))?;
747                parent.key_bytes.remove(idx - 1);
748                parent.children.remove(idx);
749                self.pager
750                    .write_page(parent_id, &encode_internal(&parent))?;
751
752                if parent_id == self.pager.meta.root_page_id && parent.children.len() == 1 {
753                    self.pager.meta.root_page_id = parent.children[0];
754                    self.pager.flush_meta()?;
755                }
756            }
757        }
758
759        Ok(())
760    }
761}