Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WASI] improve single-threaded threadpool #107395

Merged
merged 9 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public static int Main(string[] args)
return PollWasiEventLoopUntilResolved((Thread)null!, MainAsync(args));

[UnsafeAccessor(UnsafeAccessorKind.StaticMethod, Name = "PollWasiEventLoopUntilResolved")]
static extern int PollWasiEventLoopUntilResolved(Thread t, Task<int> mainTask);
static extern T PollWasiEventLoopUntilResolved<T>(Thread t, Task<T> mainTask);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,16 @@ internal static System.Threading.Tasks.Task RegisterWasiPollableHandle(int handl
return WasiEventLoop.RegisterWasiPollableHandle(handle, cancellationToken);
}

internal static int PollWasiEventLoopUntilResolved(Task<int> mainTask)
internal static T PollWasiEventLoopUntilResolved<T>(Task<T> mainTask)
{
while (!mainTask.IsCompleted)
{
WasiEventLoop.DispatchWasiEventLoop();
}
var exception = mainTask.Exception;
if (exception is not null)
{
throw exception;
}

return mainTask.Result;
return WasiEventLoop.PollWasiEventLoopUntilResolved<T>(mainTask);
}

#endif
internal static void PollWasiEventLoopUntilResolvedVoid(Task mainTask)
{
WasiEventLoop.PollWasiEventLoopUntilResolvedVoid(mainTask);
}
#endif // TARGET_WASI

// the closest analog to Sleep(0) on Unix is sched_yield
internal static void UninterruptibleSleep0() => Thread.Yield();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -906,10 +906,7 @@ internal static bool Dispatch()
// thread because it sees a Determining or Scheduled stage, and the current thread is the last thread processing
// work items, the current thread must either see the work item queued by the enqueuer, or it must see a stage of
// Scheduled, and try to dequeue again or request another thread.
#if !TARGET_WASI
// TODO https://github.com/dotnet/runtime/issues/104803
Debug.Assert(workQueue._separated.queueProcessingStage == QueueProcessingStage.Scheduled);
#endif
workQueue._separated.queueProcessingStage = QueueProcessingStage.Determining;
Interlocked.MemoryBarrier();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Threading.Tasks;
using WasiPollWorld.wit.imports.wasi.io.v0_2_1;
using Pollable = WasiPollWorld.wit.imports.wasi.io.v0_2_1.IPoll.Pollable;
using MonotonicClockInterop = WasiPollWorld.wit.imports.wasi.clocks.v0_2_1.MonotonicClockInterop;

