diff --git a/src/StreamJsonRpc.Tests/JsonRpcTests.cs b/src/StreamJsonRpc.Tests/JsonRpcTests.cs index 29e8b3da..21c1d3cb 100644 --- a/src/StreamJsonRpc.Tests/JsonRpcTests.cs +++ b/src/StreamJsonRpc.Tests/JsonRpcTests.cs @@ -1394,12 +1394,12 @@ public async Task AddLocalRpcTarget_AllowedAfterListeningIfOptIn() } [Fact] - public void Completion_BeforeListeningAndAfterDisposal() + public async Task Completion_BeforeListeningAndAfterDisposal() { var rpc = new JsonRpc(Stream.Null, Stream.Null); Task completion = rpc.Completion; rpc.Dispose(); - Assert.True(completion.IsCompleted); + await completion.WithCancellation(this.TimeoutToken); } [Fact] diff --git a/src/StreamJsonRpc.Tests/StreamMessageHandlerTests.cs b/src/StreamJsonRpc.Tests/StreamMessageHandlerTests.cs index cbe9c17b..186bd690 100644 --- a/src/StreamJsonRpc.Tests/StreamMessageHandlerTests.cs +++ b/src/StreamJsonRpc.Tests/StreamMessageHandlerTests.cs @@ -207,7 +207,7 @@ protected override ValueTask ReadCoreAsync(CancellationToken can protected override ValueTask WriteCoreAsync(JsonRpcMessage content, CancellationToken cancellationToken) { Interlocked.Increment(ref this.WriteCoreCallCount); - return new ValueTask(this.WriteBlock.WaitAsync()); + return new ValueTask(this.WriteBlock.WaitAsync(cancellationToken)); } } diff --git a/src/StreamJsonRpc/MessageHandlerBase.cs b/src/StreamJsonRpc/MessageHandlerBase.cs index 86535e79..a35dc089 100644 --- a/src/StreamJsonRpc/MessageHandlerBase.cs +++ b/src/StreamJsonRpc/MessageHandlerBase.cs @@ -41,7 +41,17 @@ public abstract class MessageHandlerBase : IJsonRpcMessageHandler, IDisposableOb private readonly object syncObject = new object(); /// - /// A value indicating whether the method is in progress. + /// A signal that the last read operation has completed. + /// + private readonly AsyncManualResetEvent readingCompleted = new AsyncManualResetEvent(); + + /// + /// A signal that the last write operation has completed. + /// + private readonly AsyncManualResetEvent writingCompleted = new AsyncManualResetEvent(); + + /// + /// A value indicating whether the and/or methods are in progress. /// private MessageHandlerState state; @@ -53,6 +63,10 @@ public MessageHandlerBase(IJsonRpcMessageFormatter formatter) { Requires.NotNull(formatter, nameof(formatter)); this.Formatter = formatter; + + Task readerDisposal = this.readingCompleted.WaitAsync().ContinueWith((_, s) => ((MessageHandlerBase)s).DisposeReader(), this, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default); + Task writerDisposal = this.writingCompleted.WaitAsync().ContinueWith((_, s) => ((MessageHandlerBase)s).DisposeWriter(), this, CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default); + this.Completion = Task.WhenAll(readerDisposal, writerDisposal).ContinueWith((_, s) => ((MessageHandlerBase)s).Dispose(true), this, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); } [Flags] @@ -71,12 +85,7 @@ private enum MessageHandlerState /// /// Indicates that the method is in running. /// - Reading = 0x4, - - /// - /// Indicates that the method has been called. - /// - DisposeVirtualMethodInvoked = 0x10, + Reading = 0x2, } /// @@ -102,6 +111,11 @@ private enum MessageHandlerState /// protected CancellationToken DisposalToken => this.disposalTokenSource.Token; + /// + /// Gets a task that completes when this instance has completed disposal. + /// + private Task Completion { get; } + /// /// Reads a distinct and complete message from the transport, waiting for one if necessary. /// @@ -140,9 +154,13 @@ public async ValueTask ReadAsync(CancellationToken cancellationT } finally { - if (this.CheckIfDisposalAppropriate(MessageHandlerState.Reading)) + lock (this.syncObject) { - this.Dispose(true); + this.state &= ~MessageHandlerState.Reading; + if (this.DisposalToken.IsCancellationRequested) + { + this.readingCompleted.Set(); + } } } } @@ -165,7 +183,6 @@ public async ValueTask WriteAsync(JsonRpcMessage content, CancellationToken canc Verify.Operation(this.CanWrite, "No sending stream."); cancellationToken.ThrowIfCancellationRequested(); - bool shouldDispose = false; try { using (await this.sendingSemaphore.EnterAsync(cancellationToken).ConfigureAwait(false)) @@ -179,7 +196,14 @@ public async ValueTask WriteAsync(JsonRpcMessage content, CancellationToken canc } finally { - shouldDispose = this.CheckIfDisposalAppropriate(MessageHandlerState.Writing); + lock (this.syncObject) + { + this.state &= ~MessageHandlerState.Writing; + if (this.DisposalToken.IsCancellationRequested) + { + this.writingCompleted.Set(); + } + } } } } @@ -189,38 +213,22 @@ public async ValueTask WriteAsync(JsonRpcMessage content, CancellationToken canc cancellationToken.ThrowIfCancellationRequested(); throw; } - finally - { - if (shouldDispose) - { - this.Dispose(true); - } - } } +#pragma warning disable VSTHRD002 // We synchronously block, but nothing here should ever require the main thread. /// /// Disposes this instance, and cancels any pending read or write operations. /// - public void Dispose() - { - if (!this.disposalTokenSource.IsCancellationRequested) - { - this.disposalTokenSource.Cancel(); - if (this.CheckIfDisposalAppropriate()) - { - this.Dispose(true); - } - } - } + public void Dispose() => this.DisposeAsync().GetAwaiter().GetResult(); +#pragma warning restore VSTHRD002 /// - /// Disposes resources allocated by this instance. + /// Disposes resources allocated by this instance that are common to both reading and writing. /// /// true when being disposed; false when being finalized. /// /// - /// This method is guaranteed to not be called until any pending - /// or calls have completed. + /// This method is called by after both and have completed. /// /// Overrides of this method *should* call the base method as well. /// @@ -228,9 +236,7 @@ protected virtual void Dispose(bool disposing) { if (disposing) { - this.sendingSemaphore.Dispose(); (this.Formatter as IDisposable)?.Dispose(); - GC.SuppressFinalize(this); } } @@ -265,38 +271,67 @@ protected virtual void Dispose(bool disposing) /// protected abstract ValueTask FlushAsync(CancellationToken cancellationToken); - private void SetState(MessageHandlerState startingOperation) + /// + /// Disposes this instance, and cancels any pending read or write operations. + /// + private protected virtual async Task DisposeAsync() { - lock (this.syncObject) + if (!this.disposalTokenSource.IsCancellationRequested) { - Verify.NotDisposed(this); - MessageHandlerState state = this.state; - Assumes.False(state.HasFlag(startingOperation)); - this.state |= startingOperation; + this.disposalTokenSource.Cancel(); + + // Kick off disposal of reading and/or writing resources based on whether they're active right now or not. + // If they're active, they'll take care of themselves when they finish since we signaled disposal. + lock (this.syncObject) + { + if (!this.state.HasFlag(MessageHandlerState.Reading)) + { + this.readingCompleted.Set(); + } + + if (!this.state.HasFlag(MessageHandlerState.Writing)) + { + this.writingCompleted.Set(); + } + } + + // Wait for completion to actually complete, and re-throw any exceptions. + await this.Completion.ConfigureAwait(false); + + this.Dispose(true); } } - private bool CheckIfDisposalAppropriate(MessageHandlerState completedOperation = MessageHandlerState.None) + /// + /// Disposes resources allocated by this instance that are used for reading (not writing). + /// + /// + /// This method is called by after the last read operation has completed. + /// + private protected virtual void DisposeReader() { - lock (this.syncObject) - { - // Revert the flag of our caller to indicate that writing or reading has finished, if applicable. - this.state &= ~completedOperation; - - if (!this.DisposalToken.IsCancellationRequested) - { - // Disposal hasn't been requested. - return false; - } + } - // We should dispose ourselves if we have not done so and if reading and writing are not active. - bool shouldDispose = (this.state & (MessageHandlerState.Reading | MessageHandlerState.Writing | MessageHandlerState.DisposeVirtualMethodInvoked)) == MessageHandlerState.None; - if (shouldDispose) - { - this.state |= MessageHandlerState.DisposeVirtualMethodInvoked; - } + /// + /// Disposes resources allocated by this instance that are used for writing (not reading). + /// + /// + /// This method is called by after the last write operation has completed. + /// Overrides of this method *should* call the base method as well. + /// + private protected virtual void DisposeWriter() + { + this.sendingSemaphore.Dispose(); + } - return shouldDispose; + private void SetState(MessageHandlerState startingOperation) + { + lock (this.syncObject) + { + Verify.NotDisposed(this); + MessageHandlerState state = this.state; + Assumes.False(state.HasFlag(startingOperation)); + this.state |= startingOperation; } } } diff --git a/src/StreamJsonRpc/PipeMessageHandler.cs b/src/StreamJsonRpc/PipeMessageHandler.cs index 8171cba4..a5d61e9e 100644 --- a/src/StreamJsonRpc/PipeMessageHandler.cs +++ b/src/StreamJsonRpc/PipeMessageHandler.cs @@ -37,11 +37,6 @@ public abstract class PipeMessageHandler : MessageHandlerBase /// private static readonly long LargeMessageThreshold = new PipeOptions().PauseWriterThreshold; - /// - /// Objects that we should dispose when we are disposed. May be null. - /// - private List disposables; - /// /// Initializes a new instance of the class. /// @@ -79,15 +74,19 @@ public PipeMessageHandler(Stream writer, Stream reader, IJsonRpcMessageFormatter this.Reader = reader?.UseStrictPipeReader(); this.Writer = writer?.UsePipeWriter(); - this.disposables = new List(); - if (reader != null) + // After we've completed writing, only dispose the underlying write stream when we've flushed everything. + if (writer != null) { - this.disposables.Add(reader); + Assumes.NotNull(this.Writer); + this.Writer.OnReaderCompleted((ex, state) => ((Stream)state).Dispose(), writer); } - if (writer != null && writer != reader) + // NamedPipeClientStream.ReadAsync(byte[], int, int, CancellationToken) ignores the CancellationToken except at the entrypoint. + // To avoid an async hang there or in similar streams upon disposal, we're going to Dispose the read stream directly. + // We only need to do this if the read stream is distinct from the write stream, which is already handled above. + if (reader != null && reader != writer) { - this.disposables.Add(writer); + this.DisposalToken.Register(state => ((Stream)state).Dispose(), reader); } } @@ -136,20 +135,6 @@ protected override void Dispose(bool disposing) this.Reader?.Complete(); this.Writer?.Complete(); - if (this.disposables != null) - { - // Only dispose the underlying streams (if any) *after* our writer's work has been fully read. - // Otherwise we risk cutting of data that we claimed to have transmitted. - if (this.Writer != null && this.disposables != null) - { - this.Writer.OnReaderCompleted((ex, s) => this.DisposeDisposables(), null); - } - else - { - this.DisposeDisposables(); - } - } - base.Dispose(disposing); } } @@ -193,6 +178,22 @@ protected async ValueTask ReadAtLeastAsync(int requiredBytes, bool a return readResult; } + /// + private protected override void DisposeReader() + { + this.Reader?.Complete(); + + base.DisposeReader(); + } + + /// + private protected override void DisposeWriter() + { + this.Writer?.Complete(); + + base.DisposeWriter(); + } + /// /// Deserializes a JSON-RPC message using the . /// @@ -266,18 +267,5 @@ private protected Exception ThrowNoTextEncoder() { throw new NotSupportedException(string.Format(CultureInfo.CurrentCulture, Resources.TextEncoderNotApplicable, this.Formatter.GetType().FullName, typeof(IJsonRpcMessageTextFormatter).FullName)); } - - private void DisposeDisposables() - { - if (this.disposables != null) - { - foreach (IDisposable disposable in this.disposables) - { - disposable?.Dispose(); - } - - this.disposables = null; - } - } } }