Skip to content

Commit

Permalink
Provide deterministic way to await for async disposal
Browse files Browse the repository at this point in the history
This fixes up a regression in the last commit that came from the fact that on .NET Framework, `NamedPipeClientStream.ReadAsync(byte[], int, int, CancellationToken)` does not honor the `CancellationToken` while waiting for bytes to come in. As a result, pipes were never closed by the previous commit.

This ports PR microsoft#359 to fix microsoft#350 for 2.2 consumption
It includes cherrypicked and adjusted commits from:
b464e57
b4bed10
  • Loading branch information
AArnott committed Nov 7, 2019
1 parent 892cda3 commit 922d3f7
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 97 deletions.
4 changes: 2 additions & 2 deletions src/StreamJsonRpc.Tests/JsonRpcTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1394,12 +1394,12 @@ public async Task AddLocalRpcTarget_AllowedAfterListeningIfOptIn()
}

[Fact]
public void Completion_BeforeListeningAndAfterDisposal()
public async Task Completion_BeforeListeningAndAfterDisposal()
{
var rpc = new JsonRpc(Stream.Null, Stream.Null);
Task completion = rpc.Completion;
rpc.Dispose();
Assert.True(completion.IsCompleted);
await completion.WithCancellation(this.TimeoutToken);
}

[Fact]
Expand Down
2 changes: 1 addition & 1 deletion src/StreamJsonRpc.Tests/StreamMessageHandlerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ protected override ValueTask<JsonRpcMessage> ReadCoreAsync(CancellationToken can
protected override ValueTask WriteCoreAsync(JsonRpcMessage content, CancellationToken cancellationToken)
{
Interlocked.Increment(ref this.WriteCoreCallCount);
return new ValueTask(this.WriteBlock.WaitAsync());
return new ValueTask(this.WriteBlock.WaitAsync(cancellationToken));
}
}

Expand Down
149 changes: 92 additions & 57 deletions src/StreamJsonRpc/MessageHandlerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ public abstract class MessageHandlerBase : IJsonRpcMessageHandler, IDisposableOb
/// </summary>
private readonly object syncObject = new object();

/// <summary>
/// A signal that the last read operation has completed.
/// </summary>
private readonly AsyncManualResetEvent readingCompleted = new AsyncManualResetEvent();

/// <summary>
/// A signal that the last write operation has completed.
/// </summary>
private readonly AsyncManualResetEvent writingCompleted = new AsyncManualResetEvent();

/// <summary>
/// A value indicating whether the <see cref="ReadAsync(CancellationToken)"/> method is in progress.
/// </summary>
Expand All @@ -53,6 +63,10 @@ public MessageHandlerBase(IJsonRpcMessageFormatter formatter)
{
Requires.NotNull(formatter, nameof(formatter));
this.Formatter = formatter;

Task readerDisposal = this.readingCompleted.WaitAsync().ContinueWith((_, s) => ((MessageHandlerBase)s).DisposeReader(), this, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default);
Task writerDisposal = this.writingCompleted.WaitAsync().ContinueWith((_, s) => ((MessageHandlerBase)s).DisposeWriter(), this, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default);
this.Completion = Task.WhenAll(readerDisposal, writerDisposal).ContinueWith((_, s) => ((MessageHandlerBase)s).Dispose(true), this, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}

[Flags]
Expand All @@ -71,12 +85,7 @@ private enum MessageHandlerState
/// <summary>
/// Indicates that the <see cref="ReadAsync(CancellationToken)"/> method is in running.
/// </summary>
Reading = 0x4,

/// <summary>
/// Indicates that the <see cref="Dispose(bool)"/> method has been called.
/// </summary>
DisposeVirtualMethodInvoked = 0x10,
Reading = 0x2,
}

/// <summary>
Expand All @@ -102,6 +111,11 @@ private enum MessageHandlerState
/// </summary>
protected CancellationToken DisposalToken => this.disposalTokenSource.Token;

/// <summary>
/// Gets a task that completes when this instance has completed disposal.
/// </summary>
private Task Completion { get; }

