Skip to content

Commit

Permalink
Add cancellation token to all runtime client methods
Browse files Browse the repository at this point in the history
Add an optional CancellationToken argument to all IRuntimeApiClient methods.
  • Loading branch information
martincostello committed Nov 3, 2019
1 parent 6945418 commit f2f3b97
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,18 @@ public interface IRuntimeApiClient
/// Report an initialization error as an asynchronous operation.
/// </summary>
/// <param name="exception">The exception to report.</param>
/// <param name="cancellationToken">The optional cancellation token to use.</param>
/// <returns>A Task representing the asynchronous operation.</returns>
Task ReportInitializationErrorAsync(Exception exception);
Task ReportInitializationErrorAsync(Exception exception, CancellationToken cancellationToken = default);

/// <summary>
/// Send an initialization error with a type string but no other information as an asynchronous operation.
/// This can be used to directly control flow in Step Functions without creating an Exception class and throwing it.
/// </summary>
/// <param name="errorType">The type of the error to report to Lambda. This does not need to be a .NET type name.</param>
/// <param name="cancellationToken">The optional cancellation token to use.</param>
/// <returns>A Task representing the asynchronous operation.</returns>
Task ReportInitializationErrorAsync(string errorType);
Task ReportInitializationErrorAsync(string errorType, CancellationToken cancellationToken = default);

/// <summary>
/// Get the next function invocation from the Runtime API as an asynchronous operation.
Expand All @@ -57,24 +59,27 @@ public interface IRuntimeApiClient
/// </summary>
/// <param name="awsRequestId">The ID of the function request that caused the error.</param>
/// <param name="exception">The exception to report.</param>
/// <param name="cancellationToken">The optional cancellation token to use.</param>
/// <returns>A Task representing the asynchronous operation.</returns>
Task ReportInvocationErrorAsync(string awsRequestId, Exception exception);
Task ReportInvocationErrorAsync(string awsRequestId, Exception exception, CancellationToken cancellationToken = default);

/// <summary>
/// Send an initialization error with a type string but no other information as an asynchronous operation.
/// This can be used to directly control flow in Step Functions without creating an Exception class and throwing it.
/// </summary>
/// <param name="awsRequestId">The ID of the function request that caused the error.</param>
/// <param name="errorType">The type of the error to report to Lambda. This does not need to be a .NET type name.</param>
/// <param name="cancellationToken">The optional cancellation token to use.</param>
/// <returns>A Task representing the asynchronous operation.</returns>
Task ReportInvocationErrorAsync(string awsRequestId, string errorType);
Task ReportInvocationErrorAsync(string awsRequestId, string errorType, CancellationToken cancellationToken = default);

