Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Feed Processor: Adds support for Resource Tokens #3566

Merged
merged 6 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,11 @@ public static async Task<string> GetMonitoredDatabaseAndContainerRidAsync(
forceRefresh: false,
NoOpTrace.Singleton,
cancellationToken: cancellationToken);
string databaseRid = await ((DatabaseInternal)((ContainerInternal)monitoredContainer).Database).GetRIDAsync(cancellationToken);
ealsur marked this conversation as resolved.
Show resolved Hide resolved

// Extract DbRid from ContainerRid
Documents.ResourceId resourceId = Documents.ResourceId.Parse(containerRid);
string databaseRid = resourceId.DatabaseId.ToString();

return $"{databaseRid}_{containerRid}";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,14 @@ public async Task<ItemResponse<T>> ReadCurrentAsync<T>(
throw new ArgumentNullException(nameof(cosmosConflict));
}

// SourceResourceId is RID based on Conflicts, so we need to obtain the db and container rid
DatabaseInternal databaseCore = (DatabaseInternal)this.container.Database;
string databaseResourceId = await databaseCore.GetRIDAsync(cancellationToken);
string containerResourceId = await this.container.GetCachedRIDAsync(
forceRefresh: false,
trace,
cancellationToken: cancellationToken);

ResourceId resourceId = ResourceId.Parse(containerResourceId);
string databaseResourceId = resourceId.DatabaseId.ToString();

string dbLink = this.ClientContext.CreateLink(
parentLink: string.Empty,
uriPathSegment: Paths.DatabasesPathSegment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3068,8 +3068,6 @@
}]]></Json>
<OTelActivities><ACTIVITY><OPERATION>Cosmos.Change Feed Estimator Read Next Async</OPERATION><ATTRIBUTE-KEY>kind</ATTRIBUTE-KEY><ATTRIBUTE-KEY>az.namespace</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.system</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.hashed_machine_id</ATTRIBUTE-KEY><ATTRIBUTE-KEY>net.peer.name</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.client_id</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.user_agent</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.connection_mode</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.operation</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.name</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.container</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.request_content_length_bytes</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.response_content_length_bytes</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.status_code</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.sub_status_code</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.request_charge</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.item_count</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.operation_type</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.regions_contacted</ATTRIBUTE-KEY></ACTIVITY>
<ACTIVITY><OPERATION>Cosmos.FeedIterator Read Next Async</OPERATION><ATTRIBUTE-KEY>kind</ATTRIBUTE-KEY><ATTRIBUTE-KEY>az.namespace</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.system</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.hashed_machine_id</ATTRIBUTE-KEY><ATTRIBUTE-KEY>net.peer.name</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.client_id</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.user_agent</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.connection_mode</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.operation</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.name</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.container</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.request_content_length_bytes</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.response_content_length_bytes</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.status_code</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.sub_status_code</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.request_charge</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.item_count</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.operation_type</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.regions_contacted</ATTRIBUTE-KEY></ACTIVITY>
<ACTIVITY><OPERATION>Cosmos.ReadAsync</OPERATION><ATTRIBUTE-KEY>kind</ATTRIBUTE-KEY><ATTRIBUTE-KEY>az.namespace</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.system</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.hashed_machine_id</ATTRIBUTE-KEY><ATTRIBUTE-KEY>net.peer.name</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.client_id</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.user_agent</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.connection_mode</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.operation</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.name</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.container</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.request_content_length_bytes</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.response_content_length_bytes</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.status_code</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.sub_status_code</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.request_charge</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.item_count</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.operation_type</ATTRIBUTE-KEY><ATTRIBUTE-KEY>db.cosmosdb.regions_contacted</ATTRIBUTE-KEY></ACTIVITY>
<EVENT>Ideally, this should contain request diagnostics but request diagnostics is subject to change with each request as it contains few unique id. So just putting this tag with this static text to make sure event is getting generated for each test.</EVENT>
<EVENT>Ideally, this should contain request diagnostics but request diagnostics is subject to change with each request as it contains few unique id. So just putting this tag with this static text to make sure event is getting generated for each test.</EVENT>
<EVENT>Ideally, this should contain request diagnostics but request diagnostics is subject to change with each request as it contains few unique id. So just putting this tag with this static text to make sure event is getting generated for each test.</EVENT>
</OTelActivities>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,65 @@ public async Task WritesTriggerDelegate_WithLeaseContainer()
CollectionAssert.AreEqual(expectedIds.ToList(), receivedIds);
}

