Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring to prepare for the addition of dynamic fast field #1730

Merged
merged 4 commits into from
Dec 22, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/core/inverted_index_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl InvertedIndexReader {
///
/// Most users should prefer using [`Self::read_postings()`] instead.
pub async fn warm_postings(&self, term: &Term, with_positions: bool) -> io::Result<()> {
let term_info_opt = self.get_term_info_async(term).await?;
let term_info_opt: Option<TermInfo> = self.get_term_info_async(term).await?;
if let Some(term_info) = term_info_opt {
self.postings_file_slice
.read_bytes_slice_async(term_info.postings_range.clone())
Expand Down
2 changes: 1 addition & 1 deletion src/termdict/fst_termdict/term_info_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ fn extract_bits(data: &[u8], addr_bits: usize, num_bits: u8) -> u64 {
}

impl TermInfoStore {
pub fn open(term_info_store_file: FileSlice) -> crate::Result<TermInfoStore> {
pub fn open(term_info_store_file: FileSlice) -> io::Result<TermInfoStore> {
let (len_slice, main_slice) = term_info_store_file.split(16);
let mut bytes = len_slice.read_bytes()?;
let len = u64::deserialize(&mut bytes)? as usize;
Expand Down
17 changes: 10 additions & 7 deletions src/termdict/fst_termdict/termdict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use tantivy_fst::Automaton;
use super::term_info_store::{TermInfoStore, TermInfoStoreWriter};
use super::{TermStreamer, TermStreamerBuilder};
use crate::directory::{FileSlice, OwnedBytes};
use crate::error::DataCorruption;
use crate::postings::TermInfo;
use crate::termdict::TermOrdinal;

Expand Down Expand Up @@ -55,7 +54,7 @@ where W: Write
/// to insert_key and insert_value.
///
/// Prefer using `.insert(key, value)`
pub(crate) fn insert_key(&mut self, key: &[u8]) -> io::Result<()> {
pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()> {
self.fst_builder
.insert(key, self.term_ord)
.map_err(convert_fst_error)?;
Expand All @@ -66,7 +65,7 @@ where W: Write
/// # Warning
///
/// Horribly dangerous internal API. See `.insert_key(...)`.
pub(crate) fn insert_value(&mut self, term_info: &TermInfo) -> io::Result<()> {
pub fn insert_value(&mut self, term_info: &TermInfo) -> io::Result<()> {
self.term_info_store_writer.write_term_info(term_info)?;
Ok(())
}
Expand All @@ -86,10 +85,14 @@ where W: Write
}
}

fn open_fst_index(fst_file: FileSlice) -> crate::Result<tantivy_fst::Map<OwnedBytes>> {
fn open_fst_index(fst_file: FileSlice) -> io::Result<tantivy_fst::Map<OwnedBytes>> {
let bytes = fst_file.read_bytes()?;
let fst = Fst::new(bytes)
.map_err(|err| DataCorruption::comment_only(format!("Fst data is corrupted: {:?}", err)))?;
let fst = Fst::new(bytes).map_err(|err| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("Fst data is corrupted: {:?}", err),
)
})?;
Ok(tantivy_fst::Map::from(fst))
}

Expand All @@ -114,7 +117,7 @@ pub struct TermDictionary {

impl TermDictionary {
/// Opens a `TermDictionary`.
pub fn open(file: FileSlice) -> crate::Result<Self> {
pub fn open(file: FileSlice) -> io::Result<Self> {
let (main_slice, footer_len_slice) = file.split_from_end(8);
let mut footer_len_bytes = footer_len_slice.read_bytes()?;
let footer_size = u64::deserialize(&mut footer_len_bytes)?;
Expand Down
70 changes: 35 additions & 35 deletions src/termdict/sstable_termdict/mod.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,39 @@
use std::io;

mod merger;
mod streamer;
mod termdict;

use std::iter::ExactSizeIterator;

use common::VInt;
use sstable::value::{ValueReader, ValueWriter};
use sstable::{BlockReader, SSTable};
use sstable::SSTable;
use tantivy_fst::automaton::AlwaysMatch;

pub use self::merger::TermMerger;
pub use self::streamer::{TermStreamer, TermStreamerBuilder};
pub use self::termdict::{TermDictionary, TermDictionaryBuilder};
use crate::postings::TermInfo;

/// The term dictionary contains all of the terms in
/// `tantivy index` in a sorted manner.
///
/// The `Fst` crate is used to associate terms to their
/// respective `TermOrdinal`. The `TermInfoStore` then makes it
/// possible to fetch the associated `TermInfo`.
pub type TermDictionary = sstable::Dictionary<TermSSTable>;

/// Builder for the new term dictionary.
pub type TermDictionaryBuilder<W> = sstable::Writer<W, TermInfoWriter>;

/// `TermStreamer` acts as a cursor over a range of terms of a segment.
/// Terms are guaranteed to be sorted.
pub type TermStreamer<'a, A = AlwaysMatch> = sstable::Streamer<'a, TermSSTable, A>;

/// SSTable used to store TermInfo objects.
pub struct TermSSTable;

impl SSTable for TermSSTable {
type Value = TermInfo;
type Reader = TermInfoReader;
type Writer = TermInfoWriter;
type ValueReader = TermInfoReader;
type ValueWriter = TermInfoWriter;
}

#[derive(Default)]
Expand All @@ -35,15 +48,16 @@ impl ValueReader for TermInfoReader {
&self.term_infos[idx]
}

fn read(&mut self, reader: &mut BlockReader) -> io::Result<()> {
fn load(&mut self, mut data: &[u8]) -> io::Result<usize> {
let len_before = data.len();
self.term_infos.clear();
let num_els = VInt::deserialize_u64(reader)?;
let mut postings_start = VInt::deserialize_u64(reader)? as usize;
let mut positions_start = VInt::deserialize_u64(reader)? as usize;
let num_els = VInt::deserialize_u64(&mut data)?;
let mut postings_start = VInt::deserialize_u64(&mut data)? as usize;
let mut positions_start = VInt::deserialize_u64(&mut data)? as usize;
for _ in 0..num_els {
let doc_freq = VInt::deserialize_u64(reader)? as u32;
let postings_num_bytes = VInt::deserialize_u64(reader)?;
let positions_num_bytes = VInt::deserialize_u64(reader)?;
let doc_freq = VInt::deserialize_u64(&mut data)? as u32;
let postings_num_bytes = VInt::deserialize_u64(&mut data)?;
let positions_num_bytes = VInt::deserialize_u64(&mut data)?;
let postings_end = postings_start + postings_num_bytes as usize;
let positions_end = positions_start + positions_num_bytes as usize;
let term_info = TermInfo {
Expand All @@ -55,7 +69,8 @@ impl ValueReader for TermInfoReader {
postings_start = postings_end;
positions_start = positions_end;
}
Ok(())
let consumed_len = len_before - data.len();
Ok(consumed_len)
}
}

Expand All @@ -71,7 +86,7 @@ impl ValueWriter for TermInfoWriter {
self.term_infos.push(term_info.clone());
}

fn write_block(&mut self, buffer: &mut Vec<u8>) {
fn serialize_block(&mut self, buffer: &mut Vec<u8>) {
VInt(self.term_infos.len() as u64).serialize_into_vec(buffer);
if self.term_infos.is_empty() {
return;
Expand All @@ -89,17 +104,13 @@ impl ValueWriter for TermInfoWriter {

#[cfg(test)]
mod tests {
use std::io;

use sstable::value::{ValueReader, ValueWriter};

use super::BlockReader;
use crate::directory::OwnedBytes;
use crate::postings::TermInfo;
use crate::termdict::sstable_termdict::TermInfoReader;

#[test]
fn test_block_terminfos() -> io::Result<()> {
fn test_block_terminfos() {
let mut term_info_writer = super::TermInfoWriter::default();
term_info_writer.write(&TermInfo {
doc_freq: 120u32,
Expand All @@ -117,10 +128,9 @@ mod tests {
positions_range: 1100..1302,
});
let mut buffer = Vec::new();
term_info_writer.write_block(&mut buffer);
let mut block_reader = make_block_reader(&buffer[..]);
term_info_writer.serialize_block(&mut buffer);
let mut term_info_reader = TermInfoReader::default();
term_info_reader.read(&mut block_reader)?;
let num_bytes: usize = term_info_reader.load(&buffer[..]).unwrap();
assert_eq!(
term_info_reader.value(0),
&TermInfo {
Expand All @@ -129,16 +139,6 @@ mod tests {
positions_range: 10..122
}
);
assert!(block_reader.buffer().is_empty());
Ok(())
}

fn make_block_reader(data: &[u8]) -> BlockReader {
let mut buffer = (data.len() as u32).to_le_bytes().to_vec();
buffer.extend_from_slice(data);
let owned_bytes = OwnedBytes::new(buffer);
let mut block_reader = BlockReader::new(Box::new(owned_bytes));
block_reader.read_block().unwrap();
block_reader
assert_eq!(buffer.len(), num_bytes);
}
}
4 changes: 2 additions & 2 deletions src/termdict/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::path::PathBuf;
use std::str;
use std::{io, str};

use super::{TermDictionary, TermDictionaryBuilder, TermStreamer};
use crate::directory::{Directory, FileSlice, RamDirectory, TerminatingWrite};
Expand Down Expand Up @@ -247,7 +247,7 @@ fn test_empty_string() -> crate::Result<()> {
Ok(())
}

fn stream_range_test_dict() -> crate::Result<TermDictionary> {
fn stream_range_test_dict() -> io::Result<TermDictionary> {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?;
for i in 0u8..10u8 {
Expand Down
9 changes: 8 additions & 1 deletion sstable/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,15 @@ edition = "2021"
[dependencies]
common = {path="../common", package="tantivy-common"}
ciborium = "0.2"
byteorder = "1"
serde = "1"
tantivy-fst = "0.4"

[dev-dependencies]
proptest = "1"
criterion = "0.4"
names = "0.14"
rand = "0.8"

[[bench]]
name = "stream_bench"
harness = false
87 changes: 87 additions & 0 deletions sstable/benches/stream_bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use std::collections::BTreeSet;
use std::io;

use common::file_slice::FileSlice;
use criterion::{criterion_group, criterion_main, Criterion};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use tantivy_sstable::{self, Dictionary, SSTableMonotonicU64};

/// Alphabet the random benchmark keys are drawn from.
const CHARSET: &[u8] = b"abcdefghij";

/// Generates a random key of 3 to 11 characters, each picked from `CHARSET`.
///
/// The order and the integer types of the `gen_range` calls are deliberately
/// left untouched: `prepare_sstable` seeds its RNG with a fixed value and
/// `criterion_benchmark` asserts an exact number of keys in a range, so any
/// change in how randomness is consumed would change the generated key set.
fn generate_key(rng: &mut impl Rng) -> String {
    // Explicit `usize` matches the type previously inferred from `.take(len)`.
    let len: usize = rng.gen_range(3..12);
    std::iter::from_fn(|| {
        let idx = rng.gen_range(0..CHARSET.len());
        Some(CHARSET[idx] as char)
    })
    .take(len)
    .collect()
}

/// Builds the in-memory dictionary used by the benchmarks.
///
/// 100_000 distinct random keys are generated from a fixed seed, inserted in
/// sorted order (the iteration order of the `BTreeSet`) with their rank as the
/// value, and the serialized buffer is then reopened as a `Dictionary`.
fn prepare_sstable() -> io::Result<Dictionary<SSTableMonotonicU64>> {
    let mut rng = StdRng::from_seed([3u8; 32]);

    // The set both deduplicates the keys and yields them sorted.
    let mut keys = BTreeSet::new();
    loop {
        if keys.len() >= 100_000 {
            break;
        }
        keys.insert(generate_key(&mut rng));
    }

    let mut builder = Dictionary::<SSTableMonotonicU64>::builder(Vec::new())?;
    for (ord, key) in (0u64..).zip(keys.iter()) {
        builder.insert(key, &ord)?;
    }

    let serialized = builder.finish()?;
    let dictionary = Dictionary::open(FileSlice::from(serialized))?;
    Ok(dictionary)
}

/// Opens a stream over the keys in `[lower, upper)` and optionally walks it.
///
/// Returns the number of entries visited, or `0` when `do_scan` is `false`
/// (in which case only the stream construction is exercised).
///
/// Panics if the stream cannot be created.
fn stream_bench(
    dictionary: &Dictionary<SSTableMonotonicU64>,
    lower: &[u8],
    upper: &[u8],
    do_scan: bool,
) -> usize {
    let range_builder = dictionary.range().ge(lower).lt(upper);
    let mut stream = range_builder.into_stream().unwrap();

    let mut num_visited = 0;
    if do_scan {
        while stream.advance() {
            num_visited += 1;
        }
    }
    num_visited
}

/// Registers the sstable streaming benchmarks with criterion.
///
/// The dictionary is built once and shared by all four benchmarks.
pub fn criterion_benchmark(c: &mut Criterion) {
    let dictionary = prepare_sstable().unwrap();

    // Stream construction only: `do_scan` is false, so no entry is visited.
    c.bench_function("short_scan_init", |bencher| {
        bencher.iter(|| stream_bench(&dictionary, b"fa", b"fana", false))
    });

    // Construction plus a scan over a narrow key range.
    c.bench_function("short_scan_init_and_scan", |bencher| {
        bencher.iter(|| {
            let num_visited = stream_bench(&dictionary, b"fa", b"faz", true);
            assert_eq!(num_visited, 971);
        })
    });

    // Bounds wide enough to cover every generated key.
    c.bench_function("full_scan_init_and_scan_full_with_bound", |bencher| {
        bencher.iter(|| {
            let num_visited = stream_bench(&dictionary, b"", b"z", true);
            assert_eq!(num_visited, 100_000);
        })
    });

    // Unbounded stream over the whole dictionary.
    c.bench_function("full_scan_init_and_scan_full_no_bounds", |bencher| {
        bencher.iter(|| {
            let mut full_stream = dictionary.stream().unwrap();
            let mut visited = 0;
            while full_stream.advance() {
                visited += 1;
            }
            visited
        })
    });
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
13 changes: 9 additions & 4 deletions sstable/src/block_reader.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
use std::io::{self, Read};

use byteorder::{LittleEndian, ReadBytesExt};
use std::io;

pub struct BlockReader<'a> {
buffer: Vec<u8>,
reader: Box<dyn io::Read + 'a>,
offset: usize,
}

/// Reads exactly four bytes from `read` and decodes them as a little-endian `u32`.
///
/// Any error returned by `read_exact` is passed through unchanged.
#[inline]
fn read_u32(read: &mut dyn io::Read) -> io::Result<u32> {
    let mut le_bytes = [0u8; 4];
    read.read_exact(&mut le_bytes)
        .map(|()| u32::from_le_bytes(le_bytes))
}

impl<'a> BlockReader<'a> {
pub fn new(reader: Box<dyn io::Read + 'a>) -> BlockReader<'a> {
BlockReader {
Expand All @@ -30,7 +35,7 @@ impl<'a> BlockReader<'a> {

pub fn read_block(&mut self) -> io::Result<bool> {
self.offset = 0;
let block_len_res = self.reader.read_u32::<LittleEndian>();
let block_len_res = read_u32(self.reader.as_mut());
if let Err(err) = &block_len_res {
if err.kind() == io::ErrorKind::UnexpectedEof {
return Ok(false);
Expand Down
Loading