fix cardinality aggregation performance #2446

Merged · 3 commits · Jul 2, 2024
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -11,7 +11,7 @@ repository = "https://github.com/quickwit-oss/tantivy"
readme = "README.md"
keywords = ["search", "information", "retrieval"]
edition = "2021"
-rust-version = "1.63"
+rust-version = "1.66"
exclude = ["benches/*.json", "benches/*.txt"]

[dependencies]
@@ -38,7 +38,7 @@ levenshtein_automata = "0.2.1"
uuid = { version = "1.0.0", features = ["v4", "serde"] }
crossbeam-channel = "0.5.4"
rust-stemmers = "1.2.0"
-downcast-rs = "1.2.0"
+downcast-rs = "1.2.1"
bitpacking = { version = "0.9.2", default-features = false, features = [
"bitpacker4x",
] }
36 changes: 34 additions & 2 deletions benches/agg_bench.rs
@@ -54,7 +54,11 @@ fn bench_agg(mut group: InputGroup<Index>) {
    register!(group, terms_many_order_by_term);
    register!(group, terms_many_with_top_hits);
    register!(group, terms_many_with_avg_sub_agg);
-    register!(group, terms_many_json_mixed_type_with_sub_agg_card);
+    register!(group, terms_many_json_mixed_type_with_avg_sub_agg);
+
+    register!(group, cardinality_agg);
+    register!(group, terms_few_with_cardinality_agg);
+
    register!(group, range_agg);
    register!(group, range_agg_with_avg_sub_agg);
    register!(group, range_agg_with_term_agg_few);
@@ -123,6 +127,33 @@ fn percentiles_f64(index: &Index) {
    });
    execute_agg(index, agg_req);
}
+
+fn cardinality_agg(index: &Index) {
+    let agg_req = json!({
+        "cardinality": {
+            "cardinality": {
+                "field": "text_many_terms"
+            },
+        }
+    });
+    execute_agg(index, agg_req);
+}
+fn terms_few_with_cardinality_agg(index: &Index) {
+    let agg_req = json!({
+        "my_texts": {
+            "terms": { "field": "text_few_terms" },
+            "aggs": {
+                "cardinality": {
+                    "cardinality": {
+                        "field": "text_many_terms"
+                    },
+                }
+            }
+        },
+    });
+    execute_agg(index, agg_req);
+}
+
fn terms_few(index: &Index) {
    let agg_req = json!({
        "my_texts": { "terms": { "field": "text_few_terms" } },
@@ -171,7 +202,7 @@ fn terms_many_with_avg_sub_agg(index: &Index) {
    });
    execute_agg(index, agg_req);
}
-fn terms_many_json_mixed_type_with_sub_agg_card(index: &Index) {
+fn terms_many_json_mixed_type_with_avg_sub_agg(index: &Index) {
    let agg_req = json!({
        "my_texts": {
            "terms": { "field": "json.mixed_type" },
@@ -268,6 +299,7 @@ fn range_agg_with_term_agg_many(index: &Index) {
    });
    execute_agg(index, agg_req);
}
+
fn histogram(index: &Index) {
    let agg_req = json!({
        "rangef64": {
6 changes: 3 additions & 3 deletions src/aggregation/agg_req.rs
@@ -36,7 +36,7 @@ use super::bucket::{
use super::metric::{
    AverageAggregation, CardinalityAggregationReq, CountAggregation, ExtendedStatsAggregation,
    MaxAggregation, MinAggregation, PercentilesAggregationReq, StatsAggregation, SumAggregation,
-    TopHitsAggregation,
+    TopHitsAggregationReq,
};

/// The top-level aggregation request structure, which contains [`Aggregation`] and their user
@@ -160,7 +160,7 @@ pub enum AggregationVariants {
    Percentiles(PercentilesAggregationReq),
    /// Finds the top k values matching some order
    #[serde(rename = "top_hits")]
-    TopHits(TopHitsAggregation),
+    TopHits(TopHitsAggregationReq),
    /// Computes an estimate of the number of unique values
    #[serde(rename = "cardinality")]
    Cardinality(CardinalityAggregationReq),
@@ -208,7 +208,7 @@ impl AggregationVariants {
            _ => None,
        }
    }
-    pub(crate) fn as_top_hits(&self) -> Option<&TopHitsAggregation> {
+    pub(crate) fn as_top_hits(&self) -> Option<&TopHitsAggregationReq> {
        match &self {
            AggregationVariants::TopHits(top_hits) => Some(top_hits),
            _ => None,
76 changes: 62 additions & 14 deletions src/aggregation/metric/cardinality.rs
@@ -2,7 +2,7 @@ use std::collections::hash_map::DefaultHasher;
use std::hash::{BuildHasher, Hasher};

use columnar::column_values::CompactSpaceU64Accessor;
-use columnar::{BytesColumn, StrColumn};
+use columnar::Dictionary;
use common::f64_to_u64;
use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
use rustc_hash::FxHashSet;
@@ -38,7 +38,53 @@ impl BuildHasher for BuildSaltedHasher {
///
/// The cardinality aggregation allows for computing an estimate
/// of the number of different values in a data set based on the
-/// HyperLogLog++ alogrithm.
+/// HyperLogLog++ algorithm. This is particularly useful for understanding the
+/// uniqueness of values in a large dataset where counting each unique value
+/// individually would be computationally expensive.
+///
+/// For example, you might use a cardinality aggregation to estimate the number
+/// of unique visitors to a website by aggregating on a field that contains
+/// user IDs or session IDs.
+///
+/// To use the cardinality aggregation, you'll need to provide a field to
+/// aggregate on. The following example demonstrates a request for the cardinality
+/// of the "user_id" field:
+///
+/// ```JSON
+/// {
+///     "cardinality": {
+///         "field": "user_id"
+///     }
+/// }
+/// ```
+///
+/// This request will return an estimate of the number of unique values in the
+/// "user_id" field.
+///
+/// ## Missing Values
+///
+/// The `missing` parameter defines how documents that are missing a value should be treated.
+/// By default, documents without a value for the specified field are ignored. However, you can
+/// specify a default value for these documents using the `missing` parameter. This can be useful
+/// when you want to include documents with missing values in the aggregation.
+///
+/// For example, the following request treats documents with missing values in the "user_id"
+/// field as if they had a value of "unknown":
+///
+/// ```JSON
+/// {
+///     "cardinality": {
+///         "field": "user_id",
+///         "missing": "unknown"
+///     }
+/// }
+/// ```
+///
+/// # Estimation Accuracy
+///
+/// The cardinality aggregation provides an approximate count, which is usually
+/// accurate within a small error range. This trade-off allows for efficient
+/// computation even on very large datasets.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct CardinalityAggregationReq {
    /// The field name to compute the cardinality on.
Expand Down Expand Up @@ -108,27 +154,29 @@ impl SegmentCardinalityCollector {
        agg_with_accessor: &AggregationWithAccessor,
    ) -> crate::Result<IntermediateMetricResult> {
        if self.column_type == ColumnType::Str {
-            let mut buffer = String::new();
-            let term_dict = agg_with_accessor
+            let fallback_dict = Dictionary::empty();
+            let dict = agg_with_accessor
                .str_dict_column
                .as_ref()
-                .cloned()
-                .unwrap_or_else(|| {
-                    StrColumn::wrap(BytesColumn::empty(agg_with_accessor.accessor.num_docs()))
-                });
+                .map(|el| el.dictionary())
+                .unwrap_or_else(|| &fallback_dict);
            let mut has_missing = false;

+            // TODO: replace FxHashSet with something that allows iterating in order
+            // (e.g. sparse bitvec)
+            let mut term_ids = Vec::new();
            for term_ord in self.entries.into_iter() {
                if term_ord == u64::MAX {
                    has_missing = true;
                } else {
-                    if !term_dict.ord_to_str(term_ord, &mut buffer)? {
-                        return Err(TantivyError::InternalError(format!(
-                            "Couldn't find term_ord {term_ord} in dict"
-                        )));
-                    }
-                    self.cardinality.sketch.insert_any(&buffer);
+                    // we can reasonably exclude values above u32::MAX
+                    term_ids.push(term_ord as u32);
                }
            }
+            term_ids.sort_unstable();
+            dict.sorted_ords_to_term_cb(term_ids.iter().map(|term| *term as u64), |term| {
+                self.cardinality.sketch.insert_any(&term);
+            })?;
            if has_missing {
                let missing_key = self
                    .missing
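The flush change above is the heart of the performance fix: rather than one random-access `ord_to_str` lookup per collected entry, the collector buffers the term ordinals, sorts them, and resolves them all in a single forward pass over the dictionary via `sorted_ords_to_term_cb`. A minimal sketch of that access pattern, with a plain iterator standing in for tantivy's SSTable-backed `Dictionary` (toy types and a hypothetical helper, not the actual API):

```rust
/// Resolve `sorted_ords` against a dictionary that is cheap to read only
/// sequentially (like an SSTable): one forward scan serves every ordinal,
/// instead of one random lookup per ordinal.
fn sorted_ords_to_term_cb<'a>(
    dict_stream: impl Iterator<Item = &'a str>,
    sorted_ords: &[u64],
    mut cb: impl FnMut(&str),
) {
    let mut wanted = sorted_ords.iter().copied().peekable();
    for (ord, term) in dict_stream.enumerate() {
        while wanted.peek().copied() == Some(ord as u64) {
            cb(term);
            wanted.next();
        }
        if wanted.peek().is_none() {
            break; // all ordinals resolved: stop scanning early
        }
    }
}

fn main() {
    let dict = ["apple", "banana", "cherry", "durian"]; // term dictionary, sorted
    let mut term_ids: Vec<u32> = vec![3, 0, 2]; // ordinals gathered during collection
    term_ids.sort_unstable(); // sorting is what enables the single pass
    let sorted: Vec<u64> = term_ids.iter().map(|&t| t as u64).collect();
    sorted_ords_to_term_cb(dict.iter().copied(), &sorted, |term| {
        println!("feed into the HyperLogLog sketch: {term}");
    });
}
```

Sorting the ordinals first turns many scattered dictionary probes into one streaming decode, which is what makes the aggregation cheap on segments with large term dictionaries.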
14 changes: 7 additions & 7 deletions src/aggregation/metric/top_hits.rs
@@ -89,7 +89,7 @@ use crate::{DocAddress, DocId, SegmentOrdinal};
/// }
/// ```
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
-pub struct TopHitsAggregation {
+pub struct TopHitsAggregationReq {
    sort: Vec<KeyOrder>,
    size: usize,
    from: Option<usize>,
@@ -164,7 +164,7 @@ fn unsupported_err(parameter: &str) -> crate::Result<()> {
    ))
}

-impl TopHitsAggregation {
+impl TopHitsAggregationReq {
    /// Validate and resolve field retrieval parameters
    pub fn validate_and_resolve_field_names(
        &mut self,
@@ -431,7 +431,7 @@ impl Eq for DocSortValuesAndFields {}
/// The TopHitsCollector used for collecting over segments and merging results.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct TopHitsTopNComputer {
-    req: TopHitsAggregation,
+    req: TopHitsAggregationReq,
    top_n: TopNComputer<DocSortValuesAndFields, DocAddress, false>,
}

@@ -443,7 +443,7 @@ impl std::cmp::PartialEq for TopHitsTopNComputer {

impl TopHitsTopNComputer {
    /// Create a new TopHitsCollector
-    pub fn new(req: &TopHitsAggregation) -> Self {
+    pub fn new(req: &TopHitsAggregationReq) -> Self {
        Self {
            top_n: TopNComputer::new(req.size + req.from.unwrap_or(0)),
            req: req.clone(),
@@ -496,7 +496,7 @@ pub(crate) struct TopHitsSegmentCollector {

impl TopHitsSegmentCollector {
    pub fn from_req(
-        req: &TopHitsAggregation,
+        req: &TopHitsAggregationReq,
        accessor_idx: usize,
        segment_ordinal: SegmentOrdinal,
) -> Self {
Expand All @@ -509,7 +509,7 @@ impl TopHitsSegmentCollector {
fn into_top_hits_collector(
self,
value_accessors: &HashMap<String, Vec<DynamicColumn>>,
req: &TopHitsAggregation,
req: &TopHitsAggregationReq,
) -> TopHitsTopNComputer {
let mut top_hits_computer = TopHitsTopNComputer::new(req);
let top_results = self.top_n.into_vec();
Expand All @@ -532,7 +532,7 @@ impl TopHitsSegmentCollector {
fn collect_with(
&mut self,
doc_id: crate::DocId,
req: &TopHitsAggregation,
req: &TopHitsAggregationReq,
accessors: &[(Column<u64>, ColumnType)],
) -> crate::Result<()> {
let sorts: Vec<DocValueAndOrder> = req
3 changes: 3 additions & 0 deletions src/aggregation/mod.rs
@@ -44,11 +44,14 @@
//! - [Metric](metric)
//!     - [Average](metric::AverageAggregation)
//!     - [Stats](metric::StatsAggregation)
+//!     - [ExtendedStats](metric::ExtendedStatsAggregation)
//!     - [Min](metric::MinAggregation)
//!     - [Max](metric::MaxAggregation)
//!     - [Sum](metric::SumAggregation)
//!     - [Count](metric::CountAggregation)
//!     - [Percentiles](metric::PercentilesAggregationReq)
+//!     - [Cardinality](metric::CardinalityAggregationReq)
+//!     - [TopHits](metric::TopHitsAggregationReq)
//!
//! # Example
//! Compute the average metric, by building [`agg_req::Aggregations`], which is built from an
4 changes: 2 additions & 2 deletions src/indexer/index_writer.rs
@@ -1574,11 +1574,11 @@ mod tests {
                    deleted_ids.remove(id);
                }
                IndexingOp::DeleteDoc { id } => {
-                    existing_ids.remove(&id);
+                    existing_ids.remove(id);
                    deleted_ids.insert(*id);
                }
                IndexingOp::DeleteDocQuery { id } => {
-                    existing_ids.remove(&id);
+                    existing_ids.remove(id);
                    deleted_ids.insert(*id);
                }
                _ => {}
2 changes: 1 addition & 1 deletion src/positions/mod.rs
@@ -3,7 +3,7 @@
//! In "The beauty and the beast", the term "the" appears in position 0 and position 3.
//! This information is useful to run phrase queries.
//!
-//! The [position](crate::SegmentComponent::Positions) file contains all of the
+//! The [position](crate::index::SegmentComponent::Positions) file contains all of the
//! bitpacked positions delta, for all terms of a given field, one term after the other.
//!
//! Each term is encoded independently.
3 changes: 1 addition & 2 deletions src/query/boolean_query/boolean_query.rs
@@ -144,8 +144,7 @@ impl Clone for BooleanQuery {
            .subqueries
            .iter()
            .map(|(occur, subquery)| (*occur, subquery.box_clone()))
-            .collect::<Vec<_>>()
-            .into();
+            .collect::<Vec<_>>();
        Self {
            subqueries,
            minimum_number_should_match: self.minimum_number_should_match,
2 changes: 1 addition & 1 deletion src/query/boolean_query/boolean_weight.rs
@@ -170,7 +170,7 @@ impl<TScoreCombiner: ScoreCombiner> BooleanWeight<TScoreCombiner> {
                scorer_union(should_scorers, &score_combiner_fn),
                &score_combiner_fn,
            )),
-            n @ _ if num_of_should_scorers == n => {
+            n if num_of_should_scorers == n => {
                // When num_of_should_scorers equals the number of should clauses,
                // they are no different from must clauses.
                must_scorers = match must_scorers.take() {
2 changes: 1 addition & 1 deletion src/query/disjunction.rs
@@ -131,7 +131,7 @@
            self.current_doc = TERMINATED;
        }
        self.current_score = self.score_combiner.score();
-        return self.current_doc;
+        self.current_doc
    }

#[inline]
@@ -192,7 +192,7 @@
                .cloned()
                .map(VecDocSet::from)
                .map(|d| ConstScorer::new(d, 1.0)),
            DoNothingCombiner::default(),

[GitHub Actions / clippy — warning on line 195 of src/query/disjunction.rs: use of `default` to create a unit struct; help: remove this call to `default`; note: `#[warn(clippy::default_constructed_unit_structs)]` on by default]

            min_match,
        )
    };
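For context on the clippy annotation above: `DoNothingCombiner` is a unit struct, so constructing it through `Default` is redundant, and the lint asks for the value to be named directly. A tiny illustration with a made-up type:

```rust
#[derive(Default)]
struct DoNothing; // a unit struct has exactly one value

fn main() {
    let _flagged = DoNothing::default(); // clippy: default_constructed_unit_structs
    let _preferred = DoNothing; // just name the value directly
}
```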
4 changes: 2 additions & 2 deletions src/schema/document/default_document.rs
@@ -508,7 +508,7 @@ impl std::fmt::Debug for ValueAddr {

/// An enum representing a value for tantivy to index.
///
-/// Any changes need to be reflected in `BinarySerializable` for `ValueType`
+/// ** Any changes need to be reflected in `BinarySerializable` for `ValueType` **
///
/// We can't use [schema::Type] or [columnar::ColumnType] here, because they are missing
/// some items like Array and PreTokStr.
@@ -553,7 +553,7 @@ impl BinarySerializable for ValueType {
    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let num = u8::deserialize(reader)?;
        let type_id = if (0..=12).contains(&num) {
-            unsafe { std::mem::transmute(num) }
+            unsafe { std::mem::transmute::<u8, ValueType>(num) }
        } else {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
4 changes: 1 addition & 3 deletions sstable/benches/ord_to_term.rs
@@ -16,9 +16,7 @@ fn make_test_sstable(suffix: &str) -> FileSlice {

    let table = builder.finish().unwrap();
    let table = Arc::new(OwnedBytes::new(table));
-    let slice = common::file_slice::FileSlice::new(table.clone());
-
-    slice
+    common::file_slice::FileSlice::new(table.clone())
}

pub fn criterion_benchmark(c: &mut Criterion) {
2 changes: 1 addition & 1 deletion sstable/benches/stream_bench.rs
@@ -7,7 +7,7 @@ use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use tantivy_sstable::{Dictionary, MonotonicU64SSTable};

-const CHARSET: &'static [u8] = b"abcdefghij";
+const CHARSET: &[u8] = b"abcdefghij";

fn generate_key(rng: &mut impl Rng) -> String {
    let len = rng.gen_range(3..12);