Skip to content

Commit

Permalink
Fixed an issue in QuorumReader when quorum could not be selected even…
Browse files Browse the repository at this point in the history
… though 1 secondary and Primary are reachable and in sync (#38832)

* Added overload with CosmosReadManyRequestOptions

* Fixing style errors

* Update CosmosReadManyRequestOptions.java

* Fixing build break

* Avoiding a possibly breaking change by injecting a base class - CosmosQueryRequestOptions was not final.

* Update TransientIOErrorsRetryingIteratorITest.scala

* Addressing code review feedback

* Added fallback to include Primary in QuorumReader when quorum could not be selected.

* Update CHANGELOG.md

* Update CosmosItemTest.java

* Update CosmosItemTest.java
  • Loading branch information
FabianMeiswinkel authored Feb 20, 2024
1 parent 14a9127 commit b37d95a
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.ImmutablePair;
Expand Down Expand Up @@ -50,6 +51,7 @@
import reactor.core.scheduler.Schedulers;

import java.nio.charset.StandardCharsets;
import java.rmi.ConnectIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -76,7 +78,7 @@ public class CosmosItemTest extends TestSuiteBase {
private CosmosClient client;
private CosmosContainer container;

@Factory(dataProvider = "clientBuildersWithDirectSession")
@Factory(dataProvider = "clientBuildersWithDirect")
public CosmosItemTest(CosmosClientBuilder clientBuilder) {
super(clientBuilder);
}
Expand Down Expand Up @@ -301,8 +303,91 @@ public void readManyWithTimeout() throws Exception {
logger.info("Exception handled", e);
assertThat(timeoutFired.get()).isTrue();
}
finally {
transitTimeout.disable();
}
}

@Test(groups = { "fast" }, timeOut = 100 * TIMEOUT)
public void readManyWithTwoSecondariesNotReachable() throws Exception {
if (client.asyncClient().getConnectionPolicy().getConnectionMode() != ConnectionMode.DIRECT) {
throw new SkipException("Fault injection only targeting direct mode");
}

ConsistencyLevel effectiveConsistencyLevel = ImplementationBridgeHelpers
.CosmosAsyncClientHelper
.getCosmosAsyncClientAccessor()
.getEffectiveConsistencyLevel(client.asyncClient(), OperationType.Query, null);

if (effectiveConsistencyLevel != ConsistencyLevel.BOUNDED_STALENESS &&
effectiveConsistencyLevel != ConsistencyLevel.STRONG) {
throw new SkipException("Test only targeting strong and bounded staleness.");
}

List<CosmosItemIdentity> cosmosItemIdentities = new ArrayList<>();
Set<String> idSet = new HashSet<>();
int numDocuments = 50;

for (int i = 0; i < numDocuments; i++) {
String id = UUID.randomUUID().toString();
ObjectNode document = getDocumentDefinition(id, id);
container.createItem(document);

PartitionKey partitionKey = new PartitionKey(id);
CosmosItemIdentity cosmosItemIdentity = new CosmosItemIdentity(partitionKey, id);
cosmosItemIdentities.add(cosmosItemIdentity);
idSet.add(id);
}

FaultInjectionRuleBuilder ruleBuilder = new FaultInjectionRuleBuilder("extremelyLongConnectDelay");
FaultInjectionConditionBuilder conditionBuilder = new FaultInjectionConditionBuilder()
.operationType(FaultInjectionOperationType.QUERY_ITEM)
.connectionType(FaultInjectionConnectionType.DIRECT);
List<FeedRange> feedRanges = container.getFeedRanges();
conditionBuilder = conditionBuilder.endpoints(
new FaultInjectionEndpointBuilder(FeedRange.forFullRange()) //feedRanges.get(feedRanges.size() - 1)
.replicaCount(2)
.includePrimary(false)
.build()
);
FaultInjectionCondition faultInjectionCondition = conditionBuilder.build();
FaultInjectionServerErrorResult connectTimeoutResult = FaultInjectionResultBuilders
.getResultBuilder(FaultInjectionServerErrorType.GONE)
.times(Integer.MAX_VALUE - 1)
.delay(Duration.ofDays(1))
.build();
FaultInjectionRule connectTimeout = ruleBuilder
.condition(faultInjectionCondition)
.result(connectTimeoutResult)
.duration(Duration.ofSeconds(240))
.build();

CosmosFaultInjectionHelper
.configureFaultInjectionRules(container.asyncContainer, Arrays.asList(connectTimeout))
.block();

CosmosReadManyRequestOptions requestOptionsWith5SecondsTimeout = new CosmosReadManyRequestOptions()
.setCosmosEndToEndOperationLatencyPolicyConfig(
new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(70)).build()
);

try {
FeedResponse<ObjectNode> feedResponse = container
.asyncContainer
.readMany(cosmosItemIdentities, requestOptionsWith5SecondsTimeout, ObjectNode.class)
.onErrorMap(throwable -> {
logger.error("Error observed.", throwable);

return throwable;
})
.block();

logger.info("Cosmos Diagnostics: {}", feedResponse.getCosmosDiagnostics().getDiagnosticsContext().toJson());
}
finally {
connectTimeout.disable();
}
}

