Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement CallHttp Polling Replay Logic #346

Merged
merged 6 commits into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/durablehttprequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export class DurableHttpRequest {
* @param content The HTTP request content.
* @param headers The HTTP request headers.
* @param tokenSource The source of OAuth tokens to add to the request.
* @param asynchronousPatternEnabled Specifies whether the DurableHttpRequest should handle the asynchronous pattern.
*/
constructor(
/** The HTTP request method. */
Expand All @@ -26,6 +27,8 @@ export class DurableHttpRequest {
[key: string]: string;
},
/** The source of OAuth token to add to the request. */
public readonly tokenSource?: TokenSource
public readonly tokenSource?: TokenSource,
/** Whether the DurableHttpRequest should handle the asynchronous pattern. **/
public readonly asynchronousPatternEnabled: boolean = true
) {}
}
15 changes: 15 additions & 0 deletions src/durablehttpresponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,19 @@ export class DurableHttpResponse {
[key: string]: string;
}
) {}

// returns the specified header, case insensitively
// returns undefined if the header is not set
public getHeader(name: string): string | undefined {
if (this.headers) {
const lowerCaseName = name.toLowerCase();
const foundKey = Object.keys(this.headers).find(
(key) => key.toLowerCase() === lowerCaseName
);
if (foundKey) {
return this.headers[foundKey];
}
}
return undefined;
}
}
1 change: 1 addition & 0 deletions src/durableorchestrationbindinginfo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export class DurableOrchestrationBindingInfo {
public readonly parentInstanceId?: string,
public readonly maximumShortTimerDuration?: string,
public readonly longRunningTimerIntervalDuration?: string,
public readonly defaultHttpAsyncRequestSleepTimeMillseconds?: number,
upperSchemaVersion = 0 // TODO: Implement entity locking // public readonly contextLocks?: EntityId[],
) {
// It is assumed that the extension supports all schemas in range [0, upperSchemaVersion].
Expand Down
42 changes: 38 additions & 4 deletions src/durableorchestrationcontext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
TimerTask,
DFTask,
LongTimerTask,
CallHttpWithPollingTask,
} from "./task";
import moment = require("moment");
import { ReplaySchema } from "./replaySchema";
Expand All @@ -46,6 +47,7 @@ export class DurableOrchestrationContext {
parentInstanceId: string | undefined,
longRunningTimerIntervalDuration: string | undefined,
maximumShortTimerDuration: string | undefined,
defaultHttpAsyncRequestSleepTimeMillseconds: number | undefined,
schemaVersion: ReplaySchema,
input: unknown,
private taskOrchestratorExecutor: TaskOrchestrationExecutor
Expand All @@ -61,6 +63,7 @@ export class DurableOrchestrationContext {
this.maximumShortTimerDuration = maximumShortTimerDuration
? moment.duration(maximumShortTimerDuration)
: undefined;
this.defaultHttpAsyncRequestSleepTimeMillseconds = defaultHttpAsyncRequestSleepTimeMillseconds;
this.schemaVersion = schemaVersion;
this.input = input;
this.newGuidCounter = 0;
Expand All @@ -71,6 +74,13 @@ export class DurableOrchestrationContext {
private newGuidCounter: number;
public customStatus: unknown;

/**
* The default time to wait between attempts when making HTTP polling requests
* This duration is used unless a different value (in seconds) is specified in the
* 'Retry-After' header of the 202 response.
*/
private readonly defaultHttpAsyncRequestSleepTimeMillseconds?: number;

/**
* The ID of the current orchestration instance.
*
Expand Down Expand Up @@ -296,16 +306,40 @@ export class DurableOrchestrationContext {
uri: string,
content?: string | object,
headers?: { [key: string]: string },
tokenSource?: TokenSource
tokenSource?: TokenSource,
asynchronousPatternEnabled = true
): Task {
if (content && typeof content !== "string") {
content = JSON.stringify(content);
}

const req = new DurableHttpRequest(method, uri, content as string, headers, tokenSource);
const req = new DurableHttpRequest(
method,
uri,
content as string,
headers,
tokenSource,
asynchronousPatternEnabled
);
const newAction = new CallHttpAction(req);
const task = new AtomicTask(false, newAction);
return task;
if (this.schemaVersion >= ReplaySchema.V3 && req.asynchronousPatternEnabled) {
if (!this.defaultHttpAsyncRequestSleepTimeMillseconds) {
throw Error(
"A framework-internal error was detected: replay schema version >= V3 is being used, " +
"but `defaultHttpAsyncRequestSleepDuration` property is not defined. " +
"This is likely an issue with the Durable Functions Extension. " +
"Please report this bug here: https://github.com/Azure/azure-functions-durable-js/issues"
);
}
return new CallHttpWithPollingTask(
false,
newAction,
this,
this.taskOrchestratorExecutor,
this.defaultHttpAsyncRequestSleepTimeMillseconds
);
}
return new AtomicTask(false, newAction);
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export class Orchestrator {
orchestrationBinding.parentInstanceId,
orchestrationBinding.longRunningTimerIntervalDuration,
orchestrationBinding.maximumShortTimerDuration,
orchestrationBinding.defaultHttpAsyncRequestSleepTimeMillseconds,
upperSchemaVersion,
input,
this.taskOrchestrationExecutor
Expand Down
92 changes: 90 additions & 2 deletions src/task.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { RetryOptions } from ".";
import { IAction, CreateTimerAction } from "./classes";
import { DurableHttpResponse, RetryOptions } from ".";
import { IAction, CreateTimerAction, CallHttpAction } from "./classes";
import { TaskOrchestrationExecutor } from "./taskorchestrationexecutor";
import moment = require("moment");
import { DurableOrchestrationContext } from "./durableorchestrationcontext";
Expand Down Expand Up @@ -379,6 +379,94 @@ export class WhenAnyTask extends CompoundTask {
}
}

/**
* @hidden
*
* CallHttp Task with polling logic
*
* If the HTTP requests returns a 202 status code with a 'Location' header,
* then a timer task is created, after which another HTTP request is made,
* until a different status code is returned.
*
* Any other result from the HTTP requests is the result of the whole task.
*
* The duration of the timer is specified by the 'Retry-After' header (in seconds)
* of the 202 response, or a default value specified by the durable extension is used.
*
*/
export class CallHttpWithPollingTask extends CompoundTask {
protected action: CallHttpAction;
private readonly defaultHttpAsyncRequestSleepDuration: moment.Duration;

public constructor(
id: TaskID,
action: CallHttpAction,
private readonly orchestrationContext: DurableOrchestrationContext,
private readonly executor: TaskOrchestrationExecutor,
defaultHttpAsyncRequestSleepTimeMillseconds: number
) {
super([new AtomicTask(id, action)], action);
this.id = id;
this.action = action;
this.defaultHttpAsyncRequestSleepDuration = moment.duration(
defaultHttpAsyncRequestSleepTimeMillseconds,
"ms"
);
}

public trySetValue(child: TaskBase): void {
if (child.stateObj === TaskState.Completed) {
if (child.actionObj instanceof CallHttpAction) {
const resultObj = child.result as DurableHttpResponse;
const result = new DurableHttpResponse(
resultObj.statusCode,
resultObj.content,
resultObj.headers
);
if (result.statusCode === 202 && result.getHeader("Location")) {
const retryAfterHeaderValue = result.getHeader("Retry-After");
const delay: moment.Duration = retryAfterHeaderValue
? moment.duration(retryAfterHeaderValue, "s")
: this.defaultHttpAsyncRequestSleepDuration;

const currentTime = this.orchestrationContext.currentUtcDateTime;
const timerFireTime = moment(currentTime).add(delay).toDate();

// this should be safe since both types returned by this call
// (DFTimerTask and LongTimerTask) are TaskBase-conforming
const timerTask = (this.orchestrationContext.createTimer(
timerFireTime
) as unknown) as TaskBase;
hossam-nasr marked this conversation as resolved.
Show resolved Hide resolved
const callHttpTask = new AtomicTask(
false,
new CallHttpAction(this.action.httpRequest)
);

this.addNewChildren([timerTask, callHttpTask]);
} else {
// Set the value of a non-redirect HTTP response as the value of the entire
// compound task
this.setValue(false, result);
}
}
} else {
// If any subtask failed, we fail the entire compound task
if (this.firstError === undefined) {
this.firstError = child.result as Error;
this.setValue(true, this.firstError);
}
}
}

private addNewChildren(children: TaskBase[]): void {
children.map((child) => {
child.parent = this;
this.children.push(child);
this.executor.trackOpenTask(child);
});
}
}

/**
* @hidden
*
Expand Down
33 changes: 18 additions & 15 deletions src/testingUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import {
} from "./classes";
import { IOrchestrationFunctionContext } from "./iorchestrationfunctioncontext";
import { ReplaySchema } from "./replaySchema";
import { TaskOrchestrationExecutor } from "./taskorchestrationexecutor";

/**
* An orchestration context with dummy default values to facilitate mocking/stubbing the
Expand All @@ -36,16 +35,17 @@ export class DummyOrchestrationContext implements IOrchestrationFunctionContext
* @param isReplaying Whether the orchestration is to be marked as isReplaying the its first event
* @param longRunningTimerIntervalDuration The duration to break smaller timers into if a long timer exceeds the maximum allowed duration
* @param maximumShortTimerDuration The maximum duration for a timer allowed by the underlying storage infrastructure
* @param defaultHttpAsyncRequestSleepDurationInMillseconds The default amount of time to wait between sending requests in a callHttp polling scenario
* @param schemaVersion The schema version currently used after being negotiated with the extension
* @param parentInstanceId The instanceId of the orchestration's parent, if this is a sub-orchestration
*/
constructor(
instanceId = "",
history: HistoryEvent[] | undefined = undefined,
input: any = undefined,
currentUtcDateTime: Date = new Date(),
longRunningTimerIntervalDuration = "3.00:00:00",
maximumShortTimerDuration = "6.00:00:00",
defaultHttpAsyncRequestSleepTimeMillseconds = 30000,
schemaVersion: ReplaySchema = ReplaySchema.V1,
isReplaying = false,
parentInstanceId = ""
Expand All @@ -54,19 +54,22 @@ export class DummyOrchestrationContext implements IOrchestrationFunctionContext
const opts = new HistoryEventOptions(0, new Date());
history = [new OrchestratorStartedEvent(opts)];
}
this.bindings = [new DurableOrchestrationBindingInfo(history)];
this.df = new DurableOrchestrationContext(
history,
instanceId,
currentUtcDateTime,
isReplaying,
parentInstanceId,
longRunningTimerIntervalDuration,
maximumShortTimerDuration,
schemaVersion,
input,
new TaskOrchestrationExecutor()
);
this.bindings = [
new DurableOrchestrationBindingInfo(
history,
input,
instanceId,
isReplaying,
parentInstanceId,
maximumShortTimerDuration,
longRunningTimerIntervalDuration,
defaultHttpAsyncRequestSleepTimeMillseconds,
schemaVersion
),
];

// Set this as undefined, let it be initialized by the orchestrator
this.df = (undefined as unknown) as DurableOrchestrationContext;
}
public doneValue: IOrchestratorState | undefined;
public err: string | Error | null | undefined;
Expand Down
Loading