Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable to take Lease Collection Partition Key from end user #159

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
//----------------------------------------------------------------
// Copyright (c) Microsoft Corporation. Licensed under the MIT license.
//----------------------------------------------------------------

namespace Microsoft.Azure.Documents.ChangeFeedProcessor.IntegrationTests
{
using Microsoft.Azure.Documents.ChangeFeedProcessor.IntegrationTests.Utils;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Linq;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

/// <summary>
/// Test ChangeFeed with Custom Partition key lease collections
/// </summary>
[Trait("Category", "Integration")]
[Collection("Integration tests")]
public class CustomPartitionedLeaseCollectionTests:IntegrationTest
{
const int documentCount = 500;
public CustomPartitionedLeaseCollectionTests():base(isPartitionedMonitoredCollection:true, isPartitionedLeaseCollection:true,leaseCollectionPartitionKey:"leaseId")
{
}

[Fact]
public async Task CountDocumentsInCollection_TwoHosts()
{
await this.InitializeDocumentsAsync();
int partitionKeyRangeCount = await IntegrationTestsHelper.GetPartitionCount(this.MonitoredCollectionInfo);
Assert.True(partitionKeyRangeCount > 1, "Prerequisite failed: expected monitored collection with at least 2 partitions.");

int processedCount = 0;
var allDocsProcessed = new ManualResetEvent(false);

var observerFactory = new V2TestObserverFactory(
openProcessor: null,
changeProcessor:(FeedProcessing.IChangeFeedObserverContext context, IReadOnlyList<Document> docs) =>
{
int newCount = Interlocked.Add(ref processedCount, docs.Count);
if (newCount == documentCount)
{
allDocsProcessed.Set();
}
return Task.CompletedTask;
});

var host1 = await new ChangeFeedProcessorBuilder()
.WithObserverFactory(observerFactory)
.WithHostName(Guid.NewGuid().ToString())
.WithFeedCollection(this.MonitoredCollectionInfo)
.WithLeaseCollection(this.LeaseCollectionInfo)
.WithProcessorOptions(new ChangeFeedProcessorOptions()
{
StartFromBeginning = true
})
.BuildAsync();
await host1.StartAsync();
var host2 = await new ChangeFeedProcessorBuilder()
.WithObserverFactory(observerFactory)
.WithHostName(Guid.NewGuid().ToString())
.WithFeedCollection(this.MonitoredCollectionInfo)
.WithLeaseCollection(this.LeaseCollectionInfo)
.WithProcessorOptions(new ChangeFeedProcessorOptions()
{
StartFromBeginning = true
})
.BuildAsync();
await host2.StartAsync();
await this.WaitUntilLeaseStoreIsInitializedAsync(new CancellationTokenSource(5000).Token);
allDocsProcessed.WaitOne(changeWaitTimeout + changeWaitTimeout);

try
{
Assert.True(documentCount == processedCount, $"Wrong processedCount {documentCount} {processedCount}");
}
finally
{
await host1.StopAsync();
await host2.StopAsync();
}
}

private async Task InitializeDocumentsAsync()
{
using (var client = new DocumentClient(this.MonitoredCollectionInfo.Uri, this.MonitoredCollectionInfo.MasterKey, this.MonitoredCollectionInfo.ConnectionPolicy))
{
var collectionUri = UriFactory.CreateDocumentCollectionUri(this.MonitoredCollectionInfo.DatabaseName, this.MonitoredCollectionInfo.CollectionName);

await IntegrationTestsHelper.CreateDocumentsAsync(client, collectionUri, documentCount);
}
}

private async Task WaitUntilLeaseStoreIsInitializedAsync(CancellationToken cancellationToken)
{
bool infoExists = false;
bool lockExists = false;
while (true)
{
infoExists = false;
lockExists = false;
cancellationToken.ThrowIfCancellationRequested();
using (DocumentClient client = new DocumentClient(this.LeaseCollectionInfo.Uri, this.LeaseCollectionInfo.MasterKey, this.LeaseCollectionInfo.ConnectionPolicy))
{
Uri collectionUri = UriFactory.CreateDocumentCollectionUri(this.LeaseCollectionInfo.DatabaseName, this.LeaseCollectionInfo.CollectionName);

IDocumentQuery<JObject> query = client.CreateDocumentQuery<JObject>(collectionUri, "SELECT * FROM c WHERE CONTAINS(c.id, \".info\") OR CONTAINS(c.id, \".lock\")",new FeedOptions() { EnableCrossPartitionQuery = true }).AsDocumentQuery();
while (query.HasMoreResults)
{
foreach (JObject lease in await query.ExecuteNextAsync())
{
string leaseId = lease.Value<string>("id");
if (leaseId.Contains(".info"))
{
infoExists = true;
}

if (leaseId.Contains(".lock"))
{
lockExists = true;
}
}
}
}

if (infoExists && !lockExists)
{
return;
}

await Task.Delay(100, cancellationToken);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,16 @@ protected DocumentCollectionInfo MonitoredCollectionInfo

protected readonly bool IsPartitionedLeaseCollection;

protected readonly string leaseCollectionPartitionKey;

public IntegrationTest(
bool isPartitionedMonitoredCollection = true,
bool isPartitionedLeaseCollection = false)
bool isPartitionedLeaseCollection = false,
string leaseCollectionPartitionKey="id")
{
this.IsPartitionedMonitoredCollection = isPartitionedMonitoredCollection;
this.IsPartitionedLeaseCollection = isPartitionedLeaseCollection;
this.leaseCollectionPartitionKey = leaseCollectionPartitionKey;
}

public async Task InitializeAsync()
Expand All @@ -119,7 +123,7 @@ public async Task InitializeAsync()

if (this.IsPartitionedLeaseCollection)
{
leaseCollection.PartitionKey = new PartitionKeyDefinition { Paths = { "/id" } };
leaseCollection.PartitionKey = new PartitionKeyDefinition { Paths = { $"/{leaseCollectionPartitionKey}" } };
}

using (var client = new DocumentClient(this.LeaseCollectionInfo.Uri, this.LeaseCollectionInfo.MasterKey, this.LeaseCollectionInfo.ConnectionPolicy))
Expand Down Expand Up @@ -160,7 +164,7 @@ private async Task CreateMonitoredCollectionAsync(string monitoredCollectionName

if (this.IsPartitionedMonitoredCollection)
{
monitoredCollection.PartitionKey = new PartitionKeyDefinition { Paths = { "/id" } };
monitoredCollection.PartitionKey = new PartitionKeyDefinition { Paths = { "/partitionId" } };
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json.Linq;

namespace Microsoft.Azure.Documents.ChangeFeedProcessor.IntegrationTests.Utils
{
Expand Down Expand Up @@ -105,7 +106,6 @@ internal static async Task CreateDocumentsAsync(DocumentClient client, Uri colle

var dummyCounts = Enumerable.Repeat(0, count);
var emptyDocument = new object();

await dummyCounts.ForEachAsync(
async dummyCounter => { await client.CreateDocumentAsync(collectionUri, emptyDocument); },
128);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public V3CompatibilityTests() : base()
[Fact]
public async Task Schema_DefaultsToNoLeaseToken()
{
TestObserverFactory observerFactory = new TestObserverFactory(
V2TestObserverFactory observerFactory = new V2TestObserverFactory(
openProcessor: null,
(FeedProcessing.IChangeFeedObserverContext context, IReadOnlyList<Document> docs) =>
{
Expand Down Expand Up @@ -92,7 +92,7 @@ public async Task Schema_OnV2MigrationMaintainLeaseToken()
ManualResetEvent firstSetOfResultsProcessed = new ManualResetEvent(false);
ManualResetEvent secondSetOfResultsProcessed = new ManualResetEvent(false);
List<int> receivedIds = new List<int>();
TestObserverFactory observerFactory = new TestObserverFactory(
V2TestObserverFactory observerFactory = new V2TestObserverFactory(
context =>
{
int newCount = Interlocked.Increment(ref openedCount);
Expand Down Expand Up @@ -265,43 +265,43 @@ private async Task WaitUntilLeaseStoreIsInitializedAsync(CancellationToken cance

await Task.Delay(100, cancellationToken);
}
}
}
}

class TestObserverFactory : FeedProcessing.IChangeFeedObserverFactory, FeedProcessing.IChangeFeedObserver
public class V2TestObserverFactory : FeedProcessing.IChangeFeedObserverFactory, FeedProcessing.IChangeFeedObserver
{
private readonly Func<FeedProcessing.IChangeFeedObserverContext, IReadOnlyList<Document>, Task> changeProcessor;
private readonly Func<FeedProcessing.IChangeFeedObserverContext, Task> openProcessor;

public V2TestObserverFactory(
Func<FeedProcessing.IChangeFeedObserverContext, Task> openProcessor,
Func<FeedProcessing.IChangeFeedObserverContext, IReadOnlyList<Document>, Task> changeProcessor)
{
private readonly Func<FeedProcessing.IChangeFeedObserverContext, IReadOnlyList<Document>, Task> changeProcessor;
private readonly Func<FeedProcessing.IChangeFeedObserverContext, Task> openProcessor;
this.changeProcessor = changeProcessor;
this.openProcessor = openProcessor;
}

public TestObserverFactory(
Func<FeedProcessing.IChangeFeedObserverContext, Task> openProcessor,
Func<FeedProcessing.IChangeFeedObserverContext, IReadOnlyList<Document>, Task> changeProcessor)
{
this.changeProcessor = changeProcessor;
this.openProcessor = openProcessor;
}
public FeedProcessing.IChangeFeedObserver CreateObserver()
{
return this;
}

public FeedProcessing.IChangeFeedObserver CreateObserver()
public Task OpenAsync(FeedProcessing.IChangeFeedObserverContext context)
{
if (this.openProcessor != null)
{
return this;
return this.openProcessor(context);
}

public Task OpenAsync(FeedProcessing.IChangeFeedObserverContext context)
{
if (this.openProcessor != null)
{
return this.openProcessor(context);
}

return Task.CompletedTask;
}
return Task.CompletedTask;
}

public Task CloseAsync(FeedProcessing.IChangeFeedObserverContext context, FeedProcessing.ChangeFeedObserverCloseReason reason) => Task.CompletedTask;
public Task CloseAsync(FeedProcessing.IChangeFeedObserverContext context, FeedProcessing.ChangeFeedObserverCloseReason reason) => Task.CompletedTask;

public Task ProcessChangesAsync(FeedProcessing.IChangeFeedObserverContext context, IReadOnlyList<Document> docs, CancellationToken cancellationToken)
{
if (this.changeProcessor != null) return this.changeProcessor(context, docs);
else return Task.CompletedTask;
}
public Task ProcessChangesAsync(FeedProcessing.IChangeFeedObserverContext context, IReadOnlyList<Document> docs, CancellationToken cancellationToken)
{
if (this.changeProcessor != null) return this.changeProcessor(context, docs);
else return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,16 +155,16 @@ public async Task UseCustomLoadBalancingStrategy()
}

[Fact]
public async Task BuildThrowsWhenLeaseCollectionPartitionedNotById()
public async Task BuildWhenLeaseCollectionPartitionedById()
{
SetupBuilderForPartitionedLeaseCollection("/not_id");
await Assert.ThrowsAsync<ArgumentException>(async () => await builder.BuildAsync() );
SetupBuilderForPartitionedLeaseCollection("/id");
await this.builder.BuildAsync();
}

[Fact]
public async Task BuildWhenLeaseCollectionPartitionedById()
public async Task BuildWhenLeaseCollectionPartitionedByCustomPk()
{
SetupBuilderForPartitionedLeaseCollection("/id");
SetupBuilderForPartitionedLeaseCollection("/leaseId");
await this.builder.BuildAsync();
}

Expand Down Expand Up @@ -193,13 +193,6 @@ public async Task BuildWhenOnlyLeaseStoreManagerSpecified()
await builder.BuildAsync();
}

[Fact]
public async Task BuildEstimatorThrowsWhenLeaseCollectionPartitionedNotById()
{
SetupBuilderForPartitionedLeaseCollection("/not_id");
await Assert.ThrowsAsync<ArgumentException>(async () => await builder.BuildEstimatorAsync());
}

[Fact]
public async Task BuildEstimatorWhenLeaseCollectionPartitionedById()
{
Expand Down Expand Up @@ -304,6 +297,48 @@ public async Task BuildPassesPartitionKey_WhenLeaseCollectionIsPartitionedById()
Assert.Equal(typeof(LeaseLostException), exception.GetType());
}

[Fact]
public async Task BuildPassesPartitionKey_WhenLeaseCollectionIsPartitionedByCustomPk()
{
var leaseCollection = MockHelpers.CreateCollection(
"collectionId",
"collectionRid",
new PartitionKeyDefinition { Paths = { "/leaseid" } },
collectionLink);

var lease = Mock.Of<ILease>();
Mock.Get(lease)
.SetupGet(l => l.Id)
.Returns("leaseId");

var leaseClient = this.CreateMockDocumentClient(collection);
Mock.Get(leaseClient)
.Setup(c => c.ReadDocumentCollectionAsync(
It.IsAny<Uri>(),
It.IsAny<RequestOptions>()))
.ReturnsAsync(new ResourceResponse<DocumentCollection>(leaseCollection));
Mock.Get(leaseClient)
.Setup(c => c.ReadDocumentAsync(
It.IsAny<Uri>(),
It.IsAny<RequestOptions>(),
It.IsAny<CancellationToken>()))
.Callback((Uri uri, RequestOptions options, CancellationToken token) =>
{
if (new PartitionKey(lease.Id).Equals(options.PartitionKey))
throw DocumentExceptionHelpers.CreateNotFoundException(); // Success code path: cause lease lost.
throw new Exception("Failure");
});

this.builder
.WithFeedDocumentClient(this.CreateMockDocumentClient())
.WithLeaseDocumentClient(leaseClient)
.WithObserverFactory(Mock.Of<IChangeFeedObserverFactory>());
await this.builder.BuildAsync();

Exception exception = await Record.ExceptionAsync(() => this.builder.LeaseStoreManager.ReleaseAsync(lease));
Assert.Equal(typeof(LeaseLostException), exception.GetType());
}

[Fact]
public async Task BuildThrowsWhenBothPartitionProcessorFactoriesSpecified()
{
Expand Down
Loading