Skip to content

Commit

Permalink
add missing parameter to term agg (#2103)
Browse files Browse the repository at this point in the history
* add missing parameter to term agg

* move missing handling to block accessor

* add multivalue test, fix multivalue case, add comments

* add documentation, deactivate special case

* cargo fmt

* resolve merge conflict
  • Loading branch information
PSeitz authored Aug 14, 2023
1 parent 22c35b1 commit 2e10901
Show file tree
Hide file tree
Showing 6 changed files with 567 additions and 22 deletions.
96 changes: 96 additions & 0 deletions columnar/src/block_accessor.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::cmp::Ordering;

use crate::{Column, DocId, RowId};

/// Buffers used to fetch the values of a block of docs from a `Column`.
///
/// `fetch_block_with_missing` additionally fills in a placeholder value for
/// every requested doc that has no value in the column.
#[derive(Debug, Default, Clone)]
pub struct ColumnBlockAccessor<T> {
/// Values of the current block, read via `get_vals` for the rows in `row_id_cache`.
/// `fetch_block_with_missing` pushes one extra value per doc without a value.
val_cache: Vec<T>,
/// Doc ids of the current block. `fetch_block_with_missing` appends the docs
/// without a value after the fetched ones.
docid_cache: Vec<DocId>,
/// Scratch buffer holding the docs found to have no value during the last
/// `fetch_block_with_missing` call that scanned for them.
missing_docids_cache: Vec<DocId>,
/// Row ids whose values are loaded into `val_cache`.
row_id_cache: Vec<RowId>,
}

Expand All @@ -20,6 +23,20 @@ impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
.values
.get_vals(&self.row_id_cache, &mut self.val_cache);
}
/// Fetches the block for `docs` via `fetch_block`, then adds one `missing` value for every
/// doc in `docs` that did not produce an entry.
///
/// The docs without a value (and their `missing` values) are appended after the fetched
/// entries of `docid_cache` / `val_cache`.
#[inline]
pub fn fetch_block_with_missing(&mut self, docs: &[u32], accessor: &Column<T>, missing: T) {
    self.fetch_block(docs, accessor);

    // Equal lengths mean every requested doc produced an entry, unless the column is
    // multivalued: there the lengths are not comparable (presumably because a single doc
    // can contribute several entries), so the scan below always runs.
    let nothing_missing =
        docs.len() == self.docid_cache.len() && !accessor.index.is_multivalue();
    if nothing_missing {
        return;
    }

    // Comparing the requested docs with the fetched doc ids yields the docs without value.
    self.missing_docids_cache.clear();
    find_missing_docs(docs, &self.docid_cache, |doc_without_value| {
        self.missing_docids_cache.push(doc_without_value);
        self.val_cache.push(missing);
    });
    self.docid_cache
        .extend_from_slice(&self.missing_docids_cache);
}

#[inline]
pub fn iter_vals(&self) -> impl Iterator<Item = T> + '_ {
Expand All @@ -34,3 +51,82 @@ impl<T: PartialOrd + Copy + std::fmt::Debug + Send + Sync + 'static + Default>
.zip(self.val_cache.iter().cloned())
}
}

