Skip to content

Commit

Permalink
feat: Dispose Requests on JobPool.Dispose()
Browse files Browse the repository at this point in the history
  • Loading branch information
mycroes committed Mar 12, 2024
1 parent 022c6c4 commit d73a6c3
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 5 deletions.
15 changes: 15 additions & 0 deletions Sally7.Tests/RequestExecutor/JobPoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,19 @@ public void ReturnJobId_Does_Not_Throw_If_Disposed()
// Assert
sut.ReturnJobId(jobId);
}

[Fact]
public void Dispose_Calls_Dispose_On_Requests()
{
// Arrange
var sut = new JobPool(1);
var jobId = sut.RentJobIdAsync(CancellationToken.None).Result;
var request = sut.GetRequest(jobId);

// Act
sut.Dispose();

// Assert
Should.Throw<ObjectDisposedException>(() => request.GetResult());
}
}
35 changes: 35 additions & 0 deletions Sally7.Tests/RequestExecutor/RequestTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using FakeItEasy;
using Sally7.RequestExecutor;

namespace Sally7.Tests.RequestExecutor;

public class RequestTests
{
[Fact]
public void Completes_On_Dispose()
{
// Arrange
var sut = new Request();
var callback = A.Fake<Action>();
sut.OnCompleted(callback);

// Act
sut.Dispose();

// Assert
A.CallTo(() => callback.Invoke()).MustHaveHappenedOnceExactly();
}

[Fact]
public async Task Throws_When_Awaited_After_Dispose()
{
// Arrange
var sut = new Request();

// Act
sut.Dispose();

// Assert
await Should.ThrowAsync<ObjectDisposedException>(async () => await sut);
}
}
1 change: 1 addition & 0 deletions Sally7.Tests/Sally7.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="FakeItEasy" Version="8.1.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.5.0" />
<PackageReference Include="Shouldly" Version="4.2.1" />
<PackageReference Include="xunit" Version="2.4.2" />
Expand Down
13 changes: 12 additions & 1 deletion Sally7/RequestExecutor/JobPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Sally7.Internal;

namespace Sally7.RequestExecutor;

Expand Down Expand Up @@ -32,6 +33,11 @@ public void Dispose()
{
Volatile.Write(ref _disposed, true);
_jobIdPool.Writer.Complete();

foreach (var request in _requests)
{
request.Dispose();
}
}

public ValueTask<int> RentJobIdAsync(CancellationToken cancellationToken) => _jobIdPool.Reader.ReadAsync(cancellationToken);
Expand All @@ -45,7 +51,12 @@ public void ReturnJobId(int jobId)
}

[DebuggerNonUserCode]
public Request GetRequest(int jobId) => _requests[jobId - 1];
public Request GetRequest(int jobId)
{
DisposableHelper.ThrowIf(Volatile.Read(ref _disposed), this);

return _requests[jobId - 1];
}

public void SetBufferForRequest(int jobId, Memory<byte> buffer)
{
Expand Down
24 changes: 20 additions & 4 deletions Sally7/RequestExecutor/Request.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using Sally7.Internal;

namespace Sally7.RequestExecutor;

internal class Request : INotifyCompletion
internal class Request : INotifyCompletion, IDisposable
{
private static readonly Action Sentinel = () => { };

private Memory<byte> _buffer;

private bool _disposed;

public bool IsCompleted { get; private set; }
private int _length;
private Action? _continuation = Sentinel;
Expand All @@ -21,12 +24,13 @@ public void Complete(int length)
this._length = length;
IsCompleted = true;

var prev = _continuation ?? Interlocked.CompareExchange(ref _continuation, Sentinel, null);
prev?.Invoke();
InvokeContinuation();
}

public Memory<byte> GetResult()
{
DisposableHelper.ThrowIf(_disposed, this);

return _buffer.Slice(0, _length);
}

Expand All @@ -44,11 +48,23 @@ public void OnCompleted(Action continuation)
public void Reset()
{
_continuation = null;
IsCompleted = false;
IsCompleted = _disposed;
}

public void SetBuffer(Memory<byte> buffer)
{
this._buffer = buffer;
}

public void Dispose()
{
_disposed = true;
InvokeContinuation();
}

private void InvokeContinuation()
{
var prev = _continuation ?? Interlocked.CompareExchange(ref _continuation, Sentinel, null);
prev?.Invoke();
}
}

0 comments on commit d73a6c3

Please sign in to comment.