Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cosmos] Adds support for non streaming ORDER BY #35468

Merged
merged 69 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
991a274
sync changes and sample for vector search control plane
simorenoh Mar 21, 2024
3a3a652
Update index_management.py
simorenoh Mar 21, 2024
20f533c
Update index_management.py
simorenoh Mar 21, 2024
09f33b7
async and samples
simorenoh Mar 28, 2024
8e527fd
sync and async tests
simorenoh Mar 28, 2024
7c44137
Update CHANGELOG.md
simorenoh Mar 28, 2024
7eb5439
developed typehints
simorenoh Mar 28, 2024
c428476
skip tests
simorenoh Mar 29, 2024
58000fd
create_if_not_exists, README
simorenoh Apr 2, 2024
4c4b1ab
Update README.md
simorenoh Apr 2, 2024
0e6b24f
add provisional, add dimension limit
simorenoh Apr 3, 2024
b42f3cb
Merge branch 'main' into vector-search-query
simorenoh Apr 16, 2024
fef391d
adds sync changes, adds changelog
simorenoh May 3, 2024
8583dbf
async changes
simorenoh May 3, 2024
158f60f
some comments addressed
simorenoh May 3, 2024
c880436
Update CHANGELOG.md
simorenoh May 3, 2024
a414f05
bug fix on ordering
simorenoh May 8, 2024
d217210
ordering bug fix
simorenoh May 8, 2024
8869ea4
fix datetime
simorenoh May 8, 2024
0c6d8eb
samples added
simorenoh May 8, 2024
30b0645
small fixes
simorenoh May 9, 2024
5056d89
fix some additional PQ logic
simorenoh May 9, 2024
358deae
last bit of pq fixes
simorenoh May 9, 2024
617c709
Update non_streaming_order_by_aggregator.py
simorenoh May 9, 2024
73e3709
memory optimization
simorenoh May 10, 2024
6bb8090
Update sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/do…
simorenoh May 10, 2024
326b155
Merge branch 'main' into vector-search-query
simorenoh May 10, 2024
540a645
addressing comments
simorenoh May 10, 2024
98a4fc9
test name fix, improve readme/ samples
simorenoh May 10, 2024
d487519
add sync tests, improve readme
simorenoh May 10, 2024
abd2bc0
async tests
simorenoh May 10, 2024
a0547b1
pylint
simorenoh May 10, 2024
07acb93
remove print
simorenoh May 10, 2024
7cd5b92
pylint
simorenoh May 10, 2024
5834b29
adds env variable
simorenoh May 10, 2024
f615f3e
adds JS tests
simorenoh May 13, 2024
0081bbe
error logic improvements
simorenoh May 13, 2024
674f483
readme updates
simorenoh May 13, 2024
0e26bf6
more fixes to logic
simorenoh May 13, 2024
a65eb0a
oops
simorenoh May 13, 2024
6563bc3
memory optimization
simorenoh May 13, 2024
9935dc1
Update sdk/cosmos/azure-cosmos/README.md
simorenoh May 13, 2024
ad36a9c
update variable for naming conventions
simorenoh May 13, 2024
86b78b7
remove/ comment out diskANN
simorenoh May 13, 2024
3cff42f
offset + limit fix, tests fixes
simorenoh May 14, 2024
dd187dd
add capabilities env var flag
simorenoh May 14, 2024
d2fbb1b
use feature flag for existing query tests
simorenoh May 14, 2024
fe7742a
disable emulator for query tests
simorenoh May 14, 2024
7cd4d9d
missed some tests
simorenoh May 14, 2024
b3876c6
Update test_aggregate.py
simorenoh May 14, 2024
d8bc50d
Update test-resources.bicep
simorenoh May 15, 2024
1e699e4
forgot tests were being skipped
simorenoh May 15, 2024
e79839b
Update sdk/cosmos/azure-cosmos/test/test_vector_policy.py
Pilchie May 15, 2024
16860dc
Update sdk/cosmos/azure-cosmos/test/test_vector_policy_async.py
Pilchie May 15, 2024
1431e9e
test fixes
simorenoh May 15, 2024
28bef5b
Merge branch 'vector-search-query' of https://github.com/simorenoh/az…
simorenoh May 15, 2024
8701b80
Update README.md
simorenoh May 15, 2024
58af1bb
create separate db for vectors
simorenoh May 15, 2024
9bfdf57
tests
simorenoh May 15, 2024
45e5b6d
tests
simorenoh May 15, 2024
c4a7c60
more tests
simorenoh May 15, 2024
b6dbe45
small bit
simorenoh May 15, 2024
fca1294
final fixes hopefully
simorenoh May 15, 2024
445ba94
raise time limit on test so it doesnt fail
simorenoh May 15, 2024
f64775d
Update test_query_vector_similarity_async.py
simorenoh May 15, 2024
ae9524d
add date for release prep
simorenoh May 15, 2024
e616c4a
Merge branch 'main' into vector-search-query
simorenoh May 15, 2024
8ad2591
Update CHANGELOG.md
simorenoh May 15, 2024
fd10e89
Merge branch 'main' into vector-search-query
simorenoh May 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
## Release History

