From ba7fb44cde6458485e4d09cbee76805693e669e0 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Fri, 26 Jul 2019 17:25:25 +0200 Subject: [PATCH 01/12] add checksum check in ManagedDirectory fix #400 --- Cargo.toml | 1 + src/core/index.rs | 8 ++- src/directory/managed_directory.rs | 83 ++++++++++++++++++++++++++++-- 3 files changed, 87 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c9d246d876..2fcb7055f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ edition = "2018" [dependencies] base64 = "0.10.0" byteorder = "1.0" +crc32fast = "1.2.0" once_cell = "1.0" regex = "1.0" tantivy-fst = "0.1" diff --git a/src/core/index.rs b/src/core/index.rs index 50d184df2d..ac0bea361a 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -26,9 +26,10 @@ use crate::IndexWriter; use crate::Result; use num_cpus; use std::borrow::BorrowMut; +use std::collections::HashSet; use std::fmt; #[cfg(feature = "mmap")] -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::Arc; fn load_metas(directory: &dyn Directory, inventory: &SegmentMetaInventory) -> Result { @@ -368,6 +369,11 @@ impl Index { .map(SegmentMeta::id) .collect()) } + + /// Returns the set of corrupted files + pub fn validate_checksum(&self) -> Result> { + self.directory.list_damaged().map_err(Into::into) + } } impl fmt::Debug for Index { diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index 859e66d516..49aac03250 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -8,6 +8,7 @@ use crate::directory::{WatchCallback, WatchHandle}; use crate::error::DataCorruption; use crate::Directory; use crate::Result; +use crc32fast::Hasher; use serde_json; use std::collections::HashSet; use std::io; @@ -17,6 +18,8 @@ use std::result; use std::sync::RwLockWriteGuard; use std::sync::{Arc, RwLock}; +const CRC_SIZE: usize = 4; + /// Returns true iff the file is "managed". 
/// Non-managed file are not subject to garbage collection. /// @@ -207,26 +210,98 @@ impl ManagedDirectory { } Ok(()) } + + /// Verify checksum of a managed file + pub fn validate_checksum(&self, path: &Path) -> result::Result { + let reader = self.directory.open_read(path)?; + let data = reader.as_slice(); + let len = data.len() - CRC_SIZE; + let mut hasher = Hasher::new(); + hasher.update(&data[..len]); + let crc = hasher.finalize().to_be_bytes(); + Ok(data[len..]==crc) + } + + /// List files for which checksum does not match content + pub fn list_damaged(&self) -> result::Result, OpenReadError> { + let mut hashset = HashSet::new(); + let meta_informations_rlock = self + .meta_informations + .read() + .expect("Managed directory rlock poisoned in list damaged."); + + println!("list_of_segments={:?}", meta_informations_rlock.managed_paths); + for path in &meta_informations_rlock.managed_paths { + if !self.validate_checksum(path)? { + hashset.insert(path.clone()); + } + } + Ok(hashset) + } +} + +struct CrcProxy { + hasher: Option, + writer: W, +} + +impl CrcProxy { + fn new(writer: W) -> Self { + CrcProxy { + hasher: Some(Hasher::new()), + writer + } + } +} + +impl Drop for CrcProxy { + fn drop(&mut self) { + // this should probably be logged + let _ = self.writer.write_all(&self.hasher.take().unwrap().finalize().to_be_bytes()); + } } +impl Write for CrcProxy { + fn write(&mut self, buf: &[u8]) -> io::Result { + let count = self.writer.write(buf)?; + self.hasher.as_mut().unwrap().update(&buf[..count]); + Ok(count) + } + + fn flush(&mut self) -> io::Result<()> { + self.writer.flush() + } +} + + impl Directory for ManagedDirectory { fn open_read(&self, path: &Path) -> result::Result { - self.directory.open_read(path) + self.directory.open_read(path).map(|ros| ros.slice_to(ros.as_slice().len() - CRC_SIZE)) } fn open_write(&mut self, path: &Path) -> result::Result { self.register_file_as_managed(path) .map_err(|e| IOError::with_path(path.to_owned(), e))?; - 
self.directory.open_write(path) + Ok(io::BufWriter::new(Box::new( + CrcProxy::new(self.directory.open_write(path)?.into_inner().map_err(|_|()).expect("buffer is empty")) + ))) } fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> { self.register_file_as_managed(path)?; - self.directory.atomic_write(path, data) + let mut with_crc = Vec::with_capacity(data.len() + CRC_SIZE); + with_crc.extend_from_slice(&data); + let mut hasher = Hasher::new(); + hasher.update(&data); + let crc = hasher.finalize().to_be_bytes(); + with_crc.extend_from_slice(&crc); + self.directory.atomic_write(path, &with_crc) } fn atomic_read(&self, path: &Path) -> result::Result, OpenReadError> { - self.directory.atomic_read(path) + let mut vec = self.directory.atomic_read(path)?; + vec.resize(vec.len() - CRC_SIZE, 0); + Ok(vec) } fn delete(&self, path: &Path) -> result::Result<(), DeleteError> { From f8587931fb21408f127963e26ce97ee255bffed1 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Fri, 26 Jul 2019 19:36:38 +0200 Subject: [PATCH 02/12] flush after writing checksum --- src/directory/managed_directory.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index 49aac03250..69178bd46d 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -258,6 +258,7 @@ impl Drop for CrcProxy { fn drop(&mut self) { // this should probably be logged let _ = self.writer.write_all(&self.hasher.take().unwrap().finalize().to_be_bytes()); + let _ = self.writer.flush(); } } From fa7151c9d58111acab9e6162dcad85ee46a836b6 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Sat, 27 Jul 2019 11:40:25 +0200 Subject: [PATCH 03/12] don't checksum atomic file access and clone managed_paths --- src/directory/managed_directory.rs | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs 
index 69178bd46d..f16244c495 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -225,15 +225,16 @@ impl ManagedDirectory { /// List files for which checksum does not match content pub fn list_damaged(&self) -> result::Result, OpenReadError> { let mut hashset = HashSet::new(); - let meta_informations_rlock = self + let managed_paths = self .meta_informations .read() - .expect("Managed directory rlock poisoned in list damaged."); + .expect("Managed directory rlock poisoned in list damaged.") + .managed_paths + .clone(); - println!("list_of_segments={:?}", meta_informations_rlock.managed_paths); - for path in &meta_informations_rlock.managed_paths { - if !self.validate_checksum(path)? { - hashset.insert(path.clone()); + for path in managed_paths.into_iter() { + if !self.validate_checksum(&path)? { + hashset.insert(path); } } Ok(hashset) @@ -290,19 +291,11 @@ impl Directory for ManagedDirectory { fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> { self.register_file_as_managed(path)?; - let mut with_crc = Vec::with_capacity(data.len() + CRC_SIZE); - with_crc.extend_from_slice(&data); - let mut hasher = Hasher::new(); - hasher.update(&data); - let crc = hasher.finalize().to_be_bytes(); - with_crc.extend_from_slice(&crc); - self.directory.atomic_write(path, &with_crc) + self.directory.atomic_write(path, data) } fn atomic_read(&self, path: &Path) -> result::Result, OpenReadError> { - let mut vec = self.directory.atomic_read(path)?; - vec.resize(vec.len() - CRC_SIZE, 0); - Ok(vec) + self.directory.atomic_read(path) } fn delete(&self, path: &Path) -> result::Result<(), DeleteError> { From a4c47aa4eb52394f96e7bd8e7fc128eaf84f180c Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Mon, 29 Jul 2019 13:23:04 +0200 Subject: [PATCH 04/12] implement a footer storing metadata about a file this is more of a poc, it require some refactoring into multiple files `terminate(self)` is implemented, but not used anywhere yet 
--- src/directory/managed_directory.rs | 179 ++++++++++++++++++++++++----- src/directory/mmap_directory.rs | 4 +- src/directory/mod.rs | 15 ++- src/directory/ram_directory.rs | 4 +- 4 files changed, 170 insertions(+), 32 deletions(-) diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index f16244c495..6da46cae17 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -1,8 +1,9 @@ -use crate::core::MANAGED_FILEPATH; +use crate::core::{MANAGED_FILEPATH, META_FILEPATH}; use crate::directory::error::{DeleteError, IOError, LockError, OpenReadError, OpenWriteError}; use crate::directory::DirectoryLock; use crate::directory::Lock; use crate::directory::META_LOCK; +use crate::directory::TerminatingWrite; use crate::directory::{ReadOnlySource, WritePtr}; use crate::directory::{WatchCallback, WatchHandle}; use crate::error::DataCorruption; @@ -18,8 +19,6 @@ use std::result; use std::sync::RwLockWriteGuard; use std::sync::{Arc, RwLock}; -const CRC_SIZE: usize = 4; - /// Returns true iff the file is "managed". /// Non-managed file are not subject to garbage collection. /// @@ -31,6 +30,13 @@ fn is_managed(path: &Path) -> bool { .unwrap_or(true) } +/// Returns true iff the file require a footer. +fn require_footer(path: &Path) -> bool { + is_managed(path) & + ![*MANAGED_FILEPATH, *META_FILEPATH].contains(&path) +} + + /// Wrapper of directories that keeps track of files created by Tantivy. 
/// /// A managed directory is just a wrapper of a directory @@ -215,11 +221,12 @@ impl ManagedDirectory { pub fn validate_checksum(&self, path: &Path) -> result::Result { let reader = self.directory.open_read(path)?; let data = reader.as_slice(); - let len = data.len() - CRC_SIZE; + let footer = Footer::from_bytes(data); + let data = &data[..data.len() - footer.size() as usize]; let mut hasher = Hasher::new(); - hasher.update(&data[..len]); - let crc = hasher.finalize().to_be_bytes(); - Ok(data[len..]==crc) + hasher.update(data); + let crc = hasher.finalize(); + Ok(crc == footer.crc()) } /// List files for which checksum does not match content @@ -241,32 +248,114 @@ impl ManagedDirectory { } } -struct CrcProxy { - hasher: Option, - writer: W, +#[derive(Debug, Copy, Clone)] +pub enum Footer { + V0(V0) } -impl CrcProxy { - fn new(writer: W) -> Self { - CrcProxy { - hasher: Some(Hasher::new()), - writer +impl Footer { + pub fn to_bytes(&self) -> Vec { + match self { + Footer::V0(f) => f.to_bytes() + } + } + + pub fn from_bytes(data: &[u8]) -> Self { + let len = data.len(); + assert!(len >= 2); + let size = u16::from_be_bytes([data[len - 2], data[len - 1]]); + assert_eq!(size, 10); + let footer = &data[len - size as usize..]; + let index_version = footer[0]; + match index_version { + 0 => Footer::V0(V0::from_bytes(data)), + _ => panic!("unsuported index_version") + } + } + + + pub fn index_version(&self) -> u8 { + match self { + Footer::V0(_) => 0, + } + } + + pub fn tantivy_version(&self) -> (u8, u8, u8) { + match self { + Footer::V0(f) => f.tantivy_version, + } + } + + pub fn crc(&self) -> u32 { + match self { + Footer::V0(f) => f.crc, + } + } + + pub fn size(&self) -> usize { + match self { + Footer::V0(_) => V0::size() as usize, } } } -impl Drop for CrcProxy { - fn drop(&mut self) { - // this should probably be logged - let _ = self.writer.write_all(&self.hasher.take().unwrap().finalize().to_be_bytes()); - let _ = self.writer.flush(); +#[derive(Debug, Copy, 
Clone)] +pub struct V0 { + pub tantivy_version: (u8,u8,u8), + pub crc: u32, +} + +impl V0 { + pub fn to_bytes(&self) -> Vec { + let crc = self.crc.to_be_bytes(); + let size = Self::size().to_be_bytes(); + let mut res = vec![0, self.tantivy_version.0, self.tantivy_version.1, self.tantivy_version.2]; + res.extend_from_slice(&crc); + res.extend_from_slice(&size); + res + } + + pub fn from_bytes(footer: &[u8]) -> Self { + assert_eq!(footer[0], 0); + assert_eq!(u16::from_be_bytes([footer[8], footer[9]]), 10); + let tantivy_version = (footer[1], footer[2], footer[3]); + let crc = u32::from_be_bytes([footer[4], footer[5], footer[6], footer[7]]); + + V0 { + tantivy_version, + crc + } + } + + pub fn from_crc(crc: u32) -> Self { + Self { + tantivy_version: (0,10,0), + crc + } + } + pub fn size() -> u16 { + 10 } } -impl Write for CrcProxy { +struct FooterProxy { + hasher: Hasher, + writer: W, +} + +impl FooterProxy { + fn new(writer: W) -> Self { + FooterProxy { + hasher: Hasher::new(), + writer + } + } +} + +impl Write for FooterProxy { fn write(&mut self, buf: &[u8]) -> io::Result { let count = self.writer.write(buf)?; - self.hasher.as_mut().unwrap().update(&buf[..count]); + self.hasher.update(&buf[..count]); Ok(count) } @@ -275,27 +364,61 @@ impl Write for CrcProxy { } } +impl TerminatingWrite for FooterProxy { + fn terminate(mut self) -> io::Result<()> { + let crc = self.hasher.finalize(); + let footer = V0::from_crc(crc).to_bytes(); + self.writer.write_all(&footer)?; + self.writer.flush()?;//should we assuse calling terminate on inner already flush? 
+ self.writer.terminate() + } +} impl Directory for ManagedDirectory { fn open_read(&self, path: &Path) -> result::Result { - self.directory.open_read(path).map(|ros| ros.slice_to(ros.as_slice().len() - CRC_SIZE)) + let ros = self.directory.open_read(path)?; + if require_footer(path) { + let footer_size = Footer::from_bytes(ros.as_slice()).size(); + Ok(ros.slice_to(ros.as_slice().len() - footer_size)) + } else { + Ok(ros) + } } fn open_write(&mut self, path: &Path) -> result::Result { self.register_file_as_managed(path) .map_err(|e| IOError::with_path(path.to_owned(), e))?; - Ok(io::BufWriter::new(Box::new( - CrcProxy::new(self.directory.open_write(path)?.into_inner().map_err(|_|()).expect("buffer is empty")) - ))) + if require_footer(path) { + Ok(io::BufWriter::new(Box::new( + FooterProxy::new(self.directory.open_write(path)?.into_inner().map_err(|_|()).expect("buffer should be empty")) + ))) + } else { + self.directory.open_write(path) + } } fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> { self.register_file_as_managed(path)?; - self.directory.atomic_write(path, data) + if require_footer(path) { + let mut with_footer = Vec::with_capacity(data.len() + V0::size() as usize); + with_footer.extend_from_slice(&data); + let mut hasher = Hasher::new(); + hasher.update(&data); + let crc = hasher.finalize(); + with_footer.append(&mut V0::from_crc(crc).to_bytes()); + self.directory.atomic_write(path, &with_footer) + } else { + self.directory.atomic_write(path, &data) + } } fn atomic_read(&self, path: &Path) -> result::Result, OpenReadError> { - self.directory.atomic_read(path) + let mut vec = self.directory.atomic_read(path)?; + if require_footer(path) { + let footer_size = Footer::from_bytes(&vec).size(); + vec.resize(vec.len() - footer_size, 0); + } + Ok(vec) } fn delete(&self, path: &Path) -> result::Result<(), DeleteError> { diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index 4b1bc03554..bcaf0b4e8b 100644 --- 
a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -18,7 +18,7 @@ use crate::directory::ReadOnlySource; use crate::directory::WatchCallback; use crate::directory::WatchCallbackList; use crate::directory::WatchHandle; -use crate::directory::WritePtr; +use crate::directory::{TerminatingWrite, WritePtr}; use atomicwrites; use memmap::Mmap; use std::collections::HashMap; @@ -412,6 +412,8 @@ impl Seek for SafeFileWriter { } } +impl TerminatingWrite for SafeFileWriter {} + impl Directory for MmapDirectory { fn open_read(&self, path: &Path) -> result::Result { debug!("Open Read {:?}", path); diff --git a/src/directory/mod.rs b/src/directory/mod.rs index 70fa013487..379082853f 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -24,18 +24,29 @@ pub use self::ram_directory::RAMDirectory; pub use self::read_only_source::ReadOnlySource; pub(crate) use self::watch_event_router::WatchCallbackList; pub use self::watch_event_router::{WatchCallback, WatchHandle}; -use std::io::{BufWriter, Write}; +use std::io::{self, BufWriter, Write}; #[cfg(feature = "mmap")] pub use self::mmap_directory::MmapDirectory; pub use self::managed_directory::ManagedDirectory; + +/// Trait used to indicate when no more write need to be done on a writer +pub trait TerminatingWrite: Write { + /// Indicate that the writer will no longer be used + fn terminate(mut self) -> io::Result<()> where Self: Sized { + self.flush() + } +} + +impl TerminatingWrite for Box {} + /// Write object for Directory. /// /// `WritePtr` are required to implement both Write /// and Seek. 
-pub type WritePtr = BufWriter>; +pub type WritePtr = BufWriter>; #[cfg(test)] mod tests; diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs index 8c6d237dcd..66bd28c054 100644 --- a/src/directory/ram_directory.rs +++ b/src/directory/ram_directory.rs @@ -1,7 +1,7 @@ use crate::core::META_FILEPATH; use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError}; use crate::directory::WatchCallbackList; -use crate::directory::WritePtr; +use crate::directory::{TerminatingWrite, WritePtr}; use crate::directory::{Directory, ReadOnlySource, WatchCallback, WatchHandle}; use fail::fail_point; use std::collections::HashMap; @@ -71,6 +71,8 @@ impl Write for VecWriter { } } +impl TerminatingWrite for VecWriter {} + #[derive(Default)] struct InnerDirectory { fs: HashMap, From 60fe97d43653da1a5b1d2578632c6124b1fd5332 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Tue, 3 Sep 2019 12:51:00 +0200 Subject: [PATCH 05/12] address comments and simplify things with new contract use BitOrder for integer to raw byte conversion consider atomic write imply atomic read, which might not actually be true use some indirection to have a boxable terminating writer --- src/directory/managed_directory.rs | 102 +++++++++++------------------ src/directory/mod.rs | 15 ++++- 2 files changed, 53 insertions(+), 64 deletions(-) diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index 6da46cae17..a8e5d08421 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -1,14 +1,15 @@ -use crate::core::{MANAGED_FILEPATH, META_FILEPATH}; +use crate::core::MANAGED_FILEPATH; use crate::directory::error::{DeleteError, IOError, LockError, OpenReadError, OpenWriteError}; use crate::directory::DirectoryLock; use crate::directory::Lock; use crate::directory::META_LOCK; -use crate::directory::TerminatingWrite; +use crate::directory::{AntiCallToken, TerminatingWrite}; use crate::directory::{ReadOnlySource, 
WritePtr}; use crate::directory::{WatchCallback, WatchHandle}; use crate::error::DataCorruption; use crate::Directory; use crate::Result; +use byteorder::{ByteOrder, LittleEndian}; use crc32fast::Hasher; use serde_json; use std::collections::HashSet; @@ -30,13 +31,6 @@ fn is_managed(path: &Path) -> bool { .unwrap_or(true) } -/// Returns true iff the file require a footer. -fn require_footer(path: &Path) -> bool { - is_managed(path) & - ![*MANAGED_FILEPATH, *META_FILEPATH].contains(&path) -} - - /// Wrapper of directories that keeps track of files created by Tantivy. /// /// A managed directory is just a wrapper of a directory @@ -263,7 +257,7 @@ impl Footer { pub fn from_bytes(data: &[u8]) -> Self { let len = data.len(); assert!(len >= 2); - let size = u16::from_be_bytes([data[len - 2], data[len - 1]]); + let size = LittleEndian::read_u16(&data[len - 2..]); assert_eq!(size, 10); let footer = &data[len - size as usize..]; let index_version = footer[0]; @@ -282,13 +276,13 @@ impl Footer { pub fn tantivy_version(&self) -> (u8, u8, u8) { match self { - Footer::V0(f) => f.tantivy_version, + Footer::V0(footer) => footer.tantivy_version, } } pub fn crc(&self) -> u32 { match self { - Footer::V0(f) => f.crc, + Footer::V0(footer) => footer.crc, } } @@ -307,19 +301,21 @@ pub struct V0 { impl V0 { pub fn to_bytes(&self) -> Vec { - let crc = self.crc.to_be_bytes(); - let size = Self::size().to_be_bytes(); - let mut res = vec![0, self.tantivy_version.0, self.tantivy_version.1, self.tantivy_version.2]; - res.extend_from_slice(&crc); - res.extend_from_slice(&size); + let mut res = vec![0; 10]; + res[0] = 0; + res[1] = self.tantivy_version.0; + res[2] = self.tantivy_version.1; + res[3] = self.tantivy_version.2; + LittleEndian::write_u32(&mut res[4..8], self.crc); + LittleEndian::write_u16(&mut res[8..], Self::size()); res } pub fn from_bytes(footer: &[u8]) -> Self { assert_eq!(footer[0], 0); - assert_eq!(u16::from_be_bytes([footer[8], footer[9]]), 10); + 
assert_eq!(LittleEndian::read_u16(&footer[8..10]), 10); let tantivy_version = (footer[1], footer[2], footer[3]); - let crc = u32::from_be_bytes([footer[4], footer[5], footer[6], footer[7]]); + let crc = LittleEndian::read_u32(&footer[4..8]); V0 { tantivy_version, @@ -339,86 +335,68 @@ impl V0 { } struct FooterProxy { - hasher: Hasher, - writer: W, + /// always Some except after terminate call + hasher: Option, + /// always Some except after terminate call + writer: Option, } impl FooterProxy { fn new(writer: W) -> Self { FooterProxy { - hasher: Hasher::new(), - writer + hasher: Some(Hasher::new()), + writer: Some(writer) } } } impl Write for FooterProxy { fn write(&mut self, buf: &[u8]) -> io::Result { - let count = self.writer.write(buf)?; - self.hasher.update(&buf[..count]); + let count = self.writer.as_mut().unwrap().write(buf)?; + self.hasher.as_mut().unwrap().update(&buf[..count]); Ok(count) } fn flush(&mut self) -> io::Result<()> { - self.writer.flush() + self.writer.as_mut().unwrap().flush() } } impl TerminatingWrite for FooterProxy { - fn terminate(mut self) -> io::Result<()> { - let crc = self.hasher.finalize(); + fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> { + let crc = self.hasher.take().unwrap().finalize(); let footer = V0::from_crc(crc).to_bytes(); - self.writer.write_all(&footer)?; - self.writer.flush()?;//should we assuse calling terminate on inner already flush? - self.writer.terminate() + let mut writer = self.writer.take().unwrap(); + writer.write_all(&footer)?; + writer.flush()?;//should we assume calling terminate on inner already flush? 
+ writer.terminate() } } impl Directory for ManagedDirectory { fn open_read(&self, path: &Path) -> result::Result { - let ros = self.directory.open_read(path)?; - if require_footer(path) { - let footer_size = Footer::from_bytes(ros.as_slice()).size(); - Ok(ros.slice_to(ros.as_slice().len() - footer_size)) - } else { - Ok(ros) - } + let read_only_source = self.directory.open_read(path)?; + let footer_size = Footer::from_bytes(read_only_source.as_slice()).size(); + Ok(read_only_source.slice_to(read_only_source.as_slice().len() - footer_size)) } fn open_write(&mut self, path: &Path) -> result::Result { self.register_file_as_managed(path) .map_err(|e| IOError::with_path(path.to_owned(), e))?; - if require_footer(path) { - Ok(io::BufWriter::new(Box::new( - FooterProxy::new(self.directory.open_write(path)?.into_inner().map_err(|_|()).expect("buffer should be empty")) - ))) - } else { - self.directory.open_write(path) - } + Ok(io::BufWriter::new(Box::new( + FooterProxy::new(self.directory.open_write(path)?.into_inner().map_err(|_|()).expect("buffer should be empty")) + ))) } + // TODO: to be correct, this require an non enforced contract, atomic writes must only be + // read back as atomics, and normal write must only be read by normal read fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> { self.register_file_as_managed(path)?; - if require_footer(path) { - let mut with_footer = Vec::with_capacity(data.len() + V0::size() as usize); - with_footer.extend_from_slice(&data); - let mut hasher = Hasher::new(); - hasher.update(&data); - let crc = hasher.finalize(); - with_footer.append(&mut V0::from_crc(crc).to_bytes()); - self.directory.atomic_write(path, &with_footer) - } else { - self.directory.atomic_write(path, &data) - } + self.directory.atomic_write(path, data) } fn atomic_read(&self, path: &Path) -> result::Result, OpenReadError> { - let mut vec = self.directory.atomic_read(path)?; - if require_footer(path) { - let footer_size = 
Footer::from_bytes(&vec).size(); - vec.resize(vec.len() - footer_size, 0); - } - Ok(vec) + self.directory.atomic_read(path) } fn delete(&self, path: &Path) -> result::Result<(), DeleteError> { diff --git a/src/directory/mod.rs b/src/directory/mod.rs index 379082853f..6d548acaad 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -31,16 +31,27 @@ pub use self::mmap_directory::MmapDirectory; pub use self::managed_directory::ManagedDirectory; +/// Struct used to prevent from calling [`terminate_ref`](trait.TerminatingWrite#method.terminate_ref) directly +pub struct AntiCallToken(()); /// Trait used to indicate when no more write need to be done on a writer pub trait TerminatingWrite: Write { - /// Indicate that the writer will no longer be used + /// Indicate that the writer will no longer be used. Internally call terminate_ref. fn terminate(mut self) -> io::Result<()> where Self: Sized { + self.terminate_ref(AntiCallToken(())) + } + + /// You should implement this function to define custom behavior. + fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> { self.flush() } } -impl TerminatingWrite for Box {} +impl TerminatingWrite for Box { + fn terminate(mut self) -> io::Result<()> { + self.as_mut().terminate_ref(AntiCallToken(())) + } +} /// Write object for Directory. 
/// From fb27a6aa5473db2cd7d5e43531e5a711c52d4188 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Wed, 4 Sep 2019 12:08:10 +0200 Subject: [PATCH 06/12] implement TerminatingWrite and make terminate() be called where it should add dependancy to drop_bomb to help find where terminate() should be called implement TerminatingWrite for wrapper writers make tests pass /!\ some tests seems to pass where they shouldn't --- Cargo.toml | 2 ++ src/common/composite_file.rs | 7 +++---- src/common/counting_writer.rs | 9 +++++++++ src/directory/managed_directory.rs | 27 ++++++++++++++++++--------- src/directory/mmap_directory.rs | 7 ++++++- src/directory/mod.rs | 15 ++++++++++----- src/directory/ram_directory.rs | 7 ++++++- src/indexer/index_writer.rs | 2 ++ src/store/writer.rs | 3 ++- tests/failpoints/mod.rs | 4 ++-- 10 files changed, 60 insertions(+), 23 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2fcb7055f7..4739b837b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,8 @@ murmurhash32 = "0.2" chrono = "0.4" smallvec = "0.6" +drop_bomb = "0.1.4" + [target.'cfg(windows)'.dependencies] winapi = "0.3" diff --git a/src/common/composite_file.rs b/src/common/composite_file.rs index f2c2d22081..f2d2926f48 100644 --- a/src/common/composite_file.rs +++ b/src/common/composite_file.rs @@ -2,7 +2,7 @@ use crate::common::BinarySerializable; use crate::common::CountingWriter; use crate::common::VInt; use crate::directory::ReadOnlySource; -use crate::directory::WritePtr; +use crate::directory::{TerminatingWrite, WritePtr}; use crate::schema::Field; use crate::space_usage::FieldUsage; use crate::space_usage::PerFieldSpaceUsage; @@ -42,7 +42,7 @@ pub struct CompositeWrite { offsets: HashMap, } -impl CompositeWrite { +impl CompositeWrite { /// Crate a new API writer that writes a composite file /// in a given write. 
pub fn wrap(w: W) -> CompositeWrite { @@ -91,8 +91,7 @@ impl CompositeWrite { let footer_len = (self.write.written_bytes() - footer_offset) as u32; footer_len.serialize(&mut self.write)?; - self.write.flush()?; - Ok(()) + self.write.terminate() } } diff --git a/src/common/counting_writer.rs b/src/common/counting_writer.rs index 339c60becd..6981ee220c 100644 --- a/src/common/counting_writer.rs +++ b/src/common/counting_writer.rs @@ -1,5 +1,7 @@ use std::io; use std::io::Write; +use crate::directory::AntiCallToken; +use crate::directory::TerminatingWrite; pub struct CountingWriter { underlying: W, @@ -42,6 +44,13 @@ impl Write for CountingWriter { } } +impl TerminatingWrite for CountingWriter { + fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> { + self.flush()?; + self.underlying.terminate_ref(token) + } +} + #[cfg(test)] mod test { diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index a8e5d08421..a64965b66d 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -258,11 +258,11 @@ impl Footer { let len = data.len(); assert!(len >= 2); let size = LittleEndian::read_u16(&data[len - 2..]); - assert_eq!(size, 10); + assert!(len >= size as usize, "len({}) is smaller than size({})", len, size); let footer = &data[len - size as usize..]; let index_version = footer[0]; match index_version { - 0 => Footer::V0(V0::from_bytes(data)), + 0 => Footer::V0(V0::from_bytes(footer)), _ => panic!("unsuported index_version") } } @@ -325,7 +325,11 @@ impl V0 { pub fn from_crc(crc: u32) -> Self { Self { - tantivy_version: (0,10,0), + tantivy_version: ( + env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(), + env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(), + env!("CARGO_PKG_VERSION_PATCH").parse().unwrap(), + ), crc } } @@ -339,13 +343,15 @@ struct FooterProxy { hasher: Option, /// always Some except after terminate call writer: Option, + bomb: drop_bomb::DropBomb, } impl FooterProxy { fn 
new(writer: W) -> Self { FooterProxy { hasher: Some(Hasher::new()), - writer: Some(writer) + writer: Some(writer), + bomb: drop_bomb::DropBomb::new("This must be terminated before dropping"), } } } @@ -364,6 +370,7 @@ impl Write for FooterProxy { impl TerminatingWrite for FooterProxy { fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> { + self.bomb.defuse(); let crc = self.hasher.take().unwrap().finalize(); let footer = V0::from_crc(crc).to_bytes(); let mut writer = self.writer.take().unwrap(); @@ -429,7 +436,7 @@ impl Clone for ManagedDirectory { #[cfg(test)] mod tests_mmap_specific { - use crate::directory::{Directory, ManagedDirectory, MmapDirectory}; + use crate::directory::{Directory, ManagedDirectory, MmapDirectory, TerminatingWrite}; use std::collections::HashSet; use std::io::Write; use std::path::{Path, PathBuf}; @@ -445,8 +452,8 @@ mod tests_mmap_specific { { let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap(); let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap(); - let mut write_file = managed_directory.open_write(test_path1).unwrap(); - write_file.flush().unwrap(); + let write_file = managed_directory.open_write(test_path1).unwrap(); + write_file.terminate().unwrap(); managed_directory .atomic_write(test_path2, &[0u8, 1u8]) .unwrap(); @@ -480,9 +487,11 @@ mod tests_mmap_specific { let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap(); let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap(); - managed_directory - .atomic_write(test_path1, &vec![0u8, 1u8]) + let mut write = managed_directory + .open_write(test_path1) .unwrap(); + write.write_all(&[0u8, 1u8]).unwrap(); + write.terminate().unwrap(); assert!(managed_directory.exists(test_path1)); let _mmap_read = managed_directory.open_read(test_path1).unwrap(); diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index bcaf0b4e8b..fa41d66565 100644 --- a/src/directory/mmap_directory.rs +++ 
b/src/directory/mmap_directory.rs @@ -11,6 +11,7 @@ use crate::directory::error::{ DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError, }; use crate::directory::read_only_source::BoxedData; +use crate::directory::AntiCallToken; use crate::directory::Directory; use crate::directory::DirectoryLock; use crate::directory::Lock; @@ -412,7 +413,11 @@ impl Seek for SafeFileWriter { } } -impl TerminatingWrite for SafeFileWriter {} +impl TerminatingWrite for SafeFileWriter { + fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> { + self.flush() + } +} impl Directory for MmapDirectory { fn open_read(&self, path: &Path) -> result::Result { diff --git a/src/directory/mod.rs b/src/directory/mod.rs index 6d548acaad..dae50feca4 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -42,14 +42,19 @@ pub trait TerminatingWrite: Write { } /// You should implement this function to define custom behavior. - fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> { - self.flush() - } + fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()>; } impl TerminatingWrite for Box { - fn terminate(mut self) -> io::Result<()> { - self.as_mut().terminate_ref(AntiCallToken(())) + fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> { + self.as_mut().terminate_ref(token) + } +} + +impl TerminatingWrite for BufWriter { + fn terminate_ref(&mut self, a: AntiCallToken) -> io::Result<()> { + self.flush()?; + self.get_mut().terminate_ref(a) } } diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs index 66bd28c054..ce853871af 100644 --- a/src/directory/ram_directory.rs +++ b/src/directory/ram_directory.rs @@ -1,5 +1,6 @@ use crate::core::META_FILEPATH; use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError}; +use crate::directory::AntiCallToken; use crate::directory::WatchCallbackList; use crate::directory::{TerminatingWrite, WritePtr}; use crate::directory::{Directory, ReadOnlySource, 
WatchCallback, WatchHandle}; @@ -71,7 +72,11 @@ impl Write for VecWriter { } } -impl TerminatingWrite for VecWriter {} +impl TerminatingWrite for VecWriter { + fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> { + self.flush() + } +} #[derive(Default)] struct InnerDirectory { diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 917b90fab4..5e37817499 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -7,6 +7,7 @@ use crate::core::SegmentComponent; use crate::core::SegmentId; use crate::core::SegmentMeta; use crate::core::SegmentReader; +use crate::directory::TerminatingWrite; use crate::directory::DirectoryLock; use crate::docset::DocSet; use crate::error::TantivyError; @@ -168,6 +169,7 @@ pub(crate) fn advance_deletes( segment = segment.with_delete_meta(num_deleted_docs as u32, target_opstamp); let mut delete_file = segment.open_write(SegmentComponent::DELETE)?; write_delete_bitset(&delete_bitset, &mut delete_file)?; + delete_file.terminate()?; } } segment_entry.set_meta(segment.meta().clone()); diff --git a/src/store/writer.rs b/src/store/writer.rs index bcb74f99da..5ddda2c7f2 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -3,6 +3,7 @@ use super::skiplist::SkipListBuilder; use super::StoreReader; use crate::common::CountingWriter; use crate::common::{BinarySerializable, VInt}; +use crate::directory::TerminatingWrite; use crate::directory::WritePtr; use crate::schema::Document; use crate::DocId; @@ -109,6 +110,6 @@ impl StoreWriter { self.offset_index_writer.write(&mut self.writer)?; header_offset.serialize(&mut self.writer)?; self.doc.serialize(&mut self.writer)?; - self.writer.flush() + self.writer.terminate() } } diff --git a/tests/failpoints/mod.rs b/tests/failpoints/mod.rs index 807ca7abc2..509e3759f9 100644 --- a/tests/failpoints/mod.rs +++ b/tests/failpoints/mod.rs @@ -1,7 +1,7 @@ use fail; use std::io::Write; use std::path::Path; -use tantivy::directory::{Directory, 
ManagedDirectory, RAMDirectory}; +use tantivy::directory::{Directory, ManagedDirectory, RAMDirectory, TerminatingWrite}; use tantivy::doc; use tantivy::schema::{Schema, TEXT}; use tantivy::{Index, Term}; @@ -17,7 +17,7 @@ fn test_failpoints_managed_directory_gc_if_delete_fails() { managed_directory .open_write(test_path) .unwrap() - .flush() + .terminate() .unwrap(); assert!(managed_directory.exists(test_path)); // triggering gc and setting the delete operation to fail. From 1911b482d2fc6c695d892b2c016387ab36533dca Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Wed, 4 Sep 2019 13:56:43 +0200 Subject: [PATCH 07/12] remove usage of drop_bomb --- Cargo.toml | 2 -- src/directory/managed_directory.rs | 5 +---- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4739b837b0..2fcb7055f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,8 +55,6 @@ murmurhash32 = "0.2" chrono = "0.4" smallvec = "0.6" -drop_bomb = "0.1.4" - [target.'cfg(windows)'.dependencies] winapi = "0.3" diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index a64965b66d..77da3a060c 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -343,7 +343,6 @@ struct FooterProxy { hasher: Option, /// always Some except after terminate call writer: Option, - bomb: drop_bomb::DropBomb, } impl FooterProxy { @@ -351,7 +350,6 @@ impl FooterProxy { FooterProxy { hasher: Some(Hasher::new()), writer: Some(writer), - bomb: drop_bomb::DropBomb::new("This must be terminated before dropping"), } } } @@ -370,9 +368,8 @@ impl Write for FooterProxy { impl TerminatingWrite for FooterProxy { fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> { - self.bomb.defuse(); let crc = self.hasher.take().unwrap().finalize(); - let footer = V0::from_crc(crc).to_bytes(); + let footer = Footer::V0(V0::from_crc(crc)).to_bytes(); let mut writer = self.writer.take().unwrap(); writer.write_all(&footer)?; 
writer.flush()?;//should we assume calling terminate on inner already flush? From 32baf73cc8864edf5de0be5997aa6c0930cb9ed0 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Wed, 4 Sep 2019 14:05:26 +0200 Subject: [PATCH 08/12] fmt --- src/common/counting_writer.rs | 4 +-- src/directory/managed_directory.rs | 42 +++++++++++++++++------------- src/directory/mod.rs | 5 +++- src/directory/ram_directory.rs | 2 +- src/indexer/index_writer.rs | 2 +- 5 files changed, 32 insertions(+), 23 deletions(-) diff --git a/src/common/counting_writer.rs b/src/common/counting_writer.rs index 6981ee220c..8293ba8b3b 100644 --- a/src/common/counting_writer.rs +++ b/src/common/counting_writer.rs @@ -1,7 +1,7 @@ -use std::io; -use std::io::Write; use crate::directory::AntiCallToken; use crate::directory::TerminatingWrite; +use std::io; +use std::io::Write; pub struct CountingWriter { underlying: W, diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index 77da3a060c..6b7a58f8d0 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -244,13 +244,13 @@ impl ManagedDirectory { #[derive(Debug, Copy, Clone)] pub enum Footer { - V0(V0) + V0(V0), } impl Footer { pub fn to_bytes(&self) -> Vec { match self { - Footer::V0(f) => f.to_bytes() + Footer::V0(f) => f.to_bytes(), } } @@ -258,16 +258,20 @@ impl Footer { let len = data.len(); assert!(len >= 2); let size = LittleEndian::read_u16(&data[len - 2..]); - assert!(len >= size as usize, "len({}) is smaller than size({})", len, size); + assert!( + len >= size as usize, + "len({}) is smaller than size({})", + len, + size + ); let footer = &data[len - size as usize..]; let index_version = footer[0]; match index_version { 0 => Footer::V0(V0::from_bytes(footer)), - _ => panic!("unsuported index_version") + _ => panic!("unsuported index_version"), } } - pub fn index_version(&self) -> u8 { match self { Footer::V0(_) => 0, @@ -295,7 +299,7 @@ impl Footer { #[derive(Debug, Copy, Clone)] 
pub struct V0 { - pub tantivy_version: (u8,u8,u8), + pub tantivy_version: (u8, u8, u8), pub crc: u32, } @@ -319,18 +323,18 @@ impl V0 { V0 { tantivy_version, - crc + crc, } } pub fn from_crc(crc: u32) -> Self { Self { tantivy_version: ( - env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(), - env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(), - env!("CARGO_PKG_VERSION_PATCH").parse().unwrap(), + env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(), + env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(), + env!("CARGO_PKG_VERSION_PATCH").parse().unwrap(), ), - crc + crc, } } pub fn size() -> u16 { @@ -372,7 +376,7 @@ impl TerminatingWrite for FooterProxy { let footer = Footer::V0(V0::from_crc(crc)).to_bytes(); let mut writer = self.writer.take().unwrap(); writer.write_all(&footer)?; - writer.flush()?;//should we assume calling terminate on inner already flush? + writer.flush()?; //should we assume calling terminate on inner already flush? writer.terminate() } } @@ -387,9 +391,13 @@ impl Directory for ManagedDirectory { fn open_write(&mut self, path: &Path) -> result::Result { self.register_file_as_managed(path) .map_err(|e| IOError::with_path(path.to_owned(), e))?; - Ok(io::BufWriter::new(Box::new( - FooterProxy::new(self.directory.open_write(path)?.into_inner().map_err(|_|()).expect("buffer should be empty")) - ))) + Ok(io::BufWriter::new(Box::new(FooterProxy::new( + self.directory + .open_write(path)? 
+ .into_inner() + .map_err(|_| ()) + .expect("buffer should be empty"), + )))) } // TODO: to be correct, this require an non enforced contract, atomic writes must only be @@ -484,9 +492,7 @@ mod tests_mmap_specific { let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap(); let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap(); - let mut write = managed_directory - .open_write(test_path1) - .unwrap(); + let mut write = managed_directory.open_write(test_path1).unwrap(); write.write_all(&[0u8, 1u8]).unwrap(); write.terminate().unwrap(); assert!(managed_directory.exists(test_path1)); diff --git a/src/directory/mod.rs b/src/directory/mod.rs index dae50feca4..d8f4fd618b 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -37,7 +37,10 @@ pub struct AntiCallToken(()); /// Trait used to indicate when no more write need to be done on a writer pub trait TerminatingWrite: Write { /// Indicate that the writer will no longer be used. Internally call terminate_ref. 
- fn terminate(mut self) -> io::Result<()> where Self: Sized { + fn terminate(mut self) -> io::Result<()> + where + Self: Sized, + { self.terminate_ref(AntiCallToken(())) } diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs index ce853871af..db19f98111 100644 --- a/src/directory/ram_directory.rs +++ b/src/directory/ram_directory.rs @@ -2,8 +2,8 @@ use crate::core::META_FILEPATH; use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError}; use crate::directory::AntiCallToken; use crate::directory::WatchCallbackList; -use crate::directory::{TerminatingWrite, WritePtr}; use crate::directory::{Directory, ReadOnlySource, WatchCallback, WatchHandle}; +use crate::directory::{TerminatingWrite, WritePtr}; use fail::fail_point; use std::collections::HashMap; use std::fmt; diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 5e37817499..bf5c4365d8 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -7,8 +7,8 @@ use crate::core::SegmentComponent; use crate::core::SegmentId; use crate::core::SegmentMeta; use crate::core::SegmentReader; -use crate::directory::TerminatingWrite; use crate::directory::DirectoryLock; +use crate::directory::TerminatingWrite; use crate::docset::DocSet; use crate::error::TantivyError; use crate::fastfield::write_delete_bitset; From ce6ff4bacc84025c6d50cfb6f2ed4f4ed8b9a755 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Wed, 4 Sep 2019 15:55:45 +0200 Subject: [PATCH 09/12] add test for checksum --- src/directory/managed_directory.rs | 35 ++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index 6b7a58f8d0..bb7203253e 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -443,6 +443,7 @@ mod tests_mmap_specific { use crate::directory::{Directory, ManagedDirectory, MmapDirectory, TerminatingWrite}; use std::collections::HashSet; 
+ use std::fs::OpenOptions; use std::io::Write; use std::path::{Path, PathBuf}; use tempfile::TempDir; @@ -513,4 +514,38 @@ mod tests_mmap_specific { } } + #[test] + fn test_checksum() { + let test_path1: &'static Path = Path::new("some_path_for_test"); + let test_path2: &'static Path = Path::new("other_test_path"); + + let tempdir = TempDir::new().unwrap(); + let tempdir_path = PathBuf::from(tempdir.path()); + + let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap(); + let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap(); + let mut write = managed_directory.open_write(test_path1).unwrap(); + write.write_all(&[0u8, 1u8]).unwrap(); + write.terminate().unwrap(); + + let mut write = managed_directory.open_write(test_path2).unwrap(); + write.write_all(&[3u8, 4u8, 5u8]).unwrap(); + write.terminate().unwrap(); + + assert!(managed_directory.list_damaged().unwrap().is_empty()); + + let mut corrupted_path = tempdir_path.clone(); + corrupted_path.push(test_path2); + let mut file = OpenOptions::new() + .write(true) + .open(&corrupted_path) + .unwrap(); + file.write_all(&[255u8]).unwrap(); + file.flush().unwrap(); + drop(file); + + let damaged = managed_directory.list_damaged().unwrap(); + assert_eq!(damaged.len(), 1); + assert!(damaged.contains(test_path2)); + } } From 2fb61c47c35ac6817e09d13f0f22c7723a160103 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Tue, 17 Sep 2019 14:56:24 +0200 Subject: [PATCH 10/12] address some review comments --- src/directory/directory.rs | 4 + src/directory/footer.rs | 210 +++++++++++++++++++++++++++++ src/directory/managed_directory.rs | 156 ++------------------- src/directory/mod.rs | 2 + 4 files changed, 224 insertions(+), 148 deletions(-) create mode 100644 src/directory/footer.rs diff --git a/src/directory/directory.rs b/src/directory/directory.rs index 9da1cb2177..6b50925423 100644 --- a/src/directory/directory.rs +++ b/src/directory/directory.rs @@ -118,6 +118,8 @@ pub trait Directory: 
DirectoryClone + fmt::Debug + Send + Sync + 'static { /// /// Specifically, subsequent writes or flushes should /// have no effect on the returned `ReadOnlySource` object. + /// + /// You should only use this to read files created with [`open_write`] fn open_read(&self, path: &Path) -> result::Result; /// Removes a file @@ -157,6 +159,8 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static { /// atomic_write. /// /// This should only be used for small files. + /// + /// You should only use this to read files created with [`atomic_write`] fn atomic_read(&self, path: &Path) -> Result, OpenReadError>; /// Atomically replace the content of a file with data. diff --git a/src/directory/footer.rs b/src/directory/footer.rs new file mode 100644 index 0000000000..909322ff1c --- /dev/null +++ b/src/directory/footer.rs @@ -0,0 +1,210 @@ +use crate::directory::{AntiCallToken, TerminatingWrite}; +use crate::directory::read_only_source::ReadOnlySource; +use byteorder::{ByteOrder, LittleEndian}; +use crc32fast::Hasher; +use std::io; +use std::io::Write; + +const COMMON_FOOTER_SIZE: usize = 4*5; + +#[derive(Debug, Clone, PartialEq)] +pub struct Footer { + pub tantivy_version: (u32, u32, u32), + pub meta: String, + pub versioned_footer: VersionedFooter +} + +impl Footer { + pub fn new(versioned_footer: VersionedFooter) -> Self { + let tantivy_version = ( + env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(), + env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(), + env!("CARGO_PKG_VERSION_PATCH").parse().unwrap(), + ); + Footer { + tantivy_version, + meta: format!("tantivy {}.{}.{}, index v{}", + tantivy_version.0, + tantivy_version.1, + tantivy_version.2, + versioned_footer.version()), + versioned_footer, + } + } + + pub fn to_bytes(&self) -> Vec { + let mut res = self.versioned_footer.to_bytes(); + res.extend_from_slice(self.meta.as_bytes()); + let len = res.len(); + res.resize(len + COMMON_FOOTER_SIZE, 0); + let mut common_footer = &mut res[len..]; +
LittleEndian::write_u32(&mut common_footer, self.meta.len() as u32); + LittleEndian::write_u32(&mut common_footer[4..], self.tantivy_version.0); + LittleEndian::write_u32(&mut common_footer[8..], self.tantivy_version.1); + LittleEndian::write_u32(&mut common_footer[12..], self.tantivy_version.2); + LittleEndian::write_u32(&mut common_footer[16..], (len + COMMON_FOOTER_SIZE) as u32); + res + } + + pub fn from_bytes(data: &[u8]) -> Result { + let len = data.len(); + if len < COMMON_FOOTER_SIZE + 4 { // 4 bytes for index version, stored in versioned footer + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + format!("File corrupted. The footer len must be over 24, while the entire file len is {}", len) + ) + ) + } + + let size = LittleEndian::read_u32(&data[len - 4..]) as usize; + if len < size as usize { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + format!("File corrupted. The footer len is {}, while the entire file len is {}", size, len) + ) + ) + } + let footer = &data[len - size as usize..]; + let meta_len = LittleEndian::read_u32(&footer[size-20..]) as usize; + let tantivy_major = LittleEndian::read_u32(&footer[size-16..]); + let tantivy_minor = LittleEndian::read_u32(&footer[size-12..]); + let tantivy_patch = LittleEndian::read_u32(&footer[size-8..]); + Ok(Footer { + tantivy_version: (tantivy_major, tantivy_minor, tantivy_patch), + meta: String::from_utf8_lossy(&footer[size - meta_len - 20..size - 20]).into_owned(), + versioned_footer: VersionedFooter::from_bytes(&footer[..size - meta_len - 20])? 
+ }) + } + + pub fn extract_footer(source: ReadOnlySource) -> Result<(Footer, ReadOnlySource), io::Error> { + let footer = Footer::from_bytes(source.as_slice())?; + let reader = source.slice_to(source.as_slice().len() - footer.size()); + Ok((footer, reader)) + } + + pub fn size(&self) -> usize { + self.versioned_footer.size() as usize + self.meta.len() + 20 + } +} + +#[derive(Debug, Clone, PartialEq)] +pub enum VersionedFooter { + UnknownVersion{ + version: u32, + size: u32 + }, + V0(u32), // crc +} + +impl VersionedFooter { + pub fn to_bytes(&self) -> Vec { + match self { + Self::V0(crc) => { + let mut res = vec![0; 8]; + LittleEndian::write_u32(&mut res, 0); + LittleEndian::write_u32(&mut res[4..], *crc); + res + } + Self::UnknownVersion{..} => { + panic!("Unsupported index should never get serialized"); + } + } + } + + pub fn from_bytes(footer: &[u8]) -> Result { + assert!(footer.len() >= 4); + let version = LittleEndian::read_u32(footer); + match version { + 0 => { + if footer.len() == 8 { + Ok(Self::V0(LittleEndian::read_u32(&footer[4..]))) + } else { + Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + format!("File corrupted. 
The versioned footer len is {}, while it should be 8", footer.len()) + ) + ) + } + } + version => Ok(Self::UnknownVersion { + version, + size: footer.len() as u32, + }) + } + } + + pub fn size(&self) -> u32 { + match self { + Self::V0(_) => 8, + Self::UnknownVersion{size, ..} => *size + } + } + + pub fn version(&self) -> u32 { + match self { + Self::V0(_) => 0, + Self::UnknownVersion{version, ..} => *version + } + } + + pub fn crc(&self) -> Option { + match self { + Self::V0(crc) => Some(*crc), + Self::UnknownVersion{..} => None, + } + } +} + +pub(crate) struct FooterProxy { + /// always Some except after terminate call + hasher: Option, + /// always Some except after terminate call + writer: Option, +} + +impl FooterProxy { + pub fn new(writer: W) -> Self { + FooterProxy { + hasher: Some(Hasher::new()), + writer: Some(writer), + } + } +} + +impl Write for FooterProxy { + fn write(&mut self, buf: &[u8]) -> io::Result { + let count = self.writer.as_mut().unwrap().write(buf)?; + self.hasher.as_mut().unwrap().update(&buf[..count]); + Ok(count) + } + + fn flush(&mut self) -> io::Result<()> { + self.writer.as_mut().unwrap().flush() + } +} + +impl TerminatingWrite for FooterProxy { + fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> { + let crc = self.hasher.take().unwrap().finalize(); + + let footer = Footer::new(VersionedFooter::V0(crc)).to_bytes(); + let mut writer = self.writer.take().unwrap(); + writer.write_all(&footer)?; + writer.terminate() + } +} + + +#[cfg(test)] +mod tests { + use crate::directory::footer::{Footer, VersionedFooter}; + + #[test] + fn test_serialize_deserialize_footer() { + let crc = 123456; + let footer = Footer::new(VersionedFooter::V0(crc)); + let footer_bytes = footer.to_bytes(); + + assert_eq!(Footer::from_bytes(&footer_bytes).unwrap(), footer); + } +} diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index bb7203253e..957fb75aed 100644 --- a/src/directory/managed_directory.rs +++ 
b/src/directory/managed_directory.rs @@ -1,15 +1,14 @@ use crate::core::MANAGED_FILEPATH; use crate::directory::error::{DeleteError, IOError, LockError, OpenReadError, OpenWriteError}; use crate::directory::DirectoryLock; +use crate::directory::footer::{Footer, FooterProxy}; use crate::directory::Lock; use crate::directory::META_LOCK; -use crate::directory::{AntiCallToken, TerminatingWrite}; use crate::directory::{ReadOnlySource, WritePtr}; use crate::directory::{WatchCallback, WatchHandle}; use crate::error::DataCorruption; use crate::Directory; use crate::Result; -use byteorder::{ByteOrder, LittleEndian}; use crc32fast::Hasher; use serde_json; use std::collections::HashSet; @@ -214,13 +213,12 @@ impl ManagedDirectory { /// Verify checksum of a managed file pub fn validate_checksum(&self, path: &Path) -> result::Result { let reader = self.directory.open_read(path)?; - let data = reader.as_slice(); - let footer = Footer::from_bytes(data); - let data = &data[..data.len() - footer.size() as usize]; + let (footer, data) = Footer::extract_footer(reader) + .map_err(|err| IOError::with_path(path.to_path_buf(), err))?; let mut hasher = Hasher::new(); - hasher.update(data); + hasher.update(data.as_slice()); let crc = hasher.finalize(); - Ok(crc == footer.crc()) + Ok(footer.versioned_footer.crc().map(|v| v==crc).unwrap_or(false)) } /// List files for which checksum does not match content @@ -242,150 +240,14 @@ impl ManagedDirectory { } } -#[derive(Debug, Copy, Clone)] -pub enum Footer { - V0(V0), -} - -impl Footer { - pub fn to_bytes(&self) -> Vec { - match self { - Footer::V0(f) => f.to_bytes(), - } - } - - pub fn from_bytes(data: &[u8]) -> Self { - let len = data.len(); - assert!(len >= 2); - let size = LittleEndian::read_u16(&data[len - 2..]); - assert!( - len >= size as usize, - "len({}) is smaller than size({})", - len, - size - ); - let footer = &data[len - size as usize..]; - let index_version = footer[0]; - match index_version { - 0 => 
Footer::V0(V0::from_bytes(footer)), - _ => panic!("unsuported index_version"), - } - } - - pub fn index_version(&self) -> u8 { - match self { - Footer::V0(_) => 0, - } - } - - pub fn tantivy_version(&self) -> (u8, u8, u8) { - match self { - Footer::V0(footer) => footer.tantivy_version, - } - } - - pub fn crc(&self) -> u32 { - match self { - Footer::V0(footer) => footer.crc, - } - } - - pub fn size(&self) -> usize { - match self { - Footer::V0(_) => V0::size() as usize, - } - } -} - -#[derive(Debug, Copy, Clone)] -pub struct V0 { - pub tantivy_version: (u8, u8, u8), - pub crc: u32, -} - -impl V0 { - pub fn to_bytes(&self) -> Vec { - let mut res = vec![0; 10]; - res[0] = 0; - res[1] = self.tantivy_version.0; - res[2] = self.tantivy_version.1; - res[3] = self.tantivy_version.2; - LittleEndian::write_u32(&mut res[4..8], self.crc); - LittleEndian::write_u16(&mut res[8..], Self::size()); - res - } - - pub fn from_bytes(footer: &[u8]) -> Self { - assert_eq!(footer[0], 0); - assert_eq!(LittleEndian::read_u16(&footer[8..10]), 10); - let tantivy_version = (footer[1], footer[2], footer[3]); - let crc = LittleEndian::read_u32(&footer[4..8]); - - V0 { - tantivy_version, - crc, - } - } - - pub fn from_crc(crc: u32) -> Self { - Self { - tantivy_version: ( - env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(), - env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(), - env!("CARGO_PKG_VERSION_PATCH").parse().unwrap(), - ), - crc, - } - } - pub fn size() -> u16 { - 10 - } -} - -struct FooterProxy { - /// always Some except after terminate call - hasher: Option, - /// always Some except after terminate call - writer: Option, -} -impl FooterProxy { - fn new(writer: W) -> Self { - FooterProxy { - hasher: Some(Hasher::new()), - writer: Some(writer), - } - } -} - -impl Write for FooterProxy { - fn write(&mut self, buf: &[u8]) -> io::Result { - let count = self.writer.as_mut().unwrap().write(buf)?; - self.hasher.as_mut().unwrap().update(&buf[..count]); - Ok(count) - } - - fn flush(&mut self) -> 
io::Result<()> { - self.writer.as_mut().unwrap().flush() - } -} - -impl TerminatingWrite for FooterProxy { - fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> { - let crc = self.hasher.take().unwrap().finalize(); - let footer = Footer::V0(V0::from_crc(crc)).to_bytes(); - let mut writer = self.writer.take().unwrap(); - writer.write_all(&footer)?; - writer.flush()?; //should we assume calling terminate on inner already flush? - writer.terminate() - } -} impl Directory for ManagedDirectory { fn open_read(&self, path: &Path) -> result::Result { let read_only_source = self.directory.open_read(path)?; - let footer_size = Footer::from_bytes(read_only_source.as_slice()).size(); - Ok(read_only_source.slice_to(read_only_source.as_slice().len() - footer_size)) + let (_footer, reader) = Footer::extract_footer(read_only_source) + .map_err(|err| IOError::with_path(path.to_path_buf(), err))?; + Ok(reader) } fn open_write(&mut self, path: &Path) -> result::Result { @@ -400,8 +262,6 @@ impl Directory for ManagedDirectory { )))) } - // TODO: to be correct, this require an non enforced contract, atomic writes must only be - // read back as atomics, and normal write must only be read by normal read fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> { self.register_file_as_managed(path)?; self.directory.atomic_write(path, data) diff --git a/src/directory/mod.rs b/src/directory/mod.rs index d8f4fd618b..61cf435375 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -13,6 +13,7 @@ mod managed_directory; mod ram_directory; mod read_only_source; mod watch_event_router; +mod footer; /// Errors specific to the directory module. pub mod error; @@ -45,6 +46,7 @@ pub trait TerminatingWrite: Write { } /// You should implement this function to define custom behavior. + /// This function should flush any buffer it may hold. 
fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()>; } From 33f5f2eed8400d55ba07fd4c85e2596e3fdffe9d Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Tue, 17 Sep 2019 14:59:27 +0200 Subject: [PATCH 11/12] update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fc94b203b8..cf07ff4ca0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ Tantivy 0.11.0 - Closes #498 - add support for Elastic-style unbounded range queries for alphanumeric types eg. "title:>hello", "weight:>=70.5", "height:<200" (@petr-tik) - API change around `Box`. See detail in #629 - Avoid rebuilding Regex automaton whenever a regex query is reused. #630 (@brainlock) +- Add footer with some metadata to index files. #605 (@fdb-hiroshima) ## How to update? From 2aac9573c33de97f420cbbdf7bcf71a939df18c9 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Tue, 17 Sep 2019 15:29:55 +0200 Subject: [PATCH 12/12] fmt --- src/directory/footer.rs | 79 ++++++++++++++++-------------- src/directory/managed_directory.rs | 10 ++-- src/directory/mod.rs | 2 +- 3 files changed, 48 insertions(+), 43 deletions(-) diff --git a/src/directory/footer.rs b/src/directory/footer.rs index 909322ff1c..bc4601eaaa 100644 --- a/src/directory/footer.rs +++ b/src/directory/footer.rs @@ -1,33 +1,35 @@ -use crate::directory::{AntiCallToken, TerminatingWrite}; use crate::directory::read_only_source::ReadOnlySource; +use crate::directory::{AntiCallToken, TerminatingWrite}; use byteorder::{ByteOrder, LittleEndian}; use crc32fast::Hasher; use std::io; use std::io::Write; -const COMMON_FOOTER_SIZE: usize = 4*5; +const COMMON_FOOTER_SIZE: usize = 4 * 5; #[derive(Debug, Clone, PartialEq)] pub struct Footer { pub tantivy_version: (u32, u32, u32), pub meta: String, - pub versioned_footer: VersionedFooter + pub versioned_footer: VersionedFooter, } impl Footer { pub fn new(versioned_footer: VersionedFooter) -> Self { let tantivy_version = ( - 
env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(), - env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(), - env!("CARGO_PKG_VERSION_PATCH").parse().unwrap(), - ); + env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(), + env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(), + env!("CARGO_PKG_VERSION_PATCH").parse().unwrap(), + ); Footer { tantivy_version, - meta: format!("tantivy {}.{}.{}, index v{}", - tantivy_version.0, - tantivy_version.1, - tantivy_version.2, - versioned_footer.version()), + meta: format!( + "tantivy {}.{}.{}, index v{}", + tantivy_version.0, + tantivy_version.1, + tantivy_version.2, + versioned_footer.version() + ), versioned_footer, } } @@ -48,31 +50,34 @@ impl Footer { pub fn from_bytes(data: &[u8]) -> Result { let len = data.len(); - if len < COMMON_FOOTER_SIZE + 4 { // 4 bytes for index version, stored in versioned footer + if len < COMMON_FOOTER_SIZE + 4 { + // 4 bytes for index version, stored in versioned footer return Err(io::Error::new( io::ErrorKind::UnexpectedEof, format!("File corrupted. The footer len must be over 24, while the entire file len is {}", len) ) - ) + ); } let size = LittleEndian::read_u32(&data[len - 4..]) as usize; if len < size as usize { return Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - format!("File corrupted. The footer len is {}, while the entire file len is {}", size, len) - ) - ) + io::ErrorKind::UnexpectedEof, + format!( + "File corrupted. 
The footer len is {}, while the entire file len is {}", + size, len + ), + )); } let footer = &data[len - size as usize..]; - let meta_len = LittleEndian::read_u32(&footer[size-20..]) as usize; - let tantivy_major = LittleEndian::read_u32(&footer[size-16..]); - let tantivy_minor = LittleEndian::read_u32(&footer[size-12..]); - let tantivy_patch = LittleEndian::read_u32(&footer[size-8..]); + let meta_len = LittleEndian::read_u32(&footer[size - 20..]) as usize; + let tantivy_major = LittleEndian::read_u32(&footer[size - 16..]); + let tantivy_minor = LittleEndian::read_u32(&footer[size - 12..]); + let tantivy_patch = LittleEndian::read_u32(&footer[size - 8..]); Ok(Footer { tantivy_version: (tantivy_major, tantivy_minor, tantivy_patch), meta: String::from_utf8_lossy(&footer[size - meta_len - 20..size - 20]).into_owned(), - versioned_footer: VersionedFooter::from_bytes(&footer[..size - meta_len - 20])? + versioned_footer: VersionedFooter::from_bytes(&footer[..size - meta_len - 20])?, }) } @@ -89,10 +94,7 @@ impl Footer { #[derive(Debug, Clone, PartialEq)] pub enum VersionedFooter { - UnknownVersion{ - version: u32, - size: u32 - }, + UnknownVersion { version: u32, size: u32 }, V0(u32), // crc } @@ -105,7 +107,7 @@ impl VersionedFooter { LittleEndian::write_u32(&mut res[4..], *crc); res } - Self::UnknownVersion{..} => { + Self::UnknownVersion { .. } => { panic!("Unsupported index should never get serialized"); } } @@ -115,42 +117,44 @@ impl VersionedFooter { assert!(footer.len() >= 4); let version = LittleEndian::read_u32(footer); match version { - 0 => { + 0 => { if footer.len() == 8 { Ok(Self::V0(LittleEndian::read_u32(&footer[4..]))) } else { Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - format!("File corrupted. The versioned footer len is {}, while it should be 8", footer.len()) - ) - ) + io::ErrorKind::UnexpectedEof, + format!( + "File corrupted. 
The versioned footer len is {}, while it should be 8", + footer.len() + ), + )) } } version => Ok(Self::UnknownVersion { version, size: footer.len() as u32, - }) + }), } } pub fn size(&self) -> u32 { match self { Self::V0(_) => 8, - Self::UnknownVersion{size, ..} => *size + Self::UnknownVersion { size, .. } => *size, } } pub fn version(&self) -> u32 { match self { Self::V0(_) => 0, - Self::UnknownVersion{version, ..} => *version + Self::UnknownVersion { version, .. } => *version, } } pub fn crc(&self) -> Option { match self { Self::V0(crc) => Some(*crc), - Self::UnknownVersion{..} => None, + Self::UnknownVersion { .. } => None, } } } @@ -194,7 +198,6 @@ impl TerminatingWrite for FooterProxy { } } - #[cfg(test)] mod tests { use crate::directory::footer::{Footer, VersionedFooter}; diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index 957fb75aed..f72668fa1c 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -1,7 +1,7 @@ use crate::core::MANAGED_FILEPATH; use crate::directory::error::{DeleteError, IOError, LockError, OpenReadError, OpenWriteError}; -use crate::directory::DirectoryLock; use crate::directory::footer::{Footer, FooterProxy}; +use crate::directory::DirectoryLock; use crate::directory::Lock; use crate::directory::META_LOCK; use crate::directory::{ReadOnlySource, WritePtr}; @@ -218,7 +218,11 @@ impl ManagedDirectory { let mut hasher = Hasher::new(); hasher.update(data.as_slice()); let crc = hasher.finalize(); - Ok(footer.versioned_footer.crc().map(|v| v==crc).unwrap_or(false)) + Ok(footer + .versioned_footer + .crc() + .map(|v| v == crc) + .unwrap_or(false)) } /// List files for which checksum does not match content @@ -240,8 +244,6 @@ impl ManagedDirectory { } } - - impl Directory for ManagedDirectory { fn open_read(&self, path: &Path) -> result::Result { let read_only_source = self.directory.open_read(path)?; diff --git a/src/directory/mod.rs b/src/directory/mod.rs index 
61cf435375..294beb9f09 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -9,11 +9,11 @@ mod mmap_directory; mod directory; mod directory_lock; +mod footer; mod managed_directory; mod ram_directory; mod read_only_source; mod watch_event_router; -mod footer; /// Errors specific to the directory module. pub mod error;