Skip to content

Commit

Permalink
Add timeouts to requests
Browse files Browse the repository at this point in the history
  • Loading branch information
tomohiroarakawa committed Sep 4, 2020
1 parent b7c7b5a commit 0e5a962
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 42 deletions.
102 changes: 65 additions & 37 deletions lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,40 @@ export interface GETRequest {
path: string;
tree?: object;
watchCallback?: (response: Readonly<Change>) => void;
timeout?: number;
}

export interface WatchRequest {
path: string;
rev?: string;
watchCallback: (response: Readonly<Change>) => void;
timeout?: number;
}

export interface PUTRequest {
path: string;
data: Json;
contentType?: string;
tree?: object;
timeout?: number;
}

export interface POSTRequest {
path: string;
data: Json;
contentType?: string;
tree?: object;
timeout?: number;
}

export interface HEADRequest {
path: string;
timeout?: number;
}

export interface DELETERequest {
path: string;
timeout?: number;
}

/** Main OADAClient class */
Expand All @@ -57,7 +63,7 @@ export class OADAClient {
private _ws: WebSocketClient;

constructor(config: Config) {
this._domain = config.domain.replace(/^https:\/\//,''); // help for those who can't remember if https should be there
this._domain = config.domain.replace(/^https:\/\//, ""); // help for those who can't remember if https should be there
this._token = config.token || this._token;
this._concurrency = config.concurrency || this._concurrency;
this._ws = new WebSocketClient(this._domain, this._concurrency);
Expand Down Expand Up @@ -108,13 +114,17 @@ export class OADAClient {
*/
public async get(request: GETRequest): Promise<Response> {
// === Top-level GET ===
const topLevelResponse = await this._ws.request({
method: "get",
headers: {
authorization: `Bearer ${this._token}`,
const topLevelResponse = await this._ws.request(
{
method: "get",
headers: {
authorization: `Bearer ${this._token}`,
},
path: request.path,
},
path: request.path,
});
undefined, // omitting an optional parameter
request.timeout
);

// === Recursive GET ===
if (request.tree) {
Expand Down Expand Up @@ -172,7 +182,8 @@ export class OADAClient {
for (const change of resp.change) {
request.watchCallback(change);
}
}
},
request.timeout
);

if (r.status !== 200) {
Expand All @@ -191,6 +202,7 @@ export class OADAClient {
method: "unwatch",
requestId: requestId,
});
// TODO: add timeout
}

// GET resource recursively
Expand Down Expand Up @@ -317,15 +329,19 @@ export class OADAClient {
: "application/json"); // 4) Assume application/json

// return PUT response
return this._ws.request({
method: "put",
headers: {
authorization: `Bearer ${this._token}`,
"content-type": contentType,
return this._ws.request(
{
method: "put",
headers: {
authorization: `Bearer ${this._token}`,
"content-type": contentType,
},
path: request.path,
data: request.data,
},
path: request.path,
data: request.data,
});
undefined, // omitting an optional parameter
request.timeout
);
}

/**
Expand Down Expand Up @@ -353,15 +369,19 @@ export class OADAClient {
: "application/json"); // 4) Assume application/json

// return PUT response
return this._ws.request({
method: "post",
headers: {
authorization: `Bearer ${this._token}`,
"content-type": contentType,
return this._ws.request(
{
method: "post",
headers: {
authorization: `Bearer ${this._token}`,
"content-type": contentType,
},
path: request.path,
data,
},
path: request.path,
data,
});
undefined, // omitting an optional parameter
request.timeout
);
}

/**
Expand All @@ -370,13 +390,17 @@ export class OADAClient {
*/
public async head(request: HEADRequest): Promise<Response> {
// return HEAD response
return this._ws.request({
method: "head",
headers: {
authorization: `Bearer ${this._token}`,
return this._ws.request(
{
method: "head",
headers: {
authorization: `Bearer ${this._token}`,
},
path: request.path,
},
path: request.path,
});
undefined, // omitting an optional parameter
request.timeout
);
}

/**
Expand All @@ -385,13 +409,17 @@ export class OADAClient {
*/
public async delete(request: DELETERequest): Promise<Response> {
// return HEAD response
return this._ws.request({
method: "delete",
headers: {
authorization: `Bearer ${this._token}`,
return this._ws.request(
{
method: "delete",
headers: {
authorization: `Bearer ${this._token}`,
},
path: request.path,
},
path: request.path,
});
undefined, // omitting an optional parameter
request.timeout
);
}

/** Create a new resource. Returns resource ID */
Expand All @@ -418,7 +446,7 @@ export class OADAClient {
// In tree put to /resources, the top-level "/resources" should
// look like it exists, even though oada doesn't allow GET on /resources
// directly.
if (path === '/resources') return true;
if (path === "/resources") return true;

// Otherwise, send HEAD request for resource
const headResponse = await this.head({
Expand Down
32 changes: 27 additions & 5 deletions lib/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,24 +98,26 @@ export class WebSocketClient {

public request(
req: SocketRequest,
callback?: (response: Readonly<SocketChange>) => void
callback?: (response: Readonly<SocketChange>) => void,
timeout?: number
): Promise<SocketResponse> {
return this._q.add(() => this.doRequest(req, callback));
return this._q.add(() => this.doRequest(req, callback, timeout));
}

/** send a request to server */
private async doRequest(
req: SocketRequest,
callback?: (response: Readonly<SocketChange>) => void
callback?: (response: Readonly<SocketChange>) => void,
timeout?: number
): Promise<SocketResponse> {
// Send object to the server.
const requestId = req.requestId || ksuid.randomSync().string;
req.requestId = requestId;
assertOADASocketRequest(req);
(await this._ws).send(JSON.stringify(req));

// return Promise
return new Promise<SocketResponse>((resolve, reject) => {
// Promise for request
const request_promise = new Promise<SocketResponse>((resolve, reject) => {
// save request
this._requests.set(requestId, {
resolve,
Expand All @@ -125,6 +127,26 @@ export class WebSocketClient {
callback,
});
});

if (timeout && timeout > 0) {
// If timeout is specified, create another promise and use Promise.race
const timeout_promise = new Promise<SocketResponse>((resolve, reject) => {
setTimeout(() => {
// If the original request is still pending, delete it.
// This is necessary to kill "zombie" requests.
const request = this._requests.get(requestId);
if (request && !request.settled) {
request.reject("Request timeout"); // reject request promise
this._requests.delete(requestId);
}
reject("Request timeout"); // reject timeout promise
}, timeout);
});
return Promise.race([request_promise, timeout_promise]);
} else {
// If timeout is not specified, simply return the request promise
return request_promise;
}
}

private _receive(m: WebSocket.MessageEvent) {
Expand Down

0 comments on commit 0e5a962

Please sign in to comment.