Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ChangeFeed pull model and switching CF processor to FeedRange #18056

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
120 commits
Select commit Hold shift + click to select a range
21e25e7
Initial draft of FeedRange artifacts
FabianMeiswinkel Nov 4, 2020
6e69300
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Nov 4, 2020
40ef415
Iterating on FeedRange Apis
FabianMeiswinkel Nov 5, 2020
54bed2e
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Nov 5, 2020
58f98e7
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Nov 11, 2020
46662e0
Adding public surface area
FabianMeiswinkel Nov 12, 2020
03617a4
Adding FeedRange unit tests
FabianMeiswinkel Nov 12, 2020
b7de6b3
Adding test FeedRangePKRangeId_GetEffectiveRangesAsync_Refresh
FabianMeiswinkel Nov 13, 2020
4095bc5
Adding test FeedRangePKRangeId_GetEffectiveRangesAsync_Null
FabianMeiswinkel Nov 13, 2020
8e864be
Adding test feedRangeEPK_getPartitionKeyRangesAsync
FabianMeiswinkel Nov 13, 2020
4ebd7cb
Adding test feedRangePK_getPartitionKeyRangesAsync
FabianMeiswinkel Nov 13, 2020
2e6eb79
Adding test feedRangePKRangeId_getPartitionKeyRangesAsync
FabianMeiswinkel Nov 13, 2020
dc4c66c
Adding request visitor unit tests
FabianMeiswinkel Nov 13, 2020
0ef3170
Finishing FeedRange tests
FabianMeiswinkel Nov 13, 2020
ae9dd96
Cleanup and prettifying
FabianMeiswinkel Nov 13, 2020
4811e50
Prettifying feed range tests
FabianMeiswinkel Nov 13, 2020
6515ff5
Fixes and new test for Conatiner.getFeedRanges()
FabianMeiswinkel Nov 13, 2020
30e162e
Addressing some SpotBug violations
FabianMeiswinkel Nov 13, 2020
b08488a
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Nov 16, 2020
c67b1f7
Iterating on Changefeed Pull model draft
FabianMeiswinkel Nov 17, 2020
428be5e
Snapshot to be able to switch branches
FabianMeiswinkel Nov 17, 2020
fdfd53a
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Nov 18, 2020
913725e
Finishing switch from deleted ChangeFeedOptions to new public CosmosC…
FabianMeiswinkel Nov 18, 2020
afc2678
Adding basic json serialization test coverage for FeedRangeContinuation
FabianMeiswinkel Nov 20, 2020
3bec34f
Snapshot - converting Chagefeed to FeedRanges - tests not working yet
FabianMeiswinkel Nov 20, 2020
c08a6b2
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Nov 23, 2020
abfac43
Snapshot
FabianMeiswinkel Nov 25, 2020
b344204
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Nov 26, 2020
93f971d
Snapshot
FabianMeiswinkel Nov 26, 2020
d785579
Removing TODOs
FabianMeiswinkel Dec 9, 2020
33ab685
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Dec 9, 2020
3b7936e
Fixing test regressions
FabianMeiswinkel Dec 9, 2020
37b2a46
Fixing a couple SpotBug violations
FabianMeiswinkel Dec 9, 2020
34ad1e5
Fixing few more SPotBug violations
FabianMeiswinkel Dec 9, 2020
20aaccf
Fixing ChangeFeedState unit test
FabianMeiswinkel Dec 10, 2020
9e55ea7
Code cleanup
FabianMeiswinkel Dec 10, 2020
90f371f
Test code cleanup
FabianMeiswinkel Dec 10, 2020
d197486
Fixing a regression in Range json serialization
FabianMeiswinkel Dec 11, 2020
332acfe
Fixing CF split unit test failure
FabianMeiswinkel Dec 11, 2020
804e294
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Dec 31, 2020
ececd08
Fixes for some code review feedback
FabianMeiswinkel Dec 31, 2020
4f8a3a9
Fixing test flakiness (unrelated to actual change)
FabianMeiswinkel Jan 4, 2021
b7d675f
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Jan 4, 2021
7ceacd3
Adding ChangeFeedPolicy
FabianMeiswinkel Jan 7, 2021
ec9cd23
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Jan 7, 2021
01a4a68
Adding tests for ChangeFeedPolicy
FabianMeiswinkel Jan 7, 2021
84f36ea
Fixing build break
FabianMeiswinkel Jan 7, 2021
2f512e2
Fixing SpotBug issue
FabianMeiswinkel Jan 7, 2021
714b9b2
Re-triggering CI
FabianMeiswinkel Jan 7, 2021
3e149ed
Deleting temp data
FabianMeiswinkel Jan 7, 2021
8bbb0d3
Deleting CosmosDB Emulator temp data
FabianMeiswinkel Jan 7, 2021
6f5196a
Fixing scritp to delete temp folder
FabianMeiswinkel Jan 7, 2021
29adeb8
Fixing PS script
FabianMeiswinkel Jan 7, 2021
3aac6f0
Adding message with Emulator version to emulator install yaml file
FabianMeiswinkel Jan 8, 2021
7ec9f6c
Adding Cosmos DB Emulator installation logs
FabianMeiswinkel Jan 8, 2021
5e533c9
Adding logic to unistall emulator to Cosmos Emulator yml
FabianMeiswinkel Jan 8, 2021
3d3280f
Update cosmos-emulator.yml
FabianMeiswinkel Jan 8, 2021
9a69b86
Fixing missing admin elevation in the uninstall step in Cosmos Emulat…
FabianMeiswinkel Jan 8, 2021
2c205b0
Iterating on cosmos-emulator.yml
FabianMeiswinkel Jan 8, 2021
8f6e0b0
Iteratiing on cosmos-emulator.yml
FabianMeiswinkel Jan 8, 2021
9e7f154
Iterating on cosmos-emulator.yml
FabianMeiswinkel Jan 8, 2021
8891306
Iterating on cosmos-emulator.yml
FabianMeiswinkel Jan 8, 2021
2577aa0
Iterating on comsos-emulator.yml
FabianMeiswinkel Jan 8, 2021
b4b3e72
Iterating on comsos-emulator.yml
FabianMeiswinkel Jan 8, 2021
0f70676
Iterate on cosmos-emulator.yml
FabianMeiswinkel Jan 8, 2021
efd3c6c
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Jan 8, 2021
f3bf00d
Adding FF changefeed unit tests
FabianMeiswinkel Jan 8, 2021
27cd733
Fixing build warning
FabianMeiswinkel Jan 8, 2021
026b0e9
Fixing test regression when ContainerTest hit emulator limit of numbe…
FabianMeiswinkel Jan 11, 2021
024ab99
Fixing emulator config for CI tests
FabianMeiswinkel Jan 11, 2021
90641a3
Fixing test issue - causing Begin/After Method/Class to also happen f…
FabianMeiswinkel Jan 11, 2021
88b5ae5
Refactoring FeedRange visitors to populate request headers
FabianMeiswinkel Jan 12, 2021
09eab8e
Implementing FeedRangeInternal.populateFeedRangeFilteringHeaders
FabianMeiswinkel Jan 12, 2021
aec75ba
Fixes for FeedRangeTest
FabianMeiswinkel Jan 12, 2021
c5e63cb
Refactoring FeedRangeINternal.getEffectiveRanges to getEffectiveRange
FabianMeiswinkel Jan 13, 2021
dbbd26d
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Jan 13, 2021
e0aeae1
Fixing test issue in FeedRangeTest
FabianMeiswinkel Jan 13, 2021
edb7440
Reacting to self-Code review
FabianMeiswinkel Jan 13, 2021
35a901f
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Jan 13, 2021
5e75d98
Actually adding public APIs for queryChangeFeed
FabianMeiswinkel Jan 14, 2021
32ae487
Adding public API in CosmosContainer and SomsosAsyncContainer for CF …
FabianMeiswinkel Jan 14, 2021
c955cc6
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Jan 14, 2021
ce2bae2
Reacting to CR comments
FabianMeiswinkel Jan 14, 2021
af1469e
SpotBug fixes
FabianMeiswinkel Jan 15, 2021
8bb6f60
Fixing JavaDoc bug
FabianMeiswinkel Jan 15, 2021
1f99a86
Reverting switch to @JsonDeserialize for custom deserializers
FabianMeiswinkel Jan 15, 2021
32f22dd
Iterating on code review comments
FabianMeiswinkel Jan 16, 2021
eda9329
Adding more unit tests
FabianMeiswinkel Jan 16, 2021
d256919
Spotbug fix
FabianMeiswinkel Jan 16, 2021
e2348ce
Fix for test failures in FeedRangeTest
FabianMeiswinkel Jan 16, 2021
e386a2c
Fixing SpotBUg issue
FabianMeiswinkel Jan 16, 2021
5c8eba5
Fixing test issues
FabianMeiswinkel Jan 18, 2021
3819b37
Iterating on Cosmos Container ChangeFeed tests
FabianMeiswinkel Jan 18, 2021
7e78972
Fixing test issue for small page size
FabianMeiswinkel Jan 18, 2021
3d88840
Adding addiitonal unit tests for change feed pull model
FabianMeiswinkel Jan 18, 2021
ca796ce
Adding remaining tests for CF pull model
FabianMeiswinkel Jan 18, 2021
6392f9a
Update FeedRange.java
FabianMeiswinkel Jan 19, 2021
29c0719
Cleanup
FabianMeiswinkel Jan 19, 2021
0804d16
Fixing typos in comments
FabianMeiswinkel Jan 19, 2021
c1d565e
Fixing Duration in ChangeFeedPolicy
FabianMeiswinkel Jan 19, 2021
7148a3a
Changing the encoding of FeedRange and ChangeFeedState to be base64-e…
FabianMeiswinkel Jan 20, 2021
b8ed993
Cleanup
FabianMeiswinkel Jan 20, 2021
268e467
Fixing spotbug issue
FabianMeiswinkel Jan 20, 2021
7718b24
Fixing test issue
FabianMeiswinkel Jan 20, 2021
4db657e
Switching deserializer registration to annotation based approach
FabianMeiswinkel Jan 20, 2021
681d994
Reacting to code review feedback from Milis
FabianMeiswinkel Jan 21, 2021
964c2da
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Jan 21, 2021
ab94708
Fixing Test issues after changing ChangeFeedStateVersion from 0 to 1
FabianMeiswinkel Jan 21, 2021
e9401ab
Fix test issue
FabianMeiswinkel Jan 21, 2021
df78ea0
Renaming ShouldretryResult.RETRY_IMMEDIATELY to RETRY_NOW
FabianMeiswinkel Jan 21, 2021
fd76cb3
Adding Status code to JavaDoc description when processing FF CF outsi…
FabianMeiswinkel Jan 21, 2021
1f9ef37
Fixing ChangeFeedProcessor issues Matias found
FabianMeiswinkel Jan 22, 2021
3724f5f
Addressing code review comments from Mo
FabianMeiswinkel Jan 23, 2021
7fc3b5e
Fixing SpotBUg issue
FabianMeiswinkel Jan 23, 2021
9d55b4c
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Jan 23, 2021
e9f114c
Adding additional assert in CFP split test
FabianMeiswinkel Jan 25, 2021
f2d6dc8
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Jan 25, 2021
37900c7
Addressing code review comments from Milis
FabianMeiswinkel Jan 25, 2021
f7ddca6
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Jan 25, 2021
bf76ae4
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Jan 26, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 86 additions & 2 deletions eng/common/pipelines/templates/steps/cosmos-emulator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,92 @@ steps:
Write-Host "Target Dir: $targetDir"
msiexec /a ${{ parameters.EmulatorMsiUrl }} TARGETDIR=$targetDir /qn | wait-process
displayName: Download and Extract Public Cosmos DB Emulator
- powershell: |
Copy link
Member