[TestMethod]
public async Task WritesTriggerDelegate_WithLeaseContainer_UsingResourceTokens()
{
User user = await this.Container.Database.CreateUserAsync("testUser");
PermissionResponse monitoredContainerPermissions = await user.CreatePermissionAsync(
new PermissionProperties(
id: "testPermission",
permissionMode: PermissionMode.All,
container: this.Container)
);

PermissionResponse leaseContainerPermissions = await user.CreatePermissionAsync(
new PermissionProperties(
id: "testPermission2",
permissionMode: PermissionMode.All,
container: this.LeaseContainer)
);

using CosmosClient clientForMonitoredContainer = new CosmosClient(this.Container.Database.Client.Endpoint.ToString(), authKeyOrResourceToken: monitoredContainerPermissions.Resource.Token);
using CosmosClient clientForLeaseContainer = new CosmosClient(this.Container.Database.Client.Endpoint.ToString(), authKeyOrResourceToken: leaseContainerPermissions.Resource.Token);

ManualResetEvent allDocsProcessed = new ManualResetEvent(false);
IEnumerable<int> expectedIds = Enumerable.Range(0, 100);
List<int> receivedIds = new List<int>();
ChangeFeedProcessor processor = clientForMonitoredContainer.GetContainer(this.Container.Database.Id, this.Container.Id)
.GetChangeFeedProcessorBuilder("test", (IReadOnlyCollection<TestClass> docs, CancellationToken token) =>
{
foreach (TestClass doc in docs)
{
receivedIds.Add(int.Parse(doc.id));
}

if (receivedIds.Count == 100)
{
allDocsProcessed.Set();
}

return Task.CompletedTask;
})
.WithInstanceName("random")
.WithLeaseContainer(clientForLeaseContainer.GetContainer(this.LeaseContainer.Database.Id, this.LeaseContainer.Id)).Build();

await processor.StartAsync();
// Letting processor initialize
await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime);
// Inserting documents
foreach (int id in expectedIds)
{
await this.Container.CreateItemAsync<dynamic>(new { id = id.ToString() });
}

// Waiting on all notifications to finish
bool isStartOk = allDocsProcessed.WaitOne(30 * BaseChangeFeedClientHelper.ChangeFeedCleanupTime);
await processor.StopAsync();
Assert.IsTrue(isStartOk, "Timed out waiting for docs to process");
// Verify that we maintain order
CollectionAssert.AreEqual(expectedIds.ToList(), receivedIds);
}

[TestMethod]
public async Task ExceptionsRetryBatch()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,14 +350,10 @@ static FeedIteratorInternal feedCreator(DocumentServiceLease lease, string conti
}
});

string databaseRid = Guid.NewGuid().ToString();
Mock<DatabaseInternal> mockedMonitoredDatabase = new Mock<DatabaseInternal>(MockBehavior.Strict);
mockedMonitoredDatabase.Setup(c => c.GetRIDAsync(It.IsAny<CancellationToken>())).ReturnsAsync(databaseRid);

string monitoredContainerRid = Guid.NewGuid().ToString();
string monitoredContainerRid = "V4lVAMl0wuQ=";
string databaseRid = Documents.ResourceId.Parse(monitoredContainerRid).DatabaseId.ToString();
Mock<ContainerInternal> mockedMonitoredContainer = new Mock<ContainerInternal>(MockBehavior.Strict);
mockedMonitoredContainer.Setup(c => c.GetCachedRIDAsync(It.IsAny<bool>(), It.IsAny<ITrace>(), It.IsAny<CancellationToken>())).ReturnsAsync(monitoredContainerRid);
mockedMonitoredContainer.Setup(c => c.Database).Returns(mockedMonitoredDatabase.Object);
mockedMonitoredContainer.Setup(c => c.ClientContext).Returns(mockedContext.Object);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Tests
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed.Configuration;
using Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing;
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;
using Microsoft.Azure.Cosmos.Tests;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;

