Skip to content

Commit

Permalink
fix(core, api-client): Add abort controller to notifications api call…
Browse files Browse the repository at this point in the history
… (WPB-11013) (#6577)

* fix: Add abort controller to notifications api call (WPB-11013)

* Update packages/api-client/src/http/HttpClient.ts

Co-authored-by: Przemysław Jóźwik <przemyslaw.jozwik96@gmail.com>

* Update packages/api-client/src/http/HttpClient.ts

Co-authored-by: Przemysław Jóźwik <przemyslaw.jozwik96@gmail.com>

* Update packages/api-client/src/http/HttpClient.ts

Co-authored-by: Przemysław Jóźwik <przemyslaw.jozwik96@gmail.com>

* Update packages/api-client/src/http/HttpClient.ts

Co-authored-by: Przemysław Jóźwik <przemyslaw.jozwik96@gmail.com>

* Update packages/api-client/src/http/HttpClient.ts

Co-authored-by: Przemysław Jóźwik <przemyslaw.jozwik96@gmail.com>

* fix broken code

* fix tests

* fix broken tests

* remove @types/rimraf

* add temporary skip lib check to tsconfig of core

* change version

* chore: Publish [skip ci]

 - @wireapp/api-client@27.6.0-experimental-abort-controller.1
 - bazinga64@6.3.9
 - @wireapp/certificate-check@0.7.16
 - @wireapp/commons@5.2.11
 - @wireapp/copy-config@2.2.8
 - @wireapp/core@46.4.0-experimental-abort-controller.1
 - @wireapp/license-collector@0.12.9
 - @wireapp/priority-queue@2.1.9
 - @wireapp/promise-queue@2.3.6
 - @wireapp/react-ui-kit@9.23.7
 - @wireapp/store-engine-dexie@2.1.13
 - @wireapp/store-engine-fs@3.1.9
 - @wireapp/store-engine@5.1.9
 - @wireapp/webapp-events@0.24.1

* remove experimental versions

* chore: Publish [skip ci]

 - @wireapp/api-client@27.6.1
 - bazinga64@6.3.10
 - @wireapp/certificate-check@0.7.17
 - @wireapp/commons@5.2.12
 - @wireapp/copy-config@2.2.9
 - @wireapp/core@46.4.1
 - @wireapp/license-collector@0.12.10
 - @wireapp/priority-queue@2.1.10
 - @wireapp/promise-queue@2.3.7
 - @wireapp/react-ui-kit@9.23.8
 - @wireapp/store-engine-dexie@2.1.14
 - @wireapp/store-engine-fs@3.1.10
 - @wireapp/store-engine@5.1.10
 - @wireapp/webapp-events@0.24.2

---------

Co-authored-by: Przemysław Jóźwik <przemyslaw.jozwik96@gmail.com>
Co-authored-by: Otto the Bot <webapp+otto@wire.com>
  • Loading branch information
3 people authored Oct 16, 2024
1 parent 4d5462b commit 3a5a308
Show file tree
Hide file tree
Showing 26 changed files with 73 additions and 147 deletions.
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
"@types/babel__core": "^7",
"@types/jest": "^29.2.0",
"@types/node": "^22.0.0",
"@types/rimraf": "4.0.5",
"@typescript-eslint/eslint-plugin": "7.17.0",
"@typescript-eslint/parser": "7.17.0",
"@wireapp/eslint-config": "workspace:^",
Expand Down
2 changes: 1 addition & 1 deletion packages/api-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,6 @@
"watch": "webpack serve --config webpack.browser.js",
"prepare": "yarn dist"
},
"version": "27.6.0",
"version": "27.6.1",
"gitHead": "5339f01fe01ef0871da8c8bc8662fbe9e604754a"
}
6 changes: 3 additions & 3 deletions packages/api-client/src/http/HttpClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ describe('HttpClient', () => {
return Promise.resolve(mockedAccessTokenStore.accessToken!);
};

await client._sendRequest({method: 'GET', baseURL: testConfig.urls.rest, url: AuthAPI.URL.ACCESS});
await client._sendRequest({config: {method: 'GET', baseURL: testConfig.urls.rest, url: AuthAPI.URL.ACCESS}});
});