@chidozieononiwu chidozieononiwu Jan 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FabianMeiswinkel Please can you make the changes in this file against the azure-sdk-tools repo. Follow instruction for common tooling https://github.com/Azure/azure-sdk-tools/blob/master/doc/common/common_engsys.md.
Your current changes will be overwriten by this sync PR https://github.com/Azure/azure-sdk-for-java/pull/18879/files

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chidozieononiwu I'm not sure I understand your request... Why will these files be overwritten by some other content sitting in a different repo? It sounds to me like a very opaque process and out of our own control.
Please do not override these settings unless your other PR preserves them as is.

Write-Host "Deleting Cosmos DB Emulator data"
if (Test-Path $Env:LOCALAPPDATA\CosmosDbEmulator) { Remove-Item -Recurse -Force $Env:LOCALAPPDATA\CosmosDbEmulator }
displayName: Delete Cosmos DB Emulator data
- powershell: |
Write-Host "Getting Cosmos DB Emulator Version"
$ProductName = "Azure Cosmos DB Emulator"
$Emulator = (Join-Path $env:temp (Join-Path $ProductName "Microsoft.Azure.Cosmos.Emulator.exe"))
$fileVersion = Get-ChildItem $Emulator
Write-Host $Emulator $fileVersion.VersionInfo
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
displayName: Get Cosmos DB Emulator Version
- powershell: |
Write-Host "Launching Cosmos DB Emulator"
Import-Module "$env:temp\Azure Cosmos DB Emulator\PSModules\Microsoft.Azure.CosmosDB.Emulator"
Start-CosmosDbEmulator -NoUI ${{ parameters.StartParameters }}
$ProductName = "Azure Cosmos DB Emulator"
$Emulator = (Join-Path $env:temp (Join-Path $ProductName "Microsoft.Azure.Cosmos.Emulator.exe"))
if (!(Test-Path $Emulator)) {
Write-Error "The emulator is not installed where expected at '$Emulator'"
return
}

