Skip to content

Commit

Permalink
ChangeFeedRequestOptions Refactor (#1332)
Browse files Browse the repository at this point in the history
* made it so that it is clear that ifmatchetag and ifnonematchetag are not used

* added start from types

* added visitor

* added visitors

* wired throughout the codebase

* set explicit default

* fixed tests

* fixed tests

* resolved iteration comments

* merged

* fixed tests

* updated preview API

* more preview build errors

* more build issues

* grr

* fixed tests

* fixed build

* added feed range to feed options to remove duplicated state

* preview build

* fixed NRE

* preview build

* asdf

* fixed preview build errors

* need to investigate why there are test failures

* fixed request options tests

* asdf

* need to investigate in a clean brach

* need to figure out what is wrong with the continuation token

* fixed some tests

* fixed XML comments

* fixed infinite loop

* need to investigate some changes

* fixed one test

* fixed continuation token bug

* added clone method

* fixed continuation token bug in standbyfeediterator

* fixed another continuaiton token bug

* need to remove because of start time

* fixed test that needed to start from the begining

* need to investigate this on master

* updated continuation token

* fixed diagnostics scope
  • Loading branch information
bchong95 authored and kirankumarkolli committed Jul 11, 2020
1 parent 58e2ebb commit 2a8020f
Show file tree
Hide file tree
Showing 44 changed files with 1,081 additions and 717 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,18 @@ public PartitionSupervisorFactoryCore(
FeedProcessorFactory<T> partitionProcessorFactory,
ChangeFeedLeaseOptions options)
{
if (observerFactory == null) throw new ArgumentNullException(nameof(observerFactory));
if (leaseManager == null) throw new ArgumentNullException(nameof(leaseManager));
if (options == null) throw new ArgumentNullException(nameof(options));
if (partitionProcessorFactory == null) throw new ArgumentNullException(nameof(partitionProcessorFactory));

this.observerFactory = observerFactory;
this.leaseManager = leaseManager;
this.changeFeedLeaseOptions = options;
this.partitionProcessorFactory = partitionProcessorFactory;
this.observerFactory = observerFactory ?? throw new ArgumentNullException(nameof(observerFactory));
this.leaseManager = leaseManager ?? throw new ArgumentNullException(nameof(leaseManager));
this.changeFeedLeaseOptions = options ?? throw new ArgumentNullException(nameof(options));
this.partitionProcessorFactory = partitionProcessorFactory ?? throw new ArgumentNullException(nameof(partitionProcessorFactory));
}

public override PartitionSupervisor Create(DocumentServiceLease lease)
{
if (lease == null)
{
throw new ArgumentNullException(nameof(lease));
}

ChangeFeedObserver<T> changeFeedObserver = this.observerFactory.CreateObserver();
FeedProcessor processor = this.partitionProcessorFactory.Create(lease, changeFeedObserver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,32 @@ public static ChangeFeedPartitionKeyResultSetIteratorCore BuildResultSetIterator
DateTime? startTime,
bool startFromBeginning)
{
ChangeFeedRequestOptions requestOptions = new ChangeFeedRequestOptions();
if (startTime.HasValue)
ChangeFeedRequestOptions.StartFrom startFrom;
if (continuationToken != null)
{
requestOptions.StartTime = startTime.Value;
startFrom = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(continuationToken);
}
else if (startTime.HasValue)
{
startFrom = ChangeFeedRequestOptions.StartFrom.CreateFromTime(startTime.Value);
}
else if (startFromBeginning)
{
requestOptions.StartTime = ChangeFeedRequestOptions.DateTimeStartFromBeginning;
startFrom = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning();
}
else
{
startFrom = ChangeFeedRequestOptions.StartFrom.CreateFromNow();
}

ChangeFeedRequestOptions requestOptions = new ChangeFeedRequestOptions()
{
MaxItemCount = maxItemCount,
FeedRange = new FeedRangePartitionKeyRange(partitionKeyRangeId),
From = startFrom,
};

return new ChangeFeedPartitionKeyResultSetIteratorCore(
partitionKeyRangeId: partitionKeyRangeId,
continuationToken: continuationToken,
maxItemCount: maxItemCount,
clientContext: container.ClientContext,
container: container,
options: requestOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,6 @@ private FeedRangeCompositeContinuation(
this.CompositeContinuationTokens = new Queue<CompositeContinuationToken>();
}

public override void Accept(
FeedRangeVisitor visitor,
Action<RequestMessage, string> fillContinuation)
{
visitor.Visit(this, fillContinuation);
}

public FeedRangeCompositeContinuation(
string containerRid,
FeedRangeInternal feedRange,
Expand Down Expand Up @@ -100,6 +93,21 @@ public FeedRangeCompositeContinuation(

public override string GetContinuation() => this.CurrentToken?.Token;

public override FeedRange GetFeedRange()
{
if (this.FeedRange is FeedRangePartitionKeyRange)
{
return this.FeedRange;
}

if (this.CurrentToken != null)
{
return new FeedRangeEPK(this.CurrentToken.Range);
}

return null;
}

public override string ToString()
{
return JsonConvert.SerializeObject(this);
Expand Down Expand Up @@ -228,7 +236,7 @@ private static bool TryParseAsCompositeContinuationToken(
{
return false;
}
}
}

private static CompositeContinuationToken CreateCompositeContinuationTokenForRange(
string minInclusive,
Expand All @@ -252,7 +260,7 @@ private void MoveToNextToken()
// Consider current range done, if this FeedToken contains multiple ranges due to splits, all of them need to be considered done
this.CompositeContinuationTokens.Enqueue(recentToken);
}

this.CurrentToken = this.CompositeContinuationTokens.Count > 0 ? this.CompositeContinuationTokens.Peek() : null;
}

Expand Down Expand Up @@ -316,5 +324,7 @@ private void CreateChildRanges(IReadOnlyList<Documents.PartitionKeyRange> keyRan

return keyRanges;
}

public override void Accept(IFeedRangeContinuationVisitor visitor) => visitor.Visit(this);
}
}
8 changes: 4 additions & 4 deletions Microsoft.Azure.Cosmos/src/FeedRange/FeedRangeContinuation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@ public FeedRangeContinuation(
this.ContainerRid = containerRid;
}

public abstract void Accept(
FeedRangeVisitor visitor,
Action<RequestMessage, string> fillContinuation);

public abstract string GetContinuation();

public abstract FeedRange GetFeedRange();

public abstract void ReplaceContinuation(string continuationToken);

public abstract bool IsDone { get; }
Expand All @@ -64,5 +62,7 @@ public static bool TryParse(
ContainerInternal containerCore,
ResponseMessage responseMessage,
CancellationToken cancellationToken);

public abstract void Accept(IFeedRangeContinuationVisitor visitor);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;

/// <summary>
/// Visitor to populate RequestMessage headers and properties based on FeedRange.
/// </summary>
internal sealed class FeedRangeContinuationRequestMessagePopulatorVisitor : IFeedRangeContinuationVisitor
{
private readonly RequestMessage request;
private readonly Action<RequestMessage, string> fillContinuation;

public FeedRangeContinuationRequestMessagePopulatorVisitor(RequestMessage request, Action<RequestMessage, string> fillContinuation)
{
this.request = request ?? throw new ArgumentNullException(nameof(request));
this.fillContinuation = fillContinuation ?? throw new ArgumentNullException(nameof(fillContinuation));
}

public void Visit(FeedRangeCompositeContinuation feedRangeCompositeContinuation)
{
// In case EPK has already been set by compute
if (!this.request.Properties.ContainsKey(HandlerConstants.StartEpkString))
{
this.request.Properties[HandlerConstants.StartEpkString] = feedRangeCompositeContinuation.CurrentToken.Range.Min;
this.request.Properties[HandlerConstants.EndEpkString] = feedRangeCompositeContinuation.CurrentToken.Range.Max;
}

this.fillContinuation(this.request, feedRangeCompositeContinuation.GetContinuation());
}
}
}
37 changes: 19 additions & 18 deletions Microsoft.Azure.Cosmos/src/FeedRange/FeedRanges/FeedRangeEPK.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
Expand All @@ -15,29 +16,28 @@ namespace Microsoft.Azure.Cosmos
/// </summary>
internal sealed class FeedRangeEPK : FeedRangeInternal
{
public Documents.Routing.Range<string> Range { get; }

public static FeedRangeEPK ForFullRange()
{
return new FeedRangeEPK(new Documents.Routing.Range<string>(
Documents.Routing.PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey,
Documents.Routing.PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey,
isMinInclusive: true,
isMaxInclusive: false));
}
public static readonly FeedRangeEPK FullRange = new FeedRangeEPK(new Documents.Routing.Range<string>(
Documents.Routing.PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey,
Documents.Routing.PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey,
isMinInclusive: true,
isMaxInclusive: false));

public FeedRangeEPK(Documents.Routing.Range<string> range)
{
if (range == null)
{
throw new ArgumentNullException(nameof(range));
}

this.Range = range;
}

public Documents.Routing.Range<string> Range { get; }

public override Task<List<Documents.Routing.Range<string>>> GetEffectiveRangesAsync(
IRoutingMapProvider routingMapProvider,
string containerRid,
Documents.PartitionKeyDefinition partitionKeyDefinition)
{
return Task.FromResult(new List<Documents.Routing.Range<string>>() { this.Range });
}
Documents.PartitionKeyDefinition partitionKeyDefinition) => Task.FromResult(new List<Documents.Routing.Range<string>>() { this.Range });

public override async Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
IRoutingMapProvider routingMapProvider,
Expand All @@ -49,10 +49,11 @@ public override async Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
return partitionKeyRanges.Select(partitionKeyRange => partitionKeyRange.Id);
}

public override void Accept(FeedRangeVisitor visitor)
{
visitor.Visit(this);
}
public override void Accept(IFeedRangeVisitor visitor) => visitor.Visit(this);

public override Task<TResult> AcceptAsync<TResult>(
IFeedRangeAsyncVisitor<TResult> visitor,
CancellationToken cancellationToken = default) => visitor.VisitAsync(this, cancellationToken);

public override string ToString() => this.Range.ToString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ public abstract Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
Documents.PartitionKeyDefinition partitionKeyDefinition,
CancellationToken cancellationToken);

