Skip to content

Commit

Permalink
fix: add context.call return parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
CahidArda committed Oct 16, 2024
1 parent 79e240e commit f62ad9c
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 59 deletions.
1 change: 1 addition & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ export const WORKFLOW_ID_HEADER = "Upstash-Workflow-RunId";
export const WORKFLOW_INIT_HEADER = "Upstash-Workflow-Init";
export const WORKFLOW_URL_HEADER = "Upstash-Workflow-Url";
export const WORKFLOW_FAILURE_HEADER = "Upstash-Workflow-Is-Failure";
export const WORKFLOW_FEATURE_HEADER = "Upstash-Feature-Set";

export const WORKFLOW_PROTOCOL_VERSION = "1";
export const WORKFLOW_PROTOCOL_VERSION_HEADER = "Upstash-Workflow-Sdk-Version";
Expand Down
5 changes: 5 additions & 0 deletions src/context/auto-executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ describe("auto-executor", () => {
{
destination: WORKFLOW_ENDPOINT,
headers: {
"upstash-feature-set": "WF_NoDelete",
"content-type": "application/json",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
Expand Down Expand Up @@ -207,6 +208,7 @@ describe("auto-executor", () => {
body: '{"stepId":0,"stepName":"sleep for some time","stepType":"SleepFor","sleepFor":123,"concurrent":2,"targetStep":1}',
destination: WORKFLOW_ENDPOINT,
headers: {
"upstash-feature-set": "WF_NoDelete",
"content-type": "application/json",
"upstash-delay": "123s",
"upstash-forward-upstash-workflow-sdk-version": "1",
Expand All @@ -221,6 +223,7 @@ describe("auto-executor", () => {
body: '{"stepId":0,"stepName":"sleep until next day","stepType":"SleepUntil","sleepUntil":123123,"concurrent":2,"targetStep":2}',
destination: WORKFLOW_ENDPOINT,
headers: {
"upstash-feature-set": "WF_NoDelete",
"content-type": "application/json",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
Expand Down Expand Up @@ -273,6 +276,7 @@ describe("auto-executor", () => {
{
destination: WORKFLOW_ENDPOINT,
headers: {
"upstash-feature-set": "WF_NoDelete",
"content-type": "application/json",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
Expand Down Expand Up @@ -325,6 +329,7 @@ describe("auto-executor", () => {
{
destination: WORKFLOW_ENDPOINT,
headers: {
"upstash-feature-set": "WF_NoDelete",
"content-type": "application/json",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
Expand Down
6 changes: 5 additions & 1 deletion src/context/context.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ describe("context tests", () => {

const throws = async () => {
await context.run("outer step", async () => {
await context.call("inner call", { url: "https://some-url.com"});
await context.call("inner call", { url: "https://some-url.com" });
});
};
expect(throws).toThrow(
Expand Down Expand Up @@ -137,6 +137,7 @@ describe("context tests", () => {
body: '{"stepId":1,"stepName":"my-step","stepType":"Run","out":"my-result","concurrent":1}',
destination: WORKFLOW_ENDPOINT,
headers: {
"upstash-feature-set": "WF_NoDelete",
"content-type": "application/json",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
Expand Down Expand Up @@ -185,6 +186,7 @@ describe("context tests", () => {
},
timeout: "20s",
timeoutHeaders: {
"Upstash-Feature-Set": ["WF_NoDelete"],
"Content-Type": ["application/json"],
[`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: ["1"],
"Upstash-Retries": ["3"],
Expand Down Expand Up @@ -234,6 +236,7 @@ describe("context tests", () => {
body: '{"stepId":0,"stepName":"my-wait-step","stepType":"Wait","waitEventId":"my-event-id","timeout":"20s","concurrent":2,"targetStep":1}',
destination: WORKFLOW_ENDPOINT,
headers: {
"upstash-feature-set": "WF_NoDelete",
"content-type": "application/json",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
Expand All @@ -248,6 +251,7 @@ describe("context tests", () => {
body: '{"stepId":0,"stepName":"my-run-step","stepType":"Run","concurrent":2,"targetStep":2}',
destination: WORKFLOW_ENDPOINT,
headers: {
"upstash-feature-set": "WF_NoDelete",
"content-type": "application/json",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
Expand Down
29 changes: 9 additions & 20 deletions src/context/context.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { WaitStepResponse, WorkflowClient } from "../types";
import type { CallResponse, WaitStepResponse, WorkflowClient } from "../types";
import { type StepFunction, type Step } from "../types";
import { AutoExecutor } from "./auto-executor";
import type { BaseLazyStep } from "./steps";
Expand Down Expand Up @@ -277,32 +277,21 @@ export class WorkflowContext<TInitialPayload = unknown> {
* @returns call result (parsed as JSON if possible)
*/
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-parameters
public async call<TResult = unknown, TBody = unknown>(
public async call(
stepName: string,
callSettings: {
url: string,
method?: HTTPMethods,
body?: TBody,
headers?: Record<string, string>
url: string;
method?: HTTPMethods;
body?: unknown;
headers?: Record<string, string>;
}
) {

const {
url,
method = "GET",
body,
headers = {}
} = callSettings;
const { url, method = "GET", body, headers = {} } = callSettings;

const result = await this.addStep(
new LazyCallStep<string>(stepName, url, method, body, headers )
new LazyCallStep<CallResponse>(stepName, url, method, body, headers)
);

try {
return JSON.parse(result) as TResult;
} catch {
return result as TResult;
}
return result;
}

public async waitForEvent(
Expand Down
66 changes: 50 additions & 16 deletions src/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -392,28 +392,22 @@ describe.skip("live serve tests", () => {
return;
}

const postResult = await context.call<string>(
"post call",
{
url: LOCAL_THIRD_PARTY_URL,
method: "POST",
body: "post-payload",
headers: postHeader
}
);
const { body: postResult } = await context.call("post call", {
url: LOCAL_THIRD_PARTY_URL,
method: "POST",
body: "post-payload",
headers: postHeader,
});
expect(postResult).toBe(
"called POST 'third-party-result' 'post-header-value-x' '\"post-payload\"'"
);

await context.sleep("sleep 1", 2);

const getResult = await context.call<string>(
"get call",
{
url: LOCAL_THIRD_PARTY_URL,
headers: getHeader
}
);
const { body: getResult } = await context.call("get call", {
url: LOCAL_THIRD_PARTY_URL,
headers: getHeader,
});

expect(getResult).toBe("called GET 'third-party-result' 'get-header-value-x'");
finishState.finish();
Expand Down Expand Up @@ -497,6 +491,46 @@ describe.skip("live serve tests", () => {
}
);

test(
"call failure",
async () => {
const failingResponse = "failing-response";
const payload = "my-payload";
const thirdPartyServer = serve({
async fetch(request) {
const requestPayload = await request.json();
return new Response(`${failingResponse} - ${requestPayload}`, { status: 400 });
},
port: THIRD_PARTY_PORT,
});

const finishState = new FinishState();
await testEndpoint({
finalCount: 4,
waitFor: 7000,
initialPayload: payload,
finishState,
routeFunction: async (context) => {
const input = context.requestPayload;
const { status, body, header } = await context.call("failing call", {
url: LOCAL_THIRD_PARTY_URL,
body: input,
method: "POST",
});
expect(status).toBe(400);
expect(body).toBe(`${failingResponse} - ${payload}`);
expect(header["Content-Length"]).toEqual(["29"]);
finishState.finish();
},
});

thirdPartyServer.stop();
},
{
timeout: 8000,
}
);

test(
"retry",
async () => {
Expand Down
5 changes: 4 additions & 1 deletion src/serve/authorization.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ describe("disabled workflow context", () => {
let called = false;
await mockQStashServer({
execute: () => {
const throws = disabledContext.call("call-step", { url: "some-url"});
const throws = disabledContext.call("call-step", { url: "some-url" });
expect(throws).rejects.toThrow(QStashWorkflowAbort);
called = true;
},
Expand Down Expand Up @@ -200,6 +200,7 @@ describe("disabled workflow context", () => {
body: '{"stepId":1,"stepName":"step","stepType":"Run","out":"result","concurrent":1}',
destination: WORKFLOW_ENDPOINT,
headers: {
"upstash-feature-set": "WF_NoDelete",
"content-type": "application/json",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
Expand Down Expand Up @@ -247,6 +248,7 @@ describe("disabled workflow context", () => {
body: '{"stepId":1,"stepName":"step","stepType":"Run","out":"result","concurrent":1}',
destination: WORKFLOW_ENDPOINT,
headers: {
"upstash-feature-set": "WF_NoDelete",
"content-type": "application/json",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
Expand Down Expand Up @@ -295,6 +297,7 @@ describe("disabled workflow context", () => {
body: '{"stepId":1,"stepName":"step","stepType":"Run","out":"result","concurrent":1}',
destination: WORKFLOW_ENDPOINT,
headers: {
"upstash-feature-set": "WF_NoDelete",
"content-type": "application/json",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
Expand Down
3 changes: 3 additions & 0 deletions src/serve/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ export const serve = <
* @returns A promise that resolves to a response.
*/
const handler = async (request: TRequest) => {
await debug?.log("INFO", "ENDPOINT_START");

const { workflowUrl, workflowFailureUrl } = await determineUrls(
request,
url,
Expand Down Expand Up @@ -114,6 +116,7 @@ export const serve = <
failureUrl: workflowFailureUrl,
debug,
env,
retries,
});

// attempt running routeFunction until the first step
Expand Down
1 change: 1 addition & 0 deletions src/serve/serve.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ describe("serve", () => {
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-feature-set": "WF_NoDelete",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
"upstash-retries": "3",
Expand Down
6 changes: 6 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,9 @@ export type WaitStepResponse = {
timeout: boolean;
notifyBody: unknown;
};

export type CallResponse = {
status: number;
body: unknown;
header: Record<string, string[]>;
};
Loading

0 comments on commit f62ad9c

Please sign in to comment.