$process = Start-Process $Emulator -ArgumentList "/getstatus" -PassThru -Wait

switch ($process.ExitCode) {
1 {
Write-Host "The emulator is already starting"
return
}
2 {
Write-Host "The emulator is already running"
return
}
3 {
Write-Host "The emulator is stopped"
}
default {
Write-Host "Unrecognized exit code $process.ExitCode"
return
}
}

$argumentList = ""
if (-not [string]::IsNullOrEmpty("${{ parameters.StartParameters }}")) {
$argumentList += , "${{ parameters.StartParameters }}"
}

Write-Host "Starting emulator process: $Emulator $argumentList"
$process=Start-Process $Emulator -ArgumentList $argumentList -ErrorAction Stop -PassThru
Write-Host "Emulator process started: $($process.Name), $($process.FileVersion)"
$Timeout = 600
$result="NotYetStarted"
$complete = if ($Timeout -gt 0) {
$start = [DateTimeOffset]::Now
$stop = $start.AddSeconds($Timeout)
{
$result -eq "Running" -or [DateTimeOffset]::Now -ge $stop
}
}
else {
{
$result -eq "Running"
}
}

do {
$process = Start-Process $Emulator -ArgumentList "/getstatus" -PassThru -Wait

switch ($process.ExitCode) {
1 {
Write-Host "The emulator is starting"
}
2 {
Write-Host "The emulator is running"
$result="Running"
return
}
3 {
Write-Host "The emulator is stopped"
}
default {
Write-Host "Unrecognized exit code $process.ExitCode"
}
}

Start-Sleep -Seconds 5
}
until ($complete.Invoke())

