Skip to content

Commit

Permalink
Update to Tantivy 0.22.0 (#148)
Browse files Browse the repository at this point in the history
* Resolve tantivy 0.22 issues

* Update clippy and resolve issues

* Use tracing subscriber rather than pretty env logger
  • Loading branch information
ChillFish8 authored Jun 3, 2024
1 parent 9b988d2 commit 8836c37
Show file tree
Hide file tree
Showing 19 changed files with 823 additions and 632 deletions.
1,203 changes: 664 additions & 539 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion lnx-engine/aexecutor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ edition = "2018"
async-channel = "1.6.1"
anyhow = "1"
rayon = "1.5.1"
tantivy = "0.18.0"
tantivy = "0.22.0"
crossbeam = "0.8"
tokio = { version = "1.11", features = ["sync"] }
6 changes: 3 additions & 3 deletions lnx-engine/aexecutor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod reader_executor;
use std::borrow::Borrow;

use anyhow::{Error, Result};
use tantivy::{LeasedItem, Searcher};
use tantivy::Searcher;
use tokio::sync::{oneshot, Semaphore};

use crate::reader_executor::TantivyExecutorPool;
Expand Down Expand Up @@ -96,7 +96,7 @@ impl SearcherExecutorPool {
/// the results once complete.
pub async fn spawn<F, T>(&self, func: F) -> Result<T>
where
F: FnOnce(LeasedItem<Searcher>, &tantivy::Executor) -> T + Send + 'static,
F: FnOnce(Searcher, &tantivy::Executor) -> T + Send + 'static,
T: Sync + Send + 'static,
{
let _permit = self.limiter.acquire().await?;
Expand All @@ -117,7 +117,7 @@ impl SearcherExecutorPool {
}

#[inline]
pub fn searcher(&self) -> LeasedItem<Searcher> {
pub fn searcher(&self) -> Searcher {
self.reader.searcher()
}
}
2 changes: 1 addition & 1 deletion lnx-engine/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl Engine {
indexes = guard.as_ref().clone();
}

if !override_if_exists & indexes.get(index.name()).is_some() {
if !override_if_exists & indexes.contains_key(index.name()) {
return Err(Error::msg("index already exists."));
}

Expand Down
6 changes: 3 additions & 3 deletions lnx-engine/search-index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ edition = "2018"
crossbeam = { version = "0.8.1", default-features = false, features = ["crossbeam-channel", "crossbeam-queue"] }
time = { version = "0.3.9", features = ["serde", "parsing", "formatting"] }
serde = { version = "1", features = ["derive"] }
sled = { version = "0.34.7", features = ["compression"] }
sled = { version = "0.34.7" }
tokio = { version = "1.12", features = ["sync", "fs", "rt"] }
compose = { git = "https://github.com/lnx-search/compose.git", rev = "77099ad" }
compose = { git = "https://github.com/lnx-search/compose.git", rev = "a3cf2ef" }

deunicode = "1.6.0"
tantivy = "0.18.0"
tantivy = "0.22.0"
tracing = "0.1.29"
crc32fast = "1.3.0"
bincode = "1.3"
Expand Down
2 changes: 1 addition & 1 deletion lnx-engine/search-index/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fn compress_stop_words() -> Result<()> {
encoder.write_all(&data)?;
let data = encoder.finish()?;

fs::write("./_dist/stop_words", &data)?;
fs::write("./_dist/stop_words", data)?;

Ok(())
}
4 changes: 2 additions & 2 deletions lnx-engine/search-index/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,8 @@ mod tests {
use crate::structures::{DocumentValue, IndexDeclaration};

fn init_state() {
let _ = std::env::set_var("RUST_LOG", "debug,sled=info");
let _ = pretty_env_logger::try_init_timed();
std::env::set_var("RUST_LOG", "debug,sled=info");
let _ = tracing_subscriber::fmt::try_init();
}

async fn get_index_with(value: serde_json::Value) -> Result<Index> {
Expand Down
6 changes: 4 additions & 2 deletions lnx-engine/search-index/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tantivy::query::{
BooleanQuery,
BoostQuery,
EmptyQuery,
EnableScoring,
FuzzyTermQuery,
MoreLikeThisQuery,
Query,
Expand Down Expand Up @@ -559,7 +560,7 @@ impl QueryBuilder {
.schema
.get_field(&v)
.map(|field| (field, 0.0f32))
.ok_or_else(|| {
.map_err(|_| {
anyhow!(
"Unknown field {:?} in fuzzy query config {:?}.",
v,
Expand Down Expand Up @@ -691,6 +692,7 @@ impl QueryBuilder {
&qry,
&TopDocs::with_limit(1),
executor,
EnableScoring::enabled_from_searcher(&searcher),
)?;
if results.is_empty() {
return Err(Error::msg(format!(
Expand Down Expand Up @@ -796,7 +798,7 @@ impl QueryBuilder {
}

fn get_searchable_field(&self, field: &str) -> Result<Field> {
let field = self.schema.get_field(field).ok_or_else(|| {
let field = self.schema.get_field(field).map_err(|_| {
Error::msg(format!("no field exists with name: {:?}", field))
})?;

Expand Down
73 changes: 46 additions & 27 deletions lnx-engine/search-index/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,21 @@ use aexecutor::SearcherExecutorPool;
use anyhow::{anyhow, Error, Result};
use serde::{Deserialize, Serialize};
use tantivy::collector::{Count, TopDocs};
use tantivy::fastfield::FastFieldReader;
use tantivy::query::{Query, TermQuery};
use tantivy::schema::{Field, FieldType, IndexRecordOption, Schema, Value};
use tantivy::query::{EnableScoring, Query, TermQuery};
use tantivy::schema::{Field, FieldType, IndexRecordOption, OwnedValue, Schema};
use tantivy::{
DateTime,
DocAddress,
DocId,
Document,
Executor,
IndexReader,
LeasedItem,
Order,
ReloadPolicy,
Score,
Searcher,
SegmentReader,
TantivyDocument,
Term,
};

Expand Down Expand Up @@ -146,11 +147,17 @@ fn order_and_search<R: AsScore + tantivy::fastfield::FastValue>(
executor: &Executor,
) -> Result<(Vec<(R, DocAddress)>, usize)> {
let collector = TopDocs::with_limit(limit + offset);
let collector = collector.order_by_fast_field(field);
let field_name = searcher.schema().get_field_name(field);
let collector = collector.order_by_fast_field(field_name, Order::Desc);
let collector = (collector, Count);

let (result_addresses, count) = searcher
.search_with_executor(&query, &collector, executor)
.search_with_executor(
&query,
&collector,
executor,
EnableScoring::enabled_from_searcher(searcher),
)
.map_err(Error::from)?;

let results = result_addresses
Expand All @@ -167,7 +174,12 @@ macro_rules! execute_staged_search {
let collector = ($collector, Count);

let (results, count) = $searcher
.search_with_executor(&$query, &collector, $executor)
.search_with_executor(
&$query,
&collector,
$executor,
tantivy::query::EnableScoring::enabled_from_searcher(&$searcher),
)
.map_err(Error::from)?;

let results = results
Expand All @@ -189,13 +201,13 @@ fn process_search<S: AsScore>(
) -> Result<Vec<DocumentHit>> {
let mut hits = Vec::with_capacity(top_docs.len());
for (ratio, ref_address) in top_docs {
let retrieved_doc = searcher.doc(ref_address)?;
let mut doc = schema.to_named_doc(&retrieved_doc);
let retrieved_doc: TantivyDocument = searcher.doc(ref_address)?;
let mut doc = retrieved_doc.to_named_doc(schema);
let id = doc.0
.remove("_id")
.ok_or_else(|| Error::msg("document has been missed labeled (missing primary key '_id'), the dataset is invalid"))?;

if let Value::U64(doc_id) = id[0] {
if let OwnedValue::U64(doc_id) = id[0] {
hits.push(DocumentHit::from_tantivy_document(
ctx,
doc_id,
Expand Down Expand Up @@ -226,9 +238,8 @@ fn order_and_sort(
offset: usize,
executor: &Executor,
) -> Result<(Vec<DocumentHit>, usize)> {
let is_multi_value = ctx
.multi_value_fields()
.contains(schema.get_field_name(field));
let field_name = schema.get_field_name(field);
let is_multi_value = ctx.multi_value_fields().contains(field_name);

if is_multi_value {
return Err(anyhow!(
Expand Down Expand Up @@ -266,15 +277,17 @@ fn order_and_sort(
let collector = TopDocs::with_limit(limit + offset);
let out = match field_type {
FieldType::I64(_) => {
let field_name = field_name.to_owned();
let collector =
collector.custom_score(move |segment_reader: &SegmentReader| {
let reader = segment_reader
.fast_fields()
.i64(field)
.i64(&field_name)
.expect("field exists");

move |doc: DocId| {
let value: i64 = reader.get(doc);
let value: i64 =
reader.first(doc).expect("Col must not be None");
std::cmp::Reverse(value)
}
});
Expand All @@ -285,15 +298,17 @@ fn order_and_sort(
(process_search(ctx, searcher, schema, out.0)?, out.1)
},
FieldType::U64(_) => {
let field_name = field_name.to_owned();
let collector =
collector.custom_score(move |segment_reader: &SegmentReader| {
let reader = segment_reader
.fast_fields()
.u64(field)
.u64(&field_name)
.expect("field exists");

move |doc: DocId| {
let value: u64 = reader.get(doc);
let value: u64 =
reader.first(doc).expect("Col must not be None");
std::cmp::Reverse(value)
}
});
Expand All @@ -304,15 +319,17 @@ fn order_and_sort(
(process_search(ctx, searcher, schema, out.0)?, out.1)
},
FieldType::F64(_) => {
let field_name = field_name.to_owned();
let collector =
collector.custom_score(move |segment_reader: &SegmentReader| {
let reader = segment_reader
.fast_fields()
.f64(field)
.f64(&field_name)
.expect("field exists");

move |doc: DocId| {
let value: f64 = reader.get(doc);
let value: f64 =
reader.first(doc).expect("Col must not be None");
std::cmp::Reverse(value)
}
});
Expand All @@ -323,15 +340,17 @@ fn order_and_sort(
(process_search(ctx, searcher, schema, out.0)?, out.1)
},
FieldType::Date(_) => {
let field_name = field_name.to_owned();
let collector =
collector.custom_score(move |segment_reader: &SegmentReader| {
let reader = segment_reader
.fast_fields()
.date(field)
.date(&field_name)
.expect("field exists");

move |doc: DocId| {
let value: DateTime = reader.get(doc);
let value: DateTime =
reader.first(doc).expect("Col must not be None");
std::cmp::Reverse(value)
}
});
Expand Down Expand Up @@ -373,8 +392,7 @@ impl Reader {
let reader: IndexReader = ctx
.index
.reader_builder()
.reload_policy(ReloadPolicy::OnCommit)
.num_searchers(ctx.reader_ctx.max_concurrency)
.reload_policy(ReloadPolicy::OnCommitWithDelay)
.try_into()?;
info!(
"index reader created with reload policy=OnCommit, num_searchers={}",
Expand Down Expand Up @@ -441,6 +459,7 @@ impl Reader {
&qry,
&TopDocs::with_limit(1),
executor,
EnableScoring::enabled_from_searcher(&searcher),
)?;
if results.is_empty() {
return Err(Error::msg(format!(
Expand All @@ -450,10 +469,10 @@ impl Reader {
}

let (_, addr) = results.remove(0);
let doc = searcher.doc(addr)?;
let doc: TantivyDocument = searcher.doc(addr)?;
let schema = searcher.schema();

Ok(schema.to_named_doc(&doc))
Ok(doc.to_named_doc(schema))
})
.await??;

Expand Down Expand Up @@ -487,7 +506,7 @@ impl Reader {
let schema = searcher.schema();
let order_by = order_by.map(|v| schema.get_field(&v));

let (hits, count) = if let Some(Some(field)) = order_by {
let (hits, count) = if let Some(Ok(field)) = order_by {
order_and_sort(
sort,
field,
Expand Down Expand Up @@ -541,7 +560,7 @@ impl Reader {
///
/// This is an internal export to allow the writer
/// to have access to the segment reader information.
pub(crate) fn get_searcher(&self) -> LeasedItem<Searcher> {
pub(crate) fn get_searcher(&self) -> Searcher {
self.pool.searcher()
}
}
Loading

0 comments on commit 8836c37

Please sign in to comment.