Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cancellation tokens #21

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dotnet add package PipelineNet
- [Middleware](#middleware)
- [Pipelines](#pipelines)
- [Chains of responsibility](#chains-of-responsibility)
- [Cancellation tokens](#cancellation-tokens)
- [Middleware resolver](#middleware-resolver)
- [ServiceProvider implementation](#serviceprovider-implementation)
- [Unity implementation](#unity-implementation)
Expand Down Expand Up @@ -224,6 +225,37 @@ result = await exceptionHandlersChain.Execute(new ArgumentException()); // Resul
result = await exceptionHandlersChain.Execute(new InvalidOperationException()); // Result will be false
```

## Cancellation tokens
If you want to pass the cancellation token to your asynchronous pipeline middleware, you can do so by implementing the `ICancellableAsyncMiddleware<TParameter>` interface
and passing the cancellation token argument to the `IAsyncPipeline<TParameter>.Execute` method:
```C#
var pipeline = new AsyncPipeline<Bitmap>(new ActivatorMiddlewareResolver())
.AddCancellable<RoudCornersCancellableAsyncMiddleware>()
.Add<AddTransparencyAsyncMiddleware>() // You can mix both kinds of asynchronous middleware
.AddCancellable<AddWatermarkCancellableAsyncMiddleware>();

Bitmap image = (Bitmap) Image.FromFile("party-photo.png");
CancellationToken cancellationToken = CancellationToken.None;
await pipeline.Execute(image, cancellationToken);

public class RoudCornersCancellableAsyncMiddleware : ICancellableAsyncMiddleware<Bitmap>
{
public async Task Run(Bitmap parameter, Func<Bitmap, Task> next, CancellationToken cancellationToken)
{
await RoundCournersAsync(parameter, cancellationToken);
await next(parameter);
}

private async Task RoudCournersAsync(Bitmap bitmap, CancellationToken cancellationToken)
{
// Handle somehow
await Task.CompletedTask;
}
}
```
And to pass the cancellation token to your asynchronous chain of responsibility middleware, you can implement the `ICancellableAsyncMiddleware<TParameter, TReturn>` interface
and pass the cancellation token argument to the `IAsynchChainOfResponsibility<TParamete, TReturnr>.Execute` method.

## Middleware resolver
You may be wondering what is all this `ActivatorMiddlewareResolver` class being passed to every instance of pipeline and chain of responsibility.
This is a default implementation of the `IMiddlewareResolver`, which is used to create instances of the middleware types.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ public static IServiceCollection AddMiddlewareFromAssembly(
{
typeof(IMiddleware<>),
typeof(IAsyncMiddleware<>),
typeof(ICancellableAsyncMiddleware<>),
typeof(IMiddleware<,>),
typeof(IAsyncMiddleware<,>),
typeof(ICancellableAsyncFinally<,>),
typeof(IAsyncFinally<,>)
typeof(ICancellableAsyncMiddleware<,>),
typeof(IFinally<,>),
typeof(IAsyncFinally<,>),
typeof(ICancellableAsyncFinally<,>)
};

var types = assembly.GetTypes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@
}
}

public class ThrowIfCancellationRequestedMiddleware : ICancellableAsyncMiddleware<Exception, bool>
{
public async Task<bool> Run(Exception exception, Func<Exception, Task<bool>> executeNext, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
return await executeNext(exception);
}
}

public class FinallyThrow : IAsyncFinally<Exception, bool>
{
public Task<bool> Finally(Exception exception)
Expand Down Expand Up @@ -200,10 +209,30 @@
.Finally(input => Task.FromResult(input));

var resultTask = responsibilityChain.Execute(" Test\nwith spaces\n and new lines \n ");
var result = resultTask.Result;

Check warning on line 212 in src/PipelineNet.Tests/ChainsOfResponsibility/AsyncResponsibilityChainTests.cs

View workflow job for this annotation

GitHub Actions / build

Test methods should not use blocking task operations, as they can cause deadlocks. Use an async test method and await instead. (https://xunit.net/xunit.analyzers/rules/xUnit1031)

Check warning on line 212 in src/PipelineNet.Tests/ChainsOfResponsibility/AsyncResponsibilityChainTests.cs

View workflow job for this annotation

GitHub Actions / build

Test methods should not use blocking task operations, as they can cause deadlocks. Use an async test method and await instead. (https://xunit.net/xunit.analyzers/rules/xUnit1031)

Check warning on line 212 in src/PipelineNet.Tests/ChainsOfResponsibility/AsyncResponsibilityChainTests.cs

View workflow job for this annotation

GitHub Actions / build

Test methods should not use blocking task operations, as they can cause deadlocks. Use an async test method and await instead. (https://xunit.net/xunit.analyzers/rules/xUnit1031)

Check warning on line 212 in src/PipelineNet.Tests/ChainsOfResponsibility/AsyncResponsibilityChainTests.cs

View workflow job for this annotation

GitHub Actions / build

Test methods should not use blocking task operations, as they can cause deadlocks. Use an async test method and await instead. (https://xunit.net/xunit.analyzers/rules/xUnit1031)

Assert.Equal("Test with spaces and new lines", result);
}

[Fact]
public async Task Execute_ChainOfMiddlewareWithCancellableMiddleware_CancellableMiddlewareIsExecuted()
{
var responsibilityChain = new AsyncResponsibilityChain<Exception, bool>(new ActivatorMiddlewareResolver())
.Chain<UnavailableResourcesExceptionHandler>()
.Chain(typeof(InvalidateDataExceptionHandler))
.Chain<MyExceptionHandler>()
.ChainCancellable<ThrowIfCancellationRequestedMiddleware>();

// Creates an ArgumentNullException. The 'ThrowIfCancellationRequestedMiddleware'
// middleware should be the last one to execute.
var exception = new ArgumentNullException();

// Create the cancellation token in the canceled state.
var cancellationToken = new CancellationToken(canceled: true);

// The 'ThrowIfCancellationRequestedMiddleware' should throw 'OperationCanceledException'.
await Assert.ThrowsAsync<OperationCanceledException>(() => responsibilityChain.Execute(exception, cancellationToken));
}
#pragma warning restore CS0618 // Type or member is obsolete

[Fact]
Expand Down
37 changes: 37 additions & 0 deletions src/PipelineNet.Tests/Pipelines/AsyncPipelineTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@
await executeNext(context);
}
}

public class ThrowIfCancellationRequestedMiddleware : ICancellableAsyncMiddleware<PersonModel>
{
public async Task Run(PersonModel context, Func<PersonModel, Task> executeNext, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
await executeNext(context);
}
}
#endregion

[Fact]
Expand Down Expand Up @@ -123,7 +132,7 @@
Gender = Gender.Other
};

pipeline.Execute(personModel);

Check warning on line 135 in src/PipelineNet.Tests/Pipelines/AsyncPipelineTests.cs

View workflow job for this annotation

GitHub Actions / build

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.

Check warning on line 135 in src/PipelineNet.Tests/Pipelines/AsyncPipelineTests.cs

View workflow job for this annotation

GitHub Actions / build

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.

Check warning on line 135 in src/PipelineNet.Tests/Pipelines/AsyncPipelineTests.cs

View workflow job for this annotation

GitHub Actions / build

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.

Check warning on line 135 in src/PipelineNet.Tests/Pipelines/AsyncPipelineTests.cs

View workflow job for this annotation

GitHub Actions / build

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.

// Check if the level of 'personModel' is 4, which is configured by 'PersonWithGenderProperty' middleware.
Assert.Equal(4, personModel.Level);
Expand Down Expand Up @@ -162,5 +171,33 @@
pipeline.Add(typeof(AsyncPipelineTests));
});
}

[Fact]
public async Task Execute_RunPipelineWithCancellableMiddleware_CancellableMiddlewareIsExecuted()
{
var pipeline = new AsyncPipeline<PersonModel>(new ActivatorMiddlewareResolver())
.Add<PersonWithEvenId>()
.Add<PersonWithOddId>()
.Add<PersonWithEmailName>()
.Add<PersonWithGenderProperty>()
.AddCancellable<ThrowIfCancellationRequestedMiddleware>();

// Create a new instance with a 'Gender' property. The 'ThrowIfCancellationRequestedMiddleware'
// middleware should be the last one to execute.
var personModel = new PersonModel
{
Name = "this_is_my_email@servername.js",
Gender = Gender.Other
};

// Create the cancellation token in the canceled state.
var cancellationToken = new CancellationToken(canceled: true);

// Check if 'ThrowIfCancellationRequestedMiddleware' threw 'OperationCanceledException'.
await Assert.ThrowsAsync<OperationCanceledException>(() => pipeline.Execute(personModel, cancellationToken));

// Check if the level of 'personModel' is 4, which is configured by 'PersonWithGenderProperty' middleware.
Assert.Equal(4, personModel.Level);
}
}
}
64 changes: 64 additions & 0 deletions src/PipelineNet/AsyncBaseMiddlewareFlow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using PipelineNet.MiddlewareResolver;
using System;
using System.Collections.Generic;
using System.Reflection;

namespace PipelineNet
{
/// <summary>
/// Defines the base class for asynchronous middleware flows.
/// </summary>
/// <typeparam name="TMiddleware">The middleware type.</typeparam>
/// <typeparam name="TCancellableMiddleware">The cancellable middleware type.</typeparam>
public abstract class AsyncBaseMiddlewareFlow<TMiddleware, TCancellableMiddleware>
{
/// <summary>
/// The list of middleware types.
/// </summary>
protected IList<Type> MiddlewareTypes { get; private set; }

/// <summary>
/// The resolver used to create the middleware types.
/// </summary>
protected IMiddlewareResolver MiddlewareResolver { get; private set; }

internal AsyncBaseMiddlewareFlow(IMiddlewareResolver middlewareResolver)
{
MiddlewareResolver = middlewareResolver ?? throw new ArgumentNullException("middlewareResolver",
"An instance of IMiddlewareResolver must be provided. You can use ActivatorMiddlewareResolver.");
MiddlewareTypes = new List<Type>();
}

/// <summary>
/// Stores the <see cref="TypeInfo"/> of the middleware type.
/// </summary>
private static readonly TypeInfo MiddlewareTypeInfo = typeof(TMiddleware).GetTypeInfo();

/// <summary>
/// Stores the <see cref="TypeInfo"/> of the cancellable middleware type.
/// </summary>
private static readonly TypeInfo CancellableMiddlewareTypeInfo = typeof(TCancellableMiddleware).GetTypeInfo();


/// <summary>
/// Adds a new middleware type to the internal list of types.
/// Middleware will be executed in the same order they are added.
/// </summary>
/// <param name="middlewareType">The middleware type to be executed.</param>
/// <exception cref="ArgumentException">Thrown if the <paramref name="middlewareType"/> is
/// not an implementation of <typeparamref name="TMiddleware"/> or <see cref="TCancellableMiddleware"/>.</exception>

Check warning on line 49 in src/PipelineNet/AsyncBaseMiddlewareFlow.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has cref attribute 'TCancellableMiddleware' that refers to a type parameter

Check warning on line 49 in src/PipelineNet/AsyncBaseMiddlewareFlow.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has cref attribute 'TCancellableMiddleware' that refers to a type parameter

Check warning on line 49 in src/PipelineNet/AsyncBaseMiddlewareFlow.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has cref attribute 'TCancellableMiddleware' that refers to a type parameter

Check warning on line 49 in src/PipelineNet/AsyncBaseMiddlewareFlow.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has cref attribute 'TCancellableMiddleware' that refers to a type parameter

Check warning on line 49 in src/PipelineNet/AsyncBaseMiddlewareFlow.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has cref attribute 'TCancellableMiddleware' that refers to a type parameter

Check warning on line 49 in src/PipelineNet/AsyncBaseMiddlewareFlow.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has cref attribute 'TCancellableMiddleware' that refers to a type parameter

Check warning on line 49 in src/PipelineNet/AsyncBaseMiddlewareFlow.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has cref attribute 'TCancellableMiddleware' that refers to a type parameter

Check warning on line 49 in src/PipelineNet/AsyncBaseMiddlewareFlow.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has cref attribute 'TCancellableMiddleware' that refers to a type parameter

Check warning on line 49 in src/PipelineNet/AsyncBaseMiddlewareFlow.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has cref attribute 'TCancellableMiddleware' that refers to a type parameter

Check warning on line 49 in src/PipelineNet/AsyncBaseMiddlewareFlow.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has cref attribute 'TCancellableMiddleware' that refers to a type parameter

Check warning on line 49 in src/PipelineNet/AsyncBaseMiddlewareFlow.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has cref attribute 'TCancellableMiddleware' that refers to a type parameter

Check warning on line 49 in src/PipelineNet/AsyncBaseMiddlewareFlow.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has cref attribute 'TCancellableMiddleware' that refers to a type parameter
/// <exception cref="ArgumentNullException">Thrown if <paramref name="middlewareType"/> is null.</exception>
protected void AddMiddleware(Type middlewareType)
{
if (middlewareType == null) throw new ArgumentNullException("middlewareType");

bool isAssignableFromMiddleware = MiddlewareTypeInfo.IsAssignableFrom(middlewareType.GetTypeInfo())
|| CancellableMiddlewareTypeInfo.IsAssignableFrom(middlewareType.GetTypeInfo());
if (!isAssignableFromMiddleware)
throw new ArgumentException(
$"The middleware type must implement \"{typeof(TMiddleware)}\" or \"{typeof(TCancellableMiddleware)}\".");

this.MiddlewareTypes.Add(middlewareType);
}
}
}
38 changes: 31 additions & 7 deletions src/PipelineNet/ChainsOfResponsibility/AsyncResponsibilityChain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace PipelineNet.ChainsOfResponsibility
/// </summary>
/// <typeparam name="TParameter">The input type for the chain.</typeparam>
/// <typeparam name="TReturn">The return type of the chain.</typeparam>
public class AsyncResponsibilityChain<TParameter, TReturn> : BaseMiddlewareFlow<IAsyncMiddleware<TParameter, TReturn>>,
public class AsyncResponsibilityChain<TParameter, TReturn> : AsyncBaseMiddlewareFlow<IAsyncMiddleware<TParameter, TReturn>, ICancellableAsyncMiddleware<TParameter, TReturn>>,
IAsyncResponsibilityChain<TParameter, TReturn>
{
/// <summary>
Expand Down Expand Up @@ -50,13 +50,25 @@ public IAsyncResponsibilityChain<TParameter, TReturn> Chain<TMiddleware>() where
return this;
}

/// <summary>
/// Chains a new cancellable middleware to the chain of responsibility.
/// Middleware will be executed in the same order they are added.
/// </summary>
/// <typeparam name="TCancellableMiddleware">The new middleware being added.</typeparam>
/// <returns>The current instance of <see cref="IAsyncResponsibilityChain{TParameter, TReturn}"/>.</returns>
public IAsyncResponsibilityChain<TParameter, TReturn> ChainCancellable<TCancellableMiddleware>() where TCancellableMiddleware : ICancellableAsyncMiddleware<TParameter, TReturn>
{
MiddlewareTypes.Add(typeof(TCancellableMiddleware));
return this;
}

/// <summary>
/// Chains a new middleware type to the chain of responsibility.
/// Middleware will be executed in the same order they are added.
/// </summary>
/// <param name="middlewareType">The middleware type to be executed.</param>
/// <exception cref="ArgumentException">Thrown if the <paramref name="middlewareType"/> is
/// not an implementation of <see cref="IAsyncMiddleware{TParameter, TReturn}"/>.</exception>
/// not an implementation of <see cref="IAsyncMiddleware{TParameter, TReturn}"/> or <see cref="ICancellableAsyncMiddleware{TParameter, TReturn}"/>.</exception>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="middlewareType"/> is null.</exception>
/// <returns>The current instance of <see cref="IAsyncResponsibilityChain{TParameter, TReturn}"/>.</returns>
public IAsyncResponsibilityChain<TParameter, TReturn> Chain(Type middlewareType)
Expand Down Expand Up @@ -92,7 +104,6 @@ public async Task<TReturn> Execute(TParameter parameter, CancellationToken cance
{
var type = MiddlewareTypes[index];
resolverResult = MiddlewareResolver.Resolve(type);
var middleware = (IAsyncMiddleware<TParameter, TReturn>)resolverResult.Middleware;

index++;
// If the current instance of middleware is the last one in the list,
Expand Down Expand Up @@ -142,20 +153,33 @@ public async Task<TReturn> Execute(TParameter parameter, CancellationToken cance
}
}

if (resolverResult.IsDisposable && !(middleware is IDisposable
if (resolverResult == null || resolverResult.Middleware == null)
{
throw new InvalidOperationException($"'{MiddlewareResolver.GetType()}' failed to resolve middleware of type '{type}'.");
}

if (resolverResult.IsDisposable && !(resolverResult.Middleware is IDisposable
#if NETSTANDARD2_1_OR_GREATER
|| middleware is IAsyncDisposable
|| resolverResult.Middleware is IAsyncDisposable
#endif
))
{
throw new InvalidOperationException($"'{middleware.GetType().FullName}' type does not implement IDisposable" +
throw new InvalidOperationException($"'{resolverResult.Middleware.GetType()}' type does not implement IDisposable" +
#if NETSTANDARD2_1_OR_GREATER
" or IAsyncDisposable" +
#endif
".");
}

return await middleware.Run(param, func).ConfigureAwait(false);
if (resolverResult.Middleware is ICancellableAsyncMiddleware<TParameter, TReturn> cancellableMiddleware)
{
return await cancellableMiddleware.Run(param, func, cancellationToken).ConfigureAwait(false);
}
else
{
var middleware = (IAsyncMiddleware<TParameter, TReturn>)resolverResult.Middleware;
return await middleware.Run(param, func).ConfigureAwait(false);
}
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,22 @@ IAsyncResponsibilityChain<TParameter, TReturn> CancellableFinally<TCancellableFi
IAsyncResponsibilityChain<TParameter, TReturn> Chain<TMiddleware>()
where TMiddleware : IAsyncMiddleware<TParameter, TReturn>;

/// <summary>
/// Chains a new cancellable middleware to the chain of responsibility.
/// Middleware will be executed in the same order they are added.
/// </summary>
/// <typeparam name="TCancellableMiddleware">The new cancellable middleware being added.</typeparam>
/// <returns>The current instance of <see cref="IAsyncResponsibilityChain{TParameter, TReturn}"/>.</returns>
IAsyncResponsibilityChain<TParameter, TReturn> ChainCancellable<TCancellableMiddleware>()
where TCancellableMiddleware : ICancellableAsyncMiddleware<TParameter, TReturn>;

/// <summary>
/// Chains a new middleware type to the chain of responsibility.
/// Middleware will be executed in the same order they are added.
/// </summary>
/// <param name="middlewareType">The middleware type to be executed.</param>
/// <exception cref="ArgumentException">Thrown if the <paramref name="middlewareType"/> is
/// not an implementation of <see cref="IAsyncMiddleware{TParameter, TReturn}"/>.</exception>
/// not an implementation of <see cref="IAsyncMiddleware{TParameter, TReturn}"/> or <see cref="ICancellableAsyncMiddleware{TParameter, TReturn}"/>.</exception>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="middlewareType"/> is null.</exception>
/// <returns>The current instance of <see cref="IAsyncResponsibilityChain{TParameter, TReturn}"/>.</returns>
IAsyncResponsibilityChain<TParameter, TReturn> Chain(Type middlewareType);
Expand Down
15 changes: 10 additions & 5 deletions src/PipelineNet/ChainsOfResponsibility/ResponsibilityChain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ public TReturn Execute(TParameter parameter)
{
var type = MiddlewareTypes[index];
resolverResult = MiddlewareResolver.Resolve(type);
var middleware = (IMiddleware<TParameter, TReturn>)resolverResult.Middleware;

index++;
// If the current instance of middleware is the last one in the list,
Expand Down Expand Up @@ -165,19 +164,25 @@ public TReturn Execute(TParameter parameter)
}
}

if (resolverResult.IsDisposable && !(middleware is IDisposable))
if (resolverResult == null || resolverResult.Middleware == null)
{
throw new InvalidOperationException($"'{MiddlewareResolver.GetType()}' failed to resolve middleware of type '{type}'.");
}

if (resolverResult.IsDisposable && !(resolverResult.Middleware is IDisposable))
{
#if NETSTANDARD2_1_OR_GREATER
if (middleware is IAsyncDisposable)
if (resolverResult.Middleware is IAsyncDisposable)
{
throw new InvalidOperationException($"'{middleware.GetType().FullName}' type only implements IAsyncDisposable." +
throw new InvalidOperationException($"'{resolverResult.Middleware.GetType()}' type only implements IAsyncDisposable." +
" Use AsyncResponsibilityChain to execute the configured pipeline.");
}
#endif

throw new InvalidOperationException($"'{middleware.GetType().FullName}' type does not implement IDisposable.");
throw new InvalidOperationException($"'{resolverResult.Middleware.GetType()}' type does not implement IDisposable.");
}

var middleware = (IMiddleware<TParameter, TReturn>)resolverResult.Middleware;
return middleware.Run(param, func);
}
finally
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace PipelineNet.Middleware
{
/// <summary>
/// Defines the asynchronous chain of responsibility middleware with cancellation token.
/// </summary>
/// <typeparam name="TParameter">The input type for the middleware.</typeparam>
/// <typeparam name="TReturn">The return type of the middleware.</typeparam>
public interface ICancellableAsyncMiddleware<TParameter, TReturn>
{
/// <summary>
/// Runs the middleware.
/// </summary>
/// <param name="parameter">The input parameter.</param>
/// <param name="next">The next middleware in the flow.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The return value.</returns>
Task<TReturn> Run(TParameter parameter, Func<TParameter, Task<TReturn>> next, CancellationToken cancellationToken);
}
}
Loading