Skip to content

Commit

Permalink
Adding basic FeedRanges API (#17570)
Browse files Browse the repository at this point in the history
* Initial draft of FeedRange artifacts

* Iterating on FeedRange Apis

* Adding public surface area

* Adding FeedRange unit tests

* Adding test FeedRangePKRangeId_GetEffectiveRangesAsync_Refresh

* Adding test FeedRangePKRangeId_GetEffectiveRangesAsync_Null

* Adding test feedRangeEPK_getPartitionKeyRangesAsync

* Adding test feedRangePK_getPartitionKeyRangesAsync

* Adding test feedRangePKRangeId_getPartitionKeyRangesAsync

* Adding request visitor unit tests

* Finishing FeedRange tests

* Cleanup and prettifying

* Prettifying feed range tests

* Fixes and new test for Conatiner.getFeedRanges()

* Addressing some SpotBug violations

* Reacting to code review feedback

* Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeInternal.java

Co-authored-by: Mohammad Derakhshani <moderakh@users.noreply.github.com>

Co-authored-by: Mohammad Derakhshani <moderakh@users.noreply.github.com>
  • Loading branch information
FabianMeiswinkel and moderakh authored Nov 18, 2020
1 parent 8458de5 commit dca8bcb
Show file tree
Hide file tree
Showing 41 changed files with 2,050 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
Expand Down Expand Up @@ -1124,4 +1125,15 @@ private Mono<ThroughputResponse> replaceThroughputInternal(Mono<CosmosContainerR
ItemDeserializer getItemDeserializer() {
return getDatabase().getDocClientWrapper().getItemDeserializer();
}

/**
* Obtains a list of {@link FeedRange} that can be used to parallelize Feed
* operations.
*
* @return An unmodifiable list of {@link FeedRange}
*/
@Beta(Beta.SinceVersion.V4_9_0)
public Mono<List<FeedRange>> getFeedRanges() {
return this.getDatabase().getDocClientWrapper().getFeedRanges(getLink());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlQuerySpec;
Expand Down Expand Up @@ -556,4 +557,23 @@ private <T> CosmosPagedIterable<T> getCosmosPagedIterable(CosmosPagedFlux<T> cos
return UtilBridgeInternal.createCosmosPagedIterable(cosmosPagedFlux);
}

/**
* Obtains a list of {@link FeedRange} that can be used to parallelize Feed
* operations.
*
* @return An unmodifiable list of {@link FeedRange}
*/
@Beta(Beta.SinceVersion.V4_9_0)
public List<FeedRange> getFeedRanges() {
try {
return asyncContainer.getFeedRanges().block();
} catch (Exception ex) {
final Throwable throwable = Exceptions.unwrap(ex);
if (throwable instanceof CosmosException) {
throw (CosmosException) throwable;
} else {
throw ex;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.models.CosmosItemIdentity;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlQuerySpec;
Expand Down Expand Up @@ -668,6 +669,14 @@ Flux<FeedResponse<Document>> queryDocumentChangeFeed(String collectionLink,
*/
Flux<FeedResponse<PartitionKeyRange>> readPartitionKeyRanges(String collectionLink, CosmosQueryRequestOptions options);

/**
* Gets the feed ranges of a container.
*
* @param collectionLink the link to the parent document collection.
* @return a {@link List} of @{link FeedRange} containing the feed ranges of a container.
*/
Mono<List<FeedRange>> getFeedRanges(String collectionLink);

/**
* Creates a stored procedure.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@

import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList;
import com.azure.cosmos.implementation.directconnectivity.WebExceptionUtility;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.CosmosDiagnostics;
import com.azure.cosmos.ThrottlingRetryOptions;
import io.netty.handler.timeout.ReadTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ public static final class Properties {
public static final String KeyWrapMetadataValue = "value";
public static final String EncryptedInfo = "_ei";

// Feed Ranges
public static final String RANGE = "Range";
public static final String FEED_RANGE_PARTITION_KEY = "PartitionKey";
public static final String FEED_RANGE_PARTITION_KEY_RANGE_ID = "PKRangeId";

}

public static final class UrlEncodingInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,9 @@ public static class Versions {
public static class StatusCodes {
public static final int OK = 200;
public static final int NOT_MODIFIED = 304;
// Success
public static final int MINIMUM_SUCCESS_STATUSCODE = 200;
public static final int MAXIMUM_SUCCESS_STATUSCODE = 299;
// Client error
public static final int MINIMUM_STATUSCODE_AS_ERROR_GATEWAY = 400;
public static final int BADREQUEST = 400;
Expand Down Expand Up @@ -323,5 +326,6 @@ public static class SubStatusCodes {
public static class HeaderValues {
public static final String NO_CACHE = "no-cache";
public static final String PREFER_RETURN_MINIMAL = "return=minimal";
public static final String IF_NONE_MATCH_ALL = "*";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,62 +43,4 @@ public interface IRetryPolicy {
void addStatusAndSubStatusCode(Integer index, int statusCode, int subStatusCode);

List<int[]> getStatusAndSubStatusCodes();

class ShouldRetryResult {
/// <summary>
/// How long to wait before next retry. 0 indicates retry immediately.
/// </summary>
public final Duration backOffTime;
public final Exception exception;
public boolean shouldRetry;
public final Quadruple<Boolean, Boolean, Duration, Integer> policyArg;

private ShouldRetryResult(Duration dur, Exception e, boolean shouldRetry,
Quadruple<Boolean, Boolean, Duration, Integer> policyArg) {
this.backOffTime = dur;
this.exception = e;
this.shouldRetry = shouldRetry;
this.policyArg = policyArg;
}

public static ShouldRetryResult retryAfter(Duration dur) {
Utils.checkNotNullOrThrow(dur, "duration", "cannot be null");
return new ShouldRetryResult(dur, null, true, null);
}

public static ShouldRetryResult retryAfter(Duration dur,
Quadruple<Boolean, Boolean, Duration, Integer> policyArg) {
Utils.checkNotNullOrThrow(dur, "duration", "cannot be null");
return new ShouldRetryResult(dur, null, true, policyArg);
}

public static ShouldRetryResult error(Exception e) {
Utils.checkNotNullOrThrow(e, "exception", "cannot be null");
return new ShouldRetryResult(null, e, false, null);
}

public static ShouldRetryResult noRetry() {
return new ShouldRetryResult(null, null, false, null);
}

public static ShouldRetryResult noRetry(Quadruple<Boolean, Boolean, Duration, Integer> policyArg) {
return new ShouldRetryResult(
null,
null,
false,
policyArg);
}

public void throwIfDoneTrying(Exception capturedException) throws Exception {
if (this.shouldRetry) {
return;
}

if (this.exception == null) {
throw capturedException;
} else {
throw this.exception;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,18 @@ public String getMinInclusive() {
return super.getString("minInclusive");
}

public void setMinInclusive(String minInclusive) {
public PartitionKeyRange setMinInclusive(String minInclusive) {
BridgeInternal.setProperty(this, "minInclusive", minInclusive);
return this;
}

public String getMaxExclusive() {
return super.getString("maxExclusive");
}

public void setMaxExclusive(String maxExclusive) {
public PartitionKeyRange setMaxExclusive(String maxExclusive) {
BridgeInternal.setProperty(this, "maxExclusive", maxExclusive);
return this;
}

public Range<String> toRange() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ static Function<Flux<Throwable>, Flux<Long>> toRetryWhenFunc(IRetryPolicy policy
return Flux.error(t);
}
policy.captureStartTimeIfNotSet();
Flux<IRetryPolicy.ShouldRetryResult> shouldRetryResultFlux = policy.shouldRetry(e).flux();
Flux<ShouldRetryResult> shouldRetryResultFlux = policy.shouldRetry(e).flux();
return shouldRetryResultFlux.flatMap(s -> {
CosmosException clientException = Utils.as(e, CosmosException.class);
if(clientException != null) {
Expand Down Expand Up @@ -76,7 +76,7 @@ public static <T> Function<Throwable, Mono<T>> toRetryWithAlternateFunc(Function
return Mono.error(throwable);
}
retryPolicy.captureStartTimeIfNotSet();
Mono<IRetryPolicy.ShouldRetryResult> shouldRetryResultFlux = retryPolicy.shouldRetry(e);
Mono<ShouldRetryResult> shouldRetryResultFlux = retryPolicy.shouldRetry(e);
return shouldRetryResultFlux.flatMap(shouldRetryResult -> {
CosmosException clientException = Utils.as(e, CosmosException.class);
if(clientException != null) {
Expand Down Expand Up @@ -140,7 +140,7 @@ private static <T> Mono<T> recursiveFunc(
Function<Quadruple<Boolean, Boolean, Duration, Integer>, Mono<T>> callbackMethod,
IRetryPolicy retryPolicy,
Function<Quadruple<Boolean, Boolean, Duration, Integer>, Mono<T>> inBackoffAlternateCallbackMethod,
IRetryPolicy.ShouldRetryResult shouldRetryResult,
ShouldRetryResult shouldRetryResult,
Duration minBackoffForInBackoffCallback,
RxDocumentServiceRequest rxDocumentServiceRequest,
AddressSelector addressSelector) {
Expand All @@ -153,7 +153,7 @@ private static <T> Function<Throwable, Mono<T>> recursiveWithAlternateFunc(
Function<Quadruple<Boolean, Boolean, Duration, Integer>, Mono<T>> callbackMethod,
IRetryPolicy retryPolicy,
Function<Quadruple<Boolean, Boolean, Duration, Integer>, Mono<T>> inBackoffAlternateCallbackMethod,
IRetryPolicy.ShouldRetryResult shouldRetryResult,
ShouldRetryResult shouldRetryResult,
StopWatch stopwatch,
Duration minBackoffForInBackoffCallback,
RxDocumentServiceRequest rxDocumentServiceRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosDiagnostics;
import com.azure.cosmos.DirectConnectionConfig;
import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList;
import com.azure.cosmos.implementation.batch.BatchResponseParser;
import com.azure.cosmos.implementation.batch.ServerBatchRequest;
import com.azure.cosmos.implementation.batch.SinglePartitionKeyServerBatchRequest;
Expand All @@ -26,6 +27,7 @@
import com.azure.cosmos.implementation.directconnectivity.ServerStoreModel;
import com.azure.cosmos.implementation.directconnectivity.StoreClient;
import com.azure.cosmos.implementation.directconnectivity.StoreClientFactory;
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl;
import com.azure.cosmos.implementation.http.HttpClient;
import com.azure.cosmos.implementation.http.HttpClientConfig;
import com.azure.cosmos.implementation.http.HttpHeaders;
Expand All @@ -40,8 +42,10 @@
import com.azure.cosmos.implementation.routing.PartitionKeyAndResourceTokenPair;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.CosmosItemIdentity;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
Expand Down Expand Up @@ -94,6 +98,9 @@
public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorizationTokenProvider, CpuListener, DiagnosticsClientContext {
private static final AtomicInteger activeClientsCnt = new AtomicInteger(0);
private static final AtomicInteger clientIdGenerator = new AtomicInteger(0);
private static final Range<String> RANGE_INCLUDING_ALL_PARTITION_KEY_RANGES = new Range<String>(
PartitionKeyInternalHelper.MinimumInclusiveEffectivePartitionKey,
PartitionKeyInternalHelper.MaximumExclusiveEffectivePartitionKey, true, false);

private static final String DUMMY_SQL_QUERY = "this is dummy and only used in creating " +
"ParallelDocumentQueryExecutioncontext, but not used";
Expand Down Expand Up @@ -3634,4 +3641,56 @@ private static SqlQuerySpec createLogicalPartitionScanQuerySpec(

return new SqlQuerySpec(queryStringBuilder.toString(), parameters);
}

@Override
public Mono<List<FeedRange>> getFeedRanges(String collectionLink) {

if (StringUtils.isEmpty(collectionLink)) {
throw new IllegalArgumentException("collectionLink");
}

RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
this,
OperationType.Query,
ResourceType.Document,
collectionLink,
null); // This should not go to backend
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = collectionCache.resolveCollectionAsync(null,
request);

return collectionObs.flatMap(documentCollectionResourceResponse -> {
final DocumentCollection collection = documentCollectionResourceResponse.v;
if (collection == null) {
throw new IllegalStateException("Collection cannot be null");
}

Mono<Utils.ValueHolder<List<PartitionKeyRange>>> valueHolderMono = partitionKeyRangeCache
.tryGetOverlappingRangesAsync(
BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics),
collection.getResourceId(), RANGE_INCLUDING_ALL_PARTITION_KEY_RANGES, true, null);

return valueHolderMono.map(partitionKeyRangeListResponse -> {
return toFeedRanges(partitionKeyRangeListResponse);
});
});
}

private static List<FeedRange> toFeedRanges(
Utils.ValueHolder<List<PartitionKeyRange>> partitionKeyRangeListValueHolder) {
final List<PartitionKeyRange> partitionKeyRangeList = partitionKeyRangeListValueHolder.v;
if (partitionKeyRangeList == null) {
throw new IllegalStateException("PartitionKeyRange list cannot be null");
}

List<FeedRange> feedRanges = new ArrayList<FeedRange>(partitionKeyRangeList.size());
partitionKeyRangeList.forEach(pkRange -> {
feedRanges.add(toFeedRange(pkRange));
});

return feedRanges;
}

private static FeedRange toFeedRange(PartitionKeyRange pkRange) {
return new FeedRangePartitionKeyRangeImpl(pkRange.getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,20 @@ public void dispose() {
this.isDisposed = true;
}

/**
* Gets the request properties.
*
* @return the request properties.
*/
public Map<String, Object> getPropertiesOrThrow() {
if (this.properties == null) {
throw new IllegalStateException(
"Only requests with properties (request options) can be used when using feed ranges");
}

return this.properties;
}

private static Map<String, Object> getProperties(Object options) {
if (options == null) {
return null;
Expand Down
Loading

0 comments on commit dca8bcb

Please sign in to comment.