diff --git a/src/StreamJsonRpc/MessageHandlerBase.cs b/src/StreamJsonRpc/MessageHandlerBase.cs
index 8a24811c..86535e79 100644
--- a/src/StreamJsonRpc/MessageHandlerBase.cs
+++ b/src/StreamJsonRpc/MessageHandlerBase.cs
@@ -35,6 +35,16 @@ public abstract class MessageHandlerBase : IJsonRpcMessageHandler, IDisposableOb
///
private readonly AsyncSemaphore sendingSemaphore = new AsyncSemaphore(1);
+ ///
+ /// The sync object to lock when inspecting and mutating the field.
+ ///
+ private readonly object syncObject = new object();
+
+ ///
+ /// A value indicating whether the method is in progress.
+ ///
+ private MessageHandlerState state;
+
///
/// Initializes a new instance of the class.
///
@@ -45,6 +55,30 @@ public MessageHandlerBase(IJsonRpcMessageFormatter formatter)
this.Formatter = formatter;
}
+ [Flags]
+ private enum MessageHandlerState
+ {
+ ///
+ /// No flags.
+ ///
+ None = 0,
+
+ ///
+ /// Indicates that the method is in running.
+ ///
+ Writing = 0x1,
+
+ ///
+ /// Indicates that the method is in running.
+ ///
+ Reading = 0x4,
+
+ ///
+ /// Indicates that the method has been called.
+ ///
+ DisposeVirtualMethodInvoked = 0x10,
+ }
+
///
/// Gets a value indicating whether this message handler can receive messages.
///
@@ -85,6 +119,7 @@ public async ValueTask ReadAsync(CancellationToken cancellationT
Verify.Operation(this.CanRead, "No receiving stream.");
cancellationToken.ThrowIfCancellationRequested();
Verify.NotDisposed(this);
+ this.SetState(MessageHandlerState.Reading);
try
{
@@ -103,6 +138,13 @@ public async ValueTask ReadAsync(CancellationToken cancellationT
cancellationToken.ThrowIfCancellationRequested();
throw;
}
+ finally
+ {
+ if (this.CheckIfDisposalAppropriate(MessageHandlerState.Reading))
+ {
+ this.Dispose(true);
+ }
+ }
}
///
@@ -122,15 +164,23 @@ public async ValueTask WriteAsync(JsonRpcMessage content, CancellationToken canc
Requires.NotNull(content, nameof(content));
Verify.Operation(this.CanWrite, "No sending stream.");
cancellationToken.ThrowIfCancellationRequested();
- Verify.NotDisposed(this);
+ bool shouldDispose = false;
try
{
using (await this.sendingSemaphore.EnterAsync(cancellationToken).ConfigureAwait(false))
{
- cancellationToken.ThrowIfCancellationRequested();
- await this.WriteCoreAsync(content, cancellationToken).ConfigureAwait(false);
- await this.FlushAsync(cancellationToken).ConfigureAwait(false);
+ this.SetState(MessageHandlerState.Writing);
+ try
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+ await this.WriteCoreAsync(content, cancellationToken).ConfigureAwait(false);
+ await this.FlushAsync(cancellationToken).ConfigureAwait(false);
+ }
+ finally
+ {
+ shouldDispose = this.CheckIfDisposalAppropriate(MessageHandlerState.Writing);
+ }
}
}
catch (ObjectDisposedException)
@@ -139,6 +189,13 @@ public async ValueTask WriteAsync(JsonRpcMessage content, CancellationToken canc
cancellationToken.ThrowIfCancellationRequested();
throw;
}
+ finally
+ {
+ if (shouldDispose)
+ {
+ this.Dispose(true);
+ }
+ }
}
///
@@ -149,8 +206,10 @@ public void Dispose()
if (!this.disposalTokenSource.IsCancellationRequested)
{
this.disposalTokenSource.Cancel();
- this.Dispose(true);
- GC.SuppressFinalize(this);
+ if (this.CheckIfDisposalAppropriate())
+ {
+ this.Dispose(true);
+ }
}
}
@@ -158,17 +217,24 @@ public void Dispose()
/// Disposes resources allocated by this instance.
///
/// true when being disposed; false when being finalized.
+ ///
+ ///
+ /// This method is guaranteed to not be called until any pending
+ /// or calls have completed.
+ ///
+ /// Overrides of this method *should* call the base method as well.
+ ///
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
this.sendingSemaphore.Dispose();
(this.Formatter as IDisposable)?.Dispose();
+
+ GC.SuppressFinalize(this);
}
}
-#pragma warning disable AvoidAsyncSuffix // Avoid Async suffix
-
///
/// Reads a distinct and complete message, waiting for one if necessary.
///
@@ -199,6 +265,39 @@ protected virtual void Dispose(bool disposing)
///
protected abstract ValueTask FlushAsync(CancellationToken cancellationToken);
-#pragma warning restore AvoidAsyncSuffix // Avoid Async suffix
+ private void SetState(MessageHandlerState startingOperation)
+ {
+ lock (this.syncObject)
+ {
+ Verify.NotDisposed(this);
+ MessageHandlerState state = this.state;
+ Assumes.False(state.HasFlag(startingOperation));
+ this.state |= startingOperation;
+ }
+ }
+
+ private bool CheckIfDisposalAppropriate(MessageHandlerState completedOperation = MessageHandlerState.None)
+ {
+ lock (this.syncObject)
+ {
+ // Revert the flag of our caller to indicate that writing or reading has finished, if applicable.
+ this.state &= ~completedOperation;
+
+ if (!this.DisposalToken.IsCancellationRequested)
+ {
+ // Disposal hasn't been requested.
+ return false;
+ }
+
+ // We should dispose ourselves if we have not done so and if reading and writing are not active.
+ bool shouldDispose = (this.state & (MessageHandlerState.Reading | MessageHandlerState.Writing | MessageHandlerState.DisposeVirtualMethodInvoked)) == MessageHandlerState.None;
+ if (shouldDispose)
+ {
+ this.state |= MessageHandlerState.DisposeVirtualMethodInvoked;
+ }
+
+ return shouldDispose;
+ }
+ }
}
}