Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

revise propagation path for entity parameters #971

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 0 additions & 55 deletions src/DurableTask.Core/Entities/EntityExecutionOptions.cs

This file was deleted.

4 changes: 2 additions & 2 deletions src/DurableTask.Core/Entities/OrchestrationEntityContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public OrchestrationEntityContext(
/// <summary>
/// Checks whether the configured backend supports entities.
/// </summary>
public bool EntitiesAreSupported => this.innerContext.EntityBackendProperties != null;
public bool EntitiesAreSupported => this.innerContext.EntityParameters != null;

/// <summary>
/// Whether this orchestration is currently inside a critical section.
Expand Down Expand Up @@ -312,7 +312,7 @@ internal void AdjustOutgoingMessage(string instanceId, RequestMessage requestMes
requestMessage,
instanceId,
this.innerContext.CurrentUtcDateTime,
this.innerContext.EntityBackendProperties.EntityMessageReorderWindow);
this.innerContext.EntityParameters.EntityMessageReorderWindow);

eventName = EntityMessageEventNames.RequestMessageEventName;
}
Expand Down
48 changes: 48 additions & 0 deletions src/DurableTask.Core/Entities/TaskOrchestrationEntityParameters.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------
#nullable enable
namespace DurableTask.Core.Entities
{
using System;
using DurableTask.Core.Serializing;

/// <summary>
/// Settings that determine how a task orchestrator interacts with entities.
/// </summary>
public class TaskOrchestrationEntityParameters
{
/// <summary>
/// The time window within which entity messages should be deduplicated and reordered.
/// This is zero for providers that already guarantee exactly-once and ordered delivery.
/// </summary>
public TimeSpan EntityMessageReorderWindow { get; set; }

/// <summary>
/// Construct a <see cref="TaskOrchestrationEntityParameters"/> based on the given backend properties.
/// </summary>
/// <param name="properties">The backend properties.</param>
/// <returns>The constructed object, or null if <paramref name="properties"/> is null.</returns>
public static TaskOrchestrationEntityParameters? FromEntityBackendProperties(EntityBackendProperties? properties)
{
if (properties == null)
{
return null;
}

return new TaskOrchestrationEntityParameters()
{
EntityMessageReorderWindow = properties.EntityMessageReorderWindow,
};
}
}
}
2 changes: 1 addition & 1 deletion src/DurableTask.Core/OrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public abstract class OrchestrationContext
/// <summary>
/// Information about backend entity support, or null if the configured backend does not support entities.
/// </summary>
internal EntityBackendProperties EntityBackendProperties { get; set; }
internal TaskOrchestrationEntityParameters EntityParameters { get; set; }

/// <summary>
/// Create a proxy client class to schedule remote TaskActivities via a strongly typed interface.
Expand Down
6 changes: 0 additions & 6 deletions src/DurableTask.Core/TaskEntityDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -867,12 +867,6 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ =>
new TypeMissingException($"Entity not found: {entityName}"));
}

var options = new EntityExecutionOptions()
{
EntityBackendProperties = this.entityBackendProperties,
ErrorPropagationMode = this.errorPropagationMode,
};

var result = await taskEntity.ExecuteOperationBatchAsync(request);

