diff --git a/src/client/index.test.ts b/src/client/index.test.ts index f249312..d7879c2 100644 --- a/src/client/index.test.ts +++ b/src/client/index.test.ts @@ -27,10 +27,10 @@ describe("workflow client", () => { test("should send notify", async () => { const eventId = `event-id-${nanoid()}`; - const notifyData = { data: `notify-data-${nanoid()}` }; + const eventData = { data: `notify-data-${nanoid()}` }; await mockQStashServer({ execute: async () => { - await client.notify({ eventId, notifyBody: JSON.stringify(notifyData) }); + await client.notify({ eventId, eventData }); }, responseFields: { status: 200, @@ -40,7 +40,7 @@ describe("workflow client", () => { method: "POST", url: `${MOCK_QSTASH_SERVER_URL}/v2/notify/${eventId}`, token, - body: notifyData, + body: eventData, }, }); }); diff --git a/src/client/index.ts b/src/client/index.ts index be30713..82dc42d 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -1,13 +1,23 @@ -import { NotifyResponse } from "../types"; +import { NotifyResponse, Waiter } from "../types"; import { Client as QStashClient } from "@upstash/qstash"; +import { makeGetWaitersRequest, makeNotifyRequest } from "./utils"; type ClientConfig = ConstructorParameters[0]; +/** + * Workflow client for canceling & notifying workflows and getting waiters of an + * event. + * + * ```ts + * import { Client } from "@upstash/workflow"; + * const client = new Client({ token: "" }) + * ``` + */ export class Client { private client: QStashClient; constructor(clientConfig: ClientConfig) { - if (!clientConfig.baseUrl || !clientConfig.token) { + if (!clientConfig.token) { console.warn("[Upstash Workflow] url or the token is not set. client will not work."); } this.client = new QStashClient(clientConfig); @@ -16,6 +26,13 @@ export class Client { /** * Cancel an ongoing workflow * + * ```ts + * import { Client } from "@upstash/workflow"; + * + * const client = new Client({ token: "" }) + * await client.cancel({ workflowRunId: "" }) + * ``` + * * @param workflowRunId run id of the workflow to delete * @returns true if workflow is succesfully deleted. Otherwise throws QStashError */ @@ -31,22 +48,44 @@ export class Client { /** * Notify a workflow run waiting for an event * + * ```ts + * import { Client } from "@upstash/workflow"; + * + * const client = new Client({ token: "" }) + * await client.notify({ + * eventId: "my-event-id", + * eventData: "my-data" // data passed to the workflow run + * }); + * ``` + * * @param eventId event id to notify - * @param notifyData data to provide to the workflow + * @param eventData data to provide to the workflow */ public async notify({ eventId, - notifyBody, + eventData, }: { eventId: string; - notifyBody?: string; + eventData?: unknown; }): Promise { - const result = (await this.client.http.request({ - path: ["v2", "notify", eventId], - method: "POST", - body: notifyBody, - })) as NotifyResponse[]; + return await makeNotifyRequest(this.client.http, eventId, eventData); + } - return result; + /** + * Check waiters of an event + * + * ```ts + * import { Client } from "@upstash/workflow"; + * + * const client = new Client({ token: "" }) + * const result = await client.getWaiters({ + * eventId: "my-event-id" + * }) + * ``` + * + * @param eventId event id to check + */ + public async getWaiters({ eventId }: { eventId: string }): Promise[]> { + return await makeGetWaitersRequest(this.client.http, eventId); } } diff --git a/src/client/utils.ts b/src/client/utils.ts new file mode 100644 index 0000000..ff96501 --- /dev/null +++ b/src/client/utils.ts @@ -0,0 +1,27 @@ +import { Client } from "@upstash/qstash"; +import { NotifyResponse, Waiter } from "../types"; + +export const makeNotifyRequest = async ( + requester: Client["http"], + eventId: string, + eventData?: unknown +): Promise => { + const result = (await requester.request({ + path: ["v2", "notify", eventId], + method: "POST", + body: typeof eventData === "string" ? eventData : JSON.stringify(eventData), + })) as NotifyResponse[]; + + return result; +}; + +export const makeGetWaitersRequest = async ( + requester: Client["http"], + eventId: string +): Promise[]> => { + const result = (await requester.request({ + path: ["v2", "waiters", eventId], + method: "GET", + })) as Required[]; + return result; +}; diff --git a/src/context/context.ts b/src/context/context.ts index 3d526f7..eb7f53c 100644 --- a/src/context/context.ts +++ b/src/context/context.ts @@ -1,10 +1,11 @@ -import type { CallResponse, WaitStepResponse, WorkflowClient } from "../types"; +import type { CallResponse, NotifyStepResponse, WaitStepResponse, WorkflowClient } from "../types"; import { type StepFunction, type Step } from "../types"; import { AutoExecutor } from "./auto-executor"; import type { BaseLazyStep } from "./steps"; import { LazyCallStep, LazyFunctionStep, + LazyNotifyStep, LazySleepStep, LazySleepUntilStep, LazyWaitForEventStep, @@ -289,15 +290,55 @@ export class WorkflowContext { const { url, method = "GET", body, headers = {} } = callSettings; const result = await this.addStep( - new LazyCallStep(stepName, url, method, body, headers) + new LazyCallStep(stepName, url, method, body, headers ?? {}) ); - return result; + + try { + return { + ...result, + body: JSON.parse(result.body as string), + }; + } catch { + return result; + } } + /** + * Makes the workflow run wait until a notify request is sent or until the + * timeout ends + * + * ```ts + * const { eventData, timeout } = await context.waitForEvent( + * "wait for event step", + * "my-event-id", + * 100 // timeout after 100 seconds + * ); + * ``` + * + * To notify a waiting workflow run, you can use the notify method: + * + * ```ts + * import { Client } from "@upstash/workflow"; + * + * const client = new Client({ token: }); + * + * await client.notify({ + * eventId: "my-event-id", + * eventData: "eventData" + * }) + * ``` + * + * @param stepName + * @param eventId event id to wake up the waiting workflow run + * @param timeout timeout duration in seconds + * @returns wait response as `{ timeout: boolean, eventData: unknown }`. + * timeout is true if the wait times out, if notified it is false. eventData + * is the value passed to `client.notify`. + */ public async waitForEvent( stepName: string, eventId: string, - timeout: string | number + timeout: number ): Promise { const result = await this.addStep( new LazyWaitForEventStep( @@ -307,7 +348,33 @@ export class WorkflowContext { ) ); - return result; + try { + return { + ...result, + eventData: JSON.parse(result.eventData as string), + }; + } catch { + return result; + } + } + + public async notify( + stepName: string, + eventId: string, + eventData: unknown + ): Promise { + const result = await this.addStep( + new LazyNotifyStep(stepName, eventId, eventData, this.qstashClient.http) + ); + + try { + return { + ...result, + eventData: JSON.parse(result.eventData as string), + }; + } catch { + return result; + } } /** diff --git a/src/context/steps.test.ts b/src/context/steps.test.ts index cd3c30d..7f05959 100644 --- a/src/context/steps.test.ts +++ b/src/context/steps.test.ts @@ -2,12 +2,15 @@ import { describe, test, expect } from "bun:test"; import { LazyCallStep, LazyFunctionStep, + LazyNotifyStep, LazySleepStep, LazySleepUntilStep, LazyWaitForEventStep, } from "./steps"; import { nanoid } from "../utils"; -import type { Step } from "../types"; +import type { NotifyResponse, NotifyStepResponse, Step } from "../types"; +import { Client } from "@upstash/qstash"; +import { MOCK_QSTASH_SERVER_URL, mockQStashServer } from "../test-utils"; describe("test steps", () => { const stepName = nanoid(); @@ -180,4 +183,82 @@ describe("test steps", () => { }); }); }); + + describe("notify step", () => { + const eventId = "my-event-id"; + const eventData = { data: "my-event-data" }; + + // get client + const token = nanoid(); + const client = new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token }); + + const step = new LazyNotifyStep(stepName, eventId, eventData, client.http); + + test("should set correct fields", () => { + expect(step.stepName).toBe(stepName); + expect(step.stepType).toBe("Notify"); + }); + test("should create plan step", () => { + expect(step.getPlanStep(concurrent, targetStep)).toEqual({ + stepId: 0, + stepName, + stepType: "Notify", + concurrent, + targetStep, + }); + }); + + test("should create result step", async () => { + let called = false; + const notifyResponse: NotifyResponse[] = [ + { + error: "no-error", + messageId: "msg-id", + waiter: { + deadline: 123, + headers: { + "my-header": ["value"], + }, + timeoutBody: undefined, + timeoutHeaders: { + "my-header": ["value"], + }, + timeoutUrl: "url", + url: "url", + }, + }, + ]; + const stepResponse: NotifyStepResponse = { + eventId, + eventData, + notifyResponse, + }; + + await mockQStashServer({ + execute: async () => { + const result = await step.getResultStep(4, stepId); + expect(result).toEqual({ + concurrent: 4, + stepId, + out: stepResponse, + stepName, + stepType: "Notify", + }); + called = true; + }, + responseFields: { + status: 200, + body: notifyResponse, + }, + receivesRequest: { + method: "POST", + url: `${MOCK_QSTASH_SERVER_URL}/v2/notify/${eventId}`, + token, + body: eventData, + }, + }); + + expect(called).toBeTrue(); + }); + }); }); diff --git a/src/context/steps.ts b/src/context/steps.ts index ad80cba..b8dc4a8 100644 --- a/src/context/steps.ts +++ b/src/context/steps.ts @@ -1,5 +1,6 @@ -import type { HTTPMethods } from "@upstash/qstash"; -import type { Step, StepFunction, StepType, WaitStepResponse } from "../types"; +import type { Client, HTTPMethods } from "@upstash/qstash"; +import type { NotifyStepResponse, Step, StepFunction, StepType, WaitStepResponse } from "../types"; +import { makeNotifyRequest } from "../client/utils"; /** * Base class outlining steps. Basically, each step kind (run/sleep/sleepUntil) @@ -225,35 +226,18 @@ export class LazyWaitForEventStep extends BaseLazyStep { } } -// export class LazyNotifyStep extends BaseLazyStep { -// private readonly eventId: string; -// private readonly timeout: string; -// stepType: StepType = "Wait" +export class LazyNotifyStep extends LazyFunctionStep { + stepType: StepType = "Notify"; -// constructor( -// stepName: string, -// eventId: string, -// ) { -// super(stepName) -// this.eventId = eventId; -// } + constructor(stepName: string, eventId: string, eventData: unknown, requester: Client["http"]) { + super(stepName, async () => { + const notifyResponse = await makeNotifyRequest(requester, eventId, eventData); -// public getPlanStep(concurrent: number, targetStep: number): Step { -// return { -// stepId: 0, -// stepName: this.stepName, -// stepType: this.stepType, -// concurrent, -// targetStep -// } -// } - -// public async getResultStep(concurrent: number, stepId: number): Promise> { -// return await Promise.resolve({ -// stepId, -// stepName: this.stepName, -// stepType: this.stepType, -// concurrent, -// }); -// } -// } + return { + eventId, + eventData, + notifyResponse, + }; + }); + } +} diff --git a/src/integration.test.ts b/src/integration.test.ts index 49e6f6d..74de9e2 100644 --- a/src/integration.test.ts +++ b/src/integration.test.ts @@ -55,10 +55,10 @@ import type { RouteFunction, WaitStepResponse, WorkflowServeOptions } from "./ty import type { NextRequest } from "next/server"; import { Client } from "./client"; import { nanoid } from "./utils"; +import { makeGetWaitersRequest } from "./client/utils"; const WORKFLOW_PORT = "3000"; const THIRD_PARTY_PORT = "3001"; -const LOCAL_WORKFLOW_URL = `http://localhost:${WORKFLOW_PORT}`; const LOCAL_THIRD_PARTY_URL = `http://localhost:${THIRD_PARTY_PORT}`; const someWork = (input: string) => { @@ -109,21 +109,23 @@ const testEndpoint = async ({ finishState, failureFunction, retries, + port = WORKFLOW_PORT, }: { - finalCount: number; + finalCount?: number; waitFor: number; initialPayload: TInitialPayload; routeFunction: RouteFunction; finishState: FinishState; failureFunction?: WorkflowServeOptions["failureFunction"]; retries?: number; + port?: string; }) => { let counter = 0; const { POST: endpoint } = workflowServe(routeFunction, { qstashClient, - url: LOCAL_WORKFLOW_URL, - verbose: true, + url: `http://localhost:${port}`, + // verbose: true, failureFunction, retries, }); @@ -133,7 +135,7 @@ const testEndpoint = async ({ counter += 1; return await endpoint(request as NextRequest); }, - port: WORKFLOW_PORT, + port: port, }); await qstashClient.publishJSON({ @@ -142,7 +144,7 @@ const testEndpoint = async ({ headers: { Authentication: "Bearer secretPassword", }, - url: `http://localhost:${WORKFLOW_PORT}`, + url: `http://localhost:${port}`, }); await new Promise((resolve) => setTimeout(resolve, waitFor)); @@ -150,7 +152,9 @@ const testEndpoint = async ({ server.stop(); finishState.check(); - expect(counter).toBe(finalCount); + if (finalCount) { + expect(counter).toBe(finalCount); + } }; describe.skip("live serve tests", () => { @@ -613,12 +617,12 @@ describe.skip("live serve tests", () => { expect(input).toBe(payload); - const { notifyBody, timeout } = await context.waitForEvent( + const { eventData, timeout } = await context.waitForEvent( "single wait for event", eventId, 1 ); - expect(notifyBody).toBeUndefined(); + expect(eventData).toBeUndefined(); expect(timeout).toBeTrue(); const [runResponse, waitResponse] = await Promise.all([ @@ -626,8 +630,9 @@ describe.skip("live serve tests", () => { context.waitForEvent("wait-event-step", eventId, 3), ]); expect(runResponse).toBe(runResult); - expect(waitResponse.notifyBody).toBe(expectedWaitResponse.notifyBody); expect(waitResponse.timeout).toBe(expectedWaitResponse.timeout); + expect(waitResponse.eventData).toEqual(expectedWaitResponse.eventData); + expect(typeof waitResponse.eventData).toBe(typeof expectedWaitResponse.eventData); finishState.finish(); }, }); @@ -639,7 +644,7 @@ describe.skip("live serve tests", () => { const eventId = `my-event-id-${nanoid()}`; await testWaitEndpoint( { - notifyBody: undefined, + eventData: undefined, timeout: true, }, eventId @@ -652,7 +657,7 @@ describe.skip("live serve tests", () => { "should notify correctly", async () => { const eventId = `my-event-id-${nanoid()}`; - const notifyBody = "notify-body"; + const eventData = "notify-body"; const workflowClient = new Client({ baseUrl: process.env.MOCK_QSTASH_URL, token: process.env.MOCK_QSTASH_TOKEN ?? "", @@ -665,7 +670,7 @@ describe.skip("live serve tests", () => { while (true) { await new Promise((resolve) => setTimeout(resolve, 1000)); - const result = await workflowClient.notify({ eventId, notifyBody }); + const result = await workflowClient.notify({ eventId, eventData }); if (result) { expect(result[0].waiter.url).toBe(`http://localhost:${WORKFLOW_PORT}`); notifyFinishState.finish(); @@ -677,7 +682,7 @@ describe.skip("live serve tests", () => { await testWaitEndpoint( { - notifyBody, + eventData, timeout: false, }, eventId @@ -687,5 +692,67 @@ describe.skip("live serve tests", () => { }, { timeout: 17_000 } ); + + describe("should notify from inside a function", () => { + const testNotifyWithContext = async (payload: unknown) => { + const eventId = `my-event-id-${nanoid()}`; + + const waitingEndpoint = testWaitEndpoint( + { + eventData: payload, + timeout: false, + }, + eventId + ); + + const finishState = new FinishState(); + const notifyingEndpoint = testEndpoint({ + finishState, + initialPayload: undefined, + waitFor: 15000, + port: "3002", + routeFunction: async (context) => { + // wait to avoid notifying the first waitForEvent + await context.sleep("sleep for first timeout", 3); + + while (true) { + const waiters = await context.run("check waiters", async () => { + const waiters = await makeGetWaitersRequest(context.qstashClient.http, eventId); + return waiters; + }); + + expect(waiters[0].timeoutUrl).toBe("http://localhost:3000"); + expect(waiters[0].timeoutBody).toBe(undefined); + expect(waiters[0].timeoutHeaders["Upstash-Workflow-Runid"]).toBeTruthy(); + + if (waiters) { + break; + } + } + const { notifyResponse } = await context.notify("notify-step", eventId, payload); + expect(notifyResponse.length).toBeTruthy(); + finishState.finish(); + }, + }); + + await Promise.all([waitingEndpoint, notifyingEndpoint]); + }; + + test( + "should handle string event data", + async () => { + await testNotifyWithContext("event-data"); + }, + { timeout: 170000 } + ); + + test( + "should handle object event data", + async () => { + await testNotifyWithContext({ event: "data" }); + }, + { timeout: 170000 } + ); + }); }); }); diff --git a/src/serve/serve.test.ts b/src/serve/serve.test.ts index 9328bf0..0f23495 100644 --- a/src/serve/serve.test.ts +++ b/src/serve/serve.test.ts @@ -129,6 +129,7 @@ describe("serve", () => { headers: { "content-type": "application/json", "upstash-forward-upstash-workflow-sdk-version": "1", + "upstash-retries": "3", "upstash-method": "POST", "upstash-workflow-runid": workflowRunId, "upstash-workflow-init": "false", @@ -155,6 +156,7 @@ describe("serve", () => { "content-type": "application/json", "upstash-forward-upstash-workflow-sdk-version": "1", "upstash-method": "POST", + "upstash-retries": "3", "upstash-workflow-runid": workflowRunId, "upstash-workflow-init": "false", "upstash-workflow-url": WORKFLOW_ENDPOINT, diff --git a/src/types.ts b/src/types.ts index 98101ff..4002913 100644 --- a/src/types.ts +++ b/src/types.ts @@ -61,7 +61,7 @@ type WaitFields = { type NotifyFields = { notifyEventId?: string; - notifyBody?: string; + eventData?: string; }; export type Step = { @@ -235,39 +235,55 @@ export type FailureFunctionPayload = { */ export type RequiredExceptFields = Omit, K> & Partial>; -export type WaitResult = { - result: TResult; - timeout: boolean; -}; - export type Waiter = { url: string; deadline: number; headers: Record; - timeoutUrl: string; - timeoutBody: unknown; - timeoutHeaders: Record; + timeoutUrl?: string; + timeoutBody?: unknown; + timeoutHeaders?: Record; }; export type NotifyResponse = { waiter: Waiter; messageId: string; - deduplicated: boolean; error: string; }; export type WaitRequest = { url: string; + step: Step; timeout: string; - timeoutBody?: string; timeoutUrl?: string; + timeoutBody?: string; timeoutHeaders?: Record; - step: Step; }; export type WaitStepResponse = { + /** + * whether the wait for event step timed out. false if + * the step is notified + */ timeout: boolean; - notifyBody: unknown; + /** + * body passed in notify request + */ + eventData: unknown; +}; + +export type NotifyStepResponse = { + /** + * notified event id + */ + eventId: string; + /** + * event data sent with notify + */ + eventData: unknown; + /** + * response from notify + */ + notifyResponse: NotifyResponse[]; }; export type CallResponse = { diff --git a/src/workflow-parser.test.ts b/src/workflow-parser.test.ts index ef01e35..99db467 100644 --- a/src/workflow-parser.test.ts +++ b/src/workflow-parser.test.ts @@ -279,13 +279,13 @@ describe("Workflow Parser", () => { expect(steps[2].stepType).toBe("Wait"); const timeoutResponse: WaitStepResponse = { - notifyBody: undefined, + eventData: undefined, timeout: true, }; expect(steps[1].out).toEqual(timeoutResponse); const notifyResponse: WaitStepResponse = { - notifyBody: "notify-data", + eventData: "notify-data", timeout: false, }; expect(steps[2].out).toEqual(notifyResponse); diff --git a/src/workflow-parser.ts b/src/workflow-parser.ts index 0dea61a..c926fde 100644 --- a/src/workflow-parser.ts +++ b/src/workflow-parser.ts @@ -71,7 +71,7 @@ const parsePayload = (rawPayload: string) => { // if event is a wait event, overwrite the out with WaitStepResponse: if (step.waitEventId) { const newOut: WaitStepResponse = { - notifyBody: step.out, + eventData: step.out, timeout: step.waitTimeout ?? false, }; step.out = newOut;