public abstract void Accept(FeedRangeVisitor visitor);
public abstract void Accept(IFeedRangeVisitor visitor);

public abstract Task<TResult> AcceptAsync<TResult>(IFeedRangeAsyncVisitor<TResult> visitor, CancellationToken cancellationToken = default);

public abstract override string ToString();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@ public FeedRangePartitionKey(PartitionKey partitionKey)
public override Task<List<Documents.Routing.Range<string>>> GetEffectiveRangesAsync(
IRoutingMapProvider routingMapProvider,
string containerRid,
Documents.PartitionKeyDefinition partitionKeyDefinition)
{
return Task.FromResult(new List<Documents.Routing.Range<string>>
Documents.PartitionKeyDefinition partitionKeyDefinition) => Task.FromResult(
new List<Documents.Routing.Range<string>>
{
Documents.Routing.Range<string>.GetPointRange(
this.PartitionKey.InternalKey.GetEffectivePartitionKeyString(partitionKeyDefinition))
});
}

public override async Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
IRoutingMapProvider routingMapProvider,
Expand All @@ -44,10 +42,11 @@ public override async Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
return new List<string>() { range.Id };
}

public override void Accept(FeedRangeVisitor visitor)
{
visitor.Visit(this);
}
public override void Accept(IFeedRangeVisitor visitor) => visitor.Visit(this);

