Skip to content

Commit

Permalink
store version per LSM-tree
Browse files Browse the repository at this point in the history
and check on recovery
  • Loading branch information
marvin-j97 committed Dec 12, 2023
1 parent f57cb37 commit 8a2bc56
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 24 deletions.
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@ This is the most feature-rich LSM-tree implementation in Rust! It features:

Is the disk format stable yet? Not quite, but it will be when:

- Journal format is finalized
- Disk format is pinned by unit tests to prevent breaking changes
- Version 1.0.0 is released

## Future

Expand Down
11 changes: 10 additions & 1 deletion src/recovery.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use crate::{
compaction::worker::start_compaction_thread,
descriptor_table::FileDescriptorTable,
file::{BLOCKS_FILE, FLUSH_MARKER, JOURNALS_FOLDER, LEVELS_MANIFEST_FILE, SEGMENTS_FOLDER},
file::{
BLOCKS_FILE, FLUSH_MARKER, JOURNALS_FOLDER, LEVELS_MANIFEST_FILE, LSM_MARKER,
SEGMENTS_FOLDER,
},
id::generate_segment_id,
journal::Journal,
levels::Levels,
memtable::MemTable,
segment::{self, Segment},
stop_signal::StopSignal,
tree_inner::TreeInner,
version::Version,
BlockCache, Config, Tree,
};
use std::{
Expand Down Expand Up @@ -183,6 +187,11 @@ pub fn recover_tree(config: Config) -> crate::Result<Tree> {

let start = std::time::Instant::now();

log::info!("Checking tree version");
let version_bytes = std::fs::read(config.path.join(LSM_MARKER))?;
let version = Version::parse_file_header(&version_bytes);
assert!(version.is_some(), "Invalid LSM-tree version");

log::info!("Restoring journal");
let active_journal = crate::recovery::recover_active_journal(&config)?;

Expand Down
6 changes: 2 additions & 4 deletions src/segment/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::descriptor_table::FileDescriptorTable;
use crate::disk_block::DiskBlock;
use crate::file::{BLOCKS_FILE, TOP_LEVEL_INDEX_FILE};
use crate::value::UserKey;
use crate::version::Version;
use std::collections::BTreeMap;
use std::fs::File;
use std::io::BufReader;
Expand Down Expand Up @@ -289,13 +288,12 @@ impl BlockIndex {
path.as_ref().display()
);

let version_size = Version::len().into();
let file_size = std::fs::metadata(path.as_ref().join(TOP_LEVEL_INDEX_FILE))?.len();

let index = BlockHandleBlock::from_file_compressed(
&mut BufReader::new(File::open(path.as_ref().join(TOP_LEVEL_INDEX_FILE))?), // TODO:
version_size,
(file_size - version_size) as u32,
0,
file_size as u32,
)?;

/* if !index.check_crc(index.crc)? {
Expand Down
10 changes: 3 additions & 7 deletions src/segment/index/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::{
file::{BLOCKS_FILE, INDEX_BLOCKS_FILE, TOP_LEVEL_INDEX_FILE},
serde::Serializable,
value::UserKey,
version::Version,
};
use lz4_flex::compress_prepend_size;
use std::{
Expand Down Expand Up @@ -44,13 +43,10 @@ pub struct Writer {
impl Writer {
pub fn new<P: AsRef<Path>>(path: P, block_size: u32) -> crate::Result<Self> {
let block_writer = File::create(path.as_ref().join(INDEX_BLOCKS_FILE))?;
let mut block_writer = BufWriter::with_capacity(u16::MAX.into(), block_writer);
let block_writer = BufWriter::with_capacity(u16::MAX.into(), block_writer);

let index_writer = File::create(path.as_ref().join(TOP_LEVEL_INDEX_FILE))?;
let mut index_writer = BufWriter::new(index_writer);

let blocks_start_offset = Version::V0.write_file_header(&mut block_writer)?;
let _index_start_offset = Version::V0.write_file_header(&mut index_writer)?;
let index_writer = BufWriter::new(index_writer);

let block_chunk = DiskBlock {
items: vec![],
Expand All @@ -64,7 +60,7 @@ impl Writer {

Ok(Self {
path: path.as_ref().into(),
file_pos: blocks_start_offset as u64,
file_pos: 0,
block_writer: Some(block_writer),
index_writer,
block_counter: 0,
Expand Down
7 changes: 2 additions & 5 deletions src/segment/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::{
segment::index::writer::Writer as IndexWriter,
serde::Serializable,
value::{SeqNo, UserKey},
version::Version,
Value,
};
use lz4_flex::compress_prepend_size;
Expand Down Expand Up @@ -148,9 +147,7 @@ impl Writer {
std::fs::create_dir_all(&opts.path)?;

let block_writer = File::create(opts.path.join(BLOCKS_FILE))?;
let mut block_writer = BufWriter::with_capacity(512_000, block_writer);

let start_offset = Version::V0.write_file_header(&mut block_writer)?;
let block_writer = BufWriter::with_capacity(512_000, block_writer);

let index_writer = IndexWriter::new(&opts.path, opts.block_size)?;

Expand All @@ -168,7 +165,7 @@ impl Writer {

block_count: 0,
item_count: 0,
file_pos: start_offset as u64,
file_pos: 0,
uncompressed_size: 0,

first_key: None,
Expand Down
7 changes: 4 additions & 3 deletions src/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{
stop_signal::StopSignal,
tree_inner::TreeInner,
value::{SeqNo, UserData, UserKey, ValueType},
version::Version,
Batch, Config, Snapshot, Value,
};
use std::{
Expand Down Expand Up @@ -425,10 +426,10 @@ impl Tree {
folder.sync_all()?;
}

// NOTE: Lastly
// fsync .lsm marker
// NOTE: Lastly, fsync .lsm marker, which contains the version
// -> the LSM is fully initialized
let file = std::fs::File::create(marker)?;
let mut file = std::fs::File::create(marker)?;
Version::V0.write_file_header(&mut file)?;
file.sync_all()?;

Ok(Self(Arc::new(inner)))
Expand Down
41 changes: 39 additions & 2 deletions src/version.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use byteorder::{BigEndian, WriteBytesExt};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use serde::{Deserialize, Serialize};
use std::io::Cursor;

#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Version {
Expand All @@ -20,11 +21,38 @@ impl From<Version> for u16 {
}
}

impl TryFrom<u16> for Version {
type Error = ();
fn try_from(value: u16) -> Result<Self, Self::Error> {
match value {
0 => Ok(Self::V0),
_ => Err(()),
}
}
}

impl Version {
pub fn len() -> u8 {
5
}

pub fn parse_file_header(bytes: &[u8]) -> Option<Self> {
if bytes[0..3] == [b'L', b'S', b'M'] {
let slice = &bytes[3..5];

let mut bytes = [0; 2];
bytes.copy_from_slice(slice);
let mut cursor = Cursor::new(&bytes);

let value = cursor.read_u16::<BigEndian>().ok()?;
let version = Self::try_from(value).ok()?;

Some(version)
} else {
None
}
}

pub fn write_file_header<W: std::io::Write>(self, writer: &mut W) -> std::io::Result<usize> {
writer.write_all(&[b'L', b'S', b'M'])?;
writer.write_u16::<BigEndian>(u16::from(self))?;
Expand All @@ -39,9 +67,18 @@ mod tests {

#[test]
#[allow(clippy::expect_used)]
pub fn test_version_len() {
pub fn version_round_trip() {
let mut buf = vec![];
Version::V0.write_file_header(&mut buf).expect("can't fail");

let version = Version::parse_file_header(&buf).expect("should parse");
assert_eq!(version, Version::V0);
}

#[test]
#[allow(clippy::expect_used)]
pub fn test_version_len() {
let mut buf = vec![];
let size = Version::V0.write_file_header(&mut buf).expect("can't fail");

assert_eq!(Version::len() as usize, size);
Expand Down

0 comments on commit 8a2bc56

Please sign in to comment.