diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index c154680fa..0be631239 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -33,6 +33,7 @@ jobs: MEILI_NO_ANALYTICS: 'true' ports: - '7700:7700' + options: "--add-host host.docker.internal=host-gateway" strategy: fail-fast: false matrix: diff --git a/src/index.ts b/src/index.ts index c819e2168..9c79c77ce 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,7 @@ export * from "./types/index.js"; export * from "./errors/index.js"; export * from "./indexes.js"; +export * from "./task/webhook-task.js"; import { MeiliSearch } from "./meilisearch.js"; /** diff --git a/src/indexes.ts b/src/indexes.ts index 22ee033b5..e17432dcf 100644 --- a/src/indexes.ts +++ b/src/indexes.ts @@ -58,7 +58,7 @@ import { getHttpRequestsWithEnqueuedTaskPromise, TaskClient, type HttpRequestsWithEnqueuedTaskPromise, -} from "./task.js"; +} from "./task/task.js"; export class Index { uid: string; @@ -78,7 +78,11 @@ export class Index { this.uid = uid; this.primaryKey = primaryKey; this.httpRequest = new HttpRequests(config); - this.tasks = new TaskClient(this.httpRequest, config.defaultWaitOptions); + this.tasks = new TaskClient( + this.httpRequest, + config.webhookTaskClient, + config.defaultWaitOptions, + ); this.#httpRequestsWithTask = getHttpRequestsWithEnqueuedTaskPromise( this.httpRequest, this.tasks, @@ -244,7 +248,11 @@ export class Index { const httpRequests = new HttpRequests(config); return getHttpRequestsWithEnqueuedTaskPromise( httpRequests, - new TaskClient(httpRequests), + new TaskClient( + httpRequests, + config.webhookTaskClient, + config.defaultWaitOptions, + ), ).post({ path: "indexes", body: { ...options, uid }, diff --git a/src/meilisearch.ts b/src/meilisearch.ts index b2e7d7527..b1bfe8f9c 100644 --- a/src/meilisearch.ts +++ b/src/meilisearch.ts @@ -41,8 +41,8 @@ import { getHttpRequestsWithEnqueuedTaskPromise, TaskClient, type HttpRequestsWithEnqueuedTaskPromise, -} from "./task.js"; -import { BatchClient } from "./batch.js"; +} from "./task/task.js"; +import { BatchClient } from "./task/batch.js"; import { ChatWorkspace } from "./chat-workspace.js"; import type { MeiliSearchApiError } from "./errors/index.js"; @@ -73,6 +73,7 @@ export class MeiliSearch { this.#taskClient = new TaskClient( this.httpRequest, + config.webhookTaskClient, config.defaultWaitOptions, ); this.#batchClient = new BatchClient(this.httpRequest); @@ -473,8 +474,14 @@ export class MeiliSearch { * * @returns Promise returning an object with health details */ - async health(): Promise { - return await this.httpRequest.get({ path: "health" }); + async health( + // TODO: Need to do this for all other methods: https://github.com/meilisearch/meilisearch-js/issues/1476 + extraRequestInit?: ExtraRequestInit, + ): Promise { + return await this.httpRequest.get({ + path: "health", + extraRequestInit, + }); } /** diff --git a/src/batch.ts b/src/task/batch.ts similarity index 90% rename from src/batch.ts rename to src/task/batch.ts index 0466e8597..a3f356153 100644 --- a/src/batch.ts +++ b/src/task/batch.ts @@ -2,8 +2,8 @@ import type { Batch, BatchesResults, TasksOrBatchesQuery, -} from "./types/index.js"; -import type { HttpRequests } from "./http-requests.js"; +} from "../types/index.js"; +import type { HttpRequests } from "../http-requests.js"; /** * Class for handling batches. diff --git a/src/task.ts b/src/task/task.ts similarity index 83% rename from src/task.ts rename to src/task/task.ts index c87263969..9d660ab87 100644 --- a/src/task.ts +++ b/src/task/task.ts @@ -1,4 +1,5 @@ -import { MeiliSearchTaskTimeOutError } from "./errors/index.js"; +import { MeiliSearchTaskTimeOutError } from "../errors/index.js"; +import type { WebhookTaskClient } from "./webhook-task.js"; import type { WaitOptions, TasksOrBatchesQuery, @@ -9,14 +10,14 @@ import type { EnqueuedTaskPromise, TaskUidOrEnqueuedTask, ExtraRequestInit, -} from "./types/index.js"; -import type { HttpRequests } from "./http-requests.js"; +} from "../types/index.js"; +import type { HttpRequests } from "../http-requests.js"; /** * Used to identify whether an error is a timeout error in * {@link TaskClient.waitForTask}. */ -const TIMEOUT_ID = Symbol(""); +export const TIMEOUT_ID = Symbol(""); /** * @returns A function which defines an extra function property on a @@ -58,31 +59,11 @@ export class TaskClient { readonly #httpRequest: HttpRequests; readonly #defaultTimeout: number; readonly #defaultInterval: number; - readonly #applyWaitTask: ReturnType; - - constructor(httpRequest: HttpRequests, defaultWaitOptions?: WaitOptions) { - this.#httpRequest = httpRequest; - this.#defaultTimeout = defaultWaitOptions?.timeout ?? 5_000; - this.#defaultInterval = defaultWaitOptions?.interval ?? 50; - this.#applyWaitTask = getWaitTaskApplier(this); - } - - /** {@link https://www.meilisearch.com/docs/reference/api/tasks#get-one-task} */ - async getTask( - uid: number, - // TODO: Need to do this for all other methods: https://github.com/meilisearch/meilisearch-js/issues/1476 - extraRequestInit?: ExtraRequestInit, - ): Promise { - return await this.#httpRequest.get({ - path: `tasks/${uid}`, - extraRequestInit, - }); - } - - /** {@link https://www.meilisearch.com/docs/reference/api/tasks#get-tasks} */ - async getTasks(params?: TasksOrBatchesQuery): Promise { - return await this.#httpRequest.get({ path: "tasks", params }); - } + readonly #applyWaitTask = getWaitTaskApplier(this); + readonly waitForTask: ( + taskUidOrEnqueuedTask: TaskUidOrEnqueuedTask, + options?: WaitOptions, + ) => Promise; /** * Wait for an enqueued task to be processed. This is done through polling @@ -93,7 +74,7 @@ export class TaskClient { * to instead use {@link EnqueuedTaskPromise.waitTask}, which is available on * any method that returns an {@link EnqueuedTaskPromise}. */ - async waitForTask( + async #waitForTask( taskUidOrEnqueuedTask: TaskUidOrEnqueuedTask, options?: WaitOptions, ): Promise { @@ -128,6 +109,46 @@ export class TaskClient { } } + constructor( + httpRequest: HttpRequests, + webhookTaskClient?: WebhookTaskClient, + options?: WaitOptions, + ) { + this.#httpRequest = httpRequest; + + // TODO: Timeout error is only caught for private method + this.waitForTask = + webhookTaskClient !== undefined + ? (taskUidOrEnqueuedTask, options) => { + const taskUid = getTaskUid(taskUidOrEnqueuedTask); + return webhookTaskClient.waitForTask( + taskUid, + options?.timeout ?? this.#defaultTimeout, + ); + } + : this.#waitForTask.bind(this); + + this.#defaultTimeout = options?.timeout ?? 5_000; + this.#defaultInterval = options?.interval ?? 50; + } + + /** {@link https://www.meilisearch.com/docs/reference/api/tasks#get-one-task} */ + async getTask( + uid: number, + // TODO: Need to do this for all other methods: https://github.com/meilisearch/meilisearch-js/issues/1476 + extraRequestInit?: ExtraRequestInit, + ): Promise { + return await this.#httpRequest.get({ + path: `tasks/${uid}`, + extraRequestInit, + }); + } + + /** {@link https://www.meilisearch.com/docs/reference/api/tasks#get-tasks} */ + async getTasks(params?: TasksOrBatchesQuery): Promise { + return await this.#httpRequest.get({ path: "tasks", params }); + } + /** * Lazily wait for multiple enqueued tasks to be processed. * diff --git a/src/task/webhook-task.ts b/src/task/webhook-task.ts new file mode 100644 index 000000000..814021399 --- /dev/null +++ b/src/task/webhook-task.ts @@ -0,0 +1,97 @@ +import type { + EnqueuedTask, + Task, + WebhookTaskClientOptions, +} from "../types/index.js"; +import { TIMEOUT_ID } from "./task.js"; + +type TimeoutID = ReturnType; +type TaskUid = EnqueuedTask["taskUid"]; + +function* parseNDJSONTasks(tasksString: string) { + if (tasksString === "") { + return; + } + + let newLineIndex: number | undefined = undefined; + for (;;) { + const lastIndexPlusOneOrZero = + newLineIndex === undefined ? 0 : newLineIndex + 1; + newLineIndex = tasksString.indexOf("\n", lastIndexPlusOneOrZero); + + if (newLineIndex === -1) { + newLineIndex = undefined; + } + + yield JSON.parse( + tasksString.substring(lastIndexPlusOneOrZero, newLineIndex), + ) as Task; + + if (newLineIndex === undefined) { + return; + } + } +} + +export class WebhookTaskClient { + readonly #taskMap = new Map void>(); + readonly #orphanTaskMap = new Map< + TaskUid, + { task: Task; timeoutId?: TimeoutID } + >(); + readonly #timeout: number; + readonly #timeoutCallback: (task: Task) => void; + + constructor(options?: WebhookTaskClientOptions) { + this.#timeout = options?.timeout ?? 30_000; + this.#timeoutCallback = + options?.timeoutCallback ?? + ((task) => console.error("unclaimed orphan task", task)); + } + + pushTasksString(tasksString: string): void { + for (const task of parseNDJSONTasks(tasksString)) { + const callback = this.#taskMap.get(task.uid); + + if (callback !== undefined) { + this.#taskMap.delete(task.uid); + callback(task); + + return; + } + + const timeoutId = setTimeout(() => { + this.#orphanTaskMap.delete(task.uid); + this.#timeoutCallback(task); + }, this.#timeout); + + this.#orphanTaskMap.set(task.uid, { task, timeoutId }); + } + } + + async waitForTask(taskUid: TaskUid, timeout?: number): Promise { + const orphan = this.#orphanTaskMap.get(taskUid); + + if (orphan !== undefined) { + clearTimeout(orphan.timeoutId); + return orphan.task; + } + + let to: TimeoutID | undefined = undefined; + + const task = await new Promise((resolve, reject) => { + this.#taskMap.set(taskUid, resolve); + to = setTimeout(() => { + // TODO: This should be the same as in TaskClient + // eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors + reject(TIMEOUT_ID); + }, timeout); + }); + + clearTimeout(to); + + return task; + } + + // TODO: destroy method -> for every orphaned task call error method and clear timeouts +} diff --git a/src/types/types.ts b/src/types/types.ts index 3d57fb6d9..a22009f8d 100644 --- a/src/types/types.ts +++ b/src/types/types.ts @@ -4,6 +4,7 @@ // Definitions: https://github.com/meilisearch/meilisearch-js // TypeScript Version: ^5.8.2 +import type { WebhookTaskClient } from "../task/webhook-task.js"; import type { WaitOptions } from "./task_and_batch.js"; // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -78,6 +79,7 @@ export type Config = { timeout?: number; /** Customizable default options for awaiting tasks. */ defaultWaitOptions?: WaitOptions; + webhookTaskClient?: WebhookTaskClient; }; /** Main options of a request. */ diff --git a/src/types/webhooks.ts b/src/types/webhooks.ts index 7ca48a3ad..0b1252828 100644 --- a/src/types/webhooks.ts +++ b/src/types/webhooks.ts @@ -1,3 +1,5 @@ +import type { Task } from "./task_and_batch.js"; + export type Webhook = { /** A v4 uuid Meilisearch automatically generates when you create a new webhook */ uuid: string; @@ -25,3 +27,8 @@ export type WebhookUpdatePayload = { /** An object with HTTP headers and their values */ headers?: Record; }; + +export type WebhookTaskClientOptions = { + timeout?: number; + timeoutCallback?: (task: Task) => void; +}; diff --git a/tests/setup.ts b/tests/setup.ts new file mode 100644 index 000000000..176160a74 --- /dev/null +++ b/tests/setup.ts @@ -0,0 +1,150 @@ +import { platform } from "node:process"; +import { spawnSync } from "node:child_process"; +import { MeiliSearch } from "../src/meilisearch.js"; + +const POLL_INTERVAL = 250; +const CONTAINER_NAME = "meilisearch"; +const TIMEOUT = 15_000; +const TIMEOUT_ID = Symbol(""); + +const ms = new MeiliSearch({ + host: "http://127.0.0.1:7700", + apiKey: "masterKey", +}); + +// TODO: Logs before cleanup script? Worry that it might pollute node logs, so probably should write them to a file +// and then log it in another step + +function removeIfExistsMeilisearchDockerService(): void { + spawnSync( + "docker", + + // https://docs.docker.com/reference/cli/docker/container/rm/ + ["container", "rm", "-f", CONTAINER_NAME], + + // TODO: prefix output + { stdio: "inherit" }, + ); +} + +// TODO +/** < explanation > */ +function getNetworkOptions() { + if (platform === "linux") { + return [ + // https://docs.docker.com/reference/cli/docker/container/run/#network + "--network", + "host", + ]; + } + + return [ + // https://docs.docker.com/reference/cli/docker/container/run/#publish + "-p", + "7700:7700", + ]; +} + +function startMeilisearchDockerService(): void { + spawnSync( + "docker", + [ + // https://docs.docker.com/reference/cli/docker/container/run + "run", + + // https://docs.docker.com/reference/cli/docker/container/run/#rm + "--rm", + + // https://docs.docker.com/reference/cli/docker/container/run/#detach + "-d", + + // TODO: Instead of name somehow get uid of container + // https://docs.docker.com/reference/cli/docker/container/run/#name + "--name", + CONTAINER_NAME, + + ...getNetworkOptions(), + + // https://docs.docker.com/reference/cli/docker/container/run/#env + "-e", + "MEILI_MASTER_KEY=masterKey", + "-e", + "MEILI_NO_ANALYTICS=true", + + // https://hub.docker.com/r/getmeili/meilisearch + `getmeili/meilisearch:latest`, + ], + + // TODO: prefix output + { stdio: "inherit" }, + ); +} + +/** Poll Meilisearch until its reachable. */ +async function waitForMeiliSearch(): Promise { + let lastError; + + const ac = new AbortController(); + + const toId = setTimeout(() => void ac.abort(TIMEOUT_ID), TIMEOUT); + + for (;;) { + try { + await ms.health({ signal: ac.signal }); + + clearTimeout(toId); + + break; + } catch (error) { + if (Object.is((error as Error).cause, TIMEOUT_ID)) { + throw new Error( + `connection unsuccessful to meilisearch after ${TIMEOUT}ms`, + { cause: lastError }, + ); + } + + lastError = error; + } + + await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL)); + } +} + +/** + * In case there is a connection, return Meilisearch health, and `null` + * otherwise. + */ +async function checkConnection() { + try { + return await ms.health(); + } catch { + return null; + } +} + +// TODO caching: https://forums.docker.com/t/caching-images-and-layers-on-gh-actions-workflow/140647/2 + +/** + * If there is no connection to Meilisearch, create a docker service of it, and + * wait for connection. + * + * {@link https://vitest.dev/config/#globalsetup} + */ +export default async function () { + const health = await checkConnection(); + if (health !== null) { + return; + } + + try { + removeIfExistsMeilisearchDockerService(); + startMeilisearchDockerService(); + await waitForMeiliSearch(); + + return removeIfExistsMeilisearchDockerService; + } catch (error) { + removeIfExistsMeilisearchDockerService(); + + throw error; + } +} diff --git a/tests/webhooks.test.ts b/tests/webhooks.test.ts index 5634a7926..291af95d4 100644 --- a/tests/webhooks.test.ts +++ b/tests/webhooks.test.ts @@ -1,42 +1,97 @@ +import { createServer } from "node:http"; import { expect, it, describe, beforeAll, afterAll } from "vitest"; -import { getClient } from "./utils/meilisearch-test-utils.js"; +import { MASTER_KEY, HOST, assert } from "./utils/meilisearch-test-utils.js"; import { - Meilisearch, type WebhookCreatePayload, type WebhookUpdatePayload, + MeiliSearch, + WebhookTaskClient, } from "../src/index.js"; +import { createUnzip } from "node:zlib"; -let adminClient: Meilisearch; +const SERVER_PORT = 3012; +const SERVER_HOST = "127.0.0.1"; + +const webhookTaskClient = new WebhookTaskClient(); +const client = new MeiliSearch({ + host: HOST, + apiKey: MASTER_KEY, + webhookTaskClient, +}); + +const server = createServer((req, res) => { + (async () => { + const buffers: Buffer[] = []; + + for await (const chunk of req.pipe(createUnzip()).iterator()) { + buffers.push(chunk as Buffer); + } + + const responseStr = Buffer.concat(buffers).toString(); + + webhookTaskClient.pushTasksString(responseStr); + + res.writeHead(200); + })() + .catch((reason) => { + console.error(reason); + res.writeHead(500); + }) + .finally(() => { + res.end(); + }); +}); beforeAll(async () => { - adminClient = await getClient("Admin"); + await new Promise((resolve) => { + server.listen(SERVER_PORT, SERVER_HOST, resolve); + }); }); afterAll(async () => { - const response = await adminClient.getWebhooks(); + await new Promise((resolve, reject) => { + server.close((err) => { + if (err !== undefined) { + reject(err); + } else { + resolve(); + } + }); + }); + + const response = await client.getWebhooks(); for (const webhook of response.results) { if (webhook.isEditable) { - await adminClient.deleteWebhook(webhook.uuid); + await client.deleteWebhook(webhook.uuid); } } }); const WEBHOOK_PAYLOAD = { - url: "https://example.com", - headers: { - authorization: "TOKEN", - }, + // TODO: https://dev.to/abiwinanda/github-action-adding-post-steps-in-composite-actions-5ak3 + // https://docs.docker.com/desktop/features/networking/#i-want-to-connect-from-a-container-to-a-service-on-the-host + url: `http://172.17.0.1:${SERVER_PORT}`, + headers: { authorization: "TOKEN" }, } satisfies WebhookCreatePayload; -describe("webhooks", () => { +it("webhook works", async () => { + await client.createWebhook(WEBHOOK_PAYLOAD); + const INDEX_NAME = "idx_webhook_test"; + const task = await client.createIndex(INDEX_NAME).waitTask(); + assert.isTask(task); + const task2 = await client.deleteIndex(INDEX_NAME).waitTask(); + assert.isTask(task2); +}); + +describe.skip("webhooks", () => { it("can list webhooks", async () => { - const response = await adminClient.getWebhooks(); + const response = await client.getWebhooks(); expect(response).toHaveProperty("results"); expect(response.results).toBeInstanceOf(Array); }); it("can create a webhook", async () => { - const response = await adminClient.createWebhook(WEBHOOK_PAYLOAD); + const response = await client.createWebhook(WEBHOOK_PAYLOAD); expect(response).toHaveProperty("uuid"); expect(response).toHaveProperty("url", WEBHOOK_PAYLOAD.url); expect(response).toHaveProperty("headers", WEBHOOK_PAYLOAD.headers); @@ -44,8 +99,8 @@ describe("webhooks", () => { }); it("can fetch a webhook", async () => { - const createdWebhook = await adminClient.createWebhook(WEBHOOK_PAYLOAD); - const response = await adminClient.getWebhook(createdWebhook.uuid); + const createdWebhook = await client.createWebhook(WEBHOOK_PAYLOAD); + const response = await client.getWebhook(createdWebhook.uuid); expect(response).toHaveProperty("uuid", createdWebhook.uuid); expect(response).toHaveProperty("url", WEBHOOK_PAYLOAD.url); expect(response).toHaveProperty("headers", WEBHOOK_PAYLOAD.headers); @@ -60,8 +115,8 @@ describe("webhooks", () => { }, } satisfies WebhookUpdatePayload; - const createdWebhook = await adminClient.createWebhook(WEBHOOK_PAYLOAD); - const response = await adminClient.updateWebhook( + const createdWebhook = await client.createWebhook(WEBHOOK_PAYLOAD); + const response = await client.updateWebhook( createdWebhook.uuid, updatedWebhook, ); @@ -78,8 +133,8 @@ describe("webhooks", () => { }, } satisfies WebhookUpdatePayload; - const createdWebhook = await adminClient.createWebhook(WEBHOOK_PAYLOAD); - const response = await adminClient.updateWebhook( + const createdWebhook = await client.createWebhook(WEBHOOK_PAYLOAD); + const response = await client.updateWebhook( createdWebhook.uuid, updatedWebhook, ); @@ -89,11 +144,11 @@ describe("webhooks", () => { }); it("can delete a webhook", async () => { - const createdWebhook = await adminClient.createWebhook(WEBHOOK_PAYLOAD); - const deleteResponse = await adminClient.deleteWebhook(createdWebhook.uuid); + const createdWebhook = await client.createWebhook(WEBHOOK_PAYLOAD); + const deleteResponse = await client.deleteWebhook(createdWebhook.uuid); expect(deleteResponse).toBeUndefined(); - const listResponse = await adminClient.getWebhooks(); + const listResponse = await client.getWebhooks(); expect(listResponse.results).not.toContainEqual(createdWebhook); }); }); diff --git a/vite.config.ts b/vite.config.ts index 9e2244eda..7e8f1b695 100644 --- a/vite.config.ts +++ b/vite.config.ts @@ -54,7 +54,8 @@ export default defineConfig(({ mode }) => { : undefined, }, test: { - include: ["tests/**/*.test.ts"], + // globalSetup: "tests/setup.ts", + include: ["tests/webhooks.test.ts"], exclude: ["tests/env/**"], fileParallelism: false, testTimeout: 100_000, // 100 seconds