Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Feed: Adds ChangeFeedIterator in GA #2509

Merged
merged 6 commits into from
Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 6 additions & 34 deletions Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedIteratorCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -302,40 +302,12 @@ private async Task<ResponseMessage> ReadNextInternalAsync(ITrace trace, Cancella
}

CrossFeedRangeState<ChangeFeedState> crossFeedRangeState = crossFeedRangePage.State;
string continuationToken;
if (this.changeFeedRequestOptions.EmitOldContinuationToken)
{
List<CompositeContinuationToken> compositeContinuationTokens = new List<CompositeContinuationToken>();
for (int i = 0; i < crossFeedRangeState.Value.Length; i++)
{
FeedRangeState<ChangeFeedState> changeFeedFeedRangeState = crossFeedRangeState.Value.Span[i];
string token = changeFeedFeedRangeState.State is ChangeFeedStateContinuation changeFeedStateContinuation ? ((CosmosString)changeFeedStateContinuation.ContinuationToken).Value : null;
Documents.Routing.Range<string> range = ((FeedRangeEpk)changeFeedFeedRangeState.FeedRange).Range;
CompositeContinuationToken compositeContinuationToken = new CompositeContinuationToken()
{
Range = range,
Token = token,
};

compositeContinuationTokens.Add(compositeContinuationToken);
}

FeedRangeCompositeContinuation feedRangeCompositeContinuationToken = new FeedRangeCompositeContinuation(
await this.documentContainer.GetResourceIdentifierAsync(trace, cancellationToken),
FeedRangeEpk.FullRange,
compositeContinuationTokens);

continuationToken = feedRangeCompositeContinuationToken.ToString();
}
else
{
ChangeFeedCrossFeedRangeState changeFeedCrossFeedRangeState = new ChangeFeedCrossFeedRangeState(crossFeedRangeState.Value);
continuationToken = VersionedAndRidCheckedCompositeToken.ToCosmosElement(
new VersionedAndRidCheckedCompositeToken(
VersionedAndRidCheckedCompositeToken.Version.V2,
changeFeedCrossFeedRangeState.ToCosmosElement(),
await this.documentContainer.GetResourceIdentifierAsync(trace, cancellationToken))).ToString();
}
ChangeFeedCrossFeedRangeState changeFeedCrossFeedRangeState = new ChangeFeedCrossFeedRangeState(crossFeedRangeState.Value);
string continuationToken = VersionedAndRidCheckedCompositeToken.ToCosmosElement(
new VersionedAndRidCheckedCompositeToken(
VersionedAndRidCheckedCompositeToken.Version.V2,
changeFeedCrossFeedRangeState.ToCosmosElement(),
await this.documentContainer.GetResourceIdentifierAsync(trace, cancellationToken))).ToString();

