From 0e5a962cbe5dada8b12584bc05ffaa495040041f Mon Sep 17 00:00:00 2001 From: Tomohiro Arakawa Date: Fri, 4 Sep 2020 18:26:17 -0400 Subject: [PATCH] Add timeouts to requests --- lib/client.ts | 102 ++++++++++++++++++++++++++++++----------------- lib/websocket.ts | 32 ++++++++++++--- 2 files changed, 92 insertions(+), 42 deletions(-) diff --git a/lib/client.ts b/lib/client.ts index 57c7c93..85f76fa 100644 --- a/lib/client.ts +++ b/lib/client.ts @@ -19,12 +19,14 @@ export interface GETRequest { path: string; tree?: object; watchCallback?: (response: Readonly) => void; + timeout?: number; } export interface WatchRequest { path: string; rev?: string; watchCallback: (response: Readonly) => void; + timeout?: number; } export interface PUTRequest { @@ -32,6 +34,7 @@ export interface PUTRequest { data: Json; contentType?: string; tree?: object; + timeout?: number; } export interface POSTRequest { @@ -39,14 +42,17 @@ export interface POSTRequest { data: Json; contentType?: string; tree?: object; + timeout?: number; } export interface HEADRequest { path: string; + timeout?: number; } export interface DELETERequest { path: string; + timeout?: number; } /** Main OADAClient class */ @@ -57,7 +63,7 @@ export class OADAClient { private _ws: WebSocketClient; constructor(config: Config) { - this._domain = config.domain.replace(/^https:\/\//,''); // help for those who can't remember if https should be there + this._domain = config.domain.replace(/^https:\/\//, ""); // help for those who can't remember if https should be there this._token = config.token || this._token; this._concurrency = config.concurrency || this._concurrency; this._ws = new WebSocketClient(this._domain, this._concurrency); @@ -108,13 +114,17 @@ export class OADAClient { */ public async get(request: GETRequest): Promise { // === Top-level GET === - const topLevelResponse = await this._ws.request({ - method: "get", - headers: { - authorization: `Bearer ${this._token}`, + const topLevelResponse = await this._ws.request( + { + method: "get", + headers: { + authorization: `Bearer ${this._token}`, + }, + path: request.path, }, - path: request.path, - }); + undefined, // omitting an optional parameter + request.timeout + ); // === Recursive GET === if (request.tree) { @@ -172,7 +182,8 @@ export class OADAClient { for (const change of resp.change) { request.watchCallback(change); } - } + }, + request.timeout ); if (r.status !== 200) { @@ -191,6 +202,7 @@ export class OADAClient { method: "unwatch", requestId: requestId, }); + // TODO: add timeout } // GET resource recursively @@ -317,15 +329,19 @@ export class OADAClient { : "application/json"); // 4) Assume application/json // return PUT response - return this._ws.request({ - method: "put", - headers: { - authorization: `Bearer ${this._token}`, - "content-type": contentType, + return this._ws.request( + { + method: "put", + headers: { + authorization: `Bearer ${this._token}`, + "content-type": contentType, + }, + path: request.path, + data: request.data, }, - path: request.path, - data: request.data, - }); + undefined, // omitting an optional parameter + request.timeout + ); } /** @@ -353,15 +369,19 @@ export class OADAClient { : "application/json"); // 4) Assume application/json // return PUT response - return this._ws.request({ - method: "post", - headers: { - authorization: `Bearer ${this._token}`, - "content-type": contentType, + return this._ws.request( + { + method: "post", + headers: { + authorization: `Bearer ${this._token}`, + "content-type": contentType, + }, + path: request.path, + data, }, - path: request.path, - data, - }); + undefined, // omitting an optional parameter + request.timeout + ); } /** @@ -370,13 +390,17 @@ export class OADAClient { */ public async head(request: HEADRequest): Promise { // return HEAD response - return this._ws.request({ - method: "head", - headers: { - authorization: `Bearer ${this._token}`, + return this._ws.request( + { + method: "head", + headers: { + authorization: `Bearer ${this._token}`, + }, + path: request.path, }, - path: request.path, - }); + undefined, // omitting an optional parameter + request.timeout + ); } /** @@ -385,13 +409,17 @@ export class OADAClient { */ public async delete(request: DELETERequest): Promise { // return HEAD response - return this._ws.request({ - method: "delete", - headers: { - authorization: `Bearer ${this._token}`, + return this._ws.request( + { + method: "delete", + headers: { + authorization: `Bearer ${this._token}`, + }, + path: request.path, }, - path: request.path, - }); + undefined, // omitting an optional parameter + request.timeout + ); } /** Create a new resource. Returns resource ID */ @@ -418,7 +446,7 @@ export class OADAClient { // In tree put to /resources, the top-level "/resources" should // look like it exists, even though oada doesn't allow GET on /resources // directly. - if (path === '/resources') return true; + if (path === "/resources") return true; // Otherwise, send HEAD request for resource const headResponse = await this.head({ diff --git a/lib/websocket.ts b/lib/websocket.ts index 9b5478f..226e0e3 100644 --- a/lib/websocket.ts +++ b/lib/websocket.ts @@ -98,15 +98,17 @@ export class WebSocketClient { public request( req: SocketRequest, - callback?: (response: Readonly) => void + callback?: (response: Readonly) => void, + timeout?: number ): Promise { - return this._q.add(() => this.doRequest(req, callback)); + return this._q.add(() => this.doRequest(req, callback, timeout)); } /** send a request to server */ private async doRequest( req: SocketRequest, - callback?: (response: Readonly) => void + callback?: (response: Readonly) => void, + timeout?: number ): Promise { // Send object to the server. const requestId = req.requestId || ksuid.randomSync().string; @@ -114,8 +116,8 @@ export class WebSocketClient { assertOADASocketRequest(req); (await this._ws).send(JSON.stringify(req)); - // return Promise - return new Promise((resolve, reject) => { + // Promise for request + const request_promise = new Promise((resolve, reject) => { // save request this._requests.set(requestId, { resolve, @@ -125,6 +127,26 @@ export class WebSocketClient { callback, }); }); + + if (timeout && timeout > 0) { + // If timeout is specified, create another promise and use Promise.race + const timeout_promise = new Promise((resolve, reject) => { + setTimeout(() => { + // If the original request is still pending, delete it. + // This is necessary to kill "zombie" requests. + const request = this._requests.get(requestId); + if (request && !request.settled) { + request.reject("Request timeout"); // reject request promise + this._requests.delete(requestId); + } + reject("Request timeout"); // reject timeout promise + }, timeout); + }); + return Promise.race([request_promise, timeout_promise]); + } else { + // If timeout is not specified, simply return the request promise + return request_promise; + } } private _receive(m: WebSocket.MessageEvent) {