Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't signal shutdown Cancellation Token when drain mode is enabled #2821

Merged
merged 1 commit into from
Jan 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions src/Microsoft.Azure.WebJobs.Host/Executors/FunctionExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -272,19 +272,6 @@ internal async Task<string> ExecuteWithLoggingAsync(IFunctionInstanceEx instance
// (e.g. Based on TimeoutAttribute, etc.)
CancellationTokenSource functionCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

if (_drainModeManager != null)
{
if (_drainModeManager.IsDrainModeEnabled == true)
{
functionCancellationTokenSource.Cancel();
logger?.LogInformation("Requesting cancellation for function invocation '{invocationId}'", instance.Id);
}
else
{
_drainModeManager?.RegisterTokenSource(instance.Id, functionCancellationTokenSource);
}
}

using (functionCancellationTokenSource)
{
FunctionOutputLogger.SetOutput(outputLog);
Expand Down Expand Up @@ -336,7 +323,6 @@ internal async Task<string> ExecuteWithLoggingAsync(IFunctionInstanceEx instance
{
updateParameterLogTimer?.Dispose();
parameterHelper.FlushParameterWatchers();
_drainModeManager?.UnRegisterTokenSource(instance.Id);
}

var exceptionInfo = GetExceptionDispatchInfo(invocationException, instance);
Expand Down
33 changes: 0 additions & 33 deletions src/Microsoft.Azure.WebJobs.Host/Hosting/DrainModeManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,13 @@ public void RegisterListener(IListener listener)
Listeners.Add(listener);
}

public void RegisterTokenSource(Guid guid, CancellationTokenSource tokenSource)
{
_cancellationTokenSources.TryAdd(guid, tokenSource);
}

public void UnRegisterTokenSource(Guid guid)
{
_cancellationTokenSources.TryRemove(guid, out CancellationTokenSource cts);
}

public async Task EnableDrainModeAsync(CancellationToken cancellationToken)
{
if (!IsDrainModeEnabled)
{
IsDrainModeEnabled = true;
_logger.LogInformation("DrainMode mode enabled");

CancelFunctionInvocations();

List<Task> tasks = new List<Task>();

_logger.LogInformation("Calling StopAsync on the registered listeners");
Expand All @@ -64,26 +52,5 @@ public async Task EnableDrainModeAsync(CancellationToken cancellationToken)
_logger.LogInformation("Call to StopAsync complete, registered listeners are now stopped");
}
}

public void CancelFunctionInvocations()
{
foreach (var keyValuePair in _cancellationTokenSources)
{
string invocationId = keyValuePair.Key.ToString();
try
{
_logger?.LogInformation("Requesting cancellation for function invocation '{invocationId}'", invocationId);
keyValuePair.Value.Cancel();
}
catch (ObjectDisposedException)
{
_logger?.LogInformation("Cancellation token for function invocation '{invocationId}' already disposed. No action required", invocationId);
}
catch (Exception exception)
{
_logger?.LogError(exception, "Exception occured when attempting to request cancellation for function invocation '{invocationId}'", invocationId);
}
}
}
}
}
4 changes: 0 additions & 4 deletions src/Microsoft.Azure.WebJobs.Host/Hosting/IDrainModeManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ public interface IDrainModeManager

void RegisterListener(IListener listener);

void RegisterTokenSource(Guid guid, CancellationTokenSource tokenSource);

void UnRegisterTokenSource(Guid guid);

Task EnableDrainModeAsync(CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -375,37 +375,6 @@ public async Task ExecuteLoggingAsync_WithDrainModeManagerNull_SkipsDrainModeOpe
}
}

[Fact]
public async Task ExecuteLoggingAsync_WithDrainModeManagerEnabled_RequestsCancellation()
{
var loggerFactory = new LoggerFactory();
var loggerProvider = new TestLoggerProvider();
loggerFactory.AddProvider(loggerProvider);
var drainModeLogger = loggerFactory.CreateLogger<DrainModeManager>();

var logger = new TestLogger("Tests.FunctionExecutor");
var mockFunctionInstanceEx = GetFunctionInstanceExMockInstance();
var parameterHelper = new FunctionExecutor.ParameterHelper(mockFunctionInstanceEx);

var drainModeManager = new DrainModeManager(drainModeLogger);
await drainModeManager.EnableDrainModeAsync(CancellationToken.None);
var functionExecutor = GetTestFunctionExecutor(drainModeManager);

try
{
await functionExecutor.ExecuteWithLoggingAsync(mockFunctionInstanceEx, new FunctionStartedMessage(), new FunctionInstanceLogEntry(), parameterHelper, logger, CancellationToken.None);
}
// Function Invocation Exception is expected
catch (FunctionInvocationException)
{
}
finally
{
var messages = logger.GetLogMessages().Select(p => p.FormattedMessage);
Assert.Equal(messages.ElementAt(0), "Requesting cancellation for function invocation '00000000-0000-0000-0000-000000000000'");
}
}

[Fact]
public void OnFunctionTimeout_PerformsExpectedOperations()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,32 +59,5 @@ public async Task RegisterListener_EnableDrainModeAsync_CallsStopAsyncAndEnables
p => Assert.Equal("Calling StopAsync on the registered listeners", p),
p => Assert.Equal("Call to StopAsync complete, registered listeners are now stopped", p));
}

[Fact]
public async Task RegisterListener_EnableDrainModeAsync_CancelAllRegisteredTokens()
{
Mock<IListener> listener = new Mock<IListener>(MockBehavior.Strict);
listener.Setup(bl => bl.StopAsync(It.IsAny<CancellationToken>()))
.Returns(Task.FromResult(false));

var drainModeManager = new DrainModeManager(_logger);
drainModeManager.RegisterListener(listener.Object);

var tokenSource = new CancellationTokenSource();
var guid = Guid.NewGuid();
drainModeManager.RegisterTokenSource(guid, tokenSource);

await drainModeManager.EnableDrainModeAsync(CancellationToken.None);
listener.VerifyAll();

Assert.Equal(drainModeManager.IsDrainModeEnabled, true);
Assert.Equal(tokenSource.Token.IsCancellationRequested, true);

Assert.Collection(_loggerProvider.GetAllLogMessages().Select(p => p.FormattedMessage),
p => Assert.Equal("DrainMode mode enabled", p),
p => Assert.Equal($"Requesting cancellation for function invocation '{guid}'", p),
p => Assert.Equal("Calling StopAsync on the registered listeners", p),
p => Assert.Equal("Call to StopAsync complete, registered listeners are now stopped", p));
}
}
}