Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Long Timers ⏳📈🚀🚀🚀 #340

Merged
merged 30 commits into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
1cc2435
fix timestamp conversion bug
hossam-nasr Apr 8, 2022
9b507f5
initial long timer task
hossam-nasr Mar 23, 2022
dde7405
extra changes
hossam-nasr Mar 23, 2022
bbada16
remove create long timer action
hossam-nasr Mar 23, 2022
92af8df
finalize LongTimerTask
hossam-nasr Mar 23, 2022
a7ed2d2
add long timer task to durable orchestration context
hossam-nasr Mar 23, 2022
4759a12
add binding info
hossam-nasr Mar 23, 2022
299a898
fix logic
hossam-nasr Mar 23, 2022
72cd064
update logic
hossam-nasr Mar 23, 2022
f836164
switch to dynamic
hossam-nasr Mar 24, 2022
382bcb2
track open task
hossam-nasr Mar 25, 2022
1a511e8
Enforce replay schema for long timers
hossam-nasr Apr 9, 2022
225b47f
replay schema latest
hossam-nasr Apr 9, 2022
121c613
comments and clean up
hossam-nasr Apr 9, 2022
5673d61
simplify create timer
hossam-nasr Apr 9, 2022
0d8004e
make properties private
hossam-nasr Apr 11, 2022
25e0a27
remove abbreviations
hossam-nasr Apr 11, 2022
c1cc962
add error message
hossam-nasr Apr 11, 2022
e2f32c0
Update error message
hossam-nasr Apr 11, 2022
6ccb4cb
log property values
hossam-nasr Apr 11, 2022
008d8de
merge dev
hossam-nasr Apr 11, 2022
ac1ea82
add initial tests for long timers
hossam-nasr Apr 12, 2022
bf056a0
don't use moment.add
hossam-nasr Apr 12, 2022
66a9ae9
don't use moment.add
hossam-nasr Apr 13, 2022
529a33c
fix tests for long timers
hossam-nasr Apr 13, 2022
ddc0667
fix recursive comment
hossam-nasr Apr 13, 2022
2f2aaca
use same duration format as extension in tests
hossam-nasr Apr 13, 2022
98bf97b
cancelled -> canceled
hossam-nasr Apr 13, 2022
c9dd546
Simplify code
hossam-nasr Apr 13, 2022
db05956
canceled
hossam-nasr Apr 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/actions/createtimeraction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { ActionType, IAction } from "../classes";
export class CreateTimerAction implements IAction {
public readonly actionType: ActionType = ActionType.CreateTimer;

constructor(public readonly fireAt: Date, public isCancelled: boolean = false) {
constructor(public readonly fireAt: Date, public isCanceled: boolean = false) {
if (!isDate(fireAt)) {
throw new TypeError(`fireAt: Expected valid Date object but got ${fireAt}`);
}
Expand Down
2 changes: 2 additions & 0 deletions src/durableorchestrationbindinginfo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ export class DurableOrchestrationBindingInfo {
public readonly instanceId: string = "",
public readonly isReplaying: boolean = false,
public readonly parentInstanceId?: string,
public readonly maximumShortTimerDuration?: string,
public readonly longRunningTimerIntervalDuration?: string,
upperSchemaVersion = 0 // TODO: Implement entity locking // public readonly contextLocks?: EntityId[],
) {
// It is assumed that the extension supports all schemas in range [0, upperSchemaVersion].
Expand Down
71 changes: 68 additions & 3 deletions src/durableorchestrationcontext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ import {
Task,
TimerTask,
DFTask,
LongTimerTask,
} from "./task";
import moment = require("moment");
import { ReplaySchema } from "./replaySchema";

/**
* Parameter data for orchestration bindings that can be used to schedule
Expand All @@ -41,6 +44,9 @@ export class DurableOrchestrationContext {
currentUtcDateTime: Date,
isReplaying: boolean,
parentInstanceId: string | undefined,
longRunningTimerIntervalDuration: string | undefined,
maximumShortTimerDuration: string | undefined,
schemaVersion: ReplaySchema,
input: unknown,
private taskOrchestratorExecutor: TaskOrchestrationExecutor
) {
Expand All @@ -49,6 +55,13 @@ export class DurableOrchestrationContext {
this.isReplaying = isReplaying;
this.currentUtcDateTime = currentUtcDateTime;
this.parentInstanceId = parentInstanceId;
this.longRunningTimerIntervalDuration = longRunningTimerIntervalDuration
? moment.duration(longRunningTimerIntervalDuration)
: undefined;
this.maximumShortTimerDuration = maximumShortTimerDuration
? moment.duration(maximumShortTimerDuration)
: undefined;
this.schemaVersion = schemaVersion;
this.input = input;
this.newGuidCounter = 0;
}
Expand Down Expand Up @@ -101,6 +114,34 @@ export class DurableOrchestrationContext {
*/
public currentUtcDateTime: Date;

/**
* Gets the maximum duration for timers allowed by the
* underlying storage infrastructure
*
* This duration property is determined by the underlying storage
* solution and passed to the SDK from the extension.
*/
private readonly maximumShortTimerDuration: moment.Duration | undefined;

/**
* A duration property which defines the duration of smaller
* timers to break long timers into, in case they are longer
* than the maximum supported duration
*
* This duration property is determined by the underlying
* storage solution and passed to the SDK from the extension.
*/
private readonly longRunningTimerIntervalDuration: moment.Duration | undefined;

/**
* Gets the current schema version that this execution is
* utilizing, based on negotiation with the extension.
*
* Different schema versions can allow different behavior.
* For example, long timers are only supported in schema version >=3
*/
private readonly schemaVersion: ReplaySchema;

/**
* @hidden
* This method informs the type-checker that an ITask[] can be treated as DFTask[].
Expand Down Expand Up @@ -293,9 +334,33 @@ export class DurableOrchestrationContext {
* @returns A TimerTask that completes when the durable timer expires.
*/
public createTimer(fireAt: Date): TimerTask {
const newAction = new CreateTimerAction(fireAt);
const task = new DFTimerTask(false, newAction);
return task;
const timerAction = new CreateTimerAction(fireAt);
const durationUntilFire = moment.duration(moment(fireAt).diff(this.currentUtcDateTime));
if (this.schemaVersion >= ReplaySchema.V3) {
if (!this.maximumShortTimerDuration || !this.longRunningTimerIntervalDuration) {
throw Error(
"A framework-internal error was detected: replay schema version >= V3 is being used, " +
"but one or more of the properties `maximumShortTimerDuration` and `longRunningTimerIntervalDuration` are not defined. " +
"This is likely an issue with the Durable Functions Extension. " +
"Please report this bug here: https://github.com/Azure/azure-functions-durable-js/issues\n" +
`maximumShortTimerDuration: ${this.maximumShortTimerDuration}\n` +
`longRunningTimerIntervalDuration: ${this.longRunningTimerIntervalDuration}`
);
}

if (durationUntilFire > this.maximumShortTimerDuration) {
return new LongTimerTask(
false,
timerAction,
this,
this.taskOrchestratorExecutor,
this.maximumShortTimerDuration,
this.longRunningTimerIntervalDuration
);
}
}

return new DFTimerTask(false, timerAction);
}

/**
Expand Down
3 changes: 3 additions & 0 deletions src/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ export class Orchestrator {
this.currentUtcDateTime,
orchestrationBinding.isReplaying,
orchestrationBinding.parentInstanceId,
orchestrationBinding.longRunningTimerIntervalDuration,
orchestrationBinding.maximumShortTimerDuration,
upperSchemaVersion,
input,
this.taskOrchestrationExecutor
);
Expand Down
3 changes: 2 additions & 1 deletion src/replaySchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
export enum ReplaySchema {
V1 = 0,
V2 = 1,
V3 = 2,
}

export const LatestReplaySchema: ReplaySchema = ReplaySchema.V2;
export const LatestReplaySchema: ReplaySchema = ReplaySchema.V3;
109 changes: 104 additions & 5 deletions src/task.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { RetryOptions } from ".";
import { IAction, CreateTimerAction } from "./classes";
import { TaskOrchestrationExecutor } from "./taskorchestrationexecutor";
import moment = require("moment");
import { DurableOrchestrationContext } from "./durableorchestrationcontext";

/**
* @hidden
Expand All @@ -16,7 +18,7 @@ export enum TaskState {
* @hidden
* A taskID, either a `string` for external events,
* or either `false` or a `number` for un-awaited
* an awaited tasks respectively.
* and awaited tasks respectively.
*/
export type TaskID = number | string | false;

Expand Down Expand Up @@ -94,7 +96,7 @@ export interface TimerTask extends Task {
/**
* @returns Whether or not the timer has been canceled.
*/
isCancelled: boolean;
isCanceled: boolean;
/**
* Indicates the timer should be canceled. This request will execute on the
* next `yield` or `return` statement.
Expand Down Expand Up @@ -291,8 +293,8 @@ export class DFTimerTask extends AtomicTask implements TimerTask {
}

/** Whether this timer task is cancelled */
get isCancelled(): boolean {
return this.action.isCancelled;
get isCanceled(): boolean {
return this.action.isCanceled;
}

/**
Expand All @@ -304,7 +306,7 @@ export class DFTimerTask extends AtomicTask implements TimerTask {
if (this.hasResult) {
throw Error("Cannot cancel a completed task.");
}
this.action.isCancelled = true; // TODO: fix typo
this.action.isCanceled = true;
}
}

Expand Down Expand Up @@ -377,6 +379,103 @@ export class WhenAnyTask extends CompoundTask {
}
}

/**
* @hidden
*
* A long Timer Task.
*
* This Task is created when a timer is created with a duration
* longer than the maximum timer duration supported by storage infrastructure.
*
* It extends `WhenAllTask` because it decomposes into
* several smaller sub-`TimerTask`s
*/
export class LongTimerTask extends WhenAllTask implements TimerTask {
public id: TaskID;
public action: CreateTimerAction;
private readonly executor: TaskOrchestrationExecutor;
private readonly maximumTimerLength: moment.Duration;
private readonly orchestrationContext: DurableOrchestrationContext;
private readonly longRunningTimerIntervalDuration: moment.Duration;

public constructor(
id: TaskID,
action: CreateTimerAction,
orchestrationContext: DurableOrchestrationContext,
executor: TaskOrchestrationExecutor,
maximumTimerLength: moment.Duration,
longRunningTimerIntervalDuration: moment.Duration
) {
const currentTime = orchestrationContext.currentUtcDateTime;
const finalFireTime = action.fireAt;
const durationUntilFire = moment.duration(moment(finalFireTime).diff(currentTime));

const nextFireTime: Date =
durationUntilFire > maximumTimerLength
? moment(currentTime).add(longRunningTimerIntervalDuration).toDate()
: finalFireTime;

const nextTimerAction = new CreateTimerAction(nextFireTime);
const nextTimerTask = new DFTimerTask(false, nextTimerAction);
super([nextTimerTask], action);

this.id = id;
this.action = action;
this.orchestrationContext = orchestrationContext;
this.executor = executor;
this.maximumTimerLength = maximumTimerLength;
this.longRunningTimerIntervalDuration = longRunningTimerIntervalDuration;
}

get isCanceled(): boolean {
return this.action.isCanceled;
}

/**
* @hidden
* Cancel this timer task.
* It errors out if the task has already completed.
*/
public cancel(): void {
if (this.hasResult) {
throw Error("Cannot cancel a completed task.");
}
this.action.isCanceled = true;
}

/**
* @hidden
* Attempts to set a value to this timer, given a completed sub-timer
*
* @param child
* The sub-timer that just completed
*/
public trySetValue(child: DFTimerTask): void {
const currentTime = this.orchestrationContext.currentUtcDateTime;
const finalFireTime = this.action.fireAt;
if (finalFireTime > currentTime) {
const nextTimer: DFTimerTask = this.getNextTimerTask(finalFireTime, currentTime);
this.addNewChild(nextTimer);
}
super.trySetValue(child);
}

private getNextTimerTask(finalFireTime: Date, currentTime: Date): DFTimerTask {
const durationUntilFire = moment.duration(moment(finalFireTime).diff(currentTime));
const nextFireTime: Date =
durationUntilFire > this.maximumTimerLength
? moment(currentTime).add(this.longRunningTimerIntervalDuration).toDate()
: finalFireTime;
return new DFTimerTask(false, new CreateTimerAction(nextFireTime));
}

private addNewChild(childTimer: DFTimerTask): void {
childTimer.parent = this;
this.children.push(childTimer);
this.executor.trackOpenTask(childTimer);
}
}

/**
* @hidden
*
Expand Down
10 changes: 10 additions & 0 deletions src/testingUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
OrchestratorStartedEvent,
} from "./classes";
import { IOrchestrationFunctionContext } from "./iorchestrationfunctioncontext";
import { ReplaySchema } from "./replaySchema";
import { TaskOrchestrationExecutor } from "./taskorchestrationexecutor";

/**
Expand All @@ -33,13 +34,19 @@ export class DummyOrchestrationContext implements IOrchestrationFunctionContext
* @param input The input to the orchestration
* @param currentUtcDateTime The deterministic date at the beginning of orchestration replay
* @param isReplaying Whether the orchestration is to be marked as isReplaying the its first event
* @param longRunningTimerIntervalDuration The duration to break smaller timers into if a long timer exceeds the maximum allowed duration
* @param maximumShortTimerDuration The maximum duration for a timer allowed by the underlying storage infrastructure
* @param schemaVersion The schema version currently used after being negotiated with the extension
* @param parentInstanceId The instanceId of the orchestration's parent, if this is a sub-orchestration
*/
constructor(
instanceId = "",
history: HistoryEvent[] | undefined = undefined,
input: any = undefined,
currentUtcDateTime: Date = new Date(),
longRunningTimerIntervalDuration: string,
maximumShortTimerDuration: string,
schemaVersion: ReplaySchema,
isReplaying = false,
parentInstanceId = ""
) {
Expand All @@ -54,6 +61,9 @@ export class DummyOrchestrationContext implements IOrchestrationFunctionContext
currentUtcDateTime,
isReplaying,
parentInstanceId,
longRunningTimerIntervalDuration,
maximumShortTimerDuration,
schemaVersion,
input,
new TaskOrchestrationExecutor()
);
Expand Down
6 changes: 3 additions & 3 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ export class Utils {
this.hasAllPropertiesOf(value, typeInstance)
);

// This recursive step ensures _all_ Timestamp properties are converted properly
// For example, a payload can contain the history as a property, so if we want to
// parse each HistoryEvent's Timestamp, we need to traverse the payload recursively
this.parseTimestampsAsDates(candidateObjects);

return candidateObjects as T[];
Expand Down Expand Up @@ -60,6 +57,9 @@ export class Utils {
obj.Timestamp = new Date(obj.Timestamp);
}
Object.values(obj).map((value) => {
// This recursive step ensures _all_ Timestamp properties are converted properly
// For example, a payload can contain the history as a property, so if we want to
// parse each HistoryEvent's Timestamp, we need to traverse the payload recursively
this.parseTimestampsAsDates(value);
});
}
Expand Down
Loading