Skip to content

Commit

Permalink
Fix thread pool hang
Browse files Browse the repository at this point in the history
- In dotnet#53471 the thread count goal was moved out of `ThreadCounts`; it turns out that there are a few subtle races that keeping the goal inside `ThreadCounts` was avoiding. There are other ways to fix it, but I've added the goal back into `ThreadCounts` for now.
- Reverted PR dotnet#55985, which worked around the issue in the CI

Fixes dotnet#55642
  • Loading branch information
kouvel committed Jul 27, 2021
1 parent 12c4e4c commit 2d0aa63
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public short MinThreadsGoal
get
{
_threadAdjustmentLock.VerifyIsLocked();
return Math.Min(_separated.numThreadsGoal, TargetThreadsGoalForBlockingAdjustment);
return Math.Min(_separated.counts.NumThreadsGoal, TargetThreadsGoalForBlockingAdjustment);
}
}

Expand Down Expand Up @@ -44,7 +44,7 @@ public bool NotifyThreadBlocked()
Debug.Assert(_numBlockedThreads > 0);

if (_pendingBlockingAdjustment != PendingBlockingAdjustment.WithDelayIfNecessary &&
_separated.numThreadsGoal < TargetThreadsGoalForBlockingAdjustment)
_separated.counts.NumThreadsGoal < TargetThreadsGoalForBlockingAdjustment)
{
if (_pendingBlockingAdjustment == PendingBlockingAdjustment.None)
{
Expand Down Expand Up @@ -79,7 +79,7 @@ public void NotifyThreadUnblocked()

if (_pendingBlockingAdjustment != PendingBlockingAdjustment.Immediately &&
_numThreadsAddedDueToBlocking > 0 &&
_separated.numThreadsGoal > TargetThreadsGoalForBlockingAdjustment)
_separated.counts.NumThreadsGoal > TargetThreadsGoalForBlockingAdjustment)
{
wakeGateThread = true;
_pendingBlockingAdjustment = PendingBlockingAdjustment.Immediately;
Expand Down Expand Up @@ -126,7 +126,8 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo
addWorker = false;

short targetThreadsGoal = TargetThreadsGoalForBlockingAdjustment;
short numThreadsGoal = _separated.numThreadsGoal;
ThreadCounts counts = _separated.counts;
short numThreadsGoal = counts.NumThreadsGoal;
if (numThreadsGoal == targetThreadsGoal)
{
return 0;
Expand All @@ -144,7 +145,8 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo

short toSubtract = Math.Min((short)(numThreadsGoal - targetThreadsGoal), _numThreadsAddedDueToBlocking);
_numThreadsAddedDueToBlocking -= toSubtract;
_separated.numThreadsGoal = numThreadsGoal -= toSubtract;
numThreadsGoal -= toSubtract;
_separated.counts.InterlockedSetNumThreadsGoal(numThreadsGoal);
HillClimbing.ThreadPoolHillClimber.ForceChange(
numThreadsGoal,
HillClimbing.StateOrTransition.CooperativeBlocking);
Expand All @@ -158,7 +160,6 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo
{
// Calculate how many threads can be added without a delay. Threads that were already created but may be just
// waiting for work can be released for work without a delay, but creating a new thread may need a delay.
ThreadCounts counts = _separated.counts;
short maxThreadsGoalWithoutDelay =
Math.Max(configuredMaxThreadsWithoutDelay, Math.Min(counts.NumExistingThreads, _maxThreads));
short targetThreadsGoalWithoutDelay = Math.Min(targetThreadsGoal, maxThreadsGoalWithoutDelay);
Expand Down Expand Up @@ -225,7 +226,7 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo
} while (false);

_numThreadsAddedDueToBlocking += (short)(newNumThreadsGoal - numThreadsGoal);
_separated.numThreadsGoal = newNumThreadsGoal;
counts = _separated.counts.InterlockedSetNumThreadsGoal(newNumThreadsGoal);
HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.CooperativeBlocking);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,20 +126,31 @@ private static void GateThreadStart()
// of the number of existing threads, is compared with the goal. There may be alternative
// solutions, for now this is only to maintain consistency in behavior.
ThreadCounts counts = threadPoolInstance._separated.counts;
if (counts.NumProcessingWork < threadPoolInstance._maxThreads &&
counts.NumProcessingWork >= threadPoolInstance._separated.numThreadsGoal)
while (
counts.NumProcessingWork < threadPoolInstance._maxThreads &&
counts.NumProcessingWork >= counts.NumThreadsGoal)
{
if (debuggerBreakOnWorkStarvation)
{
Debugger.Break();
}

ThreadCounts newCounts = counts;
short newNumThreadsGoal = (short)(counts.NumProcessingWork + 1);
threadPoolInstance._separated.numThreadsGoal = newNumThreadsGoal;
HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.Starvation);
addWorker = true;
newCounts.NumThreadsGoal = newNumThreadsGoal;

ThreadCounts countsBeforeUpdate =
threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
if (countsBeforeUpdate == counts)
{
HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.Starvation);
addWorker = true;
break;
}

counts = countsBeforeUpdate;
}
}
finally
Expand Down Expand Up @@ -183,7 +194,7 @@ private static bool SufficientDelaySinceLastDequeue(PortableThreadPool threadPoo
}
else
{
minimumDelay = (uint)threadPoolInstance._separated.numThreadsGoal * DequeueDelayThresholdMs;
minimumDelay = (uint)threadPoolInstance._separated.counts.NumThreadsGoal * DequeueDelayThresholdMs;
}

