Skip to content

Commit

Permalink
FaultInjection: Adds ReadFeed Operation Type (#4448)
Browse files Browse the repository at this point in the history
* initial commit

* fixed tests

* nits

* small fixes

* fixed test
  • Loading branch information
NaluTripician committed Apr 30, 2024
1 parent bda8290 commit a331448
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public enum FaultInjectionOperationType
/// </summary>
Batch,

/// <summary>
/// Read Feed operations for ChangeFeed.
/// </summary>
ReadFeed,

/// <summary>
/// All operation types. Default value.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ private OperationType GetEffectiveOperationType(FaultInjectionOperationType faul
FaultInjectionOperationType.DeleteItem => OperationType.Delete,
FaultInjectionOperationType.PatchItem => OperationType.Patch,
FaultInjectionOperationType.Batch => OperationType.Batch,
FaultInjectionOperationType.ReadFeed => OperationType.ReadFeed,
_ => throw new ArgumentException($"FaultInjectionOperationType: {faultInjectionOperationType} is not supported"),
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,14 @@ public FaultInjectionServerErrorType GetInjectedServerErrorType()
public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ruleId)
{
StoreResponse storeResponse;
string lsn = args.RequestHeaders.Get(WFConstants.BackendHeaders.LSN) ?? "0";

switch (this.serverErrorType)
{
case FaultInjectionServerErrorType.Gone:
INameValueCollection goneHeaders = args.RequestHeaders;
goneHeaders.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.ServerGenerated410).ToString(CultureInfo.InvariantCulture));
goneHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);
storeResponse = new StoreResponse()
{
Status = 410,
Expand All @@ -139,10 +141,12 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
return storeResponse;

case FaultInjectionServerErrorType.RetryWith:
INameValueCollection retryWithHeaders = args.RequestHeaders;
retryWithHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);
storeResponse = new StoreResponse()
{
Status = 449,
Headers = args.RequestHeaders,
Headers = retryWithHeaders,
ResponseBody = new MemoryStream(Encoding.UTF8.GetBytes($"Fault Injection Server Error: Retry With, rule: {ruleId}"))
};

Expand All @@ -152,6 +156,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
INameValueCollection tooManyRequestsHeaders = args.RequestHeaders;
tooManyRequestsHeaders.Set(HttpConstants.HttpHeaders.RetryAfterInMilliseconds, "500");
tooManyRequestsHeaders.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.RUBudgetExceeded).ToString(CultureInfo.InvariantCulture));
tooManyRequestsHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);

storeResponse = new StoreResponse()
{
Expand All @@ -163,21 +168,26 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
return storeResponse;

case FaultInjectionServerErrorType.Timeout:
INameValueCollection timeoutHeaders = args.RequestHeaders;
timeoutHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);

storeResponse = new StoreResponse()
{
Status = 408,
Headers = args.RequestHeaders,
Headers = timeoutHeaders,
ResponseBody = new MemoryStream(Encoding.UTF8.GetBytes($"Fault Injection Server Error: Timeout, rule: {ruleId}"))
};

return storeResponse;

case FaultInjectionServerErrorType.InternalServerEror:
INameValueCollection internalServerErrorHeaders = args.RequestHeaders;
internalServerErrorHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);

storeResponse = new StoreResponse()
{
Status = 500,
Headers = args.RequestHeaders,
Headers = internalServerErrorHeaders,
ResponseBody = new MemoryStream(Encoding.UTF8.GetBytes($"Fault Injection Server Error: Internal Server Error, rule: {ruleId}"))
};

Expand All @@ -190,6 +200,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
INameValueCollection readSessionHeaders = args.RequestHeaders;
readSessionHeaders.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.ReadSessionNotAvailable).ToString(CultureInfo.InvariantCulture));
readSessionHeaders.Set(HttpConstants.HttpHeaders.SessionToken, badSesstionToken);
readSessionHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);

storeResponse = new StoreResponse()
{
Expand All @@ -203,6 +214,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
case FaultInjectionServerErrorType.PartitionIsMigrating:
INameValueCollection partitionMigrationHeaders = args.RequestHeaders;
partitionMigrationHeaders.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.CompletingPartitionMigration).ToString(CultureInfo.InvariantCulture));
partitionMigrationHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);

