Skip to content

Commit

Permalink
[Cosmos] Hybrid Search query pipeline (#38275)
Browse files Browse the repository at this point in the history
* Create hybrid_search_aggregator.py

* others

* Update execution_dispatcher.py

* Update execution_dispatcher.py

* sync changes, need to look at vector + FTS/ skip + take

* async pipeline

* account for skip/take and simplify logics

* small hack for now

* fixing top/limit logic

* return only payload

* fix hack

* pylint

* simplifying further

* small changes

* adds readme, buffer limit, simplifies

* simplify async, CI green

* Update hybrid_search_aggregator.py

* Update sdk/cosmos/azure-cosmos/README.md

Co-authored-by: Anna Tisch <antisch@microsoft.com>

* update variable name

* add sync and async tests

* Update README.md

* simplifications, test fixes

* add wrong query tests

* pylint/cspell

* Update CHANGELOG.md

* small changes

* test updates

* Update hybrid_search_data.py

* cspell, samples

* change tops

* address comments

* Update hybrid_search_aggregator.py

* update pipeline description

* Update CHANGELOG.md

* Update CHANGELOG.md

---------

Co-authored-by: Anna Tisch <antisch@microsoft.com>
  • Loading branch information
simorenoh and annatisch authored Nov 19, 2024
1 parent 67aa17f commit 3de7a4c
Show file tree
Hide file tree
Showing 21 changed files with 1,179 additions and 39 deletions.
9 changes: 2 additions & 7 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
## Release History

### 4.9.0 (Unreleased)
### 4.9.0 (2024-11-18)

#### Features Added
* Added full text policy and full text indexing policy. See [PR 37891](https://github.com/Azure/azure-sdk-for-python/pull/37891).

#### Breaking Changes

#### Bugs Fixed

#### Other Changes
* Added support for full text search and hybrid search queries. See [PR 38275](https://github.com/Azure/azure-sdk-for-python/pull/38275).

### 4.8.0 (2024-11-12)
This version and all future versions will support Python 3.13.
Expand Down
36 changes: 36 additions & 0 deletions sdk/cosmos/azure-cosmos/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,40 @@ indexing_policy = {
Modifying the index in a container is an asynchronous operation that can take a long time to finish. See [here][cosmos_index_policy_change] for more information.
For more information on using full text policies and full text indexes, see [here][cosmos_fts].

### Public Preview - Full Text Search and Hybrid Search

With the addition of the full text indexing and full text policies, the SDK can now perform full text search and hybrid search queries.
These queries can utilize the new query functions `FullTextContains()`, `FullTextContainsAll`, and `FullTextContainsAny` to efficiently
search for the given terms within your item fields.

Beyond these, you can also utilize the new `Order By RANK` and `Order By RANK RRF` along with `FullTextScore` to execute the [BM25][BM25] scoring algorithm
or [Reciprocal Rank Fusion][RRF] (RRF) on your query, finding the items with the highest relevance to the terms you are looking for.
All of these mentioned queries would look something like this:

- `SELECT TOP 10 c.id, c.text FROM c WHERE FullTextContains(c.text, 'quantum')`


- `SELECT TOP 10 c.id, c.text FROM c WHERE FullTextContainsAll(c.text, 'quantum', 'theory')`


- `SELECT TOP 10 c.id, c.text FROM c WHERE FullTextContainsAny(c.text, 'quantum', 'theory')`


- `SELECT TOP 10 c.id, c.text FROM c ORDER BY RANK FullTextScore(c.text, ['quantum', 'theory'])`


- `SELECT TOP 10 c.id, c.text FROM c ORDER BY RANK RRF(FullTextScore(c.text, ['quantum', 'theory']), FullTextScore(c.text, ['model']))`


- `SELECT TOP 10 c.id, c.text FROM c ORDER BY RANK RRF(FullTextScore(c.text, ['quantum', 'theory']), FullTextScore(c.text, ['model']), VectorDistance(c.embedding, {item_embedding}))"`

These queries must always use a TOP or LIMIT clause within the query since hybrid search queries have to look through a lot of data otherwise and may become too expensive or long-running.
Since these queries are relatively expensive, the SDK sets a default limit of 1000 max items per query - if you'd like to raise that further, you
can use the `AZURE_COSMOS_HYBRID_SEARCH_MAX_ITEMS` environment variable to do so. However, be advised that queries with too many vector results
may have additional latencies associated with searching in the service.

You can find our sync samples [here][cosmos_index_sample] and our async samples [here][cosmos_index_sample_async] as well for additional guidance.

## Troubleshooting

### General
Expand Down Expand Up @@ -954,6 +988,8 @@ For more extensive documentation on the Cosmos DB service, see the [Azure Cosmos
[cosmos_concurrency_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/concurrency_sample.py
[cosmos_index_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/index_management.py
[cosmos_index_sample_async]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/index_management_async.py
[RRF]: https://learn.microsoft.com/azure/search/hybrid-search-ranking
[BM25]: https://learn.microsoft.com/azure/search/index-similarity-and-scoring
[cosmos_fts]: https://aka.ms/cosmosfulltextsearch
[cosmos_index_policy_change]: https://learn.microsoft.com/azure/cosmos-db/index-policy#modifying-the-indexing-policy

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3141,7 +3141,9 @@ def _GetQueryPlanThroughGateway(self, query: str, resource_link: str, **kwargs:
documents._QueryFeature.OffsetAndLimit + "," +
documents._QueryFeature.OrderBy + "," +
documents._QueryFeature.Top + "," +
documents._QueryFeature.NonStreamingOrderBy)
documents._QueryFeature.NonStreamingOrderBy + "," +
documents._QueryFeature.HybridSearch + "," +
documents._QueryFeature.CountIf)
if os.environ.get('AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY', False):
supported_query_features = (documents._QueryFeature.Aggregate + "," +
documents._QueryFeature.CompositeAggregate + "," +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def __init__(self, partition_key_target_range, client, collection_link, query, d
self._is_finished = False
self._has_started = False
self._cur_item = None
self._query = query
# initiate execution context

path = _base.GetPathFromLink(collection_link, "docs")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def __init__(self, execution_context, aggregate_operators):
for operator in aggregate_operators:
if operator == "Average":
self._local_aggregators.append(_AverageAggregator())
elif operator == "Count":
elif operator in ("Count", "CountIf"):
self._local_aggregators.append(_CountAggregator())
elif operator == "Max":
self._local_aggregators.append(_MaxAggregator())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@

import os
from azure.cosmos._execution_context.aio import endpoint_component, multi_execution_aggregator
from azure.cosmos._execution_context.aio import non_streaming_order_by_aggregator
from azure.cosmos._execution_context.aio import non_streaming_order_by_aggregator, hybrid_search_aggregator
from azure.cosmos._execution_context.aio.base_execution_context import _QueryExecutionContextBase
from azure.cosmos._execution_context.aio.base_execution_context import _DefaultQueryExecutionContext
from azure.cosmos._execution_context.execution_dispatcher import _is_partitioned_execution_info
from azure.cosmos._execution_context.execution_dispatcher import _is_partitioned_execution_info,\
_is_hybrid_search_query, _verify_valid_hybrid_search_query
from azure.cosmos._execution_context.query_execution_info import _PartitionedQueryExecutionInfo
from azure.cosmos.documents import _DistinctType
from azure.cosmos.exceptions import CosmosHttpResponseError
Expand Down Expand Up @@ -89,7 +90,7 @@ async def fetch_next_block(self):
try:
return await self._execution_context.fetch_next_block()
except CosmosHttpResponseError as e:
if _is_partitioned_execution_info(e): #cross partition query not servable
if _is_partitioned_execution_info(e) or _is_hybrid_search_query(self._query, e):
query_to_use = self._query if self._query is not None else "Select * from root r"
query_execution_info = _PartitionedQueryExecutionInfo(await self._client._GetQueryPlanThroughGateway
(query_to_use, self._resource_link))
Expand Down Expand Up @@ -126,6 +127,16 @@ async def _create_pipelined_execution_context(self, query_execution_info):
self._options,
query_execution_info)
await execution_context_aggregator._configure_partition_ranges()
elif query_execution_info.has_hybrid_search_query_info():
hybrid_search_query_info = query_execution_info._query_execution_info['hybridSearchQueryInfo']
_verify_valid_hybrid_search_query(hybrid_search_query_info)
execution_context_aggregator = \
hybrid_search_aggregator._HybridSearchContextAggregator(self._client,
self._resource_link,
self._options,
query_execution_info,
hybrid_search_query_info)
await execution_context_aggregator._run_hybrid_search()
else:
execution_context_aggregator = multi_execution_aggregator._MultiExecutionContextAggregator(self._client,
self._resource_link,
Expand Down
Loading

0 comments on commit 3de7a4c

Please sign in to comment.