implement a footer storing metadata about a file
this is more of a PoC; it requires some refactoring into multiple files
`terminate(self)` is implemented, but not used anywhere yet
trinity-1686a committed Jul 29, 2019
1 parent a1e3885 commit 07c819a
Showing 4 changed files with 170 additions and 32 deletions.
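
For context, the V0 footer introduced by this commit is a fixed 10-byte suffix appended to the files that `require_footer` selects: one index-version byte, three tantivy-version bytes, a big-endian CRC32 of the file body, and a big-endian u16 giving the footer length. The following standalone sketch mirrors `V0::to_bytes`/`V0::from_bytes` from the diff below; it is an illustration of the layout, not the crate's code.

```rust
// Byte layout of the V0 footer (10 bytes total):
//   [0]      index_version        (0 for V0)
//   [1..4]   tantivy_version      (major, minor, patch)
//   [4..8]   crc                  (big-endian u32)
//   [8..10]  footer size in bytes (big-endian u16, always 10 for V0)

fn encode_v0(tantivy_version: (u8, u8, u8), crc: u32) -> Vec<u8> {
    let mut footer = vec![0u8, tantivy_version.0, tantivy_version.1, tantivy_version.2];
    footer.extend_from_slice(&crc.to_be_bytes());
    footer.extend_from_slice(&10u16.to_be_bytes());
    footer
}

fn decode_v0(file: &[u8]) -> ((u8, u8, u8), u32) {
    // The footer sits at the very end of the file.
    let footer = &file[file.len() - 10..];
    assert_eq!(footer[0], 0, "only index_version 0 is defined");
    let tantivy_version = (footer[1], footer[2], footer[3]);
    let crc = u32::from_be_bytes([footer[4], footer[5], footer[6], footer[7]]);
    (tantivy_version, crc)
}

fn main() {
    // A footer for tantivy 0.10.0 with crc 0xDEADBEEF serializes to
    // [0x00, 0x00, 0x0A, 0x00, 0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x0A].
    let footer = encode_v0((0, 10, 0), 0xDEAD_BEEF);
    assert_eq!(footer.len(), 10);
    assert_eq!(decode_v0(&footer), ((0, 10, 0), 0xDEAD_BEEF));
}
```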
179 changes: 151 additions & 28 deletions src/directory/managed_directory.rs
@@ -1,8 +1,9 @@
use crate::core::MANAGED_FILEPATH;
use crate::core::{MANAGED_FILEPATH, META_FILEPATH};
use crate::directory::error::{DeleteError, IOError, LockError, OpenReadError, OpenWriteError};
use crate::directory::DirectoryLock;
use crate::directory::Lock;
use crate::directory::META_LOCK;
use crate::directory::TerminatingWrite;
use crate::directory::{ReadOnlySource, WritePtr};
use crate::directory::{WatchCallback, WatchHandle};
use crate::error::DataCorruption;
@@ -18,8 +19,6 @@ use std::result;
use std::sync::RwLockWriteGuard;
use std::sync::{Arc, RwLock};

const CRC_SIZE: usize = 4;

/// Returns true iff the file is "managed".
/// Non-managed files are not subject to garbage collection.
///
@@ -31,6 +30,13 @@ fn is_managed(path: &Path) -> bool {
.unwrap_or(true)
}

/// Returns true iff the file requires a footer.
fn require_footer(path: &Path) -> bool {
is_managed(path) &&
![*MANAGED_FILEPATH, *META_FILEPATH].contains(&path)
}


/// Wrapper of directories that keeps track of files created by Tantivy.
///
/// A managed directory is just a wrapper of a directory
@@ -215,11 +221,12 @@ impl ManagedDirectory {
pub fn validate_checksum(&self, path: &Path) -> result::Result<bool, OpenReadError> {
let reader = self.directory.open_read(path)?;
let data = reader.as_slice();
let len = data.len() - CRC_SIZE;
let footer = Footer::from_bytes(data);
let data = &data[..data.len() - footer.size() as usize];
let mut hasher = Hasher::new();
hasher.update(&data[..len]);
let crc = hasher.finalize().to_be_bytes();
Ok(data[len..]==crc)
hasher.update(data);
let crc = hasher.finalize();
Ok(crc == footer.crc())
}

/// List files for which checksum does not match content
@@ -241,32 +248,114 @@ impl ManagedDirectory {
}
}

struct CrcProxy<W: Write> {
hasher: Option<Hasher>,
writer: W,
#[derive(Debug, Copy, Clone)]
pub enum Footer {
V0(V0)
}

impl<W: Write> CrcProxy<W> {
fn new(writer: W) -> Self {
CrcProxy {
hasher: Some(Hasher::new()),
writer
impl Footer {
pub fn to_bytes(&self) -> Vec<u8> {
match self {
Footer::V0(f) => f.to_bytes()
}
}

pub fn from_bytes(data: &[u8]) -> Self {
let len = data.len();
assert!(len >= 2);
let size = u16::from_be_bytes([data[len - 2], data[len - 1]]);
assert_eq!(size, 10);
let footer = &data[len - size as usize..];
let index_version = footer[0];
match index_version {
0 => Footer::V0(V0::from_bytes(footer)),
_ => panic!("unsupported index_version")
}
}


pub fn index_version(&self) -> u8 {
match self {
Footer::V0(_) => 0,
}
}

pub fn tantivy_version(&self) -> (u8, u8, u8) {
match self {
Footer::V0(f) => f.tantivy_version,
}
}

pub fn crc(&self) -> u32 {
match self {
Footer::V0(f) => f.crc,
}
}

pub fn size(&self) -> usize {
match self {
Footer::V0(_) => V0::size() as usize,
}
}
}

impl<W: Write> Drop for CrcProxy<W> {
fn drop(&mut self) {
// this should probably be logged
let _ = self.writer.write_all(&self.hasher.take().unwrap().finalize().to_be_bytes());
let _ = self.writer.flush();
#[derive(Debug, Copy, Clone)]
pub struct V0 {
pub tantivy_version: (u8,u8,u8),
pub crc: u32,
}

impl V0 {
pub fn to_bytes(&self) -> Vec<u8> {
let crc = self.crc.to_be_bytes();
let size = Self::size().to_be_bytes();
let mut res = vec![0, self.tantivy_version.0, self.tantivy_version.1, self.tantivy_version.2];
res.extend_from_slice(&crc);
res.extend_from_slice(&size);
res
}

pub fn from_bytes(footer: &[u8]) -> Self {
assert_eq!(footer[0], 0);
assert_eq!(u16::from_be_bytes([footer[8], footer[9]]), 10);
let tantivy_version = (footer[1], footer[2], footer[3]);
let crc = u32::from_be_bytes([footer[4], footer[5], footer[6], footer[7]]);

V0 {
tantivy_version,
crc
}
}

pub fn from_crc(crc: u32) -> Self {
Self {
tantivy_version: (0,10,0),
crc
}
}
pub fn size() -> u16 {
10
}
}

impl<W: Write> Write for CrcProxy<W> {
struct FooterProxy<W: TerminatingWrite> {
hasher: Hasher,
writer: W,
}

impl<W: TerminatingWrite> FooterProxy<W> {
fn new(writer: W) -> Self {
FooterProxy {
hasher: Hasher::new(),
writer
}
}
}

impl<W: TerminatingWrite> Write for FooterProxy<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let count = self.writer.write(buf)?;
self.hasher.as_mut().unwrap().update(&buf[..count]);
self.hasher.update(&buf[..count]);
Ok(count)
}

@@ -275,27 +364,61 @@ impl<W: Write> Write for CrcProxy<W> {
}
}

impl<W: TerminatingWrite> TerminatingWrite for FooterProxy<W> {
fn terminate(mut self) -> io::Result<()> {
let crc = self.hasher.finalize();
let footer = V0::from_crc(crc).to_bytes();
self.writer.write_all(&footer)?;
self.writer.flush()?; // should we assume that terminating the inner writer already flushes it?
self.writer.terminate()
}
}

impl Directory for ManagedDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
self.directory.open_read(path).map(|ros| ros.slice_to(ros.as_slice().len() - CRC_SIZE))
let ros = self.directory.open_read(path)?;
if require_footer(path) {
let footer_size = Footer::from_bytes(ros.as_slice()).size();
Ok(ros.slice_to(ros.as_slice().len() - footer_size))
} else {
Ok(ros)
}
}

fn open_write(&mut self, path: &Path) -> result::Result<WritePtr, OpenWriteError> {
self.register_file_as_managed(path)
.map_err(|e| IOError::with_path(path.to_owned(), e))?;
Ok(io::BufWriter::new(Box::new(
CrcProxy::new(self.directory.open_write(path)?.into_inner().map_err(|_|()).expect("buffer is empty"))
)))
if require_footer(path) {
Ok(io::BufWriter::new(Box::new(
FooterProxy::new(self.directory.open_write(path)?.into_inner().map_err(|_|()).expect("buffer should be empty"))
)))
} else {
self.directory.open_write(path)
}
}

fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
self.register_file_as_managed(path)?;
self.directory.atomic_write(path, data)
if require_footer(path) {
let mut with_footer = Vec::with_capacity(data.len() + V0::size() as usize);
with_footer.extend_from_slice(&data);
let mut hasher = Hasher::new();
hasher.update(&data);
let crc = hasher.finalize();
with_footer.append(&mut V0::from_crc(crc).to_bytes());
self.directory.atomic_write(path, &with_footer)
} else {
self.directory.atomic_write(path, &data)
}
}

fn atomic_read(&self, path: &Path) -> result::Result<Vec<u8>, OpenReadError> {
self.directory.atomic_read(path)
let mut vec = self.directory.atomic_read(path)?;
if require_footer(path) {
let footer_size = Footer::from_bytes(&vec).size();
vec.resize(vec.len() - footer_size, 0);
}
Ok(vec)
}

fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
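
The write path above boils down to one pattern: hash every byte as it passes through the proxy, and only emit the footer once the caller signals that no more writes will happen. A rough standalone sketch of that pattern follows; `ChecksummingWriter` is a made-up stand-in for `FooterProxy`, and it assumes the `Hasher` used in the diff is `crc32fast::Hasher`.

```rust
use std::io::{self, Write};

use crc32fast::Hasher;

/// Simplified stand-in for FooterProxy: hashes everything written through it
/// and appends a V0-style footer when `terminate` is called.
struct ChecksummingWriter<W: Write> {
    hasher: Hasher,
    writer: W,
}

impl<W: Write> ChecksummingWriter<W> {
    fn new(writer: W) -> Self {
        ChecksummingWriter { hasher: Hasher::new(), writer }
    }

    fn terminate(mut self) -> io::Result<()> {
        let crc = self.hasher.finalize();
        // 10-byte V0 footer: version 0, tantivy version (0, 10, 0), crc, size.
        let mut footer = vec![0u8, 0, 10, 0];
        footer.extend_from_slice(&crc.to_be_bytes());
        footer.extend_from_slice(&10u16.to_be_bytes());
        self.writer.write_all(&footer)?;
        self.writer.flush()
    }
}

impl<W: Write> Write for ChecksummingWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let count = self.writer.write(buf)?;
        // Only hash the bytes the inner writer actually accepted.
        self.hasher.update(&buf[..count]);
        Ok(count)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.writer.flush()
    }
}

fn main() -> io::Result<()> {
    let mut buf = Vec::new();
    let mut writer = ChecksummingWriter::new(&mut buf);
    writer.write_all(b"some index data")?;
    writer.terminate()?;
    // The body is followed by the 10-byte footer.
    assert_eq!(buf.len(), b"some index data".len() + 10);
    Ok(())
}
```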
4 changes: 3 additions & 1 deletion src/directory/mmap_directory.rs
@@ -18,7 +18,7 @@ use crate::directory::ReadOnlySource;
use crate::directory::WatchCallback;
use crate::directory::WatchCallbackList;
use crate::directory::WatchHandle;
use crate::directory::WritePtr;
use crate::directory::{TerminatingWrite, WritePtr};
use atomicwrites;
use memmap::Mmap;
use std::collections::HashMap;
@@ -395,6 +395,8 @@ impl Seek for SafeFileWriter {
}
}

impl TerminatingWrite for SafeFileWriter {}

impl Directory for MmapDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
debug!("Open Read {:?}", path);
15 changes: 13 additions & 2 deletions src/directory/mod.rs
@@ -24,18 +24,29 @@ pub use self::ram_directory::RAMDirectory;
pub use self::read_only_source::ReadOnlySource;
pub(crate) use self::watch_event_router::WatchCallbackList;
pub use self::watch_event_router::{WatchCallback, WatchHandle};
use std::io::{BufWriter, Write};
use std::io::{self, BufWriter, Write};

#[cfg(feature = "mmap")]
pub use self::mmap_directory::MmapDirectory;

pub use self::managed_directory::ManagedDirectory;


/// Trait used to indicate when no more writes need to be done on a writer
pub trait TerminatingWrite: Write {
/// Indicate that the writer will no longer be used
fn terminate(mut self) -> io::Result<()> where Self: Sized {
self.flush()
}
}

impl<W: TerminatingWrite + ?Sized> TerminatingWrite for Box<W> {}

/// Write object for Directory.
///
/// `WritePtr` are required to implement both Write
/// and Seek.
pub type WritePtr = BufWriter<Box<dyn Write>>;
pub type WritePtr = BufWriter<Box<dyn TerminatingWrite>>;

#[cfg(test)]
mod tests;
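
The `TerminatingWrite` trait added here only contributes a `terminate` hook whose default implementation is a plain `flush`, which is why `SafeFileWriter` (above) and `VecWriter` (in the ram_directory.rs diff below) can opt in with empty `impl` blocks. The following minimal sketch shows that pattern in isolation; the trait shape is copied from the diff above, while `InMemoryWriter` is a hypothetical example type.

```rust
use std::io::{self, Write};

/// Same shape as the trait introduced in the diff above.
trait TerminatingWrite: Write {
    /// Indicate that the writer will no longer be used.
    fn terminate(mut self) -> io::Result<()>
    where
        Self: Sized,
    {
        self.flush()
    }
}

/// Hypothetical writer that is happy with the default terminate-as-flush.
struct InMemoryWriter {
    bytes: Vec<u8>,
}

impl Write for InMemoryWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.bytes.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

// An empty impl is enough: terminate() falls back to the default flush().
impl TerminatingWrite for InMemoryWriter {}

fn main() -> io::Result<()> {
    let mut writer = InMemoryWriter { bytes: Vec::new() };
    writer.write_all(b"hello")?;
    writer.terminate() // no footer here, just a flush
}
```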
4 changes: 3 additions & 1 deletion src/directory/ram_directory.rs
@@ -1,7 +1,7 @@
use crate::core::META_FILEPATH;
use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
use crate::directory::WatchCallbackList;
use crate::directory::WritePtr;
use crate::directory::{TerminatingWrite, WritePtr};
use crate::directory::{Directory, ReadOnlySource, WatchCallback, WatchHandle};
use fail::fail_point;
use std::collections::HashMap;
@@ -71,6 +71,8 @@ impl Write for VecWriter {
}
}

impl TerminatingWrite for VecWriter {}

#[derive(Default)]
struct InnerDirectory {
fs: HashMap<PathBuf, ReadOnlySource>,