Skip to content

Commit

Permalink
Automatically abort transactions on drop
Browse files Browse the repository at this point in the history
  • Loading branch information
cberner committed Aug 6, 2022
1 parent 226a4ec commit 727c949
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 43 deletions.
25 changes: 2 additions & 23 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ use crate::{ReadTransaction, Result, WriteTransaction};
use std::collections::btree_set::BTreeSet;
use std::fmt::{Display, Formatter};
use std::fs::{File, OpenOptions};
use std::io;
use std::io::ErrorKind;
use std::marker::PhantomData;
use std::ops::RangeFull;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use std::{io, panic};

use crate::multimap_table::parse_subtree_roots;
#[cfg(feature = "logging")]
use log::{error, info};
use log::info;

pub(crate) type TransactionId = u64;
type AtomicTransactionId = AtomicU64;
Expand Down Expand Up @@ -150,7 +150,6 @@ pub struct Database {
next_transaction_id: AtomicTransactionId,
live_read_transactions: Mutex<BTreeSet<TransactionId>>,
live_write_transaction: Mutex<Option<TransactionId>>,
leaked_write_transaction: Mutex<Option<&'static panic::Location<'static>>>,
}

impl Database {
Expand Down Expand Up @@ -345,23 +344,9 @@ impl Database {
next_transaction_id: AtomicTransactionId::new(next_transaction_id),
live_write_transaction: Mutex::new(None),
live_read_transactions: Mutex::new(Default::default()),
leaked_write_transaction: Mutex::new(Default::default()),
})
}

pub(crate) fn record_leaked_write_transaction(&self, transaction_id: TransactionId) {
assert_eq!(
transaction_id,
self.live_write_transaction.lock().unwrap().unwrap()
);
*self.leaked_write_transaction.lock().unwrap() = Some(panic::Location::caller());
#[cfg(feature = "logging")]
error!(
"Leaked write transaction from {}",
panic::Location::caller()
);
}

pub(crate) fn deallocate_read_transaction(&self, id: TransactionId) {
self.live_read_transactions.lock().unwrap().remove(&id);
}
Expand Down Expand Up @@ -391,12 +376,6 @@ impl Database {
/// Returns a [`WriteTransaction`] which may be used to read/write to the database. Only a single
/// write may be in progress at a time
pub fn begin_write(&self) -> Result<WriteTransaction> {
let guard = self.leaked_write_transaction.lock().unwrap();
if let Some(leaked) = *guard {
return Err(Error::LeakedWriteTransaction(leaked));
}
drop(guard);

assert!(self.live_write_transaction.lock().unwrap().is_none());
let id = self.next_transaction_id.fetch_add(1, Ordering::AcqRel);
*self.live_write_transaction.lock().unwrap() = Some(id);
Expand Down
4 changes: 0 additions & 4 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ pub enum Error {
requested_size: usize,
},
TableDoesNotExist(String),
LeakedWriteTransaction(&'static panic::Location<'static>),
// Tables cannot be opened for writing multiple times, since they could retrieve immutable &
// mutable references to the same dirty pages, or multiple mutable references via insert_reserve()
TableAlreadyOpen(String, &'static panic::Location<'static>),
Expand Down Expand Up @@ -57,9 +56,6 @@ impl Display for Error {
Error::TableDoesNotExist(table) => {
write!(f, "Table '{}' does not exist", table)
}
Error::LeakedWriteTransaction(location) => {
write!(f, "Leaked write transaction: {}", location)
}
Error::TableAlreadyOpen(name, location) => {
write!(f, "Table '{}' already opened at: {}", name, location)
}
Expand Down
25 changes: 16 additions & 9 deletions src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ use crate::{
Result, Table, TableDefinition,
};
#[cfg(feature = "logging")]
use log::info;
use log::{info, warn};
use std::cell::RefCell;
use std::cmp::min;
use std::collections::HashMap;
use std::mem::size_of;
use std::panic;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};

/// Informational storage stats about the database
#[derive(Debug)]
Expand Down Expand Up @@ -104,7 +103,7 @@ pub struct WriteTransaction<'db> {
freed_tree: BtreeMut<'db, FreedTableKey, [u8]>,
freed_pages: Rc<RefCell<Vec<PageNumber>>>,
open_tables: RefCell<HashMap<String, &'static panic::Location<'static>>>,
completed: AtomicBool,
completed: bool,
durability: Durability,
}

Expand All @@ -127,7 +126,7 @@ impl<'db> WriteTransaction<'db> {
freed_tree: BtreeMut::new(freed_root, db.get_memory(), freed_pages.clone()),
freed_pages,
open_tables: RefCell::new(Default::default()),
completed: Default::default(),
completed: false,
durability: Durability::Immediate,
})
}
Expand Down Expand Up @@ -296,7 +295,7 @@ impl<'db> WriteTransaction<'db> {
Durability::Immediate => self.durable_commit(false)?,
}

self.completed.store(true, Ordering::Release);
self.completed = true;
#[cfg(feature = "logging")]
info!("Finished commit of transaction id={}", self.transaction_id);

Expand All @@ -306,13 +305,17 @@ impl<'db> WriteTransaction<'db> {
/// Abort the transaction
///
/// All writes performed in this transaction will be rolled back
pub fn abort(self) -> Result {
pub fn abort(mut self) -> Result {
self.abort_inner()
}

fn abort_inner(&mut self) -> Result {
#[cfg(feature = "logging")]
info!("Aborting transaction id={}", self.transaction_id);
self.table_tree.borrow_mut().clear_table_root_updates();
self.mem.rollback_uncommitted_writes()?;
self.db.deallocate_write_transaction(self.transaction_id);
self.completed.store(true, Ordering::Release);
self.completed = true;
#[cfg(feature = "logging")]
info!("Finished abort of transaction id={}", self.transaction_id);
Ok(())
Expand Down Expand Up @@ -475,8 +478,12 @@ impl<'db> WriteTransaction<'db> {

impl<'a> Drop for WriteTransaction<'a> {
fn drop(&mut self) {
if !self.completed.load(Ordering::Acquire) {
self.db.record_leaked_write_transaction(self.transaction_id);
if !self.completed {
#[allow(unused_variables)]
if let Err(error) = self.abort_inner() {
#[cfg(feature = "logging")]
warn!("Failure automatically aborting transaction: {}", error);
}
}
}
}
Expand Down
15 changes: 8 additions & 7 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -951,18 +951,19 @@ fn delete_table() {
}

#[test]
fn leaked_write() {
fn dropped_write() {
let tmpfile: NamedTempFile = NamedTempFile::new().unwrap();
let db = unsafe { Database::create(tmpfile.path(), 1024 * 1024).unwrap() };

let write_txn = db.begin_write().unwrap();
drop(write_txn);
let result = db.begin_write();
if let Err(Error::LeakedWriteTransaction(_message)) = result {
// Good
} else {
panic!();
{
let mut table = write_txn.open_table(SLICE_TABLE).unwrap();
table.insert(b"hello", b"world").unwrap();
}
drop(write_txn);
let read_txn = db.begin_read().unwrap();
let result = read_txn.open_table(SLICE_TABLE);
assert!(matches!(result, Err(Error::TableDoesNotExist(_))));
}

#[test]
Expand Down

0 comments on commit 727c949

Please sign in to comment.