Skip to content

Commit

Permalink
[getUpdates] Abort pending requests instead of refusing new ones
Browse files Browse the repository at this point in the history
  • Loading branch information
rojvv committed May 8, 2024
1 parent ddc94dc commit af3c38e
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 68 deletions.
43 changes: 16 additions & 27 deletions client_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,34 +213,28 @@ export class ClientManager {

async getUpdates(id: string, timeoutSeconds: number) {
const updates = await this.#getUpdatesInner(id, timeoutSeconds);
try {
return updates;
} finally {
this.#addToUpdateCleanupQueue(id, updates);
}
}

async canGetUpdates(id: string) {
const client = await this.getClient(id);
if (this.#webhooks.has(client)) {
throw new InputError("getUpdates is not allowed when a webhook is set.");
}
if (this.#polls.has(client)) {
throw new InputError("Another getUpdates is in progress.");
}
this.#addToUpdateCleanupQueue(id, updates);
return updates;
}

#polls = new Set<Client>();
static #GET_UPDATES_MAX_UPDATES = 100;
#updateResolvers = new Map<Client, () => void>();
#getUpdatesControllers = new Map<Client, AbortController>();
async #getUpdatesInner(id: string, timeoutSeconds: number) {
const client = this.mustGetClient(id);
const client = await this.getClient(id);
if (this.#webhooks.has(client)) {
unreachable();
throw new InputError("getUpdates is not allowed when a webhook is set.");
}
if (this.#polls.has(client)) {
unreachable();

if (this.#polls.has(client)) {
const controller = this.#getUpdatesControllers.get(client);
if (controller) {
controller.abort();
}
// just in case
this.#polls.delete(client);
this.#updateResolvers.delete(client);
}
this.#polls.add(client);
let controller: AbortController | null = null;
Expand All @@ -266,6 +260,9 @@ export class ClientManager {
}
this.#updateResolvers.set(client, resolve);
});
if (controller.signal.aborted) {
throw new InputError("Aborted by another getUpdates request.");
}

updates = this.#updates.get(client);
if (updates && updates.length) {
Expand All @@ -289,14 +286,6 @@ export class ClientManager {
}
}

abortGetUpdates(id: string) {
const client = this.mustGetClient(id);
const controller = this.#getUpdatesControllers.get(client);
if (controller) {
controller.abort();
}
}

#startPeriodicChecks() {
Promise.resolve().then(this.#periodicChecks.bind(this));
}
Expand Down
30 changes: 5 additions & 25 deletions main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ async function handleRequest(id: string, method: string, params: any[]) {
}
switch (method) {
case "getUpdates":
return await handleGetUpdates(worker, id);
assertArgCount(params.length, 1);
return await handleGetUpdates(worker, id, params[0]);
case "invoke":
assertArgCount(params.length, 1);
return await handleInvoke(worker, id, params[0]);
Expand Down Expand Up @@ -160,30 +161,9 @@ async function handleMethod(
}
}

async function handleGetUpdates(worker: number, id: string) {
const enc = new TextEncoder();
const response = await workers.call(worker, "canGetUpdates", id);
if (response != null) {
return Response.json(...response);
}
return new Response(
new ReadableStream(
{
async start(controller) {
try {
const updates = await workers.call(worker, "getUpdates", id, 0);
controller.enqueue(enc.encode(JSON.stringify(updates)));
controller.close();
} catch {
controller.error();
}
},
async cancel() {
await workers.call(worker, "abortGetUpdates", id);
},
},
),
);
async function handleGetUpdates(worker: number, id: string, timeout: number) {
const result = await workers.call(worker, "getUpdates", id, timeout);
return Response.json(...result);
}

async function handleInvoke(worker: number, id: string, function_: any) {
Expand Down
20 changes: 4 additions & 16 deletions worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,12 @@ const handlers = {
serve,
stats,
getUpdates,
abortGetUpdates,
invoke,
setWebhook,
deleteWebhook,
startWebhookLoop,
unload,
dropPendingUpdates,
canGetUpdates,
};
export type Handler = typeof handlers;

Expand Down Expand Up @@ -203,15 +201,11 @@ async function stats(): Promise<WorkerStats> {
};
}

function getUpdates(id: string, timeout: number): Promise<Update[] | "DROP"> {
async function getUpdates(id: string, timeout: number): Promise<Parameters<typeof Response["json"]>> {
if (timeout < 0) {
throw new Error(`Invalid timeout: ${timeout}`);
throw new InputError(`Invalid timeout: ${timeout}`);
}
return clientManager.getUpdates(id, timeout);
}

async function abortGetUpdates(id: string) {
await clientManager.abortGetUpdates(id);
return [await clientManager.getUpdates(id, timeout)];
}

async function setWebhook(
Expand Down Expand Up @@ -243,10 +237,4 @@ async function dropPendingUpdates(
await clientManager.dropPendingUpdates(id);
return [null];
}

async function canGetUpdates(
id: string,
): Promise<Parameters<typeof Response["json"]> | null> {
await clientManager.canGetUpdates(id);
return null;
}

0 comments on commit af3c38e

Please sign in to comment.