Skip to content

Commit

Permalink
Add Temporalio.Workflows.Mutex (#298)
Browse files Browse the repository at this point in the history
* Add Temporalio.Workflows.Mutex

* Minor doc update
  • Loading branch information
cretz authored Jul 2, 2024
1 parent 72f6fb2 commit 8349537
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 12 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -701,8 +701,8 @@ with .NET tasks inside of workflows:
* Use `Workflow.WhenAnyAsync` instead.
* Technically this only applies to an enumerable set of tasks with results or more than 2 tasks with results. Other
uses are safe. See [this issue](https://github.com/dotnet/runtime/issues/87481).
* Do not use `System.Threading.Semaphore` or `System.Threading.SemaphoreSlim`.
* Use `Temporalio.Workflows.Semaphore` instead.
* Do not use `System.Threading.Semaphore` or `System.Threading.SemaphoreSlim` or `System.Threading.Mutex`.
* Use `Temporalio.Workflows.Semaphore` or `Temporalio.Workflows.Mutex` instead.
* _Technically_ `SemaphoreSlim` does work if only the async form of `WaitAsync` is used without no timeouts and
`Release` is used. But anything else can deadlock the workflow and its use is cumbersome since it must be disposed.
* Be wary of additional libraries' implicit use of the default scheduler.
Expand Down
94 changes: 94 additions & 0 deletions src/Temporalio/Workflows/Mutex.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Temporalio.Workflows
{
/// <summary>
/// Mutex is an alternative to <see cref="System.Threading.Mutex"/>. It is a thin wrapper around
/// <see cref="Semaphore"/> with a single count.
/// </summary>
public class Mutex
{
private readonly Semaphore semaphore = new(1);

/// <summary>
/// Wait for the mutex to become available. If the task succeeds, users must call
/// <see cref="ReleaseMutex"/> to properly release the mutex when done.
/// </summary>
/// <param name="cancellationToken">
/// Cancellation token that can interrupt this wait. If unset, this defaults to
/// <see cref="Workflow.CancellationToken"/>. Upon cancel, awaiting the resulting task will
/// throw a <see cref="TaskCanceledException"/> exception.
/// </param>
/// <returns>
/// A <see cref="Task"/> representing the result of the asynchronous operation. This task is
/// canceled if the cancellation token is.
/// </returns>
public Task WaitOneAsync(CancellationToken? cancellationToken = null) =>
semaphore.WaitAsync(cancellationToken);

/// <summary>
/// Wait for the mutex to become available or timeout to be reached. If the task returns
/// true, users must call <see cref="ReleaseMutex"/> to properly release the mutex when
/// done.
/// </summary>
/// <param name="millisecondsTimeout">
/// Milliseconds until timeout. If this is 0, this is a non-blocking task that will return
/// immediately. If this value is -1 (i.e. <see cref="Timeout.Infinite"/>), there is no
/// timeout and the result will always be true on success (at which point you might as well
/// use <see cref="WaitOneAsync(CancellationToken?)"/>). Otherwise a timer is started for
/// the timeout and canceled if the wait succeeds.
/// </param>
/// <param name="cancellationToken">
/// Cancellation token that can interrupt this wait. If unset, this defaults to
/// <see cref="Workflow.CancellationToken"/>. Upon cancel, awaiting the resulting task will
/// throw a <see cref="TaskCanceledException"/> exception.
/// </param>
/// <returns>
/// A <see cref="Task"/> representing the result of the asynchronous operation. The task is
/// true if the wait succeeded and false if it timed out. This task is canceled if the
/// cancellation token is.
/// </returns>
public Task<bool> WaitOneAsync(
int millisecondsTimeout,
CancellationToken? cancellationToken = null) =>
semaphore.WaitAsync(millisecondsTimeout, cancellationToken);

/// <summary>
/// Wait for the mutex to become available or timeout to be reached. If the task returns
/// true, users must call <see cref="ReleaseMutex"/> to properly release the mutex when
/// done.
/// </summary>
/// <param name="timeout">
/// TimeSpan until timeout. If this is <see cref="TimeSpan.Zero"/>, this is a non-blocking
/// task that will return immediately. If this value is -1ms (i.e.
/// <see cref="Timeout.InfiniteTimeSpan"/>), there is no timeout and the result will always
/// be true on success (at which point you might as well use
/// <see cref="WaitOneAsync(CancellationToken?)"/>). Otherwise a timer is started for the
/// timeout and canceled if the wait succeeds.
/// </param>
/// <param name="cancellationToken">
/// Cancellation token that can interrupt this wait. If unset, this defaults to
/// <see cref="Workflow.CancellationToken"/>. Upon cancel, awaiting the resulting task will
/// throw a <see cref="TaskCanceledException"/> exception.
/// </param>
/// <returns>
/// A <see cref="Task"/> representing the result of the asynchronous operation. The task is
/// true if the wait succeeded and false if it timed out. This task is canceled if the
/// cancellation token is.
/// </returns>
public Task<bool> WaitOneAsync(
TimeSpan timeout,
CancellationToken? cancellationToken = null) =>
semaphore.WaitAsync(timeout, cancellationToken);

/// <summary>
/// Release the mutex for use by another waiter.
/// </summary>
/// <exception cref="InvalidOperationException">
/// Thrown if the mutex was not waited on successsfully.
/// </exception>
public void ReleaseMutex() => semaphore.Release();
}
}
107 changes: 97 additions & 10 deletions tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5002,11 +5002,16 @@ await Task.WhenAll(
[Workflow]
public sealed class SemaphoreWorkflow : IDisposable
{
private readonly Semaphore sema;
private readonly Semaphore? semaphore;
private readonly Mutex? mutex;
private readonly Dictionary<int, CancellationTokenSource> cancellations = new();

[WorkflowInit]
public SemaphoreWorkflow(int initialCount) => sema = new(initialCount);
public SemaphoreWorkflow(int? semaphoreCount)
{
semaphore = semaphoreCount is { } initialCount ? new(initialCount) : null;
mutex = semaphoreCount == null ? new() : null;
}

public void Dispose()
{
Expand All @@ -5017,7 +5022,7 @@ public void Dispose()
}

[WorkflowRun]
public Task RunAsync(int initialCount) => Workflow.WaitConditionAsync(() => false);
public Task RunAsync(int? semaphoreCount) => Workflow.WaitConditionAsync(() => false);

[WorkflowQuery]
public List<int> Waiting { get; } = new();
Expand Down Expand Up @@ -5045,15 +5050,15 @@ public async Task UpdateAsync(Update update)
{
throw new ApplicationFailureException("Cannot have MS and timespan");
}
acquired = await sema.WaitAsync(timeoutMs, cancellationToken);
acquired = await WaitAsync(timeoutMs, cancellationToken);
}
else if (update.TimeoutTimeSpan is { } timeoutTimeSpan)
{
acquired = await sema.WaitAsync(timeoutTimeSpan, cancellationToken);
acquired = await WaitAsync(timeoutTimeSpan, cancellationToken);
}
else
{
await sema.WaitAsync(cancellationToken);
await WaitAsync(cancellationToken);
acquired = true;
}
}
Expand All @@ -5077,14 +5082,37 @@ await Workflow.ExecuteActivityAsync(
}
finally
{
sema.Release();
Release();
Waiting.Remove(update.Number);
}
}

[WorkflowSignal]
public async Task CancelUpdateAsync(int number) => cancellations[number].Cancel();

#pragma warning disable VSTHRD110 // Returning tasks is ok
private Task WaitAsync(CancellationToken? cancellationToken = null) =>
semaphore?.WaitAsync(cancellationToken) ?? mutex!.WaitOneAsync(cancellationToken);

private Task<bool> WaitAsync(
int millisecondsTimeout,
CancellationToken? cancellationToken = null) =>
semaphore?.WaitAsync(millisecondsTimeout, cancellationToken) ??
mutex!.WaitOneAsync(millisecondsTimeout, cancellationToken);

private Task<bool> WaitAsync(
TimeSpan timeout,
CancellationToken? cancellationToken = null) =>
semaphore?.WaitAsync(timeout, cancellationToken) ??
mutex!.WaitOneAsync(timeout, cancellationToken);
#pragma warning restore VSTHRD110

private void Release()
{
semaphore?.Release();
mutex?.ReleaseMutex();
}

public record Update(int Number)
{
public int? TimeoutMs { get; init; }
Expand Down Expand Up @@ -5167,6 +5195,61 @@ await handle.QueryAsync(wf => wf.Completed),
new TemporalWorkerOptions().AddAllActivities(acts));
}

