Skip to main content

highlandcows_isam/
manager.rs

//! `TransactionManager<K, V>` — owns shared storage and creates transactions.
//!
//! This type is `pub(crate)` — callers interact with it only through `Isam`.
//! It is `Clone` (via `Arc::clone`) so that `Isam` can be cloned freely.
5use std::path::Path;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex};
8use std::thread::ThreadId;
9use std::time::{Duration, Instant};
10
11use serde::de::DeserializeOwned;
12use serde::Serialize;
13
14use crate::error::{IsamError, IsamResult};
15use crate::storage::IsamStorage;
16use crate::transaction::Transaction;
17
18pub(crate) struct TransactionManager<K, V> {
19    pub(crate) storage: Arc<Mutex<IsamStorage<K, V>>>,
20    /// `true` while single-user mode is active.
21    pub(crate) single_user_active: Arc<AtomicBool>,
22    /// Thread that owns single-user mode, set when `single_user_active` is `true`.
23    pub(crate) single_user_owner: Arc<Mutex<Option<ThreadId>>>,
24}
25
26impl<K, V> TransactionManager<K, V>
27where
28    K: Serialize + DeserializeOwned + Ord + Clone,
29    V: Serialize + DeserializeOwned,
30{
31    pub(crate) fn create(path: &Path) -> IsamResult<Self> {
32        let storage = IsamStorage::create(path)?;
33        Ok(Self::from_storage(storage))
34    }
35
36    pub(crate) fn open(path: &Path) -> IsamResult<Self> {
37        let storage = IsamStorage::open(path)?;
38        Ok(Self::from_storage(storage))
39    }
40
41    pub(crate) fn from_storage(storage: IsamStorage<K, V>) -> Self {
42        Self {
43            storage: Arc::new(Mutex::new(storage)),
44            single_user_active: Arc::new(AtomicBool::new(false)),
45            single_user_owner: Arc::new(Mutex::new(None)),
46        }
47    }
48
49    /// Acquire the storage lock, checking single-user mode first.
50    ///
51    /// Returns `Err(IsamError::SingleUserMode)` if single-user mode is active
52    /// and the calling thread is not the owner.
53    pub(crate) fn lock_storage(&self) -> IsamResult<std::sync::MutexGuard<'_, IsamStorage<K, V>>> {
54        if self.single_user_active.load(Ordering::Acquire) {
55            let owner = self
56                .single_user_owner
57                .lock()
58                .map_err(|_| IsamError::LockPoisoned)?;
59            if *owner != Some(std::thread::current().id()) {
60                return Err(IsamError::SingleUserMode);
61            }
62        }
63        self.storage.lock().map_err(|_| IsamError::LockPoisoned)
64    }
65
66    pub(crate) fn begin(&self) -> IsamResult<Transaction<'_, K, V>> {
67        let guard = self.lock_storage()?;
68        Ok(Transaction::new(guard))
69    }
70}
71
72impl<K, V> Clone for TransactionManager<K, V> {
73    fn clone(&self) -> Self {
74        Self {
75            storage: Arc::clone(&self.storage),
76            single_user_active: Arc::clone(&self.single_user_active),
77            single_user_owner: Arc::clone(&self.single_user_owner),
78        }
79    }
80}
81
82// ── SingleUserGuard ───────────────────────────────────────────────────────── //
83
84/// RAII guard that holds single-user mode for the duration of its lifetime.
85///
86/// Created internally by [`Isam::as_single_user`]; not part of the public API.
87/// Single-user mode is released when this guard is dropped, even on panic.
88pub(crate) struct SingleUserGuard<K, V> {
89    manager: TransactionManager<K, V>,
90}
91
92impl<K, V> Drop for SingleUserGuard<K, V> {
93    fn drop(&mut self) {
94        if let Ok(mut owner) = self.manager.single_user_owner.lock() {
95            *owner = None;
96        }
97        self.manager
98            .single_user_active
99            .store(false, Ordering::Release);
100    }
101}
102
103impl<K, V> TransactionManager<K, V> {
104    /// Enter single-user mode and return a guard that exits it on drop.
105    ///
106    /// Sets the single-user flag immediately (so other threads start failing at
107    /// once), then spins on `try_lock` until any in-flight transaction finishes
108    /// or `timeout` expires.
109    ///
110    /// Returns:
111    /// - `Err(IsamError::SingleUserMode)` if another thread already holds single-user mode.
112    /// - `Err(IsamError::Timeout)` if an in-flight transaction did not finish within `timeout`.
113    pub(crate) fn enter_single_user_mode(&self, timeout: Duration) -> IsamResult<SingleUserGuard<K, V>> {
114        // Atomically claim single-user mode; fail fast if already active.
115        self.single_user_active
116            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
117            .map_err(|_| IsamError::SingleUserMode)?;
118
119        {
120            let mut owner = self
121                .single_user_owner
122                .lock()
123                .map_err(|_| IsamError::LockPoisoned)?;
124            *owner = Some(std::thread::current().id());
125        }
126
127        // Spin until no transaction holds the storage lock, or timeout expires.
128        let deadline = Instant::now() + timeout;
129        loop {
130            if self.storage.try_lock().is_ok() {
131                // Successfully acquired and immediately released — no active transaction.
132                break;
133            }
134            if Instant::now() >= deadline {
135                // Undo: clear state so other threads are unblocked.
136                if let Ok(mut owner) = self.single_user_owner.lock() {
137                    *owner = None;
138                }
139                self.single_user_active.store(false, Ordering::Release);
140                return Err(IsamError::Timeout);
141            }
142            std::thread::sleep(Duration::from_millis(1));
143        }
144
145        Ok(SingleUserGuard {
146            manager: self.clone(),
147        })
148    }
149}