Skip to content

Commit

Permalink
Minor refactoring fast fields
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Sep 21, 2022
1 parent 8cca101 commit 0349fdc
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 207 deletions.
6 changes: 3 additions & 3 deletions fastfield_codecs/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl<'a, T: Copy + PartialOrd + Send + Sync> Column<T> for VecColumn<'a, T> {
self.values[position as usize]
}

fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = T> + 'b> {
fn iter(&self) -> Box<dyn Iterator<Item = T> + '_> {
Box::new(self.values.iter().copied())
}

Expand Down Expand Up @@ -190,7 +190,7 @@ where
self.from_column.num_vals()
}

fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = Output> + 'a> {
fn iter(&self) -> Box<dyn Iterator<Item = Output> + '_> {
Box::new(self.from_column.iter().map(&self.monotonic_mapping))
}

Expand Down Expand Up @@ -252,7 +252,7 @@ where
self.0.len() as u64
}

fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = T::Item> + 'a> {
fn iter(&self) -> Box<dyn Iterator<Item = T::Item> + '_> {
Box::new(self.0.clone())
}
}
Expand Down
2 changes: 1 addition & 1 deletion fastfield_codecs/src/compact_space/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ impl Column<u128> for CompactSpaceDecompressor {
}

#[inline]
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = u128> + 'a> {
fn iter(&self) -> Box<dyn Iterator<Item = u128> + '_> {
Box::new(self.iter())
}
fn get_between_vals(&self, range: RangeInclusive<u128>) -> Vec<u64> {
Expand Down
211 changes: 9 additions & 202 deletions src/indexer/merger.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::cmp;
use std::collections::HashMap;
use std::io::Write;
use std::sync::Arc;
Expand All @@ -15,6 +14,8 @@ use crate::fastfield::{
};
use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter};
use crate::indexer::doc_id_mapping::{expect_field_id_for_sort_field, SegmentDocIdMapping};
use crate::indexer::sorted_doc_id_column::SortedDocIdColumn;
use crate::indexer::sorted_doc_id_multivalue_column::SortedDocIdMultiValueColumn;
use crate::indexer::SegmentSerializer;
use crate::postings::{InvertedIndexSerializer, Postings, SegmentPostings};
use crate::schema::{Cardinality, Field, FieldType, Schema};
Expand Down Expand Up @@ -87,28 +88,6 @@ pub struct IndexMerger {
max_doc: u32,
}

/// Returns the `(min, max)` pair of the values `u64_reader` holds for the
/// documents of `segment_reader`.
///
/// Yields `None` when the segment's `max_doc()` is zero, or when the
/// iteration over alive documents produces no value at all.
fn compute_min_max_val(
    u64_reader: &dyn Column<u64>,
    segment_reader: &SegmentReader,
) -> Option<(u64, u64)> {
    if segment_reader.max_doc() == 0 {
        return None;
    }

    match segment_reader.alive_bitset() {
        // No alive bitset means no deleted documents: the bounds already
        // stored in the column can be reused as they are.
        None => Some((u64_reader.min_value(), u64_reader.max_value())),
        // Some documents were deleted: the bounds have to be recomputed
        // over the documents that are still alive.
        Some(_) => segment_reader
            .doc_ids_alive()
            .map(|alive_doc| u64_reader.get_val(alive_doc as u64))
            .minmax()
            .into_option(),
    }
}

