Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'change_feed_mode' to 'query_items_change_feed' API #38105

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
165 commits
Select commit Hold shift + click to select a range
d82870e
Add 'change_feed_mode' to 'query_items_change_feed' API
allenkim0129 Oct 25, 2024
ced92d3
remove unnecessary import
allenkim0129 Oct 25, 2024
06a8e2f
Fix lint
allenkim0129 Oct 28, 2024
c1bd9bb
Updated CHANGELOG.md
allenkim0129 Oct 28, 2024
b663998
Merge branch 'main' into users/allekim/feature/feedRangeAllVersionsAn…
allenkim0129 Oct 28, 2024
cf9e4b0
Removed _feed_range.py
allenkim0129 Oct 28, 2024
e0a1b18
Addressed comments
allenkim0129 Oct 29, 2024
357b4f8
Fixed lint
allenkim0129 Oct 29, 2024
3592bac
Add kwargs back to 'call'QueryItemsChangeFeed'
allenkim0129 Oct 29, 2024
e36392c
Fixed syntax error with f-string
allenkim0129 Oct 29, 2024
e57aef3
Removed StrEnum to support earlier Python versions
allenkim0129 Oct 29, 2024
0071042
Fixed f-string error
allenkim0129 Oct 31, 2024
6d65a5d
addressed comments
allenkim0129 Nov 1, 2024
c7d51af
Addressed comments
allenkim0129 Nov 5, 2024
c87ec71
Removed unnecessary tests
allenkim0129 Nov 5, 2024
cf6f518
Fix tests for emulator
allenkim0129 Nov 5, 2024
8f462ce
Generating SDK with model renames (#38108)
hamshavathimunibyraiah Oct 28, 2024
92d01cb
[Identity][Monitor] Update live test setup (#37943)
pvaneck Oct 28, 2024
c4bc225
Clean-up cosmos test pipeline (#38126)
weshaggard Oct 28, 2024
2e57e49
Multi modal eval fix (#38134)
w-javed Oct 28, 2024
e6f9bc1
azure-ai-evaluation release 1.0.0b5 2024-10-28 (#38138)
nagkumar91 Oct 29, 2024
c328607
open apiview for mgmt sdk (#38143)
msyyc Oct 29, 2024
bf4db51
[AutoRelease] t2-appplatform-2024-10-25-72111(can only be merged by S…
azure-sdk Oct 29, 2024
1e5f7e9
AzurePipelinesCredential | adding mlflow uri func (#36580)
kshitij-microsoft Oct 29, 2024
71aa1a3
Update changelog (#38133)
sanchez-alex Oct 29, 2024
faf53ba
Increment package version after release of azure-ai-evaluation (#38142)
azure-sdk Oct 29, 2024
984f89d
Remove psycopg2-binary from dev_requirements.txt (#38103)
lzchen Oct 29, 2024
80eb904
[Evaluation] Error improve for service-based evaluator/simulator (#38…
ninghu Oct 29, 2024
5741d7b
AzMon exporter: Serialize complex log bodies to json and set dependen…
lmolkova Oct 29, 2024
cd55049
Experimental tags on ADV scenarios (#38166)
nagkumar91 Oct 29, 2024
1381ce3
Sync eng/common directory with azure-sdk-tools for PR 9259 (#38160)
azure-sdk Oct 29, 2024
bb075f5
Re-generated REST client after re-copying Swagger folder for `2024-10…
u9009 Oct 29, 2024
5ca4f33
[Evaluation] Change RougeType to Enum (#38131)
ninghu Oct 29, 2024
2063360
Auto-enable Azure AI Inference instrumentation in Azure Monitor, upda…
lmolkova Oct 30, 2024
6c83494
[AutoRelease] t2-redhatopenshift-2024-10-30-81004(can only be merged …
azure-sdk Oct 30, 2024
69fc484
[AutoRelease] t2-resourcehealth-2024-10-30-72592(can only be merged b…
azure-sdk Oct 30, 2024
19ca69b
[AutoRelease] t2-appconfiguration-2024-10-30-38914(can only be merged…
azure-sdk Oct 30, 2024
c2cdda1
[AutoRelease] t2-databox-2024-10-30-61405(can only be merged by SDK o…
azure-sdk Oct 30, 2024
af19113
[AutoRelease] t2-edgeorder-2024-10-30-57522(can only be merged by SDK…
azure-sdk Oct 30, 2024
142a578
[AutoRelease] t2-extendedlocation-2024-10-30-79235(can only be merged…
azure-sdk Oct 30, 2024
b214335
[AutoRelease] t2-digitaltwins-2024-10-30-74766(can only be merged by …
azure-sdk Oct 30, 2024
e3852f6
Added get_arm_info (#38018)
xiangyan99 Oct 30, 2024
a3c3f4d
Update CHANGELOG.md (#38170)
changliu2 Oct 30, 2024
20a251f
Minor Readme fix (#38191)
nagkumar91 Oct 30, 2024
6603ca3
Minor fixes in vanilla OTel tracing sample (#38194)
lmolkova Oct 30, 2024
1c3ffe2
Add test for get_arm_endpoints (#38196)
xiangyan99 Oct 30, 2024
0619608
Add overloads for __call__ methods that accept query/response and con…
needuv Oct 30, 2024
0c48225
[Monitor] Apply black formatting (#38129)
pvaneck Oct 30, 2024
9ec0264
[CI] Update autorest CI to use Python 3.9 (#38175)
pvaneck Oct 30, 2024
3f3ab5e
Eval qr json lines now has context from both turns and category if it…
nagkumar91 Oct 30, 2024
83dfe39
Fix doc issues (#38204)
YalinLi0312 Oct 30, 2024
2c3ad69
Evaluation: Remove `parallel` from composite evaluators (#38168)
ninghu Oct 30, 2024
052acb8
[Core] Allow operation-level tracing attributes (#38164)
pvaneck Oct 30, 2024
e44cb7c
Sync eng/common directory with azure-sdk-tools for PR 9281 (#38213)
azure-sdk Oct 30, 2024
070285e
Sync eng/common directory with azure-sdk-tools for PR 9290 (#38223)
azure-sdk Oct 31, 2024
7ab343b
update (#38220)
msyyc Oct 31, 2024
596b0f5
[AutoRelease] t2-containerservicefleet-2024-10-31-68497(can only be m…
azure-sdk Oct 31, 2024
090851a
[BatchAI] deprecate azure-mgmt-batchai (#38226)
swathipil Oct 31, 2024
a2dfe14
[ModelsRepository] deprecating azure-iot-modelrepository (#38225)
swathipil Oct 31, 2024
20e67ee
[ServerManager] deprecating azure-mgmt-servermanager (#38229)
swathipil Oct 31, 2024
2d4b381
[DocumentDB] deprecate azure-mgmt-documentdb (#38227)
swathipil Oct 31, 2024
499c99c
[EH/SB] ran black (#38210)
l0lawrence Oct 31, 2024
62ff0b7
Update randomization pattern for Adversarial simulation (#38211)
slister1001 Oct 31, 2024
e6c2c82
amqp msg (#38122)
l0lawrence Oct 31, 2024
25d4b5d
Implement live metrics filtering for charts (part 1) (#37998)
lzchen Oct 31, 2024
88ddb5d
Update CODEOWNERS for graphrbac owner (#38236)
swathipil Oct 31, 2024
75f9093
Multi modal docstring improvements (#38193)
w-javed Oct 31, 2024
23b5424
Increment package version after release of azure-core (#38240)
azure-sdk Oct 31, 2024
17ecb3c
kwarg type hints (#38214)
mrm9084 Oct 31, 2024
f9b29ee
[Evaluation] add environment variable for API token refresh rate (#38…
slister1001 Oct 31, 2024
e73ed33
[Evaluation] Default to non-randomized order of template parameters (…
slister1001 Oct 31, 2024
deaabd2
resolve issue with language-settings handling additional service chan…
scbedd Oct 31, 2024
20d8d9d
Reduce unnecessary delete calls to ARM for storage accounts (#38246)
azure-sdk Oct 31, 2024
9c380c9
clean up unused python script (#38128)
scbedd Oct 31, 2024
e3f68e1
Sync eng/common directory with azure-sdk-tools for PR 9288 (#38243)
azure-sdk Oct 31, 2024
127741d
[Scheduler] deprecate azure-mgmt-scheduler (#38228)
swathipil Oct 31, 2024
5f9f6f4
[ServiceManagement] deprecate azure-servicemanagement-legacy (#38230)
swathipil Oct 31, 2024
b678cd1
[GraphRBAC] deprecating package (#38224)
swathipil Oct 31, 2024
f1e8d6a
Enable py2docfx docs gen tool, remove the dockerimage docs validation…
JimSuplizio Oct 31, 2024
0655653
Sync eng/common directory with azure-sdk-tools for PR 9294 (#38251)
azure-sdk Oct 31, 2024
c9ef152
[core] add servicemanagement legacy to ci for release (#38253)
swathipil Oct 31, 2024
4fecbe3
Session Token Management APIs (#36971)
allenkim0129 Nov 5, 2024
e92a163
[AutoRelease] t2-network-2024-10-31-29845(can only be merged by SDK o…
azure-sdk Nov 1, 2024
7acf41b
[ServiceBus/EventHub] add service specific message annotations to rec…
swathipil Nov 1, 2024
9d2c7fc
Updating CODEOWNERS for Synapse (#38255)
swathipil Nov 1, 2024
d9ea7a8
Evaluation: Fix the `output_path` parameter of `evaluate` API doesn't…
ninghu Nov 1, 2024
0009679
[synapse] deprecate azure-synapse (#38262)
swathipil Nov 1, 2024
4c2ad17
[DocumentDB] update deprecation release date (#38265)
swathipil Nov 1, 2024
08107df
[CognitiveServices] deprecate vision packages (#38206)
swathipil Nov 1, 2024
677832b
RAI service input sanitization (#38247)
MilesHolland Nov 1, 2024
d9324d5
pass params from ci.yml to cosmos-sdk-client appropriately (#38272)
scbedd Nov 1, 2024
7d09bd3
Fix __call__ Overload Types (#38238)
needuv Nov 1, 2024
cc5c394
Update deprecation_process.md (#38270)
swathipil Nov 1, 2024
d49ac47
[DocumentDB] add changelog to manifest.ini (#38273)
swathipil Nov 1, 2024
e0dac67
[evaluation] Add support for using evaluate() with evaluators that ha…
diondrapeck Nov 2, 2024
d12aece
disabled black in pyproject.toml for all packages (#38271)
weirongw23-msft Nov 2, 2024
c1ef0e8
[AutoRelease] t2-postgresqlflexibleservers-2024-10-30-49242(can only …
azure-sdk Nov 4, 2024
b056604
[AutoRelease] t2-devtestlabs-2024-11-04-17468(can only be merged by S…
azure-sdk Nov 4, 2024
f1c1a0a
[AutoRelease] t2-sql-2024-10-03-42323(can only be merged by SDK owner…
azure-sdk Nov 4, 2024
a7d5ca0
[EventHub] add ssl_context kwarg to clients (#37702)
swathipil Nov 4, 2024
5eedf8b
Update CHANGELOG.md (#38301)
lzchen Nov 4, 2024
96ba2ea
download_file is fully annotated (#38284)
weirongw23-msft Nov 4, 2024
620fd8c
Release azure-monitor-opentelemetry-exporter (#38310)
lzchen Nov 4, 2024
d9d8ca8
Increment package version after release of azure-monitor-opentelemetr…
azure-sdk Nov 4, 2024
3560c15
Eval/bugfix/content safety parallel (#38307)
MilesHolland Nov 4, 2024
9a453d6
target newly released proxy version (#38282)
azure-sdk Nov 4, 2024
63dd783
[Storage] Added connection pool note to `max_concurrency` kwarg for u…
weirongw23-msft Nov 4, 2024
cab8727
Sync eng/common directory with azure-sdk-tools for PR 9308 (#38311)
azure-sdk Nov 4, 2024
4b85898
Version/location updates for stress script usage (#38281)
azure-sdk Nov 5, 2024
e6855c2
[AutoRelease] t2-loganalytics-2024-11-04-45063(can only be merged by …
azure-sdk Nov 5, 2024
e70e2b4
[AutoRelease] t2-automation-2024-11-04-74277(can only be merged by SD…
azure-sdk Nov 5, 2024
c3ce7dc
Broker on mac support (#38274)
xiangyan99 Nov 5, 2024
02c72b7
Add firewallsku as ManagedNetwork property (#37885)
Nethracs Nov 5, 2024
4f1a889
[AutoRelease] t2-managementgroups-2024-11-04-45946(can only be merged…
azure-sdk Nov 5, 2024
61228e6
[AutoRelease] t2-managedservices-2024-11-04-44075(can only be merged …
azure-sdk Nov 5, 2024
7c8af04
[AutoRelease] t2-marketplaceordering-2024-11-04-08673(can only be mer…
azure-sdk Nov 5, 2024
b5b8d8c
[AutoRelease] t2-servicebus-2024-11-04-58886(can only be merged by SD…
azure-sdk Nov 5, 2024
53b79b0
[Synapse] azure-synapse post deprecation (#38315)
swathipil Nov 5, 2024
3a539de
[Cognitive Services] vision post-deprecation (#38304)
swathipil Nov 5, 2024
5a3a914
[Cosmos] documentdb post deprecation (#38314)
swathipil Nov 5, 2024
a726059
[sdk generation pipeline] fix logic to extract swagger file (#38334)
msyyc Nov 5, 2024
c78ea56
Update deprecation_process.md for Verify Readmes failure (#38333)
swathipil Nov 5, 2024
5d20ca2
Increment package version after release of azure-identity-broker (#38…
azure-sdk Nov 5, 2024
f7926ae
Edit pass on Azure Identity Broker README (#38339)
scottaddie Nov 5, 2024
eca23e6
[Core] Deprecate OpenCensus tracing plugin (#37975)
pvaneck Nov 5, 2024
98822e2
[Core] servicemanagement-legacy post deprecation (#38319)
swathipil Nov 5, 2024
a0ea744
Prompt support for Inference SDK (#37917)
YusakuNo1 Nov 5, 2024
6077526
Remove a defunct variable from docindex.yml (#38342)
JimSuplizio Nov 5, 2024
c34c723
Merge branch 'main' into users/allekim/feature/feedRangeAllVersionsAn…
allenkim0129 Nov 5, 2024
b20c6dc
Fix errors from sphinx and mypy
allenkim0129 Nov 6, 2024
db24ede
Changed parameter to `mode`
allenkim0129 Nov 6, 2024
394d7b3
Fixed typo
allenkim0129 Nov 6, 2024
8e54e8f
Changed 'mode' to be string type
allenkim0129 Nov 6, 2024
cff439f
Reverted necessary type def
allenkim0129 Nov 7, 2024
e43b025
Addressed comments
allenkim0129 Nov 7, 2024
fcf4c04
Added samples for change_feed_mode
allenkim0129 Nov 8, 2024
e8b7fcd
Addressed comments
allenkim0129 Nov 11, 2024
f67326b
Merge branch 'main' into users/allekim/feature/feedRangeAllVersionsAn…
allenkim0129 Nov 11, 2024
a5f8e46
Removed unnecessary docstring
allenkim0129 Nov 13, 2024
b2c0a5e
Merge branch 'main' into users/allekim/feature/feedRangeAllVersionsAn…
allenkim0129 Nov 14, 2024
b3126fb
Merge branch 'main' into users/allekim/feature/feedRangeAllVersionsAn…
allenkim0129 Nov 15, 2024
5b49d0f
Remove mode if 'continuation' was in override definition
allenkim0129 Nov 15, 2024
8b29b72
add test samples tracking (#38502)
kristapratico Nov 15, 2024
ae91385
Add OpenTelemetry LoggingHandler conditionally (#38549)
lzchen Nov 15, 2024
faf6625
Add helpers to log a GitHub "notice" (#38574)
azure-sdk Nov 16, 2024
1fed8a5
[AutoRelease] t2-cosmosdb-2024-11-14-60943(can only be merged by SDK …
azure-sdk Nov 18, 2024
dfe0768
[AutoRelease] t2-mysqlflexibleservers-2024-11-05-47456(can only be me…
azure-sdk Nov 18, 2024
1154089
[AutoRelease] t2-netapp-2024-11-08-58381(can only be merged by SDK ow…
azure-sdk Nov 18, 2024
d3700a9
Shrike (#38560)
achauhan-scc Nov 18, 2024
808fd67
Datastore auth bug (#38586)
achauhan-scc Nov 18, 2024
839ee96
Increment package version after release of azure-search-documents (#3…
azure-sdk Nov 18, 2024
ea1c691
Merge App Config Provider Beta to Main (#38579)
mrm9084 Nov 18, 2024
e716f4b
batching adjustments for create-prjobmatrix (#38597)
azure-sdk Nov 18, 2024
228fbcd
[EG] resource notification event (#38100)
l0lawrence Nov 18, 2024
9549143
Merge branch 'main' into users/allekim/feature/feedRangeAllVersionsAn…
allenkim0129 Nov 18, 2024
c6e2f1b
Updated doc strings
allenkim0129 Nov 18, 2024
217dabc
Merged main
allenkim0129 Nov 21, 2024
4334dc1
Addressed comments
allenkim0129 Nov 21, 2024
19ebb4e
Revert "Merged main"
allenkim0129 Nov 22, 2024
4ffd502
Merge branch 'main' into users/allekim/feature/feedRangeAllVersionsAn…
allenkim0129 Nov 22, 2024
8a3d3dd
Added comment why it is safe to raise exception if mode was missing
allenkim0129 Nov 22, 2024
ba938f6
Added comment why it is safe to raise exception if mode was missing
allenkim0129 Nov 22, 2024
3674c6d
Merge remote-tracking branch 'origin/users/allekim/feature/feedRangeA…
allenkim0129 Nov 22, 2024
4231057
Moved the feature update log under unreleased features
allenkim0129 Nov 22, 2024
eb093a4
Add missing period in changelog
allenkim0129 Nov 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@
'supported_query_features': 'supportedQueryFeatures',
'query_version': 'queryVersion',
'priority': 'priorityLevel',
'no_response': 'responsePayloadOnWriteDisabled'
'no_response': 'responsePayloadOnWriteDisabled',
'max_item_count': 'maxItemCount',
}

# Cosmos resource ID validation regex breakdown:
Expand Down Expand Up @@ -170,6 +171,7 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
# set consistency level. check if set via options, this will override the default
if options.get("consistencyLevel"):
consistency_level = options["consistencyLevel"]
# TODO: move this line outside of if-else cause to remove the code duplication
headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level
elif default_client_consistency_level is not None:
consistency_level = default_client_consistency_level
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
from abc import ABC, abstractmethod
from enum import Enum
from typing import Optional, Union, List, Any, Dict, Deque
import warnings

from typing_extensions import Literal
allenkim0129 marked this conversation as resolved.
Show resolved Hide resolved

from azure.cosmos import http_constants
from azure.cosmos._change_feed.change_feed_start_from import ChangeFeedStartFromInternal, \
Expand All @@ -48,6 +51,17 @@ class ChangeFeedStateVersion(Enum):
V1 = "v1"
V2 = "v2"

class ChangeFeedMode(Enum):
LATEST_VERSION = "LatestVersion"
ALL_VERSIONS_AND_DELETES = "AllVersionsAndDeletes"

def __str__(self):
return self.value

@classmethod
def to_string(cls):
allenkim0129 marked this conversation as resolved.
Show resolved Hide resolved
return f"[{', '.join(f"'{item.value}'" for item in cls)}]"

class ChangeFeedState(ABC):
version_property_name = "v"

Expand Down Expand Up @@ -96,7 +110,8 @@ def from_json(
raise ValueError("Invalid base64 encoded continuation string [Missing version]")

if version == ChangeFeedStateVersion.V2.value:
return ChangeFeedStateV2.from_continuation(container_link, container_rid, continuation_json)
change_feed_mode = change_feed_state_context.get("changeFeedMode")
return ChangeFeedStateV2.from_continuation(container_link, container_rid, continuation_json, change_feed_mode)

raise ValueError("Invalid base64 encoded continuation string [Invalid version]")

Expand Down Expand Up @@ -125,9 +140,10 @@ def __init__(
self._continuation = continuation
super(ChangeFeedStateV1, self).__init__(ChangeFeedStateVersion.V1)

@property
def container_rid(self):
return self._container_rid
# Not in use
# @property
# def container_rid(self):
# return self._container_rid

@classmethod
def from_json(
Expand Down Expand Up @@ -180,14 +196,14 @@ class ChangeFeedStateV2(ChangeFeedState):
change_feed_start_from_property_name = "startFrom"
continuation_property_name = "continuation"

# TODO: adding change feed mode
def __init__(
self,
container_link: str,
container_rid: str,
feed_range: FeedRangeInternal,
change_feed_start_from: ChangeFeedStartFromInternal,
continuation: Optional[FeedRangeCompositeContinuation]
continuation: Optional[FeedRangeCompositeContinuation],
change_feed_mode: ChangeFeedMode
) -> None:

self._container_link = container_link
Expand All @@ -208,27 +224,32 @@ def __init__(
else:
self._continuation = continuation

if change_feed_mode is None or change_feed_mode == ChangeFeedMode.LATEST_VERSION:
self._change_feed_mode = http_constants.HttpHeaders.IncrementalFeedHeaderValue
allenkim0129 marked this conversation as resolved.
Show resolved Hide resolved
elif change_feed_mode == ChangeFeedMode.ALL_VERSIONS_AND_DELETES:
self._change_feed_mode = http_constants.HttpHeaders.FullFidelityFeedHeaderValue
else:
raise ValueError(f"Invalid change_feed_mode was used: '{change_feed_mode}'. Supported change_feed_modes are 'LatestVersion' and 'AllVersionsAndDeletes'.")

super(ChangeFeedStateV2, self).__init__(ChangeFeedStateVersion.V2)

@property
def container_rid(self) -> str :
return self._container_rid
# Not in use
# @property
# def container_rid(self) -> str :
# return self._container_rid

def to_dict(self) -> Dict[str, Any]:
return {
self.version_property_name: ChangeFeedStateVersion.V2.value,
self.container_rid_property_name: self._container_rid,
self.change_feed_mode_property_name: "Incremental",
self.change_feed_mode_property_name: self._change_feed_mode,
self.change_feed_start_from_property_name: self._change_feed_start_from.to_dict(),
self.continuation_property_name: self._continuation.to_dict() if self._continuation is not None else None
}

def populate_request_headers(
def populate_change_feed_start_from_request_headers(
self,
routing_provider: SmartRoutingMapProvider,
request_headers: Dict[str, Any]) -> None:
request_headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.IncrementalFeedHeaderValue

# When a merge happens, the child partition will contain documents ordered by LSN but the _ts/creation time
# of the documents may not be sequential.
# So when reading the changeFeed by LSN, it is possible to encounter documents with lower _ts.
Expand All @@ -237,17 +258,16 @@ def populate_request_headers(
self._change_feed_start_from.populate_request_headers(request_headers)

if self._continuation.current_token is not None and self._continuation.current_token.token is not None:
change_feed_start_from_feed_range_and_etag =\
change_feed_start_from_feed_range_and_etag = \
ChangeFeedStartFromETagAndFeedRange(
self._continuation.current_token.token,
self._continuation.current_token.feed_range)
change_feed_start_from_feed_range_and_etag.populate_request_headers(request_headers)

# based on the feed range to find the overlapping partition key range id
over_lapping_ranges =\
routing_provider.get_overlapping_ranges(
self._container_link,
[self._continuation.current_token.feed_range])
def populate_partition_key_range_id_request_headers(
self,
over_lapping_ranges,
request_headers: Dict[str, Any]) -> None:

if len(over_lapping_ranges) > 1:
raise self.get_feed_range_gone_error(over_lapping_ranges)
Expand All @@ -260,53 +280,50 @@ def populate_request_headers(
# the current token feed range spans less than single physical partition
# for this case, need to set both the partition key range id and epk filter headers
request_headers[http_constants.HttpHeaders.PartitionKeyRangeID] = over_lapping_ranges[0]["id"]
request_headers[
http_constants.HttpHeaders.StartEpkString] = self._continuation.current_token.feed_range.min
request_headers[
http_constants.HttpHeaders.EndEpkString] = self._continuation.current_token.feed_range.max
request_headers[http_constants.HttpHeaders.StartEpkString] = self._continuation.current_token.feed_range.min
request_headers[http_constants.HttpHeaders.EndEpkString] = self._continuation.current_token.feed_range.max

async def populate_request_headers_async(
def populate_change_feed_mode_request_headers(
self,
async_routing_provider: AsyncSmartRoutingMapProvider,
request_headers: Dict[str, Any]) -> None:

request_headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.IncrementalFeedHeaderValue
if self._change_feed_mode == http_constants.HttpHeaders.FullFidelityFeedHeaderValue:
request_headers[http_constants.HttpHeaders.AIM] = self._change_feed_mode
request_headers[http_constants.HttpHeaders.ChangeFeedWireFormatVersion] = http_constants.HttpHeaders.Separate_meta_with_crts

# When a merge happens, the child partition will contain documents ordered by LSN but the _ts/creation time
# of the documents may not be sequential.
# So when reading the changeFeed by LSN, it is possible to encounter documents with lower _ts.
# In order to guarantee we always get the documents after customer's point start time,
# we will need to always pass the start time in the header.
self._change_feed_start_from.populate_request_headers(request_headers)
def populate_request_headers(
self,
routing_provider: SmartRoutingMapProvider,
request_headers: Dict[str, Any]) -> None:
self.populate_change_feed_start_from_request_headers(request_headers)

if self._continuation.current_token is not None and self._continuation.current_token.token is not None:
change_feed_start_from_feed_range_and_etag = \
ChangeFeedStartFromETagAndFeedRange(
self._continuation.current_token.token,
self._continuation.current_token.feed_range)
change_feed_start_from_feed_range_and_etag.populate_request_headers(request_headers)
# based on the feed range to find the overlapping partition key range id
over_lapping_ranges = \
routing_provider.get_overlapping_ranges(
self._container_link,
[self._continuation.current_token.feed_range])

self.populate_partition_key_range_id_request_headers(over_lapping_ranges, request_headers)

self.populate_change_feed_mode_request_headers(request_headers)


async def populate_request_headers_async(
self,
async_routing_provider: AsyncSmartRoutingMapProvider,
request_headers: Dict[str, Any]) -> None:
self.populate_change_feed_start_from_request_headers(request_headers)

# based on the feed range to find the overlapping partition key range id
over_lapping_ranges = \
await async_routing_provider.get_overlapping_ranges(
self._container_link,
[self._continuation.current_token.feed_range])

if len(over_lapping_ranges) > 1:
raise self.get_feed_range_gone_error(over_lapping_ranges)
self.populate_partition_key_range_id_request_headers(over_lapping_ranges, request_headers)

overlapping_feed_range = Range.PartitionKeyRangeToRange(over_lapping_ranges[0])
if overlapping_feed_range == self._continuation.current_token.feed_range:
# exactly mapping to one physical partition, only need to set the partitionKeyRangeId
request_headers[http_constants.HttpHeaders.PartitionKeyRangeID] = over_lapping_ranges[0]["id"]
else:
# the current token feed range spans less than single physical partition
# for this case, need to set both the partition key range id and epk filter headers
request_headers[http_constants.HttpHeaders.PartitionKeyRangeID] = \
over_lapping_ranges[0]["id"]
request_headers[http_constants.HttpHeaders.StartEpkString] = \
self._continuation.current_token.feed_range.min
request_headers[http_constants.HttpHeaders.EndEpkString] = \
self._continuation.current_token.feed_range.max
self.populate_change_feed_mode_request_headers(request_headers)

def populate_feed_options(self, feed_options: Dict[str, Any]) -> None:
pass
Expand All @@ -329,8 +346,9 @@ def apply_server_response_continuation(self, continuation: str, has_modified_res
def should_retry_on_not_modified_response(self) -> bool:
return self._continuation.should_retry_on_not_modified_response()

def apply_not_modified_response(self) -> None:
self._continuation.apply_not_modified_response()
# Not in use
# def apply_not_modified_response(self) -> None:
# self._continuation.apply_not_modified_response()

def get_feed_range_gone_error(self, over_lapping_ranges: List[Dict[str, Any]]) -> CosmosHttpResponseError:
formatted_message =\
Expand All @@ -349,7 +367,8 @@ def from_continuation(
cls,
container_link: str,
container_rid: str,
continuation_json: Dict[str, Any]) -> 'ChangeFeedStateV2':
continuation_json: Dict[str, Any],
change_feed_mode: ChangeFeedMode) -> 'ChangeFeedStateV2':
allenkim0129 marked this conversation as resolved.
Show resolved Hide resolved

container_rid_from_continuation = continuation_json.get(ChangeFeedStateV2.container_rid_property_name)
if container_rid_from_continuation is None:
Expand All @@ -367,12 +386,13 @@ def from_continuation(
if continuation_data is None:
raise ValueError(f"Invalid continuation: [Missing {ChangeFeedStateV2.continuation_property_name}]")
continuation = FeedRangeCompositeContinuation.from_json(continuation_data)
return ChangeFeedStateV2(
return cls(
container_link=container_link,
container_rid=container_rid,
feed_range=continuation.feed_range,
change_feed_start_from=change_feed_start_from,
continuation=continuation)
continuation=continuation,
change_feed_mode=change_feed_mode)

@classmethod
def from_initial_state(
Expand All @@ -394,6 +414,7 @@ def from_initial_state(
raise ValueError("partitionKey is in the changeFeedStateContext, but missing partitionKeyFeedRange")
else:
# default to full range
warnings.warn("'feed_range' was not given, so by default using full range", UserWarning)
feed_range = FeedRangeInternalEpk(
Range(
"",
Expand All @@ -405,11 +426,12 @@ def from_initial_state(
change_feed_start_from = (
ChangeFeedStartFromInternal.from_start_time(change_feed_state_context.get("startTime")))

if feed_range is not None:
return cls(
container_link=container_link,
container_rid=collection_rid,
feed_range=feed_range,
change_feed_start_from=change_feed_start_from,
continuation=None)
raise RuntimeError("feed_range is empty")
change_feed_mode = change_feed_state_context.get("changeFeedMode")

return cls(
container_link=container_link,
container_rid=collection_rid,
feed_range=feed_range,
change_feed_start_from=change_feed_start_from,
continuation=None,
change_feed_mode=change_feed_mode)
Loading