Skip to content

Commit

Permalink
Make CustomThreadPool non blocking
Browse files Browse the repository at this point in the history
The `RunAsync` method is not blocking anymore when there are
no available threads. Blocking in a `Async` method is not
intuitive.

Add a `RunAsync` overload that takes a `TimeSpan` to delay a task
execution.

Also, remove an unused method (`RunInParallel`).
  • Loading branch information
rpaquay committed Feb 12, 2020
1 parent 3718a00 commit cd52cf4
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 81 deletions.
143 changes: 66 additions & 77 deletions src/Server/Threads/CustomThreadPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,124 +5,113 @@
using System;
using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.Linq;
using System.Threading;
using VsChromium.Core.Linq;
using System.Threading.Tasks;
using VsChromium.Core.Logging;
using VsChromium.Core.Threads;

namespace VsChromium.Server.Threads {
[Export(typeof(ICustomThreadPool))]
public class CustomThreadPool : ICustomThreadPool {
private readonly IDateTimeProvider _dateTimeProvider;
private readonly object _lock = new object();
private readonly ThreadPool _threadPool;
private readonly object _queueLock = new object();
private readonly Queue<Action> _taskQueue = new Queue<Action>();

[ImportingConstructor]
public CustomThreadPool(IDateTimeProvider dateTimeProvider)
: this(dateTimeProvider, 10) {
}

public CustomThreadPool(IDateTimeProvider dateTimeProvider, int capacity) {
_dateTimeProvider = dateTimeProvider;
_threadPool = new ThreadPool(dateTimeProvider, capacity);
}

public void RunAsync(Action task) {
var thread = AcquireThread();
thread.RunAsync(() => ExecuteTaskAndReleaseThread(thread, task));
RunAsync(task, TimeSpan.Zero);
}

public IEnumerable<TDest> RunInParallel<TSource, TDest>(
IList<TSource> source,
Func<TSource, TDest> selector,
CancellationToken token) {
lock (_lock) {
return RunInParallelWorker(source, selector, token);
public void RunAsync(Action task, TimeSpan delay) {
if (delay > TimeSpan.Zero) {
// Call ourselves after delay
// Note: This is a "cheap" way of delaying execution without blocking a thread.
// The caveat is that this is done with the default TaskScheduler, meaning
// The task may be delayed even more if lots of tasks are already running.
// Given that the caller ask for a delay, it seems reasonable compromise.
// The other option would have been to write custom with some sort of
// timer usage.
Task.Delay(delay).ContinueWith(_ => RunAsync(task, TimeSpan.Zero));
} else {
// Enqueue
lock (_queueLock) {
_taskQueue.Enqueue(task);
}

// Process queue if any available thread
ProcessQueueAsync();
}
}

private IEnumerable<TDest> RunInParallelWorker<TSource, TDest>(
IList<TSource> source,
Func<TSource, TDest> selector,
CancellationToken token) {
var partitions = source
.PartitionByChunks(_threadPool.Capacity)
.Select(items => new Partition<TSource, TDest> {
Items = items,
ThreadObject = null,
WaitHandle = new ManualResetEvent(false),
Selector = selector,
Result = new List<TDest>()
})
.ToList();
private void ProcessQueueAsync() {
var thread = TryAcquireThread();
if (thread != null) {
thread.RunAsync(() => ProcessQueueAndReleaseThread(thread));
}
}

private void ProcessQueueAndReleaseThread(ThreadObject thread) {
try {
partitions.ForAll(t => t.ThreadObject = AcquireThread());
partitions.ForAll(t => RunPartitionAsync(t, token));
partitions.ForAll(t => t.WaitHandle.WaitOne());
token.ThrowIfCancellationRequested();
var errors = partitions.Select(x => x.Exception).Where(x => x != null).ToList();
if (errors.Any()) {
throw new AggregateException(errors);
}
return partitions.SelectMany(t => t.Result);
ProcessQueue();
} finally {
ReleaseThread(thread);
}

// There may have been more items enqueued concurrently: Mote tasks may have been
// enueue we may have been
// releasing the thread while
// schedule another queue processing if needed
bool queueIsEmpty;
lock (_queueLock) {
queueIsEmpty = _taskQueue.Count == 0;
}
finally {
partitions.ForAll(x => {
if (x.ThreadObject != null)
ReleaseThread(x.ThreadObject);
x.WaitHandle.Dispose();
});
if (!queueIsEmpty) {
ProcessQueueAsync();
}
}

private static void RunPartitionAsync<TSource, TDest>(Partition<TSource, TDest> partition, CancellationToken token) {
partition.ThreadObject.RunAsync(() => {
try {
foreach (var item in partition.Items) {
if (token.IsCancellationRequested)
break;
var destItem = partition.Selector(item);
if (destItem != null)
partition.Result.Add(destItem);
}
}
catch (Exception e) {
partition.Exception = e;
private void ProcessQueue() {
// Process all items in the queue. This happens concurrently on
// all active threads of the thread pool.
while (true) {
Action task = TryGetTaskFromQueue();

// Queue is empty, bail
if (task == null) {
break;
}
finally {
partition.WaitHandle.Set();

try {
task();
} catch (Exception e) {
// TODO(rpaquay): Do we want to propage the exception here?
Logger.LogError(e, "Error executing task on custom thread pool.");
}
});
}
}

private void ExecuteTaskAndReleaseThread(ThreadObject thread, Action task) {
try {
task();
}
catch (Exception e) {
// TODO(rpaquay): Do we want to propage the exception here?
Logger.LogError(e, "Error executing task on custom thread pool.");
}
finally {
ReleaseThread(thread);
private Action TryGetTaskFromQueue() {
lock (_queueLock) {
return (_taskQueue.Count == 0) ? null : _taskQueue.Dequeue();
}
}

private ThreadObject AcquireThread() {
return _threadPool.AcquireThread();
private ThreadObject TryAcquireThread() {
return _threadPool.TryAcquireThread();
}

private void ReleaseThread(ThreadObject thread) {
thread.Release();
}

public class Partition<TSource, TDest> {
public IList<TSource> Items { get; set; }
public ThreadObject ThreadObject { get; set; }
public ManualResetEvent WaitHandle { get; set; }
public List<TDest> Result { get; set; }
public Exception Exception { get; set; }
public Func<TSource, TDest> Selector { get; set; }
}
}
}
5 changes: 1 addition & 4 deletions src/Server/Threads/ICustomThreadPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ namespace VsChromium.Server.Threads {
public interface ICustomThreadPool {
void RunAsync(Action task);

IEnumerable<TDest> RunInParallel<TSource, TDest>(
IList<TSource> source,
Func<TSource, TDest> selector,
CancellationToken token);
void RunAsync(Action task, TimeSpan delay);
}
}
4 changes: 4 additions & 0 deletions src/Server/Threads/ThreadPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public ThreadObject AcquireThread() {
}
}

public ThreadObject TryAcquireThread() {
return TryGetThread();
}

private ThreadObject TryGetThread() {
lock (_lock) {
if (_threads.Count == 0)
Expand Down

0 comments on commit cd52cf4

Please sign in to comment.