Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[browser][MT] use regular POSIX portable threadpool #99836

Merged
merged 11 commits into from
Mar 21, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace System.Threading
/// <summary>
/// A LIFO semaphore implemented using the PAL's semaphore with uninterruptible waits.
/// </summary>
internal sealed partial class LowLevelLifoSemaphore : LowLevelLifoSemaphoreBase, IDisposable
internal sealed partial class LowLevelLifoSemaphore : IDisposable
{
private Semaphore? _semaphore;

Expand All @@ -34,7 +34,7 @@ public bool WaitCore(int timeoutMs)
[LibraryImport(RuntimeHelpers.QCall, EntryPoint = "WaitHandle_CorWaitOnePrioritizedNative")]
private static partial int WaitNative(SafeWaitHandle handle, int timeoutMs);

protected override void ReleaseCore(int count)
private void ReleaseCore(int count)
{
Debug.Assert(_semaphore != null);
Debug.Assert(count > 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace System.Threading
/// A LIFO semaphore.
/// Waits on this semaphore are uninterruptible.
/// </summary>
internal sealed partial class LowLevelLifoSemaphore : LowLevelLifoSemaphoreBase, IDisposable
internal sealed partial class LowLevelLifoSemaphore : IDisposable
{
private WaitSubsystem.WaitableObject _semaphore;

Expand All @@ -27,7 +27,7 @@ private bool WaitCore(int timeoutMs)
return WaitSubsystem.Wait(_semaphore, timeoutMs, false, true) == WaitHandle.WaitSuccess;
}

protected override void ReleaseCore(int count)
private void ReleaseCore(int count)
{
WaitSubsystem.ReleaseSemaphore(_semaphore, count);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2685,7 +2685,6 @@
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.ThreadCounts.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WaitThread.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerThread.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerThread.NonBrowser.cs" Condition="'$(TargetsBrowser)' != 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerTracking.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.Unix.cs" Condition="'$(TargetsUnix)' == 'true' or ('$(TargetsBrowser)' == 'true' and '$(FeatureWasmManagedThreads)' != 'true') or '$(TargetsWasi)' == 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.Windows.cs" Condition="'$(TargetsWindows)' == 'true'" />
Expand All @@ -2697,7 +2696,6 @@
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\RegisteredWaitHandle.Windows.cs" Condition="'$(TargetsWindows)' == 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\RegisteredWaitHandle.Unix.cs" Condition="'$(TargetsWindows)' != 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\RegisteredWaitHandle.Portable.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\LowLevelLifoSemaphoreBase.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\ThreadPoolBoundHandle.Portable.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\ThreadPoolBoundHandle.Unix.cs" Condition="'$(TargetsWindows)' != 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\ThreadPoolBoundHandle.Windows.cs" Condition="'$(TargetsWindows)' == 'true'" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public bool WaitCore(int timeoutMs)
return success;
}

protected override void ReleaseCore(int count)
private void ReleaseCore(int count)
{
Debug.Assert(count > 0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,29 @@ namespace System.Threading
/// A LIFO semaphore.
/// Waits on this semaphore are uninterruptible.
/// </summary>
internal sealed partial class LowLevelLifoSemaphore : LowLevelLifoSemaphoreBase, IDisposable
internal sealed partial class LowLevelLifoSemaphore : IDisposable
{
private CacheLineSeparatedCounts _separated;

private readonly int _maximumSignalCount;
private readonly int _spinCount;
private readonly Action _onWait;

private const int SpinSleep0Threshold = 10;

public LowLevelLifoSemaphore(int initialSignalCount, int maximumSignalCount, int spinCount, Action onWait)
: base(initialSignalCount, maximumSignalCount, spinCount, onWait)
{
Debug.Assert(initialSignalCount >= 0);
Debug.Assert(initialSignalCount <= maximumSignalCount);
Debug.Assert(maximumSignalCount > 0);
Debug.Assert(spinCount >= 0);

_separated = default;
_separated._counts.SignalCount = (uint)initialSignalCount;
_maximumSignalCount = maximumSignalCount;
_spinCount = spinCount;
_onWait = onWait;

Create(maximumSignalCount);
}

Expand Down Expand Up @@ -185,5 +201,178 @@ private bool WaitForSignal(int timeoutMs)
}
}
}

public void Release(int releaseCount)
{
Debug.Assert(releaseCount > 0);
Debug.Assert(releaseCount <= _maximumSignalCount);

int countOfWaitersToWake;
Counts counts = _separated._counts;
while (true)
{
Counts newCounts = counts;

// Increase the signal count. The addition doesn't overflow because of the limit on the max signal count in constructor.
newCounts.AddSignalCount((uint)releaseCount);

// Determine how many waiters to wake, taking into account how many spinners and waiters there are and how many waiters
// have previously been signaled to wake but have not yet woken
countOfWaitersToWake =
(int)Math.Min(newCounts.SignalCount, (uint)counts.WaiterCount + counts.SpinnerCount) -
counts.SpinnerCount -
counts.CountOfWaitersSignaledToWake;
if (countOfWaitersToWake > 0)
{
// Ideally, limiting to a maximum of releaseCount would not be necessary and could be an assert instead, but since
// WaitForSignal() does not have enough information to tell whether a woken thread was signaled, and due to the cap
// below, it's possible for countOfWaitersSignaledToWake to be less than the number of threads that have actually
// been signaled to wake.
if (countOfWaitersToWake > releaseCount)
{
countOfWaitersToWake = releaseCount;
}

// Cap countOfWaitersSignaledToWake to its max value. It's ok to ignore some woken threads in this count, it just
// means some more threads will be woken next time. Typically, it won't reach the max anyway.
newCounts.AddUpToMaxCountOfWaitersSignaledToWake((uint)countOfWaitersToWake);
}

Counts countsBeforeUpdate = _separated._counts.InterlockedCompareExchange(newCounts, counts);
if (countsBeforeUpdate == counts)
{
Debug.Assert(releaseCount <= _maximumSignalCount - counts.SignalCount);
if (countOfWaitersToWake > 0)
ReleaseCore(countOfWaitersToWake);
return;
}

counts = countsBeforeUpdate;
}
}

private struct Counts : IEquatable<Counts>
{
private const byte SignalCountShift = 0;
private const byte WaiterCountShift = 32;
private const byte SpinnerCountShift = 48;
private const byte CountOfWaitersSignaledToWakeShift = 56;

private ulong _data;

private Counts(ulong data) => _data = data;

private uint GetUInt32Value(byte shift) => (uint)(_data >> shift);
private void SetUInt32Value(uint value, byte shift) =>
_data = (_data & ~((ulong)uint.MaxValue << shift)) | ((ulong)value << shift);
private ushort GetUInt16Value(byte shift) => (ushort)(_data >> shift);
private void SetUInt16Value(ushort value, byte shift) =>
_data = (_data & ~((ulong)ushort.MaxValue << shift)) | ((ulong)value << shift);
private byte GetByteValue(byte shift) => (byte)(_data >> shift);
private void SetByteValue(byte value, byte shift) =>
_data = (_data & ~((ulong)byte.MaxValue << shift)) | ((ulong)value << shift);

public uint SignalCount
{
get => GetUInt32Value(SignalCountShift);
set => SetUInt32Value(value, SignalCountShift);
}

public void AddSignalCount(uint value)
{
Debug.Assert(value <= uint.MaxValue - SignalCount);
_data += (ulong)value << SignalCountShift;
}

public void IncrementSignalCount() => AddSignalCount(1);

public void DecrementSignalCount()
{
Debug.Assert(SignalCount != 0);
_data -= (ulong)1 << SignalCountShift;
}

public ushort WaiterCount
{
get => GetUInt16Value(WaiterCountShift);
set => SetUInt16Value(value, WaiterCountShift);
}

public void IncrementWaiterCount()
{
Debug.Assert(WaiterCount < ushort.MaxValue);
_data += (ulong)1 << WaiterCountShift;
}

public void DecrementWaiterCount()
{
Debug.Assert(WaiterCount != 0);
_data -= (ulong)1 << WaiterCountShift;
}

public void InterlockedDecrementWaiterCount()
{
var countsAfterUpdate = new Counts(Interlocked.Add(ref _data, unchecked((ulong)-1) << WaiterCountShift));
Debug.Assert(countsAfterUpdate.WaiterCount != ushort.MaxValue); // underflow check
}

public byte SpinnerCount
{
get => GetByteValue(SpinnerCountShift);
set => SetByteValue(value, SpinnerCountShift);
}

public void IncrementSpinnerCount()
{
Debug.Assert(SpinnerCount < byte.MaxValue);
_data += (ulong)1 << SpinnerCountShift;
}

public void DecrementSpinnerCount()
{
Debug.Assert(SpinnerCount != 0);
_data -= (ulong)1 << SpinnerCountShift;
}

public byte CountOfWaitersSignaledToWake
{
get => GetByteValue(CountOfWaitersSignaledToWakeShift);
set => SetByteValue(value, CountOfWaitersSignaledToWakeShift);
}

public void AddUpToMaxCountOfWaitersSignaledToWake(uint value)
{
uint availableCount = (uint)(byte.MaxValue - CountOfWaitersSignaledToWake);
if (value > availableCount)
{
value = availableCount;
}
_data += (ulong)value << CountOfWaitersSignaledToWakeShift;
}

public void DecrementCountOfWaitersSignaledToWake()
{
Debug.Assert(CountOfWaitersSignaledToWake != 0);
_data -= (ulong)1 << CountOfWaitersSignaledToWakeShift;
}

public Counts InterlockedCompareExchange(Counts newCounts, Counts oldCounts) =>
new Counts(Interlocked.CompareExchange(ref _data, newCounts._data, oldCounts._data));

public static bool operator ==(Counts lhs, Counts rhs) => lhs.Equals(rhs);
public static bool operator !=(Counts lhs, Counts rhs) => !lhs.Equals(rhs);

public override bool Equals([NotNullWhen(true)] object? obj) => obj is Counts other && Equals(other);
public bool Equals(Counts other) => _data == other._data;
public override int GetHashCode() => (int)_data + (int)(_data >> 32);
}

[StructLayout(LayoutKind.Sequential)]
private struct CacheLineSeparatedCounts
{
private readonly Internal.PaddingFor32 _pad1;
public Counts _counts;
private readonly Internal.PaddingFor32 _pad2;
}
}
}
Loading
Loading