Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CFP AVAD: Adds new FeedRange to ChangeFeedProcessorContext #4621

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,10 @@ public abstract class ChangeFeedProcessorContext
/// Gets the headers related to the service response that provided the changes.
/// </summary>
public abstract Headers Headers { get; }

/// <summary>
/// Gets the feed range.
philipthomas-MSFT marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
public abstract FeedRange FeedRange { get; }
philipthomas-MSFT marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ private void HandleFailedRequest(

private Task DispatchChangesAsync(ResponseMessage response, CancellationToken cancellationToken)
{
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.options.LeaseToken, response, this.checkpointer);
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(
this.options.LeaseToken,
response,
this.checkpointer,
this.options.FeedRange);
return this.observer.ProcessChangesAsync(context, response.Content, cancellationToken);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public override FeedProcessor Create(DocumentServiceLease lease, ChangeFeedObser
FeedPollDelay = this.changeFeedProcessorOptions.FeedPollDelay,
MaxItemCount = this.changeFeedProcessorOptions.MaxItemCount,
StartFromBeginning = this.changeFeedProcessorOptions.StartFromBeginning,
StartTime = this.changeFeedProcessorOptions.StartTime
StartTime = this.changeFeedProcessorOptions.StartTime,
FeedRange = lease.FeedRange,
};

PartitionCheckpointerCore checkpointer = new PartitionCheckpointerCore(this.leaseCheckpointer, lease);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing
{
using System;
using Microsoft.Azure.Documents;

internal class ProcessorOptions
{
Expand All @@ -22,5 +21,7 @@ internal class ProcessorOptions
public DateTime? StartTime { get; set; }

public TimeSpan RequestTimeout { get; set; } = CosmosHttpClient.GatewayRequestTimeout;

public FeedRangeInternal FeedRange { get; set; }
philipthomas-MSFT marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace Microsoft.Azure.Cosmos.ChangeFeed
{
using System;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
Expand All @@ -24,11 +23,13 @@ internal sealed class ChangeFeedObserverContextCore
internal ChangeFeedObserverContextCore(
string leaseToken,
ResponseMessage feedResponse,
PartitionCheckpointer checkpointer)
PartitionCheckpointer checkpointer,
FeedRange feedRange)
{
this.LeaseToken = leaseToken;
this.responseMessage = feedResponse;
this.checkpointer = checkpointer;
this.FeedRange = feedRange;
}

public string LeaseToken { get; }
Expand All @@ -37,6 +38,8 @@ internal ChangeFeedObserverContextCore(

public Headers Headers => this.responseMessage.Headers;

public FeedRange FeedRange { get; }

public async Task CheckpointAsync()
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,7 @@ public ChangeFeedProcessorContextCore(ChangeFeedObserverContextCore changeFeedOb
public override CosmosDiagnostics Diagnostics => this.changeFeedObserverContextCore.Diagnostics;

public override Headers Headers => this.changeFeedObserverContextCore.Headers;

public override FeedRange FeedRange => this.changeFeedObserverContextCore.FeedRange;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests.ChangeFeed
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed.Utils;
using Microsoft.Azure.Cosmos.Services.Management.Tests;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
Expand All @@ -20,6 +21,8 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests.ChangeFeed
[TestCategory("ChangeFeedProcessor")]
public class GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests : BaseChangeFeedClientHelper
{
private ContainerResponse containerResponse;

[TestInitialize]
public async Task TestInitialize()
{
Expand Down Expand Up @@ -573,20 +576,86 @@ private static async Task BuildChangeFeedProcessorWithAllVersionsAndDeletesAsync
private async Task<ContainerInternal> CreateMonitoredContainer(ChangeFeedMode changeFeedMode)
{
string PartitionKey = "/pk";
ContainerProperties properties = new ContainerProperties(id: Guid.NewGuid().ToString(),
ContainerProperties containerProperties = new ContainerProperties(id: Guid.NewGuid().ToString(),
partitionKeyPath: PartitionKey);

if (changeFeedMode == ChangeFeedMode.AllVersionsAndDeletes)
{
properties.ChangeFeedPolicy.FullFidelityRetention = TimeSpan.FromMinutes(5);
properties.DefaultTimeToLive = -1;
containerProperties.ChangeFeedPolicy.FullFidelityRetention = TimeSpan.FromMinutes(5);
containerProperties.DefaultTimeToLive = -1;
}

ContainerResponse response = await this.database.CreateContainerAsync(properties,
this.containerResponse = await this.database.CreateContainerAsync(containerProperties,
throughput: 10000,
cancellationToken: this.cancellationToken);

return (ContainerInternal)response;
return (ContainerInternal)this.containerResponse;
}

[TestMethod]
[Owner("philipthomas-MSFT")]
[Description("Scenario: Test to confirm that FeedRange is coming back in the context.")]
philipthomas-MSFT marked this conversation as resolved.
Show resolved Hide resolved
public async Task WhenADocumentHasChangedThenFeedRangeInContextTestsAsync()
{
ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.AllVersionsAndDeletes);
ManualResetEvent allDocsProcessed = new ManualResetEvent(false);
Exception exception = default;

ChangeFeedProcessor processor = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: "processor", onChangesDelegate: async (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<dynamic>> docs, CancellationToken token) =>
{
await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.ValidateFeedRangeAsync(
this.GetClient(),
context.FeedRange,
containerRid: this.containerResponse.Resource.ResourceId);

allDocsProcessed.Set();

return;
})
.WithInstanceName(Guid.NewGuid().ToString())
.WithLeaseContainer(this.LeaseContainer)
.WithErrorNotification((leaseToken, error) =>
{
exception = error.InnerException;

return Task.CompletedTask;
})
.Build();

// Start the processor, insert 1 document to generate a checkpoint, modify it, and then delete it.
// 1 second delay between operations to get different timestamps.

await processor.StartAsync();
await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime);

await monitoredContainer.CreateItemAsync<dynamic>(new { id = "1", pk = "1", description = "original test" }, partitionKey: new PartitionKey("1"));
await Task.Delay(1000);

bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime);

await processor.StopAsync();

if (exception != default)
{
Assert.Fail(exception.ToString());
}
}

private static async Task ValidateFeedRangeAsync(CosmosClient cosmosClient, FeedRange feedRange, string containerRid)
{
Assert.IsNotNull(feedRange);

Routing.PartitionKeyRangeCache partitionKeyRangeCache = await cosmosClient.DocumentClient.GetPartitionKeyRangeCacheAsync(NoOpTrace.Singleton);
philipthomas-MSFT marked this conversation as resolved.
Show resolved Hide resolved
IReadOnlyList<Documents.PartitionKeyRange> parttionKeyRanges = await partitionKeyRangeCache.TryGetOverlappingRangesAsync(
collectionRid: containerRid,
range: ((FeedRangeEpk)feedRange).Range,
trace: NoOpTrace.Singleton,
forceRefresh: true);

Assert.IsNotNull(parttionKeyRanges);
Logger.LogLine($"{nameof(parttionKeyRanges)} -> {JsonConvert.SerializeObject(parttionKeyRanges)}");
Assert.AreEqual(1, parttionKeyRanges.Count);
}
}
}
Loading