Write-Error "The emulator failed to reach Running status within ${Timeout} seconds"
milismsft marked this conversation as resolved.
Show resolved Hide resolved
displayName: Start Cosmos DB Emulator
2 changes: 1 addition & 1 deletion eng/pipelines/templates/stages/cosmos-sdk-client.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ stages:
PreRunSteps:
- template: /eng/common/pipelines/templates/steps/cosmos-emulator.yml
parameters:
StartParameters: '-EnableAadAuthentication -PartitionCount 50 -Consistency Strong -Timeout 600 -EnablePreview'
StartParameters: '/noexplorer /noui /enablepreview /disableratelimiting /enableaadauthentication /partitioncount=50 /consistency=Strong'
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
- powershell: |
$Key = 'C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw=='
$password = ConvertTo-SecureString -String $Key -Force -AsPlainText
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
import com.azure.cosmos.implementation.CosmosPagedFluxOptions;
import com.azure.cosmos.implementation.ItemDeserializer;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.implementation.encryption.CosmosResponseFactory;
import com.azure.cosmos.implementation.encryption.CosmosResponseFactoryCore;
import com.azure.cosmos.implementation.encryption.EncryptionProcessor;
import com.azure.cosmos.implementation.encryption.EncryptionUtils;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.query.Transformer;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
Expand All @@ -38,6 +40,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
import static com.azure.cosmos.models.EncryptionModelBridgeInternal.createEncryptionItemResponse;


