Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dispose requests on JobPool Dispose #53

Merged
merged 10 commits into from
Mar 13, 2024
15 changes: 15 additions & 0 deletions Sally7.Tests/RequestExecutor/JobPoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,19 @@ public void ReturnJobId_Does_Not_Throw_If_Disposed()
// Assert
sut.ReturnJobId(jobId);
}

[Fact]
public void Dispose_Calls_Dispose_On_Requests()
{
// Arrange
var sut = new JobPool(1);
var jobId = sut.RentJobIdAsync(CancellationToken.None).Result;
var request = sut.GetRequest(jobId);

// Act
sut.Dispose();

// Assert
Should.Throw<ObjectDisposedException>(() => request.GetResult());
}
}
35 changes: 35 additions & 0 deletions Sally7.Tests/RequestExecutor/RequestTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using FakeItEasy;
using Sally7.RequestExecutor;

namespace Sally7.Tests.RequestExecutor;

public class RequestTests
{
[Fact]
public void Completes_On_Dispose()
{
// Arrange
var sut = new Request();
var callback = A.Fake<Action>();
sut.OnCompleted(callback);

// Act
sut.Dispose();

// Assert
A.CallTo(() => callback.Invoke()).MustHaveHappenedOnceExactly();
}

[Fact]
public async Task Throws_When_Awaited_After_Dispose()
{
// Arrange
var sut = new Request();

// Act
sut.Dispose();

// Assert
await Should.ThrowAsync<ObjectDisposedException>(async () => await sut);
}
}
19 changes: 19 additions & 0 deletions Sally7.Tests/RequestExecutor/SignalTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System.Threading.Channels;
using Sally7.RequestExecutor;

namespace Sally7.Tests.RequestExecutor;

public class SignalTests
{
[Fact]
public async Task WaitAsync_Throws_If_Disposed()
{
// Arrange
var sut = new Signal();
sut.Dispose();

// Act
// Assert
await Should.ThrowAsync<ChannelClosedException>(() => sut.WaitAsync(CancellationToken.None).AsTask());
}
}
17 changes: 17 additions & 0 deletions Sally7/Internal/DisposableHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System;

namespace Sally7.Internal;

internal static class DisposableHelper
{
public static void ThrowIf(bool condition, object instance)
{
#if NET7_OR_GREATER
ObjectDisposedException.ThrowIf(condition, instance);
#else
void Throw() => throw new ObjectDisposedException(instance.GetType().FullName);

if (condition) Throw();
#endif
}
}
19 changes: 0 additions & 19 deletions Sally7/RequestExecutor/ConcurrentRequestExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
using System.Buffers;
using System.Diagnostics;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Sally7.Internal;

Expand Down Expand Up @@ -187,22 +185,5 @@ public async ValueTask<Memory<byte>> PerformRequest(ReadOnlyMemory<byte> request
_jobPool.ReturnJobId(jobId);
}
}

[DebuggerNonUserCode]
[DebuggerDisplay(nameof(NeedToWait) + ": {" + nameof(NeedToWait) + ",nq}")]
private class Signal : IDisposable
{
private readonly Channel<int> _channel = Channel.CreateBounded<int>(1);

public void Dispose() => _channel.Writer.Complete();

public bool TryInit() => _channel.Writer.TryWrite(0);

public ValueTask<int> WaitAsync(CancellationToken cancellationToken) => _channel.Reader.ReadAsync(cancellationToken);

public bool TryRelease() => _channel.Writer.TryWrite(0);

private bool NeedToWait => _channel.Reader.Count == 0;
}
}
}
13 changes: 12 additions & 1 deletion Sally7/RequestExecutor/JobPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Sally7.Internal;

namespace Sally7.RequestExecutor;

Expand Down Expand Up @@ -32,6 +33,11 @@ public void Dispose()
{
Volatile.Write(ref _disposed, true);
_jobIdPool.Writer.Complete();

foreach (var request in _requests)
{
request.Dispose();
}
}

public ValueTask<int> RentJobIdAsync(CancellationToken cancellationToken) => _jobIdPool.Reader.ReadAsync(cancellationToken);
Expand All @@ -45,7 +51,12 @@ public void ReturnJobId(int jobId)
}

[DebuggerNonUserCode]
public Request GetRequest(int jobId) => _requests[jobId - 1];
public Request GetRequest(int jobId)
{
DisposableHelper.ThrowIf(Volatile.Read(ref _disposed), this);

return _requests[jobId - 1];
}

public void SetBufferForRequest(int jobId, Memory<byte> buffer)
{
Expand Down
24 changes: 20 additions & 4 deletions Sally7/RequestExecutor/Request.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using Sally7.Internal;

namespace Sally7.RequestExecutor;

internal class Request : INotifyCompletion
internal class Request : INotifyCompletion, IDisposable
{
private static readonly Action Sentinel = () => { };

private Memory<byte> _buffer;

private bool _disposed;

public bool IsCompleted { get; private set; }
private int _length;
private Action? _continuation = Sentinel;
Expand All @@ -21,12 +24,13 @@ public void Complete(int length)
this._length = length;
IsCompleted = true;

var prev = _continuation ?? Interlocked.CompareExchange(ref _continuation, Sentinel, null);
prev?.Invoke();
InvokeContinuation();
}

public Memory<byte> GetResult()
{
DisposableHelper.ThrowIf(_disposed, this);

return _buffer.Slice(0, _length);
}

Expand All @@ -44,11 +48,23 @@ public void OnCompleted(Action continuation)
public void Reset()
{
_continuation = null;
IsCompleted = false;
IsCompleted = _disposed;
}

public void SetBuffer(Memory<byte> buffer)
{
this._buffer = buffer;
}

public void Dispose()
{
_disposed = true;
InvokeContinuation();
}

private void InvokeContinuation()
{
var prev = _continuation ?? Interlocked.CompareExchange(ref _continuation, Sentinel, null);
prev?.Invoke();
}
}
26 changes: 26 additions & 0 deletions Sally7/RequestExecutor/Signal.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace Sally7.RequestExecutor
{
[DebuggerNonUserCode]
[DebuggerDisplay(nameof(NeedToWait) + ": {" + nameof(NeedToWait) + ",nq}")]
internal class Signal : IDisposable
{
private readonly Channel<int> channel = Channel.CreateBounded<int>(1);

public void Dispose() => channel.Writer.Complete();

public bool TryInit() => channel.Writer.TryWrite(0);

public ValueTask<int> WaitAsync(CancellationToken cancellationToken) =>
channel.Reader.ReadAsync(cancellationToken);

public bool TryRelease() => channel.Writer.TryWrite(0);

private bool NeedToWait => channel.Reader.Count == 0;
}
}