diff --git a/README.md b/README.md index 86769df8..e075a94b 100644 --- a/README.md +++ b/README.md @@ -701,8 +701,8 @@ with .NET tasks inside of workflows: * Use `Workflow.WhenAnyAsync` instead. * Technically this only applies to an enumerable set of tasks with results or more than 2 tasks with results. Other uses are safe. See [this issue](https://github.com/dotnet/runtime/issues/87481). -* Do not use `System.Threading.Semaphore` or `System.Threading.SemaphoreSlim`. - * Use `Temporalio.Workflows.Semaphore` instead. +* Do not use `System.Threading.Semaphore` or `System.Threading.SemaphoreSlim` or `System.Threading.Mutex`. + * Use `Temporalio.Workflows.Semaphore` or `Temporalio.Workflows.Mutex` instead. * _Technically_ `SemaphoreSlim` does work if only the async form of `WaitAsync` is used without no timeouts and `Release` is used. But anything else can deadlock the workflow and its use is cumbersome since it must be disposed. * Be wary of additional libraries' implicit use of the default scheduler. diff --git a/src/Temporalio/Workflows/Mutex.cs b/src/Temporalio/Workflows/Mutex.cs new file mode 100644 index 00000000..18e46bdd --- /dev/null +++ b/src/Temporalio/Workflows/Mutex.cs @@ -0,0 +1,94 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Temporalio.Workflows +{ + /// + /// Mutex is an alternative to . It is a thin wrapper around + /// with a single count. + /// + public class Mutex + { + private readonly Semaphore semaphore = new(1); + + /// + /// Wait for the mutex to become available. If the task succeeds, users must call + /// to properly release the mutex when done. + /// + /// + /// Cancellation token that can interrupt this wait. If unset, this defaults to + /// . Upon cancel, awaiting the resulting task will + /// throw a exception. + /// + /// + /// A representing the result of the asynchronous operation. This task is + /// canceled if the cancellation token is. + /// + public Task WaitOneAsync(CancellationToken? cancellationToken = null) => + semaphore.WaitAsync(cancellationToken); + + /// + /// Wait for the mutex to become available or timeout to be reached. If the task returns + /// true, users must call to properly release the mutex when + /// done. + /// + /// + /// Milliseconds until timeout. If this is 0, this is a non-blocking task that will return + /// immediately. If this value is -1 (i.e. ), there is no + /// timeout and the result will always be true on success (at which point you might as well + /// use ). Otherwise a timer is started for + /// the timeout and canceled if the wait succeeds. + /// + /// + /// Cancellation token that can interrupt this wait. If unset, this defaults to + /// . Upon cancel, awaiting the resulting task will + /// throw a exception. + /// + /// + /// A representing the result of the asynchronous operation. The task is + /// true if the wait succeeded and false if it timed out. This task is canceled if the + /// cancellation token is. + /// + public Task WaitOneAsync( + int millisecondsTimeout, + CancellationToken? cancellationToken = null) => + semaphore.WaitAsync(millisecondsTimeout, cancellationToken); + + /// + /// Wait for the mutex to become available or timeout to be reached. If the task returns + /// true, users must call to properly release the mutex when + /// done. + /// + /// + /// TimeSpan until timeout. If this is , this is a non-blocking + /// task that will return immediately. If this value is -1ms (i.e. + /// ), there is no timeout and the result will always + /// be true on success (at which point you might as well use + /// ). Otherwise a timer is started for the + /// timeout and canceled if the wait succeeds. + /// + /// + /// Cancellation token that can interrupt this wait. If unset, this defaults to + /// . Upon cancel, awaiting the resulting task will + /// throw a exception. + /// + /// + /// A representing the result of the asynchronous operation. The task is + /// true if the wait succeeded and false if it timed out. This task is canceled if the + /// cancellation token is. + /// + public Task WaitOneAsync( + TimeSpan timeout, + CancellationToken? cancellationToken = null) => + semaphore.WaitAsync(timeout, cancellationToken); + + /// + /// Release the mutex for use by another waiter. + /// + /// + /// Thrown if the mutex was not waited on successsfully. + /// + public void ReleaseMutex() => semaphore.Release(); + } +} \ No newline at end of file diff --git a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs index d44fdc2b..8522063b 100644 --- a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs +++ b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs @@ -5002,11 +5002,16 @@ await Task.WhenAll( [Workflow] public sealed class SemaphoreWorkflow : IDisposable { - private readonly Semaphore sema; + private readonly Semaphore? semaphore; + private readonly Mutex? mutex; private readonly Dictionary cancellations = new(); [WorkflowInit] - public SemaphoreWorkflow(int initialCount) => sema = new(initialCount); + public SemaphoreWorkflow(int? semaphoreCount) + { + semaphore = semaphoreCount is { } initialCount ? new(initialCount) : null; + mutex = semaphoreCount == null ? new() : null; + } public void Dispose() { @@ -5017,7 +5022,7 @@ public void Dispose() } [WorkflowRun] - public Task RunAsync(int initialCount) => Workflow.WaitConditionAsync(() => false); + public Task RunAsync(int? semaphoreCount) => Workflow.WaitConditionAsync(() => false); [WorkflowQuery] public List Waiting { get; } = new(); @@ -5045,15 +5050,15 @@ public async Task UpdateAsync(Update update) { throw new ApplicationFailureException("Cannot have MS and timespan"); } - acquired = await sema.WaitAsync(timeoutMs, cancellationToken); + acquired = await WaitAsync(timeoutMs, cancellationToken); } else if (update.TimeoutTimeSpan is { } timeoutTimeSpan) { - acquired = await sema.WaitAsync(timeoutTimeSpan, cancellationToken); + acquired = await WaitAsync(timeoutTimeSpan, cancellationToken); } else { - await sema.WaitAsync(cancellationToken); + await WaitAsync(cancellationToken); acquired = true; } } @@ -5077,7 +5082,7 @@ await Workflow.ExecuteActivityAsync( } finally { - sema.Release(); + Release(); Waiting.Remove(update.Number); } } @@ -5085,6 +5090,29 @@ await Workflow.ExecuteActivityAsync( [WorkflowSignal] public async Task CancelUpdateAsync(int number) => cancellations[number].Cancel(); +#pragma warning disable VSTHRD110 // Returning tasks is ok + private Task WaitAsync(CancellationToken? cancellationToken = null) => + semaphore?.WaitAsync(cancellationToken) ?? mutex!.WaitOneAsync(cancellationToken); + + private Task WaitAsync( + int millisecondsTimeout, + CancellationToken? cancellationToken = null) => + semaphore?.WaitAsync(millisecondsTimeout, cancellationToken) ?? + mutex!.WaitOneAsync(millisecondsTimeout, cancellationToken); + + private Task WaitAsync( + TimeSpan timeout, + CancellationToken? cancellationToken = null) => + semaphore?.WaitAsync(timeout, cancellationToken) ?? + mutex!.WaitOneAsync(timeout, cancellationToken); +#pragma warning restore VSTHRD110 + + private void Release() + { + semaphore?.Release(); + mutex?.ReleaseMutex(); + } + public record Update(int Number) { public int? TimeoutMs { get; init; } @@ -5167,6 +5195,61 @@ await handle.QueryAsync(wf => wf.Completed), new TemporalWorkerOptions().AddAllActivities(acts)); } + [Fact] + public async Task ExecuteWorkflowAsync_Mutex_MultipleWaiters() + { + // This is a basic test of mutex usage. We will make a mutex with 3 updates and confirm + // that each proceeds one at a time. + var acts = new SemaphoreWorkflow.Activities(3); + await ExecuteWorkerAsync( + async worker => + { + // Start semaphore workflow with 3 updates + var handle = await Client.StartWorkflowAsync( + (SemaphoreWorkflow wf) => wf.RunAsync(null), + new($"wf-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!)); + await Task.WhenAll(Enumerable.Range(0, 3).Select(i => + handle.StartUpdateAsync( + wf => wf.UpdateAsync(new(i)), + new(WorkflowUpdateStage.Accepted)))); + + // Confirm only 1 waiting + var waiting1 = Assert.Single(await handle.QueryAsync(wf => wf.Waiting)); + + // Unblock, wait for next one waiting + acts.UpdateCompletions[waiting1].SetResult(); + var waiting2 = await AssertMore.EventuallyAsync(async () => + { + var waiting = Assert.Single(await handle.QueryAsync(wf => wf.Waiting)); + Assert.NotEqual(waiting, waiting1); + return waiting; + }); + + // Unblock, wait for final one waiting + acts.UpdateCompletions[waiting2].SetResult(); + var waiting3 = await AssertMore.EventuallyAsync(async () => + { + var waiting = Assert.Single(await handle.QueryAsync(wf => wf.Waiting)); + Assert.NotEqual(waiting, waiting1); + Assert.NotEqual(waiting, waiting2); + return waiting; + }); + + // Unblock and confirm completions + acts.UpdateCompletions[waiting3].SetResult(); + var completed = await AssertMore.EventuallyAsync(async () => + { + var completed = await handle.QueryAsync(wf => wf.Completed); + Assert.True(completed.Count == 3); + return completed; + }); + Assert.Contains(0, completed); + Assert.Contains(1, completed); + Assert.Contains(2, completed); + }, + new TemporalWorkerOptions().AddAllActivities(acts)); + } + [Theory] // 0 means non-blocking [InlineData(0, true)] @@ -5221,12 +5304,16 @@ await handle.StartUpdateAsync( new TemporalWorkerOptions().AddAllActivities(acts)); } - [Theory] - // TODO(cretz): This is currently disabled since workflow cancellation is causing an issue. - // [InlineData(true)] + [SkippableTheory] + [InlineData(true)] [InlineData(false)] public async Task ExecuteWorkflowAsync_Semaphore_Cancellation(bool useWorkflowCancellation) { + if (useWorkflowCancellation) + { + throw new SkipException( + "Unable to cancel workflow for this test: https://github.com/temporalio/sdk-core/issues/772"); + } // This tests that cancellation works. We will make a semaphore with 1 allowed and submit 2 // updates. The second update will wait, and we will cancel the wait. var acts = new SemaphoreWorkflow.Activities(1);