Skip to content

Commit

Permalink
Reintroduce the prepare_for_closing function and Env/EncryptedEnv: Clone
Browse files Browse the repository at this point in the history
  • Loading branch information
Kerollmops committed Nov 30, 2024
1 parent a9f5a10 commit 6e71efb
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 44 deletions.
8 changes: 4 additions & 4 deletions examples/heed3-encrypted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::path::Path;
use argon2::Argon2;
use chacha20poly1305::{ChaCha20Poly1305, Key};
use heed3::types::*;
use heed3::{Database, EnvOpenOptions};
use heed3::EnvOpenOptions;

fn main() -> Result<(), Box<dyn Error>> {
let env_path = Path::new("target").join("encrypt.mdb");
Expand Down Expand Up @@ -40,14 +40,14 @@ fn main() -> Result<(), Box<dyn Error>> {
db.put(&mut wtxn, key1, val1)?;
db.put(&mut wtxn, key2, val2)?;
wtxn.commit()?;
// env.prepare_for_closing().wait();
env.prepare_for_closing().wait();

// We reopen the environment now
let env = unsafe { options.open(&env_path)? };
let env = unsafe { options.open_encrypted::<ChaCha20Poly1305, _>(key, &env_path)? };

// We check that the secret entries are correctly decrypted
let mut rtxn = env.read_txn()?;
let db: Database<Str, Str> = env.open_database(&rtxn, Some("first"))?.unwrap();
let db = env.open_database::<Str, Str>(&rtxn, Some("first"))?.unwrap();
let mut iter = db.iter(&mut rtxn)?;
assert_eq!(iter.next().transpose()?, Some((key1, val1)));
assert_eq!(iter.next().transpose()?, Some((key2, val2)));
Expand Down
1 change: 1 addition & 0 deletions heed/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ lmdb-master-sys = { version = "0.2.4", path = "../lmdb-master-sys" }
once_cell = "1.19.0"
page_size = "0.6.0"
serde = { version = "1.0.203", features = ["derive"], optional = true }
synchronoise = "1.0.1"

[dev-dependencies]
serde = { version = "1.0.203", features = ["derive"] }
Expand Down
14 changes: 12 additions & 2 deletions heed/src/envs/encrypted_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ use std::path::Path;
use aead::generic_array::typenum::Unsigned;
use aead::{AeadMutInPlace, Key, KeyInit, Nonce, Tag};

use super::{Env, EnvInfo, FlagSetMode};
use super::{Env, EnvClosingEvent, EnvInfo, FlagSetMode};
use crate::databases::{EncryptedDatabase, EncryptedDatabaseOpenOptions};
use crate::mdb::ffi::{self};
use crate::{CompactionOption, EnvFlags, Result, RoTxn, RwTxn, Unspecified};

/// An environment handle constructed by using [`EnvOpenOptions::open_encrypted`].
#[derive(Clone)]
pub struct EncryptedEnv {
pub(crate) inner: Env,
}
Expand Down Expand Up @@ -278,6 +279,15 @@ impl EncryptedEnv {
self.inner.path()
}

/// Returns an `EnvClosingEvent` that can be used to wait for the closing event,
/// multiple threads can wait on this event.
///
/// Make sure that you drop all the copies of `Env`s you have, env closing are triggered
/// when all references are dropped, the last one will eventually close the environment.
pub fn prepare_for_closing(self) -> EnvClosingEvent {
self.inner.prepare_for_closing()
}

/// Check for stale entries in the reader lock table and clear them.
///
/// Returns the number of stale readers cleared.
Expand Down Expand Up @@ -311,7 +321,7 @@ unsafe impl Sync for EncryptedEnv {}
impl fmt::Debug for EncryptedEnv {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("EncryptedEnv")
.field("path", &self.inner.path.display())
.field("path", &self.inner.path().display())
.finish_non_exhaustive()
}
}
Expand Down
53 changes: 39 additions & 14 deletions heed/src/envs/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ use std::ffi::CString;
use std::fs::File;
use std::path::{Path, PathBuf};
use std::ptr::{self, NonNull};
use std::sync::Arc;
use std::{fmt, io, mem};

use heed_traits::Comparator;
use synchronoise::SignalEvent;

use super::{
custom_key_cmp_wrapper, get_file_fd, metadata_from_fd, DefaultComparator, EnvInfo, FlagSetMode,
IntegerComparator, OPENED_ENV,
custom_key_cmp_wrapper, get_file_fd, metadata_from_fd, DefaultComparator, EnvClosingEvent,
EnvInfo, FlagSetMode, IntegerComparator, OPENED_ENV,
};
use crate::cursor::{MoveOperation, RoCursor};
use crate::mdb::ffi::{self, MDB_env};
Expand All @@ -21,18 +23,24 @@ use crate::{
};

/// An environment handle constructed by using [`EnvOpenOptions::open`].
#[derive(Clone)]
pub struct Env {
env_ptr: NonNull<MDB_env>,
pub(crate) path: PathBuf,
inner: Arc<EnvInner>,
}

impl Env {
pub(crate) fn new(env_ptr: NonNull<MDB_env>, path: PathBuf) -> Env {
Env { env_ptr, path }
Env {
inner: Arc::new(EnvInner {
env_ptr,
path,
signal_event: Arc::new(SignalEvent::manual(false)),
}),
}
}

pub(crate) fn env_mut_ptr(&self) -> NonNull<ffi::MDB_env> {
self.env_ptr
self.inner.env_ptr
}

/// The size of the data file on disk.
Expand Down Expand Up @@ -125,7 +133,7 @@ impl Env {
/// Returns some basic informations about this environment.
pub fn info(&self) -> EnvInfo {
let mut raw_info = mem::MaybeUninit::uninit();
unsafe { ffi::mdb_env_info(self.env_ptr.as_ptr(), raw_info.as_mut_ptr()) };
unsafe { ffi::mdb_env_info(self.inner.env_ptr.as_ptr(), raw_info.as_mut_ptr()) };
let raw_info = unsafe { raw_info.assume_init() };

EnvInfo {
Expand Down Expand Up @@ -410,27 +418,36 @@ impl Env {
option: CompactionOption,
) -> Result<()> {
let flags = if let CompactionOption::Enabled = option { ffi::MDB_CP_COMPACT } else { 0 };
mdb_result(ffi::mdb_env_copyfd2(self.env_ptr.as_ptr(), fd, flags))?;
mdb_result(ffi::mdb_env_copyfd2(self.inner.env_ptr.as_ptr(), fd, flags))?;
Ok(())
}

/// Flush the data buffers to disk.
pub fn force_sync(&self) -> Result<()> {
unsafe { mdb_result(ffi::mdb_env_sync(self.env_ptr.as_ptr(), 1))? }
unsafe { mdb_result(ffi::mdb_env_sync(self.inner.env_ptr.as_ptr(), 1))? }
Ok(())
}

/// Returns the canonicalized path where this env lives.
pub fn path(&self) -> &Path {
&self.path
&self.inner.path
}

/// Returns an `EnvClosingEvent` that can be used to wait for the closing event,
/// multiple threads can wait on this event.
///
/// Make sure that you drop all the copies of `Env`s you have, env closing are triggered
/// when all references are dropped, the last one will eventually close the environment.
pub fn prepare_for_closing(self) -> EnvClosingEvent {
EnvClosingEvent(self.inner.signal_event.clone())
}

/// Check for stale entries in the reader lock table and clear them.
///
/// Returns the number of stale readers cleared.
pub fn clear_stale_readers(&self) -> Result<usize> {
let mut dead: i32 = 0;
unsafe { mdb_result(ffi::mdb_reader_check(self.env_ptr.as_ptr(), &mut dead))? }
unsafe { mdb_result(ffi::mdb_reader_check(self.inner.env_ptr.as_ptr(), &mut dead))? }
// safety: The reader_check function asks for an i32, initialize it to zero
// and never decrements it. It is safe to use either an u32 or u64 (usize).
Ok(dead as usize)
Expand Down Expand Up @@ -471,14 +488,22 @@ unsafe impl Sync for Env {}

impl fmt::Debug for Env {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Env").field("path", &self.path.display()).finish_non_exhaustive()
f.debug_struct("Env").field("path", &self.inner.path.display()).finish_non_exhaustive()
}
}

impl Drop for Env {
#[derive(Clone)]
pub(crate) struct EnvInner {
env_ptr: NonNull<MDB_env>,
signal_event: Arc<SignalEvent>,
pub(crate) path: PathBuf,
}

impl Drop for EnvInner {
fn drop(&mut self) {
unsafe { ffi::mdb_env_close(self.env_ptr.as_mut()) };
let mut lock = OPENED_ENV.write().unwrap();
debug_assert!(lock.remove(&self.path));
let removed = lock.remove(&self.path);
debug_assert!(removed);
}
}
6 changes: 4 additions & 2 deletions heed/src/envs/env_open_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ impl EnvOpenOptions {
match mdb_result(result) {
Ok(()) => {
let env_ptr = NonNull::new(env).unwrap();
debug_assert!(lock.insert(path.clone()));
let inserted = lock.insert(path.clone());
debug_assert!(inserted);
Ok(Env::new(env_ptr, path))
}
Err(e) => {
Expand Down Expand Up @@ -404,7 +405,8 @@ impl EnvOpenOptions {
match mdb_result(result) {
Ok(()) => {
let env_ptr = NonNull::new(env).unwrap();
debug_assert!(lock.insert(path.clone()));
let inserted = lock.insert(path.clone());
debug_assert!(inserted);
Ok(EncryptedEnv { inner: Env::new(env_ptr, path) })
}
Err(e) => {
Expand Down
36 changes: 34 additions & 2 deletions heed/src/envs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@ use std::cmp::Ordering;
use std::collections::HashSet;
use std::ffi::c_void;
use std::fs::{File, Metadata};
use std::io;
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, BorrowedFd, RawFd};
use std::panic::catch_unwind;
use std::path::{Path, PathBuf};
use std::process::abort;
use std::sync::{LazyLock, RwLock};
use std::sync::{Arc, LazyLock, RwLock};
use std::time::Duration;
#[cfg(windows)]
use std::{
ffi::OsStr,
os::windows::io::{AsRawHandle as _, BorrowedHandle, RawHandle},
};
use std::{fmt, io};

use heed_traits::{Comparator, LexicographicComparator};
use synchronoise::event::SignalEvent;

use crate::mdb::ffi;

Expand Down Expand Up @@ -50,6 +52,36 @@ pub struct EnvInfo {
pub number_of_readers: u32,
}

/// A structure that can be used to wait for the closing event.
/// Multiple threads can wait on this event.
#[derive(Clone)]
pub struct EnvClosingEvent(Arc<SignalEvent>);

impl EnvClosingEvent {
/// Blocks this thread until the environment is effectively closed.
///
/// # Safety
///
/// Make sure that you don't have any copy of the environment in the thread
/// that is waiting for a close event. If you do, you will have a deadlock.
pub fn wait(&self) {
self.0.wait()
}

/// Blocks this thread until either the environment has been closed
/// or until the timeout elapses. Returns `true` if the environment
/// has been effectively closed.
pub fn wait_timeout(&self, timeout: Duration) -> bool {
self.0.wait_timeout(timeout)
}
}

impl fmt::Debug for EnvClosingEvent {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("EnvClosingEvent").finish()
}
}

// Thanks to the mozilla/rkv project
// Workaround the UNC path on Windows, see https://github.com/rust-lang/rust/issues/42869.
// Otherwise, `Env::from_env()` will panic with error_no(123).
Expand Down
4 changes: 2 additions & 2 deletions heed/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ pub use self::databases::{EncryptedDatabase, EncryptedDatabaseOpenOptions};
#[cfg(master3)]
pub use self::envs::EncryptedEnv;
pub use self::envs::{
CompactionOption, DefaultComparator, Env, EnvInfo, EnvOpenOptions, FlagSetMode,
IntegerComparator,
CompactionOption, DefaultComparator, Env, EnvClosingEvent, EnvInfo, EnvOpenOptions,
FlagSetMode, IntegerComparator,
};
pub use self::iterator::{
RoIter, RoPrefix, RoRange, RoRevIter, RoRevPrefix, RoRevRange, RwIter, RwPrefix, RwRange,
Expand Down
35 changes: 17 additions & 18 deletions heed/src/txn.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::borrow::Cow;
use std::ops::Deref;
use std::ptr::{self, NonNull};

Expand Down Expand Up @@ -47,7 +48,7 @@ use crate::Result;
pub struct RoTxn<'e> {
/// Makes the struct covariant and !Sync
pub(crate) txn: Option<NonNull<ffi::MDB_txn>>,
env: &'e Env,
env: Cow<'e, Env>,
}

impl<'e> RoTxn<'e> {
Expand All @@ -63,24 +64,22 @@ impl<'e> RoTxn<'e> {
))?
};

Ok(RoTxn { txn: NonNull::new(txn), env })
Ok(RoTxn { txn: NonNull::new(txn), env: Cow::Borrowed(env) })
}

// TODO replace this by an ArcRoTxn
pub(crate) fn static_read_txn(env: Env) -> Result<RoTxn<'static>> {
// let mut txn: *mut ffi::MDB_txn = ptr::null_mut();

// unsafe {
// mdb_result(ffi::mdb_txn_begin(
// env.env_mut_ptr(),
// ptr::null_mut(),
// ffi::MDB_RDONLY,
// &mut txn,
// ))?
// };

// Ok(RoTxn { txn, env: Cow::Owned(env) })
todo!()
let mut txn: *mut ffi::MDB_txn = ptr::null_mut();

unsafe {
mdb_result(ffi::mdb_txn_begin(
env.env_mut_ptr().as_mut(),
ptr::null_mut(),
ffi::MDB_RDONLY,
&mut txn,
))?
};

Ok(RoTxn { txn: NonNull::new(txn), env: Cow::Owned(env) })
}

pub(crate) fn env_mut_ptr(&self) -> NonNull<ffi::MDB_env> {
Expand Down Expand Up @@ -172,7 +171,7 @@ impl<'p> RwTxn<'p> {
))?
};

Ok(RwTxn { txn: RoTxn { txn: NonNull::new(txn), env } })
Ok(RwTxn { txn: RoTxn { txn: NonNull::new(txn), env: Cow::Borrowed(env) } })
}

pub(crate) fn nested(env: &'p Env, parent: &'p mut RwTxn) -> Result<RwTxn<'p>> {
Expand All @@ -183,7 +182,7 @@ impl<'p> RwTxn<'p> {
mdb_result(ffi::mdb_txn_begin(env.env_mut_ptr().as_mut(), parent_ptr, 0, &mut txn))?
};

Ok(RwTxn { txn: RoTxn { txn: NonNull::new(txn), env } })
Ok(RwTxn { txn: RoTxn { txn: NonNull::new(txn), env: Cow::Borrowed(env) } })
}

pub(crate) fn env_mut_ptr(&self) -> NonNull<ffi::MDB_env> {
Expand Down
1 change: 1 addition & 0 deletions heed3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ lmdb-master3-sys = { version = "0.2.4", path = "../lmdb-master3-sys" }
once_cell = "1.19.0"
page_size = "0.6.0"
serde = { version = "1.0.203", features = ["derive"], optional = true }
synchronoise = "1.0.1"

[dev-dependencies]
# TODO update dependencies
Expand Down

0 comments on commit 6e71efb

Please sign in to comment.