diff --git a/src/Core/Ipc/IpcMessage.cs b/src/Core/Ipc/IpcMessage.cs index e3a6508a..defc375f 100644 --- a/src/Core/Ipc/IpcMessage.cs +++ b/src/Core/Ipc/IpcMessage.cs @@ -12,6 +12,9 @@ public class IpcMessage { [ProtoMember(1)] public long RequestId { get; set; } + /// + /// See + /// [ProtoMember(2)] public string Protocol { get; set; } diff --git a/src/Core/Ipc/IpcRequest.cs b/src/Core/Ipc/IpcRequest.cs index f66bcca9..3d7ae25b 100644 --- a/src/Core/Ipc/IpcRequest.cs +++ b/src/Core/Ipc/IpcRequest.cs @@ -7,5 +7,13 @@ namespace VsChromium.Core.Ipc { [ProtoContract] public class IpcRequest : IpcMessage { + /// + /// Specify if server should wait for this request to be fully processed before + /// starting processing the next one marked as . + /// By default, requests are executed in parallel (best effort) to ensure long running + /// requests don't make the server unresponsive. + /// + [ProtoMember(3)] + public bool RunOnSequentialQueue { get; set; } } } diff --git a/src/Server/FileSystem/Builder/FileRegistrationQueue.cs b/src/Server/FileSystem/Builder/FileRegistrationQueue.cs index c89d3c75..1e08b2c1 100644 --- a/src/Server/FileSystem/Builder/FileRegistrationQueue.cs +++ b/src/Server/FileSystem/Builder/FileRegistrationQueue.cs @@ -16,9 +16,14 @@ public class FileRegistrationQueue { /// Note: Use this for debugging purposes only as all requests are kept in memory /// private static bool Debug_KeepAllItems = false; - private readonly ConcurrentBufferQueue _queue = new ConcurrentBufferQueue(); + /// + /// This is only active (and non-null) when is set to true + /// at build time. + /// private readonly ConcurrentBufferQueue _allItemsQueue; + private readonly ConcurrentBufferQueue _queue = new ConcurrentBufferQueue(); + public FileRegistrationQueue() { _allItemsQueue = (Debug_KeepAllItems ? new ConcurrentBufferQueue() : null); } diff --git a/src/Server/Ipc/IpcRequestDispatcher.cs b/src/Server/Ipc/IpcRequestDispatcher.cs index 300a1a20..af3e9d7b 100644 --- a/src/Server/Ipc/IpcRequestDispatcher.cs +++ b/src/Server/Ipc/IpcRequestDispatcher.cs @@ -18,22 +18,34 @@ public class IpcRequestDispatcher : IIpcRequestDispatcher { private readonly ICustomThreadPool _customThreadPool; private readonly IIpcResponseQueue _ipcResponseQueue; private readonly IEnumerable _protocolHandlers; + private readonly ITaskQueue _sequentialTaskQueue; [ImportingConstructor] public IpcRequestDispatcher( ICustomThreadPool customThreadPool, + ITaskQueueFactory taskQueueFactory, IIpcResponseQueue ipcResponseQueue, [ImportMany] IEnumerable protocolHandlers) { _customThreadPool = customThreadPool; _ipcResponseQueue = ipcResponseQueue; _protocolHandlers = protocolHandlers; + _sequentialTaskQueue = taskQueueFactory.CreateQueue("IpcRequestDispatcher sequential requests queue"); } public void ProcessRequestAsync(IpcRequest request) { - _customThreadPool.RunAsync(() => ProcessRequestTask(request)); + if (request.RunOnSequentialQueue) { + // Run on queue, with a unique ID since each request is unique + _sequentialTaskQueue.Enqueue(new TaskId(String.Format("RequestId={0}", request.RequestId)), t => ProcessRequestWorker(request)); + } else { + _customThreadPool.RunAsync(() => ProcessRequestWorker(request)); + } } - private void ProcessRequestTask(IpcRequest request) { + /// + /// Process one request synchronously (on a background thread) and sends the response back + /// to the response queue (i.e. communication pipe) + /// + private void ProcessRequestWorker(IpcRequest request) { var sw = Stopwatch.StartNew(); var response = ProcessOneRequest(request); sw.Stop(); diff --git a/src/Server/Threads/ITaskQueue.cs b/src/Server/Threads/ITaskQueue.cs index 9ce52412..4e71b78f 100644 --- a/src/Server/Threads/ITaskQueue.cs +++ b/src/Server/Threads/ITaskQueue.cs @@ -13,7 +13,7 @@ public interface ITaskQueue { /// /// Enqueue a new task to be run sequentially after all currently enqueue tasks have been /// run. If a task with the same is enqueued and not currently executing, - /// it it removed from the queue before the new task is enqueued. + /// it is removed from the queue before the new task is enqueued. /// void Enqueue(TaskId id, Action task); diff --git a/src/Tests/ServerProcess/TestServerBase.cs b/src/Tests/ServerProcess/TestServerBase.cs index 42e6ddc3..9f6f0d84 100644 --- a/src/Tests/ServerProcess/TestServerBase.cs +++ b/src/Tests/ServerProcess/TestServerBase.cs @@ -96,7 +96,7 @@ protected static Func SendRequest(ITypedRequestProcessProxy server, TypedR ErrorResponse error = null; sw.Start(); - server.RunAsync(request, typedResponse => { + server.RunAsync(request, RunAsyncOptions.Default, typedResponse => { Assert.IsInstanceOfType(typedResponse, typeof(T)); sw.Stop(); Logger.LogInfo("Request {0} took {1} msec to complete.", request.ClassName, sw.ElapsedMilliseconds); diff --git a/src/VsChromium/Features/SourceExplorerHierarchy/NodeViewModelLoader.cs b/src/VsChromium/Features/SourceExplorerHierarchy/NodeViewModelLoader.cs index ce44ef23..9474d212 100644 --- a/src/VsChromium/Features/SourceExplorerHierarchy/NodeViewModelLoader.cs +++ b/src/VsChromium/Features/SourceExplorerHierarchy/NodeViewModelLoader.cs @@ -25,7 +25,7 @@ public DirectoryEntry LoadChildren(DirectoryNodeViewModel node) { ProjectPath = node.GetProjectPath().Value, DirectoryRelativePath = node.RelativePath }; - _typedRequestProcessProxy.RunUnbufferedAsync(request, + _typedRequestProcessProxy.RunUnbufferedAsync(request, RunAsyncOptions.Default, response => { LoadChildrenCallback(tcs, response); }, response => { LoadChildrenErrorCallback(tcs, response); }); @@ -40,7 +40,7 @@ public List LoadChildrenMultiple( ProjectPath = projectNode.GetProjectPath().Value, RelativePathList = nodes.Select(x => x.RelativePath).ToList() }; - _typedRequestProcessProxy.RunUnbufferedAsync(request, + _typedRequestProcessProxy.RunUnbufferedAsync(request, RunAsyncOptions.Default, response => { LoadChildrenMultipleCallback(tcs, nodes, response); }, response => { LoadChildrenMultipleErrorCallback(tcs, response); }); diff --git a/src/VsChromium/Features/ToolWindows/CodeSearch/CodeSearchController.cs b/src/VsChromium/Features/ToolWindows/CodeSearch/CodeSearchController.cs index 70cca6fe..f395e383 100644 --- a/src/VsChromium/Features/ToolWindows/CodeSearch/CodeSearchController.cs +++ b/src/VsChromium/Features/ToolWindows/CodeSearch/CodeSearchController.cs @@ -243,12 +243,14 @@ public void PauseResumeIndexing() { ? new DispatchThreadServerRequest { Request = new ResumeIndexingRequest(), Id = nameof(ResumeIndexingRequest), + RunOnSequentialQueue = true, Delay = TimeSpan.FromSeconds(0.0), OnThreadPoolReceive = FetchDatabaseStatistics, } : new DispatchThreadServerRequest { Request = new PauseIndexingRequest(), Id = nameof(PauseIndexingRequest), + RunOnSequentialQueue = true, Delay = TimeSpan.FromSeconds(0.0), OnThreadPoolReceive = FetchDatabaseStatistics, }; diff --git a/src/VsChromium/ServerProxy/FileSystemTreeSource.cs b/src/VsChromium/ServerProxy/FileSystemTreeSource.cs index dcf7a6ad..1562e3fc 100644 --- a/src/VsChromium/ServerProxy/FileSystemTreeSource.cs +++ b/src/VsChromium/ServerProxy/FileSystemTreeSource.cs @@ -44,6 +44,7 @@ private void ProxyOnEventReceived(TypedEvent typedEvent) { private void FetchFileSystemTree() { _proxy.RunAsync( new GetFileSystemTreeRequest(), + RunAsyncOptions.Default, typedResponse => { var response = (GetFileSystemTreeResponse)typedResponse; OnTreeReceived(response.Tree); diff --git a/src/VsChromium/ServerProxy/ITypedRequestProcessProxy.cs b/src/VsChromium/ServerProxy/ITypedRequestProcessProxy.cs index 8d6903a4..d744888b 100644 --- a/src/VsChromium/ServerProxy/ITypedRequestProcessProxy.cs +++ b/src/VsChromium/ServerProxy/ITypedRequestProcessProxy.cs @@ -24,16 +24,13 @@ public interface ITypedRequestProcessProxy : IDisposable { /// posted. RunAsync can be called on any thread. /// will be called on an unspecified thread. /// - void RunAsync(TypedRequest request, Action successCallback, Action errorCallback); + void RunAsync(TypedRequest request, RunAsyncOptions options, Action successCallback, Action errorCallback); /// /// Same as , except responsed are dispatched as soon as they are received, /// irrespective of the send order. /// - /// - /// - /// - void RunUnbufferedAsync(TypedRequest request, Action successCallback, Action errorCallback); + void RunUnbufferedAsync(TypedRequest request, RunAsyncOptions options, Action successCallback, Action errorCallback); /// /// Event raised when the server proxy receives an event from the the @@ -41,8 +38,26 @@ public interface ITypedRequestProcessProxy : IDisposable { /// event Action EventReceived; + /// + /// Event raised when the server has started and is ready to process requests + /// event EventHandler ProcessStarted; + + /// + /// Event raise when the server encountered a fatal error and is not properly running anymore + /// event EventHandler ProcessFatalError; bool IsServerRunning { get; } } + + [Flags] + public enum RunAsyncOptions { + Default = 0x00, + /// + /// Ensures the request is processed on the sequential + /// queue on the server (i.e. all requests with this flag will always be + /// executed sequentially on the server). + /// + RunOnSequentialQueue = 0x01, + } } diff --git a/src/VsChromium/ServerProxy/TypedRequestProcessProxy.cs b/src/VsChromium/ServerProxy/TypedRequestProcessProxy.cs index 684df43c..1f824165 100644 --- a/src/VsChromium/ServerProxy/TypedRequestProcessProxy.cs +++ b/src/VsChromium/ServerProxy/TypedRequestProcessProxy.cs @@ -38,11 +38,11 @@ public TypedRequestProcessProxy(IServerProcessProxy serverProcessProxy, IIpcRequ public bool IsServerRunning => _serverProcessProxy.IsServerRunning; - public void RunAsync(TypedRequest request, Action successCallback, Action errorCallback) { + public void RunAsync(TypedRequest request, RunAsyncOptions options, Action successCallback, Action errorCallback) { // Note: We capture the value outside the RunAsync callback. var localSequenceNumber = Interlocked.Increment(ref _currentSequenceNumber); - RunAsyncWorker(request, successCallback, errorCallback, localSequenceNumber, response => { + RunAsyncWorker(request, options, successCallback, errorCallback, localSequenceNumber, response => { lock (_lock) { _bufferedResponses.Add(response); } @@ -50,17 +50,18 @@ public void RunAsync(TypedRequest request, Action successCallback }); } - public void RunUnbufferedAsync(TypedRequest request, Action successCallback, Action errorCallback) { - RunAsyncWorker(request, successCallback, errorCallback, -1, SendResponse); + public void RunUnbufferedAsync(TypedRequest request, RunAsyncOptions options, Action successCallback, Action errorCallback) { + RunAsyncWorker(request, options, successCallback, errorCallback, -1, SendResponse); } - public void RunAsyncWorker(TypedRequest request, Action successCallback, + public void RunAsyncWorker(TypedRequest request, RunAsyncOptions options, Action successCallback, Action errorCallback, long sequenceNumber, Action processResponse) { var sw = Stopwatch.StartNew(); var ipcRequest = new IpcRequest { RequestId = _ipcRequestIdFactory.GetNextId(), Protocol = IpcProtocols.TypedMessage, + RunOnSequentialQueue = (options & RunAsyncOptions.RunOnSequentialQueue) != 0, Data = request }; diff --git a/src/VsChromium/Threads/DispatchThreadDelayedOperationExecutor.cs b/src/VsChromium/Threads/DispatchThreadDelayedOperationExecutor.cs index b17ea507..f1fa156d 100644 --- a/src/VsChromium/Threads/DispatchThreadDelayedOperationExecutor.cs +++ b/src/VsChromium/Threads/DispatchThreadDelayedOperationExecutor.cs @@ -20,8 +20,9 @@ public DispatchThreadDelayedOperationExecutor( public void Post(DelayedOperation operation) { var action = operation.Action; - operation.Action = () => + operation.Action = () => { _synchronizationContextProvider.DispatchThreadContext.Post(action); + }; _delayedOperationExecutor.Post(operation); } } diff --git a/src/VsChromium/Threads/DispatchThreadServerRequest.cs b/src/VsChromium/Threads/DispatchThreadServerRequest.cs index 1ff50771..ea3aa0a3 100644 --- a/src/VsChromium/Threads/DispatchThreadServerRequest.cs +++ b/src/VsChromium/Threads/DispatchThreadServerRequest.cs @@ -29,6 +29,15 @@ public DispatchThreadServerRequest() { /// public TypedRequest Request { get; set; } + /// + /// (Optional) Ensures the request is processed on the sequential + /// queue on the server (i.e. all requests with this flag will always be + /// executed sequentially on the server). + /// + /// See + /// + public bool RunOnSequentialQueue { get; set; } + /// /// (Optional) Action executed on a background thread when the request delay /// has expired, just before the request is sent to the server. diff --git a/src/VsChromium/Threads/DispatchThreadServerRequestExecutor.cs b/src/VsChromium/Threads/DispatchThreadServerRequestExecutor.cs index b2e99f48..7386deb1 100644 --- a/src/VsChromium/Threads/DispatchThreadServerRequestExecutor.cs +++ b/src/VsChromium/Threads/DispatchThreadServerRequestExecutor.cs @@ -45,6 +45,7 @@ public void Post(DispatchThreadServerRequest request) { Logger.WrapActionInvocation(request.OnThreadPoolSend); _typedRequestProcessProxy.RunAsync(request.Request, + GetRunAsycOptions(request), response => OnRequestSuccess(request, response), errorResponse => OnRequestError(request, errorResponse)); }, @@ -98,5 +99,13 @@ protected virtual void OnProcessStarted() { protected virtual void OnProcessFatalError(ErrorEventArgs e) { ProcessFatalError?.Invoke(this, e); } + + private RunAsyncOptions GetRunAsycOptions(DispatchThreadServerRequest request) { + RunAsyncOptions options = default(RunAsyncOptions); + if (request.RunOnSequentialQueue) { + options |= RunAsyncOptions.RunOnSequentialQueue; + } + return options; + } } } diff --git a/src/VsChromium/Views/FileRegistrationRequestService.cs b/src/VsChromium/Views/FileRegistrationRequestService.cs index 404d959c..494ca61f 100644 --- a/src/VsChromium/Views/FileRegistrationRequestService.cs +++ b/src/VsChromium/Views/FileRegistrationRequestService.cs @@ -94,6 +94,7 @@ private void SendRegisterFileRequest(string path) { var request = new DispatchThreadServerRequest { Id = "RegisterFileRequest-" + path, + RunOnSequentialQueue = true, Request = new RegisterFileRequest { FileName = path } @@ -108,6 +109,7 @@ private void SendUnregisterFileRequest(string path) { var request = new DispatchThreadServerRequest { Id = "UnregisterFileRequest-" + path, + RunOnSequentialQueue = true, Request = new UnregisterFileRequest { FileName = path }