diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/FaultInjectionOperationType.cs b/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/FaultInjectionOperationType.cs index 675e33f4bb..ea8b3d19d1 100644 --- a/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/FaultInjectionOperationType.cs +++ b/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/FaultInjectionOperationType.cs @@ -48,6 +48,11 @@ public enum FaultInjectionOperationType /// Batch, + /// + /// Read Feed operations for ChangeFeed. + /// + ReadFeed, + /// /// All operation types. Default value. /// diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/implementataion/FaultInjectionRuleProcessor.cs b/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/implementataion/FaultInjectionRuleProcessor.cs index a9ebdf9b73..4ef98d4c71 100644 --- a/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/implementataion/FaultInjectionRuleProcessor.cs +++ b/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/implementataion/FaultInjectionRuleProcessor.cs @@ -214,6 +214,7 @@ private OperationType GetEffectiveOperationType(FaultInjectionOperationType faul FaultInjectionOperationType.DeleteItem => OperationType.Delete, FaultInjectionOperationType.PatchItem => OperationType.Patch, FaultInjectionOperationType.Batch => OperationType.Batch, + FaultInjectionOperationType.ReadFeed => OperationType.ReadFeed, _ => throw new ArgumentException($"FaultInjectionOperationType: {faultInjectionOperationType} is not supported"), }; } diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/implementataion/FaultInjectionServerErrorResultInternal.cs b/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/implementataion/FaultInjectionServerErrorResultInternal.cs index e903d61ca5..d4f1a2407b 100644 --- a/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/implementataion/FaultInjectionServerErrorResultInternal.cs +++ b/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/implementataion/FaultInjectionServerErrorResultInternal.cs @@ -123,12 +123,14 @@ public FaultInjectionServerErrorType GetInjectedServerErrorType() public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ruleId) { StoreResponse storeResponse; + string lsn = args.RequestHeaders.Get(WFConstants.BackendHeaders.LSN) ?? "0"; switch (this.serverErrorType) { case FaultInjectionServerErrorType.Gone: INameValueCollection goneHeaders = args.RequestHeaders; goneHeaders.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.ServerGenerated410).ToString(CultureInfo.InvariantCulture)); + goneHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn); storeResponse = new StoreResponse() { Status = 410, @@ -139,10 +141,12 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru return storeResponse; case FaultInjectionServerErrorType.RetryWith: + INameValueCollection retryWithHeaders = args.RequestHeaders; + retryWithHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn); storeResponse = new StoreResponse() { Status = 449, - Headers = args.RequestHeaders, + Headers = retryWithHeaders, ResponseBody = new MemoryStream(Encoding.UTF8.GetBytes($"Fault Injection Server Error: Retry With, rule: {ruleId}")) }; @@ -152,6 +156,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru INameValueCollection tooManyRequestsHeaders = args.RequestHeaders; tooManyRequestsHeaders.Set(HttpConstants.HttpHeaders.RetryAfterInMilliseconds, "500"); tooManyRequestsHeaders.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.RUBudgetExceeded).ToString(CultureInfo.InvariantCulture)); + tooManyRequestsHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn); storeResponse = new StoreResponse() { @@ -163,21 +168,26 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru return storeResponse; case FaultInjectionServerErrorType.Timeout: + INameValueCollection timeoutHeaders = args.RequestHeaders; + timeoutHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn); storeResponse = new StoreResponse() { Status = 408, - Headers = args.RequestHeaders, + Headers = timeoutHeaders, ResponseBody = new MemoryStream(Encoding.UTF8.GetBytes($"Fault Injection Server Error: Timeout, rule: {ruleId}")) }; return storeResponse; case FaultInjectionServerErrorType.InternalServerEror: + INameValueCollection internalServerErrorHeaders = args.RequestHeaders; + internalServerErrorHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn); + storeResponse = new StoreResponse() { Status = 500, - Headers = args.RequestHeaders, + Headers = internalServerErrorHeaders, ResponseBody = new MemoryStream(Encoding.UTF8.GetBytes($"Fault Injection Server Error: Internal Server Error, rule: {ruleId}")) }; @@ -190,6 +200,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru INameValueCollection readSessionHeaders = args.RequestHeaders; readSessionHeaders.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.ReadSessionNotAvailable).ToString(CultureInfo.InvariantCulture)); readSessionHeaders.Set(HttpConstants.HttpHeaders.SessionToken, badSesstionToken); + readSessionHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn); storeResponse = new StoreResponse() { @@ -203,6 +214,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru case FaultInjectionServerErrorType.PartitionIsMigrating: INameValueCollection partitionMigrationHeaders = args.RequestHeaders; partitionMigrationHeaders.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.CompletingPartitionMigration).ToString(CultureInfo.InvariantCulture)); + partitionMigrationHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn); storeResponse = new StoreResponse() { @@ -216,6 +228,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru case FaultInjectionServerErrorType.PartitionIsSplitting: INameValueCollection partitionSplitting = args.RequestHeaders; partitionSplitting.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.CompletingSplit).ToString(CultureInfo.InvariantCulture)); + partitionSplitting.Set(WFConstants.BackendHeaders.LocalLSN, lsn); storeResponse = new StoreResponse() { @@ -226,10 +239,13 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru return storeResponse; case FaultInjectionServerErrorType.ServiceUnavailable: + INameValueCollection serviceUnavailableHeaders = args.RequestHeaders; + serviceUnavailableHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn); + storeResponse = new StoreResponse() { Status = 503, - Headers = args.RequestHeaders, + Headers = serviceUnavailableHeaders, ResponseBody = new MemoryStream(Encoding.UTF8.GetBytes($"Fault Injection Server Error: Service Unavailable, rule: {ruleId}")) }; diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/tests/FaultInjectionDirectModeTests.cs b/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/tests/FaultInjectionDirectModeTests.cs index 90fe71945a..56da481229 100644 --- a/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/tests/FaultInjectionDirectModeTests.cs +++ b/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/tests/FaultInjectionDirectModeTests.cs @@ -9,8 +9,10 @@ namespace Microsoft.Azure.Cosmos.FaultInjection.Tests using System.Linq; using System.Net; using System.Net.Sockets; + using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos; + using Microsoft.Azure.Cosmos.Diagnostics; using Microsoft.Azure.Cosmos.FaultInjection.Tests.Utils; using Microsoft.Azure.Cosmos.Routing; using Microsoft.Azure.Documents; @@ -1276,6 +1278,93 @@ private async Task Timeout_FaultInjectionConnectionErrorRule_Test() Assert.IsTrue(disposedChannel); } + [TestMethod] + [Timeout(Timeout)] + [Owner("nalutripician")] + [Description("Tests ReadFeed FaultInjection")] + public async Task FaultInjectionServerErrorRule_ReadFeedTest() + { + string readFeedId = "readFeadRule-" + Guid.NewGuid().ToString(); + FaultInjectionRule readFeedRule = new FaultInjectionRuleBuilder( + id: readFeedId, + condition: + new FaultInjectionConditionBuilder() + .WithOperationType(FaultInjectionOperationType.ReadFeed) + .Build(), + result: + FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.ServiceUnavailable) + .Build()) + .WithDuration(TimeSpan.FromMinutes(5)) + .Build(); + + readFeedRule.Disable(); + + List ruleList = new List { readFeedRule }; + FaultInjector faultInjector = new FaultInjector(ruleList); + + await this.Initialize(faultInjector, false); + + string changeFeedContainerName = "changeFeedContainer-" + Guid.NewGuid().ToString(); + ContainerProperties containerProperties = new ContainerProperties + { + Id = changeFeedContainerName, + PartitionKeyPath = "/partitionKey" + }; + + if (this.database != null && this.container != null) + { + JObject createdItem = JObject.FromObject(new { id = Guid.NewGuid().ToString(), Pk = Guid.NewGuid().ToString() }); + ItemResponse? itemResponse = await this.container.CreateItemAsync(createdItem); + + readFeedRule.Enable(); + + Container? leaseContainer = await this.database.CreateContainerIfNotExistsAsync(containerProperties, 400); + + ManualResetEvent changeFeedRan = new ManualResetEvent(false); + + ChangeFeedProcessor changeFeedProcessor = this.container.GetChangeFeedProcessorBuilder( + "FaultInjectionTest", + (ChangeFeedProcessorContext context, IReadOnlyCollection docs, CancellationToken token) => + { + Assert.Fail("Change Feed Should Fail"); + return Task.CompletedTask; + }) + .WithInstanceName("test") + .WithLeaseContainer(leaseContainer) + .WithStartFromBeginning() + .WithErrorNotification((string lease, Exception exception) => + { + if (exception is CosmosException cosmosException) + { + Assert.AreEqual(HttpStatusCode.ServiceUnavailable, cosmosException.StatusCode); + } + else + { + Assert.Fail("Unexpected Exception"); + } + + changeFeedRan.Set(); + return Task.CompletedTask; + }) + .Build(); + + await changeFeedProcessor.StartAsync(); + await Task.Delay(1000); + + try + { + bool wasProcessed = changeFeedRan.WaitOne(60000); + Assert.IsTrue(wasProcessed, "Timed out waiting for handler to execute"); + } + finally + { + await changeFeedProcessor.StopAsync(); + readFeedRule.Disable(); + } + } + } + + private async Task PerformDocumentOperation(Container testContainer, OperationType operationType, JObject item) { try diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/tests/Utils/TestCommon.cs b/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/tests/Utils/TestCommon.cs index 0a2e647ed1..310e4c0049 100644 --- a/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/tests/Utils/TestCommon.cs +++ b/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/tests/Utils/TestCommon.cs @@ -6,8 +6,6 @@ internal static class TestCommon { - public const string Endpoint = ""; - public const string AuthKey = ""; public const string EndpointMultiRegion = ""; public const string AuthKeyMultiRegion = "";