namespace System.Threading
{
Expand All @@ -14,7 +15,9 @@ internal static class WasiEventLoop
// it will be leaked and stay in this list forever.
// it will also keep the Pollable handle alive and prevent it from being disposed
private static readonly List<PollableHolder> s_pollables = new();
private static bool s_tasksCanceled;
private static bool s_checkScheduled;
private static Pollable? s_resolvedPollable;
private static Task? s_mainTask;

internal static Task RegisterWasiPollableHandle(int handle, CancellationToken cancellationToken)
{
Expand All @@ -29,18 +32,70 @@ internal static Task RegisterWasiPollable(Pollable pollable, CancellationToken c
// this will register the pollable holder into s_pollables
var holder = new PollableHolder(pollable, cancellationToken);
s_pollables.Add(holder);

ScheduleCheck();

return holder.taskCompletionSource.Task;
}

// this is not thread safe
internal static void DispatchWasiEventLoop()

internal static T PollWasiEventLoopUntilResolved<T>(Task<T> mainTask)
{
try
{
s_mainTask = mainTask;
while (!mainTask.IsCompleted)
{
ThreadPoolWorkQueue.Dispatch();
}
}
finally
{
s_mainTask = null;
}
var exception = mainTask.Exception;
if (exception is not null)
{
throw exception;
}

return mainTask.Result;
}

internal static void PollWasiEventLoopUntilResolvedVoid(Task mainTask)
{
try
{
s_mainTask = mainTask;
while (!mainTask.IsCompleted)
{
ThreadPoolWorkQueue.Dispatch();
}
}
finally
{
s_mainTask = null;
}

var exception = mainTask.Exception;
if (exception is not null)
{
throw exception;
}
}

internal static void ScheduleCheck()
{
ThreadPoolWorkQueue.Dispatch();
if (s_tasksCanceled)
if (!s_checkScheduled && s_pollables.Count > 0)
{
s_tasksCanceled = false;
return;
s_checkScheduled = true;
ThreadPool.UnsafeQueueUserWorkItem(CheckPollables, null);
}
}

internal static void CheckPollables(object? _)
{
s_checkScheduled = false;

var holders = new List<PollableHolder>(s_pollables.Count);
var pending = new List<Pollable>(s_pollables.Count);
Expand All @@ -58,13 +113,28 @@ internal static void DispatchWasiEventLoop()

if (pending.Count > 0)
{
var resolvedPollableIndex = -1;
// if there is CPU-bound work to do, we should not block on PollInterop.Poll below
// so we will append pollable resolved in 0ms
// in effect, the PollInterop.Poll would not block us
if (ThreadPool.PendingWorkItemCount > 0 || (s_mainTask != null && s_mainTask.IsCompleted))
{
s_resolvedPollable ??= MonotonicClockInterop.SubscribeDuration(0);
pavelsavara marked this conversation as resolved.
Show resolved Hide resolved
resolvedPollableIndex = pending.Count;
pending.Add(s_resolvedPollable);
}

var readyIndexes = PollInterop.Poll(pending);
for (int i = 0; i < readyIndexes.Length; i++)
{
uint readyIndex = readyIndexes[i];
var holder = holders[(int)readyIndex];
holder.ResolveAndDispose();
if (resolvedPollableIndex != readyIndex)
{
var holder = holders[(int)readyIndex];
holder.ResolveAndDispose();
}
}

for (int i = 0; i < holders.Count; i++)
{
PollableHolder holder = holders[i];
Expand All @@ -73,6 +143,8 @@ internal static void DispatchWasiEventLoop()
s_pollables.Add(holder);
}
}

ScheduleCheck();
}
}

Expand Down Expand Up @@ -112,19 +184,14 @@ public void ResolveAndDispose()
}

// for GC of abandoned Tasks or for cancellation
private static void CancelAndDispose(object? s)
public static void CancelAndDispose(object? s)
{
PollableHolder self = (PollableHolder)s!;
if (self.isDisposed)
{
return;
}

// Tell event loop to exit early, giving the application a
// chance to quit if the task(s) it is interested in have
// completed.
s_tasksCanceled = true;

// it will be removed from s_pollables on the next run
self.isDisposed = true;
self.pollable.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static partial class ThreadPool
{
// Indicates whether the thread pool should yield the thread from the dispatch loop to the runtime periodically so that
// the runtime may use the thread for processing other work
internal static bool YieldFromDispatchLoop => false;
internal static bool YieldFromDispatchLoop => true;

private const bool IsWorkerTrackingEnabledInConfig = false;

Expand Down
3 changes: 2 additions & 1 deletion src/mono/sample/wasi/http-p2/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public static class WasiMainWrapper
{
public static async Task<int> MainAsync(string[] args)
{
_ = Task.Delay(100_000_000); // create a task that will not complete before main
await Task.Delay(100);
GC.Collect(); // test that Pollable->Task is not collected until resolved

Expand Down Expand Up @@ -78,7 +79,7 @@ public static int Main(string[] args)
return PollWasiEventLoopUntilResolved((Thread)null!, MainAsync(args));

[UnsafeAccessor(UnsafeAccessorKind.StaticMethod, Name = "PollWasiEventLoopUntilResolved")]
static extern int PollWasiEventLoopUntilResolved(Thread t, Task<int> mainTask);
static extern T PollWasiEventLoopUntilResolved<T>(Thread t, Task<T> mainTask);
}

}
2 changes: 1 addition & 1 deletion src/mono/wasi/testassets/Http.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,6 @@ public static int Main(string[] args)
return PollWasiEventLoopUntilResolved((Thread)null!, MainAsync(args));

[UnsafeAccessor(UnsafeAccessorKind.StaticMethod, Name = "PollWasiEventLoopUntilResolved")]
static extern int PollWasiEventLoopUntilResolved(Thread t, Task<int> mainTask);
static extern T PollWasiEventLoopUntilResolved<T>(Thread t, Task<T> mainTask);
}
}
Loading