dispatchContext.SetProperty(result);
Expand Down
4 changes: 2 additions & 2 deletions src/DurableTask.Core/TaskOrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void AddEventToNextIteration(HistoryEvent he)
public TaskOrchestrationContext(
OrchestrationInstance orchestrationInstance,
TaskScheduler taskScheduler,
EntityBackendProperties entityBackendProperties = null,
TaskOrchestrationEntityParameters entityParameters = null,
ErrorPropagationMode errorPropagationMode = ErrorPropagationMode.SerializeExceptions)
{
Utils.UnusedParameter(taskScheduler);
Expand All @@ -60,7 +60,7 @@ public TaskOrchestrationContext(
this.ErrorDataConverter = JsonDataConverter.Default;
OrchestrationInstance = orchestrationInstance;
IsReplaying = false;
this.EntityBackendProperties = entityBackendProperties;
this.EntityParameters = entityParameters;
ErrorPropagationMode = errorPropagationMode;
this.eventsWhileSuspended = new Queue<HistoryEvent>();
}
Expand Down
12 changes: 8 additions & 4 deletions src/DurableTask.Core/TaskOrchestrationDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class TaskOrchestrationDispatcher
readonly NonBlockingCountdownLock concurrentSessionLock;
readonly IEntityOrchestrationService? entityOrchestrationService;
readonly EntityBackendProperties? entityBackendProperties;
readonly TaskOrchestrationEntityParameters? entityParameters;

internal TaskOrchestrationDispatcher(
IOrchestrationService orchestrationService,
Expand All @@ -59,8 +60,9 @@ internal TaskOrchestrationDispatcher(
this.dispatchPipeline = dispatchPipeline ?? throw new ArgumentNullException(nameof(dispatchPipeline));
this.logHelper = logHelper ?? throw new ArgumentNullException(nameof(logHelper));
this.errorPropagationMode = errorPropagationMode;
this.entityOrchestrationService = entityOrchestrationService;
this.entityBackendProperties = this.entityOrchestrationService?.GetEntityBackendProperties();
this.entityOrchestrationService = orchestrationService as IEntityOrchestrationService;
this.entityBackendProperties = this.entityOrchestrationService?.EntityBackendProperties;
this.entityParameters = TaskOrchestrationEntityParameters.FromEntityBackendProperties(this.entityBackendProperties);

this.dispatcher = new WorkItemDispatcher<TaskOrchestrationWorkItem>(
"TaskOrchestrationDispatcher",
Expand Down Expand Up @@ -682,6 +684,7 @@ async Task<OrchestrationExecutionCursor> ExecuteOrchestrationAsync(Orchestration
dispatchContext.SetProperty(runtimeState);
dispatchContext.SetProperty(workItem);
dispatchContext.SetProperty(GetOrchestrationExecutionContext(runtimeState));
dispatchContext.SetProperty(this.entityParameters);

TaskOrchestrationExecutor? executor = null;

Expand Down Expand Up @@ -709,8 +712,9 @@ await this.dispatchPipeline.RunAsync(dispatchContext, _ =>
runtimeState,
taskOrchestration,
this.orchestrationService.EventBehaviourForContinueAsNew,
this.entityBackendProperties,
this.errorPropagationMode); ;
this.entityParameters,
this.errorPropagationMode);

OrchestratorExecutionResult resultFromOrchestrator = executor.Execute();
dispatchContext.SetProperty(resultFromOrchestrator);
return CompletedTask;
Expand Down
11 changes: 5 additions & 6 deletions src/DurableTask.Core/TaskOrchestrationExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,21 @@ public class TaskOrchestrationExecutor
/// <param name="orchestrationRuntimeState"></param>
/// <param name="taskOrchestration"></param>
/// <param name="eventBehaviourForContinueAsNew"></param>
/// <param name="entityBackendProperties"></param>
/// <param name="entityParameters"></param>
/// <param name="errorPropagationMode"></param>
public TaskOrchestrationExecutor(
OrchestrationRuntimeState orchestrationRuntimeState,
TaskOrchestration taskOrchestration,
BehaviorOnContinueAsNew eventBehaviourForContinueAsNew,
EntityBackendProperties? entityBackendProperties,
TaskOrchestrationEntityParameters? entityParameters,
ErrorPropagationMode errorPropagationMode = ErrorPropagationMode.SerializeExceptions)
{
this.decisionScheduler = new SynchronousTaskScheduler();
this.context = new TaskOrchestrationContext(
orchestrationRuntimeState.OrchestrationInstance,
this.decisionScheduler,
entityBackendProperties,
errorPropagationMode
);
entityParameters,
errorPropagationMode);
this.orchestrationRuntimeState = orchestrationRuntimeState;
this.taskOrchestration = taskOrchestration;
this.skipCarryOverEvents = eventBehaviourForContinueAsNew == BehaviorOnContinueAsNew.Ignore;
Expand All @@ -77,7 +76,7 @@ public TaskOrchestrationExecutor(
TaskOrchestration taskOrchestration,
BehaviorOnContinueAsNew eventBehaviourForContinueAsNew,
ErrorPropagationMode errorPropagationMode = ErrorPropagationMode.SerializeExceptions)
: this(orchestrationRuntimeState, taskOrchestration, eventBehaviourForContinueAsNew, null, errorPropagationMode)
: this(orchestrationRuntimeState, taskOrchestration, eventBehaviourForContinueAsNew, entityParameters: null, errorPropagationMode)
{
}

Expand Down