/// Reports every doc id of `docs` that does not occur in `hits`.
///
/// Both slices are expected to be sorted in ascending order, with `hits` containing only
/// doc ids that are also in `docs` (a doc id may be repeated in `hits`). `callback` is invoked
/// once per missing doc id, in the order the ids appear in `docs`.
fn find_missing_docs<F>(docs: &[u32], hits: &[u32], mut callback: F)
where F: FnMut(u32) {
    let mut pending_hits = hits.iter().copied().peekable();

    for &doc in docs {
        // Advance the hit cursor up to `doc`: smaller hits are dropped, an equal hit is
        // consumed and marks `doc` as present. A larger hit stays for the following docs.
        let doc_is_hit = loop {
            match pending_hits.peek().map(|hit| hit.cmp(&doc)) {
                Some(Ordering::Less) => {
                    pending_hits.next();
                }
                Some(Ordering::Equal) => {
                    pending_hits.next();
                    break true;
                }
                // Either the next hit lies beyond `doc` or there are no hits left.
                Some(Ordering::Greater) | None => break false,
            }
        };

        if !doc_is_hit {
            callback(doc);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `find_missing_docs` and gathers the reported doc ids in callback order.
    fn collect_missing(docs: &[u32], hits: &[u32]) -> Vec<u32> {
        let mut collected: Vec<u32> = Vec::new();
        find_missing_docs(docs, hits, |missing_doc| collected.push(missing_doc));
        collected
    }

    #[test]
    fn test_find_missing_docs() {
        // Every second doc is a hit, so the odd doc ids must be reported.
        let docs: Vec<u32> = (1..=10).collect();
        let hits: [u32; 5] = [2, 4, 6, 8, 10];

        assert_eq!(collect_missing(&docs, &hits), [1, 3, 5, 7, 9]);
    }

    #[test]
    fn test_find_missing_docs_empty() {
        // Without any requested docs nothing can be missing, whatever the hits are.
        let hits: [u32; 5] = [2, 4, 6, 8, 10];

        assert!(collect_missing(&[], &hits).is_empty());
    }

    #[test]
    fn test_find_missing_docs_all_missing() {
        // Without any hits every requested doc is missing.
        let docs: [u32; 5] = [1, 2, 3, 4, 5];

        assert_eq!(collect_missing(&docs, &[]), docs);
    }
}
2 changes: 1 addition & 1 deletion columnar/src/column/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl<T: PartialOrd + Copy + Debug + Send + Sync + 'static> Column<T> {
.select_batch_in_place(selected_docid_range.start, doc_ids);
}

/// Fils the output vector with the (possibly multiple values that are associated_with
/// Fills the output vector with the (possibly multiple) values that are associated with
/// `row_id`.
///
/// This method clears the `output` vector.
Expand Down
4 changes: 4 additions & 0 deletions columnar/src/column_index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ impl From<MultiValueIndex> for ColumnIndex {
}

impl ColumnIndex {
/// Returns `true` if this column index is the `ColumnIndex::Multivalued` variant.
#[inline]
pub fn is_multivalue(&self) -> bool {
matches!(self, ColumnIndex::Multivalued(_))
}
// Returns the cardinality of the column index.
//
// By convention, if the column contains no docs, we consider that it is
Expand Down
3 changes: 2 additions & 1 deletion src/aggregation/agg_req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ pub enum AggregationVariants {
}

impl AggregationVariants {
fn get_fast_field_name(&self) -> &str {
/// Returns the name of the field used by the aggregation.
pub fn get_fast_field_name(&self) -> &str {
match self {
AggregationVariants::Terms(terms) => terms.field.as_str(),
AggregationVariants::Range(range) => range.field.as_str(),
Expand Down
66 changes: 58 additions & 8 deletions src/aggregation/agg_req_with_accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use super::metric::{
};
use super::segment_agg_result::AggregationLimits;
use super::VecWithNames;
use crate::aggregation::{f64_to_fastfield_u64, Key};
use crate::SegmentReader;

#[derive(Default)]
Expand All @@ -35,6 +36,8 @@ pub struct AggregationWithAccessor {
/// based on search terms. That is not the case currently, but eventually this needs to be
/// Option or moved.
pub(crate) accessor: Column<u64>,
/// The u64 value to use for documents without a value, if a `missing` parameter is set.
pub(crate) missing_value_for_accessor: Option<u64>,
pub(crate) str_dict_column: Option<StrColumn>,
pub(crate) field_type: ColumnType,
pub(crate) sub_aggregation: AggregationsWithAccessor,
Expand All @@ -44,12 +47,14 @@ pub struct AggregationWithAccessor {
}

impl AggregationWithAccessor {
/// May return multiple accessors if the aggregation is e.g. on mixed field types.
fn try_from_agg(
agg: &Aggregation,
sub_aggregation: &Aggregations,
reader: &SegmentReader,
limits: AggregationLimits,
) -> crate::Result<Vec<AggregationWithAccessor>> {
let mut missing_value_term_agg = None;
let mut str_dict_column = None;
use AggregationVariants::*;
let acc_field_types: Vec<(Column, ColumnType)> = match &agg.agg {
Expand All @@ -75,8 +80,11 @@ impl AggregationWithAccessor {
Some(get_numeric_or_date_column_types()),
)?],
Terms(TermsAggregation {
field: field_name, ..
field: field_name,
missing,
..
}) => {
missing_value_term_agg = missing.clone();
str_dict_column = reader.fast_fields().str(field_name)?;
let allowed_column_types = [
ColumnType::I64,
Expand All @@ -88,7 +96,21 @@ impl AggregationWithAccessor {
// ColumnType::IpAddr Unsupported
// ColumnType::DateTime Unsupported
];
get_all_ff_reader_or_empty(reader, field_name, Some(&allowed_column_types))?

// In case the column is empty we want the shim column to match the missing type
let fallback_type = missing
.as_ref()
.map(|missing| match missing {
Key::Str(_) => ColumnType::Str,
Key::F64(_) => ColumnType::F64,
})
.unwrap_or(ColumnType::U64);
get_all_ff_reader_or_empty(
reader,
field_name,
Some(&allowed_column_types),
fallback_type,
)?
}
Average(AverageAggregation { field: field_name })
| Count(CountAggregation { field: field_name })
Expand All @@ -113,10 +135,18 @@ impl AggregationWithAccessor {

let aggs: Vec<AggregationWithAccessor> = acc_field_types
.into_iter()
.map(|(accessor, field_type)| {
.map(|(accessor, column_type)| {
let missing_value_for_accessor =
if let Some(missing) = missing_value_term_agg.as_ref() {
get_missing_val(column_type, missing, agg.agg.get_fast_field_name())?
} else {
None
};

Ok(AggregationWithAccessor {
missing_value_for_accessor,
accessor,
field_type,
field_type: column_type,
sub_aggregation: get_aggs_with_segment_accessor_and_validate(
sub_aggregation,
reader,
Expand All @@ -133,6 +163,28 @@ impl AggregationWithAccessor {
}
}

/// Translates the `missing` key of a terms aggregation into the u64 value used for a column
/// of type `column_type`.
///
/// * `ColumnType::Str` columns yield `Some(u64::MAX)` for both string and numeric keys
///   (a number is accepted as a fallback on text fields).
/// * A numeric key on a column with a numerical type yields the result of
///   `f64_to_fastfield_u64`, which may be `None`.
///
/// # Errors
///
/// Returns `TantivyError::InvalidArgument` for every other key / column type combination;
/// `field_name` is only used in that error message.
fn get_missing_val(
    column_type: ColumnType,
    missing: &Key,
    field_name: &str,
) -> crate::Result<Option<u64>> {
    match missing {
        // Text column: string keys, and numbers as a fallback, share the same value.
        Key::Str(_) | Key::F64(_) if column_type == ColumnType::Str => Ok(Some(u64::MAX)),
        Key::F64(val) if column_type.numerical_type().is_some() => {
            Ok(f64_to_fastfield_u64(*val, &column_type))
        }
        _ => Err(crate::TantivyError::InvalidArgument(format!(
            "Missing value {:?} for field {} is not supported for column type {:?}",
            missing, field_name, column_type
        ))),
    }
}

fn get_numeric_or_date_column_types() -> &'static [ColumnType] {
&[
ColumnType::F64,
Expand Down Expand Up @@ -189,15 +241,13 @@ fn get_all_ff_reader_or_empty(
reader: &SegmentReader,
field_name: &str,
allowed_column_types: Option<&[ColumnType]>,
fallback_type: ColumnType,
) -> crate::Result<Vec<(columnar::Column<u64>, ColumnType)>> {
let ff_fields = reader.fast_fields();
let mut ff_field_with_type =
ff_fields.u64_lenient_for_type_all(allowed_column_types, field_name)?;
if ff_field_with_type.is_empty() {
ff_field_with_type.push((
Column::build_empty_column(reader.num_docs()),
ColumnType::U64,
));
ff_field_with_type.push((Column::build_empty_column(reader.num_docs()), fallback_type));
}
Ok(ff_field_with_type)
}
Loading

0 comments on commit 2e10901

Please sign in to comment.