From af3c38ed5d378a65d612a6488b5b78009a9467b2 Mon Sep 17 00:00:00 2001 From: Roj Date: Wed, 8 May 2024 23:10:22 +0300 Subject: [PATCH] [getUpdates] Abort pending requests instead of refusing new ones --- client_manager.ts | 43 ++++++++++++++++--------------------------- main.ts | 30 +++++------------------------- worker.ts | 20 ++++---------------- 3 files changed, 25 insertions(+), 68 deletions(-) diff --git a/client_manager.ts b/client_manager.ts index 678856b..6dfaae7 100644 --- a/client_manager.ts +++ b/client_manager.ts @@ -213,21 +213,8 @@ export class ClientManager { async getUpdates(id: string, timeoutSeconds: number) { const updates = await this.#getUpdatesInner(id, timeoutSeconds); - try { - return updates; - } finally { - this.#addToUpdateCleanupQueue(id, updates); - } - } - - async canGetUpdates(id: string) { - const client = await this.getClient(id); - if (this.#webhooks.has(client)) { - throw new InputError("getUpdates is not allowed when a webhook is set."); - } - if (this.#polls.has(client)) { - throw new InputError("Another getUpdates is in progress."); - } + this.#addToUpdateCleanupQueue(id, updates); + return updates; } #polls = new Set(); @@ -235,12 +222,19 @@ export class ClientManager { #updateResolvers = new Map void>(); #getUpdatesControllers = new Map(); async #getUpdatesInner(id: string, timeoutSeconds: number) { - const client = this.mustGetClient(id); + const client = await this.getClient(id); if (this.#webhooks.has(client)) { - unreachable(); + throw new InputError("getUpdates is not allowed when a webhook is set."); } - if (this.#polls.has(client)) { - unreachable(); + + if (this.#polls.has(client)) { + const controller = this.#getUpdatesControllers.get(client); + if (controller) { + controller.abort(); + } + // just in case + this.#polls.delete(client); + this.#updateResolvers.delete(client); } this.#polls.add(client); let controller: AbortController | null = null; @@ -266,6 +260,9 @@ export class ClientManager { } this.#updateResolvers.set(client, resolve); }); + if (controller.signal.aborted) { + throw new InputError("Aborted by another getUpdates request."); + } updates = this.#updates.get(client); if (updates && updates.length) { @@ -289,14 +286,6 @@ export class ClientManager { } } - abortGetUpdates(id: string) { - const client = this.mustGetClient(id); - const controller = this.#getUpdatesControllers.get(client); - if (controller) { - controller.abort(); - } - } - #startPeriodicChecks() { Promise.resolve().then(this.#periodicChecks.bind(this)); } diff --git a/main.ts b/main.ts index 89bffac..1288de2 100644 --- a/main.ts +++ b/main.ts @@ -128,7 +128,8 @@ async function handleRequest(id: string, method: string, params: any[]) { } switch (method) { case "getUpdates": - return await handleGetUpdates(worker, id); + assertArgCount(params.length, 1); + return await handleGetUpdates(worker, id, params[0]); case "invoke": assertArgCount(params.length, 1); return await handleInvoke(worker, id, params[0]); @@ -160,30 +161,9 @@ async function handleMethod( } } -async function handleGetUpdates(worker: number, id: string) { - const enc = new TextEncoder(); - const response = await workers.call(worker, "canGetUpdates", id); - if (response != null) { - return Response.json(...response); - } - return new Response( - new ReadableStream( - { - async start(controller) { - try { - const updates = await workers.call(worker, "getUpdates", id, 0); - controller.enqueue(enc.encode(JSON.stringify(updates))); - controller.close(); - } catch { - controller.error(); - } - }, - async cancel() { - await workers.call(worker, "abortGetUpdates", id); - }, - }, - ), - ); +async function handleGetUpdates(worker: number, id: string, timeout: number) { + const result = await workers.call(worker, "getUpdates", id, timeout); + return Response.json(...result); } async function handleInvoke(worker: number, id: string, function_: any) { diff --git a/worker.ts b/worker.ts index 3b8fd2f..60eccdd 100644 --- a/worker.ts +++ b/worker.ts @@ -84,14 +84,12 @@ const handlers = { serve, stats, getUpdates, - abortGetUpdates, invoke, setWebhook, deleteWebhook, startWebhookLoop, unload, dropPendingUpdates, - canGetUpdates, }; export type Handler = typeof handlers; @@ -203,15 +201,11 @@ async function stats(): Promise { }; } -function getUpdates(id: string, timeout: number): Promise { +async function getUpdates(id: string, timeout: number): Promise> { if (timeout < 0) { - throw new Error(`Invalid timeout: ${timeout}`); + throw new InputError(`Invalid timeout: ${timeout}`); } - return clientManager.getUpdates(id, timeout); -} - -async function abortGetUpdates(id: string) { - await clientManager.abortGetUpdates(id); + return [await clientManager.getUpdates(id, timeout)]; } async function setWebhook( @@ -243,10 +237,4 @@ async function dropPendingUpdates( await clientManager.dropPendingUpdates(id); return [null]; } - -async function canGetUpdates( - id: string, -): Promise | null> { - await clientManager.canGetUpdates(id); - return null; -} + \ No newline at end of file