Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support DisableEagerActivityExecution option #366

Merged
merged 5 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Temporalio/Worker/TemporalWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options)
OnTaskStarting: options.OnTaskStarting,
OnTaskCompleted: options.OnTaskCompleted,
RuntimeMetricMeter: MetricMeter,
WorkerLevelFailureExceptionTypes: options.WorkflowFailureExceptionTypes));
WorkerLevelFailureExceptionTypes: options.WorkflowFailureExceptionTypes,
DisableEagerActivityExecution: options.DisableEagerActivityExecution));
}
}

Expand Down
15 changes: 15 additions & 0 deletions src/Temporalio/Worker/TemporalWorkerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,21 @@ public TemporalWorkerOptions()
/// </remarks>
public WorkerTuner? Tuner { get; set; }

/// <summary>
/// Gets or sets a value indicating whether eager activity executions will be disabled from
/// a workflow.
/// </summary>
/// <remarks>
/// Eager activity execution is an optimization on some servers that sends activities back
/// to the same worker as the calling workflow if they can run there.
/// </remarks>
/// <remarks>
/// This should be set to <c>true</c> for <see cref="MaxTaskQueueActivitiesPerSecond" /> to
/// work and in a future version of this API may be implied as such (i.e. this setting will
/// be ignored if that setting is set).
/// </remarks>
public bool DisableEagerActivityExecution { get; set; }

/// <summary>
/// Gets the TEMPORAL_DEBUG environment variable.
/// </summary>
Expand Down
3 changes: 3 additions & 0 deletions src/Temporalio/Worker/WorkflowInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ internal class WorkflowInstance : TaskScheduler, IWorkflowInstance, IWorkflowCon
private readonly Action<WorkflowInstance> onTaskStarting;
private readonly Action<WorkflowInstance, Exception?> onTaskCompleted;
private readonly IReadOnlyCollection<Type>? workerLevelFailureExceptionTypes;
private readonly bool disableEagerActivityExecution;
private readonly Handlers inProgressHandlers = new();
private WorkflowActivationCompletion? completion;
// Will be set to null after last use (i.e. when workflow actually started)
Expand Down Expand Up @@ -190,6 +191,7 @@ public WorkflowInstance(WorkflowInstanceDetails details)
Random = new(details.Start.RandomnessSeed);
TracingEventsEnabled = !details.DisableTracingEvents;
workerLevelFailureExceptionTypes = details.WorkerLevelFailureExceptionTypes;
disableEagerActivityExecution = details.DisableEagerActivityExecution;
}

/// <summary>
Expand Down Expand Up @@ -1756,6 +1758,7 @@ public override Task<TResult> ScheduleActivityAsync<TResult>(
Arguments = { instance.PayloadConverter.ToPayloads(input.Args) },
RetryPolicy = input.Options.RetryPolicy?.ToProto(),
CancellationType = (Bridge.Api.WorkflowCommands.ActivityCancellationType)input.Options.CancellationType,
DoNotEagerlyExecute = instance.disableEagerActivityExecution || input.Options.DisableEagerActivityExecution,
};
if (input.Headers is IDictionary<string, Payload> headers)
{
Expand Down
4 changes: 3 additions & 1 deletion src/Temporalio/Worker/WorkflowInstanceDetails.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace Temporalio.Worker
/// <param name="OnTaskCompleted">Callback for every instance task complete.</param>
/// <param name="RuntimeMetricMeter">Lazy runtime-level metric meter.</param>
/// <param name="WorkerLevelFailureExceptionTypes">Failure exception types at worker level.</param>
/// <param name="DisableEagerActivityExecution">Whether to disable eager at the worker level.</param>
internal record WorkflowInstanceDetails(
string Namespace,
string TaskQueue,
Expand All @@ -41,5 +42,6 @@ internal record WorkflowInstanceDetails(
Action<WorkflowInstance> OnTaskStarting,
Action<WorkflowInstance, Exception?> OnTaskCompleted,
Lazy<MetricMeter> RuntimeMetricMeter,
IReadOnlyCollection<Type>? WorkerLevelFailureExceptionTypes);
IReadOnlyCollection<Type>? WorkerLevelFailureExceptionTypes,
bool DisableEagerActivityExecution);
}
3 changes: 2 additions & 1 deletion src/Temporalio/Worker/WorkflowReplayer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ public WorkflowHistoryRunner(WorkflowReplayerOptions options, bool throwOnReplay
OnTaskStarting: options.OnTaskStarting,
OnTaskCompleted: options.OnTaskCompleted,
RuntimeMetricMeter: new(() => runtime.MetricMeter),
WorkerLevelFailureExceptionTypes: options.WorkflowFailureExceptionTypes),
WorkerLevelFailureExceptionTypes: options.WorkflowFailureExceptionTypes,
DisableEagerActivityExecution: false),
(runId, removeFromCache) => SetResult(removeFromCache));
}
catch
Expand Down
3 changes: 2 additions & 1 deletion src/Temporalio/Worker/WorkflowWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ private IWorkflowInstance CreateInstance(WorkflowActivation act)
OnTaskStarting: options.OnTaskStarting,
OnTaskCompleted: options.OnTaskCompleted,
RuntimeMetricMeter: options.RuntimeMetricMeter,
WorkerLevelFailureExceptionTypes: options.WorkerLevelFailureExceptionTypes));
WorkerLevelFailureExceptionTypes: options.WorkerLevelFailureExceptionTypes,
DisableEagerActivityExecution: options.DisableEagerActivityExecution));
}
}
}
3 changes: 2 additions & 1 deletion src/Temporalio/Worker/WorkflowWorkerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ internal record WorkflowWorkerOptions(
Action<WorkflowInstance> OnTaskStarting,
Action<WorkflowInstance, Exception?> OnTaskCompleted,
Lazy<MetricMeter> RuntimeMetricMeter,
IReadOnlyCollection<Type>? WorkerLevelFailureExceptionTypes);
IReadOnlyCollection<Type>? WorkerLevelFailureExceptionTypes,
bool DisableEagerActivityExecution);
}
14 changes: 14 additions & 0 deletions src/Temporalio/Workflows/ActivityOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,20 @@ public class ActivityOptions : ICloneable
/// </summary>
public VersioningIntent VersioningIntent { get; set; } = VersioningIntent.Unspecified;

/// <summary>
/// Gets or sets a value indicating whether eager activity execution will be disabled for
/// this activity.
/// </summary>
/// <remarks>
/// Eager activity execution is an optimization on some servers that sends activities back
/// to the same worker as the calling workflow if they can run there.
/// </remarks>
/// <remarks>
/// If <c>false</c> (the default), eager execution may still be disabled at the worker level
/// or may not be requested due to lack of available slots.
/// </remarks>
public bool DisableEagerActivityExecution { get; set; }

/// <summary>
/// Create a shallow copy of these options.
/// </summary>
Expand Down
Loading