Skip to content

Commit

Permalink
Merge pull request #359 from AArnott/fix350
Browse files Browse the repository at this point in the history
Provide deterministic way to await for async disposal
  • Loading branch information
AArnott authored Nov 6, 2019
2 parents 45a4172 + b4bed10 commit 2de64c4
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 114 deletions.
4 changes: 2 additions & 2 deletions src/StreamJsonRpc.Tests/JsonRpcTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1469,12 +1469,12 @@ public async Task AddLocalRpcTarget_AllowedAfterListeningIfOptIn()
}

[Fact]
public void Completion_BeforeListeningAndAfterDisposal()
public async Task Completion_BeforeListeningAndAfterDisposal()
{
var rpc = new JsonRpc(Stream.Null, Stream.Null);
Task completion = rpc.Completion;
rpc.Dispose();
Assert.True(completion.IsCompleted);
await completion.WithCancellation(this.TimeoutToken);
}

[Fact]
Expand Down
30 changes: 15 additions & 15 deletions src/StreamJsonRpc.Tests/StreamMessageHandlerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,30 +66,30 @@ public async Task Ctor_AcceptsNullReceivingStream()
}

[Fact]
public void IsDisposed()
public async Task IsDisposed()
{
IDisposableObservable observable = this.handler;
Assert.False(observable.IsDisposed);
this.handler.Dispose();
await this.handler.DisposeAsync();
Assert.True(observable.IsDisposed);
}

[Fact]
public void Dispose_StreamsAreDisposed()
public async Task Dispose_StreamsAreDisposed()
{
var streams = FullDuplexStream.CreateStreams();
var handler = new MyStreamMessageHandler(streams.Item1, streams.Item2, new JsonMessageFormatter());
Assert.False(streams.Item1.IsDisposed);
Assert.False(streams.Item2.IsDisposed);
handler.Dispose();
await handler.DisposeAsync();
Assert.True(streams.Item1.IsDisposed);
Assert.True(streams.Item2.IsDisposed);
}

[Fact]
public void WriteAsync_ThrowsObjectDisposedException()
public async Task WriteAsync_ThrowsObjectDisposedException()
{
this.handler.Dispose();
await this.handler.DisposeAsync();
ValueTask result = this.handler.WriteAsync(CreateNotifyMessage(), this.TimeoutToken);
Assert.Throws<ObjectDisposedException>(() => result.GetAwaiter().GetResult());
}
Expand All @@ -99,9 +99,9 @@ public void WriteAsync_ThrowsObjectDisposedException()
/// when we first invoke the method, the <see cref="OperationCanceledException"/> is thrown.
/// </summary>
[Fact]
public void WriteAsync_PreferOperationCanceledException_AtEntry()
public async Task WriteAsync_PreferOperationCanceledException_AtEntry()
{
this.handler.Dispose();
await this.handler.DisposeAsync();
Assert.Throws<OperationCanceledException>(() => this.handler.WriteAsync(CreateNotifyMessage(), PrecanceledToken).GetAwaiter().GetResult());
}

Expand All @@ -119,7 +119,7 @@ public async Task WriteAsync_PreferOperationCanceledException_MidExecution()
var writeTask = handler.WriteAsync(CreateRequestMessage(), cts.Token);

cts.Cancel();
handler.Dispose();
await handler.DisposeAsync();

// Unblock writer. It should not throw anything as it is to emulate not recognizing the
// CancellationToken before completing its work.
Expand Down Expand Up @@ -148,9 +148,9 @@ public async Task WriteAsync_SemaphoreIncludesWriteCoreAsync_Task()
}

