From b8083a36206a5502ce60412fcf726e4510b9e6d0 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Fri, 13 Sep 2024 12:22:00 +0800 Subject: [PATCH 1/2] fix: can't filter on string column which has FTS index Signed-off-by: BubbleCal --- python/python/tests/test_scalar_index.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/python/python/tests/test_scalar_index.py b/python/python/tests/test_scalar_index.py index c3096c166e..449a95476c 100644 --- a/python/python/tests/test_scalar_index.py +++ b/python/python/tests/test_scalar_index.py @@ -228,6 +228,20 @@ def test_full_text_search(dataset, with_position): assert query in row.as_py() +def test_filter_with_fts_index(dataset): + dataset.create_scalar_index("doc", index_type="INVERTED", with_position=False) + row = dataset.take(indices=[0], columns=["doc"]) + query = row.column(0)[0].as_py() + query = query.split(" ")[0] + results = dataset.scanner( + columns=["doc"], + filter=f"doc = '{query}'", + ).to_table() + results = results.column(0) + for row in results: + assert query == row.as_py() + + def test_bitmap_index(tmp_path: Path): """Test create bitmap index""" tbl = pa.Table.from_arrays( From ad3ee08b5f4bb61163e5f906de7204fb483b5f8c Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Fri, 13 Sep 2024 14:31:10 +0800 Subject: [PATCH 2/2] fix Signed-off-by: BubbleCal --- python/python/tests/test_scalar_index.py | 6 ++- rust/lance-datafusion/src/expr.rs | 5 +++ rust/lance-linalg/build.rs | 2 +- rust/lance/src/dataset/scanner.rs | 31 ++++++++------- rust/lance/src/index.rs | 44 +++++++++++++-------- rust/lance/src/index/scalar.rs | 49 +++++++++++++++++------- 6 files changed, 91 insertions(+), 46 deletions(-) diff --git a/python/python/tests/test_scalar_index.py b/python/python/tests/test_scalar_index.py index 449a95476c..06385cb6c3 100644 --- a/python/python/tests/test_scalar_index.py +++ b/python/python/tests/test_scalar_index.py @@ -223,6 +223,7 @@ def test_full_text_search(dataset, with_position): columns=["doc"], full_text_query=query, ).to_table() + assert results.num_rows > 0 results = results.column(0) for row in results: assert query in row.as_py() @@ -234,10 +235,11 @@ def test_filter_with_fts_index(dataset): query = row.column(0)[0].as_py() query = query.split(" ")[0] results = dataset.scanner( - columns=["doc"], filter=f"doc = '{query}'", + prefilter=True, ).to_table() - results = results.column(0) + assert results.num_rows > 0 + results = results["doc"] for row in results: assert query == row.as_py() diff --git a/rust/lance-datafusion/src/expr.rs b/rust/lance-datafusion/src/expr.rs index 00ebf1e86b..dbc450b654 100644 --- a/rust/lance-datafusion/src/expr.rs +++ b/rust/lance-datafusion/src/expr.rs @@ -219,6 +219,11 @@ pub fn safe_coerce_scalar(value: &ScalarValue, ty: &DataType) -> Option Some(ScalarValue::LargeUtf8(val.clone())), _ => None, }, + ScalarValue::LargeUtf8(val) => match ty { + DataType::Utf8 => Some(ScalarValue::Utf8(val.clone())), + DataType::LargeUtf8 => Some(value.clone()), + _ => None, + }, ScalarValue::Boolean(_) => match ty { DataType::Boolean => Some(value.clone()), _ => None, diff --git a/rust/lance-linalg/build.rs b/rust/lance-linalg/build.rs index 88c7223271..2eadc0a65d 100644 --- a/rust/lance-linalg/build.rs +++ b/rust/lance-linalg/build.rs @@ -89,7 +89,7 @@ fn build_f16_with_flags(suffix: &str, flags: &[&str]) -> Result<(), cc::Error> { // Pedantic will complain about _Float16 in some versions of GCC // .flag("-Wpedantic") // We pass in the suffix to make sure the symbol names are unique - .flag(format!("-DSUFFIX=_{}", suffix)); + .flag(format!("-DSUFFIX=_{}", suffix).as_str()); for flag in flags { builder.flag(flag); diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 5eef527144..31538c21b2 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -38,7 +38,6 @@ use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_core::{ROW_ADDR, ROW_ADDR_FIELD, ROW_ID, ROW_ID_FIELD}; use lance_datafusion::exec::{execute_plan, LanceExecutionOptions}; use lance_datafusion::projection::ProjectionPlan; -use lance_index::scalar::expression::IndexInformationProvider; use lance_index::scalar::expression::PlannerIndexExt; use lance_index::scalar::inverted::SCORE_COL; use lance_index::scalar::FullTextSearchQuery; @@ -1127,19 +1126,23 @@ impl Scanner { query: &FullTextSearchQuery, ) -> Result> { let columns = if query.columns.is_empty() { - let index_info = self.dataset.scalar_index_info().await?; - self.dataset - .schema() - .fields - .iter() - .filter_map(|f| { - if f.data_type() == DataType::Utf8 || f.data_type() == DataType::LargeUtf8 { - index_info.get_index(&f.name).map(|_| f.name.clone()) - } else { - None - } - }) - .collect() + let string_columns = self.dataset.schema().fields.iter().filter_map(|f| { + if f.data_type() == DataType::Utf8 || f.data_type() == DataType::LargeUtf8 { + Some(&f.name) + } else { + None + } + }); + + let mut indexed_columns = Vec::new(); + for column in string_columns { + let index = self.dataset.load_scalar_index_for_column(column).await?; + if index.is_some() { + indexed_columns.push(column.clone()); + } + } + + indexed_columns } else { query.columns.clone() }; diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 969a719376..fd803e0cc5 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -19,7 +19,7 @@ use lance_index::scalar::expression::{ IndexInformationProvider, LabelListQueryParser, SargableQueryParser, ScalarQueryParser, }; use lance_index::scalar::lance_format::LanceIndexStore; -use lance_index::scalar::{InvertedIndexParams, ScalarIndex}; +use lance_index::scalar::{InvertedIndexParams, ScalarIndex, ScalarIndexType}; use lance_index::vector::flat::index::{FlatIndex, FlatQuantizer}; use lance_index::vector::hnsw::HNSW; use lance_index::vector::pq::ProductQuantizer; @@ -41,7 +41,7 @@ use lance_table::format::Index as IndexMetadata; use lance_table::format::{Fragment, SelfDescribingFileReader}; use lance_table::io::manifest::read_manifest_indexes; use roaring::RoaringBitmap; -use scalar::build_inverted_index; +use scalar::{build_inverted_index, detect_scalar_index_type}; use serde_json::json; use snafu::{location, Location}; use tracing::instrument; @@ -766,22 +766,34 @@ impl DatasetIndexInternalExt for Dataset { async fn scalar_index_info(&self) -> Result { let indices = self.load_indices().await?; let schema = self.schema(); - let indexed_fields = indices - .iter() - .filter(|idx| idx.fields.len() == 1) - .map(|idx| { - let field = idx.fields[0]; - let field = schema.field_by_id(field).ok_or_else(|| Error::Internal { message: format!("Index referenced a field with id {field} which did not exist in the schema"), location: location!() }); - field.map(|field| { - let query_parser = if let DataType::List(_) = field.data_type() { + let mut indexed_fields = Vec::new(); + for index in indices.iter().filter(|idx| idx.fields.len() == 1) { + let field = index.fields[0]; + let field = schema.field_by_id(field).ok_or_else(|| Error::Internal { + message: format!( + "Index referenced a field with id {field} which did not exist in the schema" + ), + location: location!(), + })?; + + let query_parser = match field.data_type() { + DataType::List(_) => { Box::::default() as Box - } else { + } + DataType::Utf8 | DataType::LargeUtf8 => { + let uuid = index.uuid.to_string(); + let index_type = detect_scalar_index_type(self, &field.name, &uuid).await?; + // Inverted index can't be used for filtering + if matches!(index_type, ScalarIndexType::Inverted) { + continue; + } Box::::default() as Box - }; - (field.name.clone(), (field.data_type(), query_parser)) - }) - }) - .collect::>>()?; + } + _ => Box::::default() as Box, + }; + + indexed_fields.push((field.name.clone(), (field.data_type(), query_parser))); + } let index_info_map = HashMap::from_iter(indexed_fields); Ok(ScalarIndexInfo { indexed_columns: index_info_map, diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index fb9a6c3c88..b7d5e6cbb1 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -170,9 +170,33 @@ pub async fn open_scalar_index( uuid: &str, ) -> Result> { let index_store = Arc::new(LanceIndexStore::from_dataset(dataset, uuid)); - let index_dir = dataset.indices_dir().child(uuid); - // This works at the moment, since we only have a few index types, may need to introduce better - // detection method in the future. + let index_type = detect_scalar_index_type(dataset, column, uuid).await?; + match index_type { + ScalarIndexType::Bitmap => { + let bitmap_index = BitmapIndex::load(index_store).await?; + Ok(bitmap_index as Arc) + } + ScalarIndexType::LabelList => { + let tag_index = LabelListIndex::load(index_store).await?; + Ok(tag_index as Arc) + } + ScalarIndexType::Inverted => { + let inverted_index = InvertedIndex::load(index_store).await?; + Ok(inverted_index as Arc) + } + ScalarIndexType::BTree => { + let btree_index = BTreeIndex::load(index_store).await?; + Ok(btree_index as Arc) + } + } +} + +pub async fn detect_scalar_index_type( + dataset: &Dataset, + column: &str, + index_uuid: &str, +) -> Result { + let index_dir = dataset.indices_dir().child(index_uuid); let col = dataset.schema().field(column).ok_or(Error::Internal { message: format!( "Index refers to column {} which does not exist in dataset schema", @@ -180,19 +204,18 @@ pub async fn open_scalar_index( ), location: location!(), })?; + let bitmap_page_lookup = index_dir.child(BITMAP_LOOKUP_NAME); let inverted_list_lookup = index_dir.child(INVERT_LIST_FILE); - if let DataType::List(_) = col.data_type() { - let tag_index = LabelListIndex::load(index_store).await?; - Ok(tag_index as Arc) + let index_type = if let DataType::List(_) = col.data_type() { + ScalarIndexType::LabelList } else if dataset.object_store.exists(&bitmap_page_lookup).await? { - let bitmap_index = BitmapIndex::load(index_store).await?; - Ok(bitmap_index as Arc) + ScalarIndexType::Bitmap } else if dataset.object_store.exists(&inverted_list_lookup).await? { - let inverted_index = InvertedIndex::load(index_store).await?; - Ok(inverted_index as Arc) + ScalarIndexType::Inverted } else { - let btree_index = BTreeIndex::load(index_store).await?; - Ok(btree_index as Arc) - } + ScalarIndexType::BTree + }; + + Ok(index_type) }