storeResponse = new StoreResponse()
{
Expand All @@ -216,6 +228,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
case FaultInjectionServerErrorType.PartitionIsSplitting:
INameValueCollection partitionSplitting = args.RequestHeaders;
partitionSplitting.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.CompletingSplit).ToString(CultureInfo.InvariantCulture));
partitionSplitting.Set(WFConstants.BackendHeaders.LocalLSN, lsn);

storeResponse = new StoreResponse()
{
Expand All @@ -226,10 +239,13 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru

return storeResponse;
case FaultInjectionServerErrorType.ServiceUnavailable:
INameValueCollection serviceUnavailableHeaders = args.RequestHeaders;
serviceUnavailableHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);

storeResponse = new StoreResponse()
{
Status = 503,
Headers = args.RequestHeaders,
Headers = serviceUnavailableHeaders,
ResponseBody = new MemoryStream(Encoding.UTF8.GetBytes($"Fault Injection Server Error: Service Unavailable, rule: {ruleId}"))
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ namespace Microsoft.Azure.Cosmos.FaultInjection.Tests
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Diagnostics;
using Microsoft.Azure.Cosmos.FaultInjection.Tests.Utils;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;
Expand Down Expand Up @@ -1276,6 +1278,93 @@ private async Task Timeout_FaultInjectionConnectionErrorRule_Test()
Assert.IsTrue(disposedChannel);
}

[TestMethod]
[Timeout(Timeout)]
[Owner("nalutripician")]
[Description("Tests ReadFeed FaultInjection")]
public async Task FaultInjectionServerErrorRule_ReadFeedTest()
{
string readFeedId = "readFeadRule-" + Guid.NewGuid().ToString();
FaultInjectionRule readFeedRule = new FaultInjectionRuleBuilder(
id: readFeedId,
condition:
new FaultInjectionConditionBuilder()
.WithOperationType(FaultInjectionOperationType.ReadFeed)
.Build(),
result:
FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.ServiceUnavailable)
.Build())
.WithDuration(TimeSpan.FromMinutes(5))
.Build();

readFeedRule.Disable();

List<FaultInjectionRule> ruleList = new List<FaultInjectionRule> { readFeedRule };
FaultInjector faultInjector = new FaultInjector(ruleList);

await this.Initialize(faultInjector, false);

string changeFeedContainerName = "changeFeedContainer-" + Guid.NewGuid().ToString();
ContainerProperties containerProperties = new ContainerProperties
{
Id = changeFeedContainerName,
PartitionKeyPath = "/partitionKey"
};

if (this.database != null && this.container != null)
{
JObject createdItem = JObject.FromObject(new { id = Guid.NewGuid().ToString(), Pk = Guid.NewGuid().ToString() });
ItemResponse<JObject>? itemResponse = await this.container.CreateItemAsync<JObject>(createdItem);

readFeedRule.Enable();

Container? leaseContainer = await this.database.CreateContainerIfNotExistsAsync(containerProperties, 400);

ManualResetEvent changeFeedRan = new ManualResetEvent(false);

ChangeFeedProcessor changeFeedProcessor = this.container.GetChangeFeedProcessorBuilder<JObject>(
"FaultInjectionTest",
(ChangeFeedProcessorContext context, IReadOnlyCollection<JObject> docs, CancellationToken token) =>
{
Assert.Fail("Change Feed Should Fail");
return Task.CompletedTask;
})
.WithInstanceName("test")
.WithLeaseContainer(leaseContainer)
.WithStartFromBeginning()
.WithErrorNotification((string lease, Exception exception) =>
{
if (exception is CosmosException cosmosException)
{
Assert.AreEqual(HttpStatusCode.ServiceUnavailable, cosmosException.StatusCode);
}
else
{
Assert.Fail("Unexpected Exception");
}
changeFeedRan.Set();
return Task.CompletedTask;
})
.Build();

await changeFeedProcessor.StartAsync();
await Task.Delay(1000);

try
{
bool wasProcessed = changeFeedRan.WaitOne(60000);
Assert.IsTrue(wasProcessed, "Timed out waiting for handler to execute");
}
finally
{
await changeFeedProcessor.StopAsync();
readFeedRule.Disable();
}
}
}


private async Task<CosmosDiagnostics> PerformDocumentOperation(Container testContainer, OperationType operationType, JObject item)
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

internal static class TestCommon
{
public const string Endpoint = "";
public const string AuthKey = "";
public const string EndpointMultiRegion = "";
public const string AuthKeyMultiRegion = "";

Expand Down

0 comments on commit a331448

Please sign in to comment.