/// Wrapper around a `Vec<Vec<TermOrdinal>>`.
///
/// NOTE(review): going by the field name, the outer vector presumably has one
/// entry per segment, each holding the new term ordinals for that segment's
/// terms. Confirm against the `impl` block, which is not visible here.
struct TermOrdinalMapping {
per_segment_new_term_ordinals: Vec<Vec<TermOrdinal>>,
}
Expand Down Expand Up @@ -331,81 +310,9 @@ impl IndexMerger {
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
let (min_value, max_value) = self
.readers
.iter()
.filter_map(|reader| {
let u64_reader: Arc<dyn Column<u64>> =
reader.fast_fields().typed_fast_field_reader(field).expect(
"Failed to find a reader for single fast field. This is a tantivy bug and \
it should never happen.",
);
compute_min_max_val(&*u64_reader, reader)
})
.reduce(|a, b| (a.0.min(b.0), a.1.max(b.1)))
.expect("Unexpected error, empty readers in IndexMerger");

let fast_field_readers = self
.readers
.iter()
.map(|reader| {
let u64_reader: Arc<dyn Column<u64>> =
reader.fast_fields().typed_fast_field_reader(field).expect(
"Failed to find a reader for single fast field. This is a tantivy bug and \
it should never happen.",
);
u64_reader
})
.collect::<Vec<_>>();

/// `Column` view over the fast field readers of several segments, in which
/// values are addressed through the new doc ids of a `SegmentDocIdMapping`.
///
/// `min_value`, `max_value` and `num_vals` are supplied by whoever builds the
/// struct and are returned verbatim by the matching `Column` methods.
#[derive(Clone)]
struct SortedDocIdFieldAccessProvider<'a> {
    doc_id_mapping: &'a SegmentDocIdMapping,
    fast_field_readers: &'a Vec<Arc<dyn Column<u64>>>,
    min_value: u64,
    max_value: u64,
    num_vals: u64,
}

impl<'a> Column for SortedDocIdFieldAccessProvider<'a> {
    /// Maps the new doc id `doc` to its old address, then reads the value
    /// from the reader of the segment that address points to.
    fn get_val(&self, doc: u64) -> u64 {
        let old_doc_addr = self.doc_id_mapping.get_old_doc_addr(doc as u32);
        let segment_reader = &self.fast_field_readers[old_doc_addr.segment_ord as usize];
        segment_reader.get_val(old_doc_addr.doc_id as u64)
    }

    /// Yields one value per old doc address, following the order in which the
    /// mapping iterates over them.
    fn iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
        let values = self.doc_id_mapping.iter_old_doc_addrs().map(|addr| {
            let segment_reader = &self.fast_field_readers[addr.segment_ord as usize];
            segment_reader.get_val(addr.doc_id as u64)
        });
        Box::new(values)
    }

    fn min_value(&self) -> u64 {
        self.min_value
    }

    fn max_value(&self) -> u64 {
        self.max_value
    }

    fn num_vals(&self) -> u64 {
        self.num_vals
    }
}
let fastfield_accessor = SortedDocIdFieldAccessProvider {
doc_id_mapping,
fast_field_readers: &fast_field_readers,
min_value,
max_value,
num_vals: doc_id_mapping.len() as u64,
};
fast_field_serializer.create_auto_detect_u64_fast_field(field, fastfield_accessor)?;
let fast_field_accessor = SortedDocIdColumn::new(&self.readers, doc_id_mapping, field);
fast_field_serializer.create_auto_detect_u64_fast_field(field, fast_field_accessor)?;

Ok(())
}
Expand Down Expand Up @@ -663,113 +570,13 @@ impl IndexMerger {
let offsets =
self.write_multi_value_fast_field_idx(field, fast_field_serializer, doc_id_mapping)?;

let mut min_value = u64::MAX;
let mut max_value = u64::MIN;
let mut num_vals = 0;

let mut vals = Vec::with_capacity(100);

let mut ff_readers = Vec::new();

// Our values are bitpacked and we need to know what should be
// our bitwidth and our minimum value before serializing any values.
//
// Computing those is non-trivial if some documents are deleted.
// We go through a complete first pass to compute the minimum and the
// maximum value and initialize our Serializer.
for reader in &self.readers {
let ff_reader: MultiValuedFastFieldReader<u64> = reader
.fast_fields()
.typed_fast_field_multi_reader::<u64>(field)
.expect(
"Failed to find multivalued fast field reader. This is a bug in tantivy. \
Please report.",
);
for doc in reader.doc_ids_alive() {
ff_reader.get_vals(doc, &mut vals);
for &val in &vals {
min_value = cmp::min(val, min_value);
max_value = cmp::max(val, max_value);
}
num_vals += vals.len();
}
ff_readers.push(ff_reader);
// TODO optimize when no deletes
}

if min_value > max_value {
min_value = 0;
max_value = 0;
}

// We can now initialize our serializer, and push it the different values
/// `Column` exposing, as one flat sequence of values, a multivalued fast field
/// spread over several segment readers and addressed through `doc_id_mapping`.
///
/// `min_value`, `max_value` and `num_vals` are supplied by whoever builds the
/// struct and are returned verbatim by the matching `Column` methods.
struct SortedDocIdMultiValueAccessProvider<'a> {
    doc_id_mapping: &'a SegmentDocIdMapping,
    fast_field_readers: &'a Vec<MultiValuedFastFieldReader<u64>>,
    /// `offsets[new_doc_id]` is the position, in the flat sequence, of the
    /// first value of `new_doc_id`. `get_val` relies on the offsets being
    /// sorted in increasing order.
    offsets: Vec<u64>,
    min_value: u64,
    max_value: u64,
    num_vals: u64,
}