/// <summary>
/// Reads a distinct and complete message from the transport, waiting for one if necessary.
/// </summary>
Expand Down Expand Up @@ -140,9 +154,13 @@ public async ValueTask<JsonRpcMessage> ReadAsync(CancellationToken cancellationT
}
finally
{
if (this.CheckIfDisposalAppropriate(MessageHandlerState.Reading))
lock (this.syncObject)
{
this.Dispose(true);
this.state &= ~MessageHandlerState.Reading;
if (this.DisposalToken.IsCancellationRequested)
{
this.readingCompleted.Set();
}
}
}
}
Expand All @@ -165,7 +183,6 @@ public async ValueTask WriteAsync(JsonRpcMessage content, CancellationToken canc
Verify.Operation(this.CanWrite, "No sending stream.");
cancellationToken.ThrowIfCancellationRequested();

bool shouldDispose = false;
try
{
using (await this.sendingSemaphore.EnterAsync(cancellationToken).ConfigureAwait(false))
Expand All @@ -179,7 +196,14 @@ public async ValueTask WriteAsync(JsonRpcMessage content, CancellationToken canc
}
finally
{
shouldDispose = this.CheckIfDisposalAppropriate(MessageHandlerState.Writing);
lock (this.syncObject)
{
this.state &= ~MessageHandlerState.Writing;
if (this.DisposalToken.IsCancellationRequested)
{
this.writingCompleted.Set();
}
}
}
}
}
Expand All @@ -189,48 +213,30 @@ public async ValueTask WriteAsync(JsonRpcMessage content, CancellationToken canc
cancellationToken.ThrowIfCancellationRequested();
throw;
}
finally
{
if (shouldDispose)
{
this.Dispose(true);
}
}
}

#pragma warning disable VSTHRD002 // We synchronously block, but nothing here should ever require the main thread.
/// <summary>
/// Disposes this instance, and cancels any pending read or write operations.
/// </summary>
public void Dispose()
{
if (!this.disposalTokenSource.IsCancellationRequested)
{
this.disposalTokenSource.Cancel();
if (this.CheckIfDisposalAppropriate())
{
this.Dispose(true);
}
}
}
public void Dispose() => this.DisposeAsync().GetAwaiter().GetResult();
#pragma warning restore VSTHRD002

/// <summary>
/// Disposes resources allocated by this instance.
/// Disposes resources allocated by this instance that are common to both reading and writing.
/// </summary>
/// <param name="disposing"><c>true</c> when being disposed; <c>false</c> when being finalized.</param>
/// <remarks>
/// <para>
/// This method is guaranteed to not be called until any pending <see cref="WriteCoreAsync(JsonRpcMessage, CancellationToken)"/>
/// or <see cref="ReadCoreAsync(CancellationToken)"/> calls have completed.
/// This method is called by <see cref="DisposeAsync"/> after both <see cref="DisposeReader"/> and <see cref="DisposeWriter"/> have completed.
/// </para>
/// <para>Overrides of this method *should* call the base method as well.</para>
/// </remarks>
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
this.sendingSemaphore.Dispose();
(this.Formatter as IDisposable)?.Dispose();

GC.SuppressFinalize(this);
}
}
Expand Down Expand Up @@ -265,38 +271,67 @@ protected virtual void Dispose(bool disposing)
/// </returns>
protected abstract ValueTask FlushAsync(CancellationToken cancellationToken);

private void SetState(MessageHandlerState startingOperation)
/// <summary>
/// Disposes this instance, and cancels any pending read or write operations.
/// </summary>
private protected virtual async Task DisposeAsync()
{
lock (this.syncObject)
if (!this.disposalTokenSource.IsCancellationRequested)
{
Verify.NotDisposed(this);
MessageHandlerState state = this.state;
Assumes.False(state.HasFlag(startingOperation));
this.state |= startingOperation;
this.disposalTokenSource.Cancel();

// Kick off disposal of reading and/or writing resources based on whether they're active right now or not.
// If they're active, they'll take care of themselves when they finish since we signaled disposal.
lock (this.syncObject)
{
if (!this.state.HasFlag(MessageHandlerState.Reading))
{
this.readingCompleted.Set();
}

if (!this.state.HasFlag(MessageHandlerState.Writing))
{
this.writingCompleted.Set();
}
}

// Wait for completion to actually complete, and re-throw any exceptions.
await this.Completion.ConfigureAwait(false);

this.Dispose(true);
}
}

