Skip to content

Commit

Permalink
[browser][MT] use regular POSIX portable threadpool (#99836)
Browse files Browse the repository at this point in the history
  • Loading branch information
pavelsavara authored Mar 21, 2024
1 parent adb639c commit d9b9eb9
Show file tree
Hide file tree
Showing 23 changed files with 327 additions and 1,191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace System.Threading
/// <summary>
/// A LIFO semaphore implemented using the PAL's semaphore with uninterruptible waits.
/// </summary>
internal sealed partial class LowLevelLifoSemaphore : LowLevelLifoSemaphoreBase, IDisposable
internal sealed partial class LowLevelLifoSemaphore : IDisposable
{
private Semaphore? _semaphore;

Expand All @@ -34,7 +34,7 @@ public bool WaitCore(int timeoutMs)
[LibraryImport(RuntimeHelpers.QCall, EntryPoint = "WaitHandle_CorWaitOnePrioritizedNative")]
private static partial int WaitNative(SafeWaitHandle handle, int timeoutMs);

protected override void ReleaseCore(int count)
private void ReleaseCore(int count)
{
Debug.Assert(_semaphore != null);
Debug.Assert(count > 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace System.Threading
/// A LIFO semaphore.
/// Waits on this semaphore are uninterruptible.
/// </summary>
internal sealed partial class LowLevelLifoSemaphore : LowLevelLifoSemaphoreBase, IDisposable
internal sealed partial class LowLevelLifoSemaphore : IDisposable
{
private WaitSubsystem.WaitableObject _semaphore;

Expand All @@ -27,7 +27,7 @@ private bool WaitCore(int timeoutMs)
return WaitSubsystem.Wait(_semaphore, timeoutMs, false, true) == WaitHandle.WaitSuccess;
}

protected override void ReleaseCore(int count)
private void ReleaseCore(int count)
{
WaitSubsystem.ReleaseSemaphore(_semaphore, count);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2685,7 +2685,6 @@
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.ThreadCounts.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WaitThread.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerThread.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerThread.NonBrowser.cs" Condition="'$(TargetsBrowser)' != 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerTracking.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.Unix.cs" Condition="'$(TargetsUnix)' == 'true' or ('$(TargetsBrowser)' == 'true' and '$(FeatureWasmManagedThreads)' != 'true') or '$(TargetsWasi)' == 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.Windows.cs" Condition="'$(TargetsWindows)' == 'true'" />
Expand All @@ -2697,7 +2696,6 @@
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\RegisteredWaitHandle.Windows.cs" Condition="'$(TargetsWindows)' == 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\RegisteredWaitHandle.Unix.cs" Condition="'$(TargetsWindows)' != 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\RegisteredWaitHandle.Portable.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\LowLevelLifoSemaphoreBase.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\ThreadPoolBoundHandle.Portable.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\ThreadPoolBoundHandle.Unix.cs" Condition="'$(TargetsWindows)' != 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\ThreadPoolBoundHandle.Windows.cs" Condition="'$(TargetsWindows)' == 'true'" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public bool WaitCore(int timeoutMs)
return success;
}

protected override void ReleaseCore(int count)
private void ReleaseCore(int count)
{
Debug.Assert(count > 0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,29 @@ namespace System.Threading
/// A LIFO semaphore.
/// Waits on this semaphore are uninterruptible.
/// </summary>
internal sealed partial class LowLevelLifoSemaphore : LowLevelLifoSemaphoreBase, IDisposable
internal sealed partial class LowLevelLifoSemaphore : IDisposable
{
private CacheLineSeparatedCounts _separated;

private readonly int _maximumSignalCount;
private readonly int _spinCount;
private readonly Action _onWait;

private const int SpinSleep0Threshold = 10;

public LowLevelLifoSemaphore(int initialSignalCount, int maximumSignalCount, int spinCount, Action onWait)
: base(initialSignalCount, maximumSignalCount, spinCount, onWait)
{
Debug.Assert(initialSignalCount >= 0);
Debug.Assert(initialSignalCount <= maximumSignalCount);
Debug.Assert(maximumSignalCount > 0);
Debug.Assert(spinCount >= 0);

_separated = default;
_separated._counts.SignalCount = (uint)initialSignalCount;
_maximumSignalCount = maximumSignalCount;
_spinCount = spinCount;
_onWait = onWait;

Create(maximumSignalCount);
}

Expand Down Expand Up @@ -185,5 +201,178 @@ private bool WaitForSignal(int timeoutMs)
}
}
}

public void Release(int releaseCount)
{
Debug.Assert(releaseCount > 0);
Debug.Assert(releaseCount <= _maximumSignalCount);

int countOfWaitersToWake;
Counts counts = _separated._counts;
while (true)
{
Counts newCounts = counts;

// Increase the signal count. The addition doesn't overflow because of the limit on the max signal count in constructor.
newCounts.AddSignalCount((uint)releaseCount);

// Determine how many waiters to wake, taking into account how many spinners and waiters there are and how many waiters
// have previously been signaled to wake but have not yet woken
countOfWaitersToWake =
(int)Math.Min(newCounts.SignalCount, (uint)counts.WaiterCount + counts.SpinnerCount) -
counts.SpinnerCount -
counts.CountOfWaitersSignaledToWake;
if (countOfWaitersToWake > 0)
{
// Ideally, limiting to a maximum of releaseCount would not be necessary and could be an assert instead, but since
// WaitForSignal() does not have enough information to tell whether a woken thread was signaled, and due to the cap
// below, it's possible for countOfWaitersSignaledToWake to be less than the number of threads that have actually
// been signaled to wake.
if (countOfWaitersToWake > releaseCount)
{
countOfWaitersToWake = releaseCount;
}

// Cap countOfWaitersSignaledToWake to its max value. It's ok to ignore some woken threads in this count, it just
// means some more threads will be woken next time. Typically, it won't reach the max anyway.
newCounts.AddUpToMaxCountOfWaitersSignaledToWake((uint)countOfWaitersToWake);
}

Counts countsBeforeUpdate = _separated._counts.InterlockedCompareExchange(newCounts, counts);
if (countsBeforeUpdate == counts)
{
Debug.Assert(releaseCount <= _maximumSignalCount - counts.SignalCount);
if (countOfWaitersToWake > 0)
ReleaseCore(countOfWaitersToWake);
return;
}

counts = countsBeforeUpdate;
}
}

private struct Counts : IEquatable<Counts>
{
private const byte SignalCountShift = 0;
private const byte WaiterCountShift = 32;
private const byte SpinnerCountShift = 48;
private const byte CountOfWaitersSignaledToWakeShift = 56;

private ulong _data;

private Counts(ulong data) => _data = data;

private uint GetUInt32Value(byte shift) => (uint)(_data >> shift);
private void SetUInt32Value(uint value, byte shift) =>
_data = (_data & ~((ulong)uint.MaxValue << shift)) | ((ulong)value << shift);
private ushort GetUInt16Value(byte shift) => (ushort)(_data >> shift);
private void SetUInt16Value(ushort value, byte shift) =>
_data = (_data & ~((ulong)ushort.MaxValue << shift)) | ((ulong)value << shift);
private byte GetByteValue(byte shift) => (byte)(_data >> shift);
private void SetByteValue(byte value, byte shift) =>
_data = (_data & ~((ulong)byte.MaxValue << shift)) | ((ulong)value << shift);

public uint SignalCount
{
get => GetUInt32Value(SignalCountShift);
set => SetUInt32Value(value, SignalCountShift);
}

public void AddSignalCount(uint value)
{
Debug.Assert(value <= uint.MaxValue - SignalCount);
_data += (ulong)value << SignalCountShift;
}

public void IncrementSignalCount() => AddSignalCount(1);

public void DecrementSignalCount()
{
Debug.Assert(SignalCount != 0);
_data -= (ulong)1 << SignalCountShift;
}

public ushort WaiterCount
{
get => GetUInt16Value(WaiterCountShift);
set => SetUInt16Value(value, WaiterCountShift);
}

public void IncrementWaiterCount()
{
Debug.Assert(WaiterCount < ushort.MaxValue);
_data += (ulong)1 << WaiterCountShift;
}

public void DecrementWaiterCount()
{
Debug.Assert(WaiterCount != 0);
_data -= (ulong)1 << WaiterCountShift;
}

public void InterlockedDecrementWaiterCount()
{
var countsAfterUpdate = new Counts(Interlocked.Add(ref _data, unchecked((ulong)-1) << WaiterCountShift));
Debug.Assert(countsAfterUpdate.WaiterCount != ushort.MaxValue); // underflow check
}

public byte SpinnerCount
{
get => GetByteValue(SpinnerCountShift);
set => SetByteValue(value, SpinnerCountShift);
}

public void IncrementSpinnerCount()
{
Debug.Assert(SpinnerCount < byte.MaxValue);
_data += (ulong)1 << SpinnerCountShift;
}

public void DecrementSpinnerCount()
{
Debug.Assert(SpinnerCount != 0);
_data -= (ulong)1 << SpinnerCountShift;
}

public byte CountOfWaitersSignaledToWake
{
get => GetByteValue(CountOfWaitersSignaledToWakeShift);
set => SetByteValue(value, CountOfWaitersSignaledToWakeShift);
}

public void AddUpToMaxCountOfWaitersSignaledToWake(uint value)
{
uint availableCount = (uint)(byte.MaxValue - CountOfWaitersSignaledToWake);
if (value > availableCount)
{
value = availableCount;
}
_data += (ulong)value << CountOfWaitersSignaledToWakeShift;
}

public void DecrementCountOfWaitersSignaledToWake()
{
Debug.Assert(CountOfWaitersSignaledToWake != 0);
_data -= (ulong)1 << CountOfWaitersSignaledToWakeShift;
}

public Counts InterlockedCompareExchange(Counts newCounts, Counts oldCounts) =>
new Counts(Interlocked.CompareExchange(ref _data, newCounts._data, oldCounts._data));

public static bool operator ==(Counts lhs, Counts rhs) => lhs.Equals(rhs);
public static bool operator !=(Counts lhs, Counts rhs) => !lhs.Equals(rhs);

public override bool Equals([NotNullWhen(true)] object? obj) => obj is Counts other && Equals(other);
public bool Equals(Counts other) => _data == other._data;
public override int GetHashCode() => (int)_data + (int)(_data >> 32);
}

[StructLayout(LayoutKind.Sequential)]
private struct CacheLineSeparatedCounts
{
private readonly Internal.PaddingFor32 _pad1;
public Counts _counts;
private readonly Internal.PaddingFor32 _pad2;
}
}
}
Loading

0 comments on commit d9b9eb9

Please sign in to comment.