it('does not retry on 403 invalid token', async () => {
Expand All @@ -73,7 +73,7 @@ describe('HttpClient', () => {
let backendError;

try {
await client._sendRequest({method: 'GET', baseURL: testConfig.urls.rest, url: AuthAPI.URL.ACCESS});
await client._sendRequest({config: {method: 'GET', baseURL: testConfig.urls.rest, url: AuthAPI.URL.ACCESS}});
throw new Error('Should not resolve');
} catch (error) {
backendError = error;
Expand All @@ -97,7 +97,7 @@ describe('HttpClient', () => {
};
let backendError;
try {
await client._sendRequest({method: 'GET', baseURL: testConfig.urls.rest, url: AuthAPI.URL.ACCESS});
await client._sendRequest({config: {method: 'GET', baseURL: testConfig.urls.rest, url: AuthAPI.URL.ACCESS}});
throw new Error('Should not resolve');
} catch (error) {
backendError = error;
Expand Down
26 changes: 19 additions & 7 deletions packages/api-client/src/http/HttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ enum TOPIC {
ON_INVALID_TOKEN = 'HttpClient.TOPIC.ON_INVALID_TOKEN',
}

type SendRequest = {
config: AxiosRequestConfig;
isFirstTry?: boolean;
abortController?: AbortController;
};

export interface HttpClient {
on(event: TOPIC.ON_CONNECTION_STATE_CHANGE, listener: (state: ConnectionState) => void): this;

Expand Down Expand Up @@ -131,7 +137,7 @@ export class HttpClient extends EventEmitter {
}
}

public async _sendRequest<T>(config: AxiosRequestConfig, isFirstTry = true): Promise<AxiosResponse<T>> {
public async _sendRequest<T>({config, isFirstTry = true, abortController}: SendRequest): Promise<AxiosResponse<T>> {
if (this.accessTokenStore.accessToken) {
// TODO: remove tokenAsParam
const {token_type, access_token} = this.accessTokenStore.accessToken;
Expand All @@ -145,6 +151,7 @@ export class HttpClient extends EventEmitter {
try {
const response = await this.client.request<T>({
...config,
signal: abortController?.signal,
// We want to prefix all urls, except the ones with cookies which are attached to unprefixed urls
url: config.withCredentials ? config.url : `${this.versionPrefix}${config.url}`,
maxBodyLength: FILE_SIZE_100_MB,
Expand All @@ -161,7 +168,7 @@ export class HttpClient extends EventEmitter {
config['axios-retry'] = {
retries: 0,
};
return this._sendRequest<T>(config, false);
return this._sendRequest<T>({config, isFirstTry: false, abortController});
};

const hasAccessToken = !!this.accessTokenStore?.accessToken;
Expand Down Expand Up @@ -276,10 +283,11 @@ export class HttpClient extends EventEmitter {
public async sendRequest<T>(
config: AxiosRequestConfig,
isSynchronousRequest: boolean = false,
abortController?: AbortController,
): Promise<AxiosResponse<T>> {
const promise = isSynchronousRequest
? this.requestQueue.add(() => this._sendRequest<T>(config))
: this._sendRequest<T>(config);
? this.requestQueue.add(() => this._sendRequest<T>({config, abortController}))
: this._sendRequest<T>({config, abortController});

try {
return await promise;
Expand All @@ -289,14 +297,18 @@ export class HttpClient extends EventEmitter {
const isTooManyRequestsError = axios.isAxiosError(error) && error.response?.status === 420;

if (isTooManyRequestsError) {
return this.backOffQueue.add(() => this._sendRequest<T>(config));
return this.backOffQueue.add(() => this._sendRequest<T>({config, abortController}));
}

throw error;
}
}

public sendJSON<T>(config: AxiosRequestConfig, isSynchronousRequest: boolean = false): Promise<AxiosResponse<T>> {
public sendJSON<T>(
config: AxiosRequestConfig,
isSynchronousRequest: boolean = false,
abortController?: AbortController,
): Promise<AxiosResponse<T>> {
const shouldGzipData =
process.env.NODE_ENV !== 'test' &&
!!config.data &&
Expand All @@ -312,7 +324,7 @@ export class HttpClient extends EventEmitter {
'Content-Encoding': shouldGzipData ? 'gzip' : config.headers?.['Content-Encoding'],
};

return this.sendRequest<T>(config, isSynchronousRequest);
return this.sendRequest(config, isSynchronousRequest, abortController);
}

public sendXML<T>(config: AxiosRequestConfig): Promise<AxiosResponse<T>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export class NotificationAPI {
clientId?: string,
size: number = NOTIFICATION_SIZE_MAXIMUM,
since?: string,
abortController?: AbortController,
): Promise<NotificationList> {
const config: AxiosRequestConfig = {
method: 'get',
Expand All @@ -76,7 +77,7 @@ export class NotificationAPI {
url: NotificationAPI.URL.NOTIFICATION,
};

const response = await this.client.sendJSON<NotificationList>(config);
const response = await this.client.sendJSON<NotificationList>(config, false, abortController);
return response.data;
}

Expand All @@ -86,7 +87,11 @@ export class NotificationAPI {
* @param lastNotificationId Only return notifications more recent than this
* @see https://staging-nginz-https.zinfra.io/swagger-ui/#!/push/fetchNotifications
*/
public async getAllNotifications(clientId?: string, lastNotificationId?: string): Promise<NotificationsReponse> {
public async getAllNotifications(
clientId?: string,
lastNotificationId?: string,
abortController?: AbortController,
): Promise<NotificationsReponse> {
const getNotificationChunks = async (
notificationList: Notification[],
currentClientId?: string,
Expand All @@ -101,7 +106,12 @@ export class NotificationAPI {
let hasMissedNotifications = false;

try {
payload = await this.getNotifications(currentClientId, NOTIFICATION_SIZE_MAXIMUM, currentNotificationId);
payload = await this.getNotifications(
currentClientId,
NOTIFICATION_SIZE_MAXIMUM,
currentNotificationId,
abortController,
);
} catch (error) {
const isAxiosError = axios.isAxiosError(error);

Expand Down
2 changes: 1 addition & 1 deletion packages/api-client/src/shims/browser/cookie.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ export const retrieveCookie = (response: AxiosResponse<AccessTokenData>): Promis
Promise.resolve(response.data);

export const sendRequestWithCookie = <T>(client: HttpClient, config: AxiosRequestConfig): Promise<AxiosResponse<T>> =>
client._sendRequest<T>(config);
client._sendRequest<T>({config});
2 changes: 1 addition & 1 deletion packages/api-client/src/shims/node/cookie.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,5 @@ export const sendRequestWithCookie = async <T>(
config.headers.set('Cookie', `zuid=${cookie.zuid}`);
config.withCredentials = true;
}
return client._sendRequest<T>(config);
return client._sendRequest<T>({config});
};
16 changes: 3 additions & 13 deletions packages/api-client/src/tcp/WebSocketClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,7 @@ export interface WebSocketClient {
on(event: TOPIC.ON_STATE_CHANGE, listener: (state: WEBSOCKET_STATE) => void): this;
}

export class AbortHandler {
private aborted = false;

abort = () => {
this.aborted = true;
};

isAborted = () => this.aborted;
}

export type OnConnect = (abortHandler: AbortHandler) => Promise<void>;
export type OnConnect = (abortHandler: AbortController) => Promise<void>;

export class WebSocketClient extends EventEmitter {
private clientId?: string;
Expand All @@ -64,7 +54,7 @@ export class WebSocketClient extends EventEmitter {
public client: HttpClient;
private isSocketLocked: boolean;
private bufferedMessages: string[];
private abortHandler?: AbortHandler;
private abortHandler?: AbortController;

public static readonly TOPIC = TOPIC;

Expand Down Expand Up @@ -146,7 +136,7 @@ export class WebSocketClient extends EventEmitter {
this.socket.setOnOpen(() => {
this.onOpen();
if (onConnect) {
this.abortHandler = new AbortHandler();
this.abortHandler = new AbortController();
void onConnect(this.abortHandler);
}
});
Expand Down
3 changes: 1 addition & 2 deletions packages/bazinga64/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
"@types/jest": "^29.2.0",
"@types/libsodium-wrappers-sumo": "0.7.8",
"@types/node": "^22.0.0",
"@types/rimraf": "4.0.5",
"cross-env": "7.0.3",
"jest": "^29.2.1",
"libsodium-wrappers-sumo": "0.7.15",
Expand All @@ -41,5 +40,5 @@
"test": "jest",
"test:coverage": "jest --coverage"
},
"version": "6.3.8"
"version": "6.3.10"
}
3 changes: 1 addition & 2 deletions packages/certificate-check/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
"@types/jest": "^29.2.0",
"@types/jsrsasign": "10.5.14",
"@types/node": "^22.0.0",
"@types/rimraf": "4.0.5",
"jest": "^29.2.1",
"rimraf": "6.0.1",
"ts-node": "^10.9.1",
Expand All @@ -29,5 +28,5 @@
"test": "jest",
"test:coverage": "jest --coverage"
},
"version": "0.7.15"
"version": "0.7.17"
}
3 changes: 1 addition & 2 deletions packages/commons/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
"@types/fs-extra": "11.0.4",
"@types/jest": "^29.2.0",
"@types/platform": "1.3.6",
"@types/rimraf": "4.0.5",
"jest": "^29.2.1",
"rimraf": "6.0.1",
"typescript": "^5.0.4"
Expand All @@ -36,5 +35,5 @@
"test:coverage": "yarn test",
"test:watch": "yarn jest --watch"
},
"version": "5.2.10"
"version": "5.2.12"
}
3 changes: 1 addition & 2 deletions packages/copy-config/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
"@types/copy": "0.3.5",
"@types/fs-extra": "11.0.4",
"@types/jest": "^29.2.0",
"@types/rimraf": "4.0.5",
"jest": "^29.2.1",
"typescript": "^5.0.4"
},
Expand All @@ -35,5 +34,5 @@
"test": "jest",
"test:coverage": "jest --coverage"
},
"version": "2.2.7"
"version": "2.2.9"
}
3 changes: 1 addition & 2 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
"@swc/jest": "^0.2.23",
"@types/jest": "^29.2.0",
"@types/long": "^5.0.0",
"@types/rimraf": "4.0.5",
"@types/tough-cookie": "4.0.5",
"@types/uuid": "9.0.8",
"commander": "12.1.0",
Expand All @@ -62,6 +61,6 @@
"test:coverage": "jest --coverage",
"watch": "tsc --watch"
},
"version": "46.4.0",
"version": "46.4.1",
"gitHead": "5339f01fe01ef0871da8c8bc8662fbe9e604754a"
}
7 changes: 4 additions & 3 deletions packages/core/src/Account.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import {SUBCONVERSATION_ID} from '@wireapp/api-client/lib/conversation';
import * as Events from '@wireapp/api-client/lib/event';
import {CONVERSATION_EVENT} from '@wireapp/api-client/lib/event';
import {Notification} from '@wireapp/api-client/lib/notification/';
import {AbortHandler, WebSocketClient} from '@wireapp/api-client/lib/tcp/';
import {WebSocketClient} from '@wireapp/api-client/lib/tcp/';
import {WEBSOCKET_STATE} from '@wireapp/api-client/lib/tcp/ReconnectingWebsocket';
import {QualifiedId} from '@wireapp/api-client/lib/user';
import {TimeInMillis} from '@wireapp/commons/lib/util/TimeUtil';
Expand Down Expand Up @@ -635,7 +635,7 @@ export class Account extends TypedEventEmitter<Events> {
return onMissedNotifications(notificationId);
};

const processNotificationStream = async (abortHandler: AbortHandler) => {
const processNotificationStream = async (abortHandler: AbortController) => {
// Lock websocket in order to buffer any message that arrives while we handle the notification stream
this.apiClient.transport.ws.lock();
pauseMessageSending();
Expand All @@ -653,7 +653,7 @@ export class Account extends TypedEventEmitter<Events> {
);
this.logger.info(`Finished processing notifications ${JSON.stringify(results)}`, results);

if (abortHandler.isAborted()) {
if (abortHandler.signal.aborted) {
this.logger.warn('Ending connection process as websocket was closed');
return;
}
Expand All @@ -666,6 +666,7 @@ export class Account extends TypedEventEmitter<Events> {
resumeMessageSending();
resumeRejoiningMLSConversations();
};

this.apiClient.connect(processNotificationStream);

return () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import {APIClient} from '@wireapp/api-client';
export class NotificationBackendRepository {
constructor(private readonly apiClient: APIClient) {}

public async getAllNotifications(clientId?: string, lastNotificationId?: string) {
return this.apiClient.api.notification.getAllNotifications(clientId, lastNotificationId);
public async getAllNotifications(clientId?: string, lastNotificationId?: string, abortController?: AbortController) {
return this.apiClient.api.notification.getAllNotifications(clientId, lastNotificationId, abortController);
}

public getLastNotification(clientId?: string): Promise<Notification> {
Expand Down
11 changes: 5 additions & 6 deletions packages/core/src/notification/NotificationService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import {BackendEvent} from '@wireapp/api-client/lib/event';
import {Notification} from '@wireapp/api-client/lib/notification/';
import {AbortHandler} from '@wireapp/api-client/lib/tcp';
import logdown from 'logdown';

import {APIClient} from '@wireapp/api-client';
Expand Down Expand Up @@ -90,9 +89,9 @@ export class NotificationService extends TypedEventEmitter<Events> {
this.database = new NotificationDatabaseRepository(storeEngine);
}

private async getAllNotifications(since: string) {
private async getAllNotifications(since: string, abortController: AbortController) {
const clientId = this.apiClient.clientId;
return this.backend.getAllNotifications(clientId, since);
return this.backend.getAllNotifications(clientId, since, abortController);
}

/** Should only be called with a completely new client. */
Expand Down Expand Up @@ -140,10 +139,10 @@ export class NotificationService extends TypedEventEmitter<Events> {
public async processNotificationStream(
notificationHandler: NotificationHandler,
onMissedNotifications: (notificationId: string) => void,
abortHandler: AbortHandler,
abortHandler: AbortController,
): Promise<{total: number; error: number; success: number}> {
const lastNotificationId = await this.database.getLastNotificationId();
const {notifications, missedNotification} = await this.getAllNotifications(lastNotificationId);
const {notifications, missedNotification} = await this.getAllNotifications(lastNotificationId, abortHandler);
if (missedNotification) {
onMissedNotifications(missedNotification);
}
Expand All @@ -155,7 +154,7 @@ export class NotificationService extends TypedEventEmitter<Events> {
: `No notification to process from the stream`;
this.logger.log(logMessage);
for (const [index, notification] of notifications.entries()) {
if (abortHandler.isAborted()) {
if (abortHandler.signal.aborted) {
/* Stop handling notifications if the websocket has been disconnected.
* Upon reconnecting we are going to restart handling the notification stream for where we left of
*/
Expand Down
3 changes: 2 additions & 1 deletion packages/core/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"extends": "../../tsconfig.json",
"compilerOptions": {
"esModuleInterop": true,
"outDir": "lib"
"outDir": "lib",
"skipLibCheck": true
},
"exclude": ["node_modules", "src/test", "**.test.ts", "src/demo"],
"include": ["src"]
Expand Down
Loading

0 comments on commit 3a5a308

Please sign in to comment.