Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FaultInjection: Adds ReadFeed Operation Type #4448

Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public enum FaultInjectionOperationType
/// </summary>
Batch,

/// <summary>
/// Read Feed operations for ChangeFeed.
/// </summary>
ReadFeed,

/// <summary>
/// All operation types. Default value.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ private OperationType GetEffectiveOperationType(FaultInjectionOperationType faul
FaultInjectionOperationType.DeleteItem => OperationType.Delete,
FaultInjectionOperationType.PatchItem => OperationType.Patch,
FaultInjectionOperationType.Batch => OperationType.Batch,
FaultInjectionOperationType.ReadFeed => OperationType.ReadFeed,
_ => throw new ArgumentException($"FaultInjectionOperationType: {faultInjectionOperationType} is not supported"),
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,14 @@ public FaultInjectionServerErrorType GetInjectedServerErrorType()
public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ruleId)
{
StoreResponse storeResponse;
string lsn = args.RequestHeaders.Get(WFConstants.BackendHeaders.LSN) ?? "0";

switch (this.serverErrorType)
{
case FaultInjectionServerErrorType.Gone:
INameValueCollection goneHeaders = args.RequestHeaders;
goneHeaders.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.ServerGenerated410).ToString(CultureInfo.InvariantCulture));
goneHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);
storeResponse = new StoreResponse()
{
Status = 410,
Expand All @@ -139,10 +141,12 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
return storeResponse;

case FaultInjectionServerErrorType.RetryWith:
INameValueCollection retryWithHeaders = args.RequestHeaders;
retryWithHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);
storeResponse = new StoreResponse()
{
Status = 449,
Headers = args.RequestHeaders,
Headers = retryWithHeaders,
ResponseBody = new MemoryStream(Encoding.UTF8.GetBytes($"Fault Injection Server Error: Retry With, rule: {ruleId}"))
};

Expand All @@ -152,6 +156,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
INameValueCollection tooManyRequestsHeaders = args.RequestHeaders;
tooManyRequestsHeaders.Set(HttpConstants.HttpHeaders.RetryAfterInMilliseconds, "500");
tooManyRequestsHeaders.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.RUBudgetExceeded).ToString(CultureInfo.InvariantCulture));
tooManyRequestsHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);

storeResponse = new StoreResponse()
{
Expand All @@ -163,21 +168,26 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
return storeResponse;

case FaultInjectionServerErrorType.Timeout:
INameValueCollection timeoutHeaders = args.RequestHeaders;
timeoutHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);

storeResponse = new StoreResponse()
{
Status = 408,
Headers = args.RequestHeaders,
Headers = timeoutHeaders,
ResponseBody = new MemoryStream(Encoding.UTF8.GetBytes($"Fault Injection Server Error: Timeout, rule: {ruleId}"))
};

return storeResponse;

case FaultInjectionServerErrorType.InternalServerEror:
INameValueCollection internalServerErrorHeaders = args.RequestHeaders;
internalServerErrorHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);

storeResponse = new StoreResponse()
{
Status = 500,
Headers = args.RequestHeaders,
Headers = internalServerErrorHeaders,
ResponseBody = new MemoryStream(Encoding.UTF8.GetBytes($"Fault Injection Server Error: Internal Server Error, rule: {ruleId}"))
};

Expand All @@ -190,6 +200,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
INameValueCollection readSessionHeaders = args.RequestHeaders;
readSessionHeaders.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.ReadSessionNotAvailable).ToString(CultureInfo.InvariantCulture));
readSessionHeaders.Set(HttpConstants.HttpHeaders.SessionToken, badSesstionToken);
readSessionHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);

storeResponse = new StoreResponse()
{
Expand All @@ -203,6 +214,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
case FaultInjectionServerErrorType.PartitionIsMigrating:
INameValueCollection partitionMigrationHeaders = args.RequestHeaders;
partitionMigrationHeaders.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.CompletingPartitionMigration).ToString(CultureInfo.InvariantCulture));
partitionMigrationHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);

storeResponse = new StoreResponse()
{
Expand All @@ -216,6 +228,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
case FaultInjectionServerErrorType.PartitionIsSplitting:
INameValueCollection partitionSplitting = args.RequestHeaders;
partitionSplitting.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.CompletingSplit).ToString(CultureInfo.InvariantCulture));
partitionSplitting.Set(WFConstants.BackendHeaders.LocalLSN, lsn);

