highlandcows_isam/
manager.rs1use std::path::Path;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex};
8use std::thread::ThreadId;
9use std::time::{Duration, Instant};
10
11use serde::de::DeserializeOwned;
12use serde::Serialize;
13
14use crate::error::{IsamError, IsamResult};
15use crate::storage::IsamStorage;
16use crate::transaction::Transaction;
17
18pub(crate) struct TransactionManager<K, V> {
19 pub(crate) storage: Arc<Mutex<IsamStorage<K, V>>>,
20 pub(crate) single_user_active: Arc<AtomicBool>,
22 pub(crate) single_user_owner: Arc<Mutex<Option<ThreadId>>>,
24}
25
26impl<K, V> TransactionManager<K, V>
27where
28 K: Serialize + DeserializeOwned + Ord + Clone,
29 V: Serialize + DeserializeOwned,
30{
31 pub(crate) fn create(path: &Path) -> IsamResult<Self> {
32 let storage = IsamStorage::create(path)?;
33 Ok(Self::from_storage(storage))
34 }
35
36 pub(crate) fn open(path: &Path) -> IsamResult<Self> {
37 let storage = IsamStorage::open(path)?;
38 Ok(Self::from_storage(storage))
39 }
40
41 pub(crate) fn from_storage(storage: IsamStorage<K, V>) -> Self {
42 Self {
43 storage: Arc::new(Mutex::new(storage)),
44 single_user_active: Arc::new(AtomicBool::new(false)),
45 single_user_owner: Arc::new(Mutex::new(None)),
46 }
47 }
48
49 pub(crate) fn lock_storage(&self) -> IsamResult<std::sync::MutexGuard<'_, IsamStorage<K, V>>> {
54 if self.single_user_active.load(Ordering::Acquire) {
55 let owner = self
56 .single_user_owner
57 .lock()
58 .map_err(|_| IsamError::LockPoisoned)?;
59 if *owner != Some(std::thread::current().id()) {
60 return Err(IsamError::SingleUserMode);
61 }
62 }
63 self.storage.lock().map_err(|_| IsamError::LockPoisoned)
64 }
65
66 pub(crate) fn begin(&self) -> IsamResult<Transaction<'_, K, V>> {
67 let guard = self.lock_storage()?;
68 Ok(Transaction::new(guard))
69 }
70}
71
72impl<K, V> Clone for TransactionManager<K, V> {
73 fn clone(&self) -> Self {
74 Self {
75 storage: Arc::clone(&self.storage),
76 single_user_active: Arc::clone(&self.single_user_active),
77 single_user_owner: Arc::clone(&self.single_user_owner),
78 }
79 }
80}
81
82pub(crate) struct SingleUserGuard<K, V> {
89 manager: TransactionManager<K, V>,
90}
91
92impl<K, V> Drop for SingleUserGuard<K, V> {
93 fn drop(&mut self) {
94 if let Ok(mut owner) = self.manager.single_user_owner.lock() {
95 *owner = None;
96 }
97 self.manager
98 .single_user_active
99 .store(false, Ordering::Release);
100 }
101}
102
103impl<K, V> TransactionManager<K, V> {
104 pub(crate) fn enter_single_user_mode(&self, timeout: Duration) -> IsamResult<SingleUserGuard<K, V>> {
114 self.single_user_active
116 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
117 .map_err(|_| IsamError::SingleUserMode)?;
118
119 {
120 let mut owner = self
121 .single_user_owner
122 .lock()
123 .map_err(|_| IsamError::LockPoisoned)?;
124 *owner = Some(std::thread::current().id());
125 }
126
127 let deadline = Instant::now() + timeout;
129 loop {
130 if self.storage.try_lock().is_ok() {
131 break;
133 }
134 if Instant::now() >= deadline {
135 if let Ok(mut owner) = self.single_user_owner.lock() {
137 *owner = None;
138 }
139 self.single_user_active.store(false, Ordering::Release);
140 return Err(IsamError::Timeout);
141 }
142 std::thread::sleep(Duration::from_millis(1));
143 }
144
145 Ok(SingleUserGuard {
146 manager: self.clone(),
147 })
148 }
149}