/// <summary>
/// Send a response to a function invocation to the Runtime API as an asynchronous operation.
/// </summary>
/// <param name="awsRequestId">The ID of the function request being responded to.</param>
/// <param name="outputStream">The content of the response to the function invocation.</param>
/// <param name="cancellationToken">The optional cancellation token to use.</param>
/// <returns></returns>
Task SendResponseAsync(string awsRequestId, Stream outputStream);
Task SendResponseAsync(string awsRequestId, Stream outputStream, CancellationToken cancellationToken = default);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,27 +65,29 @@ internal RuntimeApiClient(IEnvironmentVariables environmentVariables, IInternalR
/// Report an initialization error as an asynchronous operation.
/// </summary>
/// <param name="exception">The exception to report.</param>
/// <param name="cancellationToken">The optional cancellation token to use.</param>
/// <returns>A Task representing the asynchronous operation.</returns>
public Task ReportInitializationErrorAsync(Exception exception)
public Task ReportInitializationErrorAsync(Exception exception, CancellationToken cancellationToken = default)
{
if (exception == null)
throw new ArgumentNullException(nameof(exception));

return _internalClient.ErrorAsync(null, LambdaJsonExceptionWriter.WriteJson(ExceptionInfo.GetExceptionInfo(exception)));
return _internalClient.ErrorAsync(null, LambdaJsonExceptionWriter.WriteJson(ExceptionInfo.GetExceptionInfo(exception)), cancellationToken);
}

/// <summary>
/// Send an initialization error with a type string but no other information as an asynchronous operation.
/// This can be used to directly control flow in Step Functions without creating an Exception class and throwing it.
/// </summary>
/// <param name="errorType">The type of the error to report to Lambda. This does not need to be a .NET type name.</param>
/// <param name="cancellationToken">The optional cancellation token to use.</param>
/// <returns>A Task representing the asynchronous operation.</returns>
public Task ReportInitializationErrorAsync(string errorType)
public Task ReportInitializationErrorAsync(string errorType, CancellationToken cancellationToken = default)
{
if (errorType == null)
throw new ArgumentNullException(nameof(errorType));

return _internalClient.ErrorAsync(errorType, null);
return _internalClient.ErrorAsync(errorType, null, cancellationToken);
}

/// <summary>
Expand All @@ -94,7 +96,7 @@ public Task ReportInitializationErrorAsync(string errorType)
/// </summary>
/// <param name="cancellationToken">The optional cancellation token to use to stop listening for the next invocation.</param>
/// <returns>A Task representing the asynchronous operation.</returns>
public async Task<InvocationRequest> GetNextInvocationAsync(CancellationToken cancellationToken)
public async Task<InvocationRequest> GetNextInvocationAsync(CancellationToken cancellationToken = default)
{
SwaggerResponse<Stream> response = await _internalClient.NextAsync(cancellationToken);

Expand All @@ -111,8 +113,9 @@ public async Task<InvocationRequest> GetNextInvocationAsync(CancellationToken ca
/// </summary>
/// <param name="awsRequestId">The ID of the function request that caused the error.</param>
/// <param name="exception">The exception to report.</param>
/// <param name="cancellationToken">The optional cancellation token to use.</param>
/// <returns>A Task representing the asynchronous operation.</returns>
public Task ReportInvocationErrorAsync(string awsRequestId, Exception exception)
public Task ReportInvocationErrorAsync(string awsRequestId, Exception exception, CancellationToken cancellationToken = default)
{
if (awsRequestId == null)
throw new ArgumentNullException(nameof(awsRequestId));
Expand All @@ -121,7 +124,7 @@ public Task ReportInvocationErrorAsync(string awsRequestId, Exception exception)
throw new ArgumentNullException(nameof(exception));

var exceptionInfo = ExceptionInfo.GetExceptionInfo(exception);
return _internalClient.Error2Async(awsRequestId, exceptionInfo.ErrorType, LambdaJsonExceptionWriter.WriteJson(exceptionInfo));
return _internalClient.Error2Async(awsRequestId, exceptionInfo.ErrorType, LambdaJsonExceptionWriter.WriteJson(exceptionInfo), cancellationToken);
}

/// <summary>
Expand All @@ -130,21 +133,23 @@ public Task ReportInvocationErrorAsync(string awsRequestId, Exception exception)
/// </summary>
/// <param name="awsRequestId">The ID of the function request that caused the error.</param>
/// <param name="errorType">The type of the error to report to Lambda. This does not need to be a .NET type name.</param>
/// <param name="cancellationToken">The optional cancellation token to use.</param>
/// <returns>A Task representing the asynchronous operation.</returns>
public Task ReportInvocationErrorAsync(string awsRequestId, string errorType)
public Task ReportInvocationErrorAsync(string awsRequestId, string errorType, CancellationToken cancellationToken = default)
{
return _internalClient.Error2Async(awsRequestId, errorType, null);
return _internalClient.Error2Async(awsRequestId, errorType, null, cancellationToken);
}

/// <summary>
/// Send a response to a function invocation to the Runtime API as an asynchronous operation.
/// </summary>
/// <param name="awsRequestId">The ID of the function request being responded to.</param>
/// <param name="outputStream">The content of the response to the function invocation.</param>
/// <param name="cancellationToken">The optional cancellation token to use.</param>
/// <returns></returns>
public async Task SendResponseAsync(string awsRequestId, Stream outputStream)
public async Task SendResponseAsync(string awsRequestId, Stream outputStream, CancellationToken cancellationToken = default)
{
await _internalClient.ResponseAsync(awsRequestId, outputStream, CancellationToken.None);
await _internalClient.ResponseAsync(awsRequestId, outputStream, cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

Expand Down Expand Up @@ -75,7 +76,7 @@ public void VerifyOutput(byte[] expectedOutput)
}
}

public Task<InvocationRequest> GetNextInvocationAsync(System.Threading.CancellationToken cancellationToken = default)
public Task<InvocationRequest> GetNextInvocationAsync(CancellationToken cancellationToken = default)
{
GetNextInvocationAsyncCalled = true;

Expand All @@ -94,31 +95,31 @@ public Task<InvocationRequest> GetNextInvocationAsync(System.Threading.Cancellat
});
}

public Task ReportInitializationErrorAsync(Exception exception)
public Task ReportInitializationErrorAsync(Exception exception, CancellationToken cancellationToken = default)
{
ReportInitializationErrorAsyncExceptionCalled = true;
return Task.Run(() => { });
}

public Task ReportInitializationErrorAsync(string errorType)
public Task ReportInitializationErrorAsync(string errorType, CancellationToken cancellationToken = default)
{
ReportInitializationErrorAsyncTypeCalled = true;
return Task.Run(() => { });
}

public Task ReportInvocationErrorAsync(string awsRequestId, Exception exception)
public Task ReportInvocationErrorAsync(string awsRequestId, Exception exception, CancellationToken cancellationToken = default)
{
ReportInvocationErrorAsyncExceptionCalled = true;
return Task.Run(() => { });
}

public Task ReportInvocationErrorAsync(string awsRequestId, string errorType)
public Task ReportInvocationErrorAsync(string awsRequestId, string errorType, CancellationToken cancellationToken = default)
{
ReportInvocationErrorAsyncTypeCalled = true;
return Task.Run(() => { });
}

public Task SendResponseAsync(string awsRequestId, Stream outputStream)
public Task SendResponseAsync(string awsRequestId, Stream outputStream, CancellationToken cancellationToken = default)
{
if (outputStream != null)
{
Expand Down

0 comments on commit f2f3b97

Please sign in to comment.