From a33144864e96eb877941c766b55418c82f865186 Mon Sep 17 00:00:00 2001
From: Nalu Tripician <27316859+NaluTripician@users.noreply.github.com>
Date: Tue, 30 Apr 2024 10:49:53 -0700
Subject: [PATCH] FaultInjection: Adds ReadFeed Operation Type (#4448)
* initial commit
* fixed tests
* nits
* small fixes
* fixed test
---
.../src/FaultInjectionOperationType.cs | 5 ++
.../FaultInjectionRuleProcessor.cs | 1 +
...FaultInjectionServerErrorResultInternal.cs | 24 ++++-
.../tests/FaultInjectionDirectModeTests.cs | 89 +++++++++++++++++++
.../FaultInjection/tests/Utils/TestCommon.cs | 2 -
5 files changed, 115 insertions(+), 6 deletions(-)
diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/FaultInjectionOperationType.cs b/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/FaultInjectionOperationType.cs
index 675e33f4bb..ea8b3d19d1 100644
--- a/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/FaultInjectionOperationType.cs
+++ b/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/FaultInjectionOperationType.cs
@@ -48,6 +48,11 @@ public enum FaultInjectionOperationType
///
Batch,
+ ///
+ /// Read Feed operations for ChangeFeed.
+ ///
+ ReadFeed,
+
///
/// All operation types. Default value.
///
diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/implementataion/FaultInjectionRuleProcessor.cs b/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/implementataion/FaultInjectionRuleProcessor.cs
index a9ebdf9b73..4ef98d4c71 100644
--- a/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/implementataion/FaultInjectionRuleProcessor.cs
+++ b/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/implementataion/FaultInjectionRuleProcessor.cs
@@ -214,6 +214,7 @@ private OperationType GetEffectiveOperationType(FaultInjectionOperationType faul
FaultInjectionOperationType.DeleteItem => OperationType.Delete,
FaultInjectionOperationType.PatchItem => OperationType.Patch,
FaultInjectionOperationType.Batch => OperationType.Batch,
+ FaultInjectionOperationType.ReadFeed => OperationType.ReadFeed,
_ => throw new ArgumentException($"FaultInjectionOperationType: {faultInjectionOperationType} is not supported"),
};
}
diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/implementataion/FaultInjectionServerErrorResultInternal.cs b/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/implementataion/FaultInjectionServerErrorResultInternal.cs
index e903d61ca5..d4f1a2407b 100644
--- a/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/implementataion/FaultInjectionServerErrorResultInternal.cs
+++ b/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/src/implementataion/FaultInjectionServerErrorResultInternal.cs
@@ -123,12 +123,14 @@ public FaultInjectionServerErrorType GetInjectedServerErrorType()
public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ruleId)
{
StoreResponse storeResponse;
+ string lsn = args.RequestHeaders.Get(WFConstants.BackendHeaders.LSN) ?? "0";
switch (this.serverErrorType)
{
case FaultInjectionServerErrorType.Gone:
INameValueCollection goneHeaders = args.RequestHeaders;
goneHeaders.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.ServerGenerated410).ToString(CultureInfo.InvariantCulture));
+ goneHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);
storeResponse = new StoreResponse()
{
Status = 410,
@@ -139,10 +141,12 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
return storeResponse;
case FaultInjectionServerErrorType.RetryWith:
+ INameValueCollection retryWithHeaders = args.RequestHeaders;
+ retryWithHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);
storeResponse = new StoreResponse()
{
Status = 449,
- Headers = args.RequestHeaders,
+ Headers = retryWithHeaders,
ResponseBody = new MemoryStream(Encoding.UTF8.GetBytes($"Fault Injection Server Error: Retry With, rule: {ruleId}"))
};
@@ -152,6 +156,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
INameValueCollection tooManyRequestsHeaders = args.RequestHeaders;
tooManyRequestsHeaders.Set(HttpConstants.HttpHeaders.RetryAfterInMilliseconds, "500");
tooManyRequestsHeaders.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.RUBudgetExceeded).ToString(CultureInfo.InvariantCulture));
+ tooManyRequestsHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);
storeResponse = new StoreResponse()
{
@@ -163,21 +168,26 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
return storeResponse;
case FaultInjectionServerErrorType.Timeout:
+ INameValueCollection timeoutHeaders = args.RequestHeaders;
+ timeoutHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);
storeResponse = new StoreResponse()
{
Status = 408,
- Headers = args.RequestHeaders,
+ Headers = timeoutHeaders,
ResponseBody = new MemoryStream(Encoding.UTF8.GetBytes($"Fault Injection Server Error: Timeout, rule: {ruleId}"))
};
return storeResponse;
case FaultInjectionServerErrorType.InternalServerEror:
+ INameValueCollection internalServerErrorHeaders = args.RequestHeaders;
+ internalServerErrorHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);
+
storeResponse = new StoreResponse()
{
Status = 500,
- Headers = args.RequestHeaders,
+ Headers = internalServerErrorHeaders,
ResponseBody = new MemoryStream(Encoding.UTF8.GetBytes($"Fault Injection Server Error: Internal Server Error, rule: {ruleId}"))
};
@@ -190,6 +200,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
INameValueCollection readSessionHeaders = args.RequestHeaders;
readSessionHeaders.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.ReadSessionNotAvailable).ToString(CultureInfo.InvariantCulture));
readSessionHeaders.Set(HttpConstants.HttpHeaders.SessionToken, badSesstionToken);
+ readSessionHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);
storeResponse = new StoreResponse()
{
@@ -203,6 +214,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
case FaultInjectionServerErrorType.PartitionIsMigrating:
INameValueCollection partitionMigrationHeaders = args.RequestHeaders;
partitionMigrationHeaders.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.CompletingPartitionMigration).ToString(CultureInfo.InvariantCulture));
+ partitionMigrationHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);
storeResponse = new StoreResponse()
{
@@ -216,6 +228,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
case FaultInjectionServerErrorType.PartitionIsSplitting:
INameValueCollection partitionSplitting = args.RequestHeaders;
partitionSplitting.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.CompletingSplit).ToString(CultureInfo.InvariantCulture));
+ partitionSplitting.Set(WFConstants.BackendHeaders.LocalLSN, lsn);
storeResponse = new StoreResponse()
{
@@ -226,10 +239,13 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
return storeResponse;
case FaultInjectionServerErrorType.ServiceUnavailable:
+ INameValueCollection serviceUnavailableHeaders = args.RequestHeaders;
+ serviceUnavailableHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);
+
storeResponse = new StoreResponse()
{
Status = 503,
- Headers = args.RequestHeaders,
+ Headers = serviceUnavailableHeaders,
ResponseBody = new MemoryStream(Encoding.UTF8.GetBytes($"Fault Injection Server Error: Service Unavailable, rule: {ruleId}"))
};
diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/tests/FaultInjectionDirectModeTests.cs b/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/tests/FaultInjectionDirectModeTests.cs
index 90fe71945a..56da481229 100644
--- a/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/tests/FaultInjectionDirectModeTests.cs
+++ b/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/tests/FaultInjectionDirectModeTests.cs
@@ -9,8 +9,10 @@ namespace Microsoft.Azure.Cosmos.FaultInjection.Tests
using System.Linq;
using System.Net;
using System.Net.Sockets;
+ using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
+ using Microsoft.Azure.Cosmos.Diagnostics;
using Microsoft.Azure.Cosmos.FaultInjection.Tests.Utils;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;
@@ -1276,6 +1278,93 @@ private async Task Timeout_FaultInjectionConnectionErrorRule_Test()
Assert.IsTrue(disposedChannel);
}
+ [TestMethod]
+ [Timeout(Timeout)]
+ [Owner("nalutripician")]
+ [Description("Tests ReadFeed FaultInjection")]
+ public async Task FaultInjectionServerErrorRule_ReadFeedTest()
+ {
+ string readFeedId = "readFeadRule-" + Guid.NewGuid().ToString();
+ FaultInjectionRule readFeedRule = new FaultInjectionRuleBuilder(
+ id: readFeedId,
+ condition:
+ new FaultInjectionConditionBuilder()
+ .WithOperationType(FaultInjectionOperationType.ReadFeed)
+ .Build(),
+ result:
+ FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.ServiceUnavailable)
+ .Build())
+ .WithDuration(TimeSpan.FromMinutes(5))
+ .Build();
+
+ readFeedRule.Disable();
+
+ List ruleList = new List { readFeedRule };
+ FaultInjector faultInjector = new FaultInjector(ruleList);
+
+ await this.Initialize(faultInjector, false);
+
+ string changeFeedContainerName = "changeFeedContainer-" + Guid.NewGuid().ToString();
+ ContainerProperties containerProperties = new ContainerProperties
+ {
+ Id = changeFeedContainerName,
+ PartitionKeyPath = "/partitionKey"
+ };
+
+ if (this.database != null && this.container != null)
+ {
+ JObject createdItem = JObject.FromObject(new { id = Guid.NewGuid().ToString(), Pk = Guid.NewGuid().ToString() });
+ ItemResponse? itemResponse = await this.container.CreateItemAsync(createdItem);
+
+ readFeedRule.Enable();
+
+ Container? leaseContainer = await this.database.CreateContainerIfNotExistsAsync(containerProperties, 400);
+
+ ManualResetEvent changeFeedRan = new ManualResetEvent(false);
+
+ ChangeFeedProcessor changeFeedProcessor = this.container.GetChangeFeedProcessorBuilder(
+ "FaultInjectionTest",
+ (ChangeFeedProcessorContext context, IReadOnlyCollection docs, CancellationToken token) =>
+ {
+ Assert.Fail("Change Feed Should Fail");
+ return Task.CompletedTask;
+ })
+ .WithInstanceName("test")
+ .WithLeaseContainer(leaseContainer)
+ .WithStartFromBeginning()
+ .WithErrorNotification((string lease, Exception exception) =>
+ {
+ if (exception is CosmosException cosmosException)
+ {
+ Assert.AreEqual(HttpStatusCode.ServiceUnavailable, cosmosException.StatusCode);
+ }
+ else
+ {
+ Assert.Fail("Unexpected Exception");
+ }
+
+ changeFeedRan.Set();
+ return Task.CompletedTask;
+ })
+ .Build();
+
+ await changeFeedProcessor.StartAsync();
+ await Task.Delay(1000);
+
+ try
+ {
+ bool wasProcessed = changeFeedRan.WaitOne(60000);
+ Assert.IsTrue(wasProcessed, "Timed out waiting for handler to execute");
+ }
+ finally
+ {
+ await changeFeedProcessor.StopAsync();
+ readFeedRule.Disable();
+ }
+ }
+ }
+
+
private async Task PerformDocumentOperation(Container testContainer, OperationType operationType, JObject item)
{
try
diff --git a/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/tests/Utils/TestCommon.cs b/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/tests/Utils/TestCommon.cs
index 0a2e647ed1..310e4c0049 100644
--- a/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/tests/Utils/TestCommon.cs
+++ b/Microsoft.Azure.Cosmos.Samples/Tools/FaultInjection/tests/Utils/TestCommon.cs
@@ -6,8 +6,6 @@
internal static class TestCommon
{
- public const string Endpoint = "";
- public const string AuthKey = "";
public const string EndpointMultiRegion = "";
public const string AuthKeyMultiRegion = "";