1pub mod pager;
28
29use std::path::Path;
30
31use serde::de::DeserializeOwned;
32use serde::Serialize;
33
34use crate::error::{IsamError, IsamResult};
35use crate::store::RecordRef;
36use pager::{
37 Pager, PAGE_SIZE, PAGE_TYPE_INTERNAL, PAGE_TYPE_LEAF,
38};
39
/// One key-to-record mapping as held in a leaf page.
#[derive(Debug, Clone)]
struct LeafEntry {
    /// Serialized key; the `BTree` methods produce it with `bincode::serialize`.
    key_bytes: Vec<u8>,
    /// Copied from / into `RecordRef::offset`.
    data_offset: u64,
    /// Copied from / into `RecordRef::len`.
    data_len: u32,
}
50
/// In-memory form of a leaf page: its entries plus the id stored in the header.
#[derive(Debug, Clone)]
struct LeafPage {
    /// Page id written in the leaf header; splits and merges keep it pointing at the
    /// leaf that follows this one.
    next_leaf_id: u32,
    /// Entries in the order they are written to the page.
    entries: Vec<LeafEntry>,
}
56
/// In-memory form of an internal page.
///
/// The codec writes `children[0]` first and then one `(key, child)` pair per key, so
/// `children` is expected to hold exactly one more element than `key_bytes`.
#[derive(Debug, Clone)]
struct InternalPage {
    /// Child page ids.
    children: Vec<u32>,
    /// Serialized separator keys; `find_child` picks the child by comparing against these.
    key_bytes: Vec<Vec<u8>>,
}
64
65fn encode_leaf(page: &LeafPage) -> Vec<u8> {
70 let mut buf = vec![0u8; PAGE_SIZE];
71 let mut pos = 0;
72
73 buf[pos] = PAGE_TYPE_LEAF;
74 pos += 1;
75
76 let n = page.entries.len() as u16;
77 buf[pos..pos + 2].copy_from_slice(&n.to_le_bytes());
78 pos += 2;
79
80 buf[pos..pos + 4].copy_from_slice(&page.next_leaf_id.to_le_bytes());
81 pos += 4;
82
83 for e in &page.entries {
84 let klen = e.key_bytes.len() as u16;
85 buf[pos..pos + 2].copy_from_slice(&klen.to_le_bytes());
86 pos += 2;
87 buf[pos..pos + e.key_bytes.len()].copy_from_slice(&e.key_bytes);
88 pos += e.key_bytes.len();
89 buf[pos..pos + 8].copy_from_slice(&e.data_offset.to_le_bytes());
90 pos += 8;
91 buf[pos..pos + 4].copy_from_slice(&e.data_len.to_le_bytes());
92 pos += 4;
93 }
94
95 buf
96}
97
98fn decode_leaf(data: &[u8]) -> IsamResult<LeafPage> {
99 if data[0] != PAGE_TYPE_LEAF {
100 return Err(IsamError::CorruptIndex("expected leaf page".into()));
101 }
102 let num_entries = u16::from_le_bytes(data[1..3].try_into().unwrap()) as usize;
103 let next_leaf_id = u32::from_le_bytes(data[3..7].try_into().unwrap());
104
105 let mut pos = 7;
106 let mut entries = Vec::with_capacity(num_entries);
107
108 for _ in 0..num_entries {
109 let klen = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
110 pos += 2;
111 let key_bytes = data[pos..pos + klen].to_vec();
112 pos += klen;
113 let data_offset = u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
114 pos += 8;
115 let data_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap());
116 pos += 4;
117 entries.push(LeafEntry {
118 key_bytes,
119 data_offset,
120 data_len,
121 });
122 }
123
124 Ok(LeafPage {
125 next_leaf_id,
126 entries,
127 })
128}
129
130fn encode_internal(page: &InternalPage) -> Vec<u8> {
131 let mut buf = vec![0u8; PAGE_SIZE];
132 let mut pos = 0;
133
134 buf[pos] = PAGE_TYPE_INTERNAL;
135 pos += 1;
136
137 let n = page.key_bytes.len() as u16;
138 buf[pos..pos + 2].copy_from_slice(&n.to_le_bytes());
139 pos += 2;
140
141 buf[pos..pos + 4].copy_from_slice(&page.children[0].to_le_bytes());
143 pos += 4;
144
145 for (i, kb) in page.key_bytes.iter().enumerate() {
146 let klen = kb.len() as u16;
147 buf[pos..pos + 2].copy_from_slice(&klen.to_le_bytes());
148 pos += 2;
149 buf[pos..pos + kb.len()].copy_from_slice(kb);
150 pos += kb.len();
151 buf[pos..pos + 4].copy_from_slice(&page.children[i + 1].to_le_bytes());
152 pos += 4;
153 }
154
155 buf
156}
157
158fn decode_internal(data: &[u8]) -> IsamResult<InternalPage> {
159 if data[0] != PAGE_TYPE_INTERNAL {
160 return Err(IsamError::CorruptIndex("expected internal page".into()));
161 }
162 let num_keys = u16::from_le_bytes(data[1..3].try_into().unwrap()) as usize;
163 let mut pos = 3;
164
165 let child_0 = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap());
166 pos += 4;
167
168 let mut children = vec![child_0];
169 let mut key_bytes = Vec::with_capacity(num_keys);
170
171 for _ in 0..num_keys {
172 let klen = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
173 pos += 2;
174 let kb = data[pos..pos + klen].to_vec();
175 pos += klen;
176 let child = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap());
177 pos += 4;
178 key_bytes.push(kb);
179 children.push(child);
180 }
181
182 Ok(InternalPage { children, key_bytes })
183}
184
/// Bytes one leaf entry occupies on a page: a `u16` key length, the key itself,
/// a `u64` data offset and a `u32` data length.
fn leaf_entry_size(key_len: usize) -> usize {
    std::mem::size_of::<u16>()
        + key_len
        + std::mem::size_of::<u64>()
        + std::mem::size_of::<u32>()
}
193
/// Bytes one separator key occupies on an internal page: a `u16` key length, the
/// key itself and the `u32` id of the child that follows it.
fn internal_key_size(key_len: usize) -> usize {
    std::mem::size_of::<u16>() + key_len + std::mem::size_of::<u32>()
}
198
/// Fixed leaf header size: page type (1) + entry count (2) + next-leaf id (4).
const LEAF_HEADER: usize = 1 + 2 + 4;
/// Fixed internal header size: page type (1) + key count (2) + first child id (4).
const INTERNAL_HEADER: usize = 1 + 2 + 4;