storeResponse = new StoreResponse()
{
Expand All @@ -226,10 +239,13 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru

return storeResponse;
case FaultInjectionServerErrorType.ServiceUnavailable:
INameValueCollection serviceUnavailableHeaders = args.RequestHeaders;
serviceUnavailableHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);

storeResponse = new StoreResponse()
{
Status = 503,
Headers = args.RequestHeaders,
Headers = serviceUnavailableHeaders,
ResponseBody = new MemoryStream(Encoding.UTF8.GetBytes($"Fault Injection Server Error: Service Unavailable, rule: {ruleId}"))
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ namespace Microsoft.Azure.Cosmos.FaultInjection.Tests
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Diagnostics;
using Microsoft.Azure.Cosmos.FaultInjection.Tests.Utils;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;
Expand Down Expand Up @@ -1276,6 +1278,93 @@ private async Task Timeout_FaultInjectionConnectionErrorRule_Test()
Assert.IsTrue(disposedChannel);
}

[TestMethod]
[Timeout(Timeout)]
[Owner("nalutripician")]
[Description("Tests ReadFeed FaultInjection")]
public async Task FaultInjectionServerErrorRule_ReadFeedTest()
{
string readFeedId = "readFeadRule-" + Guid.NewGuid().ToString();
FaultInjectionRule readFeedRule = new FaultInjectionRuleBuilder(
id: readFeedId,
condition:
new FaultInjectionConditionBuilder()
.WithOperationType(FaultInjectionOperationType.ReadFeed)
.Build(),
result:
FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.ServiceUnavailable)
.Build())
.WithDuration(TimeSpan.FromMinutes(5))
.Build();

readFeedRule.Disable();

List<FaultInjectionRule> ruleList = new List<FaultInjectionRule> { readFeedRule };
FaultInjector faultInjector = new FaultInjector(ruleList);

await this.Initialize(faultInjector, false);

string changeFeedContainerName = "changeFeedContainer-" + Guid.NewGuid().ToString();
ContainerProperties containerProperties = new ContainerProperties
{
Id = changeFeedContainerName,
PartitionKeyPath = "/partitionKey"
};

if (this.database != null && this.container != null)
{
JObject createdItem = JObject.FromObject(new { id = Guid.NewGuid().ToString(), Pk = Guid.NewGuid().ToString() });
ItemResponse<JObject>? itemResponse = await this.container.CreateItemAsync<JObject>(createdItem);

readFeedRule.Enable();

Container? leaseContainer = await this.database.CreateContainerIfNotExistsAsync(containerProperties, 400);

ManualResetEvent changeFeedRan = new ManualResetEvent(false);

ChangeFeedProcessor changeFeedProcessor = this.container.GetChangeFeedProcessorBuilder<JObject>(
"FaultInjectionTest",
(ChangeFeedProcessorContext context, IReadOnlyCollection<JObject> docs, CancellationToken token) =>
{
Assert.Fail("Change Feed Should Fail");
return Task.CompletedTask;
})
.WithInstanceName("test")
.WithLeaseContainer(leaseContainer)
.WithStartFromBeginning()
.WithErrorNotification((string lease, Exception exception) =>
{
if (exception is CosmosException cosmosException)
{
Assert.AreEqual(HttpStatusCode.ServiceUnavailable, cosmosException.StatusCode);
}
else
{
Assert.Fail("Unexpected Exception");
}

changeFeedRan.Set();
return Task.CompletedTask;
})
.Build();

await changeFeedProcessor.StartAsync();
await Task.Delay(1000);

try
{
bool wasProcessed = changeFeedRan.WaitOne(60000);
Assert.IsTrue(wasProcessed, "Timed out waiting for handler to execute");
}
finally
{
await changeFeedProcessor.StopAsync();
readFeedRule.Disable();
}
}
}


private async Task<CosmosDiagnostics> PerformDocumentOperation(Container testContainer, OperationType operationType, JObject item)
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

internal static class TestCommon
{
public const string Endpoint = "";
public const string AuthKey = "";
public const string EndpointMultiRegion = "";
public const string AuthKeyMultiRegion = "";

Expand Down