From de53beca8928e12bba468e322a78b5a960ddf50b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Sun, 1 Dec 2024 12:38:30 +0100 Subject: [PATCH] Fix the prepare_for_closing system --- heed/src/envs/env.rs | 342 +++++++++++++++++++++++++++++- heed/src/envs/env_open_options.rs | 11 +- heed/src/envs/mod.rs | 11 +- heed/src/lib.rs | 4 +- 4 files changed, 349 insertions(+), 19 deletions(-) diff --git a/heed/src/envs/env.rs b/heed/src/envs/env.rs index c7e60b41..91e30cc4 100644 --- a/heed/src/envs/env.rs +++ b/heed/src/envs/env.rs @@ -29,14 +29,12 @@ pub struct Env { } impl Env { - pub(crate) fn new(env_ptr: NonNull, path: PathBuf) -> Env { - Env { - inner: Arc::new(EnvInner { - env_ptr, - path, - signal_event: Arc::new(SignalEvent::manual(false)), - }), - } + pub(crate) fn new( + env_ptr: NonNull, + path: PathBuf, + signal_event: Arc, + ) -> Env { + Env { inner: Arc::new(EnvInner { env_ptr, path, signal_event }) } } pub(crate) fn env_mut_ptr(&self) -> NonNull { @@ -492,7 +490,6 @@ impl fmt::Debug for Env { } } -#[derive(Clone)] pub(crate) struct EnvInner { env_ptr: NonNull, signal_event: Arc, @@ -501,9 +498,332 @@ pub(crate) struct EnvInner { impl Drop for EnvInner { fn drop(&mut self) { - unsafe { ffi::mdb_env_close(self.env_ptr.as_mut()) }; let mut lock = OPENED_ENV.write().unwrap(); let removed = lock.remove(&self.path); - debug_assert!(removed); + debug_assert!(removed.is_some()); + unsafe { ffi::mdb_env_close(self.env_ptr.as_mut()) }; + self.signal_event.signal(); + } +} + +#[cfg(test)] +mod tests { + use std::io::ErrorKind; + use std::time::Duration; + use std::{fs, thread}; + + use crate::types::*; + use crate::{env_closing_event, EnvOpenOptions, Error}; + + #[test] + fn close_env() { + let dir = tempfile::tempdir().unwrap(); + let env = unsafe { + EnvOpenOptions::new() + .map_size(10 * 1024 * 1024) // 10MB + .max_dbs(30) + .open(dir.path()) + .unwrap() + }; + + // Force a thread to keep the env for 1 second. + let env_cloned = env.clone(); + thread::spawn(move || { + let _env = env_cloned; + thread::sleep(Duration::from_secs(1)); + }); + + let mut wtxn = env.write_txn().unwrap(); + let db = env.create_database::(&mut wtxn, None).unwrap(); + wtxn.commit().unwrap(); + + // Create an ordered list of keys... + let mut wtxn = env.write_txn().unwrap(); + db.put(&mut wtxn, "hello", "hello").unwrap(); + db.put(&mut wtxn, "world", "world").unwrap(); + + let mut iter = db.iter(&wtxn).unwrap(); + assert_eq!(iter.next().transpose().unwrap(), Some(("hello", "hello"))); + assert_eq!(iter.next().transpose().unwrap(), Some(("world", "world"))); + assert_eq!(iter.next().transpose().unwrap(), None); + drop(iter); + + wtxn.commit().unwrap(); + + let signal_event = env.prepare_for_closing(); + + eprintln!("waiting for the env to be closed"); + signal_event.wait(); + eprintln!("env closed successfully"); + + // Make sure we don't have a reference to the env + assert!(env_closing_event(dir.path()).is_none()); + } + + #[test] + fn reopen_env_with_different_options_is_err() { + let dir = tempfile::tempdir().unwrap(); + let _env = unsafe { + EnvOpenOptions::new() + .map_size(10 * 1024 * 1024) // 10MB + .open(dir.path()) + .unwrap() + }; + + let result = unsafe { + EnvOpenOptions::new() + .map_size(12 * 1024 * 1024) // 12MB + .open(dir.path()) + }; + + assert!(matches!(result, Err(Error::EnvAlreadyOpened))); + } + + #[test] + fn open_env_with_named_path() { + let dir = tempfile::tempdir().unwrap(); + fs::create_dir_all(dir.path().join("babar.mdb")).unwrap(); + let _env = unsafe { + EnvOpenOptions::new() + .map_size(10 * 1024 * 1024) // 10MB + .open(dir.path().join("babar.mdb")) + .unwrap() + }; + + let error = unsafe { + EnvOpenOptions::new() + .map_size(10 * 1024 * 1024) // 10MB + .open(dir.path().join("babar.mdb")) + .unwrap_err() + }; + + assert!(matches!(error, Error::EnvAlreadyOpened)); + } + + #[test] + #[cfg(not(windows))] + fn open_database_with_writemap_flag() { + let dir = tempfile::tempdir().unwrap(); + let mut envbuilder = EnvOpenOptions::new(); + envbuilder.map_size(10 * 1024 * 1024); // 10MB + envbuilder.max_dbs(10); + unsafe { envbuilder.flags(crate::EnvFlags::WRITE_MAP) }; + let env = unsafe { envbuilder.open(dir.path()).unwrap() }; + + let mut wtxn = env.write_txn().unwrap(); + let _db = env.create_database::(&mut wtxn, Some("my-super-db")).unwrap(); + wtxn.commit().unwrap(); + } + + #[test] + fn open_database_with_nosubdir() { + let dir = tempfile::tempdir().unwrap(); + let mut envbuilder = EnvOpenOptions::new(); + unsafe { envbuilder.flags(crate::EnvFlags::NO_SUB_DIR) }; + let _env = unsafe { envbuilder.open(dir.path().join("data.mdb")).unwrap() }; + } + + #[test] + fn create_database_without_commit() { + let dir = tempfile::tempdir().unwrap(); + let env = unsafe { + EnvOpenOptions::new() + .map_size(10 * 1024 * 1024) // 10MB + .max_dbs(10) + .open(dir.path()) + .unwrap() + }; + + let mut wtxn = env.write_txn().unwrap(); + let _db = env.create_database::(&mut wtxn, Some("my-super-db")).unwrap(); + wtxn.abort(); + + let rtxn = env.read_txn().unwrap(); + let option = env.open_database::(&rtxn, Some("my-super-db")).unwrap(); + assert!(option.is_none()); + } + + #[test] + fn open_already_existing_database() { + let dir = tempfile::tempdir().unwrap(); + let env = unsafe { + EnvOpenOptions::new() + .map_size(10 * 1024 * 1024) // 10MB + .max_dbs(10) + .open(dir.path()) + .unwrap() + }; + + // we first create a database + let mut wtxn = env.write_txn().unwrap(); + let _db = env.create_database::(&mut wtxn, Some("my-super-db")).unwrap(); + wtxn.commit().unwrap(); + + // Close the environement and reopen it, databases must not be loaded in memory. + env.prepare_for_closing().wait(); + let env = unsafe { + EnvOpenOptions::new() + .map_size(10 * 1024 * 1024) // 10MB + .max_dbs(10) + .open(dir.path()) + .unwrap() + }; + + let rtxn = env.read_txn().unwrap(); + let option = env.open_database::(&rtxn, Some("my-super-db")).unwrap(); + assert!(option.is_some()); + } + + #[test] + fn resize_database() { + let dir = tempfile::tempdir().unwrap(); + let page_size = page_size::get(); + let env = unsafe { + EnvOpenOptions::new().map_size(9 * page_size).max_dbs(1).open(dir.path()).unwrap() + }; + + let mut wtxn = env.write_txn().unwrap(); + let db = env.create_database::(&mut wtxn, Some("my-super-db")).unwrap(); + wtxn.commit().unwrap(); + + let mut wtxn = env.write_txn().unwrap(); + for i in 0..64 { + db.put(&mut wtxn, &i.to_string(), "world").unwrap(); + } + wtxn.commit().unwrap(); + + let mut wtxn = env.write_txn().unwrap(); + for i in 64..128 { + db.put(&mut wtxn, &i.to_string(), "world").unwrap(); + } + wtxn.commit().expect_err("cannot commit a transaction that would reach the map size limit"); + + unsafe { + env.resize(10 * page_size).unwrap(); + } + let mut wtxn = env.write_txn().unwrap(); + for i in 64..128 { + db.put(&mut wtxn, &i.to_string(), "world").unwrap(); + } + wtxn.commit().expect("transaction should commit after resizing the map size"); + + assert_eq!(10 * page_size, env.info().map_size); + } + + /// Non-regression test for + /// + /// + /// We should be able to open database Read-Only Env with + /// no prior Read-Write Env opening. And query data. + #[test] + fn open_read_only_without_no_env_opened_before() { + let expected_data0 = "Data Expected db0"; + let dir = tempfile::tempdir().unwrap(); + + { + // We really need this env to be dropped before the read-only access. + let env = unsafe { + EnvOpenOptions::new() + .map_size(16 * 1024 * 1024 * 1024) // 10MB + .max_dbs(32) + .open(dir.path()) + .unwrap() + }; + let mut wtxn = env.write_txn().unwrap(); + let database0 = env.create_database::(&mut wtxn, Some("shared0")).unwrap(); + + wtxn.commit().unwrap(); + let mut wtxn = env.write_txn().unwrap(); + database0.put(&mut wtxn, "shared0", expected_data0).unwrap(); + wtxn.commit().unwrap(); + // We also really need that no other env reside in memory in other thread doing tests. + env.prepare_for_closing().wait(); + } + + { + // Open now we do a read-only opening + let env = unsafe { + EnvOpenOptions::new() + .map_size(16 * 1024 * 1024 * 1024) // 10MB + .max_dbs(32) + .open(dir.path()) + .unwrap() + }; + let database0 = { + let rtxn = env.read_txn().unwrap(); + let database0 = + env.open_database::(&rtxn, Some("shared0")).unwrap().unwrap(); + // This commit is mandatory if not committed you might get + // Io(Os { code: 22, kind: InvalidInput, message: "Invalid argument" }) + rtxn.commit().unwrap(); + database0 + }; + + { + // If we didn't committed the opening it might fail with EINVAL. + let rtxn = env.read_txn().unwrap(); + let value = database0.get(&rtxn, "shared0").unwrap().unwrap(); + assert_eq!(value, expected_data0); + } + + env.prepare_for_closing().wait(); + } + + // To avoid reintroducing the bug let's try to open again but without the commit + { + // Open now we do a read-only opening + let env = unsafe { + EnvOpenOptions::new() + .map_size(16 * 1024 * 1024 * 1024) // 10MB + .max_dbs(32) + .open(dir.path()) + .unwrap() + }; + let database0 = { + let rtxn = env.read_txn().unwrap(); + let database0 = + env.open_database::(&rtxn, Some("shared0")).unwrap().unwrap(); + // No commit it's important, dropping explicitly + drop(rtxn); + database0 + }; + + { + // We didn't committed the opening we will get EINVAL. + let rtxn = env.read_txn().unwrap(); + // The dbg!() is intentional in case of a change in rust-std or in lmdb related + // to the windows error. + let err = dbg!(database0.get(&rtxn, "shared0")); + + // The error kind is still ErrorKind Uncategorized on windows. + // Behind it's a ERROR_BAD_COMMAND code 22 like EINVAL. + if cfg!(windows) { + assert!(err.is_err()); + } else { + assert!( + matches!(err, Err(Error::Io(ref e)) if e.kind() == ErrorKind::InvalidInput) + ); + } + } + + env.prepare_for_closing().wait(); + } + } + + #[test] + fn max_key_size() { + let dir = tempfile::tempdir().unwrap(); + let env = unsafe { EnvOpenOptions::new().open(dir.path().join(dir.path())).unwrap() }; + let maxkeysize = env.max_key_size(); + + eprintln!("maxkeysize: {}", maxkeysize); + + if cfg!(feature = "longer-keys") { + // Should be larger than the default of 511 + assert!(maxkeysize > 511); + } else { + // Should be the default of 511 + assert_eq!(maxkeysize, 511); + } } } diff --git a/heed/src/envs/env_open_options.rs b/heed/src/envs/env_open_options.rs index cbf0e500..f4c7c241 100644 --- a/heed/src/envs/env_open_options.rs +++ b/heed/src/envs/env_open_options.rs @@ -6,10 +6,12 @@ use std::io::ErrorKind::NotFound; use std::os::unix::ffi::OsStrExt; use std::path::Path; use std::ptr::NonNull; +use std::sync::Arc; use std::{io, ptr}; #[cfg(master3)] use aead::{generic_array::typenum::Unsigned, AeadCore, AeadMutInPlace, Key, KeyInit}; +use synchronoise::SignalEvent; #[cfg(master3)] use super::encrypted_env::{encrypt_func_wrapper, EncryptedEnv}; @@ -297,7 +299,7 @@ impl EnvOpenOptions { Ok(path) => path, }; - if lock.contains(&path) { + if lock.contains_key(&path) { Err(Error::EnvAlreadyOpened) } else { let path_str = CString::new(path.as_os_str().as_bytes()).unwrap(); @@ -350,9 +352,10 @@ impl EnvOpenOptions { match mdb_result(result) { Ok(()) => { let env_ptr = NonNull::new(env).unwrap(); - let inserted = lock.insert(path.clone()); - debug_assert!(inserted); - Ok(Env::new(env_ptr, path)) + let signal_event = Arc::new(SignalEvent::manual(false)); + let inserted = lock.insert(path.clone(), signal_event.clone()); + debug_assert!(inserted.is_none()); + Ok(Env::new(env_ptr, path, signal_event)) } Err(e) => { ffi::mdb_env_close(env); diff --git a/heed/src/envs/mod.rs b/heed/src/envs/mod.rs index 3454099a..de2f3c62 100644 --- a/heed/src/envs/mod.rs +++ b/heed/src/envs/mod.rs @@ -1,5 +1,5 @@ use std::cmp::Ordering; -use std::collections::HashSet; +use std::collections::HashMap; use std::ffi::c_void; use std::fs::{File, Metadata}; #[cfg(unix)] @@ -33,7 +33,14 @@ pub use env_open_options::EnvOpenOptions; /// Records the current list of opened environments for tracking purposes. The canonical /// path of an environment is removed when either an `Env` or `EncryptedEnv` is closed. -static OPENED_ENV: LazyLock>> = LazyLock::new(RwLock::default); +static OPENED_ENV: LazyLock>>> = + LazyLock::new(RwLock::default); + +/// Returns a struct that allows to wait for the effective closing of an environment. +pub fn env_closing_event>(path: P) -> Option { + let lock = OPENED_ENV.read().unwrap(); + lock.get(path.as_ref()).map(|signal_event| EnvClosingEvent(signal_event.clone())) +} /// Contains information about the environment. #[derive(Debug, Clone, Copy)] diff --git a/heed/src/lib.rs b/heed/src/lib.rs index 0ce180e7..355c0475 100644 --- a/heed/src/lib.rs +++ b/heed/src/lib.rs @@ -85,8 +85,8 @@ pub use self::databases::{EncryptedDatabase, EncryptedDatabaseOpenOptions}; #[cfg(master3)] pub use self::envs::EncryptedEnv; pub use self::envs::{ - CompactionOption, DefaultComparator, Env, EnvClosingEvent, EnvInfo, EnvOpenOptions, - FlagSetMode, IntegerComparator, + env_closing_event, CompactionOption, DefaultComparator, Env, EnvClosingEvent, EnvInfo, + EnvOpenOptions, FlagSetMode, IntegerComparator, }; pub use self::iterator::{ RoIter, RoPrefix, RoRange, RoRevIter, RoRevPrefix, RoRevRange, RwIter, RwPrefix, RwRange,