Skip to content

Commit

Permalink
Ensure in-order processing of certain IPC requests
Browse files Browse the repository at this point in the history
The Server processes requests from VS, it uses a high priority
thread pool of 10 threads. This means there is a chance requests
may be processed out of order. This is fine for most requests,
and actually useful sometime (e.g. a new search request should
be able to run and cancel the previous one). However, there are
cases when requests should be processed in order. For example,
"RegisterFile"/"UnregisterFile" requests are used to keep track
of roots to detect project files. These requests need to be
processed in-order, otherwise the server might lose track of
some roots (or keep some roots active even they are not
open in VS anymore).

This is the 2nd part of fixing issue #64
  • Loading branch information
rpaquay committed Jun 8, 2020
1 parent f5fb235 commit 351c5da
Show file tree
Hide file tree
Showing 15 changed files with 86 additions and 18 deletions.
3 changes: 3 additions & 0 deletions src/Core/Ipc/IpcMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ public class IpcMessage {
[ProtoMember(1)]
public long RequestId { get; set; }

/// <summary>
/// See <see cref="IpcProtocols"/>
/// </summary>
[ProtoMember(2)]
public string Protocol { get; set; }

Expand Down
8 changes: 8 additions & 0 deletions src/Core/Ipc/IpcRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,13 @@
namespace VsChromium.Core.Ipc {
[ProtoContract]
public class IpcRequest : IpcMessage {
/// <summary>
/// Specify if server should wait for this request to be fully processed before
/// starting processing the next one marked as <see cref="RunOnSequentialQueue"/> .
/// By default, requests are executed in parallel (best effort) to ensure long running
/// requests don't make the server unresponsive.
/// </summary>
[ProtoMember(3)]
public bool RunOnSequentialQueue { get; set; }
}
}
7 changes: 6 additions & 1 deletion src/Server/FileSystem/Builder/FileRegistrationQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@ public class FileRegistrationQueue {
/// <para>Note: Use this for debugging purposes only as all requests are kept in memory</para>
/// </summary>
private static bool Debug_KeepAllItems = false;
private readonly ConcurrentBufferQueue<FileRegistrationEntry> _queue = new ConcurrentBufferQueue<FileRegistrationEntry>();
/// <summary>
/// This is only active (and non-null) when <see cref="Debug_KeepAllItems"/> is set to <code>true</code>
/// at build time.
/// </summary>
private readonly ConcurrentBufferQueue<FileRegistrationEntry> _allItemsQueue;

private readonly ConcurrentBufferQueue<FileRegistrationEntry> _queue = new ConcurrentBufferQueue<FileRegistrationEntry>();

public FileRegistrationQueue() {
_allItemsQueue = (Debug_KeepAllItems ? new ConcurrentBufferQueue<FileRegistrationEntry>() : null);
}
Expand Down
16 changes: 14 additions & 2 deletions src/Server/Ipc/IpcRequestDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,34 @@ public class IpcRequestDispatcher : IIpcRequestDispatcher {
private readonly ICustomThreadPool _customThreadPool;
private readonly IIpcResponseQueue _ipcResponseQueue;
private readonly IEnumerable<IProtocolHandler> _protocolHandlers;
private readonly ITaskQueue _sequentialTaskQueue;

[ImportingConstructor]
public IpcRequestDispatcher(
ICustomThreadPool customThreadPool,
ITaskQueueFactory taskQueueFactory,
IIpcResponseQueue ipcResponseQueue,
[ImportMany] IEnumerable<IProtocolHandler> protocolHandlers) {
_customThreadPool = customThreadPool;
_ipcResponseQueue = ipcResponseQueue;
_protocolHandlers = protocolHandlers;
_sequentialTaskQueue = taskQueueFactory.CreateQueue("IpcRequestDispatcher sequential requests queue");
}

public void ProcessRequestAsync(IpcRequest request) {
_customThreadPool.RunAsync(() => ProcessRequestTask(request));
if (request.RunOnSequentialQueue) {
// Run on queue, with a unique ID since each request is unique
_sequentialTaskQueue.Enqueue(new TaskId(String.Format("RequestId={0}", request.RequestId)), t => ProcessRequestWorker(request));
} else {
_customThreadPool.RunAsync(() => ProcessRequestWorker(request));
}
}

private void ProcessRequestTask(IpcRequest request) {
/// <summary>
/// Process one request synchronously (on a background thread) and sends the response back
/// to the response queue (i.e. communication pipe)
/// </summary>
private void ProcessRequestWorker(IpcRequest request) {
var sw = Stopwatch.StartNew();
var response = ProcessOneRequest(request);
sw.Stop();
Expand Down
2 changes: 1 addition & 1 deletion src/Server/Threads/ITaskQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public interface ITaskQueue {
/// <summary>
/// Enqueue a new task to be run sequentially after all currently enqueue tasks have been
/// run. If a task with the same <see cref="TaskId"/> is enqueued and not currently executing,
/// it it removed from the queue before the new task is enqueued.
/// it is removed from the queue before the new task is enqueued.
/// </summary>
void Enqueue(TaskId id, Action<CancellationToken> task);

Expand Down
2 changes: 1 addition & 1 deletion src/Tests/ServerProcess/TestServerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ protected static Func<T> SendRequest<T>(ITypedRequestProcessProxy server, TypedR
ErrorResponse error = null;

sw.Start();
server.RunAsync(request, typedResponse => {
server.RunAsync(request, RunAsyncOptions.Default, typedResponse => {
Assert.IsInstanceOfType(typedResponse, typeof(T));
sw.Stop();
Logger.LogInfo("Request {0} took {1} msec to complete.", request.ClassName, sw.ElapsedMilliseconds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public DirectoryEntry LoadChildren(DirectoryNodeViewModel node) {
ProjectPath = node.GetProjectPath().Value,
DirectoryRelativePath = node.RelativePath
};
_typedRequestProcessProxy.RunUnbufferedAsync(request,
_typedRequestProcessProxy.RunUnbufferedAsync(request, RunAsyncOptions.Default,
response => { LoadChildrenCallback(tcs, response); },
response => { LoadChildrenErrorCallback(tcs, response); });

Expand All @@ -40,7 +40,7 @@ public List<LoadChildrenResult> LoadChildrenMultiple(
ProjectPath = projectNode.GetProjectPath().Value,
RelativePathList = nodes.Select(x => x.RelativePath).ToList()
};
_typedRequestProcessProxy.RunUnbufferedAsync(request,
_typedRequestProcessProxy.RunUnbufferedAsync(request, RunAsyncOptions.Default,
response => { LoadChildrenMultipleCallback(tcs, nodes, response); },
response => { LoadChildrenMultipleErrorCallback(tcs, response); });

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,12 +243,14 @@ public void PauseResumeIndexing() {
? new DispatchThreadServerRequest {
Request = new ResumeIndexingRequest(),
Id = nameof(ResumeIndexingRequest),
RunOnSequentialQueue = true,
Delay = TimeSpan.FromSeconds(0.0),
OnThreadPoolReceive = FetchDatabaseStatistics,
}
: new DispatchThreadServerRequest {
Request = new PauseIndexingRequest(),
Id = nameof(PauseIndexingRequest),
RunOnSequentialQueue = true,
Delay = TimeSpan.FromSeconds(0.0),
OnThreadPoolReceive = FetchDatabaseStatistics,
};
Expand Down
1 change: 1 addition & 0 deletions src/VsChromium/ServerProxy/FileSystemTreeSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ private void ProxyOnEventReceived(TypedEvent typedEvent) {
private void FetchFileSystemTree() {
_proxy.RunAsync(
new GetFileSystemTreeRequest(),
RunAsyncOptions.Default,
typedResponse => {
var response = (GetFileSystemTreeResponse)typedResponse;
OnTreeReceived(response.Tree);
Expand Down
25 changes: 20 additions & 5 deletions src/VsChromium/ServerProxy/ITypedRequestProcessProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,40 @@ public interface ITypedRequestProcessProxy : IDisposable {
/// posted. RunAsync can be called on any thread. <paramref name="successCallback"/>
/// will be called on an unspecified thread.</para>
/// </summary>
void RunAsync(TypedRequest request, Action<TypedResponse> successCallback, Action<ErrorResponse> errorCallback);
void RunAsync(TypedRequest request, RunAsyncOptions options, Action<TypedResponse> successCallback, Action<ErrorResponse> errorCallback);

/// <summary>
/// Same as <see cref="RunAsync"/>, except responsed are dispatched as soon as they are received,
/// irrespective of the send order.
/// </summary>
/// <param name="request"></param>
/// <param name="successCallback"></param>
/// <param name="errorCallback"></param>
void RunUnbufferedAsync(TypedRequest request, Action<TypedResponse> successCallback, Action<ErrorResponse> errorCallback);
void RunUnbufferedAsync(TypedRequest request, RunAsyncOptions options, Action<TypedResponse> successCallback, Action<ErrorResponse> errorCallback);

/// <summary>
/// Event raised when the server proxy receives an event from the the
/// VsChromium server. The event is fired on an unspecified thread.
/// </summary>
event Action<TypedEvent> EventReceived;

/// <summary>
/// Event raised when the server has started and is ready to process requests
/// </summary>
event EventHandler ProcessStarted;

/// <summary>
/// Event raise when the server encountered a fatal error and is not properly running anymore
/// </summary>
event EventHandler<ErrorEventArgs> ProcessFatalError;
bool IsServerRunning { get; }
}

[Flags]
public enum RunAsyncOptions {
Default = 0x00,
/// <summary>
/// Ensures the request is processed on the sequential
/// queue on the server (i.e. all requests with this flag will always be
/// executed sequentially on the server).
/// </summary>
RunOnSequentialQueue = 0x01,
}
}
11 changes: 6 additions & 5 deletions src/VsChromium/ServerProxy/TypedRequestProcessProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,29 +38,30 @@ public TypedRequestProcessProxy(IServerProcessProxy serverProcessProxy, IIpcRequ

public bool IsServerRunning => _serverProcessProxy.IsServerRunning;

public void RunAsync(TypedRequest request, Action<TypedResponse> successCallback, Action<ErrorResponse> errorCallback) {
public void RunAsync(TypedRequest request, RunAsyncOptions options, Action<TypedResponse> successCallback, Action<ErrorResponse> errorCallback) {
// Note: We capture the value outside the RunAsync callback.
var localSequenceNumber = Interlocked.Increment(ref _currentSequenceNumber);

RunAsyncWorker(request, successCallback, errorCallback, localSequenceNumber, response => {
RunAsyncWorker(request, options, successCallback, errorCallback, localSequenceNumber, response => {
lock (_lock) {
_bufferedResponses.Add(response);
}
OnResponseReceived();
});
}

public void RunUnbufferedAsync(TypedRequest request, Action<TypedResponse> successCallback, Action<ErrorResponse> errorCallback) {
RunAsyncWorker(request, successCallback, errorCallback, -1, SendResponse);
public void RunUnbufferedAsync(TypedRequest request, RunAsyncOptions options, Action<TypedResponse> successCallback, Action<ErrorResponse> errorCallback) {
RunAsyncWorker(request, options, successCallback, errorCallback, -1, SendResponse);
}

public void RunAsyncWorker(TypedRequest request, Action<TypedResponse> successCallback,
public void RunAsyncWorker(TypedRequest request, RunAsyncOptions options, Action<TypedResponse> successCallback,
Action<ErrorResponse> errorCallback, long sequenceNumber, Action<BufferedResponse> processResponse) {
var sw = Stopwatch.StartNew();

var ipcRequest = new IpcRequest {
RequestId = _ipcRequestIdFactory.GetNextId(),
Protocol = IpcProtocols.TypedMessage,
RunOnSequentialQueue = (options & RunAsyncOptions.RunOnSequentialQueue) != 0,
Data = request
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ public DispatchThreadDelayedOperationExecutor(

public void Post(DelayedOperation operation) {
var action = operation.Action;
operation.Action = () =>
operation.Action = () => {
_synchronizationContextProvider.DispatchThreadContext.Post(action);
};
_delayedOperationExecutor.Post(operation);
}
}
Expand Down
9 changes: 9 additions & 0 deletions src/VsChromium/Threads/DispatchThreadServerRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ public DispatchThreadServerRequest() {
/// </summary>
public TypedRequest Request { get; set; }

/// <summary>
/// (Optional) Ensures the request is processed on the sequential
/// queue on the server (i.e. all requests with this flag will always be
/// executed sequentially on the server).
///
/// <para>See <see cref="VsChromium.ServerProxy.RunAsyncOptions.RunOnSequentialQueue"/></para>
/// </summary>
public bool RunOnSequentialQueue { get; set; }

/// <summary>
/// (Optional) Action executed on a background thread when the request delay
/// has expired, just before the request is sent to the server.
Expand Down
9 changes: 9 additions & 0 deletions src/VsChromium/Threads/DispatchThreadServerRequestExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public void Post(DispatchThreadServerRequest request) {
Logger.WrapActionInvocation(request.OnThreadPoolSend);

_typedRequestProcessProxy.RunAsync(request.Request,
GetRunAsycOptions(request),
response => OnRequestSuccess(request, response),
errorResponse => OnRequestError(request, errorResponse));
},
Expand Down Expand Up @@ -98,5 +99,13 @@ protected virtual void OnProcessStarted() {
protected virtual void OnProcessFatalError(ErrorEventArgs e) {
ProcessFatalError?.Invoke(this, e);
}

private RunAsyncOptions GetRunAsycOptions(DispatchThreadServerRequest request) {
RunAsyncOptions options = default(RunAsyncOptions);
if (request.RunOnSequentialQueue) {
options |= RunAsyncOptions.RunOnSequentialQueue;
}
return options;
}
}
}
2 changes: 2 additions & 0 deletions src/VsChromium/Views/FileRegistrationRequestService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ private void SendRegisterFileRequest(string path) {

var request = new DispatchThreadServerRequest {
Id = "RegisterFileRequest-" + path,
RunOnSequentialQueue = true,
Request = new RegisterFileRequest {
FileName = path
}
Expand All @@ -108,6 +109,7 @@ private void SendUnregisterFileRequest(string path) {

var request = new DispatchThreadServerRequest {
Id = "UnregisterFileRequest-" + path,
RunOnSequentialQueue = true,
Request = new UnregisterFileRequest {
FileName = path
}
Expand Down

0 comments on commit 351c5da

Please sign in to comment.