diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedManagement/RemainingWorkEstimatorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedManagement/RemainingWorkEstimatorCore.cs index c6d04731ce..1d998c62d0 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedManagement/RemainingWorkEstimatorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedManagement/RemainingWorkEstimatorCore.cs @@ -15,6 +15,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedManagement using System.Threading.Tasks; using Microsoft.Azure.Cosmos; using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement; + using Microsoft.Azure.Cosmos.ChangeFeed.Utils; using Microsoft.Azure.Cosmos.Core.Trace; using Microsoft.Azure.Documents; using Newtonsoft.Json.Linq; @@ -24,7 +25,6 @@ internal sealed class RemainingWorkEstimatorCore : RemainingWorkEstimator private const char PKRangeIdSeparator = ':'; private const char SegmentSeparator = '#'; private const string LSNPropertyName = "_lsn"; - private static readonly CosmosSerializer DefaultSerializer = new CosmosJsonDotNetSerializer(); private readonly Func feedCreator; private readonly DocumentServiceLeaseContainer leaseContainer; private readonly int degreeOfParallelism; @@ -174,7 +174,7 @@ private static Collection GetItemsFromResponse(ResponseMessage response return new Collection(); } - return RemainingWorkEstimatorCore.DefaultSerializer.FromStream>(response.Content).Data; + return CosmosContainerExtensions.DefaultJsonSerializer.FromStream>(response.Content).Data; } private async Task GetRemainingWorkAsync(DocumentServiceLease existingLease, CancellationToken cancellationToken) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseContainerCosmos.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseContainerCosmos.cs index 1528aaef46..eb802370d4 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseContainerCosmos.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseContainerCosmos.cs @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement using System; using System.Collections.Generic; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.ChangeFeed.Utils; internal sealed class DocumentServiceLeaseContainerCosmos : DocumentServiceLeaseContainer { @@ -29,7 +30,7 @@ public override async Task> GetAllLeasesAsyn public override async Task> GetOwnedLeasesAsync() { - var ownedLeases = new List(); + List ownedLeases = new List(); foreach (DocumentServiceLease lease in await this.GetAllLeasesAsync().ConfigureAwait(false)) { if (string.Compare(lease.Owner, this.options.HostName, StringComparison.OrdinalIgnoreCase) == 0) @@ -46,15 +47,18 @@ private async Task> ListDocumentsAsync(s if (string.IsNullOrEmpty(prefix)) throw new ArgumentException("Prefix must be non-empty string", nameof(prefix)); - var query = this.container.GetItemQueryIterator( + FeedIterator iterator = this.container.GetItemQueryStreamIterator( "SELECT * FROM c WHERE STARTSWITH(c.id, '" + prefix + "')", continuationToken: null, requestOptions: queryRequestOptions); - var leases = new List(); - while (query.HasMoreResults) + List leases = new List(); + while (iterator.HasMoreResults) { - leases.AddRange(await query.ReadNextAsync().ConfigureAwait(false)); + using (ResponseMessage responseMessage = await iterator.ReadNextAsync().ConfigureAwait(false)) + { + leases.AddRange(CosmosContainerExtensions.DefaultJsonSerializer.FromStream>(responseMessage.Content).Data); + } } return leases; diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManagerCosmos.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManagerCosmos.cs index dc127a75b7..81c588e6f8 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManagerCosmos.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseManagerCosmos.cs @@ -64,7 +64,7 @@ public override async Task CreateLeaseIfNotExistAsync(stri throw new ArgumentNullException(nameof(leaseToken)); string leaseDocId = this.GetDocumentId(leaseToken); - var documentServiceLease = new DocumentServiceLeaseCore + DocumentServiceLeaseCore documentServiceLease = new DocumentServiceLeaseCore { LeaseId = leaseDocId, LeaseToken = leaseToken, diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreCosmos.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreCosmos.cs index fb848274fb..0b4a3e6123 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreCosmos.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreCosmos.cs @@ -5,6 +5,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement { using System; + using System.IO; using System.Threading.Tasks; using Microsoft.Azure.Cosmos; using Microsoft.Azure.Cosmos.ChangeFeed.Utils; @@ -40,18 +41,24 @@ public override async Task IsInitializedAsync() public override async Task MarkInitializedAsync() { string markerDocId = this.GetStoreMarkerName(); - var containerDocument = new { id = markerDocId }; + dynamic containerDocument = new { id = markerDocId }; - await this.container.CreateItemAsync( - item: containerDocument, - partitionKey: this.requestOptionsFactory.GetPartitionKey(markerDocId)).ConfigureAwait(false); + using (Stream itemStream = CosmosContainerExtensions.DefaultJsonSerializer.ToStream(containerDocument)) + { + using (ResponseMessage responseMessage = await this.container.CreateItemStreamAsync( + itemStream, + this.requestOptionsFactory.GetPartitionKey(markerDocId)).ConfigureAwait(false)) + { + responseMessage.EnsureSuccessStatusCode(); + } + } } public override async Task AcquireInitializationLockAsync(TimeSpan lockTime) { string lockId = this.GetStoreLockName(); - var containerDocument = new LockDocument() { Id = lockId, TimeToLive = (int)lockTime.TotalSeconds }; - var document = await this.container.TryCreateItemAsync( + LockDocument containerDocument = new LockDocument() { Id = lockId, TimeToLive = (int)lockTime.TotalSeconds }; + ItemResponse document = await this.container.TryCreateItemAsync( this.requestOptionsFactory.GetPartitionKey(lockId), containerDocument).ConfigureAwait(false); @@ -67,17 +74,17 @@ public override async Task AcquireInitializationLockAsync(TimeSpan lockTim public override async Task ReleaseInitializationLockAsync() { string lockId = this.GetStoreLockName(); - var requestOptions = new ItemRequestOptions() + ItemRequestOptions requestOptions = new ItemRequestOptions() { IfMatchEtag = this.lockETag, }; - var document = await this.container.TryDeleteItemAsync( + bool deleted = await this.container.TryDeleteItemAsync( this.requestOptionsFactory.GetPartitionKey(lockId), lockId, requestOptions).ConfigureAwait(false); - if (document != null) + if (deleted) { this.lockETag = null; return true; diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseUpdaterCosmos.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseUpdaterCosmos.cs index 8505536b84..5751fe09f3 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseUpdaterCosmos.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseUpdaterCosmos.cs @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement using System.Threading.Tasks; using Microsoft.Azure.Cosmos; using Microsoft.Azure.Cosmos.ChangeFeed.Exceptions; + using Microsoft.Azure.Cosmos.ChangeFeed.Utils; using Microsoft.Azure.Cosmos.Core.Trace; /// @@ -51,9 +52,7 @@ public override async Task UpdateLeaseAsync( try { - ItemResponse response = await this.container.ReadItemAsync( - itemId, partitionKey).ConfigureAwait(false); - DocumentServiceLeaseCore serverLease = response.Resource; + DocumentServiceLeaseCore serverLease = await this.container.TryGetItemAsync(partitionKey, itemId); DefaultTrace.TraceInformation( "Lease with token {0} update failed because the lease with concurrency token '{1}' was updated by host '{2}' with concurrency token '{3}'. Will retry, {4} retry(s) left.", @@ -77,17 +76,17 @@ public override async Task UpdateLeaseAsync( private async Task TryReplaceLeaseAsync( DocumentServiceLeaseCore lease, - Cosmos.PartitionKey? partitionKey, + PartitionKey partitionKey, string itemId) { try { ItemRequestOptions itemRequestOptions = this.CreateIfMatchOptions(lease); - ItemResponse response = await this.container.ReplaceItemAsync( - id: itemId, - item: lease, - partitionKey: partitionKey, - requestOptions: itemRequestOptions).ConfigureAwait(false); + ItemResponse response = await this.container.TryReplaceItemAsync( + itemId, + lease, + partitionKey, + itemRequestOptions).ConfigureAwait(false); return response.Resource; } diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Utils/CosmosContainerExtensions.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Utils/CosmosContainerExtensions.cs index 97abf21466..02aea42659 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Utils/CosmosContainerExtensions.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/Utils/CosmosContainerExtensions.cs @@ -5,6 +5,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Utils { using System.Globalization; + using System.IO; using System.Net; using System.Threading; using System.Threading.Tasks; @@ -13,41 +14,70 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Utils internal static class CosmosContainerExtensions { + public static readonly CosmosSerializer DefaultJsonSerializer = new CosmosJsonDotNetSerializer(); + public static async Task TryGetItemAsync( this Container container, PartitionKey partitionKey, string itemId) { - return await container.ReadItemAsync( + using (ResponseMessage responseMessage = await container.ReadItemStreamAsync( itemId, partitionKey) - .ConfigureAwait(false); + .ConfigureAwait(false)) + { + responseMessage.EnsureSuccessStatusCode(); + return CosmosContainerExtensions.DefaultJsonSerializer.FromStream(responseMessage.Content); + } } public static async Task> TryCreateItemAsync( this Container container, - object partitionKey, + PartitionKey partitionKey, T item) { - var response = await container.CreateItemAsync(item).ConfigureAwait(false); - if (response.StatusCode == HttpStatusCode.Conflict) + using (Stream itemStream = CosmosContainerExtensions.DefaultJsonSerializer.ToStream(item)) { - // Ignore-- document already exists. - return null; + using (ResponseMessage response = await container.CreateItemStreamAsync(itemStream, partitionKey).ConfigureAwait(false)) + { + if (response.StatusCode == HttpStatusCode.Conflict) + { + // Ignore-- document already exists. + return null; + } + + return new ItemResponse(response.StatusCode, response.Headers, CosmosContainerExtensions.DefaultJsonSerializer.FromStream(response.Content), response.Diagnostics); + } } + } - return response; + public static async Task> TryReplaceItemAsync( + this Container container, + string itemId, + T item, + PartitionKey partitionKey, + ItemRequestOptions itemRequestOptions) + { + using (Stream itemStream = CosmosContainerExtensions.DefaultJsonSerializer.ToStream(item)) + { + using (ResponseMessage response = await container.ReplaceItemStreamAsync(itemStream, itemId, partitionKey, itemRequestOptions).ConfigureAwait(false)) + { + response.EnsureSuccessStatusCode(); + return new ItemResponse(response.StatusCode, response.Headers, CosmosContainerExtensions.DefaultJsonSerializer.FromStream(response.Content), response.Diagnostics); + } + } } - public static async Task TryDeleteItemAsync( + public static async Task TryDeleteItemAsync( this Container container, PartitionKey partitionKey, string itemId, ItemRequestOptions cosmosItemRequestOptions = null) { - var response = await container.DeleteItemAsync(itemId, partitionKey, cosmosItemRequestOptions).ConfigureAwait(false); - - return response.Resource; + using (ResponseMessage response = await container.DeleteItemStreamAsync(itemId, partitionKey, cosmosItemRequestOptions).ConfigureAwait(false)) + { + return response.IsSuccessStatusCode; + } } public static async Task ItemExistsAsync( @@ -55,7 +85,7 @@ public static async Task ItemExistsAsync( PartitionKey partitionKey, string itemId) { - var response = await container.ReadItemStreamAsync( + ResponseMessage response = await container.ReadItemStreamAsync( itemId, partitionKey) .ConfigureAwait(false); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/SmokeTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/SmokeTests.cs index ace858ae14..f9196e9e2a 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/SmokeTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ChangeFeed/SmokeTests.cs @@ -5,6 +5,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests.ChangeFeed { using System.Collections.Generic; + using System.IO; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -162,6 +163,68 @@ public async Task WritesTriggerDelegate_WithInMemoryContainerWithDynamic() CollectionAssert.AreEqual(expectedIds.ToList(), receivedIds); } + [TestMethod] + public async Task DoesNotUseUserSerializer() + { + CosmosClient cosmosClient = TestCommon.CreateCosmosClient(builder => builder.WithCustomSerializer(new FailedUserSerializer())); + + ManualResetEvent allDocsProcessed = new ManualResetEvent(false); + + int processedDocCount = 0; + string accumulator = string.Empty; + ChangeFeedProcessor processor = cosmosClient.GetContainer(this.database.Id, this.Container.Id) + .GetChangeFeedProcessorBuilder("test", (IReadOnlyCollection docs, CancellationToken token) => + { + processedDocCount += docs.Count(); + foreach (TestClass doc in docs) + { + accumulator += doc.id.ToString() + "."; + } + + if (processedDocCount == 10) + { + allDocsProcessed.Set(); + } + + return Task.CompletedTask; + }) + .WithInstanceName("random") + .WithLeaseContainer(cosmosClient.GetContainer(this.database.Id, this.LeaseContainer.Id)).Build(); + + // Start the processor, insert 1 document to generate a checkpoint + await processor.StartAsync(); + await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime); + foreach (int id in Enumerable.Range(0, 10)) + { + await this.Container.CreateItemAsync(new TestClass { id = id.ToString() }); + } + + bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime); + await processor.StopAsync(); + Assert.IsTrue(isStartOk, "Timed out waiting for docs to process"); + Assert.AreEqual("0.1.2.3.4.5.6.7.8.9.", accumulator); + } + + private class FailedUserSerializer : CosmosSerializer + { + private readonly CosmosSerializer cosmosSerializer = new CosmosJsonDotNetSerializer(); + public override T FromStream(Stream stream) + { + // Only let changes serialization pass through + if (typeof(T) == typeof(CosmosFeedResponseUtil)) + { + return this.cosmosSerializer.FromStream(stream); + } + + throw new System.NotImplementedException(); + } + + public override Stream ToStream(T input) + { + throw new System.NotImplementedException(); + } + } + public class TestClass { public string id { get; set; } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseContainerCosmosTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseContainerCosmosTests.cs index 0a7907783a..1d007929aa 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseContainerCosmosTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseContainerCosmosTests.cs @@ -47,8 +47,12 @@ public async Task GetAllLeasesAsync_ReturnsAllLeaseDocuments() DocumentServiceLeaseContainerCosmosTests.GetMockedContainer(), DocumentServiceLeaseContainerCosmosTests.leaseStoreManagerSettings); - IEnumerable readLeases = await documentServiceLeaseContainerCosmos.GetAllLeasesAsync(); - CollectionAssert.AreEqual(DocumentServiceLeaseContainerCosmosTests.allLeases, readLeases.ToList()); + List readLeases = (await documentServiceLeaseContainerCosmos.GetAllLeasesAsync()).ToList(); + Assert.AreEqual(DocumentServiceLeaseContainerCosmosTests.allLeases.Count, readLeases.Count); + Assert.AreEqual(DocumentServiceLeaseContainerCosmosTests.allLeases[0].Id, readLeases[0].Id); + Assert.AreEqual(DocumentServiceLeaseContainerCosmosTests.allLeases[1].Id, readLeases[1].Id); + Assert.AreEqual(DocumentServiceLeaseContainerCosmosTests.allLeases[0].Owner, readLeases[0].Owner); + Assert.AreEqual(DocumentServiceLeaseContainerCosmosTests.allLeases[1].Owner, readLeases[1].Owner); } [TestMethod] @@ -58,8 +62,13 @@ public async Task GetOwnedLeasesAsync_ReturnsOnlyMatched() DocumentServiceLeaseContainerCosmosTests.GetMockedContainer(), DocumentServiceLeaseContainerCosmosTests.leaseStoreManagerSettings); - IEnumerable readLeases = await documentServiceLeaseContainerCosmos.GetOwnedLeasesAsync(); - CollectionAssert.AreEqual(DocumentServiceLeaseContainerCosmosTests.allLeases.Where(l => l.Owner == DocumentServiceLeaseContainerCosmosTests.leaseStoreManagerSettings.HostName).ToList(), readLeases.ToList()); + List readLeases = (await documentServiceLeaseContainerCosmos.GetOwnedLeasesAsync()).ToList(); + List owned = DocumentServiceLeaseContainerCosmosTests.allLeases.Where(l => l.Owner == DocumentServiceLeaseContainerCosmosTests.leaseStoreManagerSettings.HostName).ToList(); + + Assert.AreEqual(owned.Count, readLeases.Count); + Assert.AreEqual(owned[0].Id, readLeases[0].Id); + Assert.AreEqual(owned[0].Owner, readLeases[0].Owner); + } private static Container GetMockedContainer(string containerName = "myColl") @@ -67,22 +76,25 @@ private static Container GetMockedContainer(string containerName = "myColl") Headers headers = new Headers(); headers.ContinuationToken = string.Empty; - Mock> mockFeedResponse = new Mock>(); - mockFeedResponse.Setup(x => x.ContinuationToken).Returns(string.Empty); - mockFeedResponse.Setup(x => x.Headers).Returns(headers); - mockFeedResponse.Setup(x => x.Resource).Returns(DocumentServiceLeaseContainerCosmosTests.allLeases); - mockFeedResponse.Setup(x => x.Headers).Returns(headers); - mockFeedResponse.Setup(x => x.GetEnumerator()).Returns(DocumentServiceLeaseContainerCosmosTests.allLeases.GetEnumerator()); + MockFeedResponse cosmosFeedResponse = new MockFeedResponse() + { + Documents = DocumentServiceLeaseContainerCosmosTests.allLeases + }; - Mock> mockedQuery = new Mock>(); + ResponseMessage mockFeedResponse = new ResponseMessage() + { + Content = new CosmosJsonDotNetSerializer().ToStream(cosmosFeedResponse) + }; + + Mock mockedQuery = new Mock(); mockedQuery.Setup(q => q.ReadNextAsync(It.IsAny())) - .ReturnsAsync(() => mockFeedResponse.Object); + .ReturnsAsync(() => mockFeedResponse); mockedQuery.SetupSequence(q => q.HasMoreResults) .Returns(true) .Returns(false); Mock mockedItems = new Mock(); - mockedItems.Setup(i => i.GetItemQueryIterator( + mockedItems.Setup(i => i.GetItemQueryStreamIterator( // To make sure the SQL Query gets correctly created It.Is(value => string.Equals("SELECT * FROM c WHERE STARTSWITH(c.id, '" + DocumentServiceLeaseContainerCosmosTests.leaseStoreManagerSettings.GetPartitionLeasePrefix() + "')", value)), It.IsAny(), @@ -95,13 +107,9 @@ private static Container GetMockedContainer(string containerName = "myColl") return mockedItems.Object; } - private static CosmosClient GetMockedClient() + private class MockFeedResponse { - DocumentClient documentClient = new MockDocumentClient(); - - CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder("http://localhost", Guid.NewGuid().ToString()); - - return cosmosClientBuilder.Build(documentClient); + public List Documents { get; set; } } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseUpdaterCosmosTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseUpdaterCosmosTests.cs index a9ad0f20e0..128035e8aa 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseUpdaterCosmosTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseUpdaterCosmosTests.cs @@ -5,6 +5,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Tests { using System; + using System.IO; using System.Net; using System.Threading; using System.Threading.Tasks; @@ -26,18 +27,21 @@ public async Task UpdatesLease() Cosmos.PartitionKey partitionKey = new Cosmos.PartitionKey("1"); DocumentServiceLeaseCore leaseToUpdate = new DocumentServiceLeaseCore(); + Stream leaseStream = new CosmosJsonDotNetSerializer().ToStream(leaseToUpdate); + Mock mockedItems = new Mock(); - mockedItems.Setup(i => i.ReplaceItemAsync( - It.Is((lease) => lease == leaseToUpdate), + mockedItems.Setup(i => i.ReplaceItemStreamAsync( + It.IsAny(), It.Is((id) => id == itemId), It.Is(pk => pk.Equals(partitionKey)), It.IsAny(), It.IsAny())) - .ReturnsAsync(() => + .ReturnsAsync((Stream stream, string id, PartitionKey pk, ItemRequestOptions options, CancellationToken cancellationToken) => { - var itemResponse = new Mock>(); - itemResponse.Setup(i => i.Resource).Returns(leaseToUpdate); - return itemResponse.Object; + return new ResponseMessage(HttpStatusCode.OK) + { + Content = stream + }; }); var updater = new DocumentServiceLeaseUpdaterCosmos(DocumentServiceLeaseUpdaterCosmosTests.GetMockedContainer(mockedItems)); @@ -49,14 +53,14 @@ public async Task UpdatesLease() Assert.AreEqual("newHost", updatedLease.Owner); Mock.Get(mockedItems.Object) - .Verify(items => items.ReplaceItemAsync( - It.Is((lease) => lease == leaseToUpdate), + .Verify(items => items.ReplaceItemStreamAsync( + It.IsAny(), It.Is((id) => id == itemId), It.Is(pk => pk.Equals(partitionKey)), It.IsAny(), It.IsAny()), Times.Once); Mock.Get(mockedItems.Object) - .Verify(items => items.ReadItemAsync( + .Verify(items => items.ReadItemStreamAsync( It.Is((id) => id == itemId), It.Is((pk) => pk.Equals(partitionKey)), It.IsAny(), @@ -71,30 +75,35 @@ public async Task RetriesOnPreconditionFailed() DocumentServiceLeaseCore leaseToUpdate = new DocumentServiceLeaseCore(); Mock mockedItems = new Mock(); - mockedItems.Setup(i => i.ReadItemAsync( + mockedItems.Setup(i => i.ReadItemStreamAsync( It.Is((id) => id == itemId), It.Is(pk => pk.Equals(partitionKey)), It.IsAny(), It.IsAny())) .ReturnsAsync(() => { - var itemResponse = new Mock>(); - itemResponse.Setup(i => i.Resource).Returns(leaseToUpdate); - return itemResponse.Object; + return new ResponseMessage(HttpStatusCode.OK) + { + Content = new CosmosJsonDotNetSerializer().ToStream(leaseToUpdate) + }; }); - mockedItems.SetupSequence(i => i.ReplaceItemAsync( - It.Is((lease) => lease == leaseToUpdate), + mockedItems.SetupSequence(i => i.ReplaceItemStreamAsync( + It.IsAny(), It.Is((id) => id == itemId), It.Is(pk => pk.Equals(partitionKey)), It.IsAny(), It.IsAny())) - .Throws(new CosmosException(string.Empty, HttpStatusCode.PreconditionFailed, 0, string.Empty, 0)) .Returns(() => { - var itemResponse = new Mock>(); - itemResponse.Setup(i => i.Resource).Returns(leaseToUpdate); - return Task.FromResult(itemResponse.Object); + return Task.FromResult(new ResponseMessage(HttpStatusCode.PreconditionFailed)); + }) + .Returns(() => + { + return Task.FromResult(new ResponseMessage(HttpStatusCode.OK) + { + Content = new CosmosJsonDotNetSerializer().ToStream(leaseToUpdate) + }); }); var updater = new DocumentServiceLeaseUpdaterCosmos(DocumentServiceLeaseUpdaterCosmosTests.GetMockedContainer(mockedItems)); @@ -104,15 +113,15 @@ public async Task RetriesOnPreconditionFailed() return serverLease; }); - Assert.AreEqual("newHost", updatedLease.Owner); Mock.Get(mockedItems.Object) - .Verify(items => items.ReplaceItemAsync(It.Is((lease) => lease == leaseToUpdate), + .Verify(items => items.ReplaceItemStreamAsync( + It.IsAny(), It.Is((id) => id == itemId), It.Is(pk => pk.Equals(partitionKey)), It.IsAny(), It.IsAny()), Times.Exactly(2)); Mock.Get(mockedItems.Object) - .Verify(items => items.ReadItemAsync(It.Is((id) => id == itemId), + .Verify(items => items.ReadItemStreamAsync(It.Is((id) => id == itemId), It.Is(pk => pk.Equals(partitionKey)), It.IsAny(), It.IsAny()), Times.Once); @@ -127,25 +136,29 @@ public async Task ThrowsAfterMaxRetries() DocumentServiceLeaseCore leaseToUpdate = new DocumentServiceLeaseCore(); Mock mockedItems = new Mock(); - mockedItems.Setup(i => i.ReadItemAsync( + mockedItems.Setup(i => i.ReadItemStreamAsync( It.Is((id) => id == itemId), It.Is(pk => pk.Equals(partitionKey)), It.IsAny(), It.IsAny())) .ReturnsAsync(() => { - var itemResponse = new Mock>(); - itemResponse.Setup(i => i.Resource).Returns(leaseToUpdate); - return itemResponse.Object; + return new ResponseMessage(HttpStatusCode.OK) + { + Content = new CosmosJsonDotNetSerializer().ToStream(leaseToUpdate) + }; }); - mockedItems.Setup(i => i.ReplaceItemAsync( - It.Is((lease) => lease == leaseToUpdate), - It.Is((id) => id == itemId), + mockedItems.Setup(i => i.ReplaceItemStreamAsync( + It.IsAny(), + It.Is((id) => id == itemId), It.Is(pk => pk.Equals(partitionKey)), It.IsAny(), It.IsAny())) - .Throws(new CosmosException(string.Empty, HttpStatusCode.PreconditionFailed, 0, string.Empty, 0)); + .Returns(() => + { + return Task.FromResult(new ResponseMessage(HttpStatusCode.PreconditionFailed)); + }); var updater = new DocumentServiceLeaseUpdaterCosmos(DocumentServiceLeaseUpdaterCosmosTests.GetMockedContainer(mockedItems)); var updatedLease = await updater.UpdateLeaseAsync(leaseToUpdate, itemId, partitionKey, serverLease => @@ -164,30 +177,35 @@ public async Task ThrowsOnConflict() DocumentServiceLeaseCore leaseToUpdate = new DocumentServiceLeaseCore(); Mock mockedItems = new Mock(); - mockedItems.Setup(i => i.ReadItemAsync( + mockedItems.Setup(i => i.ReadItemStreamAsync( It.Is((id) => id == itemId), It.Is(pk => pk.Equals(partitionKey)), It.IsAny(), It.IsAny())) .ReturnsAsync(() => { - var itemResponse = new Mock>(); - itemResponse.Setup(i => i.Resource).Returns(leaseToUpdate); - return itemResponse.Object; + return new ResponseMessage(HttpStatusCode.OK) + { + Content = new CosmosJsonDotNetSerializer().ToStream(leaseToUpdate) + }; }); - mockedItems.SetupSequence(i => i.ReplaceItemAsync( - It.Is((lease) => lease == leaseToUpdate), + mockedItems.SetupSequence(i => i.ReplaceItemStreamAsync( + It.IsAny(), It.Is((id) => id == itemId), It.Is(pk => pk.Equals(partitionKey)), It.IsAny(), It.IsAny())) - .Throws(new CosmosException(string.Empty, HttpStatusCode.Conflict, 0, string.Empty, 0)) .Returns(() => { - var itemResponse = new Mock>(); - itemResponse.Setup(i => i.Resource).Returns(leaseToUpdate); - return Task.FromResult(itemResponse.Object); + return Task.FromResult(new ResponseMessage(HttpStatusCode.Conflict)); + }) + .Returns(() => + { + return Task.FromResult(new ResponseMessage(HttpStatusCode.OK) + { + Content = new CosmosJsonDotNetSerializer().ToStream(leaseToUpdate) + }); }); var updater = new DocumentServiceLeaseUpdaterCosmos(DocumentServiceLeaseUpdaterCosmosTests.GetMockedContainer(mockedItems)); @@ -207,33 +225,35 @@ public async Task ThrowsOnNotFoundReplace() DocumentServiceLeaseCore leaseToUpdate = new DocumentServiceLeaseCore(); Mock mockedItems = new Mock(); - mockedItems.Setup(i => i.ReadItemAsync( + mockedItems.Setup(i => i.ReadItemStreamAsync( It.Is((id) => id == itemId), It.Is(pk => pk.Equals(partitionKey)), It.IsAny(), It.IsAny())) .ReturnsAsync(() => { - var itemResponse = new Mock>(); - itemResponse.Setup(i => i.Resource).Returns(leaseToUpdate); - return itemResponse.Object; + return new ResponseMessage(HttpStatusCode.OK) + { + Content = new CosmosJsonDotNetSerializer().ToStream(leaseToUpdate) + }; }); - mockedItems.SetupSequence(i => i.ReplaceItemAsync( - It.Is((lease) => lease == leaseToUpdate), + mockedItems.SetupSequence(i => i.ReplaceItemStreamAsync( + It.IsAny(), It.Is((id) => id == itemId), It.Is(pk => pk.Equals(partitionKey)), It.IsAny(), It.IsAny())) .Returns(() => { - throw new CosmosException(HttpStatusCode.NotFound, ""); + return Task.FromResult(new ResponseMessage(HttpStatusCode.NotFound)); }) .Returns(() => { - var itemResponse = new Mock>(); - itemResponse.Setup(i => i.Resource).Returns(leaseToUpdate); - return Task.FromResult(itemResponse.Object); + return Task.FromResult(new ResponseMessage(HttpStatusCode.OK) + { + Content = new CosmosJsonDotNetSerializer().ToStream(leaseToUpdate) + }); }); var updater = new DocumentServiceLeaseUpdaterCosmos(DocumentServiceLeaseUpdaterCosmosTests.GetMockedContainer(mockedItems)); @@ -253,28 +273,32 @@ public async Task ThrowsOnNotFoundRead() DocumentServiceLeaseCore leaseToUpdate = new DocumentServiceLeaseCore(); Mock mockedItems = new Mock(); - mockedItems.Setup(i => i.ReadItemAsync( + mockedItems.Setup(i => i.ReadItemStreamAsync( It.Is((id) => id == itemId), It.Is(pk => pk.Equals(partitionKey)), It.IsAny(), It.IsAny())) .ReturnsAsync(() => { - throw new CosmosException(HttpStatusCode.NotFound, ""); + return new ResponseMessage(HttpStatusCode.NotFound); }); - mockedItems.SetupSequence(i => i.ReplaceItemAsync( - It.Is((lease) => lease == leaseToUpdate), + mockedItems.SetupSequence(i => i.ReplaceItemStreamAsync( + It.IsAny(), It.Is((id) => id == itemId), It.Is(pk => pk.Equals(partitionKey)), It.IsAny(), It.IsAny())) - .Throws(new CosmosException(string.Empty, HttpStatusCode.PreconditionFailed, 0, string.Empty, 0)) .Returns(() => { - var itemResponse = new Mock>(); - itemResponse.Setup(i => i.Resource).Returns(leaseToUpdate); - return Task.FromResult(itemResponse.Object); + return Task.FromResult(new ResponseMessage(HttpStatusCode.PreconditionFailed)); + }) + .Returns(() => + { + return Task.FromResult(new ResponseMessage(HttpStatusCode.OK) + { + Content = new CosmosJsonDotNetSerializer().ToStream(leaseToUpdate) + }); }); var updater = new DocumentServiceLeaseUpdaterCosmos(DocumentServiceLeaseUpdaterCosmosTests.GetMockedContainer(mockedItems)); diff --git a/changelog.md b/changelog.md index ba444c7aff..78d2eddcb3 100644 --- a/changelog.md +++ b/changelog.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +### Fixed + +- [#944](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/944) Change Feed Processor won't use user serializer for internal operations + + ## [3.4.1](https://www.nuget.org/packages/Microsoft.Azure.Cosmos/3.4.1) - 2019-11-06 ### Fixed