Expand Down Expand Up @@ -89,7 +92,7 @@ private Mono<CosmosItemResponse<byte[]>> createItemHelper(
if (decryptResponse) {
return setByteArrayContent(rsp,
EncryptionProcessor.decrypt(EncryptionModelBridgeInternal.getByteArrayContent(rsp),
this.encryptor).map(pair -> pair.getLeft()).publishOn(encryptionScheduler));
this.encryptor).map(Pair::getLeft).publishOn(encryptionScheduler));
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
}

return Mono.just(rsp);
Expand Down Expand Up @@ -131,7 +134,7 @@ private Mono<CosmosItemResponse<byte[]>> upsertItemHelper(
if (decryptResponse) {
return setByteArrayContent(rsp,
EncryptionProcessor.decrypt(EncryptionModelBridgeInternal.getByteArrayContent(rsp),
this.encryptor).map(pair -> pair.getLeft()).publishOn(encryptionScheduler));
this.encryptor).map(Pair::getLeft).publishOn(encryptionScheduler));
}

return Mono.just(rsp);
Expand Down Expand Up @@ -176,7 +179,7 @@ private Mono<CosmosItemResponse<byte[]>> replaceItemHelper(
if (decryptResponse) {
return setByteArrayContent(rsp,
EncryptionProcessor.decrypt(EncryptionModelBridgeInternal.getByteArrayContent(rsp),
this.encryptor).map(pair -> pair.getLeft()).publishOn(encryptionScheduler));
this.encryptor).map(Pair::getLeft).publishOn(encryptionScheduler));
}

return Mono.just(rsp);
Expand Down Expand Up @@ -529,7 +532,36 @@ public <T> CosmosPagedFlux<T> queryItems(SqlQuerySpec query, CosmosQueryRequestO
new Transformer<T>() {
@Override
public Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> transform(Function<CosmosPagedFluxOptions, Flux<FeedResponse<JsonNode>>> func) {
return queryDecryptionTransformer(classType, func);
return queryDecryptionTransformer(classType, func, false);
}
});
}

/**
* Query for items in the change feed of the current container using the {@link CosmosChangeFeedRequestOptions}.
* <p>
* After subscription the operation will be performed. The {@link Flux} will
* contain one or several feed response of the obtained items. In case of
* failure the {@link CosmosPagedFlux} will error.
*
* @param <T> the type parameter.
* @param options the change feed request options.
* @param classType the class type.
* @return a {@link CosmosPagedFlux} containing one or several feed response pages of the obtained
* items or an error.
*/
public <T> CosmosPagedFlux<T> queryChangeFeed(
CosmosChangeFeedRequestOptions options,
Class<T> classType) {

checkNotNull(options, "Argument 'options' must not be null.");
checkNotNull(classType, "Argument 'classType' must not be null.");

return CosmosBridgeInternal.queryChangeFeedInternal(container, options,
new Transformer<T>() {
@Override
public Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> transform(Function<CosmosPagedFluxOptions, Flux<FeedResponse<JsonNode>>> func) {
return queryDecryptionTransformer(classType, func, true);
}
});
}
Expand Down Expand Up @@ -565,8 +597,11 @@ private Mono<CosmosItemResponse<byte[]>> setByteArrayContent(CosmosItemResponse<
);
}

