Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fewer exceptions #73766

Merged
merged 1 commit into from
May 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis.Collections;
Expand Down Expand Up @@ -87,7 +88,7 @@ internal class AsyncBatchingWorkQueue<TItem, TResult>
/// Task kicked off to do the next batch of processing of <see cref="_nextBatch"/>. These
/// tasks form a chain so that the next task only processes when the previous one completes.
/// </summary>
private Task<TResult?> _updateTask = Task.FromResult(default(TResult));
private Task<(bool ranToCompletion, TResult? result)> _updateTask = Task.FromResult((ranToCompletion: true, default(TResult?)));

/// <summary>
/// Whether or not there is an existing task in flight that will process the current batch
Expand Down Expand Up @@ -189,7 +190,7 @@ void AddItemsToBatch(IEnumerable<TItem> items)
}
}

async Task<TResult?> ContinueAfterDelayAsync(Task lastTask)
async Task<(bool ranToCompletion, TResult? result)> ContinueAfterDelayAsync(Task lastTask)
{
using var _ = _asyncListener.BeginAsyncOperation(nameof(AddWork));

Expand All @@ -198,15 +199,21 @@ void AddItemsToBatch(IEnumerable<TItem> items)
await lastTask.NoThrowAwaitableInternal(captureContext: false);

// If we were asked to shutdown, immediately transition to the canceled state without doing any more work.
_entireQueueCancellationToken.ThrowIfCancellationRequested();
if (_entireQueueCancellationToken.IsCancellationRequested)
return (ranToCompletion: false, default(TResult?));

// Ensure that we always yield the current thread this is necessary for correctness as we are called
// inside a lock that _taskInFlight to true. We must ensure that the work to process the next batch
// must be on another thread that runs afterwards, can only grab the thread once we release it and will
// then reset that bool back to false
await Task.Yield().ConfigureAwait(false);
await _asyncListener.Delay(_delay, _entireQueueCancellationToken).ConfigureAwait(false);
return await ProcessNextBatchAsync().ConfigureAwait(false);
await _asyncListener.Delay(_delay, _entireQueueCancellationToken).NoThrowAwaitableInternal(false);

// If we were asked to shutdown, immediately transition to the canceled state without doing any more work.
if (_entireQueueCancellationToken.IsCancellationRequested)
return (ranToCompletion: false, default(TResult?));

return (ranToCompletion: true, await ProcessNextBatchAsync().ConfigureAwait(false));
}
}

Expand All @@ -215,10 +222,22 @@ void AddItemsToBatch(IEnumerable<TItem> items)
/// cref="_processBatchAsync"/>. If the last <see cref="_processBatchAsync"/> canceled or failed, then a
/// corresponding canceled or faulted task will be returned that propagates that outwards.
/// </summary>
public Task<TResult?> WaitUntilCurrentBatchCompletesAsync()
public async Task<TResult?> WaitUntilCurrentBatchCompletesAsync()
{
Task<(bool ranToCompletion, TResult? result)> updateTask;
lock (_gate)
return _updateTask;
{
updateTask = _updateTask;
}

var (ranToCompletion, result) = await updateTask.ConfigureAwait(false);
if (!ranToCompletion)
{
Debug.Assert(_entireQueueCancellationToken.IsCancellationRequested);
_entireQueueCancellationToken.ThrowIfCancellationRequested();
}

return result;
}

private async ValueTask<TResult?> ProcessNextBatchAsync()
Expand Down
Loading