diff --git a/Cargo.lock b/Cargo.lock index 81df0e0cbfb4..f8cdc9ca06e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5396,7 +5396,6 @@ version = "0.1.0" dependencies = [ "anyerror", "cbordata", - "crc32fast", "criterion", "databend-common-arrow", "databend-common-ast", @@ -5411,8 +5410,8 @@ dependencies = [ "match-template", "parquet", "rand 0.8.5", + "roaring", "serde", - "serde_json", "tantivy", "tantivy-common", "tantivy-fst", @@ -11086,7 +11085,7 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" [[package]] name = "ownedbytes" version = "0.7.0" -source = "git+https://github.com/datafuse-extras/tantivy?rev=37aeac0#37aeac01096a7e480118dbc91e48c8f54d3fea4c" +source = "git+https://github.com/datafuse-extras/tantivy?rev=7502370#7502370b68e6822a687ee071660e350b67808533" dependencies = [ "stable_deref_trait", ] @@ -14770,7 +14769,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" [[package]] name = "tantivy" version = "0.22.0" -source = "git+https://github.com/datafuse-extras/tantivy?rev=37aeac0#37aeac01096a7e480118dbc91e48c8f54d3fea4c" +source = "git+https://github.com/datafuse-extras/tantivy?rev=7502370#7502370b68e6822a687ee071660e350b67808533" dependencies = [ "aho-corasick", "arc-swap", @@ -14820,7 +14819,7 @@ dependencies = [ [[package]] name = "tantivy-bitpacker" version = "0.6.0" -source = "git+https://github.com/datafuse-extras/tantivy?rev=37aeac0#37aeac01096a7e480118dbc91e48c8f54d3fea4c" +source = "git+https://github.com/datafuse-extras/tantivy?rev=7502370#7502370b68e6822a687ee071660e350b67808533" dependencies = [ "bitpacking 0.9.2", ] @@ -14828,7 +14827,7 @@ dependencies = [ [[package]] name = "tantivy-columnar" version = "0.3.0" -source = "git+https://github.com/datafuse-extras/tantivy?rev=37aeac0#37aeac01096a7e480118dbc91e48c8f54d3fea4c" +source = "git+https://github.com/datafuse-extras/tantivy?rev=7502370#7502370b68e6822a687ee071660e350b67808533" dependencies = [ "downcast-rs", "fastdivide", @@ -14843,7 +14842,7 @@ dependencies = [ [[package]] name = "tantivy-common" version = "0.7.0" -source = "git+https://github.com/datafuse-extras/tantivy?rev=37aeac0#37aeac01096a7e480118dbc91e48c8f54d3fea4c" +source = "git+https://github.com/datafuse-extras/tantivy?rev=7502370#7502370b68e6822a687ee071660e350b67808533" dependencies = [ "async-trait", "byteorder", @@ -14866,7 +14865,7 @@ dependencies = [ [[package]] name = "tantivy-jieba" version = "0.11.0" -source = "git+https://github.com/datafuse-extras/tantivy-jieba?rev=124a8fc#124a8fc8c8a9f1389af5a9bfa497fb358ecc556e" +source = "git+https://github.com/datafuse-extras/tantivy-jieba?rev=0e300e9#0e300e9085651b7e6659dfcc7b0ea0fa9cab09c2" dependencies = [ "jieba-rs", "lazy_static", @@ -14876,7 +14875,7 @@ dependencies = [ [[package]] name = "tantivy-query-grammar" version = "0.22.0" -source = "git+https://github.com/datafuse-extras/tantivy?rev=37aeac0#37aeac01096a7e480118dbc91e48c8f54d3fea4c" +source = "git+https://github.com/datafuse-extras/tantivy?rev=7502370#7502370b68e6822a687ee071660e350b67808533" dependencies = [ "nom", ] @@ -14884,7 +14883,7 @@ dependencies = [ [[package]] name = "tantivy-sstable" version = "0.3.0" -source = "git+https://github.com/datafuse-extras/tantivy?rev=37aeac0#37aeac01096a7e480118dbc91e48c8f54d3fea4c" +source = "git+https://github.com/datafuse-extras/tantivy?rev=7502370#7502370b68e6822a687ee071660e350b67808533" dependencies = [ "tantivy-bitpacker", "tantivy-common", @@ -14895,7 +14894,7 @@ dependencies = [ [[package]] name = "tantivy-stacker" version = 
"0.3.0" -source = "git+https://github.com/datafuse-extras/tantivy?rev=37aeac0#37aeac01096a7e480118dbc91e48c8f54d3fea4c" +source = "git+https://github.com/datafuse-extras/tantivy?rev=7502370#7502370b68e6822a687ee071660e350b67808533" dependencies = [ "murmurhash32", "rand_distr", @@ -14905,7 +14904,7 @@ dependencies = [ [[package]] name = "tantivy-tokenizer-api" version = "0.3.0" -source = "git+https://github.com/datafuse-extras/tantivy?rev=37aeac0#37aeac01096a7e480118dbc91e48c8f54d3fea4c" +source = "git+https://github.com/datafuse-extras/tantivy?rev=7502370#7502370b68e6822a687ee071660e350b67808533" dependencies = [ "serde", ] diff --git a/Cargo.toml b/Cargo.toml index d723c441a9d6..2e207de5095e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -408,7 +408,7 @@ openai_api_rust = { git = "https://github.com/datafuse-extras/openai-api", rev = orc-rust = { git = "https://github.com/datafuse-extras/datafusion-orc", rev = "03372b97" } recursive = { git = "https://github.com/datafuse-extras/recursive.git", rev = "6af35a1" } sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1" } -tantivy = { git = "https://github.com/datafuse-extras/tantivy", rev = "37aeac0" } -tantivy-common = { git = "https://github.com/datafuse-extras/tantivy", rev = "37aeac0", package = "tantivy-common" } -tantivy-jieba = { git = "https://github.com/datafuse-extras/tantivy-jieba", rev = "124a8fc" } +tantivy = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370" } +tantivy-common = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370", package = "tantivy-common" } +tantivy-jieba = { git = "https://github.com/datafuse-extras/tantivy-jieba", rev = "0e300e9" } xorfilter-rs = { git = "https://github.com/datafuse-extras/xorfilter", tag = "databend-alpha.4" } diff --git a/src/common/arrow/src/arrow/array/list/mutable.rs b/src/common/arrow/src/arrow/array/list/mutable.rs index 1a63727a45ad..4834b84c4bff 100644 --- a/src/common/arrow/src/arrow/array/list/mutable.rs +++ b/src/common/arrow/src/arrow/array/list/mutable.rs @@ -182,7 +182,9 @@ impl MutableListArray { pub fn try_push_valid(&mut self) -> Result<()> { let total_length = self.values.len(); let offset = self.offsets.last().to_usize(); - let length = total_length.checked_sub(offset).ok_or(Error::Overflow)?; + let length = total_length + .checked_sub(offset) + .ok_or_else(|| Error::Overflow)?; self.offsets.try_push_usize(length)?; if let Some(validity) = &mut self.validity { diff --git a/src/common/arrow/src/arrow/offset.rs b/src/common/arrow/src/arrow/offset.rs index 7d56d883ed8c..dd24cd0bce21 100644 --- a/src/common/arrow/src/arrow/offset.rs +++ b/src/common/arrow/src/arrow/offset.rs @@ -127,7 +127,9 @@ impl Offsets { pub fn try_push(&mut self, length: O) -> Result<(), Error> { let old_length = self.last(); assert!(length >= O::zero()); - let new_length = old_length.checked_add(&length).ok_or(Error::Overflow)?; + let new_length = old_length + .checked_add(&length) + .ok_or_else(|| Error::Overflow)?; self.0.push(new_length); Ok(()) } @@ -140,10 +142,12 @@ impl Offsets { /// * checks that this length does not overflow #[inline] pub fn try_push_usize(&mut self, length: usize) -> Result<(), Error> { - let length = O::from_usize(length).ok_or(Error::Overflow)?; + let length = O::from_usize(length).ok_or_else(|| Error::Overflow)?; let old_length = self.last(); - let new_length = old_length.checked_add(&length).ok_or(Error::Overflow)?; + let new_length = old_length + .checked_add(&length) + .ok_or_else(|| Error::Overflow)?; 
self.0.push(new_length); Ok(()) } @@ -267,8 +271,8 @@ impl Offsets { let last_offset = original_offset .checked_add(total_length) - .ok_or(Error::Overflow)?; - O::from_usize(last_offset).ok_or(Error::Overflow)?; + .ok_or_else(|| Error::Overflow)?; + O::from_usize(last_offset).ok_or_else(|| Error::Overflow)?; Ok(()) } @@ -279,7 +283,9 @@ impl Offsets { let mut length = *self.last(); let other_length = *other.last(); // check if the operation would overflow - length.checked_add(&other_length).ok_or(Error::Overflow)?; + length + .checked_add(&other_length) + .ok_or_else(|| Error::Overflow)?; let lengths = other.as_slice().windows(2).map(|w| w[1] - w[0]); let offsets = lengths.map(|new_length| { @@ -306,7 +312,9 @@ impl Offsets { let other_length = other.last().expect("Length to be non-zero"); let mut length = *self.last(); // check if the operation would overflow - length.checked_add(other_length).ok_or(Error::Overflow)?; + length + .checked_add(other_length) + .ok_or_else(|| Error::Overflow)?; let lengths = other.windows(2).map(|w| w[1] - w[0]); let offsets = lengths.map(|new_length| { diff --git a/src/query/ee/tests/it/inverted_index/index_refresh.rs b/src/query/ee/tests/it/inverted_index/index_refresh.rs index 611e1ff11181..9d0d45879293 100644 --- a/src/query/ee/tests/it/inverted_index/index_refresh.rs +++ b/src/query/ee/tests/it/inverted_index/index_refresh.rs @@ -143,7 +143,6 @@ async fn test_fuse_do_refresh_inverted_index() -> Result<()> { &index_version, ); - let field_nums = query_fields.len(); let has_score = true; let need_position = false; let mut field_ids = HashSet::new(); @@ -177,7 +176,6 @@ async fn test_fuse_do_refresh_inverted_index() -> Result<()> { let matched_rows = index_reader .clone() .do_filter( - field_nums, need_position, has_score, query.box_clone(), @@ -185,7 +183,7 @@ async fn test_fuse_do_refresh_inverted_index() -> Result<()> { &index_record, &fuzziness, tokenizer_manager, - block_meta.row_count as u32, + block_meta.row_count, &index_loc, ) .await?; diff --git a/src/query/service/src/pipelines/processors/transforms/transform_add_internal_columns.rs b/src/query/service/src/pipelines/processors/transforms/transform_add_internal_columns.rs index f62db09eee1e..7b35277c25e2 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_add_internal_columns.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_add_internal_columns.rs @@ -40,8 +40,8 @@ impl Transform for TransformAddInternalColumns { fn transform(&mut self, mut block: DataBlock) -> Result { if let Some(meta) = block.take_meta() { - let internal_column_meta = - InternalColumnMeta::downcast_from(meta).ok_or(ErrorCode::Internal("It's a bug"))?; + let internal_column_meta = InternalColumnMeta::downcast_from(meta) + .ok_or_else(|| ErrorCode::Internal("It's a bug"))?; let num_rows = block.num_rows(); for internal_column in self.internal_columns.values() { let column = diff --git a/src/query/service/src/servers/flight_sql/flight_sql_service/session.rs b/src/query/service/src/servers/flight_sql/flight_sql_service/session.rs index 82214bc67e4f..1cc7a6de4731 100644 --- a/src/query/service/src/servers/flight_sql/flight_sql_service/session.rs +++ b/src/query/service/src/servers/flight_sql/flight_sql_service/session.rs @@ -64,7 +64,7 @@ impl FlightSqlServiceImpl { pub(super) fn get_user_password(metadata: &MetadataMap) -> Result<(String, String), String> { let basic = "Basic "; let authorization = Self::get_header_value(metadata, "authorization") - .ok_or("authorization not 
parsable".to_string())?; + .ok_or_else(|| "authorization not parsable".to_string())?; if !authorization.starts_with(basic) { return Err(format!("Auth type not implemented: {authorization}")); diff --git a/src/query/service/src/servers/http/v1/session/refresh_handler.rs b/src/query/service/src/servers/http/v1/session/refresh_handler.rs index 84b0bac34462..830330b7c140 100644 --- a/src/query/service/src/servers/http/v1/session/refresh_handler.rs +++ b/src/query/service/src/servers/http/v1/session/refresh_handler.rs @@ -48,11 +48,11 @@ pub async fn refresh_handler( let mgr = ClientSessionManager::instance(); match &ctx.credential { Credential::Jwt { .. } => { - let session_id = - req.session_id - .ok_or(HttpErrorCode::bad_request(ErrorCode::BadArguments( - "JWT session should provide session_id when refresh session", - )))?; + let session_id = req.session_id.ok_or_else(|| { + HttpErrorCode::bad_request(ErrorCode::BadArguments( + "JWT session should provide session_id when refresh session", + )) + })?; mgr.refresh_in_memory_states(&session_id); let tenant = ctx.session.get_current_tenant(); diff --git a/src/query/sql/src/planner/planner_cache.rs b/src/query/sql/src/planner/planner_cache.rs index 8da310f3bb55..2a8f0e51b8aa 100644 --- a/src/query/sql/src/planner/planner_cache.rs +++ b/src/query/sql/src/planner/planner_cache.rs @@ -165,7 +165,7 @@ impl TableRefVisitor { let func_name = func.name.name.to_lowercase(); // If the function is not suitable for caching, we should not cache the plan - if !is_cacheable_function(&func_name) || func_name == "score" { + if !is_cacheable_function(&func_name) { self.cache_miss = true; } } diff --git a/src/query/storages/common/index/Cargo.toml b/src/query/storages/common/index/Cargo.toml index 1d1372be7302..2ee4d664f7df 100644 --- a/src/query/storages/common/index/Cargo.toml +++ b/src/query/storages/common/index/Cargo.toml @@ -16,7 +16,6 @@ ignored = ["xorfilter-rs", "match-template"] [dependencies] anyerror = { workspace = true } cbordata = { version = "0.6.0" } -crc32fast = "1.3.2" databend-common-arrow = { workspace = true } databend-common-ast = { workspace = true } databend-common-exception = { workspace = true } @@ -29,8 +28,8 @@ levenshtein_automata = "0.2.1" log = { workspace = true } match-template = { workspace = true } parquet = { workspace = true } +roaring = "0.10.1" serde = { workspace = true } -serde_json = { workspace = true } tantivy = { workspace = true } tantivy-common = { workspace = true } tantivy-fst = "0.5" diff --git a/src/query/storages/common/index/src/inverted_index.rs b/src/query/storages/common/index/src/inverted_index.rs index 6d8db4c5b424..09580866f2de 100644 --- a/src/query/storages/common/index/src/inverted_index.rs +++ b/src/query/storages/common/index/src/inverted_index.rs @@ -36,19 +36,20 @@ use std::collections::BTreeMap; use std::collections::HashMap; use std::collections::HashSet; -use std::collections::VecDeque; use std::io; use std::io::BufWriter; use std::io::Cursor; use std::io::Read; use std::io::Write; use std::marker::PhantomData; +use std::ops::BitAndAssign; +use std::ops::BitOrAssign; +use std::ops::SubAssign; use std::path::Path; use std::path::PathBuf; use std::result; use std::sync::Arc; -use crc32fast::Hasher; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::Scalar; @@ -61,6 +62,7 @@ use levenshtein_automata::Distance; use levenshtein_automata::LevenshteinAutomatonBuilder; use levenshtein_automata::DFA; use log::warn; +use 
roaring::RoaringTreemap; use tantivy::directory::error::DeleteError; use tantivy::directory::error::OpenReadError; use tantivy::directory::error::OpenWriteError; @@ -73,14 +75,21 @@ use tantivy::directory::WatchCallback; use tantivy::directory::WatchHandle; use tantivy::directory::WritePtr; use tantivy::positions::PositionReader; +use tantivy::postings::BlockSegmentPostings; use tantivy::postings::TermInfo; +use tantivy::query::AllQuery; use tantivy::query::BooleanQuery; +use tantivy::query::BoostQuery; +use tantivy::query::ConstScoreQuery; +use tantivy::query::EmptyQuery; use tantivy::query::FuzzyTermQuery; use tantivy::query::Occur; +use tantivy::query::PhrasePrefixQuery; use tantivy::query::PhraseQuery; use tantivy::query::Query; use tantivy::query::QueryClone; use tantivy::query::TermQuery; +use tantivy::schema::Field; use tantivy::Directory; use tantivy::Term; use tantivy_common::BinarySerializable; @@ -88,60 +97,9 @@ use tantivy_common::HasLen; use tantivy_common::VInt; use tantivy_fst::Automaton; use tantivy_fst::IntoStreamer; +use tantivy_fst::Regex; use tantivy_fst::Streamer; -// tantivy version is used to generate the footer data - -// Index major version. -const INDEX_MAJOR_VERSION: u32 = 0; -// Index minor version. -const INDEX_MINOR_VERSION: u32 = 22; -// Index patch version. -const INDEX_PATCH_VERSION: u32 = 0; -// Index format version. -const INDEX_FORMAT_VERSION: u32 = 6; - -// The magic byte of the footer to identify corruption -// or an old version of the footer. -const FOOTER_MAGIC_NUMBER: u32 = 1337; - -type CrcHashU32 = u32; - -/// Structure version for the index. -#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -pub struct Version { - major: u32, - minor: u32, - patch: u32, - index_format_version: u32, -} - -/// A Footer is appended every part of data, like tantivy file. -#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] -struct Footer { - version: Version, - crc: CrcHashU32, -} - -impl Footer { - fn new(crc: CrcHashU32) -> Self { - let version = Version { - major: INDEX_MAJOR_VERSION, - minor: INDEX_MINOR_VERSION, - patch: INDEX_PATCH_VERSION, - index_format_version: INDEX_FORMAT_VERSION, - }; - Footer { version, crc } - } - - fn append_footer(&self, write: &mut W) -> Result<()> { - let footer_payload_len = write.write(serde_json::to_string(&self)?.as_ref())?; - BinarySerializable::serialize(&(footer_payload_len as u32), write)?; - BinarySerializable::serialize(&FOOTER_MAGIC_NUMBER, write)?; - Ok(()) - } -} - fn extract_footer(data: FileSlice) -> Result<(Vec, Vec)> { // The following code is copied from tantivy `CompositeFile::open` function. // extract field number and offsets of each fields. @@ -240,59 +198,12 @@ pub fn extract_component_fields( Ok(()) } -// Build footer for tantivy files. -// Footer is used to check whether the data is valid when open a file. -pub fn build_tantivy_footer(bytes: &[u8]) -> Result> { - let mut hasher = Hasher::new(); - hasher.update(bytes); - let crc = hasher.finalize(); - - let footer = Footer::new(crc); - let mut buf = Vec::new(); - footer.append_footer(&mut buf)?; - Ok(buf) -} - -#[derive( - Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, serde::Serialize, serde::Deserialize, -)] -struct Field(u32); - -impl Field { - /// Create a new field object for the given FieldId. - const fn from_field_id(field_id: u32) -> Field { - Field(field_id) - } - - /// Returns a u32 identifying uniquely a field within a schema. 
- #[allow(dead_code)] - const fn field_id(self) -> u32 { - self.0 - } -} - -impl BinarySerializable for Field { - fn serialize(&self, writer: &mut W) -> io::Result<()> { - self.0.serialize(writer) - } - - fn deserialize(reader: &mut R) -> io::Result { - u32::deserialize(reader).map(Field) - } -} - #[derive(Eq, PartialEq, Hash, Copy, Ord, PartialOrd, Clone, Debug)] struct FileAddr { field: Field, idx: usize, } -impl FileAddr { - fn new(field: Field, idx: usize) -> FileAddr { - FileAddr { field, idx } - } -} - impl BinarySerializable for FileAddr { fn serialize(&self, writer: &mut W) -> io::Result<()> { self.field.serialize(writer)?; @@ -307,36 +218,6 @@ impl BinarySerializable for FileAddr { } } -// Build empty position data to be used when there are no phrase terms in the query. -// This can reduce data reading and speed up the query -fn build_empty_position_data(field_nums: usize) -> Result { - let offsets: Vec<_> = (0..field_nums) - .map(|i| { - let field = Field::from_field_id(i as u32); - let file_addr = FileAddr::new(field, 0); - (file_addr, 0) - }) - .collect(); - - let mut buf = Vec::new(); - VInt(offsets.len() as u64).serialize(&mut buf)?; - - let mut prev_offset = 0; - for (file_addr, offset) in offsets { - VInt(offset - prev_offset).serialize(&mut buf)?; - file_addr.serialize(&mut buf)?; - prev_offset = offset; - } - - let footer_len = buf.len() as u32; - footer_len.serialize(&mut buf)?; - - let mut footer = build_tantivy_footer(&buf)?; - buf.append(&mut footer); - - Ok(OwnedBytes::new(buf)) -} - struct DfaWrapper(pub DFA); impl Automaton for DfaWrapper { @@ -362,354 +243,704 @@ impl Automaton for DfaWrapper { } } -// Term value contains values associated with a Term -// used to match query and collect matched doc ids. +// Collect matched `doc_ids` for a Query. #[derive(Clone)] -pub struct TermValue { - // term info - pub term_info: TermInfo, - // term matched doc ids - pub doc_ids: Vec, - // term frequencies for each doc - pub term_freqs: Vec, - // position reader is used to read positions in doc for phrase query - pub position_reader: Option, +pub struct DocIdsCollector { + row_count: u64, + need_position: bool, + // key is `term`, value is `term_id`, + // These terms are in `fst`, means that those terms exist in the block, + // we need to use related term infos to determine the matched `doc_ids`. + // Use `term_id` as key to get related information and avoid copy `term`. + term_map: HashMap, + // key is `term_id`, value is `field_id` and `term_info`. + term_infos: HashMap, + // key is `term_id`, value is related `BlockSegmentPostings`, + // used to read `doc_ids` and `term_freqs`. + block_postings_map: HashMap, + // key is `term_id`, value is related `PositionReader`, + // used to read `positions` in each docs. + position_reader_map: HashMap, + // key is `term_id`, value is related `doc_ids`. + // `doc_ids` is lazy loaded when used. + doc_ids: HashMap, + // key is `term_id`, value is related `term_freqs`. + // `term_freqs` is lazy loaded when used. + term_freqs: HashMap>, } -// Check if fst contains terms in query. -// If not, we can skip read other parts of inverted index. 
-pub fn check_term_fsts_match( - query: Box, - fst_maps: &HashMap>, - fuzziness: &Option, - matched_terms: &mut HashMap, - fuzziness_terms: &mut HashMap>, -) -> bool { - if let Some(term_query) = query.downcast_ref::() { - let term = term_query.term(); - let field = term.field(); - let field_id = field.field_id() as usize; - if let Some(fst_map) = fst_maps.get(&field_id) { - if let Some(idx) = fst_map.get(term.serialized_value_bytes()) { - matched_terms.insert(term.clone(), idx); - return true; - } +impl DocIdsCollector { + pub fn create( + row_count: u64, + need_position: bool, + terms: HashMap, + term_infos: HashMap, + block_postings_map: HashMap, + position_reader_map: HashMap, + ) -> Self { + let term_len = terms.len(); + let term_map = terms + .into_iter() + .map(|(term, (_, term_id))| (term, term_id)) + .collect::>(); + + Self { + row_count, + need_position, + term_map, + term_infos, + block_postings_map, + position_reader_map, + doc_ids: HashMap::with_capacity(term_len), + term_freqs: HashMap::with_capacity(term_len), } - false - } else if let Some(bool_query) = query.downcast_ref::() { - let mut matched_num = 0; - for (occur, sub_query) in bool_query.clauses() { - let matched = check_term_fsts_match( - sub_query.box_clone(), - fst_maps, - fuzziness, - matched_terms, - fuzziness_terms, - ); - if matched { - matched_num += 1; - } - match occur { - Occur::Should => {} - Occur::Must => { - if !matched { - return false; + } + + fn check_term_match( + fst_map: &tantivy_fst::Map, + field_id: u32, + term: &Term, + matched_terms: &mut HashMap, + ) -> bool { + if let Some(term_id) = fst_map.get(term.serialized_value_bytes()) { + matched_terms.insert(term.clone(), (field_id, term_id)); + true + } else { + false + } + } + + fn get_fst_map( + field_id: u32, + fst_maps: &HashMap>, + ) -> Result<&tantivy_fst::Map> { + let fst_map = fst_maps.get(&field_id).ok_or_else(|| { + ErrorCode::TantivyError(format!( + "inverted index fst field `{}` does not exist", + field_id + )) + })?; + + Ok(fst_map) + } + + // Check if fst contains terms in query. + // If not, we can skip read other parts of inverted index. + pub fn check_term_fsts_match( + query: Box, + fst_maps: &HashMap>, + fuzziness: &Option, + matched_terms: &mut HashMap, + prefix_terms: &mut HashMap>, + fuzziness_terms: &mut HashMap>, + ) -> Result { + if let Some(term_query) = query.downcast_ref::() { + let term = term_query.term(); + let field = term.field(); + let field_id = field.field_id(); + let fst_map = Self::get_fst_map(field_id, fst_maps)?; + let matched = Self::check_term_match(fst_map, field_id, term, matched_terms); + Ok(matched) + } else if let Some(bool_query) = query.downcast_ref::() { + let mut matched_any = false; + for (occur, sub_query) in bool_query.clauses() { + let matched = Self::check_term_fsts_match( + sub_query.box_clone(), + fst_maps, + fuzziness, + matched_terms, + prefix_terms, + fuzziness_terms, + )?; + match occur { + Occur::Should => { + if matched { + matched_any = true; + } + } + Occur::Must => { + if matched { + matched_any = true; + } else { + return Ok(false); + } + } + Occur::MustNot => { + // Matched means that the block contains the term, + // but we still need to filter out the `doc_ids`. + matched_any = true; } } - Occur::MustNot => {} } - } - matched_num > 0 - } else if let Some(phrase_query) = query.downcast_ref::() { - // PhraseQuery must match all terms. 
- let field = phrase_query.field(); - let field_id = field.field_id() as usize; - if let Some(fst_map) = fst_maps.get(&field_id) { + Ok(matched_any) + } else if let Some(phrase_query) = query.downcast_ref::() { + // PhraseQuery must match all terms. + let field = phrase_query.field(); + let field_id = field.field_id(); + let fst_map = Self::get_fst_map(field_id, fst_maps)?; + let mut matched_all = true; for term in phrase_query.phrase_terms() { - let matched = if let Some(idx) = fst_map.get(term.serialized_value_bytes()) { - matched_terms.insert(term.clone(), idx); - true - } else { - false - }; + let matched = Self::check_term_match(fst_map, field_id, &term, matched_terms); if !matched { matched_all = false; break; } } - matched_all - } else { - false - } - } else if let Some(fuzzy_term_query) = query.downcast_ref::() { - // FuzzyTermQuery match terms by levenshtein distance. - let fuzziness = fuzziness.unwrap(); - - let term = fuzzy_term_query.term(); - let field = term.field(); - let field_id = field.field_id() as usize; - if let Some(fst_map) = fst_maps.get(&field_id) { - // build levenshtein automaton + Ok(matched_all) + } else if let Some(phrase_prefix_query) = query.downcast_ref::() { + // PhrasePrefixQuery must match all terms. + let field = phrase_prefix_query.field(); + let field_id = field.field_id(); + let fst_map = Self::get_fst_map(field_id, fst_maps)?; + + let mut matched_all = true; + for term in phrase_prefix_query.phrase_terms() { + let matched = Self::check_term_match(fst_map, field_id, &term, matched_terms); + if !matched { + matched_all = false; + break; + } + } + if !matched_all { + return Ok(false); + } + + // using regex to check prefix term, get related term ids. + let (_, prefix_term) = phrase_prefix_query.prefix_term_with_offset(); + let term_str = String::from_utf8_lossy(prefix_term.serialized_value_bytes()); + let key = format!("{}.*", term_str); + let re = Regex::new(&key).map_err(|_| { + ErrorCode::TantivyError(format!("inverted index create regex `{}` failed", key)) + })?; + + let mut prefix_term_ids = vec![]; + let mut stream = fst_map.search(&re).into_stream(); + while let Some((key, term_id)) = stream.next() { + let key_str = unsafe { std::str::from_utf8_unchecked(key) }; + let prefix_term = Term::from_field_text(field, key_str); + matched_terms.insert(prefix_term.clone(), (field_id, term_id)); + prefix_term_ids.push(term_id); + } + let matched = !prefix_term_ids.is_empty(); + if matched { + prefix_terms.insert(prefix_term.clone(), prefix_term_ids); + } + Ok(matched) + } else if let Some(fuzzy_term_query) = query.downcast_ref::() { + // FuzzyTermQuery match terms by levenshtein distance. + let fuzziness = fuzziness.unwrap(); + let term = fuzzy_term_query.term(); + let field = term.field(); + let field_id = field.field_id(); + let fst_map = Self::get_fst_map(field_id, fst_maps)?; + + // build levenshtein automaton to check fuzziness term, get related term ids. 
let lev_automaton_builder = LevenshteinAutomatonBuilder::new(fuzziness, true); let term_str = String::from_utf8_lossy(term.serialized_value_bytes()); let automaton = DfaWrapper(lev_automaton_builder.build_dfa(&term_str)); - let mut fuzz_term_values = vec![]; + let mut fuzz_term_ids = vec![]; let mut stream = fst_map.search(automaton).into_stream(); - while let Some((key, idx)) = stream.next() { + while let Some((key, term_id)) = stream.next() { let key_str = unsafe { std::str::from_utf8_unchecked(key) }; let fuzz_term = Term::from_field_text(field, key_str); - matched_terms.insert(fuzz_term.clone(), idx); - fuzz_term_values.push(fuzz_term); + matched_terms.insert(fuzz_term.clone(), (field_id, term_id)); + fuzz_term_ids.push(term_id); + } + let matched = !fuzz_term_ids.is_empty(); + if matched { + fuzziness_terms.insert(term.clone(), fuzz_term_ids); } - let matched = !fuzz_term_values.is_empty(); - fuzziness_terms.insert(term.clone(), fuzz_term_values); - matched + Ok(matched) + } else if let Some(boost_query) = query.downcast_ref::() { + Self::check_term_fsts_match( + boost_query.query(), + fst_maps, + fuzziness, + matched_terms, + prefix_terms, + fuzziness_terms, + ) + } else if let Some(const_query) = query.downcast_ref::() { + Self::check_term_fsts_match( + const_query.query(), + fst_maps, + fuzziness, + matched_terms, + prefix_terms, + fuzziness_terms, + ) + } else if let Some(_empty_query) = query.downcast_ref::() { + Ok(false) + } else if let Some(_all_query) = query.downcast_ref::() { + Ok(true) } else { - false + Err(ErrorCode::TantivyError(format!( + "inverted index unsupported query `{:?}`", + query + ))) } - } else { - // TODO: handle other Query types - let mut matched = false; - query.query_terms(&mut |term, _| { - let field = term.field(); - let field_id = field.field_id() as usize; - if let Some(fst_map) = fst_maps.get(&field_id) { - if let Some(idx) = fst_map.get(term.serialized_value_bytes()) { - matched_terms.insert(term.clone(), idx); - matched = true; - } - } - }); - - matched } -} -// collect matched rows by term value -pub fn collect_matched_rows( - query: Box, - row_count: u32, - fuzziness_terms: &HashMap>, - term_values: &mut HashMap, -) -> Vec { - if let Some(term_query) = query.downcast_ref::() { - let term = term_query.term(); - if let Some(term_value) = term_values.get(term) { - term_value.doc_ids.clone() - } else { - vec![] - } - } else if let Some(bool_query) = query.downcast_ref::() { - let mut should_doc_ids_opt = None; - let mut must_doc_ids_opt = None; - let mut must_not_doc_ids_opt = None; - for (occur, sub_query) in bool_query.clauses() { - let doc_ids = collect_matched_rows( - sub_query.box_clone(), - row_count, - fuzziness_terms, - term_values, - ); - let doc_id_set = HashSet::from_iter(doc_ids.into_iter()); - match occur { - Occur::Should => { - if should_doc_ids_opt.is_none() { - should_doc_ids_opt = Some(doc_id_set); - } else { - let should_doc_ids = should_doc_ids_opt.unwrap(); - should_doc_ids_opt = - Some(should_doc_ids.union(&doc_id_set).copied().collect()) - } - } - Occur::Must => { - if must_doc_ids_opt.is_none() { - must_doc_ids_opt = Some(doc_id_set); - } else { - let must_doc_ids = must_doc_ids_opt.unwrap(); - must_doc_ids_opt = - Some(must_doc_ids.intersection(&doc_id_set).copied().collect()) - } + // get `doc_ids` of a `term_id`, + fn get_doc_ids(&mut self, term_id: u64) -> Result<&RoaringTreemap> { + if let std::collections::hash_map::Entry::Vacant(doc_ids_entry) = + self.doc_ids.entry(term_id) + { + // `doc_ids` are lazy loaded when 
used. + let block_postings = self.block_postings_map.get_mut(&term_id).ok_or_else(|| { + ErrorCode::TantivyError(format!( + "inverted index block postings `{}` does not exist", + term_id + )) + })?; + + let term_freqs_len = if self.need_position { + let (_, term_info) = self.term_infos.get(&term_id).ok_or_else(|| { + ErrorCode::TantivyError(format!( + "inverted index term info `{}` does not exist", + term_id + )) + })?; + term_info.doc_freq as usize + } else { + 0 + }; + let mut doc_ids = RoaringTreemap::new(); + let mut term_freqs = Vec::with_capacity(term_freqs_len); + // `doc_ids` are stored in multiple blocks and need to be decode sequentially. + // TODO: We can skip some blocks by checking related `doc_ids`. + loop { + let block_doc_ids = block_postings.docs(); + if block_doc_ids.is_empty() { + break; } - Occur::MustNot => { - if must_not_doc_ids_opt.is_none() { - must_not_doc_ids_opt = Some(doc_id_set); - } else { - let must_not_doc_ids = must_not_doc_ids_opt.unwrap(); - must_not_doc_ids_opt = - Some(must_not_doc_ids.union(&doc_id_set).copied().collect()) - } + doc_ids + .append(block_doc_ids.iter().map(|id| *id as u64)) + .unwrap(); + + // `term_freqs` is only used if the query need position. + if self.need_position { + let block_term_freqs = block_postings.freqs(); + term_freqs.extend_from_slice(block_term_freqs); } + block_postings.advance(); } + doc_ids_entry.insert(doc_ids); + self.term_freqs.insert(term_id, term_freqs); } - let doc_ids = if let Some(mut should_doc_ids) = should_doc_ids_opt { - if let Some(must_doc_ids) = must_doc_ids_opt { - should_doc_ids = should_doc_ids - .intersection(&must_doc_ids) - .copied() - .collect() + let doc_ids = self.doc_ids.get(&term_id).unwrap(); + Ok(doc_ids) + } + + // get the position `offsets` and `term_freqs` of a `term_id` in each `docs`, + // which is used to read `positions`. + fn get_position_offsets( + &mut self, + term_id: u64, + all_doc_ids: &RoaringTreemap, + ) -> Result> { + let doc_ids = self.doc_ids.get(&term_id).unwrap(); + let term_freqs = self.term_freqs.get(&term_id).unwrap(); + + let mut doc_offset = 0; + let mut offset_and_term_freqs = HashMap::with_capacity(all_doc_ids.len() as usize); + for (doc_id, term_freq) in doc_ids.iter().zip(term_freqs.iter()) { + if all_doc_ids.len() as usize == offset_and_term_freqs.len() { + break; } - if let Some(must_not_doc_ids) = must_not_doc_ids_opt { - should_doc_ids = should_doc_ids - .difference(&must_not_doc_ids) - .copied() - .collect() + if !all_doc_ids.contains(doc_id) { + doc_offset += *term_freq as u64; + continue; } - should_doc_ids - } else if let Some(mut must_doc_ids) = must_doc_ids_opt { - if let Some(must_not_doc_ids) = must_not_doc_ids_opt { - must_doc_ids = must_doc_ids - .difference(&must_not_doc_ids) - .copied() - .collect() + + offset_and_term_freqs.insert(doc_id, (doc_offset, *term_freq)); + doc_offset += *term_freq as u64; + } + + Ok(offset_and_term_freqs) + } + + // get `positions` of a `term_id` in a `doc`. 
+ fn get_positions( + &mut self, + term_id: u64, + doc_offset: u64, + term_freq: u32, + ) -> Result { + let position_reader = self.position_reader_map.get_mut(&term_id).ok_or_else(|| { + ErrorCode::TantivyError(format!( + "inverted index position reader `{}` does not exist", + term_id + )) + })?; + + let mut positions = vec![0; term_freq as usize]; + position_reader.read(doc_offset, &mut positions[..]); + for i in 1..positions.len() { + positions[i] += positions[i - 1]; + } + let term_poses = + RoaringTreemap::from_sorted_iter(positions.into_iter().map(|i| i as u64)).unwrap(); + Ok(term_poses) + } + + // The phrase query matches `doc_ids` as follows: + // + // 1. Collect the positions for each term in the query. + // 2. Collect the `doc_ids` of each term and take + // the intersection to get the candidate `doc_ids`. + // 3. Iterate over the candidate `doc_ids` to check whether + // the positions of the terms match the positions of the terms in the query. + // 4. Each position in the first term is a possible query phrase beginning. + // Verify that the beginning is valid by checking whether corresponding + // positions in other terms exist. If not, delete the possible position + // in the first term. After traversing all terms, determine if there are + // any positions left in the first term. If there are, then the `doc_id` + // is matched. + // + // If the query is a prefix phrase query, also check if any prefix terms + // match the positions. + pub fn collect_phrase_matched_rows( + &mut self, + phrase_terms: Vec<(usize, Term)>, + prefix_term: Option<(usize, &Vec)>, + ) -> Result> { + let mut query_term_poses = Vec::with_capacity(phrase_terms.len()); + for (term_pos, term) in &phrase_terms { + // A missing term means this phrase is not matched. + let Some(term_id) = self.term_map.get(term) else { + return Ok(None); + }; + query_term_poses.push((*term_pos, *term_id)); + } + if query_term_poses.is_empty() { + return Ok(None); + } + + let first_term_pos = &query_term_poses[0].0; + let first_term_id = &query_term_poses[0].1; + + let mut term_ids = HashSet::with_capacity(phrase_terms.len() + 1); + term_ids.insert(*first_term_id); + + let first_doc_ids = self.get_doc_ids(*first_term_id)?; + let mut candidate_doc_ids = RoaringTreemap::new(); + candidate_doc_ids.bitor_assign(first_doc_ids); + + // Collect the `doc_ids` of the other terms in the query and take the intersection + // to obtain the candidate `doc_ids` containing all terms.
+ let mut query_term_offsets = Vec::with_capacity(query_term_poses.len() - 1); + for (term_pos, term_id) in query_term_poses.iter().skip(1) { + if !term_ids.contains(term_id) { + let doc_ids = self.get_doc_ids(*term_id)?; + + candidate_doc_ids.bitand_assign(doc_ids); + if candidate_doc_ids.is_empty() { + break; + } + term_ids.insert(*term_id); } - must_doc_ids - } else if let Some(must_not_doc_ids) = must_not_doc_ids_opt { - let all_doc_ids = HashSet::from_iter(0..row_count); - let doc_ids = all_doc_ids.difference(&must_not_doc_ids).copied().collect(); - doc_ids - } else { - HashSet::new() - }; - - let mut doc_ids = Vec::from_iter(doc_ids); - doc_ids.sort(); - doc_ids - } else if let Some(phrase_query) = query.downcast_ref::() { - let mut union_doc_ids = HashSet::new(); - let mut intersection_doc_ids_opt = None; - - for term in phrase_query.phrase_terms() { - if let Some(term_value) = term_values.get(&term) { - let doc_id_set = HashSet::from_iter(term_value.doc_ids.clone()); - union_doc_ids = union_doc_ids.union(&doc_id_set).copied().collect(); - if intersection_doc_ids_opt.is_none() { - intersection_doc_ids_opt = Some(doc_id_set); - } else { - let intersection_doc_ids = intersection_doc_ids_opt.unwrap(); - intersection_doc_ids_opt = Some( - intersection_doc_ids - .intersection(&doc_id_set) - .copied() - .collect(), - ); + let term_pos_offset = (term_pos - first_term_pos) as u64; + query_term_offsets.push((*term_id, term_pos_offset)); + } + // If the candidate `doc_ids` is empty, all docs are not matched. + if candidate_doc_ids.is_empty() { + return Ok(None); + } + + // If the query is a prefix phrase query, + // also need to collect `doc_ids` of prefix terms. + if let Some((_, prefix_term_ids)) = prefix_term { + let mut all_prefix_doc_ids = RoaringTreemap::new(); + for prefix_term_id in prefix_term_ids { + let prefix_doc_ids = self.get_doc_ids(*prefix_term_id)?; + // If the `doc_ids` does not intersect at all, this prefix can be ignored. + if candidate_doc_ids.is_disjoint(prefix_doc_ids) { + continue; } + + all_prefix_doc_ids.bitor_assign(prefix_doc_ids); + term_ids.insert(*prefix_term_id); } + // If there is no matched prefix `doc_ids`, the prefix term does not matched. + if all_prefix_doc_ids.is_empty() { + return Ok(None); + } + + // Get the intersection of phrase `doc_ids` and prefix `doc_ids` + candidate_doc_ids.bitand_assign(all_prefix_doc_ids); } - let intersection_doc_ids = intersection_doc_ids_opt.unwrap_or_default(); - if intersection_doc_ids.is_empty() { - return vec![]; + // Collect the position `offset` and `term_freqs` for each terms, + // which can be used to read positions. + let mut offset_and_term_freqs_map = HashMap::new(); + for term_id in term_ids.into_iter() { + let offset_and_term_freqs = self.get_position_offsets(term_id, &candidate_doc_ids)?; + offset_and_term_freqs_map.insert(term_id, offset_and_term_freqs); } - let mut union_doc_ids = Vec::from_iter(union_doc_ids); - union_doc_ids.sort(); - // check each docs - let mut matched_doc_ids = vec![]; - for doc_id in union_doc_ids { - if !intersection_doc_ids.contains(&doc_id) { - continue; + let first_offset_and_term_freqs = offset_and_term_freqs_map.get(first_term_id).unwrap(); + + // Check every candidate `doc_ids` if the position of each terms match the query. 
+ let mut all_doc_ids = RoaringTreemap::new(); + let mut offset_poses = RoaringTreemap::new(); + let mut term_poses_map = HashMap::new(); + for (doc_id, (first_doc_offset, first_term_freq)) in first_offset_and_term_freqs.iter() { + let mut first_term_poses = + self.get_positions(*first_term_id, *first_doc_offset, *first_term_freq)?; + + term_poses_map.clear(); + term_poses_map.insert(first_term_id, first_term_poses.clone()); + + for (term_id, term_pos_offset) in &query_term_offsets { + if !term_poses_map.contains_key(term_id) { + let offset_and_term_freqs = offset_and_term_freqs_map.get(term_id).unwrap(); + let (doc_offset, term_freq) = offset_and_term_freqs.get(doc_id).unwrap(); + + let term_poses = self.get_positions(*term_id, *doc_offset, *term_freq)?; + term_poses_map.insert(term_id, term_poses); + } + let term_poses = term_poses_map.get(term_id).unwrap(); + + // Using the position of the first term and the offset of this term with the first term, + // calculate all possible positions for this term. + offset_poses.clear(); + offset_poses + .append(first_term_poses.iter().map(|pos| pos + term_pos_offset)) + .unwrap(); + + // Term possible positions subtract term actual positions, + // remaining positions are not matched and need to be removed in the first term. + offset_poses.sub_assign(term_poses); + for offset_pos in &offset_poses { + first_term_poses.remove(offset_pos - term_pos_offset); + } + if first_term_poses.is_empty() { + break; + } } - let mut term_pos_map = HashMap::new(); - for term in phrase_query.phrase_terms() { - let mut offset = 0; - let mut term_freq = 0; - if let Some(term_value) = term_values.get_mut(&term) { - for i in 0..term_value.doc_ids.len() { - if term_value.doc_ids[i] < doc_id { - offset += term_value.term_freqs[i] as u64; - } else { - term_freq = term_value.term_freqs[i] as usize; - break; + // If the query is a prefix phrase query, + // also need to check if any prefix term match. + if let Some((prefix_term_pos, prefix_term_ids)) = prefix_term { + if !first_term_poses.is_empty() { + let mut prefix_matched = false; + let prefix_term_pos_offset = (prefix_term_pos - first_term_pos) as u64; + for prefix_term_id in prefix_term_ids { + if !term_poses_map.contains_key(prefix_term_id) { + if let Some(offset_and_term_freqs) = + offset_and_term_freqs_map.get(prefix_term_id) + { + if let Some((doc_offset, term_freq)) = + offset_and_term_freqs.get(doc_id) + { + let term_poses = self.get_positions( + *prefix_term_id, + *doc_offset, + *term_freq, + )?; + term_poses_map.insert(prefix_term_id, term_poses); + } + } } - } - // collect positions in the docs - if let Some(position_reader) = term_value.position_reader.as_mut() { - let mut pos_output = vec![0; term_freq]; - position_reader.read(offset, &mut pos_output[..]); - for i in 1..pos_output.len() { - pos_output[i] += pos_output[i - 1]; + if let Some(term_poses) = term_poses_map.get(prefix_term_id) { + offset_poses.clear(); + offset_poses + .append( + first_term_poses + .iter() + .map(|pos| pos + prefix_term_pos_offset), + ) + .unwrap(); + + offset_poses.bitand_assign(term_poses); + // If any of the possible prefix term positions exist, + // the prefix phrase query is matched. 
+ if !offset_poses.is_empty() { + prefix_matched = true; + break; + } } - let positions = VecDeque::from_iter(pos_output); - term_pos_map.insert(term.clone(), positions); + } + if prefix_matched { + all_doc_ids.insert(*doc_id); } } + } else { + let matched = !first_term_poses.is_empty(); + if matched { + all_doc_ids.insert(*doc_id); + } } + } + + if !all_doc_ids.is_empty() { + Ok(Some(all_doc_ids)) + } else { + Ok(None) + } + } - let mut is_first = true; - let mut distance = 0; - let mut matched = true; - let mut last_position = 0; - for (query_position, term) in phrase_query.phrase_terms_with_offsets() { - if let Some(positions) = term_pos_map.get_mut(&term) { - let mut find_position = false; - while let Some(doc_position) = positions.pop_front() { - // skip previous positions. - if doc_position < last_position { + // collect matched rows by term value + pub fn collect_matched_rows( + &mut self, + query: Box, + prefix_terms: &HashMap>, + fuzziness_terms: &HashMap>, + ) -> Result> { + if let Some(term_query) = query.downcast_ref::() { + let term = term_query.term(); + if let Some(term_id) = self.term_map.get(term) { + let doc_ids = self.get_doc_ids(*term_id)?; + Ok(Some(doc_ids.clone())) + } else { + Ok(None) + } + } else if let Some(bool_query) = query.downcast_ref::() { + let mut should_doc_ids: Option = None; + let mut must_doc_ids: Option = None; + let mut must_not_doc_ids: Option = None; + for (occur, sub_query) in bool_query.clauses() { + let doc_ids = self.collect_matched_rows( + sub_query.box_clone(), + prefix_terms, + fuzziness_terms, + )?; + if doc_ids.is_none() { + match occur { + Occur::Should => { continue; } - last_position = doc_position; - let doc_distance = doc_position - (query_position as u32); - if is_first { - is_first = false; - distance = doc_distance; - } else { - // distance must same as first term. 
- if doc_distance != distance { - matched = false; + Occur::Must => { + if must_doc_ids.is_none() { + must_doc_ids = Some(RoaringTreemap::new()); + } + continue; + } + Occur::MustNot => { + if must_not_doc_ids.is_none() { + must_not_doc_ids = Some(RoaringTreemap::new()); } + continue; } - find_position = true; - break; } - if !find_position { - matched = false; + } + let doc_ids = doc_ids.unwrap(); + match occur { + Occur::Should => { + if let Some(ref mut should_doc_ids) = should_doc_ids { + should_doc_ids.bitor_assign(&doc_ids); + } else { + should_doc_ids = Some(doc_ids); + } + } + Occur::Must => { + if let Some(ref mut must_doc_ids) = must_doc_ids { + must_doc_ids.bitand_assign(&doc_ids); + } else { + must_doc_ids = Some(doc_ids); + } + } + Occur::MustNot => { + if let Some(ref mut must_not_doc_ids) = must_not_doc_ids { + must_not_doc_ids.bitor_assign(&doc_ids); + } else { + must_not_doc_ids = Some(doc_ids); + } } - } else { - matched = false; } - if !matched { - break; + } + + let all_doc_ids = match (should_doc_ids, must_doc_ids, must_not_doc_ids) { + // only should + (Some(should_doc_ids), None, None) => should_doc_ids, + // only must + (None, Some(must_doc_ids), None) => must_doc_ids, + // only must not + (None, None, Some(must_not_doc_ids)) => { + let mut all_doc_ids = RoaringTreemap::from_iter(0..self.row_count); + all_doc_ids.sub_assign(must_not_doc_ids); + all_doc_ids + } + // should and must + (Some(mut should_doc_ids), Some(must_doc_ids), None) => { + should_doc_ids.bitor_assign(must_doc_ids); + should_doc_ids + } + // should and must not + (Some(mut should_doc_ids), None, Some(must_not_doc_ids)) => { + should_doc_ids.sub_assign(must_not_doc_ids); + should_doc_ids + } + // must and must not + (None, Some(mut must_doc_ids), Some(must_not_doc_ids)) => { + must_doc_ids.sub_assign(must_not_doc_ids); + must_doc_ids + } + // should, must and must not + (Some(mut should_doc_ids), Some(must_doc_ids), Some(must_not_doc_ids)) => { + should_doc_ids.bitor_assign(must_doc_ids); + should_doc_ids.sub_assign(must_not_doc_ids); + should_doc_ids + } + (None, None, None) => { + return Ok(None); } + }; + + if !all_doc_ids.is_empty() { + Ok(Some(all_doc_ids)) + } else { + Ok(None) } - if matched { - matched_doc_ids.push(doc_id); + } else if let Some(phrase_query) = query.downcast_ref::() { + let phrase_terms = phrase_query.phrase_terms_with_offsets(); + self.collect_phrase_matched_rows(phrase_terms, None) + } else if let Some(phrase_prefix_query) = query.downcast_ref::() { + let phrase_terms = phrase_prefix_query.phrase_terms_with_offsets(); + let (prefix_term_pos, prefix_term) = phrase_prefix_query.prefix_term_with_offset(); + + let Some(prefix_term_ids) = prefix_terms.get(&prefix_term) else { + return Ok(None); + }; + let prefix_term = Some((prefix_term_pos, prefix_term_ids)); + + self.collect_phrase_matched_rows(phrase_terms, prefix_term) + } else if let Some(fuzzy_term_query) = query.downcast_ref::() { + let mut all_doc_ids = RoaringTreemap::new(); + let term = fuzzy_term_query.term(); + + let Some(fuzz_term_ids) = fuzziness_terms.get(term) else { + return Ok(None); + }; + // collect related terms of the original term. + for term_id in fuzz_term_ids { + let doc_ids = self.get_doc_ids(*term_id)?; + all_doc_ids.bitor_assign(doc_ids); } - } - matched_doc_ids - } else if let Some(fuzzy_term_query) = query.downcast_ref::() { - let mut fuzz_doc_ids = HashSet::new(); - let term = fuzzy_term_query.term(); - - // collect related terms of the original term. 
- if let Some(related_terms) = fuzziness_terms.get(term) { - for term in related_terms { - if let Some(term_value) = term_values.get(term) { - let doc_id_set: HashSet = HashSet::from_iter(term_value.doc_ids.clone()); - fuzz_doc_ids = fuzz_doc_ids.union(&doc_id_set).copied().collect(); - } + if !all_doc_ids.is_empty() { + Ok(Some(all_doc_ids)) + } else { + Ok(None) } - let mut doc_ids = Vec::from_iter(fuzz_doc_ids); - doc_ids.sort(); - doc_ids + } else if let Some(boost_query) = query.downcast_ref::() { + self.collect_matched_rows(boost_query.query(), prefix_terms, fuzziness_terms) + } else if let Some(const_query) = query.downcast_ref::() { + self.collect_matched_rows(const_query.query(), prefix_terms, fuzziness_terms) + } else if let Some(_empty_query) = query.downcast_ref::() { + Ok(None) + } else if let Some(_all_query) = query.downcast_ref::() { + let all_doc_ids = RoaringTreemap::from_iter(0..self.row_count); + Ok(Some(all_doc_ids)) } else { - vec![] + Err(ErrorCode::TantivyError(format!( + "inverted index unsupported query `{:?}`", + query + ))) } - } else { - let mut union_doc_ids = HashSet::new(); - query.query_terms(&mut |term, _| { - if let Some(term_value) = term_values.get(term) { - let doc_id_set: HashSet = HashSet::from_iter(term_value.doc_ids.clone()); - union_doc_ids = union_doc_ids.union(&doc_id_set).copied().collect(); - } - }); - - let mut doc_ids = Vec::from_iter(union_doc_ids); - doc_ids.sort(); - doc_ids } } @@ -798,7 +1029,7 @@ pub struct InvertedIndexDirectory { } impl InvertedIndexDirectory { - pub fn try_create(field_nums: usize, files: Vec>) -> Result { + pub fn try_create(files: Vec>) -> Result { let mut file_map = BTreeMap::::new(); for file in files.into_iter() { @@ -812,12 +1043,7 @@ impl InvertedIndexDirectory { let fast_data = file_map.remove("fast").unwrap(); let store_data = file_map.remove("store").unwrap(); let fieldnorm_data = file_map.remove("fieldnorm").unwrap(); - // If there are no phrase terms in the query, - // we can use empty position data instead. 
- let pos_data = match file_map.remove("pos") { - Some(pos_data) => pos_data, - None => build_empty_position_data(field_nums)?, - }; + let pos_data = file_map.remove("pos").unwrap(); let idx_data = file_map.remove("idx").unwrap(); let term_data = file_map.remove("term").unwrap(); let meta_data = file_map.remove("meta.json").unwrap(); diff --git a/src/query/storages/common/index/src/lib.rs b/src/query/storages/common/index/src/lib.rs index f60a1b16982e..48a30df64cae 100644 --- a/src/query/storages/common/index/src/lib.rs +++ b/src/query/storages/common/index/src/lib.rs @@ -26,15 +26,12 @@ pub use bloom_index::BloomIndex; pub use bloom_index::BloomIndexMeta; pub use bloom_index::FilterEvalResult; pub use index::Index; -pub use inverted_index::build_tantivy_footer; -pub use inverted_index::check_term_fsts_match; -pub use inverted_index::collect_matched_rows; pub use inverted_index::extract_component_fields; pub use inverted_index::extract_fsts; +pub use inverted_index::DocIdsCollector; pub use inverted_index::InvertedIndexDirectory; pub use inverted_index::InvertedIndexFile; pub use inverted_index::InvertedIndexMeta; -pub use inverted_index::TermValue; pub use page_index::PageIndex; pub use range_index::statistics_to_domain; pub use range_index::RangeIndex; diff --git a/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_loader.rs b/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_loader.rs index 411dc316b98b..658a8bc38c95 100644 --- a/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_loader.rs +++ b/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_loader.rs @@ -131,7 +131,6 @@ pub(crate) async fn load_inverted_index_file<'a>( #[fastrace::trace] pub(crate) async fn load_inverted_index_directory<'a>( dal: Operator, - field_nums: usize, index_path: &'a str, ) -> Result { // load inverted index meta, contains the offsets of each files. 
@@ -154,7 +153,7 @@ pub(crate) async fn load_inverted_index_directory<'a>( let files: Vec<_> = try_join_all(futs).await?.into_iter().collect(); // use those files to create inverted index directory - let directory = InvertedIndexDirectory::try_create(field_nums, files)?; + let directory = InvertedIndexDirectory::try_create(files)?; Ok(directory) } diff --git a/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_reader.rs b/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_reader.rs index e3baee6c0c0f..6441a0fccf97 100644 --- a/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_reader.rs +++ b/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_reader.rs @@ -17,12 +17,10 @@ use std::collections::HashSet; use std::sync::Arc; use std::time::Instant; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::types::F32; use databend_common_metrics::storage::metrics_inc_block_inverted_index_search_milliseconds; -use databend_storages_common_index::check_term_fsts_match; -use databend_storages_common_index::collect_matched_rows; -use databend_storages_common_index::TermValue; use databend_storages_common_table_meta::meta::SingleColumnMeta; use futures_util::future::try_join_all; use opendal::Operator; @@ -40,6 +38,7 @@ use tantivy::tokenizer::TokenizerManager; use tantivy::Index; use tantivy_fst::raw::Fst; +use crate::index::DocIdsCollector; use crate::io::read::inverted_index::inverted_index_loader::load_inverted_index_directory; use crate::io::read::inverted_index::inverted_index_loader::load_inverted_index_file; use crate::io::read::inverted_index::inverted_index_loader::load_inverted_index_meta; @@ -61,15 +60,14 @@ impl InvertedIndexReader { #[allow(clippy::too_many_arguments)] pub async fn do_filter( self, - field_nums: usize, need_position: bool, has_score: bool, query: Box, - field_ids: &HashSet, + field_ids: &HashSet, index_record: &IndexRecordOption, fuzziness: &Option, tokenizer_manager: TokenizerManager, - row_count: u32, + row_count: u64, index_loc: &str, ) -> Result)>>> { let start = Instant::now(); @@ -79,7 +77,6 @@ impl InvertedIndexReader { index_loc, query, field_ids, - field_nums, need_position, has_score, index_record, @@ -101,15 +98,19 @@ impl InvertedIndexReader { &self, index_path: &'a str, name: &str, - field_ids: &HashSet, + field_ids: &HashSet, inverted_index_meta_map: &HashMap, - ) -> Result> { - let mut col_metas = vec![]; - let mut col_field_map = HashMap::new(); + ) -> Result> { + let mut col_metas = Vec::with_capacity(field_ids.len()); + let mut col_field_map = HashMap::with_capacity(field_ids.len()); for field_id in field_ids { let col_name = format!("{}-{}", name, field_id); - let col_meta = inverted_index_meta_map.get(&col_name).unwrap(); - + let col_meta = inverted_index_meta_map.get(&col_name).ok_or_else(|| { + ErrorCode::TantivyError(format!( + "inverted index column `{}` does not exist", + col_name + )) + })?; col_metas.push((col_name.clone(), col_meta)); col_field_map.insert(col_name, *field_id); } @@ -131,18 +132,16 @@ impl InvertedIndexReader { Ok(col_files) } - // first version search function, using tantivy searcher. - async fn search_v0<'a>( + // legacy query search function, using tantivy searcher. 
+ async fn legacy_search<'a>( &self, index_path: &'a str, query: Box, - field_nums: usize, has_score: bool, tokenizer_manager: TokenizerManager, - row_count: u32, + row_count: u64, ) -> Result)>>> { - let directory = - load_inverted_index_directory(self.dal.clone(), field_nums, index_path).await?; + let directory = load_inverted_index_directory(self.dal.clone(), index_path).await?; let mut index = Index::open(directory)?; index.set_tokenizers(tokenizer_manager); @@ -179,34 +178,48 @@ impl InvertedIndexReader { } // Follow the process below to perform the query search: - // 1. Read the `fst` first, check if the term in the query matches, - // return if it doesn't matched. - // 2. Read the `term dict` to get the `postings_range` in `idx` - // and the `positions_range` in `pos` for each terms. - // 3. Read the `doc_ids` and `term_freqs` in `idx` for each terms - // using `postings_range`. - // 4. If it's a phrase query, read the `position` of each terms in - // `pos` using `positions_range`. - // 5. Collect matched doc ids using term-related information. + // + // 1. Read the `fst` first, check if the term in the query matches. + // If it matches, collect the terms that need to be checked + // for subsequent processing; otherwise, return directly + // and ignore the block. + // 2. Read the `term_info` for each term from the `term_dict`, + // which contains three parts: + // `doc_freq` is the number of docs containing the term. + // `postings_range` is used to read the posting list in the postings (`.idx`) file. + // `positions_range` is used to read positions in the positions (`.pos`) file. + // 3. Read `postings` data from the postings (`.idx`) file by the `postings_range` + // of each term, and `positions` data from the positions (`.pos`) file + // by the `positions_range` of each term. + // 4. Open `BlockSegmentPostings` using `postings` data for each term, + // which can be used to read `doc_ids` and `term_freqs`. + // 5. If the query is a phrase query, open `PositionReader` using + // `positions` data for each term, which can be used to read + // term `positions` in each doc. + // 6. Collect matched `doc_ids` of the query. + // If the query is a term query, the `doc_ids` can be read from `BlockSegmentPostings`. + // If the query is a phrase query, in addition to `doc_ids`, we also need to + // use `PositionReader` to read the positions for each term and check whether + // the positions of the terms in the doc match the positions of the terms in the query. // // If the term does not match, only the `fst` file needs to be read. - // If the term matches, only the `idx` and `pos` data of the related terms - // need to be read instead of all the `idx` and `pos` data. + // If the term matches, the `term_dict` and the `postings` and `positions` + // data of the related terms need to be read, instead of all + // the `postings` and `positions` data. #[allow(clippy::too_many_arguments)] async fn search<'a>( &self, index_path: &'a str, query: Box, - field_ids: &HashSet, - field_nums: usize, + field_ids: &HashSet, need_position: bool, has_score: bool, index_record: &IndexRecordOption, fuzziness: &Option, tokenizer_manager: TokenizerManager, - row_count: u32, + row_count: u64, ) -> Result)>>> { - // 1. read index meta + // 1. read index meta. let inverted_index_meta = load_inverted_index_meta(self.dal.clone(), index_path).await?; let inverted_index_meta_map = inverted_index_meta @@ -220,18 +233,11 @@ impl InvertedIndexReader { // use compatible search function to read.
if inverted_index_meta_map.contains_key("meta.json") { return self - .search_v0( - index_path, - query, - field_nums, - has_score, - tokenizer_manager, - row_count, - ) + .legacy_search(index_path, query, has_score, tokenizer_manager, row_count) .await; } - // 2. read fst files + // 2. read fst files. let fst_files = self .read_column_data(index_path, "fst", field_ids, &inverted_index_meta_map) .await?; @@ -250,15 +256,18 @@ impl InvertedIndexReader { // 3. check whether query is matched in the fsts. let mut matched_terms = HashMap::new(); + let mut prefix_terms = HashMap::new(); let mut fuzziness_terms = HashMap::new(); - let matched = check_term_fsts_match( + let matched = DocIdsCollector::check_term_fsts_match( query.box_clone(), &fst_maps, fuzziness, &mut matched_terms, + &mut prefix_terms, &mut fuzziness_terms, - ); + )?; + // if not matched, return without further check if !matched { return Ok(None); } @@ -268,71 +277,66 @@ impl InvertedIndexReader { .read_column_data(index_path, "term", field_ids, &inverted_index_meta_map) .await?; - let mut term_dict_maps = HashMap::new(); + let mut term_infos = HashMap::with_capacity(matched_terms.len()); for (field_id, term_dict_data) in term_dict_files.into_iter() { let term_dict_file = FileSlice::new(Arc::new(term_dict_data)); let term_info_store = TermInfoStore::open(term_dict_file)?; - term_dict_maps.insert(field_id, term_info_store); - } - let mut term_values = HashMap::new(); - for (term, term_ord) in matched_terms.iter() { - let field = term.field(); - let field_id = field.field_id() as usize; - - let term_dict = term_dict_maps.get(&field_id).unwrap(); - let term_info = term_dict.get(*term_ord); - - let term_value = TermValue { - term_info, - doc_ids: vec![], - term_freqs: vec![], - position_reader: None, - }; - term_values.insert(term.clone(), term_value); + for (_, (term_field_id, term_id)) in matched_terms.iter() { + if field_id == *term_field_id { + let term_info = term_info_store.get(*term_id); + term_infos.insert(*term_id, (field_id, term_info)); + } + } } // 5. read postings and optional positions. - // collect doc ids, term frequencies and optional position readers. 
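The `read_column_data` calls above locate each per-field part of the merged index file by a simple naming convention, "<kind>-<field_id>" (for example "fst-0", "term-0", and later "idx-0" / "pos-0"), matching the `format!("{}-{}", name, field_id)` lookups in the patch. A trivial sketch of that convention; the helper name is hypothetical:

// Hypothetical helper mirroring the column naming used when reading the
// merged inverted index file: "<kind>-<field_id>".
fn column_name(kind: &str, field_id: u32) -> String {
    format!("{}-{}", kind, field_id)
}

fn main() {
    assert_eq!(column_name("fst", 0), "fst-0");
    assert_eq!(column_name("term", 2), "term-2");
}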
- let mut slice_metas = Vec::with_capacity(term_values.len()); - let mut name_map = HashMap::new(); - for (term, term_value) in term_values.iter() { - let field = term.field(); - let field_id = field.field_id() as usize; - + let term_slice_len = if need_position { + term_infos.len() * 2 + } else { + term_infos.len() + }; + let mut slice_metas = Vec::with_capacity(term_slice_len); + let mut slice_name_map = HashMap::with_capacity(term_slice_len); + for (term_id, (field_id, term_info)) in term_infos.iter() { let idx_name = format!("idx-{}", field_id); - let idx_meta = inverted_index_meta_map.get(&idx_name).unwrap(); - + let idx_meta = inverted_index_meta_map.get(&idx_name).ok_or_else(|| { + ErrorCode::TantivyError(format!( + "inverted index column `{}` does not exist", + idx_name + )) + })?; // ignore 8 bytes total_num_tokens_slice - let offset = idx_meta.offset + 8 + (term_value.term_info.postings_range.start as u64); - let len = term_value.term_info.postings_range.len() as u64; + let offset = idx_meta.offset + 8 + (term_info.postings_range.start as u64); + let len = term_info.postings_range.len() as u64; let idx_slice_meta = SingleColumnMeta { offset, len, num_values: 1, }; - let idx_slice_name = - format!("{}-{}", idx_name, term_value.term_info.postings_range.start); + let idx_slice_name = format!("{}-{}", idx_name, term_info.postings_range.start); slice_metas.push((idx_slice_name.clone(), idx_slice_meta)); - name_map.insert(idx_slice_name, term.clone()); + slice_name_map.insert(idx_slice_name, *term_id); if need_position { let pos_name = format!("pos-{}", field_id); - let pos_meta = inverted_index_meta_map.get(&pos_name).unwrap(); - let offset = pos_meta.offset + (term_value.term_info.positions_range.start as u64); - let len = term_value.term_info.positions_range.len() as u64; + let pos_meta = inverted_index_meta_map.get(&pos_name).ok_or_else(|| { + ErrorCode::TantivyError(format!( + "inverted index column `{}` does not exist", + pos_name + )) + })?; + let offset = pos_meta.offset + (term_info.positions_range.start as u64); + let len = term_info.positions_range.len() as u64; let pos_slice_meta = SingleColumnMeta { offset, len, num_values: 1, }; - let pos_slice_name = format!( - "{}-{}", - pos_name, term_value.term_info.positions_range.start - ); + let pos_slice_name = format!("{}-{}", pos_name, term_info.positions_range.start); slice_metas.push((pos_slice_name.clone(), pos_slice_meta)); - name_map.insert(pos_slice_name, term.clone()); + slice_name_map.insert(pos_slice_name, *term_id); } } @@ -347,53 +351,58 @@ impl InvertedIndexReader { .map(|f| (f.name.clone(), f.data.clone())) .collect::>(); + let mut block_postings_map = HashMap::with_capacity(term_infos.len()); + let mut position_reader_map = HashMap::with_capacity(term_infos.len()); for (slice_name, slice_data) in slice_files.into_iter() { - let term = name_map.get(&slice_name).unwrap(); - let term_value = term_values.get_mut(term).unwrap(); + let term_id = slice_name_map.remove(&slice_name).unwrap(); + let (_, term_info) = term_infos.get(&term_id).unwrap(); if slice_name.starts_with("idx") { let posting_file = FileSlice::new(Arc::new(slice_data)); - let postings = BlockSegmentPostings::open( - term_value.term_info.doc_freq, + let block_postings = BlockSegmentPostings::open( + term_info.doc_freq, posting_file, *index_record, *index_record, )?; - let doc_ids = postings.docs(); - let term_freqs = postings.freqs(); - term_value.doc_ids = doc_ids.to_vec(); - term_value.term_freqs = term_freqs.to_vec(); + 
block_postings_map.insert(term_id, block_postings); } else if slice_name.starts_with("pos") { let position_reader = PositionReader::open(slice_data)?; - term_value.position_reader = Some(position_reader); + position_reader_map.insert(term_id, position_reader); } } - // 6. collect matched rows by term values. - let matched_docs = collect_matched_rows( - query.box_clone(), + // 6. collect matched doc ids. + let mut collector = DocIdsCollector::create( row_count, - &fuzziness_terms, - &mut term_values, + need_position, + matched_terms, + term_infos, + block_postings_map, + position_reader_map, ); - - if !matched_docs.is_empty() { - let mut matched_rows = Vec::with_capacity(matched_docs.len()); - if has_score { - // TODO: add score - for doc_id in matched_docs { - matched_rows.push((doc_id as usize, Some(F32::from(1.0)))); - } - } else { - for doc_id in matched_docs { - matched_rows.push((doc_id as usize, None)) + let matched_doc_ids = + collector.collect_matched_rows(query.box_clone(), &prefix_terms, &fuzziness_terms)?; + + if let Some(matched_doc_ids) = matched_doc_ids { + if !matched_doc_ids.is_empty() { + let mut matched_rows = Vec::with_capacity(matched_doc_ids.len() as usize); + let doc_ids_iter = matched_doc_ids.iter(); + if has_score { + // TODO: add score + for doc_id in doc_ids_iter { + matched_rows.push((doc_id as usize, Some(F32::from(1.0)))); + } + } else { + for doc_id in doc_ids_iter { + matched_rows.push((doc_id as usize, None)); + } } + return Ok(Some(matched_rows)); } - Ok(Some(matched_rows)) - } else { - Ok(None) } + Ok(None) } // delegation of [InvertedIndexFileReader::cache_key_of_index_columns] diff --git a/src/query/storages/fuse/src/io/write/inverted_index_writer.rs b/src/query/storages/fuse/src/io/write/inverted_index_writer.rs index 8935a3420e78..232ebe3ab773 100644 --- a/src/query/storages/fuse/src/io/write/inverted_index_writer.rs +++ b/src/query/storages/fuse/src/io/write/inverted_index_writer.rs @@ -14,8 +14,6 @@ use std::collections::BTreeMap; use std::collections::HashSet; -use std::io::Write; -use std::path::Path; use std::sync::Arc; use arrow_ipc::writer::write_message; @@ -52,16 +50,12 @@ use tantivy::tokenizer::Stemmer; use tantivy::tokenizer::StopWordFilter; use tantivy::tokenizer::TextAnalyzer; use tantivy::tokenizer::TokenizerManager; -use tantivy::Directory; -use tantivy::Index; use tantivy::IndexBuilder; use tantivy::IndexSettings; use tantivy::IndexWriter; use tantivy::SegmentComponent; use tantivy_jieba::JiebaTokenizer; -use crate::index::build_tantivy_footer; - pub struct InvertedIndexWriter { schema: DataSchemaRef, index_writer: IndexWriter, @@ -175,178 +169,6 @@ impl InvertedIndexWriter { Ok((inverted_index_schema, inverted_index_block)) } - - // The tantivy index data consists of eight files. 
- // `.managed.json` file stores the name of the file that the index contains, for example: - // [ - // "94bce521d5bc4eccbf3e7a0212093622.pos", - // "94bce521d5bc4eccbf3e7a0212093622.fieldnorm", - // "94bce521d5bc4eccbf3e7a0212093622.fast", - // "meta.json", - // "94bce521d5bc4eccbf3e7a0212093622.term", - // "94bce521d5bc4eccbf3e7a0212093622.store", - // "94bce521d5bc4eccbf3e7a0212093622.idx" - // ] - // - // `meta.json` file store the meta information associated with the index, for example: - // { - // "index_settings": { - // "docstore_compression": "lz4", - // "docstore_blocksize": 16384 - // }, - // "segments":[{ - // "segment_id": "94bce521-d5bc-4ecc-bf3e-7a0212093622", - // "max_doc": 6, - // "deletes": null - // }], - // "schema":[{ - // "name": "title", - // "type": "text", - // "options": { - // "indexing": { - // "record": "position", - // "fieldnorms": true, - // "tokenizer": "en" - // }, - // "stored": false, - // "fast": false - // } - // }, { - // "name": "content", - // "type": "text", - // "options": { - // "indexing": { - // "record": "position", - // "fieldnorms": true, - // "tokenizer": "en" - // }, - // "stored": false, - // "fast": false - // } - // }], - // "opstamp":0 - // } - // - // `terms` file stores the term dictionary, the value is - // an address into the `postings` file and the `positions` file. - // `postings` file stores the lists of document ids and freqs. - // `positions` file stores the positions of terms in each document. - // `field norms` file stores the sum of the length of the term. - // `fast fields` file stores column-oriented documents. - // `store` file stores row-oriented documents. - // - // More details can be seen here - // https://github.com/quickwit-oss/tantivy/blob/main/src/index/segment_component.rs#L8 - // - // We merge the data from these files into one file and - // record the offset to read each part of the data. 
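The removed `write_index` below serialized these parts back to back and ended the file with a small trailer: one little-endian u32 per part holding the cumulative end offset (fast fields, store, field norms, positions, postings, terms, meta.json, .managed.json), followed by a u32 count of offsets. A hedged sketch of how such a trailer could be decoded, kept here only to document the legacy layout being removed:

// Sketch of decoding the legacy trailer: N little-endian u32 offsets
// followed by a u32 holding N itself at the very end of the file.
fn read_legacy_offsets(data: &[u8]) -> Option<Vec<u32>> {
    let nums_pos = data.len().checked_sub(4)?;
    let nums = u32::from_le_bytes(data[nums_pos..].try_into().ok()?) as usize;
    let start = nums_pos.checked_sub(nums * 4)?;
    let mut offsets = Vec::with_capacity(nums);
    for i in 0..nums {
        let pos = start + i * 4;
        offsets.push(u32::from_le_bytes(data[pos..pos + 4].try_into().ok()?));
    }
    Some(offsets)
}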
- #[async_backtrace::framed] - fn write_index(mut writer: &mut W, index: &Index) -> Result<()> { - let directory = index.directory(); - - let managed_filepath = Path::new(".managed.json"); - let managed_bytes = directory.atomic_read(managed_filepath)?; - - let meta_filepath = Path::new("meta.json"); - let meta_data = directory.atomic_read(meta_filepath)?; - - let meta_string = std::str::from_utf8(&meta_data)?; - let meta_val: serde_json::Value = serde_json::from_str(meta_string)?; - let meta_json: String = serde_json::to_string(&meta_val)?; - - let segments = index.searchable_segments()?; - let segment = &segments[0]; - - let fast_fields = segment.open_read(SegmentComponent::FastFields)?; - let fast_fields_bytes = fast_fields.read_bytes()?; - - let store = segment.open_read(SegmentComponent::Store)?; - let store_bytes = store.read_bytes()?; - - let field_norms = segment.open_read(SegmentComponent::FieldNorms)?; - let field_norms_bytes = field_norms.read_bytes()?; - - let positions = segment.open_read(SegmentComponent::Positions)?; - let positions_bytes = positions.read_bytes()?; - - let postings = segment.open_read(SegmentComponent::Postings)?; - let postings_bytes = postings.read_bytes()?; - - let terms = segment.open_read(SegmentComponent::Terms)?; - let terms_bytes = terms.read_bytes()?; - - // write each tantivy files as part of data - let mut fast_fields_length = writer.write(&fast_fields_bytes)?; - let footer_length = Self::build_footer(&mut writer, &fast_fields_bytes)?; - fast_fields_length += footer_length; - - let mut store_length = writer.write(&store_bytes)?; - let footer_length = Self::build_footer(&mut writer, &store_bytes)?; - store_length += footer_length; - - let mut field_norms_length = writer.write(&field_norms_bytes)?; - let footer_length = Self::build_footer(&mut writer, &field_norms_bytes)?; - field_norms_length += footer_length; - - let mut positions_length = writer.write(&positions_bytes)?; - let footer_length = Self::build_footer(&mut writer, &positions_bytes)?; - positions_length += footer_length; - - let mut postings_length = writer.write(&postings_bytes)?; - let footer_length = Self::build_footer(&mut writer, &postings_bytes)?; - postings_length += footer_length; - - let mut terms_length = writer.write(&terms_bytes)?; - let footer_length = Self::build_footer(&mut writer, &terms_bytes)?; - terms_length += footer_length; - - let meta_length = writer.write(meta_json.as_bytes())?; - let managed_length = writer.write(&managed_bytes)?; - - // write offsets of each parts - let mut offset: u32 = 0; - let mut offsets = Vec::with_capacity(8); - offset += fast_fields_length as u32; - offsets.push(offset); - - offset += store_length as u32; - offsets.push(offset); - - offset += field_norms_length as u32; - offsets.push(offset); - - offset += positions_length as u32; - offsets.push(offset); - - offset += postings_length as u32; - offsets.push(offset); - - offset += terms_length as u32; - offsets.push(offset); - - offset += meta_length as u32; - offsets.push(offset); - - offset += managed_length as u32; - offsets.push(offset); - - // the number of offsets, used for multi index segments in one file - let nums = offsets.len() as u32; - for offset in offsets { - writer.write_all(&offset.to_le_bytes())?; - } - writer.write_all(&nums.to_le_bytes())?; - - writer.flush()?; - - Ok(()) - } - - fn build_footer(writer: &mut W, bytes: &[u8]) -> Result { - let buf = build_tantivy_footer(bytes)?; - let len = writer.write(&buf)?; - Ok(len) - } } // inverted index block include 5 types of 
data, diff --git a/src/query/storages/fuse/src/pruning/inverted_index_pruner.rs b/src/query/storages/fuse/src/pruning/inverted_index_pruner.rs index 9f58e4233edf..a5f585e15dd5 100644 --- a/src/query/storages/fuse/src/pruning/inverted_index_pruner.rs +++ b/src/query/storages/fuse/src/pruning/inverted_index_pruner.rs @@ -53,13 +53,12 @@ use crate::io::TableMetaLocationGenerator; pub struct InvertedIndexPruner { dal: Operator, has_score: bool, - field_nums: usize, index_name: String, index_version: String, need_position: bool, tokenizer_manager: TokenizerManager, query: Box, - field_ids: HashSet, + field_ids: HashSet, index_record: IndexRecordOption, fuzziness: Option, } @@ -84,7 +83,7 @@ impl InvertedIndexPruner { let mut field_ids = HashSet::new(); query.query_terms(&mut |term, pos| { let field = term.field(); - let field_id = field.field_id() as usize; + let field_id = field.field_id(); field_ids.insert(field_id); if pos { need_position = true; @@ -93,14 +92,12 @@ impl InvertedIndexPruner { // whether need to generate score internl column let has_score = inverted_index_info.has_score; - let field_nums = inverted_index_info.index_schema.num_fields(); let index_name = inverted_index_info.index_name.clone(); let index_version = inverted_index_info.index_version.clone(); return Ok(Some(Arc::new(InvertedIndexPruner { dal, has_score, - field_nums, index_name, index_version, need_position, @@ -130,7 +127,6 @@ impl InvertedIndexPruner { let matched_rows = inverted_index_reader .do_filter( - self.field_nums, self.need_position, self.has_score, self.query.box_clone(), @@ -138,7 +134,7 @@ impl InvertedIndexPruner { &self.index_record, &self.fuzziness, self.tokenizer_manager.clone(), - row_count as u32, + row_count, &index_loc, ) .await?; diff --git a/src/query/storages/stream/src/stream_table.rs b/src/query/storages/stream/src/stream_table.rs index cf5599b468d0..e593ab106acd 100644 --- a/src/query/storages/stream/src/stream_table.rs +++ b/src/query/storages/stream/src/stream_table.rs @@ -264,10 +264,9 @@ impl StreamTable { .get_table_name_by_id(source_table_id) .await .and_then(|opt| { - opt.ok_or(ErrorCode::UnknownTable(format!( - "Unknown table id: '{}'", - source_table_id - ))) + opt.ok_or_else(|| { + ErrorCode::UnknownTable(format!("Unknown table id: '{}'", source_table_id)) + }) }) } @@ -286,7 +285,7 @@ impl StreamTable { let source_table_meta = catalog .get_table_meta_by_id(source_table_id) .await? - .ok_or(ErrorCode::Internal("source database id must be set"))?; + .ok_or_else(|| ErrorCode::Internal("source database id must be set"))?; source_table_meta .data .options
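One recurring mechanical change throughout this patch (here and in stream_table.rs above) is replacing `ok_or(...)` with `ok_or_else(|| ...)`: `ok_or` constructs its error argument, including any `format!` allocation, even when the `Option` is `Some`, whereas `ok_or_else` only runs the closure on the `None` path. A minimal standalone illustration, with the error type simplified to `String`:

// ok_or_else defers building the error (and the format! allocation)
// to the None path; ok_or would build it unconditionally.
fn lookup_table_name(name: Option<String>, table_id: u64) -> Result<String, String> {
    name.ok_or_else(|| format!("Unknown table id: '{}'", table_id))
}

fn main() {
    assert_eq!(lookup_table_name(Some("t1".into()), 7), Ok("t1".to_string()));
    assert!(lookup_table_name(None, 7).is_err());
}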