From 68f4077e5df712dd78828e34fb9e81707204c755 Mon Sep 17 00:00:00 2001 From: CyrusNajmabadi Date: Sat, 6 May 2017 17:23:14 -0700 Subject: [PATCH 1/3] Remove forced unecessary asynchrony in flushing data to disk. --- .../SQLite/SQLitePersistentStorage_Helpers.cs | 8 +-- .../SQLitePersistentStorage_WriteBatching.cs | 66 +++++++++++-------- 2 files changed, 42 insertions(+), 32 deletions(-) diff --git a/src/Workspaces/Core/Desktop/Workspace/SQLite/SQLitePersistentStorage_Helpers.cs b/src/Workspaces/Core/Desktop/Workspace/SQLite/SQLitePersistentStorage_Helpers.cs index 86d448098498c..b8523d652faa9 100644 --- a/src/Workspaces/Core/Desktop/Workspace/SQLite/SQLitePersistentStorage_Helpers.cs +++ b/src/Workspaces/Core/Desktop/Workspace/SQLite/SQLitePersistentStorage_Helpers.cs @@ -72,10 +72,10 @@ private static void CopyTo(Stream stream, byte[] bytes, int length) } /// - /// Amount of time to wait between flushing writes to disk. 250ms means we can flush - /// writes to disk four times a second. + /// Amount of time to wait between flushing writes to disk. 500ms means we can flush + /// writes to disk twp times a second. /// - private const int FlushAllDelayMS = 250; + private const int FlushAllDelayMS = 500; /// /// We use a pool to cache reads/writes that are less than 4k. Testing with Roslyn, @@ -89,7 +89,7 @@ private static void CopyTo(Stream stream, byte[] bytes, int length) /// /// The max amount of byte[]s we cache. This caps our cache at 4MB while allowing /// us to massively speed up writing (by batching writes). Because we can write to - /// disk 4 times a second. That means a total of 16MB/s that can be written to disk + /// disk two times a second. That means a total of 8MB/s that can be written to disk /// using only our cache. Given that Roslyn itself only writes about 50MB to disk /// after several minutes of analysis, this amount of bandwidth is more than sufficient. /// diff --git a/src/Workspaces/Core/Desktop/Workspace/SQLite/SQLitePersistentStorage_WriteBatching.cs b/src/Workspaces/Core/Desktop/Workspace/SQLite/SQLitePersistentStorage_WriteBatching.cs index f2e3633283c5b..a6d2642944eab 100644 --- a/src/Workspaces/Core/Desktop/Workspace/SQLite/SQLitePersistentStorage_WriteBatching.cs +++ b/src/Workspaces/Core/Desktop/Workspace/SQLite/SQLitePersistentStorage_WriteBatching.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using Microsoft.CodeAnalysis.SQLite.Interop; @@ -72,16 +73,38 @@ private async Task FlushSpecificWritesAsync( ArrayBuilder> writesToProcess, CancellationToken cancellationToken) { - // Get the task that is responsible for doing the writes for this queue. - // This task will complete when all previously enqueued writes for this queue - // complete, and all the currently enqueued writes for this queue complete as well. - var writeTask = await GetWriteTaskAsync().ConfigureAwait(false); - await writeTask.ConfigureAwait(false); + // Get's the task representing the current writes being performed by another + // thread for this queue+key, and a TaskCompletionSource we can use to let + // other threads know about our own progress writing any new writes in this queue. + var (previousWritesTask, taskCompletionSource) = await GetWriteTaskAsync().ConfigureAwait(false); + + // Wait for all previous writes to be flushed. + await previousWritesTask.ConfigureAwait(false); + + if (writesToProcess.Count == 0) + { + // No additional writes for us to flush. We can immediately bail out. + Debug.Assert(taskCompletionSource == null); + return; + } + + // Now, if we have writes of our own, do them on this thread. + // + // Note: this flushing is not cancellable. We've already removed the + // writes from the write queue. If we were not to write them out we + // would be losing data. + Debug.Assert(taskCompletionSource != null); + + ProcessWriteQueue(connection, writesToProcess); + + // Mark our TCS as completed. Any other threads waiting on us will now be able + // to proceed. + taskCompletionSource.TrySetResult(0); return; // Local functions - async Task GetWriteTaskAsync() + async Task<(Task previousTask, TaskCompletionSource taskCompletionSource)> GetWriteTaskAsync() { // Have to acquire the semaphore. We're going to mutate the shared 'keyToWriteActions' // and 'keyToWriteTask' collections. @@ -106,30 +129,17 @@ async Task GetWriteTaskAsync() // We have no writes of our own. But there may be an existing task that // is writing out this queue. Return this so our caller can wait for // all existing writes to complete. - return existingWriteTask; + return (previousTask: existingWriteTask, taskCompletionSource: null); } - // We have our own writes to process. Enqueue the task to write - // these out after the existing write-task for this queue completes. - // - // We're currently under a lock, so tell the continuation to run - // *asynchronously* so that the TPL does not try to execute it inline - // with this thread. - // - // Note: this flushing is not cancellable. We've already removed the - // writes from the write queue. If we were not to write them out we - // would be losing data. - var nextTask = existingWriteTask.ContinueWith( - _ => ProcessWriteQueue(connection, writesToProcess), - CancellationToken.None, - TaskContinuationOptions.RunContinuationsAsynchronously, - TaskScheduler.Default); - - // Store this for the next flush call to see. - keyToWriteTask[key] = nextTask; - - // And return this to our caller so it can 'await' all these writes completing. - return nextTask; + // Create a TCS that represents our own work writing out "writesToProcess". + // Store it in keyToWriteTask so that if other threads come along, they'll + // wait for us to complete before doing their own reads/writes on this queue. + var localCompletionSource = new TaskCompletionSource(); + + keyToWriteTask[key] = localCompletionSource.Task; + + return (previousTask: existingWriteTask, taskCompletionSource: localCompletionSource); } } } From a8258312e2e9334d30091468a74519f9d302be63 Mon Sep 17 00:00:00 2001 From: CyrusNajmabadi Date: Sat, 6 May 2017 17:29:23 -0700 Subject: [PATCH 2/3] Spelling. --- .../Desktop/Workspace/SQLite/SQLitePersistentStorage_Helpers.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Workspaces/Core/Desktop/Workspace/SQLite/SQLitePersistentStorage_Helpers.cs b/src/Workspaces/Core/Desktop/Workspace/SQLite/SQLitePersistentStorage_Helpers.cs index b8523d652faa9..ee63e5fe77c28 100644 --- a/src/Workspaces/Core/Desktop/Workspace/SQLite/SQLitePersistentStorage_Helpers.cs +++ b/src/Workspaces/Core/Desktop/Workspace/SQLite/SQLitePersistentStorage_Helpers.cs @@ -73,7 +73,7 @@ private static void CopyTo(Stream stream, byte[] bytes, int length) /// /// Amount of time to wait between flushing writes to disk. 500ms means we can flush - /// writes to disk twp times a second. + /// writes to disk two times a second. /// private const int FlushAllDelayMS = 500; From fadaba7dc7aea3d6a4cac16589a20e1e09d2487a Mon Sep 17 00:00:00 2001 From: CyrusNajmabadi Date: Mon, 8 May 2017 11:37:49 -0700 Subject: [PATCH 3/3] Be resilient to exceptions getting thrown. --- .../SQLitePersistentStorage_WriteBatching.cs | 50 ++++++++++++------- 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/src/Workspaces/Core/Desktop/Workspace/SQLite/SQLitePersistentStorage_WriteBatching.cs b/src/Workspaces/Core/Desktop/Workspace/SQLite/SQLitePersistentStorage_WriteBatching.cs index a6d2642944eab..783d1ac68aecb 100644 --- a/src/Workspaces/Core/Desktop/Workspace/SQLite/SQLitePersistentStorage_WriteBatching.cs +++ b/src/Workspaces/Core/Desktop/Workspace/SQLite/SQLitePersistentStorage_WriteBatching.cs @@ -77,29 +77,41 @@ private async Task FlushSpecificWritesAsync( // thread for this queue+key, and a TaskCompletionSource we can use to let // other threads know about our own progress writing any new writes in this queue. var (previousWritesTask, taskCompletionSource) = await GetWriteTaskAsync().ConfigureAwait(false); - - // Wait for all previous writes to be flushed. - await previousWritesTask.ConfigureAwait(false); - - if (writesToProcess.Count == 0) + try { - // No additional writes for us to flush. We can immediately bail out. - Debug.Assert(taskCompletionSource == null); - return; - } + // Wait for all previous writes to be flushed. + await previousWritesTask.ConfigureAwait(false); - // Now, if we have writes of our own, do them on this thread. - // - // Note: this flushing is not cancellable. We've already removed the - // writes from the write queue. If we were not to write them out we - // would be losing data. - Debug.Assert(taskCompletionSource != null); + if (writesToProcess.Count == 0) + { + // No additional writes for us to flush. We can immediately bail out. + Debug.Assert(taskCompletionSource == null); + return; + } - ProcessWriteQueue(connection, writesToProcess); + // Now, if we have writes of our own, do them on this thread. + // + // Note: this flushing is not cancellable. We've already removed the + // writes from the write queue. If we were not to write them out we + // would be losing data. + Debug.Assert(taskCompletionSource != null); - // Mark our TCS as completed. Any other threads waiting on us will now be able - // to proceed. - taskCompletionSource.TrySetResult(0); + ProcessWriteQueue(connection, writesToProcess); + } + catch (OperationCanceledException ex) + { + taskCompletionSource?.TrySetCanceled(ex.CancellationToken); + } + catch (Exception ex) + { + taskCompletionSource?.TrySetException(ex); + } + finally + { + // Mark our TCS as completed. Any other threads waiting on us will now be able + // to proceed. + taskCompletionSource?.TrySetResult(0); + } return;