Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FaultInjection: Adds ReadFeed Operation Type #4448

Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,14 @@ public FaultInjectionServerErrorType GetInjectedServerErrorType()
public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ruleId)
{
StoreResponse storeResponse;
string lsn = args.RequestHeaders.Get(WFConstants.BackendHeaders.LSN) ?? "0";

switch (this.serverErrorType)
{
case FaultInjectionServerErrorType.Gone:
INameValueCollection goneHeaders = args.RequestHeaders;
goneHeaders.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.ServerGenerated410).ToString(CultureInfo.InvariantCulture));
goneHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);
storeResponse = new StoreResponse()
{
Status = 410,
Expand All @@ -139,10 +141,12 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
return storeResponse;

case FaultInjectionServerErrorType.RetryWith:
INameValueCollection retryWithHeaders = args.RequestHeaders;
retryWithHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);
storeResponse = new StoreResponse()
{
Status = 449,
Headers = args.RequestHeaders,
Headers = retryWithHeaders,
ResponseBody = new MemoryStream(Encoding.UTF8.GetBytes($"Fault Injection Server Error: Retry With, rule: {ruleId}"))
};

Expand All @@ -152,6 +156,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
INameValueCollection tooManyRequestsHeaders = args.RequestHeaders;
tooManyRequestsHeaders.Set(HttpConstants.HttpHeaders.RetryAfterInMilliseconds, "500");
tooManyRequestsHeaders.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.RUBudgetExceeded).ToString(CultureInfo.InvariantCulture));
tooManyRequestsHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);

storeResponse = new StoreResponse()
{
Expand All @@ -163,21 +168,26 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
return storeResponse;

case FaultInjectionServerErrorType.Timeout:
INameValueCollection timeoutHeaders = args.RequestHeaders;
timeoutHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);

storeResponse = new StoreResponse()
{
Status = 408,
Headers = args.RequestHeaders,
Headers = timeoutHeaders,
ResponseBody = new MemoryStream(Encoding.UTF8.GetBytes($"Fault Injection Server Error: Timeout, rule: {ruleId}"))
};

return storeResponse;

case FaultInjectionServerErrorType.InternalServerEror:
INameValueCollection internalServerErrorHeaders = args.RequestHeaders;
internalServerErrorHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);

storeResponse = new StoreResponse()
{
Status = 500,
Headers = args.RequestHeaders,
Headers = internalServerErrorHeaders,
ResponseBody = new MemoryStream(Encoding.UTF8.GetBytes($"Fault Injection Server Error: Internal Server Error, rule: {ruleId}"))
};

Expand All @@ -190,6 +200,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
INameValueCollection readSessionHeaders = args.RequestHeaders;
readSessionHeaders.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.ReadSessionNotAvailable).ToString(CultureInfo.InvariantCulture));
readSessionHeaders.Set(HttpConstants.HttpHeaders.SessionToken, badSesstionToken);
readSessionHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);

storeResponse = new StoreResponse()
{
Expand All @@ -203,6 +214,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
case FaultInjectionServerErrorType.PartitionIsMigrating:
INameValueCollection partitionMigrationHeaders = args.RequestHeaders;
partitionMigrationHeaders.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.CompletingPartitionMigration).ToString(CultureInfo.InvariantCulture));
partitionMigrationHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);

