Skip to content

Commit

Permalink
Merge pull request #19325 from CyrusNajmabadi/synchronousFlushing
Browse files Browse the repository at this point in the history
Remove forced unecessary asynchrony in flushing data to disk.
  • Loading branch information
CyrusNajmabadi authored May 8, 2017
2 parents e25fd7c + fadaba7 commit f9b1f75
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ private static void CopyTo(Stream stream, byte[] bytes, int length)
}

/// <summary>
/// Amount of time to wait between flushing writes to disk. 250ms means we can flush
/// writes to disk four times a second.
/// Amount of time to wait between flushing writes to disk. 500ms means we can flush
/// writes to disk two times a second.
/// </summary>
private const int FlushAllDelayMS = 250;
private const int FlushAllDelayMS = 500;

/// <summary>
/// We use a pool to cache reads/writes that are less than 4k. Testing with Roslyn,
Expand All @@ -89,7 +89,7 @@ private static void CopyTo(Stream stream, byte[] bytes, int length)
/// <summary>
/// The max amount of byte[]s we cache. This caps our cache at 4MB while allowing
/// us to massively speed up writing (by batching writes). Because we can write to
/// disk 4 times a second. That means a total of 16MB/s that can be written to disk
/// disk two times a second. That means a total of 8MB/s that can be written to disk
/// using only our cache. Given that Roslyn itself only writes about 50MB to disk
/// after several minutes of analysis, this amount of bandwidth is more than sufficient.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis.SQLite.Interop;
Expand Down Expand Up @@ -72,16 +73,50 @@ private async Task FlushSpecificWritesAsync<TKey>(
ArrayBuilder<Action<SqlConnection>> writesToProcess,
CancellationToken cancellationToken)
{
// Get the task that is responsible for doing the writes for this queue.
// This task will complete when all previously enqueued writes for this queue
// complete, and all the currently enqueued writes for this queue complete as well.
var writeTask = await GetWriteTaskAsync().ConfigureAwait(false);
await writeTask.ConfigureAwait(false);
// Get's the task representing the current writes being performed by another
// thread for this queue+key, and a TaskCompletionSource we can use to let
// other threads know about our own progress writing any new writes in this queue.
var (previousWritesTask, taskCompletionSource) = await GetWriteTaskAsync().ConfigureAwait(false);
try
{
// Wait for all previous writes to be flushed.
await previousWritesTask.ConfigureAwait(false);

if (writesToProcess.Count == 0)
{
// No additional writes for us to flush. We can immediately bail out.
Debug.Assert(taskCompletionSource == null);
return;
}

// Now, if we have writes of our own, do them on this thread.
//
// Note: this flushing is not cancellable. We've already removed the
// writes from the write queue. If we were not to write them out we
// would be losing data.
Debug.Assert(taskCompletionSource != null);

ProcessWriteQueue(connection, writesToProcess);
}
catch (OperationCanceledException ex)
{
taskCompletionSource?.TrySetCanceled(ex.CancellationToken);
}
catch (Exception ex)
{
taskCompletionSource?.TrySetException(ex);
}
finally
{
// Mark our TCS as completed. Any other threads waiting on us will now be able
// to proceed.
taskCompletionSource?.TrySetResult(0);
}

return;

// Local functions
async Task<Task> GetWriteTaskAsync()
async Task<(Task previousTask, TaskCompletionSource<int> taskCompletionSource)> GetWriteTaskAsync()
{
// Have to acquire the semaphore. We're going to mutate the shared 'keyToWriteActions'
// and 'keyToWriteTask' collections.
Expand All @@ -106,30 +141,17 @@ async Task<Task> GetWriteTaskAsync()
// We have no writes of our own. But there may be an existing task that
// is writing out this queue. Return this so our caller can wait for
// all existing writes to complete.
return existingWriteTask;
return (previousTask: existingWriteTask, taskCompletionSource: null);
}

// We have our own writes to process. Enqueue the task to write
// these out after the existing write-task for this queue completes.
//
// We're currently under a lock, so tell the continuation to run
// *asynchronously* so that the TPL does not try to execute it inline
// with this thread.
//
// Note: this flushing is not cancellable. We've already removed the
// writes from the write queue. If we were not to write them out we
// would be losing data.
var nextTask = existingWriteTask.ContinueWith(
_ => ProcessWriteQueue(connection, writesToProcess),
CancellationToken.None,
TaskContinuationOptions.RunContinuationsAsynchronously,
TaskScheduler.Default);

// Store this for the next flush call to see.
keyToWriteTask[key] = nextTask;

// And return this to our caller so it can 'await' all these writes completing.
return nextTask;
// Create a TCS that represents our own work writing out "writesToProcess".
// Store it in keyToWriteTask so that if other threads come along, they'll
// wait for us to complete before doing their own reads/writes on this queue.
var localCompletionSource = new TaskCompletionSource<int>();

keyToWriteTask[key] = localCompletionSource.Task;

return (previousTask: existingWriteTask, taskCompletionSource: localCompletionSource);
}
}
}
Expand Down

0 comments on commit f9b1f75

Please sign in to comment.