Skip to content

Commit

Permalink
Add Workflow.RunTaskAsync and Workflow.WhenAllAsync (#313)
Browse files Browse the repository at this point in the history
Fixes #303
  • Loading branch information
cretz authored Aug 6, 2024
1 parent 9ffc963 commit 6695adf
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 1 deletion.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,8 @@ use `TaskScheduler.Default` implicitly (and some analyzers even encourage this).
with .NET tasks inside of workflows:

* Do not use `Task.Run` - this uses the default scheduler and puts work on the thread pool.
* Use `Task.Factory.StartNew` or instantiate the `Task` and run `Task.Start` on it.
* Use `Workflow.RunTaskAsync` instead.
* Can also use `Task.Factory.StartNew` with current scheduler or instantiate the `Task` and run `Task.Start` on it.
* Do not use `Task.ConfigureAwait(false)` - this will not use the current context.
* If you must use `Task.ConfigureAwait`, use `Task.ConfigureAwait(true)`.
* There is no significant performance benefit to `Task.ConfigureAwait` in workflows anyways due to how the scheduler
Expand All @@ -701,6 +702,10 @@ with .NET tasks inside of workflows:
* Use `Workflow.WhenAnyAsync` instead.
* Technically this only applies to an enumerable set of tasks with results or more than 2 tasks with results. Other
uses are safe. See [this issue](https://github.com/dotnet/runtime/issues/87481).
* Do not use `Task.WhenAll`
* Use `Workflow.WhenAllAsync` instead.
* Technically `Task.WhenAll` is currently deterministic in .NET and safe, but it is better to use the wrapper to be
sure.
* Do not use `System.Threading.Semaphore` or `System.Threading.SemaphoreSlim` or `System.Threading.Mutex`.
* Use `Temporalio.Workflows.Semaphore` or `Temporalio.Workflows.Mutex` instead.
* _Technically_ `SemaphoreSlim` does work if only the async form of `WaitAsync` is used without no timeouts and
Expand Down
71 changes: 71 additions & 0 deletions src/Temporalio/Workflows/Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,39 @@ public static Guid NewGuid()
/// </remarks>
public static bool Patched(string patchId) => Context.Patch(patchId, deprecated: false);

/// <summary>
/// Workflow-safe form of <see cref="Task.Run(Func{Task}, CancellationToken)" />.
/// </summary>
/// <param name="function">The work to execute asynchronously.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the work
/// if it has not yet started. Defaults to <see cref="CancellationToken"/>.</param>
/// <returns>A task for the running task (but not necessarily the task that is returned
/// from the function).</returns>
public static Task RunTaskAsync(
Func<Task> function, CancellationToken? cancellationToken = null) =>
Task.Factory.StartNew(
function,
cancellationToken ?? CancellationToken,
TaskCreationOptions.None,
TaskScheduler.Current).Unwrap();

/// <summary>
/// Workflow-safe form of <see cref="Task.Run{TResult}(Func{TResult}, CancellationToken)" />.
/// </summary>
/// <typeparam name="TResult">The type of the result returned by the task.</typeparam>
/// <param name="function">The work to execute asynchronously.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the work
/// if it has not yet started. Defaults to <see cref="CancellationToken"/>.</param>
/// <returns>A task for the running task (but not necessarily the task that is returned
/// from the function).</returns>
public static Task<TResult> RunTaskAsync<TResult>(
Func<Task<TResult>> function, CancellationToken? cancellationToken = null) =>
Task.Factory.StartNew(
function,
cancellationToken ?? CancellationToken,
TaskCreationOptions.None,
TaskScheduler.Current).Unwrap();

/// <summary>
/// Start a child workflow via lambda invoking the run method.
/// </summary>
Expand Down Expand Up @@ -1220,6 +1253,44 @@ public static async Task<Task<TResult>> WhenAnyAsync<TResult>(IEnumerable<Task<T
return (Task<TResult>)task;
}

/// <summary>
/// Workflow-safe form of <see cref="Task.WhenAll(IEnumerable{Task})" /> (which just calls
/// the standard library call currently because it is already safe).
/// </summary>
/// <param name="tasks">The tasks to wait on for completion.</param>
/// <returns>A task that represents the completion of all of the supplied tasks.</returns>
public static Task WhenAllAsync(IEnumerable<Task> tasks) =>
Task.WhenAll(tasks);

/// <summary>
/// Workflow-safe form of <see cref="Task.WhenAll(Task[])" /> (which just calls the standard
/// library call currently because it is already safe).
/// </summary>
/// <param name="tasks">The tasks to wait on for completion.</param>
/// <returns>A task that represents the completion of all of the supplied tasks.</returns>
public static Task WhenAllAsync(params Task[] tasks) =>
Task.WhenAll(tasks);

/// <summary>
/// Workflow-safe form of <see cref="Task.WhenAll{TResult}(IEnumerable{Task{TResult}})" />
/// (which just calls the standard library call currently because it is already safe).
/// </summary>
/// <typeparam name="TResult">The type of the completed task..</typeparam>
/// <param name="tasks">The tasks to wait on for completion.</param>
/// <returns>A task that represents the completion of all of the supplied tasks.</returns>
public static Task<TResult[]> WhenAllAsync<TResult>(IEnumerable<Task<TResult>> tasks) =>
Task.WhenAll(tasks);

/// <summary>
/// Workflow-safe form of <see cref="Task.WhenAll{TResult}(Task{TResult}[])" /> (which just
/// calls the standard library call currently because it is already safe).
/// </summary>
/// <typeparam name="TResult">The type of the completed task..</typeparam>
/// <param name="tasks">The tasks to wait on for completion.</param>
/// <returns>A task that represents the completion of all of the supplied tasks.</returns>
public static Task<TResult[]> WhenAllAsync<TResult>(params Task<TResult>[] tasks) =>
Task.WhenAll(tasks);

/// <summary>
/// Unsafe calls that can be made in a workflow.
/// </summary>
Expand Down
15 changes: 15 additions & 0 deletions tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,15 @@ public async Task<string> RunAsync(Scenario scenario)
case Scenario.WorkflowWhenAnyWithResultThreeParam:
return await await Workflow.WhenAnyAsync(
Task.FromResult("done"), Task.FromResult("done"), Task.FromResult("done"));
case Scenario.WorkflowWhenAll:
return string.Join(string.Empty, await Workflow.WhenAllAsync(
Task.FromResult("do"), Task.FromResult("ne")));
case Scenario.WorkflowRunTask:
return await Workflow.RunTaskAsync(async () => "done");
case Scenario.WorkflowRunTaskAfterTaskStart:
var runTaskStart = new Task<string>(() => "done");
runTaskStart.Start();
return await Workflow.RunTaskAsync(() => runTaskStart);
}
throw new InvalidOperationException("Unexpected completion");
}
Expand All @@ -266,6 +275,9 @@ public enum Scenario
// https://github.com/dotnet/runtime/issues/87481
TaskWhenAnyWithResultTwoParam,
WorkflowWhenAnyWithResultThreeParam,
WorkflowWhenAll,
WorkflowRunTask,
WorkflowRunTaskAfterTaskStart,
}
}

Expand Down Expand Up @@ -325,6 +337,9 @@ Task AssertScenarioSucceeds(StandardLibraryCallsWorkflow.Scenario scenario) =>
await AssertScenarioSucceeds(StandardLibraryCallsWorkflow.Scenario.TaskContinueWith);
await AssertScenarioSucceeds(StandardLibraryCallsWorkflow.Scenario.TaskWhenAnyWithResultTwoParam);
await AssertScenarioSucceeds(StandardLibraryCallsWorkflow.Scenario.WorkflowWhenAnyWithResultThreeParam);
await AssertScenarioSucceeds(StandardLibraryCallsWorkflow.Scenario.WorkflowWhenAll);
await AssertScenarioSucceeds(StandardLibraryCallsWorkflow.Scenario.WorkflowRunTask);
await AssertScenarioSucceeds(StandardLibraryCallsWorkflow.Scenario.WorkflowRunTaskAfterTaskStart);
}

[Workflow]
Expand Down

0 comments on commit 6695adf

Please sign in to comment.