private <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryDecryptionTransformer(Class<T> classType,
Function<CosmosPagedFluxOptions, Flux<FeedResponse<JsonNode>>> func) {
private <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryDecryptionTransformer(
Class<T> classType,
Function<CosmosPagedFluxOptions, Flux<FeedResponse<JsonNode>>> func,
boolean useEtagAsContinuation) {
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved

return func.andThen(flux ->
flux.publishOn(encryptionScheduler)
.flatMap(
Expand All @@ -589,7 +624,9 @@ private <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryDecrypt
return Mono.just(ModelBridgeInternal.createFeedResponseWithQueryMetrics(itemList,
page.getResponseHeaders(),
BridgeInternal.queryMetricsFromFeedResponse(page),
ModelBridgeInternal.getQueryPlanDiagnosticsContext(page)));
ModelBridgeInternal.getQueryPlanDiagnosticsContext(page),
useEtagAsContinuation,
ModelBridgeInternal.noChanges(page)));
} else {
List<Mono<byte[]>> byteArrayMonoList =
byteArrayList.stream().map(bytes -> decryptResponse(bytes)).collect(Collectors.toList());
Expand All @@ -601,7 +638,9 @@ private <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryDecrypt
ModelBridgeInternal.createFeedResponseWithQueryMetrics(itemList,
page.getResponseHeaders(),
BridgeInternal.queryMetricsFromFeedResponse(page),
ModelBridgeInternal.getQueryPlanDiagnosticsContext(page))
ModelBridgeInternal.getQueryPlanDiagnosticsContext(page),
useEtagAsContinuation,
ModelBridgeInternal.noChanges(page))
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@
import java.util.concurrent.ConcurrentMap;

class EncryptionFeedResponse<T> extends FeedResponse<T> {
EncryptionFeedResponse(List<T> results, Map<String, String> headers, ConcurrentMap<String, QueryMetrics> queryMetricsMap) {
super(results, headers, queryMetricsMap);
EncryptionFeedResponse(
List<T> results,
Map<String, String> headers,
ConcurrentMap<String, QueryMetrics> queryMetricsMap,
boolean useEtagAsContinuation,
boolean isNoChanges) {

super(results, headers, queryMetricsMap, useEtagAsContinuation, isNoChanges);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@
import com.azure.cosmos.encryption.EncryptionOptions;
import com.azure.cosmos.implementation.directconnectivity.Protocol;
import com.azure.cosmos.implementation.guava25.collect.ImmutableList;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlParameter;
Expand Down Expand Up @@ -481,6 +483,35 @@ public void encryptionDecryptQueryResultMultipleDocs() {
EncryptionTests.validateQueryResultsMultipleDocuments(EncryptionTests.encryptionContainer, testDoc1, testDoc2, query);
}

@Test(groups = { "encryption" }, timeOut = TIMEOUT)
public void encryptionDecryptChangeFeedResultMultipleDocs() {
String partitionKey = UUID.randomUUID().toString();
TestDoc testDoc1 = EncryptionTests
.createItem(
EncryptionTests.encryptionContainer,
EncryptionTests.dekId,
TestDoc.PathsToEncrypt,
partitionKey)
.getItem();
TestDoc testDoc2 = EncryptionTests
.createItem(
EncryptionTests.encryptionContainer,
EncryptionTests.dekId,
TestDoc.PathsToEncrypt,
partitionKey)
.getItem();

CosmosChangeFeedRequestOptions options =
CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(
FeedRange.forLogicalPartition(new PartitionKey(partitionKey)));

EncryptionTests.validateChangeFeedResultsMultipleDocuments(
EncryptionTests.encryptionContainer,
testDoc1,
testDoc2,
options);
}

@Test(groups = { "encryption" }, timeOut = TIMEOUT)
public void encryptionDecryptQueryResultMultipleEncryptedProperties() {
List<String> pathsEncrypted = ImmutableList.of("/Sensitive", "/NonSensitive");
Expand Down Expand Up @@ -1047,6 +1078,29 @@ private static void validateQueryResultsMultipleDocuments(
validateQueryResultsMultipleDocuments(container, testDoc1, testDoc2, query, null);
}

private static void validateChangeFeedResultsMultipleDocuments(
EncryptionCosmosAsyncContainer container,
TestDoc testDoc1,
TestDoc testDoc2,
CosmosChangeFeedRequestOptions requestOptions) {
CosmosPagedFlux<TestDoc> pageFlux = container.queryChangeFeed(requestOptions, TestDoc.class);
List<TestDoc> readDocs = pageFlux.collectList().block();

assertThat(readDocs.size()).isEqualTo(2);
assertThat(readDocs).containsExactlyInAnyOrder(testDoc1, testDoc2);


CosmosPagedFlux<DecryptableItem> lazyDecryptablePageFlux = container.queryChangeFeed(requestOptions, DecryptableItem.class);
List<DecryptableItem> lazyDecryptableItems = lazyDecryptablePageFlux.collectList().block();

assertThat(readDocs.size()).isEqualTo(2);
assertThat(readDocs).containsExactlyInAnyOrder(testDoc1, testDoc2);


assertThat(lazyDecryptableItems.size()).isEqualTo(2);
assertThat(lazyDecryptableItems.stream().map(ldi -> ldi.getDecryptionResult(TestDoc.class).block().getDecryptedItem())).containsExactlyInAnyOrder(testDoc1, testDoc2);
}

private static void validateQueryResultsMultipleDocuments(
EncryptionCosmosAsyncContainer container,
TestDoc testDoc1,
Expand All @@ -1060,15 +1114,15 @@ private static void validateQueryResultsMultipleDocuments(
assertThat(readDocs).containsExactlyInAnyOrder(testDoc1, testDoc2);


CosmosPagedFlux<DecryptableItem> lazyDecraptablePageFlux = container.queryItems(new SqlQuerySpec(query), requestOptions, DecryptableItem.class);
List<DecryptableItem> lazyDecreptableItems = lazyDecraptablePageFlux.collectList().block();
CosmosPagedFlux<DecryptableItem> lazyDecryptablePageFlux = container.queryItems(new SqlQuerySpec(query), requestOptions, DecryptableItem.class);
List<DecryptableItem> lazyDecryptableItems = lazyDecryptablePageFlux.collectList().block();

assertThat(readDocs.size()).isEqualTo(2);
assertThat(readDocs).containsExactlyInAnyOrder(testDoc1, testDoc2);


assertThat(lazyDecreptableItems.size()).isEqualTo(2);
assertThat(lazyDecreptableItems.stream().map(ldi -> ldi.getDecryptionResult(TestDoc.class).block().getDecryptedItem())).containsExactlyInAnyOrder(testDoc1, testDoc2);
assertThat(lazyDecryptableItems.size()).isEqualTo(2);
assertThat(lazyDecryptableItems.stream().map(ldi -> ldi.getDecryptionResult(TestDoc.class).block().getDecryptedItem())).containsExactlyInAnyOrder(testDoc1, testDoc2);
}

private static <T> void validateQueryResponse(EncryptionCosmosAsyncContainer container,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,21 @@ public static <T> FeedResponse<T> createFeedResponse(List<T> results,
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static <T> FeedResponse<T> createFeedResponseWithQueryMetrics(List<T> results,
Map<String, String> headers,
ConcurrentMap<String, QueryMetrics> queryMetricsMap,
QueryInfo.QueryPlanDiagnosticsContext diagnosticsContext) {
return ModelBridgeInternal.createFeedResponseWithQueryMetrics(results, headers, queryMetricsMap, diagnosticsContext);
public static <T> FeedResponse<T> createFeedResponseWithQueryMetrics(
List<T> results,
Map<String, String> headers,
ConcurrentMap<String, QueryMetrics> queryMetricsMap,
QueryInfo.QueryPlanDiagnosticsContext diagnosticsContext,
boolean useEtagAsContinuation,
boolean isNoChangesResponse) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

majority of Query components, top, orderby, etc need to use false, false param.

I think it might be easier to fork this method into two new methods one for query and the other for changefeed.
customization helps to reduce the number of params.


return ModelBridgeInternal.createFeedResponseWithQueryMetrics(
results,
headers,
queryMetricsMap,
diagnosticsContext,
useEtagAsContinuation,
isNoChangesResponse);
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
Expand Down
Loading