@Test(groups = { "fast" }, timeOut = TIMEOUT)
public void readManyWithSamePartitionKey() throws Exception {
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed an issue in QuorumReader when quorum could not be selected even though 1 secondary and Primary are reachable and in sync. - See [PR 38832](https://github.com/Azure/azure-sdk-for-java/pull/38832)

#### Other Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public Mono<StoreResponse> readStrongAsync(
ReadMode readMode) {
final MutableVolatile<Boolean> shouldRetryOnSecondary = new MutableVolatile<>(false);
final MutableVolatile<Boolean> hasPerformedReadFromPrimary = new MutableVolatile<>(false);
final MutableVolatile<Boolean> includePrimary = new MutableVolatile<>(false);

return Flux.defer(
// the following will be repeated till the repeat().takeUntil(.) condition is satisfied.
Expand All @@ -141,15 +142,19 @@ public Mono<StoreResponse> readStrongAsync(

shouldRetryOnSecondary.v = false;
Mono<ReadQuorumResult> secondaryQuorumReadResultObs =
this.readQuorumAsync(entity, readQuorumValue, false, readMode);
this.readQuorumAsync(
entity,
readQuorumValue,
includePrimary.v,
readMode);

return secondaryQuorumReadResultObs.flux().flatMap(
secondaryQuorumReadResult -> {

switch (secondaryQuorumReadResult.quorumResult) {
case QuorumMet:
try {
return Flux.just(secondaryQuorumReadResult.getResponse());
return Flux.just(secondaryQuorumReadResult.getResponse());
} catch (CosmosException e) {
return Flux.error(e);
}
Expand Down Expand Up @@ -203,36 +208,58 @@ public Mono<StoreResponse> readStrongAsync(

case QuorumNotSelected:
if (hasPerformedReadFromPrimary.v) {
logger.warn("QuorumNotSelected: Primary read already attempted. Quorum could not be selected after retrying on secondaries.");
return Flux.error(new GoneException(RMResources.ReadQuorumNotMet, HttpConstants.SubStatusCodes.READ_QUORUM_NOT_MET));
logger.warn(
"QuorumNotSelected: Primary read already attempted. Quorum could not be "
+ "selected after retrying on secondaries. ReadQuorumResult StoreResponses: {}",
String.join(";", secondaryQuorumReadResult.storeResponses));
return Flux.error(
new GoneException(
RMResources.ReadQuorumNotMet,
HttpConstants.SubStatusCodes.READ_QUORUM_NOT_MET));
}

logger.warn("QuorumNotSelected: Quorum could not be selected with read quorum of {}", readQuorumValue);
Mono<ReadPrimaryResult> responseObs = this.readPrimaryAsync(entity, readQuorumValue, false);
logger.warn(
"QuorumNotSelected: Quorum could not be selected with read quorum of {}, "
+ "ReadQuorumResult StoreResponses: {}",
readQuorumValue,
String.join(";", secondaryQuorumReadResult.storeResponses));
Mono<ReadPrimaryResult> responseObs = this.readPrimaryAsync(
entity, readQuorumValue, false);

return responseObs.flux().flatMap(
response -> {
if (response.isSuccessful && response.shouldRetryOnSecondary) {
assert false : "QuorumNotSelected: PrimaryResult has both Successful and shouldRetryOnSecondary flags set";
logger.error("PrimaryResult has both Successful and shouldRetryOnSecondary flags set");
assert false : "QuorumNotSelected: PrimaryResult has both Successful and "
+ "shouldRetryOnSecondary flags set";
logger.error(
"PrimaryResult has both Successful and shouldRetryOnSecondary "
+ "flags set. ReadQuorumResult StoreResponses: {}",
String.join(";", secondaryQuorumReadResult.storeResponses));
} else if (response.isSuccessful) {
logger.debug("QuorumNotSelected: ReadPrimary successful");
try {
return Flux.just(response.getResponse());
return Flux.just(response.getResponse());
} catch (CosmosException e) {
return Flux.error(e);
}
} else if (response.shouldRetryOnSecondary) {
shouldRetryOnSecondary.v = true;
logger.warn("QuorumNotSelected: ReadPrimary did not succeed. Will retry on secondary.");
logger.warn("QuorumNotSelected: ReadPrimary did not succeed. Will retry "
+ "on secondary. ReadQuorumResult StoreResponses: {}",
String.join(";", secondaryQuorumReadResult.storeResponses));
hasPerformedReadFromPrimary.v = true;
// We have failed to select a quorum before - could very well happen again
// especially with reduced replica set size (1 Primary and 2 Secondaries
// left, one Secondary might be unreachable - due to endpoint health like
// service-side crashes or network/connectivity issues). Including the
// Primary replica even for quorum selection in this case for the retry
includePrimary.v = true;
} else {
logger.warn("QuorumNotSelected: Could not get successful response from ReadPrimary");
return Flux.error(new GoneException(String.format(RMResources.ReadQuorumNotMet, readQuorumValue), HttpConstants.SubStatusCodes.READ_QUORUM_NOT_MET));
}

return Flux.empty();

return Flux.empty();
}
);

Expand Down Expand Up @@ -324,7 +351,22 @@ private Mono<Pair<ReadQuorumResult, Quadruple<Long, Long, StoreResult, List<Stri

return responseResultObs.flatMap(
responseResult -> {
List<String> storeResponses = responseResult.stream().map(response -> response.toString()).collect(Collectors.toList());
List<String> storeResponses = responseResult
.stream()
.map(response -> {
StoreResponse storeResponse = response.getStoreResponse();
if (storeResponse == null) {
return response.storePhysicalAddress + " -> n/a";
}

return response.storePhysicalAddress
+ " -> "
+ storeResponse.getStatus()
+ "("
+ storeResponse.getActivityId()
+ ")";
})
.collect(Collectors.toList());
int responseCount = (int) responseResult.stream().filter(response -> response.isValid).count();
if (responseCount < readQuorum) {
return Mono.just(Pair.of(new ReadQuorumResult(entity.requestContext.requestChargeTracker,
Expand Down

0 comments on commit b37d95a

Please sign in to comment.