diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index ceddc8fa03..457b6c0ef4 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -1,8 +1,9 @@ -use crate::core::MANAGED_FILEPATH; +use crate::core::{MANAGED_FILEPATH, META_FILEPATH}; use crate::directory::error::{DeleteError, IOError, LockError, OpenReadError, OpenWriteError}; use crate::directory::DirectoryLock; use crate::directory::Lock; use crate::directory::META_LOCK; +use crate::directory::TerminatingWrite; use crate::directory::{ReadOnlySource, WritePtr}; use crate::directory::{WatchCallback, WatchHandle}; use crate::error::DataCorruption; @@ -18,8 +19,6 @@ use std::result; use std::sync::RwLockWriteGuard; use std::sync::{Arc, RwLock}; -const CRC_SIZE: usize = 4; - /// Returns true iff the file is "managed". /// Non-managed file are not subject to garbage collection. /// @@ -31,6 +30,13 @@ fn is_managed(path: &Path) -> bool { .unwrap_or(true) } +/// Returns true iff the file require a footer. +fn require_footer(path: &Path) -> bool { + is_managed(path) & + ![*MANAGED_FILEPATH, *META_FILEPATH].contains(&path) +} + + /// Wrapper of directories that keeps track of files created by Tantivy. /// /// A managed directory is just a wrapper of a directory @@ -215,11 +221,12 @@ impl ManagedDirectory { pub fn validate_checksum(&self, path: &Path) -> result::Result { let reader = self.directory.open_read(path)?; let data = reader.as_slice(); - let len = data.len() - CRC_SIZE; + let footer = Footer::from_bytes(data); + let data = &data[..data.len() - footer.size() as usize]; let mut hasher = Hasher::new(); - hasher.update(&data[..len]); - let crc = hasher.finalize().to_be_bytes(); - Ok(data[len..]==crc) + hasher.update(data); + let crc = hasher.finalize(); + Ok(crc == footer.crc()) } /// List files for which checksum does not match content @@ -241,32 +248,114 @@ impl ManagedDirectory { } } -struct CrcProxy { - hasher: Option, - writer: W, +#[derive(Debug, Copy, Clone)] +pub enum Footer { + V0(V0) } -impl CrcProxy { - fn new(writer: W) -> Self { - CrcProxy { - hasher: Some(Hasher::new()), - writer +impl Footer { + pub fn to_bytes(&self) -> Vec { + match self { + Footer::V0(f) => f.to_bytes() + } + } + + pub fn from_bytes(data: &[u8]) -> Self { + let len = data.len(); + assert!(len >= 2); + let size = u16::from_be_bytes([data[len - 2], data[len - 1]]); + assert_eq!(size, 10); + let footer = &data[len - size as usize..]; + let index_version = footer[0]; + match index_version { + 0 => Footer::V0(V0::from_bytes(data)), + _ => panic!("unsuported index_version") + } + } + + + pub fn index_version(&self) -> u8 { + match self { + Footer::V0(_) => 0, + } + } + + pub fn tantivy_version(&self) -> (u8, u8, u8) { + match self { + Footer::V0(f) => f.tantivy_version, + } + } + + pub fn crc(&self) -> u32 { + match self { + Footer::V0(f) => f.crc, + } + } + + pub fn size(&self) -> usize { + match self { + Footer::V0(_) => V0::size() as usize, } } } -impl Drop for CrcProxy { - fn drop(&mut self) { - // this should probably be logged - let _ = self.writer.write_all(&self.hasher.take().unwrap().finalize().to_be_bytes()); - let _ = self.writer.flush(); +#[derive(Debug, Copy, Clone)] +pub struct V0 { + pub tantivy_version: (u8,u8,u8), + pub crc: u32, +} + +impl V0 { + pub fn to_bytes(&self) -> Vec { + let crc = self.crc.to_be_bytes(); + let size = Self::size().to_be_bytes(); + let mut res = vec![0, self.tantivy_version.0, self.tantivy_version.1, self.tantivy_version.2]; + res.extend_from_slice(&crc); + res.extend_from_slice(&size); + res + } + + pub fn from_bytes(footer: &[u8]) -> Self { + assert_eq!(footer[0], 0); + assert_eq!(u16::from_be_bytes([footer[8], footer[9]]), 10); + let tantivy_version = (footer[1], footer[2], footer[3]); + let crc = u32::from_be_bytes([footer[4], footer[5], footer[6], footer[7]]); + + V0 { + tantivy_version, + crc + } + } + + pub fn from_crc(crc: u32) -> Self { + Self { + tantivy_version: (0,10,0), + crc + } + } + pub fn size() -> u16 { + 10 } } -impl Write for CrcProxy { +struct FooterProxy { + hasher: Hasher, + writer: W, +} + +impl FooterProxy { + fn new(writer: W) -> Self { + FooterProxy { + hasher: Hasher::new(), + writer + } + } +} + +impl Write for FooterProxy { fn write(&mut self, buf: &[u8]) -> io::Result { let count = self.writer.write(buf)?; - self.hasher.as_mut().unwrap().update(&buf[..count]); + self.hasher.update(&buf[..count]); Ok(count) } @@ -275,27 +364,61 @@ impl Write for CrcProxy { } } +impl TerminatingWrite for FooterProxy { + fn terminate(mut self) -> io::Result<()> { + let crc = self.hasher.finalize(); + let footer = V0::from_crc(crc).to_bytes(); + self.writer.write_all(&footer)?; + self.writer.flush()?;//should we assuse calling terminate on inner already flush? + self.writer.terminate() + } +} impl Directory for ManagedDirectory { fn open_read(&self, path: &Path) -> result::Result { - self.directory.open_read(path).map(|ros| ros.slice_to(ros.as_slice().len() - CRC_SIZE)) + let ros = self.directory.open_read(path)?; + if require_footer(path) { + let footer_size = Footer::from_bytes(ros.as_slice()).size(); + Ok(ros.slice_to(ros.as_slice().len() - footer_size)) + } else { + Ok(ros) + } } fn open_write(&mut self, path: &Path) -> result::Result { self.register_file_as_managed(path) .map_err(|e| IOError::with_path(path.to_owned(), e))?; - Ok(io::BufWriter::new(Box::new( - CrcProxy::new(self.directory.open_write(path)?.into_inner().map_err(|_|()).expect("buffer is empty")) - ))) + if require_footer(path) { + Ok(io::BufWriter::new(Box::new( + FooterProxy::new(self.directory.open_write(path)?.into_inner().map_err(|_|()).expect("buffer should be empty")) + ))) + } else { + self.directory.open_write(path) + } } fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> { self.register_file_as_managed(path)?; - self.directory.atomic_write(path, data) + if require_footer(path) { + let mut with_footer = Vec::with_capacity(data.len() + V0::size() as usize); + with_footer.extend_from_slice(&data); + let mut hasher = Hasher::new(); + hasher.update(&data); + let crc = hasher.finalize(); + with_footer.append(&mut V0::from_crc(crc).to_bytes()); + self.directory.atomic_write(path, &with_footer) + } else { + self.directory.atomic_write(path, &data) + } } fn atomic_read(&self, path: &Path) -> result::Result, OpenReadError> { - self.directory.atomic_read(path) + let mut vec = self.directory.atomic_read(path)?; + if require_footer(path) { + let footer_size = Footer::from_bytes(&vec).size(); + vec.resize(vec.len() - footer_size, 0); + } + Ok(vec) } fn delete(&self, path: &Path) -> result::Result<(), DeleteError> { diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index e19b430575..b941e28f9f 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -18,7 +18,7 @@ use crate::directory::ReadOnlySource; use crate::directory::WatchCallback; use crate::directory::WatchCallbackList; use crate::directory::WatchHandle; -use crate::directory::WritePtr; +use crate::directory::{TerminatingWrite, WritePtr}; use atomicwrites; use memmap::Mmap; use std::collections::HashMap; @@ -395,6 +395,8 @@ impl Seek for SafeFileWriter { } } +impl TerminatingWrite for SafeFileWriter {} + impl Directory for MmapDirectory { fn open_read(&self, path: &Path) -> result::Result { debug!("Open Read {:?}", path); diff --git a/src/directory/mod.rs b/src/directory/mod.rs index 70fa013487..379082853f 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -24,18 +24,29 @@ pub use self::ram_directory::RAMDirectory; pub use self::read_only_source::ReadOnlySource; pub(crate) use self::watch_event_router::WatchCallbackList; pub use self::watch_event_router::{WatchCallback, WatchHandle}; -use std::io::{BufWriter, Write}; +use std::io::{self, BufWriter, Write}; #[cfg(feature = "mmap")] pub use self::mmap_directory::MmapDirectory; pub use self::managed_directory::ManagedDirectory; + +/// Trait used to indicate when no more write need to be done on a writer +pub trait TerminatingWrite: Write { + /// Indicate that the writer will no longer be used + fn terminate(mut self) -> io::Result<()> where Self: Sized { + self.flush() + } +} + +impl TerminatingWrite for Box {} + /// Write object for Directory. /// /// `WritePtr` are required to implement both Write /// and Seek. -pub type WritePtr = BufWriter>; +pub type WritePtr = BufWriter>; #[cfg(test)] mod tests; diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs index 53e7715f8b..5e08f59214 100644 --- a/src/directory/ram_directory.rs +++ b/src/directory/ram_directory.rs @@ -1,7 +1,7 @@ use crate::core::META_FILEPATH; use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError}; use crate::directory::WatchCallbackList; -use crate::directory::WritePtr; +use crate::directory::{TerminatingWrite, WritePtr}; use crate::directory::{Directory, ReadOnlySource, WatchCallback, WatchHandle}; use fail::fail_point; use std::collections::HashMap; @@ -71,6 +71,8 @@ impl Write for VecWriter { } } +impl TerminatingWrite for VecWriter {} + #[derive(Default)] struct InnerDirectory { fs: HashMap,