Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Temporalio.Workflows.Mutex #298

Merged
merged 3 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -701,8 +701,8 @@ with .NET tasks inside of workflows:
* Use `Workflow.WhenAnyAsync` instead.
* Technically this only applies to an enumerable set of tasks with results or more than 2 tasks with results. Other
uses are safe. See [this issue](https://github.com/dotnet/runtime/issues/87481).
* Do not use `System.Threading.Semaphore` or `System.Threading.SemaphoreSlim`.
* Use `Temporalio.Workflows.Semaphore` instead.
* Do not use `System.Threading.Semaphore` or `System.Threading.SemaphoreSlim` or `System.Threading.Mutex`.
* Use `Temporalio.Workflows.Semaphore` or `Temporalio.Workflows.Mutex` instead.
* _Technically_ `SemaphoreSlim` does work if only the async form of `WaitAsync` is used without no timeouts and
`Release` is used. But anything else can deadlock the workflow and its use is cumbersome since it must be disposed.
* Be wary of additional libraries' implicit use of the default scheduler.
Expand Down
93 changes: 93 additions & 0 deletions src/Temporalio/Workflows/Mutex.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Temporalio.Workflows
{
/// <summary>
/// Mutex is a thin wrapper around <see cref="Semaphore"/> with a single count.
cretz marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
public class Mutex
{
private readonly Semaphore semaphore = new(1);

/// <summary>
/// Wait for the mutex to become available. If the task succeeds, users must call
/// <see cref="ReleaseMutex"/> to properly release the mutex when done.
/// </summary>
/// <param name="cancellationToken">
/// Cancellation token that can interrupt this wait. If unset, this defaults to
/// <see cref="Workflow.CancellationToken"/>. Upon cancel, awaiting the resulting task will
/// throw a <see cref="TaskCanceledException"/> exception.
/// </param>
/// <returns>
/// A <see cref="Task"/> representing the result of the asynchronous operation. This task is
/// canceled if the cancellation token is.
/// </returns>
public Task WaitOneAsync(CancellationToken? cancellationToken = null) =>
semaphore.WaitAsync(cancellationToken);

/// <summary>
/// Wait for the mutex to become available or timeout to be reached. If the task returns
/// true, users must call <see cref="ReleaseMutex"/> to properly release the mutex when
/// done.
/// </summary>
/// <param name="millisecondsTimeout">
/// Milliseconds until timeout. If this is 0, this is a non-blocking task that will return
/// immediately. If this value is -1 (i.e. <see cref="Timeout.Infinite"/>), there is no
/// timeout and the result will always be true on success (at which point you might as well
/// use <see cref="WaitOneAsync(CancellationToken?)"/>). Otherwise a timer is started for
/// the timeout and canceled if the wait succeeds.
/// </param>
/// <param name="cancellationToken">
/// Cancellation token that can interrupt this wait. If unset, this defaults to
/// <see cref="Workflow.CancellationToken"/>. Upon cancel, awaiting the resulting task will
/// throw a <see cref="TaskCanceledException"/> exception.
/// </param>
/// <returns>
/// A <see cref="Task"/> representing the result of the asynchronous operation. The task is
/// true if the wait succeeded and false if it timed out. This task is canceled if the
/// cancellation token is.
/// </returns>
public Task<bool> WaitOneAsync(
int millisecondsTimeout,
CancellationToken? cancellationToken = null) =>
semaphore.WaitAsync(millisecondsTimeout, cancellationToken);

/// <summary>
/// Wait for the mutex to become available or timeout to be reached. If the task returns
/// true, users must call <see cref="ReleaseMutex"/> to properly release the mutex when
/// done.
/// </summary>
/// <param name="timeout">
/// TimeSpan until timeout. If this is <see cref="TimeSpan.Zero"/>, this is a non-blocking
/// task that will return immediately. If this value is -1ms (i.e.
/// <see cref="Timeout.InfiniteTimeSpan"/>), there is no timeout and the result will always
/// be true on success (at which point you might as well use
/// <see cref="WaitOneAsync(CancellationToken?)"/>). Otherwise a timer is started for the
/// timeout and canceled if the wait succeeds.
/// </param>
/// <param name="cancellationToken">
/// Cancellation token that can interrupt this wait. If unset, this defaults to
/// <see cref="Workflow.CancellationToken"/>. Upon cancel, awaiting the resulting task will
/// throw a <see cref="TaskCanceledException"/> exception.
/// </param>
/// <returns>
/// A <see cref="Task"/> representing the result of the asynchronous operation. The task is
/// true if the wait succeeded and false if it timed out. This task is canceled if the
/// cancellation token is.
/// </returns>
public Task<bool> WaitOneAsync(
TimeSpan timeout,
CancellationToken? cancellationToken = null) =>
semaphore.WaitAsync(timeout, cancellationToken);

/// <summary>
/// Release the mutex for use by another waiter.
/// </summary>
/// <exception cref="InvalidOperationException">
/// Thrown if the mutex was not waited on successsfully.
/// </exception>
public void ReleaseMutex() => semaphore.Release();
}
}
107 changes: 97 additions & 10 deletions tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5002,11 +5002,16 @@ await Task.WhenAll(
[Workflow]
public sealed class SemaphoreWorkflow : IDisposable
{
private readonly Semaphore sema;
private readonly Semaphore? semaphore;
private readonly Mutex? mutex;
private readonly Dictionary<int, CancellationTokenSource> cancellations = new();

[WorkflowInit]
public SemaphoreWorkflow(int initialCount) => sema = new(initialCount);
public SemaphoreWorkflow(int? semaphoreCount)
{
semaphore = semaphoreCount is { } initialCount ? new(initialCount) : null;
mutex = semaphoreCount == null ? new() : null;
}

public void Dispose()
{
Expand All @@ -5017,7 +5022,7 @@ public void Dispose()
}

[WorkflowRun]
public Task RunAsync(int initialCount) => Workflow.WaitConditionAsync(() => false);
public Task RunAsync(int? semaphoreCount) => Workflow.WaitConditionAsync(() => false);

[WorkflowQuery]
public List<int> Waiting { get; } = new();
Expand Down Expand Up @@ -5045,15 +5050,15 @@ public async Task UpdateAsync(Update update)
{
throw new ApplicationFailureException("Cannot have MS and timespan");
}
acquired = await sema.WaitAsync(timeoutMs, cancellationToken);
acquired = await WaitAsync(timeoutMs, cancellationToken);
}
else if (update.TimeoutTimeSpan is { } timeoutTimeSpan)
{
acquired = await sema.WaitAsync(timeoutTimeSpan, cancellationToken);
acquired = await WaitAsync(timeoutTimeSpan, cancellationToken);
}
else
{
await sema.WaitAsync(cancellationToken);
await WaitAsync(cancellationToken);
acquired = true;
}
}
Expand All @@ -5077,14 +5082,37 @@ await Workflow.ExecuteActivityAsync(
}
finally
{
sema.Release();
Release();
Waiting.Remove(update.Number);
}
}