[Fact]
public async Task ExecuteWorkflowAsync_Mutex_MultipleWaiters()
{
// This is a basic test of mutex usage. We will make a mutex with 3 updates and confirm
// that each proceeds one at a time.
var acts = new SemaphoreWorkflow.Activities(3);
await ExecuteWorkerAsync<SemaphoreWorkflow>(
async worker =>
{
// Start semaphore workflow with 3 updates
var handle = await Client.StartWorkflowAsync(
(SemaphoreWorkflow wf) => wf.RunAsync(null),
new($"wf-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!));
await Task.WhenAll(Enumerable.Range(0, 3).Select(i =>
handle.StartUpdateAsync(
wf => wf.UpdateAsync(new(i)),
new(WorkflowUpdateStage.Accepted))));

// Confirm only 1 waiting
var waiting1 = Assert.Single(await handle.QueryAsync(wf => wf.Waiting));

// Unblock, wait for next one waiting
acts.UpdateCompletions[waiting1].SetResult();
var waiting2 = await AssertMore.EventuallyAsync(async () =>
{
var waiting = Assert.Single(await handle.QueryAsync(wf => wf.Waiting));
Assert.NotEqual(waiting, waiting1);
return waiting;
});

// Unblock, wait for final one waiting
acts.UpdateCompletions[waiting2].SetResult();
var waiting3 = await AssertMore.EventuallyAsync(async () =>
{
var waiting = Assert.Single(await handle.QueryAsync(wf => wf.Waiting));
Assert.NotEqual(waiting, waiting1);
Assert.NotEqual(waiting, waiting2);
return waiting;
});

// Unblock and confirm completions
acts.UpdateCompletions[waiting3].SetResult();
var completed = await AssertMore.EventuallyAsync(async () =>
{
var completed = await handle.QueryAsync(wf => wf.Completed);
Assert.True(completed.Count == 3);
return completed;
});
Assert.Contains(0, completed);
Assert.Contains(1, completed);
Assert.Contains(2, completed);
},
new TemporalWorkerOptions().AddAllActivities(acts));
}

[Theory]
// 0 means non-blocking
[InlineData(0, true)]
Expand Down Expand Up @@ -5221,12 +5304,16 @@ await handle.StartUpdateAsync(
new TemporalWorkerOptions().AddAllActivities(acts));
}

[Theory]
// TODO(cretz): This is currently disabled since workflow cancellation is causing an issue.
// [InlineData(true)]
[SkippableTheory]
[InlineData(true)]
[InlineData(false)]
public async Task ExecuteWorkflowAsync_Semaphore_Cancellation(bool useWorkflowCancellation)
{
if (useWorkflowCancellation)
{
throw new SkipException(
"Unable to cancel workflow for this test: https://github.com/temporalio/sdk-core/issues/772");
}
// This tests that cancellation works. We will make a semaphore with 1 allowed and submit 2
// updates. The second update will wait, and we will cancel the wait.
var acts = new SemaphoreWorkflow.Activities(1);
Expand Down

0 comments on commit 8349537

Please sign in to comment.