diff --git a/Microsoft.Azure.Cosmos/src/Resource/QueryResponses/ChangeFeedIteratorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedIteratorCore.cs similarity index 84% rename from Microsoft.Azure.Cosmos/src/Resource/QueryResponses/ChangeFeedIteratorCore.cs rename to Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedIteratorCore.cs index d8eb88a268..3c4d475080 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/QueryResponses/ChangeFeedIteratorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedIteratorCore.cs @@ -2,11 +2,10 @@ // Copyright (c) Microsoft Corporation. All rights reserved. //------------------------------------------------------------ -namespace Microsoft.Azure.Cosmos +namespace Microsoft.Azure.Cosmos.ChangeFeed { using System; using System.Collections.Generic; - using System.Linq; using System.Net; using System.Threading; using System.Threading.Tasks; @@ -26,12 +25,14 @@ internal sealed class ChangeFeedIteratorCore : FeedIteratorInternal private readonly CosmosClientContext clientContext; private readonly ChangeFeedRequestOptions changeFeedOptions; private readonly AsyncLazy> lazyContainerRid; + private ChangeFeedStartFrom changeFeedStartFrom; private bool hasMoreResults; private FeedRangeContinuation FeedRangeContinuation; public ChangeFeedIteratorCore( ContainerInternal container, + ChangeFeedStartFrom changeFeedStartFrom, ChangeFeedRequestOptions changeFeedRequestOptions) { this.container = container ?? throw new ArgumentNullException(nameof(container)); @@ -43,7 +44,8 @@ public ChangeFeedIteratorCore( }); this.hasMoreResults = true; - if (this.changeFeedOptions?.From is ChangeFeedRequestOptions.StartFromContinuation startFromContinuation) + this.changeFeedStartFrom = changeFeedStartFrom; + if (this.changeFeedStartFrom is ChangeFeedStartFromContinuation startFromContinuation) { if (!FeedRangeContinuation.TryParse(startFromContinuation.Continuation, out FeedRangeContinuation feedRangeContinuation)) { @@ -51,16 +53,10 @@ public ChangeFeedIteratorCore( } this.FeedRangeContinuation = feedRangeContinuation; - this.changeFeedOptions.FeedRange = feedRangeContinuation.GetFeedRange(); - string continuationToken = feedRangeContinuation.GetContinuation(); - if (continuationToken != null) - { - this.changeFeedOptions.From = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(continuationToken); - } - else - { - this.changeFeedOptions.From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(); - } + FeedRange feedRange = feedRangeContinuation.GetFeedRange(); + string etag = feedRangeContinuation.GetContinuation(); + + this.changeFeedStartFrom = new ChangeFeedStartFromContinuationAndFeedRange(etag, (FeedRangeInternal)feedRange); } } @@ -76,7 +72,9 @@ public override async Task ReadNextAsync(CancellationToken canc CosmosDiagnosticsContext diagnostics = CosmosDiagnosticsContext.Create(this.changeFeedOptions); using (diagnostics.GetOverallScope()) { - diagnostics.AddDiagnosticsInternal(new FeedRangeStatistics(this.changeFeedOptions.FeedRange)); + diagnostics.AddDiagnosticsInternal( + new FeedRangeStatistics( + this.changeFeedStartFrom.Accept(ChangeFeedRangeExtractor.Singleton))); if (!this.lazyContainerRid.ValueInitialized) { using (diagnostics.CreateScope("InitializeContainerResourceId")) @@ -130,26 +128,17 @@ private async Task ReadNextInternalAsync( { cancellationToken.ThrowIfCancellationRequested(); - string continuation = this.FeedRangeContinuation.GetContinuation(); - if (continuation != null) - { - this.changeFeedOptions.From = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(continuation); - } - - if ((this.changeFeedOptions.FeedRange == null) || this.changeFeedOptions.FeedRange is FeedRangeEpk) - { - // For now the backend does not support EPK Ranges if they don't line up with a PKRangeId - // So if the range the user supplied is a logical pk value, then we don't want to overwrite it. - this.changeFeedOptions.FeedRange = this.FeedRangeContinuation.GetFeedRange(); - } - ResponseMessage responseMessage = await this.clientContext.ProcessResourceOperationStreamAsync( resourceUri: this.container.LinkUri, resourceType: ResourceType.Document, operationType: OperationType.ReadFeed, requestOptions: this.changeFeedOptions, cosmosContainerCore: this.container, - requestEnricher: default, + requestEnricher: (request) => + { + ChangeFeedStartFromRequestOptionPopulator visitor = new ChangeFeedStartFromRequestOptionPopulator(request); + this.changeFeedStartFrom.Accept(visitor); + }, partitionKey: default, streamPayload: default, diagnosticsContext: diagnosticsScope, @@ -157,6 +146,10 @@ private async Task ReadNextInternalAsync( if (await this.ShouldRetryAsync(responseMessage, cancellationToken)) { + string etag = this.FeedRangeContinuation.GetContinuation(); + FeedRange feedRange = this.FeedRangeContinuation.GetFeedRange(); + this.changeFeedStartFrom = new ChangeFeedStartFromContinuationAndFeedRange(etag, (FeedRangeInternal)feedRange); + return await this.ReadNextInternalAsync(diagnosticsScope, cancellationToken); } @@ -166,6 +159,11 @@ private async Task ReadNextInternalAsync( // Change Feed read uses Etag for continuation this.hasMoreResults = responseMessage.IsSuccessStatusCode; this.FeedRangeContinuation.ReplaceContinuation(responseMessage.Headers.ETag); + + string etag = this.FeedRangeContinuation.GetContinuation(); + FeedRange feedRange = this.FeedRangeContinuation.GetFeedRange(); + this.changeFeedStartFrom = new ChangeFeedStartFromContinuationAndFeedRange(etag, (FeedRangeInternal)feedRange); + return FeedRangeResponse.CreateSuccess( responseMessage, this.FeedRangeContinuation); @@ -216,13 +214,14 @@ private async Task InitializeFeedContinuationAsync(CancellationToken cancellatio { FeedRangePartitionKeyRangeExtractor feedRangePartitionKeyRangeExtractor = new FeedRangePartitionKeyRangeExtractor(this.container); - IReadOnlyList> ranges = await ((FeedRangeInternal)this.changeFeedOptions.FeedRange).AcceptAsync( + FeedRange feedRange = this.changeFeedStartFrom.Accept(ChangeFeedRangeExtractor.Singleton); + IReadOnlyList> ranges = await ((FeedRangeInternal)feedRange).AcceptAsync( feedRangePartitionKeyRangeExtractor, cancellationToken); this.FeedRangeContinuation = new FeedRangeCompositeContinuation( containerRid: this.lazyContainerRid.Result.Result, - feedRange: (FeedRangeInternal)this.changeFeedOptions.FeedRange, + feedRange: (FeedRangeInternal)feedRange, ranges: ranges); } else if (this.FeedRangeContinuation?.FeedRange is FeedRangePartitionKeyRange feedRangePartitionKeyRange) diff --git a/Microsoft.Azure.Cosmos/src/Resource/QueryResponses/ChangeFeedPartitionKeyResultSetIteratorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedPartitionKeyResultSetIteratorCore.cs similarity index 73% rename from Microsoft.Azure.Cosmos/src/Resource/QueryResponses/ChangeFeedPartitionKeyResultSetIteratorCore.cs rename to Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedPartitionKeyResultSetIteratorCore.cs index 8edcf583e4..01a40e292e 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/QueryResponses/ChangeFeedPartitionKeyResultSetIteratorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedPartitionKeyResultSetIteratorCore.cs @@ -2,7 +2,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. //------------------------------------------------------------ -namespace Microsoft.Azure.Cosmos +namespace Microsoft.Azure.Cosmos.ChangeFeed { using System; using System.Net; @@ -18,16 +18,18 @@ internal sealed class ChangeFeedPartitionKeyResultSetIteratorCore : FeedIterator private readonly CosmosClientContext clientContext; private readonly ContainerInternal container; private readonly ChangeFeedRequestOptions changeFeedOptions; + private ChangeFeedStartFrom changeFeedStartFrom; private bool hasMoreResultsInternal; - private string continuationToken; public ChangeFeedPartitionKeyResultSetIteratorCore( CosmosClientContext clientContext, ContainerInternal container, + ChangeFeedStartFrom changeFeedStartFrom, ChangeFeedRequestOptions options) { this.clientContext = clientContext ?? throw new ArgumentNullException(nameof(clientContext)); this.container = container ?? throw new ArgumentNullException(nameof(container)); + this.changeFeedStartFrom = changeFeedStartFrom; this.changeFeedOptions = options; } @@ -42,27 +44,29 @@ public ChangeFeedPartitionKeyResultSetIteratorCore( /// A change feed response from cosmos service public override async Task ReadNextAsync(CancellationToken cancellationToken = default(CancellationToken)) { - // Change Feed read uses Etag for continuation - if (this.continuationToken != null) - { - this.changeFeedOptions.From = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(this.continuationToken); - } - ResponseMessage responseMessage = await this.clientContext.ProcessResourceOperationStreamAsync( cosmosContainerCore: this.container, resourceUri: this.container.LinkUri, resourceType: Documents.ResourceType.Document, operationType: Documents.OperationType.ReadFeed, requestOptions: this.changeFeedOptions, - requestEnricher: default, + requestEnricher: (requestMessage) => + { + ChangeFeedStartFromRequestOptionPopulator visitor = new ChangeFeedStartFromRequestOptionPopulator(requestMessage); + this.changeFeedStartFrom.Accept(visitor); + }, partitionKey: default, streamPayload: default, diagnosticsContext: default, cancellationToken: cancellationToken); - this.continuationToken = responseMessage.Headers.ETag; + // Change Feed uses etag as continuation token. + string etag = responseMessage.Headers.ETag; this.hasMoreResultsInternal = responseMessage.IsSuccessStatusCode; - responseMessage.Headers.ContinuationToken = this.continuationToken; + responseMessage.Headers.ContinuationToken = etag; + + FeedRangeInternal feedRange = (FeedRangeInternal)this.changeFeedStartFrom.Accept(ChangeFeedRangeExtractor.Singleton); + this.changeFeedStartFrom = new ChangeFeedStartFromContinuationAndFeedRange(etag, feedRange); return responseMessage; } diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedRangeExtractor.cs b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedRangeExtractor.cs new file mode 100644 index 0000000000..79d674d245 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedRangeExtractor.cs @@ -0,0 +1,28 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.ChangeFeed +{ + using System; + + internal sealed class ChangeFeedRangeExtractor : ChangeFeedStartFromVisitor + { + public static readonly ChangeFeedRangeExtractor Singleton = new ChangeFeedRangeExtractor(); + + private ChangeFeedRangeExtractor() + { + } + + public override FeedRange Visit(ChangeFeedStartFromNow startFromNow) => startFromNow.FeedRange; + + public override FeedRange Visit(ChangeFeedStartFromTime startFromTime) => startFromTime.FeedRange; + + public override FeedRange Visit(ChangeFeedStartFromContinuation startFromContinuation) + => throw new NotSupportedException($"{nameof(ChangeFeedStartFromContinuation)} does not have a feed range."); + + public override FeedRange Visit(ChangeFeedStartFromBeginning startFromBeginning) => startFromBeginning.FeedRange; + + public override FeedRange Visit(ChangeFeedStartFromContinuationAndFeedRange startFromContinuationAndFeedRange) => startFromContinuationAndFeedRange.FeedRange; + } +} diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFrom.cs b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFrom.cs new file mode 100644 index 0000000000..8de9024bc5 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFrom.cs @@ -0,0 +1,105 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos +{ + using System; + using Microsoft.Azure.Cosmos.ChangeFeed; + + /// + /// Base class for where to start a ChangeFeed operation in . + /// + /// Use one of the static constructors to generate a StartFrom option. +#if PREVIEW + public +#else + internal +#endif + abstract class ChangeFeedStartFrom + { + /// + /// Initializes an instance of the class. + /// + internal ChangeFeedStartFrom() + { + // Internal so people can't derive from this type. + } + + internal abstract void Accept(ChangeFeedStartFromVisitor visitor); + + internal abstract TResult Accept(ChangeFeedStartFromVisitor visitor); + + /// + /// Creates a that tells the ChangeFeed operation to start reading changes from this moment onward. + /// + /// A that tells the ChangeFeed operation to start reading changes from this moment onward. + public static ChangeFeedStartFrom Now() => Now(FeedRangeEpk.FullRange); + + /// + /// Creates a that tells the ChangeFeed operation to start reading changes from this moment onward. + /// + /// The range to start from. + /// A that tells the ChangeFeed operation to start reading changes from this moment onward. + public static ChangeFeedStartFrom Now(FeedRange feedRange) + { + if (!(feedRange is FeedRangeInternal feedRangeInternal)) + { + throw new ArgumentException($"{nameof(feedRange)} needs to be a {nameof(FeedRangeInternal)}."); + } + + return new ChangeFeedStartFromNow(feedRangeInternal); + } + + /// + /// Creates a that tells the ChangeFeed operation to start reading changes from some point in time onward. + /// + /// The time (in UTC) to start reading from. + /// A that tells the ChangeFeed operation to start reading changes from some point in time onward. + public static ChangeFeedStartFrom Time(DateTime dateTimeUtc) => Time(dateTimeUtc, FeedRangeEpk.FullRange); + + /// + /// Creates a that tells the ChangeFeed operation to start reading changes from some point in time onward. + /// + /// The time to start reading from. + /// The range to start from. + /// A that tells the ChangeFeed operation to start reading changes from some point in time onward. + public static ChangeFeedStartFrom Time(DateTime dateTimeUtc, FeedRange feedRange) + { + if (!(feedRange is FeedRangeInternal feedRangeInternal)) + { + throw new ArgumentException($"{nameof(feedRange)} needs to be a {nameof(FeedRangeInternal)}."); + } + + return new ChangeFeedStartFromTime(dateTimeUtc, feedRangeInternal); + } + + /// + /// Creates a that tells the ChangeFeed operation to start reading changes from a save point. + /// + /// The continuation to resume from. + /// A that tells the ChangeFeed operation to start reading changes from a save point. + public static ChangeFeedStartFrom ContinuationToken(string continuationToken) => new ChangeFeedStartFromContinuation(continuationToken); + + /// + /// Creates a that tells the ChangeFeed operation to start from the beginning of time. + /// + /// A that tells the ChangeFeed operation to start reading changes from the beginning of time. + public static ChangeFeedStartFrom Beginning() => Beginning(FeedRangeEpk.FullRange); + + /// + /// Creates a that tells the ChangeFeed operation to start from the beginning of time. + /// + /// The range to start from. + /// A that tells the ChangeFeed operation to start reading changes from the beginning of time. + public static ChangeFeedStartFrom Beginning(FeedRange feedRange) + { + if (!(feedRange is FeedRangeInternal feedRangeInternal)) + { + throw new ArgumentException($"{nameof(feedRange)} needs to be a {nameof(FeedRangeInternal)}."); + } + + return new ChangeFeedStartFromBeginning(feedRangeInternal); + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromBeginning.cs b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromBeginning.cs new file mode 100644 index 0000000000..643a122a08 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromBeginning.cs @@ -0,0 +1,33 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.ChangeFeed +{ + using System; + + /// + /// Derived instance of that tells the ChangeFeed operation to start reading changes from the beginning of time. + /// + internal sealed class ChangeFeedStartFromBeginning : ChangeFeedStartFrom + { + /// + /// Initializes an instance of the class. + /// + /// The (optional) range to start from. + public ChangeFeedStartFromBeginning(FeedRangeInternal feedRange) + : base() + { + this.FeedRange = feedRange ?? throw new ArgumentNullException(nameof(feedRange)); + } + + /// + /// Gets the (optional) range to start from. + /// + public FeedRangeInternal FeedRange { get; } + + internal override void Accept(ChangeFeedStartFromVisitor visitor) => visitor.Visit(this); + + internal override TResult Accept(ChangeFeedStartFromVisitor visitor) => visitor.Visit(this); + } +} diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromContinuation.cs b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromContinuation.cs new file mode 100644 index 0000000000..feabbad1ba --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromContinuation.cs @@ -0,0 +1,39 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.ChangeFeed +{ + using System; + + /// + /// Derived instance of that tells the ChangeFeed operation to start reading changes from a save point. + /// + /// This class is used to temporarily store the fully serialized composite continuation token and needs to transformed into a . + internal sealed class ChangeFeedStartFromContinuation : ChangeFeedStartFrom + { + /// + /// Initializes an instance of the class. + /// + /// The continuation to resume from. + public ChangeFeedStartFromContinuation(string continuation) + : base() + { + if (string.IsNullOrWhiteSpace(continuation)) + { + throw new ArgumentOutOfRangeException($"{nameof(continuation)} must not be null, empty, or whitespace."); + } + + this.Continuation = continuation; + } + + /// + /// Gets the continuation to resume from. + /// + public string Continuation { get; } + + internal override void Accept(ChangeFeedStartFromVisitor visitor) => visitor.Visit(this); + + internal override TResult Accept(ChangeFeedStartFromVisitor visitor) => visitor.Visit(this); + } +} diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromContinuationAndFeedRange.cs b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromContinuationAndFeedRange.cs new file mode 100644 index 0000000000..c07bcc270f --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromContinuationAndFeedRange.cs @@ -0,0 +1,28 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.ChangeFeed +{ + using System; + + /// + /// Derived instance of that tells the ChangeFeed operation to start reading from an LSN for a particular feed range. + /// + internal sealed class ChangeFeedStartFromContinuationAndFeedRange : ChangeFeedStartFrom + { + public ChangeFeedStartFromContinuationAndFeedRange(string etag, FeedRangeInternal feedRange) + { + this.Etag = etag; + this.FeedRange = feedRange ?? throw new ArgumentNullException(nameof(feedRange)); + } + + public string Etag { get; } + + public FeedRangeInternal FeedRange { get; } + + internal override void Accept(ChangeFeedStartFromVisitor visitor) => visitor.Visit(this); + + internal override TResult Accept(ChangeFeedStartFromVisitor visitor) => visitor.Visit(this); + } +} diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromNow.cs b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromNow.cs new file mode 100644 index 0000000000..598ac1cae3 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromNow.cs @@ -0,0 +1,33 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.ChangeFeed +{ + using System; + + /// + /// Derived instance of that tells the ChangeFeed operation to start reading changes from this moment onward. + /// + internal sealed class ChangeFeedStartFromNow : ChangeFeedStartFrom + { + /// + /// Intializes an instance of the class. + /// + /// The (optional) feed range to start from. + public ChangeFeedStartFromNow(FeedRangeInternal feedRange) + : base() + { + this.FeedRange = feedRange ?? throw new ArgumentNullException(nameof(feedRange)); + } + + /// + /// Gets the (optional) range to start from. + /// + public FeedRangeInternal FeedRange { get; } + + internal override void Accept(ChangeFeedStartFromVisitor visitor) => visitor.Visit(this); + + internal override TResult Accept(ChangeFeedStartFromVisitor visitor) => visitor.Visit(this); + } +} diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromRequestOptionPopulator.cs b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromRequestOptionPopulator.cs new file mode 100644 index 0000000000..b090666d2d --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromRequestOptionPopulator.cs @@ -0,0 +1,77 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.ChangeFeed +{ + using System; + using System.Globalization; + using Microsoft.Azure.Documents; + + internal sealed class ChangeFeedStartFromRequestOptionPopulator : ChangeFeedStartFromVisitor + { + private const string IfNoneMatchAllHeaderValue = "*"; + private static readonly DateTime StartFromBeginningTime = DateTime.MinValue.ToUniversalTime(); + + private readonly RequestMessage requestMessage; + private readonly FeedRangeRequestMessagePopulatorVisitor feedRangeVisitor; + + public ChangeFeedStartFromRequestOptionPopulator(RequestMessage requestMessage) + { + this.requestMessage = requestMessage ?? throw new ArgumentNullException(nameof(requestMessage)); + this.feedRangeVisitor = new FeedRangeRequestMessagePopulatorVisitor(requestMessage); + } + + public override void Visit(ChangeFeedStartFromNow startFromNow) + { + this.requestMessage.Headers.IfNoneMatch = ChangeFeedStartFromRequestOptionPopulator.IfNoneMatchAllHeaderValue; + + if (startFromNow.FeedRange != null) + { + startFromNow.FeedRange.Accept(this.feedRangeVisitor); + } + } + + public override void Visit(ChangeFeedStartFromTime startFromTime) + { + // Our current public contract for ChangeFeedProcessor uses DateTime.MinValue.ToUniversalTime as beginning. + // We need to add a special case here, otherwise it would send it as normal StartTime. + // The problem is Multi master accounts do not support StartTime header on ReadFeed, and thus, + // it would break multi master Change Feed Processor users using Start From Beginning semantics. + // It's also an optimization, since the backend won't have to binary search for the value. + if (startFromTime.StartTime != ChangeFeedStartFromRequestOptionPopulator.StartFromBeginningTime) + { + this.requestMessage.Headers.Add( + HttpConstants.HttpHeaders.IfModifiedSince, + startFromTime.StartTime.ToString("r", CultureInfo.InvariantCulture)); + } + + startFromTime.FeedRange.Accept(this.feedRangeVisitor); + } + + public override void Visit(ChangeFeedStartFromContinuation startFromContinuation) + { + // On REST level, change feed is using IfNoneMatch/ETag instead of continuation + this.requestMessage.Headers.IfNoneMatch = startFromContinuation.Continuation; + } + + public override void Visit(ChangeFeedStartFromBeginning startFromBeginning) + { + // We don't need to set any headers to start from the beginning + + // Except for the feed range. + startFromBeginning.FeedRange.Accept(this.feedRangeVisitor); + } + + public override void Visit(ChangeFeedStartFromContinuationAndFeedRange startFromContinuationAndFeedRange) + { + // On REST level, change feed is using IfNoneMatch/ETag instead of continuation + if (startFromContinuationAndFeedRange.Etag != null) + { + this.requestMessage.Headers.IfNoneMatch = startFromContinuationAndFeedRange.Etag; + } + + startFromContinuationAndFeedRange.FeedRange.Accept(this.feedRangeVisitor); + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromTime.cs b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromTime.cs new file mode 100644 index 0000000000..26aeb17714 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromTime.cs @@ -0,0 +1,45 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.ChangeFeed +{ + using System; + + /// + /// Derived instance of that tells the ChangeFeed operation to start reading changes from some point in time onward. + /// + internal sealed class ChangeFeedStartFromTime : ChangeFeedStartFrom + { + /// + /// Initializes an instance of the class. + /// + /// The time to start reading from. + /// The (optional) range to start from. + public ChangeFeedStartFromTime(DateTime time, FeedRangeInternal feedRange) + : base() + { + if (time.Kind != DateTimeKind.Utc) + { + throw new ArgumentOutOfRangeException($"{nameof(time)}.{nameof(DateTime.Kind)} must be {nameof(DateTimeKind)}.{nameof(DateTimeKind.Utc)}"); + } + + this.StartTime = time; + this.FeedRange = feedRange ?? throw new ArgumentNullException(nameof(feedRange)); + } + + /// + /// Gets the time the ChangeFeed operation should start reading from. + /// + public DateTime StartTime { get; } + + /// + /// Gets the (optional) range to start from. + /// + public FeedRangeInternal FeedRange { get; } + + internal override void Accept(ChangeFeedStartFromVisitor visitor) => visitor.Visit(this); + + internal override TResult Accept(ChangeFeedStartFromVisitor visitor) => visitor.Visit(this); + } +} diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromVisitor.cs b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromVisitor.cs new file mode 100644 index 0000000000..960a6a6745 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromVisitor.cs @@ -0,0 +1,15 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.ChangeFeed +{ + internal abstract class ChangeFeedStartFromVisitor + { + public abstract void Visit(ChangeFeedStartFromNow startFromNow); + public abstract void Visit(ChangeFeedStartFromTime startFromTime); + public abstract void Visit(ChangeFeedStartFromContinuation startFromContinuation); + public abstract void Visit(ChangeFeedStartFromBeginning startFromBeginning); + public abstract void Visit(ChangeFeedStartFromContinuationAndFeedRange startFromContinuationAndFeedRange); + } +} diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromVisitor{TResult}.cs b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromVisitor{TResult}.cs new file mode 100644 index 0000000000..fe52cf5d42 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFromVisitor{TResult}.cs @@ -0,0 +1,15 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.ChangeFeed +{ + internal abstract class ChangeFeedStartFromVisitor + { + public abstract TResult Visit(ChangeFeedStartFromNow startFromNow); + public abstract TResult Visit(ChangeFeedStartFromTime startFromTime); + public abstract TResult Visit(ChangeFeedStartFromContinuation startFromContinuation); + public abstract TResult Visit(ChangeFeedStartFromBeginning startFromBeginning); + public abstract TResult Visit(ChangeFeedStartFromContinuationAndFeedRange startFromContinuationAndFeedRange); + } +} diff --git a/Microsoft.Azure.Cosmos/src/Resource/QueryResponses/StandByFeedContinuationToken.cs b/Microsoft.Azure.Cosmos/src/ChangeFeed/StandByFeedContinuationToken.cs similarity index 99% rename from Microsoft.Azure.Cosmos/src/Resource/QueryResponses/StandByFeedContinuationToken.cs rename to Microsoft.Azure.Cosmos/src/ChangeFeed/StandByFeedContinuationToken.cs index afec119dac..07ddc8cc81 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/QueryResponses/StandByFeedContinuationToken.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeed/StandByFeedContinuationToken.cs @@ -2,7 +2,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. //------------------------------------------------------------ -namespace Microsoft.Azure.Cosmos.Query +namespace Microsoft.Azure.Cosmos.ChangeFeed { using System; using System.Collections.Generic; diff --git a/Microsoft.Azure.Cosmos/src/Resource/QueryResponses/StandByFeedIteratorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeed/StandByFeedIteratorCore.cs similarity index 86% rename from Microsoft.Azure.Cosmos/src/Resource/QueryResponses/StandByFeedIteratorCore.cs rename to Microsoft.Azure.Cosmos/src/ChangeFeed/StandByFeedIteratorCore.cs index 278c63bfeb..1c3b8fb969 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/QueryResponses/StandByFeedIteratorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeed/StandByFeedIteratorCore.cs @@ -2,18 +2,15 @@ // Copyright (c) Microsoft Corporation. All rights reserved. //------------------------------------------------------------ -namespace Microsoft.Azure.Cosmos +namespace Microsoft.Azure.Cosmos.ChangeFeed { using System; using System.Net; - using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.CosmosElements; - using Microsoft.Azure.Cosmos.Query; using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens; using Microsoft.Azure.Cosmos.Routing; - using Microsoft.Azure.Documents.Routing; /// /// Cosmos Stand-By Feed iterator implementing Composite Continuation Token @@ -28,17 +25,20 @@ internal class StandByFeedIteratorCore : FeedIteratorInternal private readonly CosmosClientContext clientContext; private readonly ContainerInternal container; + private ChangeFeedStartFrom changeFeedStartFrom; private string containerRid; internal StandByFeedIteratorCore( CosmosClientContext clientContext, ContainerCore container, + ChangeFeedStartFrom changeFeedStartFrom, ChangeFeedRequestOptions options) { if (container == null) throw new ArgumentNullException(nameof(container)); this.clientContext = clientContext; this.container = container; + this.changeFeedStartFrom = changeFeedStartFrom ?? throw new ArgumentNullException(nameof(changeFeedStartFrom)); this.changeFeedOptions = options; } @@ -95,7 +95,7 @@ internal StandByFeedIteratorCore( PartitionKeyRangeCache pkRangeCache = await this.clientContext.DocumentClient.GetPartitionKeyRangeCacheAsync(); this.containerRid = await this.container.GetRIDAsync(cancellationToken); - if (this.changeFeedOptions?.From is ChangeFeedRequestOptions.StartFromContinuation startFromContinuation) + if (this.changeFeedStartFrom is ChangeFeedStartFromContinuation startFromContinuation) { this.compositeContinuationToken = await StandByFeedContinuationToken.CreateAsync( this.containerRid, @@ -105,11 +105,11 @@ internal StandByFeedIteratorCore( if (token.Token != null) { - this.changeFeedOptions.From = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(token.Token); + this.changeFeedStartFrom = ChangeFeedStartFrom.ContinuationToken(token.Token); } else { - this.changeFeedOptions.From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(); + this.changeFeedStartFrom = ChangeFeedStartFrom.Beginning(); } } else @@ -122,12 +122,16 @@ internal StandByFeedIteratorCore( } (CompositeContinuationToken currentRangeToken, string rangeId) = await this.compositeContinuationToken.GetCurrentTokenAsync(); + FeedRange feedRange = new FeedRangePartitionKeyRange(rangeId); if (currentRangeToken.Token != null) { - this.changeFeedOptions.From = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(currentRangeToken.Token); + this.changeFeedStartFrom = new ChangeFeedStartFromContinuationAndFeedRange(currentRangeToken.Token, (FeedRangeInternal)feedRange); + } + else + { + this.changeFeedStartFrom = ChangeFeedStartFrom.Beginning(feedRange); } - this.changeFeedOptions.FeedRange = new FeedRangePartitionKeyRange(rangeId); ResponseMessage response = await this.NextResultSetDelegateAsync(this.changeFeedOptions, cancellationToken); if (await this.ShouldRetryFailureAsync(response, cancellationToken)) { @@ -179,7 +183,11 @@ internal virtual Task NextResultSetDelegateAsync( operationType: Documents.OperationType.ReadFeed, requestOptions: options, containerInternal: this.container, - requestEnricher: default, + requestEnricher: (request) => + { + ChangeFeedStartFromRequestOptionPopulator visitor = new ChangeFeedStartFromRequestOptionPopulator(request); + this.changeFeedStartFrom.Accept(visitor); + }, responseCreator: response => response, partitionKey: default, streamPayload: default, diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Utils/ResultSetIteratorUtils.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Utils/ResultSetIteratorUtils.cs index 0d80835da2..ce5cb03d11 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Utils/ResultSetIteratorUtils.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Utils/ResultSetIteratorUtils.cs @@ -16,34 +16,36 @@ public static ChangeFeedPartitionKeyResultSetIteratorCore BuildResultSetIterator DateTime? startTime, bool startFromBeginning) { - ChangeFeedRequestOptions.StartFrom startFrom; + FeedRangeInternal feedRange = new FeedRangePartitionKeyRange(partitionKeyRangeId); + + ChangeFeedStartFrom startFrom; if (continuationToken != null) { - startFrom = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(continuationToken); + // For continuation based feed range we need to manufactor a new continuation token that has the partition key range id in it. + startFrom = new ChangeFeedStartFromContinuationAndFeedRange(continuationToken, feedRange); } else if (startTime.HasValue) { - startFrom = ChangeFeedRequestOptions.StartFrom.CreateFromTime(startTime.Value); + startFrom = ChangeFeedStartFrom.Time(startTime.Value, feedRange); } else if (startFromBeginning) { - startFrom = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(); + startFrom = ChangeFeedStartFrom.Beginning(feedRange); } else { - startFrom = ChangeFeedRequestOptions.StartFrom.CreateFromNow(); + startFrom = ChangeFeedStartFrom.Now(feedRange); } ChangeFeedRequestOptions requestOptions = new ChangeFeedRequestOptions() { - MaxItemCount = maxItemCount, - FeedRange = new FeedRangePartitionKeyRange(partitionKeyRangeId), - From = startFrom, + PageSizeHint = maxItemCount, }; return new ChangeFeedPartitionKeyResultSetIteratorCore( clientContext: container.ClientContext, container: container, + changeFeedStartFrom: startFrom, options: requestOptions); } } diff --git a/Microsoft.Azure.Cosmos/src/FeedRange/FeedRange.cs b/Microsoft.Azure.Cosmos/src/FeedRange/FeedRange.cs index 61f94044ee..8ef5a5ea86 100644 --- a/Microsoft.Azure.Cosmos/src/FeedRange/FeedRange.cs +++ b/Microsoft.Azure.Cosmos/src/FeedRange/FeedRange.cs @@ -38,5 +38,12 @@ public static FeedRange FromJsonString(string toStringValue) return parsedRange; } + + /// + /// Creates a feed range that span only a single value. + /// + /// The partition key value to create a feed range from. + /// The feed range that spans the partition. + public static FeedRange FromPartitionKey(PartitionKey partitionKey) => new FeedRangePartitionKey(partitionKey); } } diff --git a/Microsoft.Azure.Cosmos/src/RequestOptions/ChangeFeedRequestOptions.cs b/Microsoft.Azure.Cosmos/src/RequestOptions/ChangeFeedRequestOptions.cs index 48568a5821..c3c7db7d9a 100644 --- a/Microsoft.Azure.Cosmos/src/RequestOptions/ChangeFeedRequestOptions.cs +++ b/Microsoft.Azure.Cosmos/src/RequestOptions/ChangeFeedRequestOptions.cs @@ -19,41 +19,29 @@ namespace Microsoft.Azure.Cosmos #endif sealed class ChangeFeedRequestOptions : RequestOptions { - private int? maxItemCount; + private int? pageSizeHint; /// /// Gets or sets the maximum number of items to be returned in the enumeration operation in the Azure Cosmos DB service. /// /// /// The maximum number of items to be returned in the enumeration operation. - /// - public int? MaxItemCount + /// + /// This is just a hint to the server which can return less items per page. + public int? PageSizeHint { - get => this.maxItemCount; + get => this.pageSizeHint; set { if (value.HasValue && (value.Value <= 0)) { - throw new ArgumentOutOfRangeException($"{nameof(this.MaxItemCount)} must be a positive value."); + throw new ArgumentOutOfRangeException($"{nameof(this.PageSizeHint)} must be a positive value."); } - this.maxItemCount = value; + this.pageSizeHint = value; } } - /// - /// Gets or sets where the ChangeFeed operation should start from. If not set then the ChangeFeed operation will start from now. - /// - /// - /// Only applies in the case where no FeedToken is provided or the FeedToken was never used in a previous iterator. - /// - public StartFrom From { get; set; } = StartFromNow.Singleton; - - /// - /// Gets or set which ranges to execute the ChangeFeed operation on. - /// - public FeedRange FeedRange { get; set; } = FeedRangeEpk.FullRange; - /// /// Fill the CosmosRequestMessage headers with the set properties /// @@ -64,25 +52,11 @@ internal override void PopulateRequestOptions(RequestMessage request) base.PopulateRequestOptions(request); - PopulateStartFromRequestOptionVisitor visitor = new PopulateStartFromRequestOptionVisitor(request); - if (this.From == null) - { - throw new InvalidOperationException($"{nameof(ChangeFeedRequestOptions)}.{nameof(ChangeFeedRequestOptions.StartFrom)} needs to be set to a value."); - } - - this.From.Accept(visitor); - - if (this.MaxItemCount.HasValue) + if (this.PageSizeHint.HasValue) { request.Headers.Add( HttpConstants.HttpHeaders.PageSize, - this.MaxItemCount.Value.ToString(CultureInfo.InvariantCulture)); - } - - if (this.FeedRange != null) - { - FeedRangeRequestMessagePopulatorVisitor feedRangeVisitor = new FeedRangeRequestMessagePopulatorVisitor(request); - ((FeedRangeInternal)this.FeedRange).Accept(feedRangeVisitor); + this.PageSizeHint.Value.ToString(CultureInfo.InvariantCulture)); } request.Headers.Add( @@ -112,220 +86,11 @@ internal override void PopulateRequestOptions(RequestMessage request) set => throw new NotSupportedException($"{nameof(ChangeFeedRequestOptions)} does not use the {nameof(this.IfNoneMatchEtag)} property."); } - /// - /// Base class for where to start a ChangeFeed operation in . - /// - /// Use one of the static constructors to generate a StartFrom option. - public abstract class StartFrom - { - /// - /// Initializes an instance of the class. - /// - protected StartFrom() - { - } - - internal abstract void Accept(StartFromVisitor visitor); - - /// - /// Creates a that tells the ChangeFeed operation to start reading changes from this moment onward. - /// - /// A that tells the ChangeFeed operation to start reading changes from this moment onward. - public static StartFrom CreateFromNow() - { - return StartFromNow.Singleton; - } - - /// - /// Creates a that tells the ChangeFeed operation to start reading changes from some point in time onward. - /// - /// The time to start reading from. - /// A that tells the ChangeFeed operation to start reading changes from some point in time onward. - public static StartFrom CreateFromTime(DateTime dateTime) - { - return new StartFromTime(dateTime); - } - - /// - /// Creates a that tells the ChangeFeed operation to start reading changes from a save point. - /// - /// The continuation to resume from. - /// A that tells the ChangeFeed operation to start reading changes from a save point. - public static StartFrom CreateFromContinuation(string continuation) - { - return new StartFromContinuation(continuation); - } - - /// - /// Creates a that tells the ChangeFeed operation to start from the beginning of time. - /// - /// A that tells the ChangeFeed operation to start reading changes from the beginning of time. - public static StartFrom CreateFromBeginning() - { - return StartFromBeginning.Singleton; - } - } - - internal abstract class StartFromVisitor - { - public abstract void Visit(StartFromNow startFromNow); - public abstract void Visit(StartFromTime startFromTime); - public abstract void Visit(StartFromContinuation startFromContinuation); - public abstract void Visit(StartFromBeginning startFromBeginning); - } - - internal sealed class PopulateStartFromRequestOptionVisitor : StartFromVisitor - { - private const string IfNoneMatchAllHeaderValue = "*"; - private static readonly DateTime StartFromBeginningTime = DateTime.MinValue.ToUniversalTime(); - - private readonly RequestMessage requestMessage; - - public PopulateStartFromRequestOptionVisitor(RequestMessage requestMessage) - { - this.requestMessage = requestMessage ?? throw new ArgumentNullException(nameof(requestMessage)); - } - - public override void Visit(StartFromNow startFromNow) - { - this.requestMessage.Headers.IfNoneMatch = PopulateStartFromRequestOptionVisitor.IfNoneMatchAllHeaderValue; - } - - public override void Visit(StartFromTime startFromTime) - { - // Our current public contract for ChangeFeedProcessor uses DateTime.MinValue.ToUniversalTime as beginning. - // We need to add a special case here, otherwise it would send it as normal StartTime. - // The problem is Multi master accounts do not support StartTime header on ReadFeed, and thus, - // it would break multi master Change Feed Processor users using Start From Beginning semantics. - // It's also an optimization, since the backend won't have to binary search for the value. - if (startFromTime.Time != PopulateStartFromRequestOptionVisitor.StartFromBeginningTime) - { - this.requestMessage.Headers.Add( - HttpConstants.HttpHeaders.IfModifiedSince, - startFromTime.Time.ToString("r", CultureInfo.InvariantCulture)); - } - } - - public override void Visit(StartFromContinuation startFromContinuation) - { - // On REST level, change feed is using IfNoneMatch/ETag instead of continuation - this.requestMessage.Headers.IfNoneMatch = startFromContinuation.Continuation; - } - - public override void Visit(StartFromBeginning startFromBeginning) - { - // We don't need to set any headers to start from the beginning - } - } - - /// - /// Derived instance of that tells the ChangeFeed operation to start reading changes from this moment onward. - /// - internal sealed class StartFromNow : StartFrom - { - public static readonly StartFromNow Singleton = new StartFromNow(); - - /// - /// Intializes an instance of the class. - /// - public StartFromNow() - : base() - { - } - - internal override void Accept(StartFromVisitor visitor) - { - visitor.Visit(this); - } - } - - /// - /// Derived instance of that tells the ChangeFeed operation to start reading changes from some point in time onward. - /// - internal sealed class StartFromTime : StartFrom - { - /// - /// Initializes an instance of the class. - /// - /// The time to start reading from. - public StartFromTime(DateTime time) - : base() - { - if (time.Kind != DateTimeKind.Utc) - { - throw new ArgumentOutOfRangeException($"{nameof(time)}.{nameof(DateTime.Kind)} must be {nameof(DateTimeKind)}.{nameof(DateTimeKind.Utc)}"); - } - - this.Time = time; - } - - /// - /// Gets the time the ChangeFeed operation should start reading from. - /// - public DateTime Time { get; } - - internal override void Accept(StartFromVisitor visitor) - { - visitor.Visit(this); - } - } - - /// - /// Derived instance of that tells the ChangeFeed operation to start reading changes from a save point. - /// - internal sealed class StartFromContinuation : StartFrom - { - /// - /// Initializes an instance of the class. - /// - /// The continuation to resume from. - public StartFromContinuation(string continuation) - : base() - { - if (string.IsNullOrWhiteSpace(continuation)) - { - throw new ArgumentOutOfRangeException($"{nameof(continuation)} must not be null, empty, or whitespace."); - } - - this.Continuation = continuation; - } - - /// - /// Gets the continuation to resume from. - /// - public string Continuation { get; } - - internal override void Accept(StartFromVisitor visitor) - { - visitor.Visit(this); - } - } - - /// - /// Derived instance of that tells the ChangeFeed operation to start reading changes from the beginning of time. - /// - internal sealed class StartFromBeginning : StartFrom - { - public static readonly StartFromBeginning Singleton = new StartFromBeginning(); - - public StartFromBeginning() - : base() - { - } - - internal override void Accept(StartFromVisitor visitor) - { - visitor.Visit(this); - } - } - internal ChangeFeedRequestOptions Clone() { return new ChangeFeedRequestOptions() { - MaxItemCount = this.maxItemCount, - From = this.From, - FeedRange = this.FeedRange, + PageSizeHint = this.pageSizeHint, }; } } diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs index cae8a40050..af35754246 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs @@ -1176,6 +1176,7 @@ public abstract ChangeFeedProcessorBuilder GetChangeFeedEstimatorBuilder( /// /// This method creates an iterator to consume a Change Feed. /// + /// Where to start the changefeed from. /// (Optional) The options for the Change Feed consumption. /// /// https://aka.ms/cosmosdb-dot-net-exceptions#stream-api @@ -1187,10 +1188,12 @@ public abstract ChangeFeedProcessorBuilder GetChangeFeedEstimatorBuilder( /// /// ChangeFeedRequestOptions options = new ChangeFeedRequestOptions() /// { - /// FeedRange = feedRanges[0] + /// PageSizeHint = 10, /// } /// - /// FeedIterator feedIterator = this.Container.GetChangeFeedStreamIterator(options); + /// FeedIterator feedIterator = this.Container.GetChangeFeedStreamIterator( + /// ChangeFeedStartFrom.Beginning(feedRanges[0]), + /// options); /// /// while (feedIterator.HasMoreResults) /// { @@ -1211,42 +1214,13 @@ public abstract ChangeFeedProcessorBuilder GetChangeFeedEstimatorBuilder( /// /// An iterator to go through the Change Feed. public abstract FeedIterator GetChangeFeedStreamIterator( - ChangeFeedRequestOptions changeFeedRequestOptions = null); - - /// - /// This method creates an iterator to consume the Change Feed for a Partition Key value. - /// - /// A to read the Change Feed for. - /// (Optional) The options for the Change Feed consumption. - /// https://aka.ms/cosmosdb-dot-net-exceptions#stream-api - /// - /// - /// - /// - /// - /// An iterator to go through the Change Feed for a particular Partition Key. - public abstract FeedIterator GetChangeFeedStreamIterator( - PartitionKey partitionKey, + ChangeFeedStartFrom changeFeedStartFrom, ChangeFeedRequestOptions changeFeedRequestOptions = null); /// /// This method creates an iterator to consume a Change Feed. /// + /// Where to start the changefeed from. /// (Optional) The options for the Change Feed consumption. /// /// https://aka.ms/cosmosdb-dot-net-exceptions#typed-api @@ -1258,10 +1232,12 @@ public abstract FeedIterator GetChangeFeedStreamIterator( /// /// ChangeFeedRequestOptions options = new ChangeFeedRequestOptions() /// { - /// FeedRange = feedRanges[0] + /// PageSizeHint = 10, /// } /// - /// FeedIterator feedIterator = this.Container.GetChangeFeedIterator(options); + /// FeedIterator feedIterator = this.Container.GetChangeFeedIterator( + /// ChangeFeedStartFrom.Beginning(feedRanges[0]), + /// options); /// while (feedIterator.HasMoreResults) /// { /// while (feedIterator.HasMoreResults) @@ -1278,34 +1254,7 @@ public abstract FeedIterator GetChangeFeedStreamIterator( /// /// An iterator to go through the Change Feed. public abstract FeedIterator GetChangeFeedIterator( - ChangeFeedRequestOptions changeFeedRequestOptions = null); - - /// - /// This method creates an iterator to consume the Change Feed for a Partition Key value. - /// - /// A to read the Change Feed for. - /// (Optional) The options for the Change Feed consumption. - /// - /// https://aka.ms/cosmosdb-dot-net-exceptions#typed-api - /// - /// feedIterator = this.Container.GetChangeFeedIterator(new PartitionKey("myPartitionKey"))) - /// { - /// while (feedIterator.HasMoreResults) - /// { - /// FeedResponse response = await feedIterator.ReadNextAsync(); - /// foreach (var item in response) - /// { - /// Console.WriteLine(item); - /// } - /// } - /// } - /// ]]> - /// - /// - /// An iterator to go through the Change Feed for a Partition Key. - public abstract FeedIterator GetChangeFeedIterator( - PartitionKey partitionKey, + ChangeFeedStartFrom changeFeedStartFrom, ChangeFeedRequestOptions changeFeedRequestOptions = null); /// diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs index 681d806e0c..43ab14a526 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs @@ -544,6 +544,7 @@ public override TransactionalBatch CreateTransactionalBatch(PartitionKey partiti } internal override FeedIterator GetStandByFeedIterator( + ChangeFeedStartFrom changeFeedStartFrom, ChangeFeedRequestOptions requestOptions = null) { ChangeFeedRequestOptions cosmosQueryRequestOptions = requestOptions as ChangeFeedRequestOptions ?? new ChangeFeedRequestOptions(); @@ -551,6 +552,7 @@ internal override FeedIterator GetStandByFeedIterator( return new StandByFeedIteratorCore( clientContext: this.ClientContext, container: this, + changeFeedStartFrom: changeFeedStartFrom, options: cosmosQueryRequestOptions); } diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs index a9be711c52..0b464750a7 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Cosmos using System.IO; using System.Threading; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.ChangeFeed; using Microsoft.Azure.Cosmos.Query.Core.QueryClient; using Microsoft.Azure.Cosmos.Resource.CosmosExceptions; using Microsoft.Azure.Cosmos.Routing; @@ -251,42 +252,32 @@ public async Task> GetFeedRangesAsync( } public override FeedIterator GetChangeFeedStreamIterator( + ChangeFeedStartFrom changeFeedStartFrom, ChangeFeedRequestOptions changeFeedRequestOptions = null) { - return new ChangeFeedIteratorCore( - container: this, - changeFeedRequestOptions: changeFeedRequestOptions); - } + if (changeFeedStartFrom == null) + { + throw new ArgumentNullException(nameof(changeFeedStartFrom)); + } - public override FeedIterator GetChangeFeedStreamIterator( - PartitionKey partitionKey, - ChangeFeedRequestOptions changeFeedRequestOptions = null) - { - changeFeedRequestOptions ??= new ChangeFeedRequestOptions(); - changeFeedRequestOptions.FeedRange = new FeedRangePartitionKey(partitionKey); return new ChangeFeedIteratorCore( container: this, + changeFeedStartFrom: changeFeedStartFrom, changeFeedRequestOptions: changeFeedRequestOptions); } public override FeedIterator GetChangeFeedIterator( + ChangeFeedStartFrom changeFeedStartFrom, ChangeFeedRequestOptions changeFeedRequestOptions = null) { - ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore( - container: this, - changeFeedRequestOptions: changeFeedRequestOptions); - - return new FeedIteratorCore(changeFeedIteratorCore, responseCreator: this.ClientContext.ResponseFactory.CreateChangeFeedUserTypeResponse); - } + if (changeFeedStartFrom == null) + { + throw new ArgumentNullException(nameof(changeFeedStartFrom)); + } - public override FeedIterator GetChangeFeedIterator( - PartitionKey partitionKey, - ChangeFeedRequestOptions changeFeedRequestOptions = null) - { - changeFeedRequestOptions ??= new ChangeFeedRequestOptions(); - changeFeedRequestOptions.FeedRange = new FeedRangePartitionKey(partitionKey); ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore( container: this, + changeFeedStartFrom: changeFeedStartFrom, changeFeedRequestOptions: changeFeedRequestOptions); return new FeedIteratorCore(changeFeedIteratorCore, responseCreator: this.ClientContext.ResponseFactory.CreateChangeFeedUserTypeResponse); diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs index e3899c9bc2..b958ea0d87 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs @@ -373,29 +373,17 @@ public override TransactionalBatch CreateTransactionalBatch(PartitionKey partiti } public override FeedIterator GetChangeFeedStreamIterator( + ChangeFeedStartFrom changeFeedStartFrom, ChangeFeedRequestOptions changeFeedRequestOptions = null) { - return base.GetChangeFeedStreamIterator(changeFeedRequestOptions); - } - - public override FeedIterator GetChangeFeedStreamIterator( - PartitionKey partitionKey, - ChangeFeedRequestOptions changeFeedRequestOptions = null) - { - return base.GetChangeFeedStreamIterator(partitionKey, changeFeedRequestOptions); + return base.GetChangeFeedStreamIterator(changeFeedStartFrom, changeFeedRequestOptions); } public override FeedIterator GetChangeFeedIterator( + ChangeFeedStartFrom changeFeedStartFrom, ChangeFeedRequestOptions changeFeedRequestOptions = null) { - return base.GetChangeFeedIterator(changeFeedRequestOptions); - } - - public override FeedIterator GetChangeFeedIterator( - PartitionKey partitionKey, - ChangeFeedRequestOptions changeFeedRequestOptions = null) - { - return base.GetChangeFeedIterator(partitionKey, changeFeedRequestOptions); + return base.GetChangeFeedIterator(changeFeedStartFrom, changeFeedRequestOptions); } public override Task> GetPartitionKeyRangesAsync( diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInternal.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInternal.cs index 3674462bd5..b21102f140 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInternal.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInternal.cs @@ -7,7 +7,6 @@ namespace Microsoft.Azure.Cosmos using System; using System.Collections.Generic; using System.IO; - using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.Query; @@ -57,6 +56,7 @@ public abstract Task TryExecuteQueryAsync( CancellationToken cancellationToken = default); internal abstract FeedIterator GetStandByFeedIterator( + ChangeFeedStartFrom changeFeedStartFrom, ChangeFeedRequestOptions requestOptions = default); public abstract FeedIteratorInternal GetItemQueryStreamIteratorInternal( @@ -112,17 +112,11 @@ public abstract Task> PatchItemAsync( public abstract Task> GetFeedRangesAsync(CancellationToken cancellationToken = default(CancellationToken)); public abstract FeedIterator GetChangeFeedStreamIterator( - ChangeFeedRequestOptions changeFeedRequestOptions = null); - - public abstract FeedIterator GetChangeFeedStreamIterator( - PartitionKey partitionKey, + ChangeFeedStartFrom changeFeedStartFrom, ChangeFeedRequestOptions changeFeedRequestOptions = null); public abstract FeedIterator GetChangeFeedIterator( - ChangeFeedRequestOptions changeFeedRequestOptions = null); - - public abstract FeedIterator GetChangeFeedIterator( - PartitionKey partitionKey, + ChangeFeedStartFrom changeFeedStartFrom, ChangeFeedRequestOptions changeFeedRequestOptions = null); public abstract Task> GetPartitionKeyRangesAsync( diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Contracts/ContractTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Contracts/ContractTests.cs index bee0daf33c..a505b434e4 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Contracts/ContractTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Contracts/ContractTests.cs @@ -16,6 +16,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests.Contracts using Microsoft.Azure.Documents; using System.IO; using Newtonsoft.Json.Linq; + using Microsoft.Azure.Cosmos.ChangeFeed; [EmulatorTests.TestClass] public class ContractTests : BaseCosmosClientHelper @@ -156,11 +157,11 @@ public async Task ChangeFeed_FeedRange_FromV2SDK() IEnumerable pkRangeIds = await container.GetPartitionKeyRangesAsync(feedRange); ChangeFeedRequestOptions requestOptions = new ChangeFeedRequestOptions() { - FeedRange = feedRange, - From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(), - MaxItemCount = 1 + PageSizeHint = 1 }; - ChangeFeedIteratorCore feedIterator = container.GetChangeFeedStreamIterator(changeFeedRequestOptions: requestOptions) as ChangeFeedIteratorCore; + ChangeFeedIteratorCore feedIterator = container.GetChangeFeedStreamIterator( + changeFeedStartFrom: ChangeFeedStartFrom.Beginning(feedRange), + changeFeedRequestOptions: requestOptions) as ChangeFeedIteratorCore; ResponseMessage firstResponse = await feedIterator.ReadNextAsync(); if (firstResponse.IsSuccessStatusCode) { @@ -196,10 +197,11 @@ public async Task ChangeFeed_FeedRange_FromV2SDK() { ChangeFeedRequestOptions requestOptions = new ChangeFeedRequestOptions() { - From = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(continuation), - MaxItemCount = 100 + PageSizeHint = 100 }; - ChangeFeedIteratorCore feedIterator = container.GetChangeFeedStreamIterator(changeFeedRequestOptions: requestOptions) as ChangeFeedIteratorCore; + ChangeFeedIteratorCore feedIterator = container.GetChangeFeedStreamIterator( + changeFeedStartFrom: ChangeFeedStartFrom.ContinuationToken(continuation), + changeFeedRequestOptions: requestOptions) as ChangeFeedIteratorCore; ResponseMessage firstResponse = await feedIterator.ReadNextAsync(); if (firstResponse.IsSuccessStatusCode) { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosDiagnosticsTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosDiagnosticsTests.cs index ca76cc8563..4c3c88ef67 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosDiagnosticsTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosDiagnosticsTests.cs @@ -472,7 +472,9 @@ public async Task ChangeFeedDiagnostics(bool disableDiagnostics) await Task.WhenAll(createItemsTasks); ChangeFeedRequestOptions requestOptions = disableDiagnostics ? ChangeFeedRequestOptionDisableDiagnostic : null; - FeedIterator changeFeedIterator = ((ContainerCore)(container as ContainerInlineCore)).GetChangeFeedStreamIterator(changeFeedRequestOptions: requestOptions); + FeedIterator changeFeedIterator = ((ContainerCore)(container as ContainerInlineCore)).GetChangeFeedStreamIterator( + ChangeFeedStartFrom.Beginning(), + changeFeedRequestOptions: requestOptions); while (changeFeedIterator.HasMoreResults) { using (ResponseMessage response = await changeFeedIterator.ReadNextAsync()) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemChangeFeedTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemChangeFeedTests.cs index 9d2d637788..dbffcac0b3 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemChangeFeedTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemChangeFeedTests.cs @@ -11,6 +11,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests using System.Linq; using System.Threading; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.ChangeFeed; using Microsoft.Azure.Cosmos.Query; using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -66,10 +67,7 @@ public async Task StandByFeedIterator() await this.CreateRandomItems(this.Container, batchSize, randomPartitionKey: true); ContainerCore itemsCore = (ContainerCore)this.Container; FeedIterator feedIterator = itemsCore.GetStandByFeedIterator( - requestOptions: new ChangeFeedRequestOptions() - { - From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(), - }); + ChangeFeedStartFrom.Beginning()); while (feedIterator.HasMoreResults) { @@ -98,10 +96,7 @@ await feedIterator.ReadNextAsync(this.cancellationToken)) await this.CreateRandomItems(this.Container, batchSize, randomPartitionKey: true); FeedIterator setIteratorNew = itemsCore.GetStandByFeedIterator( - new ChangeFeedRequestOptions() - { - From = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(lastcontinuation), - }); + ChangeFeedStartFrom.ContinuationToken(lastcontinuation)); while (setIteratorNew.HasMoreResults) { @@ -136,7 +131,8 @@ public async Task StandByFeedIterator_EmptyBeginning() bool createdDocuments = false; ContainerInternal itemsCore = this.Container; - FeedIterator feedIterator = itemsCore.GetStandByFeedIterator(); + FeedIterator feedIterator = itemsCore.GetStandByFeedIterator( + ChangeFeedStartFrom.Beginning()); while (feedIterator.HasMoreResults) { @@ -190,10 +186,7 @@ public async Task StandByFeedIterator_WithInexistentRange() ContainerInternal itemsCore = this.Container; FeedIterator setIteratorNew = itemsCore.GetStandByFeedIterator( - requestOptions: new ChangeFeedRequestOptions() - { - From = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(corruptedTokenSerialized), - }); + ChangeFeedStartFrom.ContinuationToken(corruptedTokenSerialized)); _ = await setIteratorNew.ReadNextAsync(this.cancellationToken); Assert.Fail("Should have thrown."); @@ -208,10 +201,10 @@ public async Task StandByFeedIterator_WithMaxItemCount() await this.CreateRandomItems(this.Container, 2, randomPartitionKey: true); ContainerCore itemsCore = (ContainerCore)this.Container; FeedIterator feedIterator = itemsCore.GetStandByFeedIterator( + ChangeFeedStartFrom.Beginning(), requestOptions: new ChangeFeedRequestOptions() { - MaxItemCount = 1, - From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(), + PageSizeHint = 1 }); while (feedIterator.HasMoreResults) @@ -248,13 +241,8 @@ public async Task StandByFeedIterator_NoFetchNext() int count = 0; while (true) { - ChangeFeedRequestOptions requestOptions = new ChangeFeedRequestOptions() - { - From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(), - }; - FeedIterator feedIterator = itemsCore.GetStandByFeedIterator( - requestOptions: requestOptions); + ChangeFeedStartFrom.Beginning()); using (ResponseMessage responseMessage = await feedIterator.ReadNextAsync(this.cancellationToken)) { @@ -297,10 +285,7 @@ public async Task StandByFeedIterator_BreathFirst() await this.CreateRandomItems(this.LargerContainer, expected, randomPartitionKey: true); ContainerCore itemsCore = (ContainerCore)this.LargerContainer; FeedIterator feedIterator = itemsCore.GetStandByFeedIterator( - requestOptions: new ChangeFeedRequestOptions() - { - From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning() - }); + ChangeFeedStartFrom.Beginning()); while (feedIterator.HasMoreResults) { using (ResponseMessage responseMessage = await feedIterator.ReadNextAsync(this.cancellationToken)) @@ -328,10 +313,10 @@ public async Task StandByFeedIterator_VerifyRefreshIsCalledOnSplit() { CosmosChangeFeedResultSetIteratorCoreMock iterator = new CosmosChangeFeedResultSetIteratorCoreMock( (ContainerCore)this.Container, + ChangeFeedStartFrom.Beginning(), new ChangeFeedRequestOptions() { - MaxItemCount = 100, - From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(), + PageSizeHint = 100, }); using (ResponseMessage responseMessage = await iterator.ReadNextAsync(this.cancellationToken)) @@ -366,10 +351,7 @@ public async Task GetChangeFeedTokensAsync_AllowsParallelProcessing() int count = 0; FeedIterator iteratorForToken = itemsCore.GetStandByFeedIterator( - requestOptions: new ChangeFeedRequestOptions() - { - From = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(token), - }); + ChangeFeedStartFrom.ContinuationToken(token)); while (true) { using (ResponseMessage responseMessage = @@ -415,10 +397,7 @@ public async Task GetChangeFeedTokensAsync_DrainFromJustOnePartition() int count = 0; FeedIterator iteratorForToken = itemsCore.GetStandByFeedIterator( - requestOptions: new ChangeFeedRequestOptions() - { - From = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(token), - }); + ChangeFeedStartFrom.ContinuationToken(token)); while (iteratorForToken.HasMoreResults) { using (ResponseMessage responseMessage = await iteratorForToken.ReadNextAsync(this.cancellationToken)) @@ -486,9 +465,11 @@ private class CosmosChangeFeedResultSetIteratorCoreMock : StandByFeedIteratorCor internal CosmosChangeFeedResultSetIteratorCoreMock( ContainerCore container, + ChangeFeedStartFrom changeFeedStartFrom, ChangeFeedRequestOptions options) : base( clientContext: container.ClientContext, container: container, + changeFeedStartFrom: changeFeedStartFrom, options: options) { List compositeContinuationTokens = new List() diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/FeedToken/ChangeFeedIteratorCoreTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/FeedToken/ChangeFeedIteratorCoreTests.cs index 3ea3f8ace6..4432ca8903 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/FeedToken/ChangeFeedIteratorCoreTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/FeedToken/ChangeFeedIteratorCoreTests.cs @@ -11,6 +11,7 @@ namespace Microsoft.Azure.Cosmos.EmulatorTests.FeedRanges using System.Net; using System.Threading; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.ChangeFeed; using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens; using Microsoft.Azure.Cosmos.SDK.EmulatorTests; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -62,11 +63,7 @@ public async Task ChangeFeedIteratorCore_ReadAll() await this.CreateRandomItems(this.LargerContainer, batchSize, randomPartitionKey: true); ContainerInternal itemsCore = this.LargerContainer; ChangeFeedIteratorCore feedIterator = itemsCore.GetChangeFeedStreamIterator( - changeFeedRequestOptions: - new ChangeFeedRequestOptions() - { - From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(), - }) as ChangeFeedIteratorCore; + ChangeFeedStartFrom.Beginning()) as ChangeFeedIteratorCore; string continuation = null; while (feedIterator.HasMoreResults) { @@ -90,10 +87,7 @@ await feedIterator.ReadNextAsync(this.cancellationToken)) // Insert another batch of 25 and use the last FeedToken from the first cycle await this.CreateRandomItems(this.LargerContainer, batchSize, randomPartitionKey: true); ChangeFeedIteratorCore setIteratorNew = itemsCore.GetChangeFeedStreamIterator( - changeFeedRequestOptions: new ChangeFeedRequestOptions() - { - From = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(continuation), - }) as ChangeFeedIteratorCore; + ChangeFeedStartFrom.ContinuationToken(continuation)) as ChangeFeedIteratorCore; while (setIteratorNew.HasMoreResults) { @@ -126,10 +120,7 @@ public async Task ChangeFeedIteratorCore_StartTime() await this.CreateRandomItems(this.Container, batchSize, randomPartitionKey: true); ContainerInternal itemsCore = this.Container; FeedIterator feedIterator = itemsCore.GetChangeFeedStreamIterator( - changeFeedRequestOptions: new ChangeFeedRequestOptions() - { - From = ChangeFeedRequestOptions.StartFrom.CreateFromTime(now), - }); + ChangeFeedStartFrom.Time(now)); while (feedIterator.HasMoreResults) { using (ResponseMessage responseMessage = @@ -172,11 +163,9 @@ public async Task ChangeFeedIteratorCore_PartitionKey_ReadAll() ContainerInternal itemsCore = this.Container; ChangeFeedIteratorCore feedIterator = itemsCore.GetChangeFeedStreamIterator( - changeFeedRequestOptions: new ChangeFeedRequestOptions() - { - FeedRange = new FeedRangePartitionKey(new PartitionKey(pkToRead)), - From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(), - }) as ChangeFeedIteratorCore; + ChangeFeedStartFrom.Beginning( + FeedRange.FromPartitionKey( + new PartitionKey(pkToRead)))) as ChangeFeedIteratorCore; string continuation = null; while (feedIterator.HasMoreResults) { @@ -208,10 +197,7 @@ await feedIterator.ReadNextAsync(this.cancellationToken)) } ChangeFeedIteratorCore setIteratorNew = itemsCore.GetChangeFeedStreamIterator( - changeFeedRequestOptions: new ChangeFeedRequestOptions() - { - From = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(continuation), - }) as ChangeFeedIteratorCore; + ChangeFeedStartFrom.ContinuationToken(continuation)) as ChangeFeedIteratorCore; while (setIteratorNew.HasMoreResults) { @@ -259,11 +245,7 @@ public async Task ChangeFeedIteratorCore_PartitionKey_OfT_ReadAll() ContainerInternal itemsCore = this.Container; FeedIterator feedIterator = itemsCore.GetChangeFeedIterator( - changeFeedRequestOptions: new ChangeFeedRequestOptions() - { - FeedRange = new FeedRangePartitionKey(new PartitionKey(pkToRead)), - From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(), - }); + ChangeFeedStartFrom.Beginning(new FeedRangePartitionKey(new PartitionKey(pkToRead)))); string continuation = null; while (feedIterator.HasMoreResults) { @@ -288,10 +270,7 @@ public async Task ChangeFeedIteratorCore_PartitionKey_OfT_ReadAll() } FeedIterator setIteratorNew = itemsCore.GetChangeFeedIterator( - changeFeedRequestOptions: new ChangeFeedRequestOptions() - { - From = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(continuation), - }); + ChangeFeedStartFrom.ContinuationToken(continuation)); while (setIteratorNew.HasMoreResults) { @@ -319,10 +298,7 @@ public async Task ChangeFeedIteratorCore_OfT_ReadAll() await this.CreateRandomItems(this.Container, batchSize, randomPartitionKey: true); ContainerInternal itemsCore = this.Container; - FeedIterator feedIterator = itemsCore.GetChangeFeedIterator(changeFeedRequestOptions: new ChangeFeedRequestOptions() - { - From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(), - }); + FeedIterator feedIterator = itemsCore.GetChangeFeedIterator(ChangeFeedStartFrom.Beginning()); string continuation = null; while (feedIterator.HasMoreResults) { @@ -337,10 +313,7 @@ public async Task ChangeFeedIteratorCore_OfT_ReadAll() // Insert another batch of 25 and use the last FeedToken from the first cycle await this.CreateRandomItems(this.Container, batchSize, randomPartitionKey: true); - FeedIterator setIteratorNew = itemsCore.GetChangeFeedIterator(changeFeedRequestOptions: new ChangeFeedRequestOptions() - { - From = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(continuation), - }); + FeedIterator setIteratorNew = itemsCore.GetChangeFeedIterator(ChangeFeedStartFrom.ContinuationToken(continuation)); while (setIteratorNew.HasMoreResults) { @@ -363,7 +336,8 @@ public async Task ChangeFeedIteratorCore_EmptyBeginning() bool createdDocuments = false; ContainerInternal itemsCore = this.Container; - ChangeFeedIteratorCore feedIterator = itemsCore.GetChangeFeedStreamIterator() as ChangeFeedIteratorCore; + ChangeFeedIteratorCore feedIterator = itemsCore.GetChangeFeedStreamIterator( + ChangeFeedStartFrom.Beginning()) as ChangeFeedIteratorCore; while (feedIterator.HasMoreResults || (createdDocuments && totalCount == 0)) @@ -399,11 +373,12 @@ public async Task ChangeFeedIteratorCore_WithMaxItemCount() { await this.CreateRandomItems(this.Container, 2, randomPartitionKey: true); ContainerInternal itemsCore = this.Container; - ChangeFeedIteratorCore feedIterator = itemsCore.GetChangeFeedStreamIterator(changeFeedRequestOptions: new ChangeFeedRequestOptions() - { - MaxItemCount = 1, - From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(), - }) as ChangeFeedIteratorCore; + ChangeFeedIteratorCore feedIterator = itemsCore.GetChangeFeedStreamIterator( + ChangeFeedStartFrom.Beginning(), + changeFeedRequestOptions: new ChangeFeedRequestOptions() + { + PageSizeHint = 1, + }) as ChangeFeedIteratorCore; while (feedIterator.HasMoreResults) { @@ -443,23 +418,17 @@ public async Task ChangeFeedIteratorCore_NoFetchNext() int count = 0; while (true) { - ChangeFeedRequestOptions requestOptions; + ChangeFeedStartFrom startFrom; if (continuation == null) { - requestOptions = new ChangeFeedRequestOptions() - { - From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(), - }; + startFrom = ChangeFeedStartFrom.Beginning(); } else { - requestOptions = new ChangeFeedRequestOptions() - { - From = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(continuation), - }; + startFrom = ChangeFeedStartFrom.ContinuationToken(continuation); } - ChangeFeedIteratorCore feedIterator = itemsCore.GetChangeFeedStreamIterator(changeFeedRequestOptions: requestOptions) as ChangeFeedIteratorCore; + ChangeFeedIteratorCore feedIterator = itemsCore.GetChangeFeedStreamIterator(startFrom) as ChangeFeedIteratorCore; using (ResponseMessage responseMessage = await feedIterator.ReadNextAsync(this.cancellationToken)) { if (responseMessage.IsSuccessStatusCode) @@ -503,11 +472,12 @@ public async Task ChangeFeedIteratorCore_BreathFirst() List previousToken = null; await this.CreateRandomItems(this.LargerContainer, expected, randomPartitionKey: true); ContainerInternal itemsCore = this.LargerContainer; - ChangeFeedIteratorCore feedIterator = itemsCore.GetChangeFeedStreamIterator(changeFeedRequestOptions: new ChangeFeedRequestOptions() - { - MaxItemCount = 1, - From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(), - }) as ChangeFeedIteratorCore; + ChangeFeedIteratorCore feedIterator = itemsCore.GetChangeFeedStreamIterator( + ChangeFeedStartFrom.Beginning(), + new ChangeFeedRequestOptions() + { + PageSizeHint = 1, + }) as ChangeFeedIteratorCore; while (true) { using (ResponseMessage responseMessage = @@ -552,11 +522,8 @@ public async Task GetFeedRangesAsync_AllowsParallelProcessing() { int count = 0; ChangeFeedIteratorCore iteratorForToken = - itemsCore.GetChangeFeedStreamIterator(new ChangeFeedRequestOptions() - { - FeedRange = token, - From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(), - }) as ChangeFeedIteratorCore; + itemsCore.GetChangeFeedStreamIterator( + ChangeFeedStartFrom.Beginning(token)) as ChangeFeedIteratorCore; while (true) { using (ResponseMessage responseMessage = @@ -592,16 +559,10 @@ public async Task CannotMixTokensFromOtherContainers() { IReadOnlyList tokens = await this.LargerContainer.GetFeedRangesAsync(); FeedIterator iterator = this.LargerContainer.GetChangeFeedStreamIterator( - new ChangeFeedRequestOptions() - { - FeedRange = tokens[0], - From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(), - }); + ChangeFeedStartFrom.Beginning(tokens[0])); ResponseMessage responseMessage = await iterator.ReadNextAsync(); - iterator = this.Container.GetChangeFeedStreamIterator(new ChangeFeedRequestOptions() - { - From = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(responseMessage.ContinuationToken), - }); + iterator = this.Container.GetChangeFeedStreamIterator( + ChangeFeedStartFrom.ContinuationToken(responseMessage.ContinuationToken)); responseMessage = await iterator.ReadNextAsync(); Assert.IsNotNull(responseMessage.CosmosException); Assert.AreEqual(HttpStatusCode.BadRequest, responseMessage.StatusCode); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Query/EndToEnd.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Query/EndToEnd.cs index 7d349a37a9..ca228c2a7d 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Query/EndToEnd.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Query/EndToEnd.cs @@ -7,6 +7,7 @@ using System.Reflection; using System.Threading.Tasks; using BenchmarkDotNet.Attributes; + using Microsoft.Azure.Cosmos.ChangeFeed; using Microsoft.Azure.Cosmos.CosmosElements; using Microsoft.Azure.Cosmos.Fluent; using Microsoft.Azure.Cosmos.Json; @@ -120,10 +121,7 @@ public async Task ChangeFeedBaselineAsync() { ChangeFeedIteratorCore feedIterator = ((ContainerCore)this.container) .GetChangeFeedStreamIterator( - changeFeedRequestOptions: new ChangeFeedRequestOptions() - { - From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(), - }) as ChangeFeedIteratorCore; + ChangeFeedStartFrom.Beginning()) as ChangeFeedIteratorCore; while (feedIterator.HasMoreResults) { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeedResultSetIteratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeedResultSetIteratorTests.cs index 825f449408..9d1e0a4148 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeedResultSetIteratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeedResultSetIteratorTests.cs @@ -18,6 +18,7 @@ namespace Microsoft.Azure.Cosmos.Tests using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; using Newtonsoft.Json; + using Microsoft.Azure.Cosmos.ChangeFeed; [TestClass] public class ChangeFeedResultSetIteratorTests @@ -66,10 +67,10 @@ public async Task ContinuationTokenIsNotUpdatedOnFails() StandByFeedIteratorCore iterator = new StandByFeedIteratorCore( mockContext.Object, new ContainerInlineCore(mockContext.Object, databaseCore, "myColl"), + ChangeFeedStartFrom.Beginning(), new ChangeFeedRequestOptions() { - MaxItemCount = 10, - From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(), + PageSizeHint = 10, }); ResponseMessage firstRequest = await iterator.ReadNextAsync(); Assert.IsTrue(firstRequest.Headers.ContinuationToken.Contains(firstResponse.Headers.ETag), "Response should contain the first continuation"); @@ -128,10 +129,10 @@ public async Task ShouldContinueUntilResponseOk() StandByFeedIteratorCore iterator = new StandByFeedIteratorCore( mockContext.Object, new ContainerInlineCore(mockContext.Object, databaseCore, "myColl"), + ChangeFeedStartFrom.Beginning(), new ChangeFeedRequestOptions() { - MaxItemCount = 10, - From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(), + PageSizeHint = 10, }); ResponseMessage firstRequest = await iterator.ReadNextAsync(); Assert.IsTrue(firstRequest.Headers.ContinuationToken.Contains(firstResponse.Headers.ETag), "Response should contain the first continuation"); @@ -193,10 +194,10 @@ public async Task ShouldReturnNotModifiedAfterCyclingOnAllRanges() StandByFeedIteratorCore iterator = new StandByFeedIteratorCore( mockContext.Object, new ContainerInlineCore(mockContext.Object, databaseCore, "myColl"), + ChangeFeedStartFrom.Beginning(), new ChangeFeedRequestOptions() { - MaxItemCount = 10, - From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(), + PageSizeHint = 10, }); ResponseMessage firstRequest = await iterator.ReadNextAsync(); Assert.IsTrue(firstRequest.Headers.ContinuationToken.Contains(firstResponse.Headers.ETag), "Response should contain the first continuation"); @@ -253,10 +254,10 @@ public async Task ShouldReturnNotModifiedOnSingleRange() StandByFeedIteratorCore iterator = new StandByFeedIteratorCore( mockContext.Object, new ContainerInlineCore(mockContext.Object, databaseCore, "myColl"), + ChangeFeedStartFrom.Beginning(), new ChangeFeedRequestOptions() { - MaxItemCount = 10, - From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(), + PageSizeHint = 10, }); ResponseMessage firstRequest = await iterator.ReadNextAsync(); Assert.IsTrue(firstRequest.Headers.ContinuationToken.Contains(firstResponse.Headers.ETag), "Response should contain the first continuation"); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Contracts/CosmosVirtualUnitTest.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Contracts/CosmosVirtualUnitTest.cs index ae8dcc6666..a3744e63f6 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Contracts/CosmosVirtualUnitTest.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Contracts/CosmosVirtualUnitTest.cs @@ -51,33 +51,43 @@ public void VerifyAllPublicMembersAreVirtualUnitTesting() } [TestMethod] - public void VerifyAllPublicClassesCanBeMockedUnitTesting() + public void VerifyAllPublicClassesCanBeMocked() { + // The following classes are public, but not meant to be mocked. + HashSet nonMockableClasses = new HashSet() + { + "ChangeFeedStartFrom" + }; + // All of the public classes should not contain an internal abstract method // create unit tests by mocking the different types. Data Contracts do not support mocking so exclude all types that end with Settings. IEnumerable allClasses = from t in Assembly.GetAssembly(typeof(CosmosClient)).GetTypes() - where t.IsClass && t.Namespace == "Microsoft.Azure.Cosmos" && t.IsPublic + where + t.IsClass && + t.Namespace == "Microsoft.Azure.Cosmos" && + t.IsPublic && + !nonMockableClasses.Contains(t.Name) select t; // Get the entire list to prevent running the test for each method/property List> nonVirtualPublic = new List>(); - foreach (Type publiClass in allClasses) + foreach (Type publicClass in allClasses) { // DeclaredOnly flag gets only the properties declared in the current class. // This ignores inherited properties to prevent duplicate findings. - IEnumerable> allProps = publiClass.GetProperties(BindingFlags.DeclaredOnly) + IEnumerable> allProps = publicClass.GetProperties(BindingFlags.DeclaredOnly) .Where(x => x.GetGetMethod().IsAbstract && !x.GetGetMethod().IsPublic) - .Select(x => new Tuple(publiClass.FullName, x.Name)); + .Select(x => new Tuple(publicClass.FullName, x.Name)); nonVirtualPublic.AddRange(allProps); - IEnumerable> allMethods = publiClass.GetMethods(BindingFlags.NonPublic | BindingFlags.Instance | BindingFlags.DeclaredOnly) + IEnumerable> allMethods = publicClass.GetMethods(BindingFlags.NonPublic | BindingFlags.Instance | BindingFlags.DeclaredOnly) .Where(x => x.IsAbstract && !x.IsPublic) - .Select(x => new Tuple(publiClass.FullName, x.Name)); + .Select(x => new Tuple(publicClass.FullName, x.Name)); nonVirtualPublic.AddRange(allMethods); } Assert.AreEqual(0, nonVirtualPublic.Count, - "The following methods and properties should be virtual to allow unit testing:" + + "The following methods and properties should be virtual to allow mocking:" + string.Join(";", nonVirtualPublic.Select(x => $"Class:{x.Item1}; Member:{x.Item2}"))); } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Contracts/DotNetPreviewSDKAPI.json b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Contracts/DotNetPreviewSDKAPI.json index 06bd33c8b5..e8131410f3 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Contracts/DotNetPreviewSDKAPI.json +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Contracts/DotNetPreviewSDKAPI.json @@ -3,41 +3,12 @@ "ChangeFeedRequestOptions": { "Subclasses": {}, "Members": { - "Microsoft.Azure.Cosmos.ChangeFeedRequestOptions+StartFrom": { - "Type": "NestedType", - "Attributes": [], - "MethodInfo": null - }, - "Microsoft.Azure.Cosmos.FeedRange FeedRange": { - "Type": "Property", - "Attributes": [], - "MethodInfo": null - }, - "Microsoft.Azure.Cosmos.FeedRange get_FeedRange()[System.Runtime.CompilerServices.CompilerGeneratedAttribute()]": { - "Type": "Method", - "Attributes": [ - "CompilerGeneratedAttribute" - ], - "MethodInfo": "Microsoft.Azure.Cosmos.FeedRange get_FeedRange()" - }, - "StartFrom From": { - "Type": "Property", - "Attributes": [], - "MethodInfo": null - }, - "StartFrom get_From()[System.Runtime.CompilerServices.CompilerGeneratedAttribute()]": { - "Type": "Method", - "Attributes": [ - "CompilerGeneratedAttribute" - ], - "MethodInfo": "StartFrom get_From()" - }, - "System.Nullable`1[System.Int32] get_MaxItemCount()": { + "System.Nullable`1[System.Int32] get_PageSizeHint()": { "Type": "Method", "Attributes": [], - "MethodInfo": "System.Nullable`1[System.Int32] get_MaxItemCount()" + "MethodInfo": "System.Nullable`1[System.Int32] get_PageSizeHint()" }, - "System.Nullable`1[System.Int32] MaxItemCount": { + "System.Nullable`1[System.Int32] PageSizeHint": { "Type": "Property", "Attributes": [], "MethodInfo": null @@ -73,20 +44,6 @@ "Attributes": [], "MethodInfo": "Void .ctor()" }, - "Void set_FeedRange(Microsoft.Azure.Cosmos.FeedRange)[System.Runtime.CompilerServices.CompilerGeneratedAttribute()]": { - "Type": "Method", - "Attributes": [ - "CompilerGeneratedAttribute" - ], - "MethodInfo": "Void set_FeedRange(Microsoft.Azure.Cosmos.FeedRange)" - }, - "Void set_From(StartFrom)[System.Runtime.CompilerServices.CompilerGeneratedAttribute()]": { - "Type": "Method", - "Attributes": [ - "CompilerGeneratedAttribute" - ], - "MethodInfo": "Void set_From(StartFrom)" - }, "Void set_IfMatchEtag(System.String)": { "Type": "Method", "Attributes": [], @@ -97,63 +54,51 @@ "Attributes": [], "MethodInfo": "Void set_IfNoneMatchEtag(System.String)" }, - "Void set_MaxItemCount(System.Nullable`1[System.Int32])": { + "Void set_PageSizeHint(System.Nullable`1[System.Int32])": { "Type": "Method", "Attributes": [], - "MethodInfo": "Void set_MaxItemCount(System.Nullable`1[System.Int32])" + "MethodInfo": "Void set_PageSizeHint(System.Nullable`1[System.Int32])" } }, - "NestedTypes": { - "StartFrom": { - "Subclasses": {}, - "Members": { - "StartFrom CreateFromBeginning()": { - "Type": "Method", - "Attributes": [], - "MethodInfo": "StartFrom CreateFromBeginning()" - }, - "StartFrom CreateFromContinuation(System.String)": { - "Type": "Method", - "Attributes": [], - "MethodInfo": "StartFrom CreateFromContinuation(System.String)" - }, - "StartFrom CreateFromNow()": { - "Type": "Method", - "Attributes": [], - "MethodInfo": "StartFrom CreateFromNow()" - }, - "StartFrom CreateFromTime(System.DateTime)": { - "Type": "Method", - "Attributes": [], - "MethodInfo": "StartFrom CreateFromTime(System.DateTime)" - } - }, - "NestedTypes": {} - } - } + "NestedTypes": {} }, - "StartFrom": { + "ChangeFeedStartFrom": { "Subclasses": {}, "Members": { - "StartFrom CreateFromBeginning()": { + "Microsoft.Azure.Cosmos.ChangeFeedStartFrom Beginning()": { + "Type": "Method", + "Attributes": [], + "MethodInfo": "Microsoft.Azure.Cosmos.ChangeFeedStartFrom Beginning()" + }, + "Microsoft.Azure.Cosmos.ChangeFeedStartFrom Beginning(Microsoft.Azure.Cosmos.FeedRange)": { + "Type": "Method", + "Attributes": [], + "MethodInfo": "Microsoft.Azure.Cosmos.ChangeFeedStartFrom Beginning(Microsoft.Azure.Cosmos.FeedRange)" + }, + "Microsoft.Azure.Cosmos.ChangeFeedStartFrom ContinuationToken(System.String)": { + "Type": "Method", + "Attributes": [], + "MethodInfo": "Microsoft.Azure.Cosmos.ChangeFeedStartFrom ContinuationToken(System.String)" + }, + "Microsoft.Azure.Cosmos.ChangeFeedStartFrom Now()": { "Type": "Method", "Attributes": [], - "MethodInfo": "StartFrom CreateFromBeginning()" + "MethodInfo": "Microsoft.Azure.Cosmos.ChangeFeedStartFrom Now()" }, - "StartFrom CreateFromContinuation(System.String)": { + "Microsoft.Azure.Cosmos.ChangeFeedStartFrom Now(Microsoft.Azure.Cosmos.FeedRange)": { "Type": "Method", "Attributes": [], - "MethodInfo": "StartFrom CreateFromContinuation(System.String)" + "MethodInfo": "Microsoft.Azure.Cosmos.ChangeFeedStartFrom Now(Microsoft.Azure.Cosmos.FeedRange)" }, - "StartFrom CreateFromNow()": { + "Microsoft.Azure.Cosmos.ChangeFeedStartFrom Time(System.DateTime)": { "Type": "Method", "Attributes": [], - "MethodInfo": "StartFrom CreateFromNow()" + "MethodInfo": "Microsoft.Azure.Cosmos.ChangeFeedStartFrom Time(System.DateTime)" }, - "StartFrom CreateFromTime(System.DateTime)": { + "Microsoft.Azure.Cosmos.ChangeFeedStartFrom Time(System.DateTime, Microsoft.Azure.Cosmos.FeedRange)": { "Type": "Method", "Attributes": [], - "MethodInfo": "StartFrom CreateFromTime(System.DateTime)" + "MethodInfo": "Microsoft.Azure.Cosmos.ChangeFeedStartFrom Time(System.DateTime, Microsoft.Azure.Cosmos.FeedRange)" } }, "NestedTypes": {} @@ -161,30 +106,20 @@ "Container": { "Subclasses": {}, "Members": { - "Microsoft.Azure.Cosmos.FeedIterator GetChangeFeedStreamIterator(Microsoft.Azure.Cosmos.ChangeFeedRequestOptions)": { - "Type": "Method", - "Attributes": [], - "MethodInfo": "Microsoft.Azure.Cosmos.FeedIterator GetChangeFeedStreamIterator(Microsoft.Azure.Cosmos.ChangeFeedRequestOptions)" - }, - "Microsoft.Azure.Cosmos.FeedIterator GetChangeFeedStreamIterator(Microsoft.Azure.Cosmos.PartitionKey, Microsoft.Azure.Cosmos.ChangeFeedRequestOptions)": { + "Microsoft.Azure.Cosmos.FeedIterator GetChangeFeedStreamIterator(Microsoft.Azure.Cosmos.ChangeFeedStartFrom, Microsoft.Azure.Cosmos.ChangeFeedRequestOptions)": { "Type": "Method", "Attributes": [], - "MethodInfo": "Microsoft.Azure.Cosmos.FeedIterator GetChangeFeedStreamIterator(Microsoft.Azure.Cosmos.PartitionKey, Microsoft.Azure.Cosmos.ChangeFeedRequestOptions)" + "MethodInfo": "Microsoft.Azure.Cosmos.FeedIterator GetChangeFeedStreamIterator(Microsoft.Azure.Cosmos.ChangeFeedStartFrom, Microsoft.Azure.Cosmos.ChangeFeedRequestOptions)" }, "Microsoft.Azure.Cosmos.FeedIterator GetItemQueryStreamIterator(Microsoft.Azure.Cosmos.FeedRange, Microsoft.Azure.Cosmos.QueryDefinition, System.String, Microsoft.Azure.Cosmos.QueryRequestOptions)": { "Type": "Method", "Attributes": [], "MethodInfo": "Microsoft.Azure.Cosmos.FeedIterator GetItemQueryStreamIterator(Microsoft.Azure.Cosmos.FeedRange, Microsoft.Azure.Cosmos.QueryDefinition, System.String, Microsoft.Azure.Cosmos.QueryRequestOptions)" }, - "Microsoft.Azure.Cosmos.FeedIterator`1[T] GetChangeFeedIterator[T](Microsoft.Azure.Cosmos.ChangeFeedRequestOptions)": { - "Type": "Method", - "Attributes": [], - "MethodInfo": "Microsoft.Azure.Cosmos.FeedIterator`1[T] GetChangeFeedIterator[T](Microsoft.Azure.Cosmos.ChangeFeedRequestOptions)" - }, - "Microsoft.Azure.Cosmos.FeedIterator`1[T] GetChangeFeedIterator[T](Microsoft.Azure.Cosmos.PartitionKey, Microsoft.Azure.Cosmos.ChangeFeedRequestOptions)": { + "Microsoft.Azure.Cosmos.FeedIterator`1[T] GetChangeFeedIterator[T](Microsoft.Azure.Cosmos.ChangeFeedStartFrom, Microsoft.Azure.Cosmos.ChangeFeedRequestOptions)": { "Type": "Method", "Attributes": [], - "MethodInfo": "Microsoft.Azure.Cosmos.FeedIterator`1[T] GetChangeFeedIterator[T](Microsoft.Azure.Cosmos.PartitionKey, Microsoft.Azure.Cosmos.ChangeFeedRequestOptions)" + "MethodInfo": "Microsoft.Azure.Cosmos.FeedIterator`1[T] GetChangeFeedIterator[T](Microsoft.Azure.Cosmos.ChangeFeedStartFrom, Microsoft.Azure.Cosmos.ChangeFeedRequestOptions)" }, "Microsoft.Azure.Cosmos.FeedIterator`1[T] GetItemQueryIterator[T](Microsoft.Azure.Cosmos.FeedRange, Microsoft.Azure.Cosmos.QueryDefinition, System.String, Microsoft.Azure.Cosmos.QueryRequestOptions)": { "Type": "Method", @@ -249,6 +184,11 @@ "Attributes": [], "MethodInfo": "Microsoft.Azure.Cosmos.FeedRange FromJsonString(System.String)" }, + "Microsoft.Azure.Cosmos.FeedRange FromPartitionKey(Microsoft.Azure.Cosmos.PartitionKey)": { + "Type": "Method", + "Attributes": [], + "MethodInfo": "Microsoft.Azure.Cosmos.FeedRange FromPartitionKey(Microsoft.Azure.Cosmos.PartitionKey)" + }, "System.String ToJsonString()": { "Type": "Method", "Attributes": [], @@ -275,41 +215,12 @@ "ChangeFeedRequestOptions": { "Subclasses": {}, "Members": { - "Microsoft.Azure.Cosmos.ChangeFeedRequestOptions+StartFrom": { - "Type": "NestedType", - "Attributes": [], - "MethodInfo": null - }, - "Microsoft.Azure.Cosmos.FeedRange FeedRange": { - "Type": "Property", - "Attributes": [], - "MethodInfo": null - }, - "Microsoft.Azure.Cosmos.FeedRange get_FeedRange()[System.Runtime.CompilerServices.CompilerGeneratedAttribute()]": { - "Type": "Method", - "Attributes": [ - "CompilerGeneratedAttribute" - ], - "MethodInfo": "Microsoft.Azure.Cosmos.FeedRange get_FeedRange()" - }, - "StartFrom From": { - "Type": "Property", - "Attributes": [], - "MethodInfo": null - }, - "StartFrom get_From()[System.Runtime.CompilerServices.CompilerGeneratedAttribute()]": { - "Type": "Method", - "Attributes": [ - "CompilerGeneratedAttribute" - ], - "MethodInfo": "StartFrom get_From()" - }, - "System.Nullable`1[System.Int32] get_MaxItemCount()": { + "System.Nullable`1[System.Int32] get_PageSizeHint()": { "Type": "Method", "Attributes": [], - "MethodInfo": "System.Nullable`1[System.Int32] get_MaxItemCount()" + "MethodInfo": "System.Nullable`1[System.Int32] get_PageSizeHint()" }, - "System.Nullable`1[System.Int32] MaxItemCount": { + "System.Nullable`1[System.Int32] PageSizeHint": { "Type": "Property", "Attributes": [], "MethodInfo": null @@ -345,20 +256,6 @@ "Attributes": [], "MethodInfo": "Void .ctor()" }, - "Void set_FeedRange(Microsoft.Azure.Cosmos.FeedRange)[System.Runtime.CompilerServices.CompilerGeneratedAttribute()]": { - "Type": "Method", - "Attributes": [ - "CompilerGeneratedAttribute" - ], - "MethodInfo": "Void set_FeedRange(Microsoft.Azure.Cosmos.FeedRange)" - }, - "Void set_From(StartFrom)[System.Runtime.CompilerServices.CompilerGeneratedAttribute()]": { - "Type": "Method", - "Attributes": [ - "CompilerGeneratedAttribute" - ], - "MethodInfo": "Void set_From(StartFrom)" - }, "Void set_IfMatchEtag(System.String)": { "Type": "Method", "Attributes": [], @@ -369,40 +266,13 @@ "Attributes": [], "MethodInfo": "Void set_IfNoneMatchEtag(System.String)" }, - "Void set_MaxItemCount(System.Nullable`1[System.Int32])": { + "Void set_PageSizeHint(System.Nullable`1[System.Int32])": { "Type": "Method", "Attributes": [], - "MethodInfo": "Void set_MaxItemCount(System.Nullable`1[System.Int32])" + "MethodInfo": "Void set_PageSizeHint(System.Nullable`1[System.Int32])" } }, - "NestedTypes": { - "StartFrom": { - "Subclasses": {}, - "Members": { - "StartFrom CreateFromBeginning()": { - "Type": "Method", - "Attributes": [], - "MethodInfo": "StartFrom CreateFromBeginning()" - }, - "StartFrom CreateFromContinuation(System.String)": { - "Type": "Method", - "Attributes": [], - "MethodInfo": "StartFrom CreateFromContinuation(System.String)" - }, - "StartFrom CreateFromNow()": { - "Type": "Method", - "Attributes": [], - "MethodInfo": "StartFrom CreateFromNow()" - }, - "StartFrom CreateFromTime(System.DateTime)": { - "Type": "Method", - "Attributes": [], - "MethodInfo": "StartFrom CreateFromTime(System.DateTime)" - } - }, - "NestedTypes": {} - } - } + "NestedTypes": {} } }, "Members": { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedRange/ChangeFeedIteratorCoreTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedRange/ChangeFeedIteratorCoreTests.cs index f8eed1deeb..936245a965 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedRange/ChangeFeedIteratorCoreTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedRange/ChangeFeedIteratorCoreTests.cs @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Cosmos.Tests.FeedRange using System.Net; using System.Threading; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.ChangeFeed; using Microsoft.Azure.Cosmos.Handlers; using Microsoft.Azure.Cosmos.Routing; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -21,7 +22,10 @@ public class ChangeFeedIteratorCoreTests [ExpectedException(typeof(ArgumentNullException))] public void ChangeFeedIteratorCore_Null_Container() { - new ChangeFeedIteratorCore(container: null, new ChangeFeedRequestOptions()); + new ChangeFeedIteratorCore( + container: null, + ChangeFeedStartFrom.Beginning(), + new ChangeFeedRequestOptions()); } [DataTestMethod] @@ -32,16 +36,20 @@ public void ChangeFeedIteratorCore_ValidateOptions(int maxItemCount) { new ChangeFeedIteratorCore( Mock.Of(), + ChangeFeedStartFrom.Beginning(), new ChangeFeedRequestOptions() { - MaxItemCount = maxItemCount + PageSizeHint = maxItemCount }); } [TestMethod] public void ChangeFeedIteratorCore_HasMoreResultsDefault() { - ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(Mock.Of(), null); + ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore( + Mock.Of(), + ChangeFeedStartFrom.Beginning(), + null); Assert.IsTrue(changeFeedIteratorCore.HasMoreResults); } @@ -81,6 +89,9 @@ public async Task ChangeFeedIteratorCore_ReadNextAsync() Mock.Get(feedToken) .Setup(f => f.FeedRange) .Returns(range); + Mock.Get(feedToken) + .Setup(f => f.GetFeedRange()) + .Returns(range); Mock.Get(feedToken) .Setup(f => f.HandleSplitAsync(It.Is(c => c == containerCore), It.IsAny(), It.IsAny())) .Returns(Task.FromResult(Documents.ShouldRetryResult.NoRetry())); @@ -135,6 +146,9 @@ public async Task ChangeFeedIteratorCore_OfT_ReadNextAsync() Mock.Get(feedToken) .Setup(f => f.FeedRange) .Returns(range); + Mock.Get(feedToken) + .Setup(f => f.GetFeedRange()) + .Returns(range); Mock.Get(feedToken) .Setup(f => f.HandleSplitAsync(It.Is(c => c == containerCore), It.IsAny(), It.IsAny())) .Returns(Task.FromResult(Documents.ShouldRetryResult.NoRetry())); @@ -199,6 +213,9 @@ public async Task ChangeFeedIteratorCore_UpdatesContinuation_On304() Mock.Get(feedToken) .Setup(f => f.FeedRange) .Returns(range); + Mock.Get(feedToken) + .Setup(f => f.GetFeedRange()) + .Returns(range); Mock.Get(feedToken) .Setup(f => f.HandleSplitAsync(It.Is(c => c == containerCore), It.IsAny(), It.IsAny())) .Returns(Task.FromResult(Documents.ShouldRetryResult.NoRetry())); @@ -308,6 +325,9 @@ public async Task ChangeFeedIteratorCore_Retries() Mock.Get(feedToken) .Setup(f => f.FeedRange) .Returns(range); + Mock.Get(feedToken) + .Setup(f => f.GetFeedRange()) + .Returns(range); Mock.Get(feedToken) .SetupSequence(f => f.HandleSplitAsync(It.Is(c => c == containerCore), It.IsAny(), It.IsAny())) @@ -325,6 +345,10 @@ public async Task ChangeFeedIteratorCore_Retries() Mock.Get(feedToken) .Verify(f => f.ReplaceContinuation(It.IsAny()), Times.Once); + Mock.Get(feedToken) + .Setup(f => f.GetFeedRange()) + .Returns(range); + Mock.Get(feedToken) .Verify(f => f.HandleSplitAsync(It.Is(c => c == containerCore), It.IsAny(), It.IsAny()), Times.Exactly(2)); Mock.Get(feedToken) @@ -434,9 +458,14 @@ private static CosmosClientContext GetMockedClientContext( return clientContext; } - private static ChangeFeedIteratorCore CreateWithCustomFeedToken(ContainerInternal containerInternal, FeedRangeContinuation feedToken) + private static ChangeFeedIteratorCore CreateWithCustomFeedToken( + ContainerInternal containerInternal, + FeedRangeContinuation feedToken) { - ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(containerInternal, changeFeedRequestOptions: default); + ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore( + containerInternal, + ChangeFeedStartFrom.Beginning(), + changeFeedRequestOptions: default); System.Reflection.FieldInfo prop = changeFeedIteratorCore .GetType() .GetField( diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OfflineEngineTests/PathToken.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OfflineEngineTests/PathToken.cs index 8885f00ba1..442520ea6c 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OfflineEngineTests/PathToken.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OfflineEngineTests/PathToken.cs @@ -26,8 +26,6 @@ public int CompareTo(PathToken other) throw new InvalidEnumArgumentException($"{nameof(other)}"); } - break; - case StringPathToken stringPathToken1: switch (other) { @@ -41,8 +39,6 @@ public int CompareTo(PathToken other) throw new InvalidEnumArgumentException($"{nameof(other)}"); } - break; - default: throw new InvalidEnumArgumentException($"this"); } @@ -75,8 +71,6 @@ public bool Equals(PathToken other) throw new InvalidEnumArgumentException($"{nameof(other)}"); } - break; - case StringPathToken stringPathToken1: switch (other) { @@ -90,8 +84,6 @@ public bool Equals(PathToken other) throw new InvalidEnumArgumentException($"{nameof(other)}"); } - break; - default: throw new InvalidEnumArgumentException($"this"); } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StandByFeedContinuationTokenTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StandByFeedContinuationTokenTests.cs index 66564e856c..1cc040d326 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StandByFeedContinuationTokenTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/StandByFeedContinuationTokenTests.cs @@ -8,6 +8,7 @@ namespace Microsoft.Azure.Cosmos using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.ChangeFeed; using Microsoft.Azure.Cosmos.Query; using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -224,11 +225,8 @@ public void ConstructorWithRangeGeneratesSingleQueue() public void ChangeFeedRequestOptions_ContinuationIsSet() { RequestMessage request = new RequestMessage(); - ChangeFeedRequestOptions requestOptions = new ChangeFeedRequestOptions() - { - From = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation("something"), - }; - requestOptions.PopulateRequestOptions(request); + ChangeFeedStartFromRequestOptionPopulator visitor = new ChangeFeedStartFromRequestOptionPopulator(request); + ChangeFeedStartFrom.ContinuationToken("something").Accept(visitor); Assert.AreEqual(expected: "something", actual: request.Headers.IfNoneMatch); Assert.IsNull(request.Headers[Documents.HttpConstants.HttpHeaders.IfModifiedSince]); @@ -238,12 +236,8 @@ public void ChangeFeedRequestOptions_ContinuationIsSet() public void ChangeFeedRequestOptions_StartFromNow() { RequestMessage request = new RequestMessage(); - ChangeFeedRequestOptions requestOptions = new ChangeFeedRequestOptions() - { - From = ChangeFeedRequestOptions.StartFrom.CreateFromNow(), - }; - - requestOptions.PopulateRequestOptions(request); + ChangeFeedStartFromRequestOptionPopulator visitor = new ChangeFeedStartFromRequestOptionPopulator(request); + ChangeFeedStartFrom.Now().Accept(visitor); Assert.AreEqual(expected: "*", request.Headers.IfNoneMatch); Assert.IsNull(request.Headers[Documents.HttpConstants.HttpHeaders.IfModifiedSince]); @@ -253,41 +247,24 @@ public void ChangeFeedRequestOptions_StartFromNow() public void ChangeFeedRequestOptions_StartFromBeginning() { RequestMessage request = new RequestMessage(); - ChangeFeedRequestOptions requestOptions = new ChangeFeedRequestOptions() - { - From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(), - }; - - requestOptions.PopulateRequestOptions(request); + ChangeFeedStartFromRequestOptionPopulator visitor = new ChangeFeedStartFromRequestOptionPopulator(request); + ChangeFeedStartFrom.Beginning().Accept(visitor); Assert.IsNull(request.Headers.IfNoneMatch); Assert.IsNull(request.Headers[Documents.HttpConstants.HttpHeaders.IfModifiedSince]); } - [TestMethod] - public void ChangeFeedRequestOptions_Default() - { - RequestMessage request = new RequestMessage(); - ChangeFeedRequestOptions requestOptions = new ChangeFeedRequestOptions() - { - }; - - requestOptions.PopulateRequestOptions(request); - - Assert.AreEqual(expected: "*", request.Headers.IfNoneMatch); - Assert.IsNull(request.Headers[Documents.HttpConstants.HttpHeaders.IfModifiedSince]); - } - [TestMethod] public void ChangeFeedRequestOptions_MaxItemSizeIsSet() { RequestMessage request = new RequestMessage(); + ChangeFeedStartFromRequestOptionPopulator visitor = new ChangeFeedStartFromRequestOptionPopulator(request); ChangeFeedRequestOptions requestOptions = new ChangeFeedRequestOptions() { - MaxItemCount = 10, - From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning(), + PageSizeHint = 10, }; requestOptions.PopulateRequestOptions(request); + ChangeFeedStartFrom.Beginning().Accept(visitor); Assert.AreEqual(expected: "10", actual: request.Headers[Documents.HttpConstants.HttpHeaders.PageSize]); Assert.IsNull(request.Headers.IfNoneMatch); @@ -298,11 +275,8 @@ public void ChangeFeedRequestOptions_MaxItemSizeIsSet() public void ChangeFeedRequestOptions_AddsStartTime() { RequestMessage request = new RequestMessage(); - ChangeFeedRequestOptions requestOptions = new ChangeFeedRequestOptions() - { - From = ChangeFeedRequestOptions.StartFrom.CreateFromTime(new DateTime(1985, 1, 1, 0, 0, 0, DateTimeKind.Utc)), - }; - requestOptions.PopulateRequestOptions(request); + ChangeFeedStartFromRequestOptionPopulator visitor = new ChangeFeedStartFromRequestOptionPopulator(request); + ChangeFeedStartFrom.Time(new DateTime(1985, 1, 1, 0, 0, 0, DateTimeKind.Utc)).Accept(visitor); Assert.AreEqual( expected: "Tue, 01 Jan 1985 00:00:00 GMT", @@ -311,16 +285,26 @@ public void ChangeFeedRequestOptions_AddsStartTime() } [TestMethod] - public void ChangeFeedRequestOptions_AddsPartitionKeyRangeId() + public void ChangeFeedRequestOptions_AddsFeedRange() { - RequestMessage request = new RequestMessage(); - ChangeFeedRequestOptions requestOptions = new ChangeFeedRequestOptions() + FeedRange feedRange = new FeedRangePartitionKeyRange("randomPK"); + ChangeFeedStartFrom[] froms = new ChangeFeedStartFrom[] { - FeedRange = new FeedRangePartitionKeyRange("randomPK") + ChangeFeedStartFrom.Beginning(feedRange), + ChangeFeedStartFrom.Now(feedRange), + ChangeFeedStartFrom.Time(DateTime.MinValue.ToUniversalTime(), feedRange) }; - requestOptions.PopulateRequestOptions(request); - Assert.AreEqual(expected: "randomPK", actual: request.PartitionKeyRangeId.PartitionKeyRangeId); + foreach (ChangeFeedStartFrom from in froms) + { + RequestMessage request = new RequestMessage(); + ChangeFeedStartFromRequestOptionPopulator visitor = new ChangeFeedStartFromRequestOptionPopulator(request); + from.Accept(visitor); + + Assert.AreEqual( + expected: "randomPK", + actual: request.PartitionKeyRangeId.PartitionKeyRangeId); + } } private static StandByFeedContinuationToken.PartitionKeyRangeCacheDelegate CreateCacheFromRange(IReadOnlyList keyRanges) @@ -332,15 +316,15 @@ private static StandByFeedContinuationToken.PartitionKeyRangeCacheDelegate Creat return Task.FromResult(keyRanges); } - IReadOnlyList filteredRanges = new List(keyRanges.Where(range=> range.MinInclusive.CompareTo(ranges.Min) >= 0 && range.MaxExclusive.CompareTo(ranges.Max) <= 0)); + IReadOnlyList filteredRanges = new List(keyRanges.Where(range => range.MinInclusive.CompareTo(ranges.Min) >= 0 && range.MaxExclusive.CompareTo(ranges.Max) <= 0)); return Task.FromResult(filteredRanges); }; } private static CompositeContinuationToken BuildTokenForRange( - string min, - string max, + string min, + string max, string token) { return new CompositeContinuationToken()