impl<'a> Column for SortedDocIdMultiValueAccessProvider<'a> {
    /// Returns the value stored at position `pos` of the flat sequence.
    ///
    /// # Panics
    ///
    /// Panics with `"pos is out of bounds"` when no offset is strictly greater
    /// than `pos`, and panics when `pos` does not fall inside the values of
    /// the document it resolves to.
    fn get_val(&self, pos: u64) -> u64 {
        // The offsets are increasing, so the first offset strictly greater
        // than `pos` can be found with a binary search instead of a linear
        // scan. The document containing `pos` is the one right before it.
        let first_offset_after_pos = self.offsets.partition_point(|&offset| offset <= pos);
        assert!(
            first_offset_after_pos < self.offsets.len(),
            "pos is out of bounds"
        );
        let new_doc_id: DocId = first_offset_after_pos as DocId - 1u32;

        // Position of `pos` inside the bucket of values of that document.
        let num_pos_covered_until_now = self.offsets[new_doc_id as usize];
        let pos_in_values = pos - num_pos_covered_until_now;

        let old_doc_addr = self.doc_id_mapping.get_old_doc_addr(new_doc_id);
        let ff_reader = &self.fast_field_readers[old_doc_addr.segment_ord as usize];
        let num_vals = ff_reader.get_len(old_doc_addr.doc_id);
        // `pos_in_values` is used as an index below, so it has to be strictly
        // smaller than the number of values of the document.
        assert!(num_vals > pos_in_values);
        let mut vals = Vec::new();
        ff_reader.get_vals(old_doc_addr.doc_id, &mut vals);

        vals[pos_in_values as usize]
    }

    /// Yields every value of every document, following the order in which the
    /// mapping iterates over the old doc addresses.
    fn iter(&self) -> Box<dyn Iterator<Item = u64> + '_> {
        Box::new(
            self.doc_id_mapping
                .iter_old_doc_addrs()
                .flat_map(|old_doc_addr| {
                    let ff_reader = &self.fast_field_readers[old_doc_addr.segment_ord as usize];
                    let mut vals = Vec::new();
                    ff_reader.get_vals(old_doc_addr.doc_id, &mut vals);
                    vals.into_iter()
                }),
        )
    }

    fn min_value(&self) -> u64 {
        self.min_value
    }

    fn max_value(&self) -> u64 {
        self.max_value
    }

    fn num_vals(&self) -> u64 {
        self.num_vals
    }
}
let fastfield_accessor = SortedDocIdMultiValueAccessProvider {
let fastfield_accessor = SortedDocIdMultiValueColumn::new(
&self.readers,
doc_id_mapping,
fast_field_readers: &ff_readers,
offsets,
min_value,
max_value,
num_vals: num_vals as u64,
};
&offsets,
field
);
fast_field_serializer.create_auto_detect_u64_fast_field_with_idx(
field,
fastfield_accessor,
Expand Down
2 changes: 2 additions & 0 deletions src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ mod segment_register;
pub mod segment_serializer;
pub mod segment_updater;
mod segment_writer;
mod sorted_doc_id_column;
mod sorted_doc_id_multivalue_column;
mod stamper;

use crossbeam_channel as channel;
Expand Down
Loading

0 comments on commit 0349fdc

Please sign in to comment.