Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid cancelling readers before end of stream #53905

Merged
merged 2 commits into from
Jun 10, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions src/Workspaces/Remote/Core/BrokeredServiceConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -293,22 +293,33 @@ internal static async ValueTask<TResult> InvokeStreamingServiceAsync<TResult>(
// 6. 'pipe.Writer' is completed
// 7. 'pipe.Reader' is completed
// 8. OperationCanceledException is thrown back to the caller
//
// Since StreamJsonRpc will automatically complete writers when an error occurs (and cancellation is
sharwell marked this conversation as resolved.
Show resolved Hide resolved
// treated as an error), we create a buffered reader to isolate callback actions from this
// implementation.

// Create a separate cancellation token for the reader, which we keep open until after the call to invoke
// completes. If we close the reader before cancellation is processed by the remote call, it might block
// (deadlock) while writing to a stream which is no longer processing data.
using var readerCancellationSource = new CancellationTokenSource();

var pipe = new Pipe();
var writeBufferPipe = new Pipe();
var readBufferPipe = new Pipe();

// Create new tasks that both start executing, rather than invoking the delegates directly
// to make sure both invocation and reader start executing and transfering data.

var writerTask = Task.Run(async () =>
{
var copyTask = writeBufferPipe.Reader.CopyToAsync(readBufferPipe.Writer, cancellationToken);

try
{
await invocation(service, pipe.Writer, cancellationToken).ConfigureAwait(false);
sharwell marked this conversation as resolved.
Show resolved Hide resolved
await invocation(service, writeBufferPipe.Writer, cancellationToken).ConfigureAwait(false);

// Complete the copy and mark the operation complete
await copyTask.NoThrowAwaitable(captureContext: false);
sharwell marked this conversation as resolved.
Show resolved Hide resolved
await readBufferPipe.Writer.CompleteAsync().ConfigureAwait(false);
}
catch (Exception e)
{
Expand All @@ -318,9 +329,11 @@ internal static async ValueTask<TResult> InvokeStreamingServiceAsync<TResult>(
readerCancellationSource.Cancel();

// Ensure that the writer is complete if an exception is thrown
// before the writer is passed to the RPC proxy. Once it's passed to the proxy
// before the writer is passed to the RPC proxy. Once it's passed to the proxy
// the proxy should complete it as soon as the remote side completes it.
await pipe.Writer.CompleteAsync(e).ConfigureAwait(false);
await writeBufferPipe.Writer.CompleteAsync(e).ConfigureAwait(false);
await copyTask.NoThrowAwaitable(captureContext: false);
await readBufferPipe.Writer.CompleteAsync(e).ConfigureAwait(false);

throw;
}
Expand All @@ -333,15 +346,15 @@ internal static async ValueTask<TResult> InvokeStreamingServiceAsync<TResult>(

try
{
return await reader(pipe.Reader, readerCancellationSource.Token).ConfigureAwait(false);
return await reader(readBufferPipe.Reader, readerCancellationSource.Token).ConfigureAwait(false);
}
catch (Exception e) when ((exception = e) == null)
{
throw ExceptionUtilities.Unreachable;
}
finally
{
await pipe.Reader.CompleteAsync(exception).ConfigureAwait(false);
await readBufferPipe.Reader.CompleteAsync(exception).ConfigureAwait(false);
}
}, readerCancellationSource.Token);

Expand Down