storeResponse = new StoreResponse()
{
Expand All @@ -216,6 +228,7 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru
case FaultInjectionServerErrorType.PartitionIsSplitting:
INameValueCollection partitionSplitting = args.RequestHeaders;
partitionSplitting.Set(WFConstants.BackendHeaders.SubStatus, ((int)SubStatusCodes.CompletingSplit).ToString(CultureInfo.InvariantCulture));
partitionSplitting.Set(WFConstants.BackendHeaders.LocalLSN, lsn);

storeResponse = new StoreResponse()
{
Expand All @@ -226,10 +239,13 @@ public StoreResponse GetInjectedServerError(ChannelCallArguments args, string ru

return storeResponse;
case FaultInjectionServerErrorType.ServiceUnavailable:
INameValueCollection serviceUnavailableHeaders = args.RequestHeaders;
serviceUnavailableHeaders.Set(WFConstants.BackendHeaders.LocalLSN, lsn);

storeResponse = new StoreResponse()
{
Status = 503,
Headers = args.RequestHeaders,
Headers = serviceUnavailableHeaders,
ResponseBody = new MemoryStream(Encoding.UTF8.GetBytes($"Fault Injection Server Error: Service Unavailable, rule: {ruleId}"))
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1116,23 +1116,24 @@ private async Task Timeout_FaultInjectionConnectionErrorRule_Test()
[Description("Tests ReadFeed FaultInjection")]
public void FaultInjectionServerErrorRule_ReadFeedTest()
{
if (!this.Timeout_FaultInjectionServerErrorRule_ReadFeedTest().Wait(Timeout))
{
Assert.Fail("Test timed out");
}
this.Timeout_FaultInjectionServerErrorRule_ReadFeedTest().Wait();
//if (!this.Timeout_FaultInjectionServerErrorRule_ReadFeedTest().Wait(Timeout))
NaluTripician marked this conversation as resolved.
Show resolved Hide resolved
//{
// Assert.Fail("Test timed out");
//}
}

private async Task Timeout_FaultInjectionServerErrorRule_ReadFeedTest()
{
string readFeedId = "readFeadGoneRule-" + Guid.NewGuid().ToString();
string readFeedId = "readFeadRule-" + Guid.NewGuid().ToString();
FaultInjectionRule readFeedRule = new FaultInjectionRuleBuilder(
id: readFeedId,
condition:
new FaultInjectionConditionBuilder()
.WithOperationType(FaultInjectionOperationType.ReadFeed)
.Build(),
result:
FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.Gone)
FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.ServiceUnavailable)
.Build())
.WithDuration(TimeSpan.FromMinutes(5))
.Build();
Expand All @@ -1144,61 +1145,67 @@ private async Task Timeout_FaultInjectionServerErrorRule_ReadFeedTest()

await this.Initialize(faultInjector, false);

try
string changeFeedContainerName = "changeFeedContainer-" + Guid.NewGuid().ToString();
ContainerProperties containerProperties = new ContainerProperties
{
string changeFeedContainerName = "changeFeedContainer-" + Guid.NewGuid().ToString();
ContainerProperties containerProperties = new ContainerProperties
{
Id = changeFeedContainerName,
PartitionKeyPath = "/partitionKey"
};
if (this.database != null && this.container != null)
{
Container? leaseContainer = await this.database.CreateContainerIfNotExistsAsync(containerProperties, 400);
Id = changeFeedContainerName,
PartitionKeyPath = "/partitionKey"
};

ChangeFeedProcessor changeFeedProcessor = this.container.GetChangeFeedProcessorBuilder<JObject>(
processorName: "FaultInjectionTest",
onChangesDelegate: HandleChangesAsync)
.WithInstanceName("test")
.WithLeaseContainer(leaseContainer)
.Build();
await changeFeedProcessor.StartAsync();
await Task.Delay(1000);
if (this.database != null && this.container != null)
{
JObject createdItem = JObject.FromObject(new { id = Guid.NewGuid().ToString(), Pk = Guid.NewGuid().ToString() });
ItemResponse<JObject>? itemResponse = await this.container.CreateItemAsync<JObject>(createdItem);

JObject createdItem = JObject.FromObject(new { id = Guid.NewGuid().ToString(), Pk = Guid.NewGuid().ToString() });
readFeedRule.Enable();

ItemResponse<JObject>? itemResponse = this.container != null
? await this.container.CreateItemAsync<JObject>(createdItem)
: null;
Assert.IsNotNull(itemResponse);
Container? leaseContainer = await this.database.CreateContainerIfNotExistsAsync(containerProperties, 400);

readFeedRule.Enable();
ManualResetEvent changeFeedRan = new ManualResetEvent(false);

await Task.Delay(5000);
}
}
finally
{
readFeedRule.Disable();
}
}
ChangeFeedProcessor changeFeedProcessor;
changeFeedProcessor = this.container.GetChangeFeedProcessorBuilder<JObject>(
NaluTripician marked this conversation as resolved.
Show resolved Hide resolved
"FaultInjectionTest",
(ChangeFeedProcessorContext context, IReadOnlyCollection<JObject> docs, CancellationToken token) =>
{
Assert.Fail("Change Feed Should Fail");
return Task.CompletedTask;
})
.WithInstanceName("test")
.WithLeaseContainer(leaseContainer)
.WithStartFromBeginning()
.WithErrorNotification((string lease, Exception exception) =>
{
if (exception is CosmosException cosmosException)
{
Assert.AreEqual(HttpStatusCode.ServiceUnavailable, cosmosException.StatusCode);
}
else
{
Assert.Fail("Unexpected Exception");
}

private static async Task HandleChangesAsync(
ChangeFeedProcessorContext context,
IReadOnlyCollection<JObject> changes,
CancellationToken cancellationToken)
{
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Assert.Fail("Change Feed Processor took too long");
}
changeFeedRan.Set();
return Task.CompletedTask;
})
.Build();

CosmosTraceDiagnostics? traceDiagnostic = context.Diagnostics as CosmosTraceDiagnostics;
Assert.IsNotNull(traceDiagnostic);
Assert.IsTrue(traceDiagnostic.Value.Data.TryGetValue("StatusCode", out object? statusCode));
Assert.AreEqual((int)StatusCodes.Gone, (int)statusCode);
await Task.Delay(1);
await changeFeedProcessor.StartAsync();
await Task.Delay(1000);

try
{
bool wasProcessed = changeFeedRan.WaitOne(60000);
Assert.IsTrue(wasProcessed, "Timed out waiting for handler to execute");
}
finally
{
await changeFeedProcessor.StopAsync();
readFeedRule.Disable();
}
}
}


private async Task<CosmosDiagnostics> PerformDocumentOperation(Container testContainer, OperationType operationType, JObject item)
{
Expand Down
Loading