/// B-tree index over keys of type `K`, stored in pages handled by a `Pager`.
pub struct BTree<K> {
    /// Page storage; its `meta` holds the root page id and the schema versions.
    pager: Pager,
    /// `K` only appears in method signatures, so it is recorded without storing a value.
    _phantom: std::marker::PhantomData<K>,
}
220
221impl<K> BTree<K>
222where
223 K: Serialize + DeserializeOwned + Ord + Clone,
224{
225 pub fn create(path: &Path) -> IsamResult<Self> {
226 Ok(Self {
227 pager: Pager::create(path)?,
228 _phantom: std::marker::PhantomData,
229 })
230 }
231
232 pub fn open(path: &Path) -> IsamResult<Self> {
233 Ok(Self {
234 pager: Pager::open(path)?,
235 _phantom: std::marker::PhantomData,
236 })
237 }
238
239 pub fn search(&mut self, key: &K) -> IsamResult<Option<RecordRef>> {
245 let key_bytes = bincode::serialize(key)?;
246 let root_id = self.pager.meta.root_page_id;
247 self.search_page(root_id, &key_bytes)
248 }
249
250 pub fn insert(&mut self, key: &K, rec: RecordRef) -> IsamResult<()> {
253 let key_bytes = bincode::serialize(key)?;
254 let root_id = self.pager.meta.root_page_id;
256 let path = self.find_leaf_path(root_id, &key_bytes)?;
257 let leaf_id = *path.last().unwrap();
258
259 let mut leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
260
261 if leaf.entries.iter().any(|e| e.key_bytes == key_bytes) {
263 return Err(IsamError::DuplicateKey);
264 }
265
266 let pos = leaf
268 .entries
269 .partition_point(|e| self.cmp_key_bytes(&e.key_bytes, &key_bytes) == std::cmp::Ordering::Less);
270 leaf.entries.insert(
271 pos,
272 LeafEntry {
273 key_bytes: key_bytes.clone(),
274 data_offset: rec.offset,
275 data_len: rec.len,
276 },
277 );
278
279 if self.leaf_fits(&leaf) {
281 self.pager.write_page(leaf_id, &encode_leaf(&leaf))?;
282 } else {
283 self.insert_with_splits(&key_bytes, rec, path)?;
285 }
286
287 self.pager.flush()
288 }
289
290 pub fn delete(&mut self, key: &K) -> IsamResult<()> {
292 let key_bytes = bincode::serialize(key)?;
293 let root_id = self.pager.meta.root_page_id;
294 let path = self.find_leaf_path(root_id, &key_bytes)?;
295 let leaf_id = *path.last().unwrap();
296
297 let mut leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
298
299 let pos = leaf
300 .entries
301 .iter()
302 .position(|e| e.key_bytes == key_bytes)
303 .ok_or(IsamError::KeyNotFound)?;
304 leaf.entries.remove(pos);
305
306 self.pager.write_page(leaf_id, &encode_leaf(&leaf))?;
307
308 if path.len() >= 2 {
311 let parent_id = path[path.len() - 2];
312 self.rebalance_after_delete(&path, leaf_id, parent_id)?;
313 }
314
315 self.pager.flush()
316 }
317
318 pub fn update(&mut self, key: &K, rec: RecordRef) -> IsamResult<()> {
320 let key_bytes = bincode::serialize(key)?;
321 let root_id = self.pager.meta.root_page_id;
322 let path = self.find_leaf_path(root_id, &key_bytes)?;
323 let leaf_id = *path.last().unwrap();
324
325 let mut leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
326
327 let entry = leaf
328 .entries
329 .iter_mut()
330 .find(|e| e.key_bytes == key_bytes)
331 .ok_or(IsamError::KeyNotFound)?;
332 entry.data_offset = rec.offset;
333 entry.data_len = rec.len;
334
335 self.pager.write_page(leaf_id, &encode_leaf(&leaf))?;
336 self.pager.flush()
337 }
338
339 pub fn find_leaf_for_key(&mut self, key: &K) -> IsamResult<u32> {
345 let key_bytes = bincode::serialize(key)?;
346 let root_id = self.pager.meta.root_page_id;
347 let path = self.find_leaf_path(root_id, &key_bytes)?;
348 Ok(*path.last().unwrap())
349 }
350
351 pub fn first_leaf_id(&mut self) -> IsamResult<u32> {
353 let root_id = self.pager.meta.root_page_id;
354 self.leftmost_leaf(root_id)
355 }
356
357 pub fn min_key(&mut self) -> IsamResult<Option<K>> {
359 let root_id = self.pager.meta.root_page_id;
360 let leaf_id = self.leftmost_leaf(root_id)?;
361 let leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
362 match leaf.entries.first() {
363 None => Ok(None),
364 Some(e) => Ok(Some(bincode::deserialize(&e.key_bytes)?)),
365 }
366 }
367
368 pub fn max_key(&mut self) -> IsamResult<Option<K>> {
370 let root_id = self.pager.meta.root_page_id;
371 let leaf_id = self.rightmost_leaf(root_id)?;
372 let leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
373 match leaf.entries.last() {
374 None => Ok(None),
375 Some(e) => Ok(Some(bincode::deserialize(&e.key_bytes)?)),
376 }
377 }
378
379 pub fn read_leaf(&mut self, id: u32) -> IsamResult<(Vec<(K, RecordRef)>, u32)> {
381 let data = self.pager.read_page(id)?;
382 let leaf = decode_leaf(&data)?;
383 let mut out = Vec::with_capacity(leaf.entries.len());
384 for e in leaf.entries {
385 let key: K = bincode::deserialize(&e.key_bytes)?;
386 out.push((
387 key,
388 RecordRef {
389 offset: e.data_offset,
390 len: e.data_len,
391 },
392 ));
393 }
394 Ok((out, leaf.next_leaf_id))
395 }
396
/// Returns `key_schema_version` from the in-memory pager metadata.
pub fn key_schema_version(&self) -> u32 {
    self.pager.meta.key_schema_version
}
400
/// Returns `val_schema_version` from the in-memory pager metadata.
pub fn val_schema_version(&self) -> u32 {
    self.pager.meta.val_schema_version
}
404
405 pub fn set_schema_versions(&mut self, key_v: u32, val_v: u32) -> IsamResult<()> {
406 self.pager.meta.key_schema_version = key_v;
407 self.pager.meta.val_schema_version = val_v;
408 self.pager.flush_meta()
409 }
410
/// Returns `index_schema_version` from the in-memory pager metadata.
pub fn index_schema_version(&self) -> u32 {
    self.pager.meta.index_schema_version
}
414
415 pub fn set_index_schema_version(&mut self, v: u32) -> IsamResult<()> {
416 self.pager.meta.index_schema_version = v;
417 self.pager.flush_meta()
418 }
419
/// Delegates to `Pager::flush` and returns its result unchanged.
pub fn flush(&mut self) -> IsamResult<()> {
    self.pager.flush()
}
423
/// Delegates to `Pager::fsync` and returns its result unchanged.
pub fn fsync(&mut self) -> IsamResult<()> {
    self.pager.fsync()
}
427
428 fn search_page(&mut self, page_id: u32, key_bytes: &[u8]) -> IsamResult<Option<RecordRef>> {
433 let data = self.pager.read_page(page_id)?;
434 match data[0] {
435 PAGE_TYPE_LEAF => {
436 let leaf = decode_leaf(&data)?;
437 for e in &leaf.entries {
438 if e.key_bytes == key_bytes {
439 return Ok(Some(RecordRef {
440 offset: e.data_offset,
441 len: e.data_len,
442 }));
443 }
444 }
445 Ok(None)
446 }
447 PAGE_TYPE_INTERNAL => {
448 let internal = decode_internal(&data)?;
449 let child_id = self.find_child(&internal, key_bytes);
450 self.search_page(child_id, key_bytes)
451 }
452 t => Err(IsamError::CorruptIndex(format!("unknown page type {t}"))),
453 }
454 }
455
456 fn cmp_key_bytes(&self, a: &[u8], b: &[u8]) -> std::cmp::Ordering {
463 let ka: K = bincode::deserialize(a).expect("corrupt key bytes in index");
464 let kb: K = bincode::deserialize(b).expect("corrupt key bytes in index");
465 ka.cmp(&kb)
466 }
467
468 fn find_child(&self, page: &InternalPage, key_bytes: &[u8]) -> u32 {
470 let pos = page
476 .key_bytes
477 .partition_point(|kb| !matches!(self.cmp_key_bytes(kb, key_bytes), std::cmp::Ordering::Greater));
478 page.children[pos]
479 }
480
481 fn find_leaf_path(&mut self, page_id: u32, key_bytes: &[u8]) -> IsamResult<Vec<u32>> {
483 let mut path = vec![page_id];
484 let mut current = page_id;
485 loop {
486 let data = self.pager.read_page(current)?;
487 match data[0] {
488 PAGE_TYPE_LEAF => break,
489 PAGE_TYPE_INTERNAL => {
490 let internal = decode_internal(&data)?;
491 let child = self.find_child(&internal, key_bytes);
492 path.push(child);
493 current = child;
494 }
495 t => return Err(IsamError::CorruptIndex(format!("unknown page type {t}"))),
496 }
497 }
498 Ok(path)
499 }
500
501 fn leftmost_leaf(&mut self, page_id: u32) -> IsamResult<u32> {
502 let data = self.pager.read_page(page_id)?;
503 match data[0] {
504 PAGE_TYPE_LEAF => Ok(page_id),
505 PAGE_TYPE_INTERNAL => {
506 let internal = decode_internal(&data)?;
507 self.leftmost_leaf(internal.children[0])
508 }
509 t => Err(IsamError::CorruptIndex(format!("unknown page type {t}"))),
510 }
511 }
512
513 fn rightmost_leaf(&mut self, page_id: u32) -> IsamResult<u32> {
514 let data = self.pager.read_page(page_id)?;
515 match data[0] {
516 PAGE_TYPE_LEAF => Ok(page_id),
517 PAGE_TYPE_INTERNAL => {
518 let internal = decode_internal(&data)?;
519 self.rightmost_leaf(*internal.children.last().unwrap())
520 }
521 t => Err(IsamError::CorruptIndex(format!("unknown page type {t}"))),
522 }
523 }
524
525 fn leaf_byte_usage(leaf: &LeafPage) -> usize {
530 LEAF_HEADER
531 + leaf
532 .entries
533 .iter()
534 .map(|e| leaf_entry_size(e.key_bytes.len()))
535 .sum::<usize>()
536 }
537
538 fn leaf_fits(&self, leaf: &LeafPage) -> bool {
539 Self::leaf_byte_usage(leaf) <= PAGE_SIZE
540 }
541
542 fn internal_byte_usage(page: &InternalPage) -> usize {
543 INTERNAL_HEADER
544 + page
545 .key_bytes
546 .iter()
547 .map(|kb| internal_key_size(kb.len()))
548 .sum::<usize>()
549 }
550
551 fn internal_fits(page: &InternalPage) -> bool {
552 Self::internal_byte_usage(page) <= PAGE_SIZE
553 }
554
555 fn insert_with_splits(
561 &mut self,
562 key_bytes: &[u8],
563 rec: RecordRef,
564 path: Vec<u32>,
565 ) -> IsamResult<()> {
566 let leaf_id = *path.last().unwrap();
569 let mut leaf = decode_leaf(&self.pager.read_page(leaf_id)?)?;
570
571 leaf.entries.retain(|e| e.key_bytes != key_bytes);
574 let pos = leaf
575 .entries
576 .partition_point(|e| self.cmp_key_bytes(&e.key_bytes, key_bytes) == std::cmp::Ordering::Less);
577 leaf.entries.insert(
578 pos,
579 LeafEntry {
580 key_bytes: key_bytes.to_vec(),
581 data_offset: rec.offset,
582 data_len: rec.len,
583 },
584 );
585
586 let mid = leaf.entries.len() / 2;
588 let right_entries = leaf.entries.split_off(mid);
589 let promote_key = right_entries[0].key_bytes.clone();
590
591 let right_id = self.pager.alloc_page()?;
593
594 let right_leaf = LeafPage {
596 next_leaf_id: leaf.next_leaf_id,
597 entries: right_entries,
598 };
599 leaf.next_leaf_id = right_id;
600
601 self.pager.write_page(leaf_id, &encode_leaf(&leaf))?;
602 self.pager.write_page(right_id, &encode_leaf(&right_leaf))?;
603
604 self.insert_into_parent(&path, path.len() - 1, promote_key, right_id)?;
606
607 Ok(())
608 }
609
610 fn insert_into_parent(
612 &mut self,
613 path: &[u32],
614 child_path_idx: usize, promote_key: Vec<u8>,
616 right_child_id: u32,
617 ) -> IsamResult<()> {
618 if child_path_idx == 0 {
619 let left_child_id = path[0];
621 let new_root = InternalPage {
622 children: vec![left_child_id, right_child_id],
623 key_bytes: vec![promote_key],
624 };
625 let new_root_id = self.pager.alloc_page()?;
626 self.pager
627 .write_page(new_root_id, &encode_internal(&new_root))?;
628 self.pager.meta.root_page_id = new_root_id;
629 self.pager.flush_meta()?;
630 return Ok(());
631 }
632
633 let parent_id = path[child_path_idx - 1];
634 let mut parent = decode_internal(&self.pager.read_page(parent_id)?)?;
635
636 let left_child_id = path[child_path_idx];
638 let child_pos = parent
639 .children
640 .iter()
641 .position(|&c| c == left_child_id)
642 .ok_or_else(|| IsamError::CorruptIndex("child not found in parent".into()))?;
643
644 parent.key_bytes.insert(child_pos, promote_key);
645 parent.children.insert(child_pos + 1, right_child_id);
646
647 if Self::internal_fits(&parent) {
648 self.pager
649 .write_page(parent_id, &encode_internal(&parent))?;
650 } else {
651 let mid = parent.key_bytes.len() / 2;
653 let promote_up = parent.key_bytes[mid].clone();
654
655 let right_keys = parent.key_bytes.split_off(mid + 1);
656 parent.key_bytes.truncate(mid); let right_children = parent.children.split_off(mid + 1);
659
660 let right_internal = InternalPage {
661 children: right_children,
662 key_bytes: right_keys,
663 };
664 let right_id = self.pager.alloc_page()?;
665 self.pager
666 .write_page(parent_id, &encode_internal(&parent))?;
667 self.pager
668 .write_page(right_id, &encode_internal(&right_internal))?;
669
670 self.insert_into_parent(path, child_path_idx - 1, promote_up, right_id)?;
671 }
672
673 Ok(())
674 }
675
676 fn rebalance_after_delete(
681 &mut self,
682 _path: &[u32],
683 leaf_id: u32,
684 parent_id: u32,
685 ) -> IsamResult<()> {
686 let leaf_data = self.pager.read_page(leaf_id)?;
687 let leaf = decode_leaf(&leaf_data)?;
688
689 let min_bytes = PAGE_SIZE / 4;
695 if Self::leaf_byte_usage(&leaf) >= min_bytes {
696 return Ok(()); }
698
699 let parent_data = self.pager.read_page(parent_id)?;
700 let mut parent = decode_internal(&parent_data)?;
701
702 let idx = match parent.children.iter().position(|&c| c == leaf_id) {
704 Some(i) => i,
705 None => return Ok(()), };
707
708 if idx + 1 < parent.children.len() {
710 let right_id = parent.children[idx + 1];
711 let right_data = self.pager.read_page(right_id)?;
712 let mut right = decode_leaf(&right_data)?;
713
714 let mut merged = leaf.clone();
716 merged.entries.append(&mut right.entries);
717 merged.next_leaf_id = right.next_leaf_id;
718
719 if Self::leaf_byte_usage(&merged) <= PAGE_SIZE {
720 self.pager.write_page(leaf_id, &encode_leaf(&merged))?;
721 parent.key_bytes.remove(idx);
723 parent.children.remove(idx + 1);
724 self.pager
725 .write_page(parent_id, &encode_internal(&parent))?;
726
727 if parent_id == self.pager.meta.root_page_id && parent.children.len() == 1 {
729 self.pager.meta.root_page_id = parent.children[0];
730 self.pager.flush_meta()?;
731 }
732 return Ok(());
733 }
734 }
735
736 if idx > 0 {
737 let left_id = parent.children[idx - 1];
738 let left_data = self.pager.read_page(left_id)?;
739 let left = decode_leaf(&left_data)?;
740
741 let mut merged = left.clone();
742 merged.entries.append(&mut leaf.entries.clone());
743 merged.next_leaf_id = leaf.next_leaf_id;
744
745 if Self::leaf_byte_usage(&merged) <= PAGE_SIZE {
746 self.pager.write_page(left_id, &encode_leaf(&merged))?;
747 parent.key_bytes.remove(idx - 1);
748 parent.children.remove(idx);
749 self.pager
750 .write_page(parent_id, &encode_internal(&parent))?;
751
752 if parent_id == self.pager.meta.root_page_id && parent.children.len() == 1 {
753 self.pager.meta.root_page_id = parent.children[0];
754 self.pager.flush_meta()?;
755 }
756 }
757 }
758
759 Ok(())
760 }
761}