public override Task<TResult> AcceptAsync<TResult>(
IFeedRangeAsyncVisitor<TResult> visitor,
CancellationToken cancellationToken = default) => visitor.VisitAsync(this, cancellationToken);

public override string ToString() => this.PartitionKey.InternalKey.ToJsonString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ namespace Microsoft.Azure.Cosmos
/// </summary>
internal sealed class FeedRangePartitionKeyRange : FeedRangeInternal
{
public string PartitionKeyRangeId { get; }

public FeedRangePartitionKeyRange(string partitionKeyRangeId)
{
this.PartitionKeyRangeId = partitionKeyRangeId;
}

public string PartitionKeyRangeId { get; }

public override async Task<List<Documents.Routing.Range<string>>> GetEffectiveRangesAsync(
IRoutingMapProvider routingMapProvider,
string containerRid,
Expand Down Expand Up @@ -74,10 +74,11 @@ public override Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
return Task.FromResult(partitionKeyRanges);
}

public override void Accept(FeedRangeVisitor visitor)
{
visitor.Visit(this);
}
public override void Accept(IFeedRangeVisitor visitor) => visitor.Visit(this);

public override Task<TResult> AcceptAsync<TResult>(
IFeedRangeAsyncVisitor<TResult> visitor,
CancellationToken cancellationToken = default) => visitor.VisitAsync(this, cancellationToken);

public override string ToString() => this.PartitionKeyRangeId;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;

internal sealed class FeedRangePartitionKeyRangeExtractor : IFeedRangeAsyncVisitor<IReadOnlyList<Documents.Routing.Range<string>>>
{
private readonly ContainerInternal container;

public FeedRangePartitionKeyRangeExtractor(ContainerInternal container)
{
this.container = container ?? throw new ArgumentNullException(nameof(container));
}

public async Task<IReadOnlyList<Documents.Routing.Range<string>>> VisitAsync(FeedRangePartitionKey feedRange, CancellationToken cancellationToken = default)
{
Routing.PartitionKeyRangeCache partitionKeyRangeCache = await this.container.ClientContext.DocumentClient.GetPartitionKeyRangeCacheAsync();
PartitionKeyDefinition partitionKeyDefinition = await this.container.GetPartitionKeyDefinitionAsync(cancellationToken);
return await feedRange.GetEffectiveRangesAsync(
partitionKeyRangeCache,
await this.container.GetRIDAsync(cancellationToken),
partitionKeyDefinition);
}

public async Task<IReadOnlyList<Documents.Routing.Range<string>>> VisitAsync(FeedRangePartitionKeyRange feedRange, CancellationToken cancellationToken = default)
{
// Migration from PKRangeId scenario
Routing.PartitionKeyRangeCache partitionKeyRangeCache = await this.container.ClientContext.DocumentClient.GetPartitionKeyRangeCacheAsync();
return await feedRange.GetEffectiveRangesAsync(
routingMapProvider: partitionKeyRangeCache,
containerRid: await this.container.GetRIDAsync(cancellationToken),
partitionKeyDefinition: null);
}

public async Task<IReadOnlyList<Documents.Routing.Range<string>>> VisitAsync(FeedRangeEPK feedRange, CancellationToken cancellationToken = default)
{
Routing.PartitionKeyRangeCache partitionKeyRangeCache = await this.container.ClientContext.DocumentClient.GetPartitionKeyRangeCacheAsync();
IReadOnlyList<PartitionKeyRange> pkRanges = await partitionKeyRangeCache.TryGetOverlappingRangesAsync(
collectionRid: await this.container.GetRIDAsync(cancellationToken),
range: feedRange.Range,
forceRefresh: false);
return pkRanges.Select(pkRange => pkRange.ToRange()).ToList();
}
}
}
Loading

0 comments on commit 2a8020f

Please sign in to comment.