Skip to content

Commit

Permalink
Only complete PipeReader/PipeWriter after operations complete
Browse files Browse the repository at this point in the history
We must not allow a race to exist between PipeReader/PipeWriter use and calling Complete() on them. This not only produces exceptions like the one in #350 but it opens up the possibility of accessing recycled buffers.

Fixes #350
  • Loading branch information
AArnott committed Nov 2, 2019
1 parent 06d287b commit e124896
Showing 1 changed file with 90 additions and 6 deletions.
96 changes: 90 additions & 6 deletions src/StreamJsonRpc/MessageHandlerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ public abstract class MessageHandlerBase : IJsonRpcMessageHandler, IDisposableOb
/// </summary>
private readonly AsyncSemaphore sendingSemaphore = new AsyncSemaphore(1);

/// <summary>
/// The sync object.
/// </summary>
private readonly object syncObject = new object();

/// <summary>
/// A value indicating whether the <see cref="ReadAsync(CancellationToken)"/> method is in progress.
/// </summary>
private MessageHandlerState state;

/// <summary>
/// Initializes a new instance of the <see cref="MessageHandlerBase"/> class.
/// </summary>
Expand All @@ -45,6 +55,30 @@ public MessageHandlerBase(IJsonRpcMessageFormatter formatter)
this.Formatter = formatter;
}

[Flags]
private enum MessageHandlerState
{
/// <summary>
/// No flags.
/// </summary>
None = 0,

/// <summary>
/// Indicates that the <see cref="WriteAsync(JsonRpcMessage, CancellationToken)"/> method is in running.
/// </summary>
Writing = 0x1,

/// <summary>
/// Indicates that the <see cref="ReadAsync(CancellationToken)"/> method is in running.
/// </summary>
Reading = 0x4,

/// <summary>
/// Indicates that the <see cref="Dispose(bool)"/> method has been called.
/// </summary>
DisposeVirtualMethodInvoked = 0x10,
}

/// <summary>
/// Gets a value indicating whether this message handler can receive messages.
/// </summary>
Expand Down Expand Up @@ -85,6 +119,7 @@ public async ValueTask<JsonRpcMessage> ReadAsync(CancellationToken cancellationT
Verify.Operation(this.CanRead, "No receiving stream.");
cancellationToken.ThrowIfCancellationRequested();
Verify.NotDisposed(this);
this.SetState(MessageHandlerState.Reading);

try
{
Expand All @@ -103,6 +138,10 @@ public async ValueTask<JsonRpcMessage> ReadAsync(CancellationToken cancellationT
cancellationToken.ThrowIfCancellationRequested();
throw;
}
finally
{
this.DisposeIfAppropriate(MessageHandlerState.Reading);
}
}

/// <summary>
Expand All @@ -122,7 +161,7 @@ public async ValueTask WriteAsync(JsonRpcMessage content, CancellationToken canc
Requires.NotNull(content, nameof(content));
Verify.Operation(this.CanWrite, "No sending stream.");
cancellationToken.ThrowIfCancellationRequested();
Verify.NotDisposed(this);
this.SetState(MessageHandlerState.Writing);

try
{
Expand All @@ -139,6 +178,10 @@ public async ValueTask WriteAsync(JsonRpcMessage content, CancellationToken canc
cancellationToken.ThrowIfCancellationRequested();
throw;
}
finally
{
this.DisposeIfAppropriate(MessageHandlerState.Writing);
}
}

/// <summary>
Expand All @@ -149,26 +192,32 @@ public void Dispose()
if (!this.disposalTokenSource.IsCancellationRequested)
{
this.disposalTokenSource.Cancel();
this.Dispose(true);
GC.SuppressFinalize(this);
this.DisposeIfAppropriate();
}
}

/// <summary>
/// Disposes resources allocated by this instance.
/// </summary>
/// <param name="disposing"><c>true</c> when being disposed; <c>false</c> when being finalized.</param>
/// <remarks>
/// <para>
/// This method is guaranteed to not be called until any pending <see cref="WriteCoreAsync(JsonRpcMessage, CancellationToken)"/>
/// or <see cref="ReadCoreAsync(CancellationToken)"/> calls have completed.
/// </para>
/// <para>Overrides of this method *should* call the base method as well.</para>
/// </remarks>
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
this.sendingSemaphore.Dispose();
(this.Formatter as IDisposable)?.Dispose();

GC.SuppressFinalize(this);
}
}

#pragma warning disable AvoidAsyncSuffix // Avoid Async suffix

/// <summary>
/// Reads a distinct and complete message, waiting for one if necessary.
/// </summary>
Expand Down Expand Up @@ -199,6 +248,41 @@ protected virtual void Dispose(bool disposing)
/// </returns>
protected abstract ValueTask FlushAsync(CancellationToken cancellationToken);

#pragma warning restore AvoidAsyncSuffix // Avoid Async suffix
private void SetState(MessageHandlerState startingOperation)
{
lock (this.syncObject)
{
Verify.NotDisposed(this);
MessageHandlerState state = this.state;
Assumes.False(state.HasFlag(startingOperation));
this.state |= startingOperation;
}
}

private void DisposeIfAppropriate(MessageHandlerState completedOperation = MessageHandlerState.None)
{
lock (this.syncObject)
{
// Revert the flag of our caller to indicate that writing or reading has finished, if applicable.
this.state &= ~completedOperation;

if (!this.DisposalToken.IsCancellationRequested)
{
// Disposal hasn't been requested.
return;
}

// We should dispose ourselves if we have not done so and if reading and writing are not active.
bool shouldDispose = (this.state & (MessageHandlerState.Reading | MessageHandlerState.Writing | MessageHandlerState.DisposeVirtualMethodInvoked)) == MessageHandlerState.None;
if (!shouldDispose)
{
return;
}
}

// We explicitly want to exit the lock we held above before invoking this virtual method.
this.state |= MessageHandlerState.DisposeVirtualMethodInvoked;
this.Dispose(true);
}
}
}

0 comments on commit e124896

Please sign in to comment.