return delay > minimumDelay;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ private struct ThreadCounts
// SOS's ThreadPool command depends on this layout
private const byte NumProcessingWorkShift = 0;
private const byte NumExistingThreadsShift = 16;
private const byte NumThreadsGoalShift = 32;

private uint _data; // SOS's ThreadPool command depends on this name
private ulong _data; // SOS's ThreadPool command depends on this name

private ThreadCounts(uint data) => _data = data;
private ThreadCounts(ulong data) => _data = data;

private short GetInt16Value(byte shift) => (short)(_data >> shift);
private void SetInt16Value(short value, byte shift) =>
_data = (_data & ~((uint)ushort.MaxValue << shift)) | ((uint)(ushort)value << shift);
_data = (_data & ~((ulong)ushort.MaxValue << shift)) | ((ulong)(ushort)value << shift);

/// <summary>
/// Number of threads processing work items.
Expand All @@ -43,7 +44,7 @@ public void SubtractNumProcessingWork(short value)
Debug.Assert(value >= 0);
Debug.Assert(value <= NumProcessingWork);

_data -= (uint)(ushort)value << NumProcessingWorkShift;
_data -= (ulong)(ushort)value << NumProcessingWorkShift;
}

public void InterlockedDecrementNumProcessingWork()
Expand Down Expand Up @@ -72,19 +73,61 @@ public void SubtractNumExistingThreads(short value)
Debug.Assert(value >= 0);
Debug.Assert(value <= NumExistingThreads);

_data -= (uint)(ushort)value << NumExistingThreadsShift;
_data -= (ulong)(ushort)value << NumExistingThreadsShift;
}

/// <summary>
/// The thread count goal: the maximum number of thread pool threads we currently want to have.
/// Stored as a 16-bit field of the packed data at <c>NumThreadsGoalShift</c>.
/// </summary>
/// <remarks>
/// The setter only modifies this copy of the struct and asserts (in debug builds) that the goal is positive.
/// </remarks>
public short NumThreadsGoal
{
    get
    {
        return GetInt16Value(NumThreadsGoalShift);
    }
    set
    {
        Debug.Assert(value > 0);
        SetInt16Value(value, NumThreadsGoalShift);
    }
}

/// <summary>
/// Sets <see cref="NumThreadsGoal"/> to <paramref name="value"/> using a compare-exchange retry loop, leaving
/// the other packed fields at whatever value they hold when the exchange succeeds.
/// </summary>
/// <param name="value">The new thread count goal.</param>
/// <returns>The counts that were successfully stored, including the new goal.</returns>
public ThreadCounts InterlockedSetNumThreadsGoal(short value)
{
    // Verify that the caller holds the thread adjustment lock before changing the goal
    ThreadPoolInstance._threadAdjustmentLock.VerifyIsLocked();

    ThreadCounts observed = this;
    ThreadCounts expected;
    ThreadCounts desired;
    do
    {
        // Build the desired value from the most recently observed counts, changing only the goal
        expected = observed;
        desired = expected;
        desired.NumThreadsGoal = value;

        // If another thread changed the counts in the meantime, retry using the value it left behind
        observed = InterlockedCompareExchange(desired, expected);
    } while (observed != expected);

    return desired;
}

