Skip to content

Commit

Permalink
Merge pull request #352 from AArnott/fix350
Browse files Browse the repository at this point in the history
Only complete PipeReader/PipeWriter after operations complete
  • Loading branch information
AArnott authored Nov 4, 2019
2 parents 2b9873c + 892cda3 commit 635efdd
Showing 1 changed file with 108 additions and 9 deletions.
117 changes: 108 additions & 9 deletions src/StreamJsonRpc/MessageHandlerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ public abstract class MessageHandlerBase : IJsonRpcMessageHandler, IDisposableOb
/// </summary>
private readonly AsyncSemaphore sendingSemaphore = new AsyncSemaphore(1);

/// <summary>
/// The sync object to lock when inspecting and mutating the <see cref="state"/> field.
/// </summary>
private readonly object syncObject = new object();

/// <summary>
/// A value indicating whether the <see cref="ReadAsync(CancellationToken)"/> method is in progress.
/// </summary>
private MessageHandlerState state;

/// <summary>
/// Initializes a new instance of the <see cref="MessageHandlerBase"/> class.
/// </summary>
Expand All @@ -45,6 +55,30 @@ public MessageHandlerBase(IJsonRpcMessageFormatter formatter)
this.Formatter = formatter;
}

[Flags]
private enum MessageHandlerState
{
/// <summary>
/// No flags.
/// </summary>
None = 0,

/// <summary>
/// Indicates that the <see cref="WriteAsync(JsonRpcMessage, CancellationToken)"/> method is in running.
/// </summary>
Writing = 0x1,

/// <summary>
/// Indicates that the <see cref="ReadAsync(CancellationToken)"/> method is in running.
/// </summary>
Reading = 0x4,

/// <summary>
/// Indicates that the <see cref="Dispose(bool)"/> method has been called.
/// </summary>
DisposeVirtualMethodInvoked = 0x10,
}

/// <summary>
/// Gets a value indicating whether this message handler can receive messages.
/// </summary>
Expand Down Expand Up @@ -85,6 +119,7 @@ public async ValueTask<JsonRpcMessage> ReadAsync(CancellationToken cancellationT
Verify.Operation(this.CanRead, "No receiving stream.");
cancellationToken.ThrowIfCancellationRequested();
Verify.NotDisposed(this);
this.SetState(MessageHandlerState.Reading);

try
{
Expand All @@ -103,6 +138,13 @@ public async ValueTask<JsonRpcMessage> ReadAsync(CancellationToken cancellationT
cancellationToken.ThrowIfCancellationRequested();
throw;
}
finally
{
if (this.CheckIfDisposalAppropriate(MessageHandlerState.Reading))
{
this.Dispose(true);
}
}
}

/// <summary>
Expand All @@ -122,15 +164,23 @@ public async ValueTask WriteAsync(JsonRpcMessage content, CancellationToken canc
Requires.NotNull(content, nameof(content));
Verify.Operation(this.CanWrite, "No sending stream.");
cancellationToken.ThrowIfCancellationRequested();
Verify.NotDisposed(this);

bool shouldDispose = false;
try
{
using (await this.sendingSemaphore.EnterAsync(cancellationToken).ConfigureAwait(false))
{
cancellationToken.ThrowIfCancellationRequested();
await this.WriteCoreAsync(content, cancellationToken).ConfigureAwait(false);
await this.FlushAsync(cancellationToken).ConfigureAwait(false);
this.SetState(MessageHandlerState.Writing);
try
{
cancellationToken.ThrowIfCancellationRequested();
await this.WriteCoreAsync(content, cancellationToken).ConfigureAwait(false);
await this.FlushAsync(cancellationToken).ConfigureAwait(false);
}
finally
{
shouldDispose = this.CheckIfDisposalAppropriate(MessageHandlerState.Writing);
}
}
}
catch (ObjectDisposedException)
Expand All @@ -139,6 +189,13 @@ public async ValueTask WriteAsync(JsonRpcMessage content, CancellationToken canc
cancellationToken.ThrowIfCancellationRequested();
throw;
}
finally
{
if (shouldDispose)
{
this.Dispose(true);
}
}
}

/// <summary>
Expand All @@ -149,26 +206,35 @@ public void Dispose()
if (!this.disposalTokenSource.IsCancellationRequested)
{
this.disposalTokenSource.Cancel();
this.Dispose(true);
GC.SuppressFinalize(this);
if (this.CheckIfDisposalAppropriate())
{
this.Dispose(true);
}
}
}

/// <summary>
/// Disposes resources allocated by this instance.
/// </summary>
/// <param name="disposing"><c>true</c> when being disposed; <c>false</c> when being finalized.</param>
/// <remarks>
/// <para>
/// This method is guaranteed to not be called until any pending <see cref="WriteCoreAsync(JsonRpcMessage, CancellationToken)"/>
/// or <see cref="ReadCoreAsync(CancellationToken)"/> calls have completed.
/// </para>
/// <para>Overrides of this method *should* call the base method as well.</para>
/// </remarks>
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
this.sendingSemaphore.Dispose();
(this.Formatter as IDisposable)?.Dispose();

GC.SuppressFinalize(this);
}
}

#pragma warning disable AvoidAsyncSuffix // Avoid Async suffix

/// <summary>
/// Reads a distinct and complete message, waiting for one if necessary.
/// </summary>
Expand Down Expand Up @@ -199,6 +265,39 @@ protected virtual void Dispose(bool disposing)
/// </returns>
protected abstract ValueTask FlushAsync(CancellationToken cancellationToken);

#pragma warning restore AvoidAsyncSuffix // Avoid Async suffix
private void SetState(MessageHandlerState startingOperation)
{
lock (this.syncObject)
{
Verify.NotDisposed(this);
MessageHandlerState state = this.state;
Assumes.False(state.HasFlag(startingOperation));
this.state |= startingOperation;
}
}

private bool CheckIfDisposalAppropriate(MessageHandlerState completedOperation = MessageHandlerState.None)
{
lock (this.syncObject)
{
// Revert the flag of our caller to indicate that writing or reading has finished, if applicable.
this.state &= ~completedOperation;

if (!this.DisposalToken.IsCancellationRequested)
{
// Disposal hasn't been requested.
return false;
}

// We should dispose ourselves if we have not done so and if reading and writing are not active.
bool shouldDispose = (this.state & (MessageHandlerState.Reading | MessageHandlerState.Writing | MessageHandlerState.DisposeVirtualMethodInvoked)) == MessageHandlerState.None;
if (shouldDispose)
{
this.state |= MessageHandlerState.DisposeVirtualMethodInvoked;
}

return shouldDispose;
}
}
}
}

0 comments on commit 635efdd

Please sign in to comment.