From 2e109018b72f672927f99ff08a0e453727ddb3d4 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Mon, 14 Aug 2023 20:22:18 +0800 Subject: [PATCH] add missing parameter to term agg (#2103) * add missing parameter to term agg * move missing handling to block accessor * add multivalue test, fix multivalue case, add comments * add documentation, deactivate special case * cargo fmt * resolve merge conflict --- columnar/src/block_accessor.rs | 96 ++++++ columnar/src/column/mod.rs | 2 +- columnar/src/column_index/mod.rs | 4 + src/aggregation/agg_req.rs | 3 +- src/aggregation/agg_req_with_accessor.rs | 66 +++- src/aggregation/bucket/term_agg.rs | 418 ++++++++++++++++++++++- 6 files changed, 567 insertions(+), 22 deletions(-) diff --git a/columnar/src/block_accessor.rs b/columnar/src/block_accessor.rs index 5c1913a360..378f361043 100644 --- a/columnar/src/block_accessor.rs +++ b/columnar/src/block_accessor.rs @@ -1,9 +1,12 @@ +use std::cmp::Ordering; + use crate::{Column, DocId, RowId}; #[derive(Debug, Default, Clone)] pub struct ColumnBlockAccessor { val_cache: Vec, docid_cache: Vec, + missing_docids_cache: Vec, row_id_cache: Vec, } @@ -20,6 +23,20 @@ impl .values .get_vals(&self.row_id_cache, &mut self.val_cache); } + #[inline] + pub fn fetch_block_with_missing(&mut self, docs: &[u32], accessor: &Column, missing: T) { + self.fetch_block(docs, accessor); + // We can compare docid_cache with docs to find missing docs + if docs.len() != self.docid_cache.len() || accessor.index.is_multivalue() { + self.missing_docids_cache.clear(); + find_missing_docs(docs, &self.docid_cache, |doc| { + self.missing_docids_cache.push(doc); + self.val_cache.push(missing); + }); + self.docid_cache + .extend_from_slice(&self.missing_docids_cache); + } + } #[inline] pub fn iter_vals(&self) -> impl Iterator + '_ { @@ -34,3 +51,82 @@ impl .zip(self.val_cache.iter().cloned()) } } + +/// Given two sorted lists of docids `docs` and `hits`, hits is a subset of `docs`. +/// Return all docs that are not in `hits`. +fn find_missing_docs(docs: &[u32], hits: &[u32], mut callback: F) +where F: FnMut(u32) { + let mut docs_iter = docs.iter(); + let mut hits_iter = hits.iter(); + + let mut doc = docs_iter.next(); + let mut hit = hits_iter.next(); + + while let (Some(¤t_doc), Some(¤t_hit)) = (doc, hit) { + match current_doc.cmp(¤t_hit) { + Ordering::Less => { + callback(current_doc); + doc = docs_iter.next(); + } + Ordering::Equal => { + doc = docs_iter.next(); + hit = hits_iter.next(); + } + Ordering::Greater => { + hit = hits_iter.next(); + } + } + } + + while let Some(¤t_doc) = doc { + callback(current_doc); + doc = docs_iter.next(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_find_missing_docs() { + let docs: Vec = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + let hits: Vec = vec![2, 4, 6, 8, 10]; + + let mut missing_docs: Vec = Vec::new(); + + find_missing_docs(&docs, &hits, |missing_doc| { + missing_docs.push(missing_doc); + }); + + assert_eq!(missing_docs, vec![1, 3, 5, 7, 9]); + } + + #[test] + fn test_find_missing_docs_empty() { + let docs: Vec = Vec::new(); + let hits: Vec = vec![2, 4, 6, 8, 10]; + + let mut missing_docs: Vec = Vec::new(); + + find_missing_docs(&docs, &hits, |missing_doc| { + missing_docs.push(missing_doc); + }); + + assert_eq!(missing_docs, vec![]); + } + + #[test] + fn test_find_missing_docs_all_missing() { + let docs: Vec = vec![1, 2, 3, 4, 5]; + let hits: Vec = Vec::new(); + + let mut missing_docs: Vec = Vec::new(); + + find_missing_docs(&docs, &hits, |missing_doc| { + missing_docs.push(missing_doc); + }); + + assert_eq!(missing_docs, vec![1, 2, 3, 4, 5]); + } +} diff --git a/columnar/src/column/mod.rs b/columnar/src/column/mod.rs index 187377586a..37db03e1be 100644 --- a/columnar/src/column/mod.rs +++ b/columnar/src/column/mod.rs @@ -130,7 +130,7 @@ impl Column { .select_batch_in_place(selected_docid_range.start, doc_ids); } - /// Fils the output vector with the (possibly multiple values that are associated_with + /// Fills the output vector with the (possibly multiple values that are associated_with /// `row_id`. /// /// This method clears the `output` vector. diff --git a/columnar/src/column_index/mod.rs b/columnar/src/column_index/mod.rs index 1a0e9073ce..41865e67ae 100644 --- a/columnar/src/column_index/mod.rs +++ b/columnar/src/column_index/mod.rs @@ -37,6 +37,10 @@ impl From for ColumnIndex { } impl ColumnIndex { + #[inline] + pub fn is_multivalue(&self) -> bool { + matches!(self, ColumnIndex::Multivalued(_)) + } // Returns the cardinality of the column index. // // By convention, if the column contains no docs, we consider that it is diff --git a/src/aggregation/agg_req.rs b/src/aggregation/agg_req.rs index d8d30f6ac2..ca6ff48a96 100644 --- a/src/aggregation/agg_req.rs +++ b/src/aggregation/agg_req.rs @@ -123,7 +123,8 @@ pub enum AggregationVariants { } impl AggregationVariants { - fn get_fast_field_name(&self) -> &str { + /// Returns the name of the field used by the aggregation. + pub fn get_fast_field_name(&self) -> &str { match self { AggregationVariants::Terms(terms) => terms.field.as_str(), AggregationVariants::Range(range) => range.field.as_str(), diff --git a/src/aggregation/agg_req_with_accessor.rs b/src/aggregation/agg_req_with_accessor.rs index 16c7bf478e..8f8a3df057 100644 --- a/src/aggregation/agg_req_with_accessor.rs +++ b/src/aggregation/agg_req_with_accessor.rs @@ -13,6 +13,7 @@ use super::metric::{ }; use super::segment_agg_result::AggregationLimits; use super::VecWithNames; +use crate::aggregation::{f64_to_fastfield_u64, Key}; use crate::SegmentReader; #[derive(Default)] @@ -35,6 +36,8 @@ pub struct AggregationWithAccessor { /// based on search terms. That is not that case currently, but eventually this needs to be /// Option or moved. pub(crate) accessor: Column, + /// Load insert u64 for missing use case + pub(crate) missing_value_for_accessor: Option, pub(crate) str_dict_column: Option, pub(crate) field_type: ColumnType, pub(crate) sub_aggregation: AggregationsWithAccessor, @@ -44,12 +47,14 @@ pub struct AggregationWithAccessor { } impl AggregationWithAccessor { + /// May return multiple accessors if the aggregation is e.g. on mixed field types. fn try_from_agg( agg: &Aggregation, sub_aggregation: &Aggregations, reader: &SegmentReader, limits: AggregationLimits, ) -> crate::Result> { + let mut missing_value_term_agg = None; let mut str_dict_column = None; use AggregationVariants::*; let acc_field_types: Vec<(Column, ColumnType)> = match &agg.agg { @@ -75,8 +80,11 @@ impl AggregationWithAccessor { Some(get_numeric_or_date_column_types()), )?], Terms(TermsAggregation { - field: field_name, .. + field: field_name, + missing, + .. }) => { + missing_value_term_agg = missing.clone(); str_dict_column = reader.fast_fields().str(field_name)?; let allowed_column_types = [ ColumnType::I64, @@ -88,7 +96,21 @@ impl AggregationWithAccessor { // ColumnType::IpAddr Unsupported // ColumnType::DateTime Unsupported ]; - get_all_ff_reader_or_empty(reader, field_name, Some(&allowed_column_types))? + + // In case the column is empty we want the shim column to match the missing type + let fallback_type = missing + .as_ref() + .map(|missing| match missing { + Key::Str(_) => ColumnType::Str, + Key::F64(_) => ColumnType::F64, + }) + .unwrap_or(ColumnType::U64); + get_all_ff_reader_or_empty( + reader, + field_name, + Some(&allowed_column_types), + fallback_type, + )? } Average(AverageAggregation { field: field_name }) | Count(CountAggregation { field: field_name }) @@ -113,10 +135,18 @@ impl AggregationWithAccessor { let aggs: Vec = acc_field_types .into_iter() - .map(|(accessor, field_type)| { + .map(|(accessor, column_type)| { + let missing_value_for_accessor = + if let Some(missing) = missing_value_term_agg.as_ref() { + get_missing_val(column_type, missing, agg.agg.get_fast_field_name())? + } else { + None + }; + Ok(AggregationWithAccessor { + missing_value_for_accessor, accessor, - field_type, + field_type: column_type, sub_aggregation: get_aggs_with_segment_accessor_and_validate( sub_aggregation, reader, @@ -133,6 +163,28 @@ impl AggregationWithAccessor { } } +fn get_missing_val( + column_type: ColumnType, + missing: &Key, + field_name: &str, +) -> crate::Result> { + let missing_val = match missing { + Key::Str(_) if column_type == ColumnType::Str => Some(u64::MAX), + // Allow fallback to number on text fields + Key::F64(_) if column_type == ColumnType::Str => Some(u64::MAX), + Key::F64(val) if column_type.numerical_type().is_some() => { + f64_to_fastfield_u64(*val, &column_type) + } + _ => { + return Err(crate::TantivyError::InvalidArgument(format!( + "Missing value {:?} for field {} is not supported for column type {:?}", + missing, field_name, column_type + ))); + } + }; + Ok(missing_val) +} + fn get_numeric_or_date_column_types() -> &'static [ColumnType] { &[ ColumnType::F64, @@ -189,15 +241,13 @@ fn get_all_ff_reader_or_empty( reader: &SegmentReader, field_name: &str, allowed_column_types: Option<&[ColumnType]>, + fallback_type: ColumnType, ) -> crate::Result, ColumnType)>> { let ff_fields = reader.fast_fields(); let mut ff_field_with_type = ff_fields.u64_lenient_for_type_all(allowed_column_types, field_name)?; if ff_field_with_type.is_empty() { - ff_field_with_type.push(( - Column::build_empty_column(reader.num_docs()), - ColumnType::U64, - )); + ff_field_with_type.push((Column::build_empty_column(reader.num_docs()), fallback_type)); } Ok(ff_field_with_type) } diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index 752f4657e2..79db14c971 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -9,7 +9,6 @@ use crate::aggregation::agg_limits::MemoryConsumption; use crate::aggregation::agg_req_with_accessor::{ AggregationWithAccessor, AggregationsWithAccessor, }; -use crate::aggregation::f64_from_fastfield_u64; use crate::aggregation::intermediate_agg_result::{ IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult, IntermediateKey, IntermediateTermBucketEntry, IntermediateTermBucketResult, @@ -17,6 +16,7 @@ use crate::aggregation::intermediate_agg_result::{ use crate::aggregation::segment_agg_result::{ build_segment_agg_collector, SegmentAggregationCollector, }; +use crate::aggregation::{f64_from_fastfield_u64, Key}; use crate::error::DataCorruption; use crate::TantivyError; @@ -146,6 +146,28 @@ pub struct TermsAggregation { /// { "average_price": "asc" } #[serde(skip_serializing_if = "Option::is_none", default)] pub order: Option, + + /// The missing parameter defines how documents that are missing a value should be treated. + /// By default they will be ignored but it is also possible to treat them as if they had a + /// value. Examples in JSON format: + /// { "missing": "NO_DATA" } + /// + /// # Internal + /// + /// Internally, `missing` requires some specialized handling in some scenarios. + /// + /// Simple Case: + /// In the simplest case, we can just put the missing value in the termmap use that. In case of + /// text we put a special u64::MAX and replace it at the end with the actual missing value, + /// when loading the text. + /// Special Case 1: + /// If we have multiple columns on one field, we need to have a union on the indices on both + /// columns, to find docids without a value. That requires a special missing aggreggation. + /// Special Case 2: if the key is of type text and the column is numerical, we also need to use + /// the special missing aggregation, since there is no mechanism in the numerical column to + /// add text. + #[serde(skip_serializing_if = "Option::is_none", default)] + pub missing: Option, } /// Same as TermsAggregation, but with populated defaults. @@ -176,6 +198,7 @@ pub(crate) struct TermsAggregationInternal { pub min_doc_count: u64, pub order: CustomOrder, + pub missing: Option, } impl TermsAggregationInternal { @@ -195,6 +218,7 @@ impl TermsAggregationInternal { .unwrap_or_else(|| order == CustomOrder::default()), min_doc_count: req.min_doc_count.unwrap_or(1), order, + missing: req.missing.clone(), } } } @@ -275,9 +299,16 @@ impl SegmentAggregationCollector for SegmentTermCollector { let mem_pre = self.get_memory_consumption(); - bucket_agg_accessor - .column_block_accessor - .fetch_block(docs, &bucket_agg_accessor.accessor); + if let Some(missing) = bucket_agg_accessor.missing_value_for_accessor { + bucket_agg_accessor + .column_block_accessor + .fetch_block_with_missing(docs, &bucket_agg_accessor.accessor, missing); + } else { + bucket_agg_accessor + .column_block_accessor + .fetch_block(docs, &bucket_agg_accessor.accessor); + } + for term_id in bucket_agg_accessor.column_block_accessor.iter_vals() { let entry = self.term_buckets.entries.entry(term_id).or_default(); *entry += 1; @@ -443,15 +474,36 @@ impl SegmentTermCollector { let mut buffer = String::new(); for (term_id, doc_count) in entries { - if !term_dict.ord_to_str(term_id, &mut buffer)? { - return Err(TantivyError::InternalError(format!( - "Couldn't find term_id {term_id} in dict" - ))); - } - let intermediate_entry = into_intermediate_bucket_entry(term_id, doc_count)?; - - dict.insert(IntermediateKey::Str(buffer.to_string()), intermediate_entry); + // Special case for missing key + if term_id == u64::MAX { + let missing_key = self + .req + .missing + .as_ref() + .expect("Found placeholder term_id but `missing` is None"); + match missing_key { + Key::Str(missing) => { + buffer.clear(); + buffer.push_str(missing); + dict.insert( + IntermediateKey::Str(buffer.to_string()), + intermediate_entry, + ); + } + Key::F64(val) => { + buffer.push_str(&val.to_string()); + dict.insert(IntermediateKey::F64(*val), intermediate_entry); + } + } + } else { + if !term_dict.ord_to_str(term_id, &mut buffer)? { + return Err(TantivyError::InternalError(format!( + "Couldn't find term_id {term_id} in dict" + ))); + } + dict.insert(IntermediateKey::Str(buffer.to_string()), intermediate_entry); + } } if self.req.min_doc_count == 0 { // TODO: Handle rev streaming for descending sorting by keys @@ -1437,6 +1489,348 @@ mod tests { assert_eq!(res["my_texts"]["sum_other_doc_count"], 0); assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0); + Ok(()) + } + #[test] + fn terms_aggregation_missing_multi_value() -> crate::Result<()> { + let mut schema_builder = Schema::builder(); + let text_field = schema_builder.add_text_field("text", FAST); + let id_field = schema_builder.add_u64_field("id", FAST); + let index = Index::create_in_ram(schema_builder.build()); + { + let mut index_writer = index.writer_with_num_threads(1, 20_000_000)?; + index_writer.set_merge_policy(Box::new(NoMergePolicy)); + index_writer.add_document(doc!( + text_field => "Hello Hello", + text_field => "Hello Hello", + id_field => 1u64, + id_field => 1u64, + ))?; + // Missing + index_writer.add_document(doc!())?; + index_writer.add_document(doc!( + text_field => "Hello Hello", + ))?; + index_writer.add_document(doc!( + text_field => "Hello Hello", + ))?; + index_writer.commit()?; + // Empty segment special case + index_writer.add_document(doc!())?; + index_writer.commit()?; + // Full segment special case + index_writer.add_document(doc!( + text_field => "Hello Hello", + id_field => 1u64, + ))?; + index_writer.commit()?; + } + + let agg_req: Aggregations = serde_json::from_value(json!({ + "my_texts": { + "terms": { + "field": "text", + "missing": "Empty" + }, + }, + "my_texts2": { + "terms": { + "field": "text", + "missing": 1337 + }, + }, + "my_ids": { + "terms": { + "field": "id", + "missing": 1337 + }, + } + })) + .unwrap(); + + let res = exec_request_with_query(agg_req, &index, None)?; + + // text field + assert_eq!(res["my_texts"]["buckets"][0]["key"], "Hello Hello"); + assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 5); + assert_eq!(res["my_texts"]["buckets"][1]["key"], "Empty"); + assert_eq!(res["my_texts"]["buckets"][1]["doc_count"], 2); + assert_eq!( + res["my_texts"]["buckets"][2]["key"], + serde_json::Value::Null + ); + // text field with numner as missing fallback + assert_eq!(res["my_texts2"]["buckets"][0]["key"], "Hello Hello"); + assert_eq!(res["my_texts2"]["buckets"][0]["doc_count"], 5); + assert_eq!(res["my_texts2"]["buckets"][1]["key"], 1337.0); + assert_eq!(res["my_texts2"]["buckets"][1]["doc_count"], 2); + assert_eq!( + res["my_texts2"]["buckets"][2]["key"], + serde_json::Value::Null + ); + assert_eq!(res["my_texts"]["sum_other_doc_count"], 0); + assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0); + + // id field + assert_eq!(res["my_ids"]["buckets"][0]["key"], 1337.0); + assert_eq!(res["my_ids"]["buckets"][0]["doc_count"], 4); + assert_eq!(res["my_ids"]["buckets"][1]["key"], 1.0); + assert_eq!(res["my_ids"]["buckets"][1]["doc_count"], 3); + assert_eq!(res["my_ids"]["buckets"][2]["key"], serde_json::Value::Null); + + Ok(()) + } + #[test] + fn terms_aggregation_missing_simple_id() -> crate::Result<()> { + let mut schema_builder = Schema::builder(); + let id_field = schema_builder.add_u64_field("id", FAST); + let index = Index::create_in_ram(schema_builder.build()); + { + let mut index_writer = index.writer_with_num_threads(1, 20_000_000)?; + index_writer.set_merge_policy(Box::new(NoMergePolicy)); + index_writer.add_document(doc!( + id_field => 1u64, + ))?; + // Missing + index_writer.add_document(doc!())?; + index_writer.add_document(doc!())?; + index_writer.commit()?; + } + + let agg_req: Aggregations = serde_json::from_value(json!({ + "my_ids": { + "terms": { + "field": "id", + "missing": 1337 + }, + } + })) + .unwrap(); + + let res = exec_request_with_query(agg_req, &index, None)?; + + // id field + assert_eq!(res["my_ids"]["buckets"][0]["key"], 1337.0); + assert_eq!(res["my_ids"]["buckets"][0]["doc_count"], 2); + assert_eq!(res["my_ids"]["buckets"][1]["key"], 1.0); + assert_eq!(res["my_ids"]["buckets"][1]["doc_count"], 1); + assert_eq!(res["my_ids"]["buckets"][2]["key"], serde_json::Value::Null); + + Ok(()) + } + + #[test] + fn terms_aggregation_missing1() -> crate::Result<()> { + let mut schema_builder = Schema::builder(); + let text_field = schema_builder.add_text_field("text", FAST); + let id_field = schema_builder.add_u64_field("id", FAST); + let index = Index::create_in_ram(schema_builder.build()); + { + let mut index_writer = index.writer_with_num_threads(1, 20_000_000)?; + index_writer.set_merge_policy(Box::new(NoMergePolicy)); + index_writer.add_document(doc!( + text_field => "Hello Hello", + id_field => 1u64, + ))?; + // Missing + index_writer.add_document(doc!())?; + index_writer.add_document(doc!( + text_field => "Hello Hello", + ))?; + index_writer.add_document(doc!( + text_field => "Hello Hello", + ))?; + index_writer.commit()?; + // Empty segment special case + index_writer.add_document(doc!())?; + index_writer.commit()?; + // Full segment special case + index_writer.add_document(doc!( + text_field => "Hello Hello", + id_field => 1u64, + ))?; + index_writer.commit()?; + } + + let agg_req: Aggregations = serde_json::from_value(json!({ + "my_texts": { + "terms": { + "field": "text", + "missing": "Empty" + }, + }, + "my_texts2": { + "terms": { + "field": "text", + "missing": 1337 + }, + }, + "my_ids": { + "terms": { + "field": "id", + "missing": 1337 + }, + } + })) + .unwrap(); + + let res = exec_request_with_query(agg_req, &index, None)?; + + // text field + assert_eq!(res["my_texts"]["buckets"][0]["key"], "Hello Hello"); + assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 4); + assert_eq!(res["my_texts"]["buckets"][1]["key"], "Empty"); + assert_eq!(res["my_texts"]["buckets"][1]["doc_count"], 2); + assert_eq!( + res["my_texts"]["buckets"][2]["key"], + serde_json::Value::Null + ); + // text field with numner as missing fallback + assert_eq!(res["my_texts2"]["buckets"][0]["key"], "Hello Hello"); + assert_eq!(res["my_texts2"]["buckets"][0]["doc_count"], 4); + assert_eq!(res["my_texts2"]["buckets"][1]["key"], 1337.0); + assert_eq!(res["my_texts2"]["buckets"][1]["doc_count"], 2); + assert_eq!( + res["my_texts2"]["buckets"][2]["key"], + serde_json::Value::Null + ); + assert_eq!(res["my_texts"]["sum_other_doc_count"], 0); + assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0); + + // id field + assert_eq!(res["my_ids"]["buckets"][0]["key"], 1337.0); + assert_eq!(res["my_ids"]["buckets"][0]["doc_count"], 4); + assert_eq!(res["my_ids"]["buckets"][1]["key"], 1.0); + assert_eq!(res["my_ids"]["buckets"][1]["doc_count"], 2); + assert_eq!(res["my_ids"]["buckets"][2]["key"], serde_json::Value::Null); + + Ok(()) + } + #[test] + fn terms_aggregation_missing_empty() -> crate::Result<()> { + let mut schema_builder = Schema::builder(); + schema_builder.add_text_field("text", FAST); + schema_builder.add_u64_field("id", FAST); + let index = Index::create_in_ram(schema_builder.build()); + { + let mut index_writer = index.writer_with_num_threads(1, 20_000_000)?; + index_writer.set_merge_policy(Box::new(NoMergePolicy)); + // Empty segment special case + index_writer.add_document(doc!())?; + index_writer.commit()?; + } + + let agg_req: Aggregations = serde_json::from_value(json!({ + "my_texts": { + "terms": { + "field": "text", + "missing": "Empty" + }, + }, + "my_texts2": { + "terms": { + "field": "text", + "missing": 1337 + }, + }, + "my_ids": { + "terms": { + "field": "id", + "missing": 1337 + }, + } + })) + .unwrap(); + + let res = exec_request_with_query(agg_req, &index, None)?; + + // text field + assert_eq!(res["my_texts"]["buckets"][0]["key"], "Empty"); + assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 1); + assert_eq!( + res["my_texts"]["buckets"][1]["key"], + serde_json::Value::Null + ); + // text field with number as missing fallback + assert_eq!(res["my_texts2"]["buckets"][0]["key"], 1337.0); + assert_eq!(res["my_texts2"]["buckets"][0]["doc_count"], 1); + assert_eq!( + res["my_texts2"]["buckets"][1]["key"], + serde_json::Value::Null + ); + assert_eq!(res["my_texts"]["sum_other_doc_count"], 0); + assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0); + + // id field + assert_eq!(res["my_ids"]["buckets"][0]["key"], 1337.0); + assert_eq!(res["my_ids"]["buckets"][0]["doc_count"], 1); + assert_eq!(res["my_ids"]["buckets"][1]["key"], serde_json::Value::Null); + + Ok(()) + } + + #[test] + #[ignore] + // TODO: This is not yet implemented + fn terms_aggregation_missing_mixed_type() -> crate::Result<()> { + let mut schema_builder = Schema::builder(); + let json = schema_builder.add_json_field("json", FAST); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + let mut index_writer = index.writer_for_tests().unwrap(); + // => Segment with all values numeric + index_writer + .add_document(doc!(json => json!({"mixed_type": 10.0}))) + .unwrap(); + index_writer.add_document(doc!())?; + index_writer.commit().unwrap(); + //// => Segment with all values text + index_writer + .add_document(doc!(json => json!({"mixed_type": "blue"}))) + .unwrap(); + index_writer.add_document(doc!())?; + index_writer.commit().unwrap(); + + // => Segment with mixed values + index_writer + .add_document(doc!(json => json!({"mixed_type": "red"}))) + .unwrap(); + index_writer + .add_document(doc!(json => json!({"mixed_type": -20.5}))) + .unwrap(); + index_writer + .add_document(doc!(json => json!({"mixed_type": true}))) + .unwrap(); + index_writer.add_document(doc!())?; + + index_writer.commit().unwrap(); + + let agg_req: Aggregations = serde_json::from_value(json!({ + "replace_null": { + "terms": { + "field": "json.mixed_type", + "missing": "NULL" + }, + }, + "replace_num": { + "terms": { + "field": "json.mixed_type", + "missing": 1337 + }, + }, + })) + .unwrap(); + + let res = exec_request_with_query(agg_req, &index, None)?; + + // text field + assert_eq!(res["replace_null"]["buckets"][0]["key"], "NULL"); + assert_eq!(res["replace_null"]["buckets"][0]["doc_count"], 4); // WRONG should be 3 + assert_eq!(res["replace_num"]["buckets"][0]["key"], 1337.0); + assert_eq!(res["replace_num"]["buckets"][0]["doc_count"], 5); // WRONG should be 3 + assert_eq!(res["replace_null"]["sum_other_doc_count"], 0); + assert_eq!(res["replace_null"]["doc_count_error_upper_bound"], 0); + Ok(()) } }