/// <summary>
/// Returns a copy of the counts obtained through a volatile read of the packed data.
/// </summary>
public ThreadCounts VolatileRead()
{
    ulong snapshot = Volatile.Read(ref _data);
    return new ThreadCounts(snapshot);
}

public ThreadCounts InterlockedCompareExchange(ThreadCounts newCounts, ThreadCounts oldCounts) =>
new ThreadCounts(Interlocked.CompareExchange(ref _data, newCounts._data, oldCounts._data));
/// <summary>
/// Atomically replaces the packed data with <paramref name="newCounts"/> if it currently equals
/// <paramref name="oldCounts"/>.
/// </summary>
/// <param name="newCounts">The counts to store on success.</param>
/// <param name="oldCounts">The counts expected to be currently stored.</param>
/// <returns>
/// The counts that were stored before the operation; equal to <paramref name="oldCounts"/> when the exchange
/// succeeded.
/// </returns>
public ThreadCounts InterlockedCompareExchange(ThreadCounts newCounts, ThreadCounts oldCounts)
{
#if DEBUG
    // Debug-only check: an exchange that changes the thread count goal must hold the thread adjustment lock
    bool changesGoal = newCounts.NumThreadsGoal != oldCounts.NumThreadsGoal;
    if (changesGoal)
    {
        ThreadPoolInstance._threadAdjustmentLock.VerifyIsLocked();
    }
#endif

    ulong previousData = Interlocked.CompareExchange(ref _data, newCounts._data, oldCounts._data);
    return new ThreadCounts(previousData);
}

public static bool operator ==(ThreadCounts lhs, ThreadCounts rhs) => lhs._data == rhs._data;
public static bool operator !=(ThreadCounts lhs, ThreadCounts rhs) => lhs._data != rhs._data;

public override bool Equals([NotNullWhen(true)] object? obj) => obj is ThreadCounts other && _data == other._data;
public override int GetHashCode() => (int)_data;
public override int GetHashCode() => (int)_data + (int)(_data >> 32);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,22 +124,19 @@ private static void WorkerThreadStart()
ThreadCounts newCounts = counts;
newCounts.SubtractNumExistingThreads(1);
short newNumExistingThreads = (short)(numExistingThreads - 1);

ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
short newNumThreadsGoal =
Math.Max(
threadPoolInstance.MinThreadsGoal,
Math.Min(newNumExistingThreads, counts.NumThreadsGoal));
newCounts.NumThreadsGoal = newNumThreadsGoal;

ThreadCounts oldCounts =
threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
if (oldCounts == counts)
{
short newNumThreadsGoal =
Math.Max(
threadPoolInstance.MinThreadsGoal,
Math.Min(newNumExistingThreads, threadPoolInstance._separated.numThreadsGoal));
if (threadPoolInstance._separated.numThreadsGoal != newNumThreadsGoal)
{
threadPoolInstance._separated.numThreadsGoal = newNumThreadsGoal;
HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.ThreadTimedOut);
}

HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.ThreadTimedOut);
if (NativeRuntimeEventSource.Log.IsEnabled())
{
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStop((uint)newNumExistingThreads);
Expand Down Expand Up @@ -181,7 +178,7 @@ internal static void MaybeAddWorkingWorker(PortableThreadPool threadPoolInstance
while (true)
{
numProcessingWork = counts.NumProcessingWork;
if (numProcessingWork >= threadPoolInstance._separated.numThreadsGoal)
if (numProcessingWork >= counts.NumThreadsGoal)
{
return;
}
Expand Down Expand Up @@ -256,7 +253,7 @@ internal static bool ShouldStopProcessingWorkNow(PortableThreadPool threadPoolIn
// code from which this implementation was ported, which turns a processing thread into a retired thread
// and checks for pending requests like RemoveWorkingWorker. In this implementation there are
// no retired threads, so only the count of threads processing work is considered.
if (counts.NumProcessingWork <= threadPoolInstance._separated.numThreadsGoal)
if (counts.NumProcessingWork <= counts.NumThreadsGoal)
{
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ private struct CacheLineSeparated
{
[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 1)]
public ThreadCounts counts; // SOS's ThreadPool command depends on this name
[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 1 + sizeof(uint))]
public short numThreadsGoal;

[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 2)]
public int lastDequeueTime;
Expand Down Expand Up @@ -103,7 +101,7 @@ private PortableThreadPool()
_maxThreads = _minThreads;
}

_separated.numThreadsGoal = _minThreads;
_separated.counts.NumThreadsGoal = _minThreads;
}

public bool SetMinThreads(int workerThreads, int ioCompletionThreads)
Expand Down Expand Up @@ -142,9 +140,9 @@ public bool SetMinThreads(int workerThreads, int ioCompletionThreads)
wakeGateThread = true;
}
}
else if (_separated.numThreadsGoal < newMinThreads)
else if (_separated.counts.NumThreadsGoal < newMinThreads)
{
_separated.numThreadsGoal = newMinThreads;
_separated.counts.InterlockedSetNumThreadsGoal(newMinThreads);
if (_separated.numRequestedWorkers > 0)
{
addWorker = true;
Expand Down Expand Up @@ -193,9 +191,9 @@ public bool SetMaxThreads(int workerThreads, int ioCompletionThreads)

short newMaxThreads = (short)Math.Min(workerThreads, MaxPossibleThreadCount);
_maxThreads = newMaxThreads;
if (_separated.numThreadsGoal > newMaxThreads)
if (_separated.counts.NumThreadsGoal > newMaxThreads)
{
_separated.numThreadsGoal = newMaxThreads;
_separated.counts.InterlockedSetNumThreadsGoal(newMaxThreads);
}
return true;
}
Expand Down Expand Up @@ -272,13 +270,15 @@ private void AdjustMaxWorkersActive()
bool addWorker = false;
try
{
// Skip hill climbing when there is a pending blocking adjustment. Hill climbing may otherwise bypass the
// blocking adjustment heuristics and increase the thread count too quickly.
if (_pendingBlockingAdjustment != PendingBlockingAdjustment.None)
// Repeated checks from ShouldAdjustMaxWorkersActive() inside the lock
ThreadCounts counts = _separated.counts;
if (counts.NumProcessingWork > counts.NumThreadsGoal ||
_pendingBlockingAdjustment != PendingBlockingAdjustment.None)
{
return;
}


long startTime = _currentSampleStartTime;
long endTime = Stopwatch.GetTimestamp();
long freq = Stopwatch.Frequency;
Expand All @@ -291,13 +291,13 @@ private void AdjustMaxWorkersActive()
int totalNumCompletions = (int)_completionCounter.Count;
int numCompletions = totalNumCompletions - _separated.priorCompletionCount;

short oldNumThreadsGoal = counts.NumThreadsGoal;
int newNumThreadsGoal;
(newNumThreadsGoal, _threadAdjustmentIntervalMs) =
HillClimbing.ThreadPoolHillClimber.Update(_separated.numThreadsGoal, elapsedSeconds, numCompletions);
short oldNumThreadsGoal = _separated.numThreadsGoal;
HillClimbing.ThreadPoolHillClimber.Update(oldNumThreadsGoal, elapsedSeconds, numCompletions);
if (oldNumThreadsGoal != (short)newNumThreadsGoal)
{
_separated.numThreadsGoal = (short)newNumThreadsGoal;
_separated.counts.InterlockedSetNumThreadsGoal((short)newNumThreadsGoal);

//
// If we're increasing the goal, inject a thread. If that thread finds work, it will inject
Expand Down Expand Up @@ -354,7 +354,8 @@ private bool ShouldAdjustMaxWorkersActive(int currentTimeMs)
// threads processing work to stop in response to a decreased thread count goal. The logic here is a bit
// different from the original CoreCLR code from which this implementation was ported because in this
// implementation there are no retired threads, so only the count of threads processing work is considered.
if (_separated.counts.NumProcessingWork > _separated.numThreadsGoal)
ThreadCounts counts = _separated.counts;
if (counts.NumProcessingWork > counts.NumThreadsGoal)
{
return false;
}
Expand Down

0 comments on commit 2d0aa63

Please sign in to comment.