### 4.6.1 (Unreleased)
### 4.6.1 (2024-05-15)

#### Features Added
* Added support for using the start time option for change feed query API. See [PR 35090](https://github.com/Azure/azure-sdk-for-python/pull/35090)

#### Breaking Changes
* Adds vector embedding policy and vector indexing policy. See [PR 34882](https://github.com/Azure/azure-sdk-for-python/pull/34882).
* Adds support for vector search non-streaming order by queries. See [PR 35468](https://github.com/Azure/azure-sdk-for-python/pull/35468).
* Adds support for using the start time option for change feed query API. See [PR 35090](https://github.com/Azure/azure-sdk-for-python/pull/35090).

#### Bugs Fixed
* Fixed a bug where change feed query in Async client was not returning all pages due to case-sensitive response headers. See [PR 35090](https://github.com/Azure/azure-sdk-for-python/pull/35090)
* Fixed a bug where change feed query in Async client was not returning all pages due to case-sensitive response headers. See [PR 35090](https://github.com/Azure/azure-sdk-for-python/pull/35090).
* Fixed a bug when a retryable exception occurs in the first page of a query execution causing query to return 0 results. See [PR 35090](https://github.com/Azure/azure-sdk-for-python/pull/35090).


#### Other Changes

### 4.6.0 (2024-03-14)

#### Features Added
Expand Down
103 changes: 103 additions & 0 deletions sdk/cosmos/azure-cosmos/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,107 @@ as well as containing the list of failed responses for the failed request.

For more information on Transactional Batch, see [Azure Cosmos DB Transactional Batch][cosmos_transactional_batch].

### Public Preview - Vector Embeddings and Vector Indexes
We have added new capabilities to utilize vector embeddings and vector indexing for users to leverage vector
search utilizing our Cosmos SDK. These two container-level configurations have to be turned on at the account-level
before you can use them.

Each vector embedding should have a path to the relevant vector field in your items being stored, a supported data type
(float32, int8, uint8), the vector's dimensions, and the distance function being used for that embedding. Vectors indexed
with the flat index type can be at most 505 dimensions. Vectors indexed with the quantizedFlat index type can be at most 4,096 dimensions.
A sample vector embedding policy would look like this:
```python
vector_embedding_policy = {
"vectorEmbeddings": [
{
"path": "/vector1",
"dataType": "float32",
"dimensions": 256,
"distanceFunction": "euclidean"
},
{
"path": "/vector2",
"dataType": "int8",
"dimensions": 200,
"distanceFunction": "dotproduct"
},
{
"path": "/vector3",
"dataType": "uint8",
"dimensions": 400,
"distanceFunction": "cosine"
}
]
}
```

Separately, vector indexes have been added to the already existing indexing_policy and only require two fields per index:
the path to the relevant field to be used, and the type of index from the possible options (flat or quantizedFlat).
A sample indexing policy with vector indexes would look like this:
```python
indexing_policy = {
"automatic": True,
"indexingMode": "consistent",
"compositeIndexes": [
[
{"path": "/numberField", "order": "ascending"},
{"path": "/stringField", "order": "descending"}
]
],
"spatialIndexes": [
{"path": "/location/*", "types": [
"Point",
"Polygon"]}
],
"vectorIndexes": [
{"path": "/vector1", "type": "flat"},
{"path": "/vector2", "type": "quantizedFlat"}
]
}
```
You would then pass in the relevant policies to your container creation method to ensure these configurations are used by it.
The operation will fail if you pass new vector indexes to your indexing policy but forget to pass in an embedding policy.
```python
database.create_container(id=container_id, partition_key=PartitionKey(path="/id"),
indexing_policy=indexing_policy, vector_embedding_policy=vector_embedding_policy)
```
***Note: vector embeddings and vector indexes CANNOT be edited by container replace operations. They are only available directly through creation.***

### Public Preview - Vector Search

With the addition of the vector indexing and vector embedding capabilities, the SDK can now perform order by vector search queries.
These queries specify the VectorDistance to use as a metric within the query text. These must always use a TOP or LIMIT clause within the query though,
since vector search queries have to look through a lot of data otherwise and may become too expensive or long-running.
Since these queries are relatively expensive, the SDK sets a default limit of 50000 max items per query - if you'd like to raise that further, you
can use the `AZURE_COSMOS_MAX_ITEM_BUFFER_VECTOR_SEARCH` environment variable to do so. However, be advised that queries with too many vector results
may have additional latencies associated with searching in the service.
The query syntax for these operations looks like this:
```python
VectorDistance(<embedding1>, <embedding2>, [,<exact_search>], [,<specification>])
```
Embeddings 1 and 2 are the arrays of values for the relevant embeddings, `exact_search` is an optional boolean indicating whether
to do an exact search vs. an approximate one (default value of false), and `specification` is an optional Json snippet with embedding
specs that can include `dataType`, `dimensions` and `distanceFunction`. The specifications within the query will take precedence
to any configurations previously set by a vector embedding policy.
A sample vector search query would look something like this:
```python
query = "SELECT TOP 10 c.title,VectorDistance(c.embedding, [{}]) AS " \
"SimilarityScore FROM c ORDER BY VectorDistance(c.embedding, [{}])".format(embeddings_string, embeddings_string)
```
Or if you'd like to add the optional parameters to the vector distance, you could do this:
```python
query = "SELECT TOP 10 c.title,VectorDistance(c.embedding, [{}], true, {{'dataType': 'float32' , 'distanceFunction': 'cosine'}}) AS " \
"SimilarityScore FROM c ORDER BY VectorDistance(c.embedding, [{}], true, {{'dataType': " \
"'float32', 'distanceFunction': 'cosine'}})".format(embeddings_string, embeddings_string)
```
The `embeddings_string` above would be your string made from your vector embeddings.
You can find our sync samples [here][cosmos_index_sample] and our async samples [here][cosmos_index_sample_async] as well to help yourself out.

*Note: For a limited time, if your query operates against a region or emulator that has not yet been updated the client might run into some issues
not being able to recognize the new NonStreamingOrderBy capability that makes vector search possible.
If this happens, you can set the `AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY` environment variable to `"True"` to opt out of this
functionality and continue operating as usual.*

## Troubleshooting

### General
Expand Down Expand Up @@ -762,6 +863,8 @@ For more extensive documentation on the Cosmos DB service, see the [Azure Cosmos
[timeouts_document]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/docs/TimeoutAndRetriesConfig.md
[cosmos_transactional_batch]: https://learn.microsoft.com/azure/cosmos-db/nosql/transactional-batch
[cosmos_concurrency_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/concurrency_sample.py
[cosmos_index_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/index_management.py
[cosmos_index_sample_async]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/index_management_async.py

## Contributing

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

"""Document client class for the Azure Cosmos database service.
"""
import os
import urllib.parse
from typing import Callable, Dict, Any, Iterable, List, Mapping, Optional, Sequence, Tuple, Union, cast, Type
from typing_extensions import TypedDict
Expand Down Expand Up @@ -3107,7 +3108,16 @@ def _GetQueryPlanThroughGateway(self, query: str, resource_link: str, **kwargs:
documents._QueryFeature.MultipleOrderBy + "," +
documents._QueryFeature.OffsetAndLimit + "," +
documents._QueryFeature.OrderBy + "," +
documents._QueryFeature.Top)
documents._QueryFeature.Top + "," +
documents._QueryFeature.NonStreamingOrderBy)
if os.environ.get('AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY', False):
supported_query_features = (documents._QueryFeature.Aggregate + "," +
documents._QueryFeature.CompositeAggregate + "," +
documents._QueryFeature.Distinct + "," +
documents._QueryFeature.MultipleOrderBy + "," +
documents._QueryFeature.OffsetAndLimit + "," +
documents._QueryFeature.OrderBy + "," +
documents._QueryFeature.Top)

options = {
"contentType": runtime_constants.MediaTypes.Json,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from azure.cosmos import _base
from azure.cosmos._execution_context.aio.base_execution_context import _DefaultQueryExecutionContext

# pylint: disable=protected-access

class _DocumentProducer(object):
"""This class takes care of handling of the results for one single partition
Expand Down Expand Up @@ -271,3 +272,51 @@ def _validate_orderby_items(self, res1, res2):
type2 = _OrderByHelper.getTypeStr(elt2)
if type1 != type2:
raise ValueError("Expected {}, but got {}.".format(type1, type2))

class _NonStreamingItemResultProducer:
"""This class takes care of handling of the items to be sorted in a non-streaming context.
One instance of this document producer goes attached to every item coming in for the priority queue to be able
to properly sort items as they get inserted.
"""

def __init__(self, item_result, sort_order):
simorenoh marked this conversation as resolved.
Show resolved Hide resolved
"""
Constructor
:param dict[str, Any] item_result: The item result extracted from the document producer
:param list[str] sort_order: List of sort orders (i.e., Ascending, Descending)
"""
self._item_result = item_result
self._doc_producer_comp = _NonStreamingOrderByComparator(sort_order)
simorenoh marked this conversation as resolved.
Show resolved Hide resolved



class _NonStreamingOrderByComparator(object):
"""Provide a Comparator for item results which respects orderby sort order.
"""

def __init__(self, sort_order):
"""Instantiates this class
:param list sort_order:
List of sort orders (i.e., Ascending, Descending)
"""
self._sort_order = sort_order

async def compare(self, doc_producer1, doc_producer2):
"""Compares the given two instances of DocumentProducers.
Based on the orderby query items and whether the sort order is Ascending
or Descending compares the peek result of the two DocumentProducers.
:param _DocumentProducer doc_producer1: first instance to be compared
:param _DocumentProducer doc_producer2: second instance to be compared
:return:
Integer value of compare result.
positive integer if doc_producers1 > doc_producers2
negative integer if doc_producers1 < doc_producers2
:rtype: int
"""
rank1 = doc_producer1._item_result["orderByItems"][0]
rank2 = doc_producer2._item_result["orderByItems"][0]
res = await _OrderByHelper.compare(rank1, rank2)
if res != 0:
if self._sort_order[0] == "Descending":
return -res
return res
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ async def __anext__(self):
payload = await self._execution_context.__anext__()
return payload["payload"]

class _QueryExecutionNonStreamingEndpointComponent(_QueryExecutionEndpointComponent):
"""Represents an endpoint in handling a non-streaming order by query results.
For each processed orderby result it returns the item result.
"""
async def __anext__(self):
payload = await self._execution_context.__anext__()
return payload._item_result["payload"]

class _QueryExecutionTopEndpointComponent(_QueryExecutionEndpointComponent):
"""Represents an endpoint in handling top query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
Cosmos database service.
"""

from azure.cosmos._execution_context.aio import endpoint_component
from azure.cosmos._execution_context.aio import multi_execution_aggregator
import os
from azure.cosmos._execution_context.aio import endpoint_component, multi_execution_aggregator
from azure.cosmos._execution_context.aio import non_streaming_order_by_aggregator
from azure.cosmos._execution_context.aio.base_execution_context import _QueryExecutionContextBase
from azure.cosmos._execution_context.aio.base_execution_context import _DefaultQueryExecutionContext
from azure.cosmos._execution_context.execution_dispatcher import _is_partitioned_execution_info
Expand Down Expand Up @@ -107,12 +108,31 @@ async def _create_pipelined_execution_context(self, query_execution_info):
raise CosmosHttpResponseError(StatusCodes.BAD_REQUEST,
"Cross partition query only supports 'VALUE <AggregateFunc>' for aggregates")

execution_context_aggregator = multi_execution_aggregator._MultiExecutionContextAggregator(self._client,
# throw exception here for vector search query without limit filter or limit > max_limit
if query_execution_info.get_non_streaming_order_by():
total_item_buffer = query_execution_info.get_top() or\
query_execution_info.get_limit() + query_execution_info.get_offset()
if total_item_buffer is None:
raise ValueError("Executing a vector search query without TOP or LIMIT can consume many" +
simorenoh marked this conversation as resolved.
Show resolved Hide resolved
" RUs very fast and have long runtimes. Please ensure you are using one" +
" of the two filters with your vector search query.")
if total_item_buffer > os.environ.get('AZURE_COSMOS_MAX_ITEM_BUFFER_VECTOR_SEARCH', 50000):
raise ValueError("Executing a vector search query with more items than the max is not allowed." +
"Please ensure you are using a limit smaller than the max, or change the max.")
simorenoh marked this conversation as resolved.
Show resolved Hide resolved
execution_context_aggregator =\
non_streaming_order_by_aggregator._NonStreamingOrderByContextAggregator(self._client,
self._resource_link,
self._query,
self._options,
query_execution_info)
await execution_context_aggregator._configure_partition_ranges()
else:
execution_context_aggregator = multi_execution_aggregator._MultiExecutionContextAggregator(self._client,
self._resource_link,
self._query,
self._options,
query_execution_info)
await execution_context_aggregator._configure_partition_ranges()
await execution_context_aggregator._configure_partition_ranges()
return _PipelineExecutionContext(self._client, self._options, execution_context_aggregator,
query_execution_info)

Expand All @@ -134,7 +154,9 @@ def __init__(self, client, options, execution_context, query_execution_info):
self._endpoint = endpoint_component._QueryExecutionEndpointComponent(execution_context)

order_by = query_execution_info.get_order_by()
if order_by:
if query_execution_info.get_non_streaming_order_by():
self._endpoint = endpoint_component._QueryExecutionNonStreamingEndpointComponent(self._endpoint)
elif order_by:
self._endpoint = endpoint_component._QueryExecutionOrderByEndpointComponent(self._endpoint)

aggregates = query_execution_info.get_aggregates()
Expand Down
Loading