Skip to content

Commit

Permalink
fix: can't filter on string column which has FTS index (#2875)
Browse files Browse the repository at this point in the history
also fix BTree index isn't used if the column is with LargeUtf8 data
type

---------

Signed-off-by: BubbleCal <bubble-cal@outlook.com>
  • Loading branch information
BubbleCal authored Sep 13, 2024
1 parent 1731639 commit 60797a6
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 44 deletions.
16 changes: 16 additions & 0 deletions python/python/tests/test_scalar_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,27 @@ def test_full_text_search(dataset, with_position):
columns=["doc"],
full_text_query=query,
).to_table()
assert results.num_rows > 0
results = results.column(0)
for row in results:
assert query in row.as_py()


def test_filter_with_fts_index(dataset):
dataset.create_scalar_index("doc", index_type="INVERTED", with_position=False)
row = dataset.take(indices=[0], columns=["doc"])
query = row.column(0)[0].as_py()
query = query.split(" ")[0]
results = dataset.scanner(
filter=f"doc = '{query}'",
prefilter=True,
).to_table()
assert results.num_rows > 0
results = results["doc"]
for row in results:
assert query == row.as_py()


def test_bitmap_index(tmp_path: Path):
"""Test create bitmap index"""
tbl = pa.Table.from_arrays(
Expand Down
5 changes: 5 additions & 0 deletions rust/lance-datafusion/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,11 @@ pub fn safe_coerce_scalar(value: &ScalarValue, ty: &DataType) -> Option<ScalarVa
DataType::LargeUtf8 => Some(ScalarValue::LargeUtf8(val.clone())),
_ => None,
},
ScalarValue::LargeUtf8(val) => match ty {
DataType::Utf8 => Some(ScalarValue::Utf8(val.clone())),
DataType::LargeUtf8 => Some(value.clone()),
_ => None,
},
ScalarValue::Boolean(_) => match ty {
DataType::Boolean => Some(value.clone()),
_ => None,
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-linalg/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ fn build_f16_with_flags(suffix: &str, flags: &[&str]) -> Result<(), cc::Error> {
// Pedantic will complain about _Float16 in some versions of GCC
// .flag("-Wpedantic")
// We pass in the suffix to make sure the symbol names are unique
.flag(format!("-DSUFFIX=_{}", suffix));
.flag(format!("-DSUFFIX=_{}", suffix).as_str());

for flag in flags {
builder.flag(flag);
Expand Down
31 changes: 17 additions & 14 deletions rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use lance_core::utils::tokio::get_num_compute_intensive_cpus;
use lance_core::{ROW_ADDR, ROW_ADDR_FIELD, ROW_ID, ROW_ID_FIELD};
use lance_datafusion::exec::{execute_plan, LanceExecutionOptions};
use lance_datafusion::projection::ProjectionPlan;
use lance_index::scalar::expression::IndexInformationProvider;
use lance_index::scalar::expression::PlannerIndexExt;
use lance_index::scalar::inverted::SCORE_COL;
use lance_index::scalar::FullTextSearchQuery;
Expand Down Expand Up @@ -1127,19 +1126,23 @@ impl Scanner {
query: &FullTextSearchQuery,
) -> Result<Arc<dyn ExecutionPlan>> {
let columns = if query.columns.is_empty() {
let index_info = self.dataset.scalar_index_info().await?;
self.dataset
.schema()
.fields
.iter()
.filter_map(|f| {
if f.data_type() == DataType::Utf8 || f.data_type() == DataType::LargeUtf8 {
index_info.get_index(&f.name).map(|_| f.name.clone())
} else {
None
}
})
.collect()
let string_columns = self.dataset.schema().fields.iter().filter_map(|f| {
if f.data_type() == DataType::Utf8 || f.data_type() == DataType::LargeUtf8 {
Some(&f.name)
} else {
None
}
});

let mut indexed_columns = Vec::new();
for column in string_columns {
let index = self.dataset.load_scalar_index_for_column(column).await?;
if index.is_some() {
indexed_columns.push(column.clone());
}
}

indexed_columns
} else {
query.columns.clone()
};
Expand Down
44 changes: 28 additions & 16 deletions rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use lance_index::scalar::expression::{
IndexInformationProvider, LabelListQueryParser, SargableQueryParser, ScalarQueryParser,
};
use lance_index::scalar::lance_format::LanceIndexStore;
use lance_index::scalar::{InvertedIndexParams, ScalarIndex};
use lance_index::scalar::{InvertedIndexParams, ScalarIndex, ScalarIndexType};
use lance_index::vector::flat::index::{FlatIndex, FlatQuantizer};
use lance_index::vector::hnsw::HNSW;
use lance_index::vector::pq::ProductQuantizer;
Expand All @@ -41,7 +41,7 @@ use lance_table::format::Index as IndexMetadata;
use lance_table::format::{Fragment, SelfDescribingFileReader};
use lance_table::io::manifest::read_manifest_indexes;
use roaring::RoaringBitmap;
use scalar::build_inverted_index;
use scalar::{build_inverted_index, detect_scalar_index_type};
use serde_json::json;
use snafu::{location, Location};
use tracing::instrument;
Expand Down Expand Up @@ -766,22 +766,34 @@ impl DatasetIndexInternalExt for Dataset {
async fn scalar_index_info(&self) -> Result<ScalarIndexInfo> {
let indices = self.load_indices().await?;
let schema = self.schema();
let indexed_fields = indices
.iter()
.filter(|idx| idx.fields.len() == 1)
.map(|idx| {
let field = idx.fields[0];
let field = schema.field_by_id(field).ok_or_else(|| Error::Internal { message: format!("Index referenced a field with id {field} which did not exist in the schema"), location: location!() });
field.map(|field| {
let query_parser = if let DataType::List(_) = field.data_type() {
let mut indexed_fields = Vec::new();
for index in indices.iter().filter(|idx| idx.fields.len() == 1) {
let field = index.fields[0];
let field = schema.field_by_id(field).ok_or_else(|| Error::Internal {
message: format!(
"Index referenced a field with id {field} which did not exist in the schema"
),
location: location!(),
})?;

let query_parser = match field.data_type() {
DataType::List(_) => {
Box::<LabelListQueryParser>::default() as Box<dyn ScalarQueryParser>
} else {
}
DataType::Utf8 | DataType::LargeUtf8 => {
let uuid = index.uuid.to_string();
let index_type = detect_scalar_index_type(self, &field.name, &uuid).await?;
// Inverted index can't be used for filtering
if matches!(index_type, ScalarIndexType::Inverted) {
continue;
}
Box::<SargableQueryParser>::default() as Box<dyn ScalarQueryParser>
};
(field.name.clone(), (field.data_type(), query_parser))
})
})
.collect::<Result<Vec<_>>>()?;
}
_ => Box::<SargableQueryParser>::default() as Box<dyn ScalarQueryParser>,
};

indexed_fields.push((field.name.clone(), (field.data_type(), query_parser)));
}
let index_info_map = HashMap::from_iter(indexed_fields);
Ok(ScalarIndexInfo {
indexed_columns: index_info_map,
Expand Down
49 changes: 36 additions & 13 deletions rust/lance/src/index/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,29 +170,52 @@ pub async fn open_scalar_index(
uuid: &str,
) -> Result<Arc<dyn ScalarIndex>> {
let index_store = Arc::new(LanceIndexStore::from_dataset(dataset, uuid));
let index_dir = dataset.indices_dir().child(uuid);
// This works at the moment, since we only have a few index types, may need to introduce better
// detection method in the future.
let index_type = detect_scalar_index_type(dataset, column, uuid).await?;
match index_type {
ScalarIndexType::Bitmap => {
let bitmap_index = BitmapIndex::load(index_store).await?;
Ok(bitmap_index as Arc<dyn ScalarIndex>)
}
ScalarIndexType::LabelList => {
let tag_index = LabelListIndex::load(index_store).await?;
Ok(tag_index as Arc<dyn ScalarIndex>)
}
ScalarIndexType::Inverted => {
let inverted_index = InvertedIndex::load(index_store).await?;
Ok(inverted_index as Arc<dyn ScalarIndex>)
}
ScalarIndexType::BTree => {
let btree_index = BTreeIndex::load(index_store).await?;
Ok(btree_index as Arc<dyn ScalarIndex>)
}
}
}

pub async fn detect_scalar_index_type(
dataset: &Dataset,
column: &str,
index_uuid: &str,
) -> Result<ScalarIndexType> {
let index_dir = dataset.indices_dir().child(index_uuid);
let col = dataset.schema().field(column).ok_or(Error::Internal {
message: format!(
"Index refers to column {} which does not exist in dataset schema",
column
),
location: location!(),
})?;

let bitmap_page_lookup = index_dir.child(BITMAP_LOOKUP_NAME);
let inverted_list_lookup = index_dir.child(INVERT_LIST_FILE);
if let DataType::List(_) = col.data_type() {
let tag_index = LabelListIndex::load(index_store).await?;
Ok(tag_index as Arc<dyn ScalarIndex>)
let index_type = if let DataType::List(_) = col.data_type() {
ScalarIndexType::LabelList
} else if dataset.object_store.exists(&bitmap_page_lookup).await? {
let bitmap_index = BitmapIndex::load(index_store).await?;
Ok(bitmap_index as Arc<dyn ScalarIndex>)
ScalarIndexType::Bitmap
} else if dataset.object_store.exists(&inverted_list_lookup).await? {
let inverted_index = InvertedIndex::load(index_store).await?;
Ok(inverted_index as Arc<dyn ScalarIndex>)
ScalarIndexType::Inverted
} else {
let btree_index = BTreeIndex::load(index_store).await?;
Ok(btree_index as Arc<dyn ScalarIndex>)
}
ScalarIndexType::BTree
};

Ok(index_type)
}

0 comments on commit 60797a6

Please sign in to comment.