Expand Down Expand Up @@ -233,6 +235,8 @@ private static ContainerInternal GetMockedContainer(string containerName = null)
{
Mock<ContainerInternal> mockedContainer = MockCosmosUtil.CreateMockContainer(containerName: containerName);
mockedContainer.Setup(c => c.ClientContext).Returns(ChangeFeedProcessorCoreTests.GetMockedClientContext());
string monitoredContainerRid = "V4lVAMl0wuQ=";
mockedContainer.Setup(c => c.GetCachedRIDAsync(It.IsAny<bool>(), It.IsAny<ITrace>(), It.IsAny<CancellationToken>())).ReturnsAsync(monitoredContainerRid);
Mock<DatabaseInternal> mockedDatabase = MockCosmosUtil.CreateMockDatabase();
mockedContainer.Setup(c => c.Database).Returns(mockedDatabase.Object);
return mockedContainer.Object;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace Microsoft.Azure.Cosmos.Tests
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using Newtonsoft.Json.Linq;
using Microsoft.Azure.Cosmos.Tracing;

[TestClass]
public class CosmosConflictTests
Expand All @@ -38,7 +39,7 @@ public async Task ReadCurrentGetsCorrectRID()
const string expectedRID = "something";
Cosmos.PartitionKey partitionKey = new Cosmos.PartitionKey("pk");
// Using "test" as container name because the Mocked DocumentClient has it hardcoded
Uri expectedRequestUri = new Uri($"dbs/conflictsDb/colls/test/docs/{expectedRID}", UriKind.Relative);
Uri expectedRequestUri = new Uri($"dbs/V4lVAA==/colls/V4lVAMl0wuQ=/docs/{expectedRID}", UriKind.Relative);
ContainerInternal container = CosmosConflictTests.GetMockedContainer((request, cancellationToken) => {
Assert.AreEqual(OperationType.Read, request.OperationType);
Assert.AreEqual(ResourceType.Document, request.ResourceType);
Expand Down Expand Up @@ -74,7 +75,7 @@ public async Task DeleteSendsCorrectPayload()
{
const string expectedId = "something";
Cosmos.PartitionKey partitionKey = new Cosmos.PartitionKey("pk");
Uri expectedRequestUri = new Uri($"/dbs/conflictsDb/colls/conflictsColl/conflicts/{expectedId}", UriKind.Relative);
Uri expectedRequestUri = new Uri($"/dbs/myDb/colls/conflictsColl/conflicts/{expectedId}", UriKind.Relative);
ContainerInternal container = CosmosConflictTests.GetMockedContainer((request, cancellationToken) => {
Assert.AreEqual(OperationType.Delete, request.OperationType);
Assert.AreEqual(ResourceType.Conflict, request.ResourceType);
Expand All @@ -91,7 +92,18 @@ public async Task DeleteSendsCorrectPayload()
private static ContainerInternal GetMockedContainer(Func<RequestMessage,
CancellationToken, Task<ResponseMessage>> handlerFunc)
{
return new ContainerInlineCore(CosmosConflictTests.GetMockedClientContext(handlerFunc), MockCosmosUtil.CreateMockDatabase("conflictsDb").Object, "conflictsColl");
CosmosClientContext clientContext = CosmosConflictTests.GetMockedClientContext(handlerFunc);
Mock<ContainerInternal> mockedContainer = MockCosmosUtil.CreateMockContainer(containerName: "conflictsColl");
DatabaseInternal database = MockCosmosUtil.CreateMockDatabase("conflictsDb").Object;
string monitoredContainerRid = "V4lVAMl0wuQ=";
mockedContainer.Setup(c => c.GetCachedRIDAsync(It.IsAny<bool>(), It.IsAny<ITrace>(), It.IsAny<CancellationToken>())).ReturnsAsync(monitoredContainerRid);
mockedContainer.Setup(c => c.Database).Returns(database);
mockedContainer.Setup(c => c.GetReadFeedIterator(It.IsAny<QueryDefinition>(), It.IsAny<QueryRequestOptions>(), It.IsAny<string>(), It.Is<ResourceType>(r => r == ResourceType.Conflict), It.IsAny<string>(), It.IsAny<int>()))
.Returns(
(QueryDefinition qd, QueryRequestOptions o, string link, ResourceType t, string ct, int p) => new ContainerInlineCore(clientContext, database, "conflictsColl").GetReadFeedIterator(qd, o, link, t, ct, p));
mockedContainer.Setup(c => c.ClientContext).Returns(clientContext);
mockedContainer.Setup(c => c.Conflicts).Returns(new ConflictsInlineCore(clientContext, mockedContainer.Object));
return mockedContainer.Object;
}

private static CosmosClientContext GetMockedClientContext(
Expand Down