Skip to content

Commit

Permalink
Merge branch 'main' into DX-1312-backwards-incompatible-changes
Browse files Browse the repository at this point in the history
  • Loading branch information
CahidArda committed Oct 17, 2024
2 parents f62ad9c + 51b1605 commit 5f9c0f4
Show file tree
Hide file tree
Showing 11 changed files with 365 additions and 82 deletions.
6 changes: 3 additions & 3 deletions src/client/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ describe("workflow client", () => {

test("should send notify", async () => {
const eventId = `event-id-${nanoid()}`;
const notifyData = { data: `notify-data-${nanoid()}` };
const eventData = { data: `notify-data-${nanoid()}` };
await mockQStashServer({
execute: async () => {
await client.notify({ eventId, notifyBody: JSON.stringify(notifyData) });
await client.notify({ eventId, eventData });
},
responseFields: {
status: 200,
Expand All @@ -40,7 +40,7 @@ describe("workflow client", () => {
method: "POST",
url: `${MOCK_QSTASH_SERVER_URL}/v2/notify/${eventId}`,
token,
body: notifyData,
body: eventData,
},
});
});
Expand Down
61 changes: 50 additions & 11 deletions src/client/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
import { NotifyResponse } from "../types";
import { NotifyResponse, Waiter } from "../types";
import { Client as QStashClient } from "@upstash/qstash";
import { makeGetWaitersRequest, makeNotifyRequest } from "./utils";

type ClientConfig = ConstructorParameters<typeof QStashClient>[0];

/**
* Workflow client for canceling & notifying workflows and getting waiters of an
* event.
*
* ```ts
* import { Client } from "@upstash/workflow";
* const client = new Client({ token: "<QSTASH_TOKEN>" })
* ```
*/
export class Client {
private client: QStashClient;

constructor(clientConfig: ClientConfig) {
if (!clientConfig.baseUrl || !clientConfig.token) {
if (!clientConfig.token) {
console.warn("[Upstash Workflow] url or the token is not set. client will not work.");
}
this.client = new QStashClient(clientConfig);
Expand All @@ -16,6 +26,13 @@ export class Client {
/**
* Cancel an ongoing workflow
*
* ```ts
* import { Client } from "@upstash/workflow";
*
* const client = new Client({ token: "<QSTASH_TOKEN>" })
* await client.cancel({ workflowRunId: "<WORKFLOW_RUN_ID>" })
* ```
*
* @param workflowRunId run id of the workflow to delete
* @returns true if workflow is succesfully deleted. Otherwise throws QStashError
*/
Expand All @@ -31,22 +48,44 @@ export class Client {
/**
* Notify a workflow run waiting for an event
*
* ```ts
* import { Client } from "@upstash/workflow";
*
* const client = new Client({ token: "<QSTASH_TOKEN>" })
* await client.notify({
* eventId: "my-event-id",
* eventData: "my-data" // data passed to the workflow run
* });
* ```
*
* @param eventId event id to notify
* @param notifyData data to provide to the workflow
* @param eventData data to provide to the workflow
*/
public async notify({
eventId,
notifyBody,
eventData,
}: {
eventId: string;
notifyBody?: string;
eventData?: unknown;
}): Promise<NotifyResponse[]> {
const result = (await this.client.http.request({
path: ["v2", "notify", eventId],
method: "POST",
body: notifyBody,
})) as NotifyResponse[];
return await makeNotifyRequest(this.client.http, eventId, eventData);
}

return result;
/**
* Check waiters of an event
*
* ```ts
* import { Client } from "@upstash/workflow";
*
* const client = new Client({ token: "<QSTASH_TOKEN>" })
* const result = await client.getWaiters({
* eventId: "my-event-id"
* })
* ```
*
* @param eventId event id to check
*/
public async getWaiters({ eventId }: { eventId: string }): Promise<Required<Waiter>[]> {
return await makeGetWaitersRequest(this.client.http, eventId);
}
}
27 changes: 27 additions & 0 deletions src/client/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { Client } from "@upstash/qstash";
import { NotifyResponse, Waiter } from "../types";

export const makeNotifyRequest = async (
requester: Client["http"],
eventId: string,
eventData?: unknown
): Promise<NotifyResponse[]> => {
const result = (await requester.request({
path: ["v2", "notify", eventId],
method: "POST",
body: typeof eventData === "string" ? eventData : JSON.stringify(eventData),
})) as NotifyResponse[];

return result;
};

export const makeGetWaitersRequest = async (
requester: Client["http"],
eventId: string
): Promise<Required<Waiter>[]> => {
const result = (await requester.request({
path: ["v2", "waiters", eventId],
method: "GET",
})) as Required<Waiter>[];
return result;
};
77 changes: 72 additions & 5 deletions src/context/context.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import type { CallResponse, WaitStepResponse, WorkflowClient } from "../types";
import type { CallResponse, NotifyStepResponse, WaitStepResponse, WorkflowClient } from "../types";
import { type StepFunction, type Step } from "../types";
import { AutoExecutor } from "./auto-executor";
import type { BaseLazyStep } from "./steps";
import {
LazyCallStep,
LazyFunctionStep,
LazyNotifyStep,
LazySleepStep,
LazySleepUntilStep,
LazyWaitForEventStep,
Expand Down Expand Up @@ -289,15 +290,55 @@ export class WorkflowContext<TInitialPayload = unknown> {
const { url, method = "GET", body, headers = {} } = callSettings;

const result = await this.addStep(
new LazyCallStep<CallResponse>(stepName, url, method, body, headers)
new LazyCallStep<CallResponse>(stepName, url, method, body, headers ?? {})
);
return result;

try {
return {
...result,
body: JSON.parse(result.body as string),
};
} catch {
return result;
}
}

/**
* Makes the workflow run wait until a notify request is sent or until the
* timeout ends
*
* ```ts
* const { eventData, timeout } = await context.waitForEvent(
* "wait for event step",
* "my-event-id",
* 100 // timeout after 100 seconds
* );
* ```
*
* To notify a waiting workflow run, you can use the notify method:
*
* ```ts
* import { Client } from "@upstash/workflow";
*
* const client = new Client({ token: });
*
* await client.notify({
* eventId: "my-event-id",
* eventData: "eventData"
* })
* ```
*
* @param stepName
* @param eventId event id to wake up the waiting workflow run
* @param timeout timeout duration in seconds
* @returns wait response as `{ timeout: boolean, eventData: unknown }`.
* timeout is true if the wait times out, if notified it is false. eventData
* is the value passed to `client.notify`.
*/
public async waitForEvent(
stepName: string,
eventId: string,
timeout: string | number
timeout: number
): Promise<WaitStepResponse> {
const result = await this.addStep(
new LazyWaitForEventStep(
Expand All @@ -307,7 +348,33 @@ export class WorkflowContext<TInitialPayload = unknown> {
)
);

return result;
try {
return {
...result,
eventData: JSON.parse(result.eventData as string),
};
} catch {
return result;
}
}

public async notify(
stepName: string,
eventId: string,
eventData: unknown
): Promise<NotifyStepResponse> {
const result = await this.addStep(
new LazyNotifyStep(stepName, eventId, eventData, this.qstashClient.http)
);

try {
return {
...result,
eventData: JSON.parse(result.eventData as string),
};
} catch {
return result;
}
}

/**
Expand Down
83 changes: 82 additions & 1 deletion src/context/steps.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ import { describe, test, expect } from "bun:test";
import {
LazyCallStep,
LazyFunctionStep,
LazyNotifyStep,
LazySleepStep,
LazySleepUntilStep,
LazyWaitForEventStep,
} from "./steps";
import { nanoid } from "../utils";
import type { Step } from "../types";
import type { NotifyResponse, NotifyStepResponse, Step } from "../types";
import { Client } from "@upstash/qstash";
import { MOCK_QSTASH_SERVER_URL, mockQStashServer } from "../test-utils";

describe("test steps", () => {
const stepName = nanoid();
Expand Down Expand Up @@ -180,4 +183,82 @@ describe("test steps", () => {
});
});
});

describe("notify step", () => {
const eventId = "my-event-id";
const eventData = { data: "my-event-data" };

// get client
const token = nanoid();
const client = new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token });

const step = new LazyNotifyStep(stepName, eventId, eventData, client.http);

test("should set correct fields", () => {
expect(step.stepName).toBe(stepName);
expect(step.stepType).toBe("Notify");
});
test("should create plan step", () => {
expect(step.getPlanStep(concurrent, targetStep)).toEqual({
stepId: 0,
stepName,
stepType: "Notify",
concurrent,
targetStep,
});
});

test("should create result step", async () => {
let called = false;
const notifyResponse: NotifyResponse[] = [
{
error: "no-error",
messageId: "msg-id",
waiter: {
deadline: 123,
headers: {
"my-header": ["value"],
},
timeoutBody: undefined,
timeoutHeaders: {
"my-header": ["value"],
},
timeoutUrl: "url",
url: "url",
},
},
];
const stepResponse: NotifyStepResponse = {
eventId,
eventData,
notifyResponse,
};

await mockQStashServer({
execute: async () => {
const result = await step.getResultStep(4, stepId);
expect(result).toEqual({
concurrent: 4,
stepId,
out: stepResponse,
stepName,
stepType: "Notify",
});
called = true;
},
responseFields: {
status: 200,
body: notifyResponse,
},
receivesRequest: {
method: "POST",
url: `${MOCK_QSTASH_SERVER_URL}/v2/notify/${eventId}`,
token,
body: eventData,
},
});

expect(called).toBeTrue();
});
});
});
Loading

0 comments on commit 5f9c0f4

Please sign in to comment.