[Fact]
public void ReadAsync_ThrowsObjectDisposedException()
public async Task ReadAsync_ThrowsObjectDisposedException()
{
this.handler.Dispose();
await this.handler.DisposeAsync();
ValueTask<JsonRpcMessage?> result = this.handler.ReadAsync(this.TimeoutToken);
Assert.Throws<ObjectDisposedException>(() => result.GetAwaiter().GetResult());
Assert.Throws<OperationCanceledException>(() => this.handler.ReadAsync(PrecanceledToken).GetAwaiter().GetResult());
Expand All @@ -161,9 +161,9 @@ public void ReadAsync_ThrowsObjectDisposedException()
/// when we first invoke the method, the <see cref="OperationCanceledException"/> is thrown.
/// </summary>
[Fact]
public void ReadAsync_PreferOperationCanceledException_AtEntry()
public async Task ReadAsync_PreferOperationCanceledException_AtEntry()
{
this.handler.Dispose();
await this.handler.DisposeAsync();
Assert.Throws<OperationCanceledException>(() => this.handler.ReadAsync(PrecanceledToken).GetAwaiter().GetResult());
}

Expand All @@ -179,7 +179,7 @@ public async Task ReadAsync_PreferOperationCanceledException_MidExecution()
var readTask = this.handler.ReadAsync(cts.Token).AsTask();

cts.Cancel();
this.handler.Dispose();
await this.handler.DisposeAsync();

await Assert.ThrowsAnyAsync<OperationCanceledException>(() => readTask);
}
Expand Down Expand Up @@ -207,7 +207,7 @@ public DelayedWriter(Stream sendingStream, Stream receivingStream, IJsonRpcMessa
protected override ValueTask WriteCoreAsync(JsonRpcMessage content, CancellationToken cancellationToken)
{
Interlocked.Increment(ref this.WriteCoreCallCount);
return new ValueTask(this.WriteBlock.WaitAsync());
return new ValueTask(this.WriteBlock.WaitAsync(cancellationToken));
}
}

Expand Down
13 changes: 11 additions & 2 deletions src/StreamJsonRpc.Tests/WebSocketMessageHandlerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,26 @@ public void WebSocket_Property()
}

[Fact]
public void Dispose_DoesNotDisposeSocket()
public async Task Dispose_DoesNotDisposeSocket()
{
this.handler.Dispose();
await this.handler.DisposeAsync();
Assert.Equal(0, this.socket.DisposalCount);
}

[Fact]
public void Dispose_TwiceDoesNotThrow()
{
#pragma warning disable CS0618 // Type or member is obsolete
this.handler.Dispose();
this.handler.Dispose();
#pragma warning restore CS0618 // Type or member is obsolete
}

[Fact]
public async Task DisposeAsync_TwiceDoesNotThrow()
{
await this.handler.DisposeAsync();
await this.handler.DisposeAsync();
}

[Fact]
Expand Down
33 changes: 26 additions & 7 deletions src/StreamJsonRpc/JsonRpc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1960,18 +1960,37 @@ private void OnJsonRpcDisconnected(JsonRpcDisconnectedEventArgs eventArgs)
// Dispose the stream and fault pending requests in the finally block
// So this is executed even if Disconnected event handler throws.
this.disconnectedSource.Cancel();
(this.MessageHandler as IDisposable)?.Dispose();
this.FaultPendingRequests();

// Ensure the Task we may have returned from Completion is completed.
if (eventArgs.Exception != null)
Task messageHandlerDisposal = Task.CompletedTask;
if (this.MessageHandler is IAsyncDisposable asyncDisposableMessageHandler)
{
this.completionSource.TrySetException(eventArgs.Exception);
messageHandlerDisposal = asyncDisposableMessageHandler.DisposeAsync();
}
else
else if (this.MessageHandler is IDisposable disposableMessageHandler)
{
this.completionSource.TrySetResult(true);
disposableMessageHandler.Dispose();
}

this.FaultPendingRequests();

// Ensure the Task we may have returned from Completion is completed,
// but not before any asynchronous disposal of our message handler completes.
messageHandlerDisposal.ContinueWith(
handlerDisposal =>
{
Exception fault = eventArgs.Exception ?? handlerDisposal.Exception;
if (fault != null)
{
this.completionSource.TrySetException(fault);
}
else
{
this.completionSource.TrySetResult(true);
}
},
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default).Forget();
}
}

Expand Down
Loading

0 comments on commit 2de64c4

Please sign in to comment.