[Cosmos] Adds support for non streaming ORDER BY #35468

Merged · 69 commits · May 15, 2024

Changes from 13 commits

Commits
991a274
sync changes and sample for vector search control plane
simorenoh Mar 21, 2024
3a3a652
Update index_management.py
simorenoh Mar 21, 2024
20f533c
Update index_management.py
simorenoh Mar 21, 2024
09f33b7
async and samples
simorenoh Mar 28, 2024
8e527fd
sync and async tests
simorenoh Mar 28, 2024
7c44137
Update CHANGELOG.md
simorenoh Mar 28, 2024
7eb5439
developed typehints
simorenoh Mar 28, 2024
c428476
skip tests
simorenoh Mar 29, 2024
58000fd
create_if_not_exists, README
simorenoh Apr 2, 2024
4c4b1ab
Update README.md
simorenoh Apr 2, 2024
0e6b24f
add provisional, add dimension limit
simorenoh Apr 3, 2024
b42f3cb
Merge branch 'main' into vector-search-query
simorenoh Apr 16, 2024
fef391d
adds sync changes, adds changelog
simorenoh May 3, 2024
8583dbf
async changes
simorenoh May 3, 2024
158f60f
some comments addressed
simorenoh May 3, 2024
c880436
Update CHANGELOG.md
simorenoh May 3, 2024
a414f05
bug fix on ordering
simorenoh May 8, 2024
d217210
ordering bug fix
simorenoh May 8, 2024
8869ea4
fix datetime
simorenoh May 8, 2024
0c6d8eb
samples added
simorenoh May 8, 2024
30b0645
small fixes
simorenoh May 9, 2024
5056d89
fix some additional PQ logic
simorenoh May 9, 2024
358deae
last bit of pq fixes
simorenoh May 9, 2024
617c709
Update non_streaming_order_by_aggregator.py
simorenoh May 9, 2024
73e3709
memory optimization
simorenoh May 10, 2024
6bb8090
Update sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/do…
simorenoh May 10, 2024
326b155
Merge branch 'main' into vector-search-query
simorenoh May 10, 2024
540a645
addressing comments
simorenoh May 10, 2024
98a4fc9
test name fix, improve readme/ samples
simorenoh May 10, 2024
d487519
add sync tests, improve readme
simorenoh May 10, 2024
abd2bc0
async tests
simorenoh May 10, 2024
a0547b1
pylint
simorenoh May 10, 2024
07acb93
remove print
simorenoh May 10, 2024
7cd5b92
pylint
simorenoh May 10, 2024
5834b29
adds env variable
simorenoh May 10, 2024
f615f3e
adds JS tests
simorenoh May 13, 2024
0081bbe
error logic improvements
simorenoh May 13, 2024
674f483
readme updates
simorenoh May 13, 2024
0e26bf6
more fixes to logic
simorenoh May 13, 2024
a65eb0a
oops
simorenoh May 13, 2024
6563bc3
memory optimization
simorenoh May 13, 2024
9935dc1
Update sdk/cosmos/azure-cosmos/README.md
simorenoh May 13, 2024
ad36a9c
update variable for naming conventions
simorenoh May 13, 2024
86b78b7
remove/ comment out diskANN
simorenoh May 13, 2024
3cff42f
offset + limit fix, tests fixes
simorenoh May 14, 2024
dd187dd
add capabilities env var flag
simorenoh May 14, 2024
d2fbb1b
use feature flag for existing query tests
simorenoh May 14, 2024
fe7742a
disable emulator for query tests
simorenoh May 14, 2024
7cd4d9d
missed some tests
simorenoh May 14, 2024
b3876c6
Update test_aggregate.py
simorenoh May 14, 2024
d8bc50d
Update test-resources.bicep
simorenoh May 15, 2024
1e699e4
forgot tests were being skipped
simorenoh May 15, 2024
e79839b
Update sdk/cosmos/azure-cosmos/test/test_vector_policy.py
Pilchie May 15, 2024
16860dc
Update sdk/cosmos/azure-cosmos/test/test_vector_policy_async.py
Pilchie May 15, 2024
1431e9e
test fixes
simorenoh May 15, 2024
28bef5b
Merge branch 'vector-search-query' of https://github.com/simorenoh/az…
simorenoh May 15, 2024
8701b80
Update README.md
simorenoh May 15, 2024
58af1bb
create separate db for vectors
simorenoh May 15, 2024
9bfdf57
tests
simorenoh May 15, 2024
45e5b6d
tests
simorenoh May 15, 2024
c4a7c60
more tests
simorenoh May 15, 2024
b6dbe45
small bit
simorenoh May 15, 2024
fca1294
final fixes hopefully
simorenoh May 15, 2024
445ba94
raise time limit on test so it doesnt fail
simorenoh May 15, 2024
f64775d
Update test_query_vector_similarity_async.py
simorenoh May 15, 2024
ae9524d
add date for release prep
simorenoh May 15, 2024
e616c4a
Merge branch 'main' into vector-search-query
simorenoh May 15, 2024
8ad2591
Update CHANGELOG.md
simorenoh May 15, 2024
fd10e89
Merge branch 'main' into vector-search-query
simorenoh May 15, 2024
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -3,6 +3,8 @@
### 4.6.1 (Unreleased)

#### Features Added
* Adds vector embedding policy and vector indexing policy. See [PR 34882](https://github.com/Azure/azure-sdk-for-python/pull/34882).
* Adds support for vector search non-streaming order by queries. See [PR 10101](https://github.com/Azure/azure-sdk-for-python/pull/10101).
* Added support for using the start time option for change feed query API. See [PR 35090](https://github.com/Azure/azure-sdk-for-python/pull/35090)

#### Breaking Changes
@@ -11,7 +13,6 @@
* Fixed a bug where change feed query in Async client was not returning all pages due to case-sensitive response headers. See [PR 35090](https://github.com/Azure/azure-sdk-for-python/pull/35090)
* Fixed a bug when a retryable exception occurs in the first page of a query execution causing query to return 0 results. See [PR 35090](https://github.com/Azure/azure-sdk-for-python/pull/35090).


#### Other Changes

### 4.6.0 (2024-03-14)
66 changes: 66 additions & 0 deletions sdk/cosmos/azure-cosmos/README.md
@@ -628,6 +628,72 @@ as well as containing the list of failed responses for the failed request.

For more information on Transactional Batch, see [Azure Cosmos DB Transactional Batch][cosmos_transactional_batch].

### Private Preview - Vector Embeddings and Vector Indexes
We have added new capabilities that let users leverage vector embeddings and vector indexing for vector
search through the Cosmos SDK. These two container-level configurations must be enabled at the account level
before they can be used.

Each vector embedding needs a path to the relevant vector field in the items being stored, a supported data type
(float32, int8, uint8), the vector's dimensions (a positive integer <= 1536), and the distance function used for that embedding.
A sample vector embedding policy looks like this:
```python
vector_embedding_policy = {
    "vectorEmbeddings": [
        {
            "path": "/vector1",
            "dataType": "float32",
            "dimensions": 1000,
            "distanceFunction": "euclidean"
        },
        {
            "path": "/vector2",
            "dataType": "int8",
            "dimensions": 200,
            "distanceFunction": "dotproduct"
        },
        {
            "path": "/vector3",
            "dataType": "uint8",
            "dimensions": 400,
            "distanceFunction": "cosine"
        }
    ]
}
```

Separately, vector indexes have been added to the existing indexing_policy and require only two fields per index:
the path to the relevant field and the index type, chosen from the available options (flat, quantizedFlat, or diskANN).
A sample indexing policy with vector indexes looks like this:
```python
indexing_policy = {
    "automatic": True,
    "indexingMode": "consistent",
    "compositeIndexes": [
        [
            {"path": "/numberField", "order": "ascending"},
            {"path": "/stringField", "order": "descending"}
        ]
    ],
    "spatialIndexes": [
        {"path": "/location/*", "types": ["Point", "Polygon"]}
    ],
    "vectorIndexes": [
        {"path": "/vector1", "type": "flat"},
        {"path": "/vector2", "type": "quantizedFlat"},
        {"path": "/vector3", "type": "diskANN"}
    ]
}
```
You would then pass the relevant policies into your container creation method so the container is created with these configurations.
The operation will fail if you include vector indexes in your indexing policy but do not also pass in a vector embedding policy.
```python
database.create_container(
    id=container_id,
    partition_key=PartitionKey(path="/id"),
    indexing_policy=indexing_policy,
    vector_embedding_policy=vector_embedding_policy,
)
```
***Note: vector embeddings and vector indexes CANNOT be edited by container replace operations. They can only be set when the container is created.***
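
A vector similarity query can then be issued against the container. The snippet below is only a sketch under a few assumptions: it presumes a `container` proxy created as shown above, a query embedding with the same 1000 dimensions as `/vector1`, and the availability of the `VectorDistance` system function on the account. It uses TOP because a vector search query must include a TOP or LIMIT filter.

```python
query_embedding = [0.001] * 1000  # placeholder vector; must match the 1000 dimensions declared for /vector1

results = container.query_items(
    query=(
        "SELECT TOP 5 c.id, VectorDistance(c.vector1, @embedding) AS similarity_score "
        "FROM c ORDER BY VectorDistance(c.vector1, @embedding)"
    ),
    parameters=[{"name": "@embedding", "value": query_embedding}],
    enable_cross_partition_query=True,
)

for item in results:
    print(item["id"], item["similarity_score"])
```

Limiting the query with TOP (or LIMIT) is what allows the SDK to bound the size of the priority queue it uses to sort non-streaming ORDER BY results.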

## Troubleshooting

### General
@@ -3107,7 +3107,8 @@ def _GetQueryPlanThroughGateway(self, query: str, resource_link: str, **kwargs:
                                 documents._QueryFeature.MultipleOrderBy + "," +
                                 documents._QueryFeature.OffsetAndLimit + "," +
                                 documents._QueryFeature.OrderBy + "," +
                                 documents._QueryFeature.Top + "," +
                                 documents._QueryFeature.NonStreamingOrderBy)

        options = {
            "contentType": runtime_constants.MediaTypes.Json,
@@ -276,3 +276,57 @@ def _validate_orderby_items(self, res1, res2):
            type2 = _OrderByHelper.getTypeStr(elt2)
            if type1 != type2:
                raise ValueError("Expected {}, but got {}.".format(type1, type2))


class _NonStreamingDocumentProducer(object):
"""This class takes care of handling of the items to be sorted in a non-streaming context.

One instance of this document producer goes attached to every item coming in for the priority queue to be able
to properly sort items as they get inserted.
"""

def __init__(self, item_result, sort_order):
"""
Constructor
"""
self._item_result = item_result
self._doc_producer_comp = _NonStreamingOrderByComparator(sort_order)

def __lt__(self, other):
return self._doc_producer_comp.compare(self, other) < 0


class _NonStreamingOrderByComparator(object):
"""Provide a Comparator for item results which respects orderby sort order.
"""

def __init__(self, sort_order): # pylint: disable=super-init-not-called
"""Instantiates this class

:param list sort_order:
List of sort orders (i.e., Ascending, Descending)

:ivar list sort_order:
List of sort orders (i.e., Ascending, Descending)

"""
self._sort_order = sort_order

def compare(self, doc_producer1, doc_producer2):
"""Compares the given two instances of DocumentProducers.

Based on the orderby query items and whether the sort order is Ascending
or Descending compares the peek result of the two DocumentProducers.

:param _DocumentProducer doc_producer1: first instance to be compared
:param _DocumentProducer doc_producer2: second instance to be compared
:return:
Integer value of compare result.
positive integer if doc_producers1 > doc_producers2
negative integer if doc_producers1 < doc_producers2
:rtype: int
"""
# TODO: this is not fully safe - doesn't deal with scenario of having orderByItems of [{}]
rank1 = doc_producer1._item_result["orderByItems"][0]['item']
rank2 = doc_producer2._item_result["orderByItems"][0]['item']
return _compare_helper(rank1, rank2)
@@ -60,6 +60,14 @@ def __next__(self):

    next = __next__  # Python 2 compatibility.


class _QueryExecutionNonStreamingEndpointComponent(_QueryExecutionEndpointComponent):
    """Represents an endpoint in handling a non-streaming order by query results.

    For each processed orderby result it returns the item result.
    """
    def __next__(self):
        return next(self._execution_context)._item_result["payload"]


class _QueryExecutionTopEndpointComponent(_QueryExecutionEndpointComponent):
    """Represents an endpoint in handling top query.
@@ -25,11 +25,11 @@

import json
from azure.cosmos.exceptions import CosmosHttpResponseError
from azure.cosmos._execution_context import multi_execution_aggregator
from azure.cosmos._execution_context import endpoint_component, multi_execution_aggregator
from azure.cosmos._execution_context import non_streaming_order_by_aggregator
from azure.cosmos._execution_context.base_execution_context import _QueryExecutionContextBase
from azure.cosmos._execution_context.base_execution_context import _DefaultQueryExecutionContext
from azure.cosmos._execution_context.query_execution_info import _PartitionedQueryExecutionInfo
from azure.cosmos._execution_context import endpoint_component
from azure.cosmos.documents import _DistinctType
from azure.cosmos.http_constants import StatusCodes, SubStatusCodes

@@ -111,15 +111,29 @@ def fetch_next_block(self):
        return self._execution_context.fetch_next_block()

    def _create_pipelined_execution_context(self, query_execution_info):

        assert self._resource_link, "code bug, resource_link is required."
        if query_execution_info.has_aggregates() and not query_execution_info.has_select_value():
            if self._options and ("enableCrossPartitionQuery" in self._options
                                  and self._options["enableCrossPartitionQuery"]):
                raise CosmosHttpResponseError(StatusCodes.BAD_REQUEST,
                                              "Cross partition query only supports 'VALUE <AggregateFunc>' for aggregates")

        # throw exception here for vector search query without limit filter
        if query_execution_info.get_has_non_streaming_order_by():
            if query_execution_info.get_top() is None and query_execution_info.get_limit() is None:
                # TODO: missing one last if statement here to check for the system variable bypass - need name
                raise CosmosHttpResponseError(StatusCodes.BAD_REQUEST,
                                              "Executing a vector search query without TOP or LIMIT can consume many" +
                                              " RUs very fast and have long runtimes. Please ensure you are using one" +
                                              " of the two filters with your vector search query.")
            execution_context_aggregator = \
                non_streaming_order_by_aggregator._NonStreamingOrderByContextAggregator(self._client,
                                                                                        self._resource_link,
                                                                                        self._query,
                                                                                        self._options,
                                                                                        query_execution_info)
        else:
            execution_context_aggregator = multi_execution_aggregator._MultiExecutionContextAggregator(self._client,
                                                                                                        self._resource_link,
                                                                                                        self._query,
                                                                                                        self._options,
@@ -147,7 +161,9 @@ def __init__(self, client, options, execution_context, query_execution_info):
        self._endpoint = endpoint_component._QueryExecutionEndpointComponent(execution_context)

        order_by = query_execution_info.get_order_by()
        if query_execution_info.get_has_non_streaming_order_by():
            self._endpoint = endpoint_component._QueryExecutionNonStreamingEndpointComponent(self._endpoint)
        elif order_by:
            self._endpoint = endpoint_component._QueryExecutionOrderByEndpointComponent(self._endpoint)

        aggregates = query_execution_info.get_aggregates()
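
The hunk above wires the new `_QueryExecutionNonStreamingEndpointComponent` into the endpoint pipeline, where each component wraps the previous one and post-processes whatever its `__next__` yields. Below is a minimal, self-contained sketch of that chaining pattern; the class names and data are illustrative only and are not part of the SDK.

```python
class Endpoint:
    """Base component: yields results from an underlying execution context (any iterator)."""

    def __init__(self, execution_context):
        self._execution_context = execution_context

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._execution_context)


class PayloadEndpoint(Endpoint):
    """Unwraps the payload of each ordered result, like the new non-streaming component."""

    def __next__(self):
        return next(self._execution_context)["payload"]


class TopEndpoint(Endpoint):
    """Stops after `top` results, like the existing TOP component."""

    def __init__(self, execution_context, top):
        super().__init__(execution_context)
        self._top = top

    def __next__(self):
        if self._top <= 0:
            raise StopIteration
        self._top -= 1
        return next(self._execution_context)


ordered_results = iter([{"payload": {"id": "1"}}, {"payload": {"id": "2"}}, {"payload": {"id": "3"}}])
pipeline = TopEndpoint(PayloadEndpoint(Endpoint(ordered_results)), top=2)
print(list(pipeline))  # [{'id': '1'}, {'id': '2'}]
```

Because each component only overrides `__next__`, behaviors such as unwrapping the ordered payload can be layered in without touching the underlying execution context.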
@@ -0,0 +1,168 @@
# The MIT License (MIT)
# Copyright (c) 2024 Microsoft Corporation

"""Internal class for multi execution context aggregator implementation in the Azure Cosmos database service.
"""

import heapq
from azure.cosmos._execution_context.base_execution_context import _QueryExecutionContextBase
from azure.cosmos._execution_context import document_producer
from azure.cosmos._routing import routing_range
from azure.cosmos import exceptions


# pylint: disable=protected-access

class FixedSizePriorityQueue:
    """Provides a Fixed Size Priority Queue abstraction data structure"""

    def __init__(self, max_size):
        self._heap = []
        self.max_size = max_size

    def pop(self):
        return heapq.heappop(self._heap)

    def push(self, item):
        if len(self._heap) < self.max_size:
            heapq.heappush(self._heap, item)
        else:
            heapq.heappushpop(self._heap, item)

    def peek(self):
        return self._heap[0]

    def size(self):
        return len(self._heap)


class _NonStreamingOrderByContextAggregator(_QueryExecutionContextBase):
"""This class is a subclass of the query execution context base and serves for
non-streaming order by queries. It is very similar to the existing MultiExecutionContextAggregator,
but is needed since we're dealing with items and not document producers.

This class builds upon the multi-execution aggregator, building a document producer per partition
and draining their results entirely in order to create the result set relevant to the filters passed
by the user.
"""

    def __init__(self, client, resource_link, query, options, partitioned_query_ex_info):
        super(_NonStreamingOrderByContextAggregator, self).__init__(client, options)

        # use the routing provider in the client
        self._routing_provider = client._routing_map_provider
        self._client = client
        self._resource_link = resource_link
        self._query = query
        self._partitioned_query_ex_info = partitioned_query_ex_info
        self._sort_orders = partitioned_query_ex_info.get_order_by()

        # will be a list of (partition_min, partition_max) tuples
        targetPartitionRanges = self._get_target_partition_key_range()

        self._document_producer_comparator = document_producer._NonStreamingOrderByComparator(self._sort_orders)

        targetPartitionQueryExecutionContextList = []
        for partitionTargetRange in targetPartitionRanges:
            # create a document producer for each partition key range
            targetPartitionQueryExecutionContextList.append(
                self._createTargetPartitionQueryExecutionContext(partitionTargetRange)
            )

        self._doc_producers = []
        # verify all document producers have items/ no splits
        for targetQueryExContext in targetPartitionQueryExecutionContextList:
            try:
                targetQueryExContext.peek()
                self._doc_producers.append(targetQueryExContext)
            except exceptions.CosmosHttpResponseError as e:
                if exceptions._partition_range_is_gone(e):
                    # repairing document producer context on partition split
                    self._repair_document_producer()
                else:
                    raise
            except StopIteration:
                continue

        pq_size = partitioned_query_ex_info.get_top() or partitioned_query_ex_info.get_limit()
        self._orderByPQ = FixedSizePriorityQueue(pq_size)
        for doc_producer in self._doc_producers:
            while True:
                try:
                    result = doc_producer.peek()
                    item_result = document_producer._NonStreamingDocumentProducer(result, self._sort_orders)
                    self._orderByPQ.push(item_result)
                    next(doc_producer)
                except StopIteration:
                    break

    def __next__(self):
        """Returns the next item result.

        :return: The next result.
        :rtype: dict
        :raises StopIteration: If no more results are left.
        """
        if self._orderByPQ.size() > 0:
            res = self._orderByPQ.pop()
            return res
        raise StopIteration

    def fetch_next_block(self):
        raise NotImplementedError("You should use pipeline's fetch_next_block.")

    def _repair_document_producer(self):
        """Repairs the document producer context by using the re-initialized routing map provider in the client,
        which loads in a refreshed partition key range cache to re-create the partition key ranges.
        After loading this new cache, the document producers get re-created with the new valid ranges.
        """
        # refresh the routing provider to get the newly initialized one post-refresh
        self._routing_provider = self._client._routing_map_provider
        # will be a list of (partition_min, partition_max) tuples
        targetPartitionRanges = self._get_target_partition_key_range()

        targetPartitionQueryExecutionContextList = []
        for partitionTargetRange in targetPartitionRanges:
            # create and add the child execution context for the target range
            targetPartitionQueryExecutionContextList.append(
                self._createTargetPartitionQueryExecutionContext(partitionTargetRange)
            )

        self._doc_producers = []
        for targetQueryExContext in targetPartitionQueryExecutionContextList:
            try:
                # TODO: we can also use more_itertools.peekable to be more python friendly
                targetQueryExContext.peek()
                # if there are matching results in the target ex range add it to the priority queue
                self._doc_producers.append(targetQueryExContext)

            except StopIteration:
                continue

    def _createTargetPartitionQueryExecutionContext(self, partition_key_target_range):

        rewritten_query = self._partitioned_query_ex_info.get_rewritten_query()
        if rewritten_query:
            if isinstance(self._query, dict):
                # this is a parameterized query, collect all the parameters
                query = dict(self._query)
                query["query"] = rewritten_query
            else:
                query = rewritten_query
        else:
            query = self._query

        return document_producer._DocumentProducer(
            partition_key_target_range,
            self._client,
            self._resource_link,
            query,
            self._document_producer_comparator,
            self._options,
        )

    def _get_target_partition_key_range(self):
        query_ranges = self._partitioned_query_ex_info.get_query_ranges()
        return self._routing_provider.get_overlapping_ranges(
            self._resource_link, [routing_range.Range.ParseFromDict(range_as_dict) for range_as_dict in query_ranges]
        )
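
The aggregator above drains every partition's document producer into a `FixedSizePriorityQueue` bounded by the query's TOP/LIMIT value, relying on `_NonStreamingDocumentProducer.__lt__` so that `heapq` can order the wrapped items. The snippet below is an illustrative, self-contained sketch of that bounded-heap technique only; the class and field names are invented for the example, and the SDK's real comparator (sort orders, orderByItems) differs.

```python
import heapq


class BoundedHeap:
    """Keeps at most max_size items; once full, each push evicts the smallest element."""

    def __init__(self, max_size):
        self._heap = []
        self.max_size = max_size

    def push(self, item):
        if len(self._heap) < self.max_size:
            heapq.heappush(self._heap, item)
        else:
            heapq.heappushpop(self._heap, item)  # push the new item, then drop the smallest

    def drain(self):
        while self._heap:
            yield heapq.heappop(self._heap)


class RankedItem:
    """Wrapper whose __lt__ lets heapq order items by their ORDER BY rank."""

    def __init__(self, rank, payload):
        self.rank = rank
        self.payload = payload

    def __lt__(self, other):
        return self.rank < other.rank


pq = BoundedHeap(max_size=3)
for rank, payload in [(0.9, "a"), (0.1, "b"), (0.5, "c"), (0.3, "d"), (0.7, "e")]:
    pq.push(RankedItem(rank, payload))

print([item.payload for item in pq.drain()])  # ['c', 'e', 'a'] -- the three largest ranks, smallest first
```

With a min-heap and `heappushpop`, the queue retains the `max_size` largest items under the wrapper's ordering, and draining returns them smallest-first.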