1pub mod pager;
28
29use std::path::Path;
30
31use serde::de::DeserializeOwned;
32use serde::Serialize;
33
34use crate::error::{IsamError, IsamResult};
35use crate::store::RecordRef;
36use pager::{
37 Pager, PAGE_SIZE, PAGE_TYPE_INTERNAL, PAGE_TYPE_LEAF,
38};
39
40#[derive(Debug, Clone)]
45struct LeafEntry {
46 key_bytes: Vec<u8>,
47 data_offset: u64,
48 data_len: u32,
49}
50
51#[derive(Debug, Clone)]
52struct LeafPage {
53 next_leaf_id: u32,
54 entries: Vec<LeafEntry>,
55}
56
57#[derive(Debug, Clone)]
58struct InternalPage {
59 children: Vec<u32>,
62 key_bytes: Vec<Vec<u8>>,
63}
64
65fn encode_leaf(page: &LeafPage) -> Vec<u8> {
70 let mut buf = vec![0u8; PAGE_SIZE];
71 let mut pos = 0;
72
73 buf[pos] = PAGE_TYPE_LEAF;
74 pos += 1;
75
76 let n = page.entries.len() as u16;
77 buf[pos..pos + 2].copy_from_slice(&n.to_le_bytes());
78 pos += 2;
79
80 buf[pos..pos + 4].copy_from_slice(&page.next_leaf_id.to_le_bytes());
81 pos += 4;
82
83 for e in &page.entries {
84 let klen = e.key_bytes.len() as u16;
85 buf[pos..pos + 2].copy_from_slice(&klen.to_le_bytes());
86 pos += 2;
87 buf[pos..pos + e.key_bytes.len()].copy_from_slice(&e.key_bytes);
88 pos += e.key_bytes.len();
89 buf[pos..pos + 8].copy_from_slice(&e.data_offset.to_le_bytes());
90 pos += 8;
91 buf[pos..pos + 4].copy_from_slice(&e.data_len.to_le_bytes());
92 pos += 4;
93 }
94
95 buf
96}
97
98fn decode_leaf(data: &[u8]) -> IsamResult<LeafPage> {
99 if data[0] != PAGE_TYPE_LEAF {
100 return Err(IsamError::CorruptIndex("expected leaf page".into()));
101 }
102 let num_entries = u16::from_le_bytes(data[1..3].try_into().unwrap()) as usize;
103 let next_leaf_id = u32::from_le_bytes(data[3..7].try_into().unwrap());
104
105 let mut pos = 7;
106 let mut entries = Vec::with_capacity(num_entries);
107
108 for _ in 0..num_entries {
109 let klen = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
110 pos += 2;
111 let key_bytes = data[pos..pos + klen].to_vec();
112 pos += klen;
113 let data_offset = u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
114 pos += 8;
115 let data_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap());
116 pos += 4;
117 entries.push(LeafEntry {
118 key_bytes,
119 data_offset,
120 data_len,
121 });
122 }
123
124 Ok(LeafPage {
125 next_leaf_id,
126 entries,
127 })
128}
129
130fn encode_internal(page: &InternalPage) -> Vec<u8> {
131 let mut buf = vec![0u8; PAGE_SIZE];
132 let mut pos = 0;
133
134 buf[pos] = PAGE_TYPE_INTERNAL;
135 pos += 1;
136
137 let n = page.key_bytes.len() as u16;
138 buf[pos..pos + 2].copy_from_slice(&n.to_le_bytes());
139 pos += 2;
140
141 buf[pos..pos + 4].copy_from_slice(&page.children[0].to_le_bytes());
143 pos += 4;
144
145 for (i, kb) in page.key_bytes.iter().enumerate() {
146 let klen = kb.len() as u16;
147 buf[pos..pos + 2].copy_from_slice(&klen.to_le_bytes());
148 pos += 2;
149 buf[pos..pos + kb.len()].copy_from_slice(kb);
150 pos += kb.len();
151 buf[pos..pos + 4].copy_from_slice(&page.children[i + 1].to_le_bytes());
152 pos += 4;
153 }
154
155 buf
156}
157
158fn decode_internal(data: &[u8]) -> IsamResult<InternalPage> {
159 if data[0] != PAGE_TYPE_INTERNAL {
160 return Err(IsamError::CorruptIndex("expected internal page".into()));
161 }
162 let num_keys = u16::from_le_bytes(data[1..3].try_into().unwrap()) as usize;
163 let mut pos = 3;
164
165 let child_0 = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap());
166 pos += 4;
167
168 let mut children = vec![child_0];
169 let mut key_bytes = Vec::with_capacity(num_keys);
170
171 for _ in 0..num_keys {
172 let klen = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
173 pos += 2;
174 let kb = data[pos..pos + klen].to_vec();
175 pos += klen;
176 let child = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap());
177 pos += 4;
178 key_bytes.push(kb);
179 children.push(child);
180 }
181
182 Ok(InternalPage { children, key_bytes })
183}
184
185fn leaf_entry_size(key_len: usize) -> usize {
191 2 + key_len + 8 + 4 }
193
194fn internal_key_size(key_len: usize) -> usize {
196 2 + key_len + 4 }
198
199const LEAF_HEADER: usize = 1 + 2 + 4; const INTERNAL_HEADER: usize = 1 + 2 + 4; pub struct BTree<K> {
214 pager: Pager,
215 _phantom: std::marker::PhantomData<K>,
219}
220
221impl<K> BTree<K>
222where
223 K: Serialize + DeserializeOwned + Ord + Clone,
224{
225 pub fn create(path: &Path) -> IsamResult<Self> {
226 Ok(Self {
227 pager: Pager::create(path)?,
228 _phantom: std::marker::PhantomData,
229 })
230 }
231
232 pub fn open(path: &Path) -> IsamResult<Self> {
233 Ok(Self {
234 pager: Pager::open(path)?,
235 _phantom: std::marker::PhantomData,
236 })
237 }
238
239 pub fn search(&mut self, key: &K) -> IsamResult<Option<RecordRef>> {
245 let key_bytes = bincode::serialize(key)?;
246 let root_id = self.pager.meta.root_page_id;
247 self.search_page(root_id, &key_bytes)
248 }
249
250 pub fn insert(&mut self, key: &K, rec: RecordRef) -> IsamResult<()> {
253 let key_bytes = bincode::serialize(key)?;
254 let root_id = self.pager.meta.root_page_id;
256 let path = self.find_leaf_path(root_id, &key_bytes)?;
257 let leaf_id = *path.last().unwrap();
258
259 let mut leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
260
261 if leaf.entries.iter().any(|e| e.key_bytes == key_bytes) {
263 return Err(IsamError::DuplicateKey);
264 }
265
266 let pos = leaf
268 .entries
269 .partition_point(|e| self.cmp_key_bytes(&e.key_bytes, &key_bytes) == std::cmp::Ordering::Less);
270 leaf.entries.insert(
271 pos,
272 LeafEntry {
273 key_bytes: key_bytes.clone(),
274 data_offset: rec.offset,
275 data_len: rec.len,
276 },
277 );
278
279 if self.leaf_fits(&leaf) {
281 self.pager.write_page(leaf_id, &encode_leaf(&leaf))?;
282 } else {
283 self.insert_with_splits(&key_bytes, rec, path)?;
285 }
286
287 self.pager.flush()
288 }
289
290 pub fn delete(&mut self, key: &K) -> IsamResult<()> {
292 let key_bytes = bincode::serialize(key)?;
293 let root_id = self.pager.meta.root_page_id;
294 let path = self.find_leaf_path(root_id, &key_bytes)?;
295 let leaf_id = *path.last().unwrap();
296
297 let mut leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
298
299 let pos = leaf
300 .entries
301 .iter()
302 .position(|e| e.key_bytes == key_bytes)
303 .ok_or(IsamError::KeyNotFound)?;
304 leaf.entries.remove(pos);
305
306 self.pager.write_page(leaf_id, &encode_leaf(&leaf))?;
307
308 if path.len() >= 2 {
311 let parent_id = path[path.len() - 2];
312 self.rebalance_after_delete(&path, leaf_id, parent_id)?;
313 }
314
315 self.pager.flush()
316 }
317
318 pub fn update(&mut self, key: &K, rec: RecordRef) -> IsamResult<()> {
320 let key_bytes = bincode::serialize(key)?;
321 let root_id = self.pager.meta.root_page_id;
322 let path = self.find_leaf_path(root_id, &key_bytes)?;
323 let leaf_id = *path.last().unwrap();
324
325 let mut leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
326
327 let entry = leaf
328 .entries
329 .iter_mut()
330 .find(|e| e.key_bytes == key_bytes)
331 .ok_or(IsamError::KeyNotFound)?;
332 entry.data_offset = rec.offset;
333 entry.data_len = rec.len;
334
335 self.pager.write_page(leaf_id, &encode_leaf(&leaf))?;
336 self.pager.flush()
337 }
338
339 pub fn find_leaf_for_key(&mut self, key: &K) -> IsamResult<u32> {
345 let key_bytes = bincode::serialize(key)?;
346 let root_id = self.pager.meta.root_page_id;
347 let path = self.find_leaf_path(root_id, &key_bytes)?;
348 Ok(*path.last().unwrap())
349 }
350
351 pub fn first_leaf_id(&mut self) -> IsamResult<u32> {
353 let root_id = self.pager.meta.root_page_id;
354 self.leftmost_leaf(root_id)
355 }
356
357 pub fn min_key(&mut self) -> IsamResult<Option<K>> {
359 let root_id = self.pager.meta.root_page_id;
360 let leaf_id = self.leftmost_leaf(root_id)?;
361 let leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
362 match leaf.entries.first() {
363 None => Ok(None),
364 Some(e) => Ok(Some(bincode::deserialize(&e.key_bytes)?)),
365 }
366 }
367
368 pub fn max_key(&mut self) -> IsamResult<Option<K>> {
370 let root_id = self.pager.meta.root_page_id;
371 let leaf_id = self.rightmost_leaf(root_id)?;
372 let leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
373 match leaf.entries.last() {
374 None => Ok(None),
375 Some(e) => Ok(Some(bincode::deserialize(&e.key_bytes)?)),
376 }
377 }
378
379 pub fn read_leaf(&mut self, id: u32) -> IsamResult<(Vec<(K, RecordRef)>, u32)> {
381 let data = self.pager.read_page(id)?;
382 let leaf = decode_leaf(&data)?;
383 let mut out = Vec::with_capacity(leaf.entries.len());
384 for e in leaf.entries {
385 let key: K = bincode::deserialize(&e.key_bytes)?;
386 out.push((
387 key,
388 RecordRef {
389 offset: e.data_offset,
390 len: e.data_len,
391 },
392 ));
393 }
394 Ok((out, leaf.next_leaf_id))
395 }
396
397 pub fn key_schema_version(&self) -> u32 {
398 self.pager.meta.key_schema_version
399 }
400
401 pub fn val_schema_version(&self) -> u32 {
402 self.pager.meta.val_schema_version
403 }
404
405 pub fn set_schema_versions(&mut self, key_v: u32, val_v: u32) -> IsamResult<()> {
406 self.pager.meta.key_schema_version = key_v;
407 self.pager.meta.val_schema_version = val_v;
408 self.pager.flush_meta()
409 }
410
411 pub fn flush(&mut self) -> IsamResult<()> {
412 self.pager.flush()
413 }
414
415 pub fn fsync(&mut self) -> IsamResult<()> {
416 self.pager.fsync()
417 }
418
419 fn search_page(&mut self, page_id: u32, key_bytes: &[u8]) -> IsamResult<Option<RecordRef>> {
424 let data = self.pager.read_page(page_id)?;
425 match data[0] {
426 PAGE_TYPE_LEAF => {
427 let leaf = decode_leaf(&data)?;
428 for e in &leaf.entries {
429 if e.key_bytes == key_bytes {
430 return Ok(Some(RecordRef {
431 offset: e.data_offset,
432 len: e.data_len,
433 }));
434 }
435 }
436 Ok(None)
437 }
438 PAGE_TYPE_INTERNAL => {
439 let internal = decode_internal(&data)?;
440 let child_id = self.find_child(&internal, key_bytes);
441 self.search_page(child_id, key_bytes)
442 }
443 t => Err(IsamError::CorruptIndex(format!("unknown page type {t}"))),
444 }
445 }
446
447 fn cmp_key_bytes(&self, a: &[u8], b: &[u8]) -> std::cmp::Ordering {
454 let ka: K = bincode::deserialize(a).expect("corrupt key bytes in index");
455 let kb: K = bincode::deserialize(b).expect("corrupt key bytes in index");
456 ka.cmp(&kb)
457 }
458
459 fn find_child(&self, page: &InternalPage, key_bytes: &[u8]) -> u32 {
461 let pos = page
467 .key_bytes
468 .partition_point(|kb| !matches!(self.cmp_key_bytes(kb, key_bytes), std::cmp::Ordering::Greater));
469 page.children[pos]
470 }
471
472 fn find_leaf_path(&mut self, page_id: u32, key_bytes: &[u8]) -> IsamResult<Vec<u32>> {
474 let mut path = vec![page_id];
475 let mut current = page_id;
476 loop {
477 let data = self.pager.read_page(current)?;
478 match data[0] {
479 PAGE_TYPE_LEAF => break,
480 PAGE_TYPE_INTERNAL => {
481 let internal = decode_internal(&data)?;
482 let child = self.find_child(&internal, key_bytes);
483 path.push(child);
484 current = child;
485 }
486 t => return Err(IsamError::CorruptIndex(format!("unknown page type {t}"))),
487 }
488 }
489 Ok(path)
490 }
491
492 fn leftmost_leaf(&mut self, page_id: u32) -> IsamResult<u32> {
493 let data = self.pager.read_page(page_id)?;
494 match data[0] {
495 PAGE_TYPE_LEAF => Ok(page_id),
496 PAGE_TYPE_INTERNAL => {
497 let internal = decode_internal(&data)?;
498 self.leftmost_leaf(internal.children[0])
499 }
500 t => Err(IsamError::CorruptIndex(format!("unknown page type {t}"))),
501 }
502 }
503
504 fn rightmost_leaf(&mut self, page_id: u32) -> IsamResult<u32> {
505 let data = self.pager.read_page(page_id)?;
506 match data[0] {
507 PAGE_TYPE_LEAF => Ok(page_id),
508 PAGE_TYPE_INTERNAL => {
509 let internal = decode_internal(&data)?;
510 self.rightmost_leaf(*internal.children.last().unwrap())
511 }
512 t => Err(IsamError::CorruptIndex(format!("unknown page type {t}"))),
513 }
514 }
515
516 fn leaf_byte_usage(leaf: &LeafPage) -> usize {
521 LEAF_HEADER
522 + leaf
523 .entries
524 .iter()
525 .map(|e| leaf_entry_size(e.key_bytes.len()))
526 .sum::<usize>()
527 }
528
529 fn leaf_fits(&self, leaf: &LeafPage) -> bool {
530 Self::leaf_byte_usage(leaf) <= PAGE_SIZE
531 }
532
533 fn internal_byte_usage(page: &InternalPage) -> usize {
534 INTERNAL_HEADER
535 + page
536 .key_bytes
537 .iter()
538 .map(|kb| internal_key_size(kb.len()))
539 .sum::<usize>()
540 }
541
542 fn internal_fits(page: &InternalPage) -> bool {
543 Self::internal_byte_usage(page) <= PAGE_SIZE
544 }
545
546 fn insert_with_splits(
552 &mut self,
553 key_bytes: &[u8],
554 rec: RecordRef,
555 path: Vec<u32>,
556 ) -> IsamResult<()> {
557 let leaf_id = *path.last().unwrap();
560 let mut leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
561
562 leaf.entries.retain(|e| e.key_bytes != key_bytes);
565 let pos = leaf
566 .entries
567 .partition_point(|e| self.cmp_key_bytes(&e.key_bytes, key_bytes) == std::cmp::Ordering::Less);
568 leaf.entries.insert(
569 pos,
570 LeafEntry {
571 key_bytes: key_bytes.to_vec(),
572 data_offset: rec.offset,
573 data_len: rec.len,
574 },
575 );
576
577 let mid = leaf.entries.len() / 2;
579 let right_entries = leaf.entries.split_off(mid);
580 let promote_key = right_entries[0].key_bytes.clone();
581
582 let right_id = self.pager.alloc_page()?;
584
585 let right_leaf = LeafPage {
587 next_leaf_id: leaf.next_leaf_id,
588 entries: right_entries,
589 };
590 leaf.next_leaf_id = right_id;
591
592 self.pager.write_page(leaf_id, &encode_leaf(&leaf))?;
593 self.pager.write_page(right_id, &encode_leaf(&right_leaf))?;
594
595 self.insert_into_parent(&path, path.len() - 1, promote_key, right_id)?;
597
598 Ok(())
599 }
600
601 fn insert_into_parent(
603 &mut self,
604 path: &[u32],
605 child_path_idx: usize, promote_key: Vec<u8>,
607 right_child_id: u32,
608 ) -> IsamResult<()> {
609 if child_path_idx == 0 {
610 let left_child_id = path[0];
612 let new_root = InternalPage {
613 children: vec![left_child_id, right_child_id],
614 key_bytes: vec![promote_key],
615 };
616 let new_root_id = self.pager.alloc_page()?;
617 self.pager
618 .write_page(new_root_id, &encode_internal(&new_root))?;
619 self.pager.meta.root_page_id = new_root_id;
620 self.pager.flush_meta()?;
621 return Ok(());
622 }
623
624 let parent_id = path[child_path_idx - 1];
625 let mut parent = decode_internal(&self.pager.read_page(parent_id)?)?;
626
627 let left_child_id = path[child_path_idx];
629 let child_pos = parent
630 .children
631 .iter()
632 .position(|&c| c == left_child_id)
633 .ok_or_else(|| IsamError::CorruptIndex("child not found in parent".into()))?;
634
635 parent.key_bytes.insert(child_pos, promote_key);
636 parent.children.insert(child_pos + 1, right_child_id);
637
638 if Self::internal_fits(&parent) {
639 self.pager
640 .write_page(parent_id, &encode_internal(&parent))?;
641 } else {
642 let mid = parent.key_bytes.len() / 2;
644 let promote_up = parent.key_bytes[mid].clone();
645
646 let right_keys = parent.key_bytes.split_off(mid + 1);
647 parent.key_bytes.truncate(mid); let right_children = parent.children.split_off(mid + 1);
650
651 let right_internal = InternalPage {
652 children: right_children,
653 key_bytes: right_keys,
654 };
655 let right_id = self.pager.alloc_page()?;
656 self.pager
657 .write_page(parent_id, &encode_internal(&parent))?;
658 self.pager
659 .write_page(right_id, &encode_internal(&right_internal))?;
660
661 self.insert_into_parent(path, child_path_idx - 1, promote_up, right_id)?;
662 }
663
664 Ok(())
665 }
666
667 fn rebalance_after_delete(
672 &mut self,
673 _path: &[u32],
674 leaf_id: u32,
675 parent_id: u32,
676 ) -> IsamResult<()> {
677 let leaf_data = self.pager.read_page(leaf_id)?;
678 let leaf = decode_leaf(&leaf_data)?;
679
680 let min_bytes = PAGE_SIZE / 4;
686 if Self::leaf_byte_usage(&leaf) >= min_bytes {
687 return Ok(()); }
689
690 let parent_data = self.pager.read_page(parent_id)?;
691 let mut parent = decode_internal(&parent_data)?;
692
693 let idx = match parent.children.iter().position(|&c| c == leaf_id) {
695 Some(i) => i,
696 None => return Ok(()), };
698
699 if idx + 1 < parent.children.len() {
701 let right_id = parent.children[idx + 1];
702 let right_data = self.pager.read_page(right_id)?;
703 let mut right = decode_leaf(&right_data)?;
704
705 let mut merged = leaf.clone();
707 merged.entries.append(&mut right.entries);
708 merged.next_leaf_id = right.next_leaf_id;
709
710 if Self::leaf_byte_usage(&merged) <= PAGE_SIZE {
711 self.pager.write_page(leaf_id, &encode_leaf(&merged))?;
712 parent.key_bytes.remove(idx);
714 parent.children.remove(idx + 1);
715 self.pager
716 .write_page(parent_id, &encode_internal(&parent))?;
717
718 if parent_id == self.pager.meta.root_page_id && parent.children.len() == 1 {
720 self.pager.meta.root_page_id = parent.children[0];
721 self.pager.flush_meta()?;
722 }
723 return Ok(());
724 }
725 }
726
727 if idx > 0 {
728 let left_id = parent.children[idx - 1];
729 let left_data = self.pager.read_page(left_id)?;
730 let left = decode_leaf(&left_data)?;
731
732 let mut merged = left.clone();
733 merged.entries.append(&mut leaf.entries.clone());
734 merged.next_leaf_id = leaf.next_leaf_id;
735
736 if Self::leaf_byte_usage(&merged) <= PAGE_SIZE {
737 self.pager.write_page(left_id, &encode_leaf(&merged))?;
738 parent.key_bytes.remove(idx - 1);
739 parent.children.remove(idx);
740 self.pager
741 .write_page(parent_id, &encode_internal(&parent))?;
742
743 if parent_id == self.pager.meta.root_page_id && parent.children.len() == 1 {
744 self.pager.meta.root_page_id = parent.children[0];
745 self.pager.flush_meta()?;
746 }
747 }
748 }
749
750 Ok(())
751 }
752}