Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only complete PipeReader/PipeWriter after operations complete #352

Merged
merged 1 commit into from
Nov 4, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 108 additions & 9 deletions src/StreamJsonRpc/MessageHandlerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ public abstract class MessageHandlerBase : IJsonRpcMessageHandler, IDisposableOb
/// </summary>
private readonly AsyncSemaphore sendingSemaphore = new AsyncSemaphore(1);

/// <summary>
/// The sync object to lock when inspecting and mutating the <see cref="state"/> field.
/// </summary>
private readonly object syncObject = new object();

/// <summary>
/// A value indicating whether the <see cref="ReadAsync(CancellationToken)"/> method is in progress.
/// </summary>
private MessageHandlerState state;

/// <summary>
/// Initializes a new instance of the <see cref="MessageHandlerBase"/> class.
/// </summary>
Expand All @@ -45,6 +55,30 @@ public MessageHandlerBase(IJsonRpcMessageFormatter formatter)
this.Formatter = formatter;
}

[Flags]
private enum MessageHandlerState
{
/// <summary>
/// No flags.
/// </summary>
None = 0,

/// <summary>
/// Indicates that the <see cref="WriteAsync(JsonRpcMessage, CancellationToken)"/> method is in running.
/// </summary>
Writing = 0x1,

/// <summary>
/// Indicates that the <see cref="ReadAsync(CancellationToken)"/> method is in running.
/// </summary>
Reading = 0x4,

/// <summary>
/// Indicates that the <see cref="Dispose(bool)"/> method has been called.
/// </summary>
DisposeVirtualMethodInvoked = 0x10,
}

/// <summary>
/// Gets a value indicating whether this message handler can receive messages.
/// </summary>
Expand Down Expand Up @@ -85,6 +119,7 @@ public async ValueTask<JsonRpcMessage> ReadAsync(CancellationToken cancellationT
Verify.Operation(this.CanRead, "No receiving stream.");
cancellationToken.ThrowIfCancellationRequested();
Verify.NotDisposed(this);
this.SetState(MessageHandlerState.Reading);

try
{
Expand All @@ -103,6 +138,13 @@ public async ValueTask<JsonRpcMessage> ReadAsync(CancellationToken cancellationT
cancellationToken.ThrowIfCancellationRequested();
throw;
}
finally
{
if (this.CheckIfDisposalAppropriate(MessageHandlerState.Reading))
{
this.Dispose(true);
}
}
}

/// <summary>
Expand All @@ -122,15 +164,23 @@ public async ValueTask WriteAsync(JsonRpcMessage content, CancellationToken canc
Requires.NotNull(content, nameof(content));
Verify.Operation(this.CanWrite, "No sending stream.");
cancellationToken.ThrowIfCancellationRequested();
Verify.NotDisposed(this);

bool shouldDispose = false;
try
{
using (await this.sendingSemaphore.EnterAsync(cancellationToken).ConfigureAwait(false))
{
cancellationToken.ThrowIfCancellationRequested();
await this.WriteCoreAsync(content, cancellationToken).ConfigureAwait(false);
await this.FlushAsync(cancellationToken).ConfigureAwait(false);
this.SetState(MessageHandlerState.Writing);
try
{
cancellationToken.ThrowIfCancellationRequested();
await this.WriteCoreAsync(content, cancellationToken).ConfigureAwait(false);
await this.FlushAsync(cancellationToken).ConfigureAwait(false);
}
finally
{
shouldDispose = this.CheckIfDisposalAppropriate(MessageHandlerState.Writing);
}
}
}
catch (ObjectDisposedException)
Expand All @@ -139,6 +189,13 @@ public async ValueTask WriteAsync(JsonRpcMessage content, CancellationToken canc
cancellationToken.ThrowIfCancellationRequested();
throw;
}
finally
{
if (shouldDispose)
{
this.Dispose(true);
}
}
}

/// <summary>
Expand All @@ -149,26 +206,35 @@ public void Dispose()
if (!this.disposalTokenSource.IsCancellationRequested)
{
this.disposalTokenSource.Cancel();
this.Dispose(true);
GC.SuppressFinalize(this);
if (this.CheckIfDisposalAppropriate())
{
this.Dispose(true);
}
}
}

/// <summary>
/// Disposes resources allocated by this instance.
/// </summary>
/// <param name="disposing"><c>true</c> when being disposed; <c>false</c> when being finalized.</param>
/// <remarks>
/// <para>
/// This method is guaranteed to not be called until any pending <see cref="WriteCoreAsync(JsonRpcMessage, CancellationToken)"/>
/// or <see cref="ReadCoreAsync(CancellationToken)"/> calls have completed.
/// </para>
/// <para>Overrides of this method *should* call the base method as well.</para>
/// </remarks>
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
this.sendingSemaphore.Dispose();
(this.Formatter as IDisposable)?.Dispose();

GC.SuppressFinalize(this);
}
}

#pragma warning disable AvoidAsyncSuffix // Avoid Async suffix

/// <summary>
/// Reads a distinct and complete message, waiting for one if necessary.
/// </summary>
Expand Down Expand Up @@ -199,6 +265,39 @@ protected virtual void Dispose(bool disposing)
/// </returns>
protected abstract ValueTask FlushAsync(CancellationToken cancellationToken);

#pragma warning restore AvoidAsyncSuffix // Avoid Async suffix
private void SetState(MessageHandlerState startingOperation)
{
lock (this.syncObject)
{
Verify.NotDisposed(this);
MessageHandlerState state = this.state;
Assumes.False(state.HasFlag(startingOperation));
this.state |= startingOperation;
}
}

private bool CheckIfDisposalAppropriate(MessageHandlerState completedOperation = MessageHandlerState.None)
{
lock (this.syncObject)
{
// Revert the flag of our caller to indicate that writing or reading has finished, if applicable.
this.state &= ~completedOperation;

if (!this.DisposalToken.IsCancellationRequested)
{
// Disposal hasn't been requested.
return false;
}

// We should dispose ourselves if we have not done so and if reading and writing are not active.
bool shouldDispose = (this.state & (MessageHandlerState.Reading | MessageHandlerState.Writing | MessageHandlerState.DisposeVirtualMethodInvoked)) == MessageHandlerState.None;
if (shouldDispose)
{
this.state |= MessageHandlerState.DisposeVirtualMethodInvoked;
}

return shouldDispose;
}
}
}
}