[WorkflowSignal]
public async Task CancelUpdateAsync(int number) => cancellations[number].Cancel();

#pragma warning disable VSTHRD110 // Returning tasks is ok
private Task WaitAsync(CancellationToken? cancellationToken = null) =>
semaphore?.WaitAsync(cancellationToken) ?? mutex!.WaitOneAsync(cancellationToken);

private Task<bool> WaitAsync(
int millisecondsTimeout,
CancellationToken? cancellationToken = null) =>
semaphore?.WaitAsync(millisecondsTimeout, cancellationToken) ??
mutex!.WaitOneAsync(millisecondsTimeout, cancellationToken);

private Task<bool> WaitAsync(
TimeSpan timeout,
CancellationToken? cancellationToken = null) =>
semaphore?.WaitAsync(timeout, cancellationToken) ??
mutex!.WaitOneAsync(timeout, cancellationToken);
#pragma warning restore VSTHRD110

private void Release()
{
semaphore?.Release();
mutex?.ReleaseMutex();
}

public record Update(int Number)
{
public int? TimeoutMs { get; init; }
Expand Down Expand Up @@ -5165,6 +5193,61 @@ await Task.WhenAll(Enumerable.Range(0, 5).Select(i =>
new TemporalWorkerOptions().AddAllActivities(acts));
}

[Fact]
public async Task ExecuteWorkflowAsync_Mutex_MultipleWaiters()
{
// This is a basic test of mutex usage. We will make a mutex with 3 updates and confirm
// that each proceeds one at a time.
var acts = new SemaphoreWorkflow.Activities(3);
await ExecuteWorkerAsync<SemaphoreWorkflow>(
async worker =>
{
// Start semaphore workflow with 3 updates
var handle = await Client.StartWorkflowAsync(
(SemaphoreWorkflow wf) => wf.RunAsync(null),
new($"wf-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!));
await Task.WhenAll(Enumerable.Range(0, 3).Select(i =>
handle.StartUpdateAsync(
wf => wf.UpdateAsync(new(i)),
new(WorkflowUpdateStage.Accepted))));

// Confirm only 1 waiting
var waiting1 = Assert.Single(await handle.QueryAsync(wf => wf.Waiting));

// Unblock, wait for next one waiting
acts.UpdateCompletions[waiting1].SetResult();
var waiting2 = await AssertMore.EventuallyAsync(async () =>
{
var waiting = Assert.Single(await handle.QueryAsync(wf => wf.Waiting));
Assert.NotEqual(waiting, waiting1);
return waiting;
});

// Unblock, wait for final one waiting
acts.UpdateCompletions[waiting2].SetResult();
var waiting3 = await AssertMore.EventuallyAsync(async () =>
{
var waiting = Assert.Single(await handle.QueryAsync(wf => wf.Waiting));
Assert.NotEqual(waiting, waiting1);
Assert.NotEqual(waiting, waiting2);
return waiting;
});

// Unblock and confirm completions
acts.UpdateCompletions[waiting3].SetResult();
var completed = await AssertMore.EventuallyAsync(async () =>
{
var completed = await handle.QueryAsync(wf => wf.Completed);
Assert.True(completed.Count == 3);
return completed;
});
Assert.Contains(0, completed);
Assert.Contains(1, completed);
Assert.Contains(2, completed);
},
new TemporalWorkerOptions().AddAllActivities(acts));
}

[Theory]
// 0 means non-blocking
[InlineData(0, true)]
Expand Down Expand Up @@ -5219,12 +5302,16 @@ await handle.StartUpdateAsync(
new TemporalWorkerOptions().AddAllActivities(acts));
}

[Theory]
// TODO(cretz): This is currently disabled since workflow cancellation is causing an issue.
// [InlineData(true)]
[SkippableTheory]
[InlineData(true)]
[InlineData(false)]
public async Task ExecuteWorkflowAsync_Semaphore_Cancellation(bool useWorkflowCancellation)
{
if (useWorkflowCancellation)
{
throw new SkipException(
"Unable to cancel workflow for this test: https://github.com/temporalio/sdk-core/issues/772");
}
// This tests that cancellation works. We will make a semaphore with 1 allowed and submit 2
// updates. The second update will wait, and we will cancel the wait.
var acts = new SemaphoreWorkflow.Activities(1);
Expand Down