private bool CheckIfDisposalAppropriate(MessageHandlerState completedOperation = MessageHandlerState.None)
/// <summary>
/// Disposes resources allocated by this instance that are used for reading (not writing).
/// </summary>
/// <remarks>
/// This method is called by <see cref="MessageHandlerBase"/> after the last read operation has completed.
/// </remarks>
private protected virtual void DisposeReader()
{
lock (this.syncObject)
{
// Revert the flag of our caller to indicate that writing or reading has finished, if applicable.
this.state &= ~completedOperation;

if (!this.DisposalToken.IsCancellationRequested)
{
// Disposal hasn't been requested.
return false;
}
}

// We should dispose ourselves if we have not done so and if reading and writing are not active.
bool shouldDispose = (this.state & (MessageHandlerState.Reading | MessageHandlerState.Writing | MessageHandlerState.DisposeVirtualMethodInvoked)) == MessageHandlerState.None;
if (shouldDispose)
{
this.state |= MessageHandlerState.DisposeVirtualMethodInvoked;
}
/// <summary>
/// Disposes resources allocated by this instance that are used for writing (not reading).
/// </summary>
/// <remarks>
/// This method is called by <see cref="MessageHandlerBase"/> after the last write operation has completed.
/// Overrides of this method *should* call the base method as well.
/// </remarks>
private protected virtual void DisposeWriter()
{
this.sendingSemaphore.Dispose();
}

return shouldDispose;
private void SetState(MessageHandlerState startingOperation)
{
lock (this.syncObject)
{
Verify.NotDisposed(this);
MessageHandlerState state = this.state;
Assumes.False(state.HasFlag(startingOperation));
this.state |= startingOperation;
}
}
}
Expand Down
62 changes: 25 additions & 37 deletions src/StreamJsonRpc/PipeMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,6 @@ public abstract class PipeMessageHandler : MessageHandlerBase
/// </remarks>
private static readonly long LargeMessageThreshold = new PipeOptions().PauseWriterThreshold;

/// <summary>
/// Objects that we should dispose when we are disposed. May be null.
/// </summary>
private List<IDisposable> disposables;

/// <summary>
/// Initializes a new instance of the <see cref="PipeMessageHandler"/> class.
/// </summary>
Expand Down Expand Up @@ -79,15 +74,19 @@ public PipeMessageHandler(Stream writer, Stream reader, IJsonRpcMessageFormatter
this.Reader = reader?.UseStrictPipeReader();
this.Writer = writer?.UsePipeWriter();

this.disposables = new List<IDisposable>();
if (reader != null)
// After we've completed writing, only dispose the underlying write stream when we've flushed everything.
if (writer != null)
{
this.disposables.Add(reader);
Assumes.NotNull(this.Writer);
this.Writer.OnReaderCompleted((ex, state) => ((Stream)state).Dispose(), writer);
}

if (writer != null && writer != reader)
// NamedPipeClientStream.ReadAsync(byte[], int, int, CancellationToken) ignores the CancellationToken except at the entrypoint.
// To avoid an async hang there or in similar streams upon disposal, we're going to Dispose the read stream directly.
// We only need to do this if the read stream is distinct from the write stream, which is already handled above.
if (reader != null && reader != writer)
{
this.disposables.Add(writer);
this.DisposalToken.Register(state => ((Stream)state).Dispose(), reader);
}
}

Expand Down Expand Up @@ -136,20 +135,6 @@ protected override void Dispose(bool disposing)
this.Reader?.Complete();
this.Writer?.Complete();

if (this.disposables != null)
{
// Only dispose the underlying streams (if any) *after* our writer's work has been fully read.
// Otherwise we risk cutting of data that we claimed to have transmitted.
if (this.Writer != null && this.disposables != null)
{
this.Writer.OnReaderCompleted((ex, s) => this.DisposeDisposables(), null);
}
else
{
this.DisposeDisposables();
}
}

base.Dispose(disposing);
}
}
Expand Down Expand Up @@ -193,6 +178,22 @@ protected async ValueTask<ReadResult> ReadAtLeastAsync(int requiredBytes, bool a
return readResult;
}

/// <inheritdoc />
private protected override void DisposeReader()
{
this.Reader?.Complete();

base.DisposeReader();
}

/// <inheritdoc />
private protected override void DisposeWriter()
{
this.Writer?.Complete();

base.DisposeWriter();
}

/// <summary>
/// Deserializes a JSON-RPC message using the <see cref="MessageHandlerBase.Formatter"/>.
/// </summary>
Expand Down Expand Up @@ -266,18 +267,5 @@ private protected Exception ThrowNoTextEncoder()
{
throw new NotSupportedException(string.Format(CultureInfo.CurrentCulture, Resources.TextEncoderNotApplicable, this.Formatter.GetType().FullName, typeof(IJsonRpcMessageTextFormatter).FullName));
}

private void DisposeDisposables()
{
if (this.disposables != null)
{
foreach (IDisposable disposable in this.disposables)
{
disposable?.Dispose();
}

this.disposables = null;
}
}
}
}

0 comments on commit 922d3f7

Please sign in to comment.