responseMessage.Headers.ContinuationToken = continuationToken;
responseMessage.Headers.RequestCharge = changeFeedPage.RequestCharge;
Expand Down
14 changes: 7 additions & 7 deletions Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedMode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,7 @@ namespace Microsoft.Azure.Cosmos
/// Base class for the change feed mode <see cref="ChangeFeedRequestOptions"/>.
/// </summary>
/// <remarks>Use one of the static constructors to generate a ChangeFeedMode option.</remarks>
#if PREVIEW
public
#else
internal
#endif
abstract class ChangeFeedMode
public abstract class ChangeFeedMode
{
/// <summary>
/// Initializes an instance of the <see cref="ChangeFeedMode"/> class.
Expand Down Expand Up @@ -49,6 +44,11 @@ internal ChangeFeedMode()
/// but no events for deletes or intermediary updates would be included.
/// </remarks>
/// <returns>A <see cref="ChangeFeedMode"/> to receive notifications for insertions, updates, and delete operations.</returns>
public static ChangeFeedMode FullFidelity => ChangeFeedModeFullFidelity.Instance;
#if PREVIEW
public
#else
internal
#endif
static ChangeFeedMode FullFidelity => ChangeFeedModeFullFidelity.Instance;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,7 @@ namespace Microsoft.Azure.Cosmos
/// Base class for where to start a ChangeFeed operation in <see cref="ChangeFeedRequestOptions"/>.
/// </summary>
/// <remarks>Use one of the static constructors to generate a StartFrom option.</remarks>
#if PREVIEW
public
#else
internal
#endif
abstract class ChangeFeedStartFrom
public abstract class ChangeFeedStartFrom
{
/// <summary>
/// Initializes an instance of the <see cref="ChangeFeedStartFrom"/> class.
Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/CosmosClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ public virtual Task<AccountProperties> ReadAccountAsync()
/// <example>
/// <code language="c#">
/// <![CDATA[
/// Database db = cosmosClient.GetDatabase("myDatabaseId"];
/// Database db = cosmosClient.GetDatabase("myDatabaseId");
/// DatabaseResponse response = await db.ReadAsync();
/// ]]>
/// </code>
Expand Down
7 changes: 1 addition & 6 deletions Microsoft.Azure.Cosmos/src/FeedRange/FeedRange.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,7 @@ namespace Microsoft.Azure.Cosmos
/// Represents a unit of feed consumption that can be used as unit of parallelism.
/// </summary>
[Serializable]
#if PREVIEW || INTERNAL
public
#else
internal
#endif
abstract class FeedRange
public abstract class FeedRange
ealsur marked this conversation as resolved.
Show resolved Hide resolved
{
/// <summary>
/// Gets a string representation of the current range.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,7 @@ namespace Microsoft.Azure.Cosmos
/// <summary>
/// The Cosmos Change Feed request options
/// </summary>
#if PREVIEW
public
#else
internal
#endif
sealed class ChangeFeedRequestOptions : RequestOptions
public sealed class ChangeFeedRequestOptions : RequestOptions
{
private int? pageSizeHint;

Expand All @@ -43,20 +38,6 @@ public int? PageSizeHint
}
}

/// <summary>
/// Gets or sets whether or not to emit the old continuation token.
/// </summary>
/// <remarks>
/// This is useful for when you want to upgrade your SDK, but can not upgrade all nodes atomically.
/// In that scenario perform a "two phase deployment".
/// In the first phase deploy with EmitOldContinuationToken = true; this will ensure that none of the old SDKs encounter a new continuation token.
/// Once the first phase is complete and all nodes are able to read the new continuation token, then
/// in the second phase redeploy with EmitOldContinuationToken = false.
/// This will succesfully migrate all nodes to emit the new continuation token format.
/// Eventually all your tokens from the previous version will be migrated.
/// </remarks>
public bool EmitOldContinuationToken { get; set; }

/// <summary>
/// Fill the CosmosRequestMessage headers with the set properties
/// </summary>
Expand Down
74 changes: 40 additions & 34 deletions Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1234,25 +1234,6 @@ public abstract ChangeFeedEstimator GetChangeFeedEstimator(
/// <returns>A new instance of <see cref="TransactionalBatch"/>.</returns>
public abstract TransactionalBatch CreateTransactionalBatch(PartitionKey partitionKey);

#if INTERNAL
/// <summary>
/// Deletes all items in the Container with the specified <see cref="PartitionKey"/> value.
/// Starts an asynchronous Cosmos DB background operation which deletes all items in the Container with the specified value.
/// The asynchronous Cosmos DB background operation runs using a percentage of user RUs.
/// </summary>
/// <param name="partitionKey">The <see cref="PartitionKey"/> of the items to be deleted.</param>
/// <param name="requestOptions">(Optional) The options for the Partition Key Delete request.</param>
/// <param name="cancellationToken">(Optional) <see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>
/// A <see cref="Task"/> containing a <see cref="ResponseMessage"/>.
/// </returns>
public abstract Task<ResponseMessage> DeleteAllItemsByPartitionKeyStreamAsync(
Cosmos.PartitionKey partitionKey,
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken));
#endif

#if PREVIEW
/// <summary>
/// Obtains a list of <see cref="FeedRange"/> that can be used to parallelize Feed operations.
/// </summary>
Expand All @@ -1272,24 +1253,26 @@ public abstract Task<ResponseMessage> DeleteAllItemsByPartitionKeyStreamAsync(
/// <example>
/// <code language="c#">
/// <![CDATA[
/// IReadOnlyList<FeedRange> feedRanges = await this.Container.GetFeedRangesAsync();
/// // Distribute feedRanges across multiple compute units and pass each one to a different iterator
///
/// ChangeFeedRequestOptions options = new ChangeFeedRequestOptions()
/// {
/// PageSizeHint = 10,
/// }
///
/// FeedIterator feedIterator = this.Container.GetChangeFeedStreamIterator(
/// ChangeFeedStartFrom.Beginning(feedRanges[0]),
/// ChangeFeedStartFrom.Beginning(),
/// ChangeFeedMode.Incremental,
/// options);
///
/// while (feedIterator.HasMoreResults)
/// {
/// while (feedIterator.HasMoreResults)
/// using (ResponseMessage response = await feedIterator.ReadNextAsync())
/// {
/// using (ResponseMessage response = await feedIterator.ReadNextAsync())
/// if (response.StatusCode == NotModified)
/// {
/// // No new changes
/// // Capture response.ContinuationToken and break or sleep for some time
/// }
/// else
/// {
/// using (StreamReader sr = new StreamReader(response.Content))
/// using (JsonTextReader jtr = new JsonTextReader(sr))
Expand Down Expand Up @@ -1319,29 +1302,33 @@ public abstract FeedIterator GetChangeFeedStreamIterator(
/// <example>
/// <code language="c#">
/// <![CDATA[
/// IReadOnlyList<FeedRange> feedRanges = await this.Container.GetFeedRangessAsync();
/// // Distribute feedRangess across multiple compute units and pass each one to a different iterator
///
/// ChangeFeedRequestOptions options = new ChangeFeedRequestOptions()
/// {
/// PageSizeHint = 10,
/// }
///
/// FeedIterator<MyItem> feedIterator = this.Container.GetChangeFeedIterator<MyItem>(
/// ChangeFeedStartFrom.Beginning(feedRanges[0]),
/// ChangeFeedStartFrom.Beginning(),
/// ChangeFeedMode.Incremental,
/// options);
/// while (feedIterator.HasMoreResults)
/// {
///
/// while (feedIterator.HasMoreResults)
/// {
/// FeedResponse<MyItem> response = await feedIterator.ReadNextAsync();
/// foreach (var item in response)
///
/// if (response.StatusCode == NotModified)
/// {
/// Console.WriteLine(item);
/// // No new changes
/// // Capture response.ContinuationToken and break or sleep for some time
/// }
/// else
/// {
/// foreach (var item in response)
/// {
/// Console.WriteLine(item);
/// }
/// }
/// }
/// }
/// ]]>
/// </code>
/// </example>
Expand All @@ -1351,6 +1338,25 @@ public abstract FeedIterator<T> GetChangeFeedIterator<T>(
ChangeFeedMode changeFeedMode,
ChangeFeedRequestOptions changeFeedRequestOptions = null);

#if INTERNAL
ealsur marked this conversation as resolved.
Show resolved Hide resolved
/// <summary>
/// Deletes all items in the Container with the specified <see cref="PartitionKey"/> value.
/// Starts an asynchronous Cosmos DB background operation which deletes all items in the Container with the specified value.
/// The asynchronous Cosmos DB background operation runs using a percentage of user RUs.
/// </summary>
/// <param name="partitionKey">The <see cref="PartitionKey"/> of the items to be deleted.</param>
/// <param name="requestOptions">(Optional) The options for the Partition Key Delete request.</param>
/// <param name="cancellationToken">(Optional) <see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>
/// A <see cref="Task"/> containing a <see cref="ResponseMessage"/>.
/// </returns>
public abstract Task<ResponseMessage> DeleteAllItemsByPartitionKeyStreamAsync(
Cosmos.PartitionKey partitionKey,
RequestOptions requestOptions = null,
CancellationToken cancellationToken = default(CancellationToken));
#endif

#if PREVIEW
/// <summary>
/// Gets the list of Partition Key Range identifiers for a <see cref="FeedRange"/>.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ public abstract Task<ResponseMessage> DeleteAllItemsByPartitionKeyStreamAsync(
#endif

#if !PREVIEW
public abstract Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
FeedRange feedRange,
CancellationToken cancellationToken = default);

public abstract Task<ResponseMessage> PatchItemStreamAsync(
string id,
PartitionKey partitionKey,
Expand All @@ -147,22 +151,6 @@ public abstract Task<ItemResponse<T>> PatchItemAsync<T>(
PatchItemRequestOptions requestOptions = null,
CancellationToken cancellationToken = default);

public abstract Task<IReadOnlyList<FeedRange>> GetFeedRangesAsync(CancellationToken cancellationToken = default);

public abstract FeedIterator GetChangeFeedStreamIterator(
ChangeFeedStartFrom changeFeedStartFrom,
ChangeFeedMode changeFeedMode,
ChangeFeedRequestOptions changeFeedRequestOptions = null);

public abstract FeedIterator<T> GetChangeFeedIterator<T>(
ChangeFeedStartFrom changeFeedStartFrom,
ChangeFeedMode changeFeedMode,
ChangeFeedRequestOptions changeFeedRequestOptions = null);

public abstract Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
FeedRange feedRange,
CancellationToken cancellationToken = default);

public abstract FeedIterator GetItemQueryStreamIterator(
FeedRange feedRange,
QueryDefinition queryDefinition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Microsoft.Azure.Cosmos

internal sealed class FeedRangeInternalConverter : JsonConverter
{
private const string PartitionKeyNoneValue = "None";
private const string RangePropertyName = "Range";
private const string PartitionKeyPropertyName = "PK";
private const string PartitionKeyRangeIdPropertyName = "PKRangeId";
Expand Down Expand Up @@ -73,7 +74,13 @@ public static FeedRangeInternal ReadJObject(

if (jObject.TryGetValue(FeedRangeInternalConverter.PartitionKeyPropertyName, out JToken pkJToken))
{
if (!PartitionKey.TryParseJsonString(pkJToken.Value<string>(), out PartitionKey partitionKey))
string value = pkJToken.Value<string>();
if (FeedRangeInternalConverter.PartitionKeyNoneValue.Equals(value, StringComparison.OrdinalIgnoreCase))
{
return new FeedRangePartitionKey(PartitionKey.None);
}

if (!PartitionKey.TryParseJsonString(value, out PartitionKey partitionKey))
{
throw new JsonReaderException();
}
Expand Down Expand Up @@ -104,7 +111,15 @@ public static void WriteJObject(
if (value is FeedRangePartitionKey feedRangePartitionKey)
{
writer.WritePropertyName(FeedRangeInternalConverter.PartitionKeyPropertyName);
writer.WriteValue(feedRangePartitionKey.PartitionKey.ToJsonString());
if (feedRangePartitionKey.PartitionKey.IsNone)
{
writer.WriteValue(FeedRangeInternalConverter.PartitionKeyNoneValue);
}
else
{
writer.WriteValue(feedRangePartitionKey.PartitionKey.ToJsonString());
}

return;
}

Expand Down
Loading