From 471265334322a606f41413fb2f711998f1dff525 Mon Sep 17 00:00:00 2001 From: AkiraFukushima Date: Sun, 13 Oct 2019 18:01:03 +0900 Subject: [PATCH 1/4] refs #67 Auto reconnect when does not receive any messages for a while --- example/typescript/package.json | 7 ++- example/typescript/web_socket.ts | 18 ++++--- example/typescript/yarn.lock | 81 +++++++++++++++++++++++++++++++- package.json | 1 + src/web_socket.ts | 24 ++++++++++ yarn.lock | 5 ++ 6 files changed, 126 insertions(+), 10 deletions(-) diff --git a/example/typescript/package.json b/example/typescript/package.json index 57a1e0d5c..bb503ca22 100644 --- a/example/typescript/package.json +++ b/example/typescript/package.json @@ -6,7 +6,10 @@ "author": "h3poteto", "license": "MIT", "dependencies": { - "typescript": "^3.4.5", - "megalodon": "file:../../" + "megalodon": "file:../../", + "typescript": "^3.4.5" + }, + "devDependencies": { + "log4js": "^5.2.2" } } diff --git a/example/typescript/web_socket.ts b/example/typescript/web_socket.ts index 2e9dc81f3..9210d88ba 100644 --- a/example/typescript/web_socket.ts +++ b/example/typescript/web_socket.ts @@ -1,4 +1,5 @@ import Mastodon, { Status, Notification, WebSocket } from 'megalodon' +import log4js from 'log4js' declare var process: { env: { @@ -13,20 +14,23 @@ const access_token: string = process.env.PLEROMA_ACCESS_TOKEN const client = new Mastodon(access_token, BASE_URL + '/api/v1') const stream: WebSocket = client.socket('/streaming', 'user') + +const logger = log4js.getLogger() +logger.level = 'debug' stream.on('connect', () => { - console.log('connect') + logger.debug('connect') }) stream.on('update', (status: Status) => { - console.log(status) + logger.debug(status) }) stream.on('notification', (notification: Notification) => { - console.log(notification) + logger.debug(notification) }) stream.on('delete', (id: number) => { - console.log(id) + logger.debug(id) }) stream.on('error', (err: Error) => { @@ -34,13 +38,13 @@ stream.on('error', (err: Error) => { }) stream.on('heartbeat', () => { - console.log('thump.') + logger.debug('thump.') }) stream.on('close', () => { - console.log('close') + logger.debug('close') }) stream.on('parser-error', (err: Error) => { - console.error(err) + logger.error(err) }) diff --git a/example/typescript/yarn.lock b/example/typescript/yarn.lock index a7e19782b..f76cbfa0e 100644 --- a/example/typescript/yarn.lock +++ b/example/typescript/yarn.lock @@ -141,6 +141,11 @@ dashdash@^1.12.0: dependencies: assert-plus "^1.0.0" +date-format@^2.1.0: + version "2.1.0" + resolved "https://registry.yarnpkg.com/date-format/-/date-format-2.1.0.tgz#31d5b5ea211cf5fd764cd38baf9d033df7e125cf" + integrity sha512-bYQuGLeFxhkxNOF3rcMtiZxvCBAquGzZm6oWA1oZ0g2THUzivaRhv8uOhdr19LmoobSOLoIAxeUK2RdbM8IFTA== + debug@=3.1.0: version "3.1.0" resolved "https://registry.yarnpkg.com/debug/-/debug-3.1.0.tgz#5bb5a0672628b64149566ba16819e61518c67261" @@ -148,6 +153,13 @@ debug@=3.1.0: dependencies: ms "2.0.0" +debug@^4.1.1: + version "4.1.1" + resolved "https://registry.yarnpkg.com/debug/-/debug-4.1.1.tgz#3b72260255109c6b589cee050f1d516139664791" + integrity sha512-pYAIzeRo8J6KPEaJ0VWOh5Pzkbw/RetuzehGM7QRRX5he4fPHx2rdKMB256ehJCkX+XRQm16eZLqLNS8RSZXZw== + dependencies: + ms "^2.1.1" + delayed-stream@~1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/delayed-stream/-/delayed-stream-1.0.0.tgz#df3ae199acadfb7d440aaae0b29e2272b24ec619" @@ -186,6 +198,11 @@ fast-json-stable-stringify@^2.0.0: resolved "https://registry.yarnpkg.com/fast-json-stable-stringify/-/fast-json-stable-stringify-2.0.0.tgz#d5142c0caee6b1189f87d3a76111064f86c8bbf2" integrity sha1-1RQsDK7msRifh9OnYREGT4bIu/I= +flatted@^2.0.1: + version "2.0.1" + resolved "https://registry.yarnpkg.com/flatted/-/flatted-2.0.1.tgz#69e57caa8f0eacbc281d2e2cb458d46fdb449e08" + integrity sha512-a1hQMktqW9Nmqr5aktAux3JMNqaucxGcjtjWnZLHX7yyPCmlSV3M54nGYbqT8K+0GhF3NBgmJCc3ma+WOgX8Jg== + follow-redirects@1.5.10: version "1.5.10" resolved "https://registry.yarnpkg.com/follow-redirects/-/follow-redirects-1.5.10.tgz#7b7a9f9aea2fdff36786a94ff643ed07f4ff5e2a" @@ -207,6 +224,15 @@ form-data@~2.3.2: combined-stream "1.0.6" mime-types "^2.1.12" +fs-extra@^8.1.0: + version "8.1.0" + resolved "https://registry.yarnpkg.com/fs-extra/-/fs-extra-8.1.0.tgz#49d43c45a88cd9677668cb7be1b46efdb8d2e1c0" + integrity sha512-yhlQgA6mnOJUKOsRUFsgJdQCvkKhcz8tlZG5HBQfReYZy46OwLcY+Zia0mtdHsOo9y/hP+CxMN0TU9QxoOtG4g== + dependencies: + graceful-fs "^4.2.0" + jsonfile "^4.0.0" + universalify "^0.1.0" + getpass@^0.1.1: version "0.1.7" resolved "https://registry.yarnpkg.com/getpass/-/getpass-0.1.7.tgz#5eff8e3e684d569ae4cb2b1282604e8ba62149fa" @@ -214,6 +240,11 @@ getpass@^0.1.1: dependencies: assert-plus "^1.0.0" +graceful-fs@^4.1.6, graceful-fs@^4.2.0: + version "4.2.2" + resolved "https://registry.yarnpkg.com/graceful-fs/-/graceful-fs-4.2.2.tgz#6f0952605d0140c1cfdb138ed005775b92d67b02" + integrity sha512-IItsdsea19BoLC7ELy13q1iJFNmd7ofZH5+X/pJr90/nRoPEX0DJo1dHDbgtYWOhJhcCgMDTOw84RZ72q6lB+Q== + har-schema@^2.0.0: version "2.0.0" resolved "https://registry.yarnpkg.com/har-schema/-/har-schema-2.0.0.tgz#a94c2224ebcac04782a0d9035521f24735b7ec92" @@ -271,6 +302,13 @@ json-stringify-safe@~5.0.1: resolved "https://registry.yarnpkg.com/json-stringify-safe/-/json-stringify-safe-5.0.1.tgz#1296a2d58fd45f19a0f6ce01d65701e2c735b6eb" integrity sha1-Epai1Y/UXxmg9s4B1lcB4sc1tus= +jsonfile@^4.0.0: + version "4.0.0" + resolved "https://registry.yarnpkg.com/jsonfile/-/jsonfile-4.0.0.tgz#8771aae0799b64076b76640fca058f9c10e33ecb" + integrity sha1-h3Gq4HmbZAdrdmQPygWPnBDjPss= + optionalDependencies: + graceful-fs "^4.1.6" + jsprim@^1.2.2: version "1.4.1" resolved "https://registry.yarnpkg.com/jsprim/-/jsprim-1.4.1.tgz#313e66bc1e5cc06e438bc1b7499c2e5c56acb6a2" @@ -281,13 +319,25 @@ jsprim@^1.2.2: json-schema "0.2.3" verror "1.10.0" +log4js@^5.2.2: + version "5.2.2" + resolved "https://registry.yarnpkg.com/log4js/-/log4js-5.2.2.tgz#35b750416f22913dd6905d49335752d9b240f47c" + integrity sha512-Iw4ZjbYTMxSTh1jnXM2brpRIr+psM8/nkUiOHu2gFfd0saoX2NdRB69buMWJJuoIJfU/eTzqKy9rVBr0zQwSGQ== + dependencies: + date-format "^2.1.0" + debug "^4.1.1" + flatted "^2.0.1" + rfdc "^1.1.4" + streamroller "^2.2.2" + "megalodon@file:../..": - version "0.9.0" + version "1.0.3" dependencies: "@types/oauth" "^0.9.0" "@types/request" "^2.47.0" "@types/ws" "^6.0.1" axios "^0.18.1" + moment "^2.24.0" oauth "^0.9.15" request "^2.87.0" typescript "^3.4.5" @@ -305,11 +355,21 @@ mime-types@^2.1.12, mime-types@~2.1.19: dependencies: mime-db "~1.36.0" +moment@^2.24.0: + version "2.24.0" + resolved "https://registry.yarnpkg.com/moment/-/moment-2.24.0.tgz#0d055d53f5052aa653c9f6eb68bb5d12bf5c2b5b" + integrity sha512-bV7f+6l2QigeBBZSM/6yTNq4P2fNpSWj/0e7jQcy87A8e7o2nAfP/34/2ky5Vw4B9S446EtIhodAzkFCcR4dQg== + ms@2.0.0: version "2.0.0" resolved "https://registry.yarnpkg.com/ms/-/ms-2.0.0.tgz#5608aeadfc00be6c2901df5f9861788de0d597c8" integrity sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g= +ms@^2.1.1: + version "2.1.2" + resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.2.tgz#d09d1f357b443f493382a8eb3ccd183872ae6009" + integrity sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w== + oauth-sign@~0.9.0: version "0.9.0" resolved "https://registry.yarnpkg.com/oauth-sign/-/oauth-sign-0.9.0.tgz#47a7b016baa68b5fa0ecf3dee08a85c679ac6455" @@ -366,6 +426,11 @@ request@^2.87.0: tunnel-agent "^0.6.0" uuid "^3.3.2" +rfdc@^1.1.4: + version "1.1.4" + resolved "https://registry.yarnpkg.com/rfdc/-/rfdc-1.1.4.tgz#ba72cc1367a0ccd9cf81a870b3b58bd3ad07f8c2" + integrity sha512-5C9HXdzK8EAqN7JDif30jqsBzavB7wLpaubisuQIGHWf2gUXSpzy6ArX/+Da8RjFpagWsCn+pIgxTMAmKw9Zug== + safe-buffer@^5.0.1, safe-buffer@^5.1.2: version "5.1.2" resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.1.2.tgz#991ec69d296e0313747d59bdfd2b745c35f8828d" @@ -392,6 +457,15 @@ sshpk@^1.7.0: jsbn "~0.1.0" tweetnacl "~0.14.0" +streamroller@^2.2.2: + version "2.2.2" + resolved "https://registry.yarnpkg.com/streamroller/-/streamroller-2.2.2.tgz#26bea90567f80d8438d251e5603643fe617b7090" + integrity sha512-wizmZ8NNiqeNIYHv8MqBBbSIeNNcsXyoKxbGYBpiFHCjTGlNHqGNGElwrSM3Awg+0j6U96/eFrSnjW+h3aRo0Q== + dependencies: + date-format "^2.1.0" + debug "^4.1.1" + fs-extra "^8.1.0" + tough-cookie@~2.4.3: version "2.4.3" resolved "https://registry.yarnpkg.com/tough-cookie/-/tough-cookie-2.4.3.tgz#53f36da3f47783b0925afa06ff9f3b165280f781" @@ -417,6 +491,11 @@ typescript@^3.4.5: resolved "https://registry.yarnpkg.com/typescript/-/typescript-3.5.3.tgz#c830f657f93f1ea846819e929092f5fe5983e977" integrity sha512-ACzBtm/PhXBDId6a6sDJfroT2pOWt/oOnk4/dElG5G33ZL776N3Y6/6bKZJBFpd+b05F3Ct9qDjMeJmRWtE2/g== +universalify@^0.1.0: + version "0.1.2" + resolved "https://registry.yarnpkg.com/universalify/-/universalify-0.1.2.tgz#b646f69be3942dabcecc9d6639c80dc105efaa66" + integrity sha512-rBJeI5CXAlmy1pV+617WB9J63U6XcazHHF2f2dbJix4XzpUF0RS3Zbj0FGIOCAva5P/d/GBOYaACQ1w+0azUkg== + uuid@^3.3.2: version "3.3.2" resolved "https://registry.yarnpkg.com/uuid/-/uuid-3.3.2.tgz#1b4af4955eb3077c501c23872fc6513811587131" diff --git a/package.json b/package.json index eb65c585e..be73e66e0 100644 --- a/package.json +++ b/package.json @@ -54,6 +54,7 @@ "@types/request": "^2.47.0", "@types/ws": "^6.0.1", "axios": "^0.18.1", + "moment": "^2.24.0", "oauth": "^0.9.15", "request": "^2.87.0", "typescript": "^3.4.5", diff --git a/src/web_socket.ts b/src/web_socket.ts index 4b6a23465..f6960f838 100644 --- a/src/web_socket.ts +++ b/src/web_socket.ts @@ -1,4 +1,5 @@ import WS from 'ws' +import moment, { Moment } from 'moment' import { EventEmitter } from 'events' import { Status } from './entities/status' import { Notification } from './entities/notification' @@ -20,6 +21,8 @@ export default class WebSocket extends EventEmitter { private _reconnectCurrentAttempts: number private _connectionClosed: boolean private _client: WS | null + private _messageReceivedTimestamp: Moment + private _heartbeatTimeout: number = 60000 /** * @param url Full url of websocket: e.g. https://pleroma.io/api/v1/streaming @@ -41,6 +44,7 @@ export default class WebSocket extends EventEmitter { this._reconnectCurrentAttempts = 0 this._connectionClosed = false this._client = null + this._messageReceivedTimestamp = moment() } /** @@ -104,6 +108,7 @@ export default class WebSocket extends EventEmitter { // Call connect methods console.log('Reconnecting') this._client = this._connect(this.url, this.stream, this._accessToken, this.headers) + this._clearBinding() this._bindSocket(this._client) } }, this._reconnectInterval) @@ -132,6 +137,18 @@ export default class WebSocket extends EventEmitter { return cli } + /** + * Clear binding event for web socket client. + */ + private _clearBinding() { + if (this._client) { + this._client.removeAllListeners('close') + this._client.removeAllListeners('open') + this._client.removeAllListeners('message') + this._client.removeAllListeners('error') + } + } + /** * Bind event for web socket client. * @param client A WebSocket instance. @@ -154,6 +171,13 @@ export default class WebSocket extends EventEmitter { }) client.on('message', (data: WS.Data) => { this.parser.parse(data) + this._messageReceivedTimestamp = moment() + setTimeout(() => { + const now: Moment = moment() + if (now.diff(this._messageReceivedTimestamp) > this._heartbeatTimeout - 1000) { + this._reconnect() + } + }, this._heartbeatTimeout) }) client.on('error', (err: Error) => { this.emit('error', err) diff --git a/yarn.lock b/yarn.lock index dc1545a6a..5dac188ec 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2882,6 +2882,11 @@ mkdirp@0.x, mkdirp@^0.5.0, mkdirp@^0.5.1: dependencies: minimist "0.0.8" +moment@^2.24.0: + version "2.24.0" + resolved "https://registry.yarnpkg.com/moment/-/moment-2.24.0.tgz#0d055d53f5052aa653c9f6eb68bb5d12bf5c2b5b" + integrity sha512-bV7f+6l2QigeBBZSM/6yTNq4P2fNpSWj/0e7jQcy87A8e7o2nAfP/34/2ky5Vw4B9S446EtIhodAzkFCcR4dQg== + ms@2.0.0: version "2.0.0" resolved "https://registry.yarnpkg.com/ms/-/ms-2.0.0.tgz#5608aeadfc00be6c2901df5f9861788de0d597c8" From 2afe4f10dd898949ad3d018f1fd60613f204db24 Mon Sep 17 00:00:00 2001 From: AkiraFukushima Date: Sun, 13 Oct 2019 22:11:06 +0900 Subject: [PATCH 2/4] refs #67 Check server alive using ping/pong message on websocket --- example/typescript/streaming.ts | 2 +- example/typescript/web_socket.ts | 5 ++++ src/web_socket.ts | 39 ++++++++++++++++++++++++-------- 3 files changed, 35 insertions(+), 11 deletions(-) diff --git a/example/typescript/streaming.ts b/example/typescript/streaming.ts index a912329f4..c8aebffe8 100644 --- a/example/typescript/streaming.ts +++ b/example/typescript/streaming.ts @@ -12,7 +12,7 @@ const access_token: string = process.env.MASTODON_ACCESS_TOKEN const client = new Mastodon(access_token, BASE_URL + '/api/v1') -const stream: StreamListener = client.stream('/streaming/public') +const stream: StreamListener = client.stream('/streaming/user') stream.on('connect', _ => { console.log('connect') }) diff --git a/example/typescript/web_socket.ts b/example/typescript/web_socket.ts index 9210d88ba..1f97c2d7c 100644 --- a/example/typescript/web_socket.ts +++ b/example/typescript/web_socket.ts @@ -4,6 +4,7 @@ import log4js from 'log4js' declare var process: { env: { PLEROMA_ACCESS_TOKEN: string + MASTODON_ACCESS_TOKEN: string } } @@ -21,6 +22,10 @@ stream.on('connect', () => { logger.debug('connect') }) +stream.on('pong', () => { + logger.debug('pong') +}) + stream.on('update', (status: Status) => { logger.debug(status) }) diff --git a/src/web_socket.ts b/src/web_socket.ts index f6960f838..feace76ef 100644 --- a/src/web_socket.ts +++ b/src/web_socket.ts @@ -21,8 +21,9 @@ export default class WebSocket extends EventEmitter { private _reconnectCurrentAttempts: number private _connectionClosed: boolean private _client: WS | null - private _messageReceivedTimestamp: Moment - private _heartbeatTimeout: number = 60000 + private _pongReceivedTimestamp: Moment + private _heartbeatInterval: number = 60000 + private _pongWaiting: boolean = false /** * @param url Full url of websocket: e.g. https://pleroma.io/api/v1/streaming @@ -44,7 +45,7 @@ export default class WebSocket extends EventEmitter { this._reconnectCurrentAttempts = 0 this._connectionClosed = false this._client = null - this._messageReceivedTimestamp = moment() + this._pongReceivedTimestamp = moment() } /** @@ -166,18 +167,20 @@ export default class WebSocket extends EventEmitter { } } }) + client.on('pong', () => { + this._pongWaiting = false + this.emit('pong', {}) + this._pongReceivedTimestamp = moment() + setTimeout(() => this._checkAlive(this._pongReceivedTimestamp), this._heartbeatInterval) + }) client.on('open', () => { this.emit('connect', {}) + setTimeout(() => { + client.ping('') + }, 10000) }) client.on('message', (data: WS.Data) => { this.parser.parse(data) - this._messageReceivedTimestamp = moment() - setTimeout(() => { - const now: Moment = moment() - if (now.diff(this._messageReceivedTimestamp) > this._heartbeatTimeout - 1000) { - this._reconnect() - } - }, this._heartbeatTimeout) }) client.on('error', (err: Error) => { this.emit('error', err) @@ -207,6 +210,22 @@ export default class WebSocket extends EventEmitter { this.emit('heartbeat', 'heartbeat') }) } + + private _checkAlive(timestamp: Moment) { + const now: Moment = moment() + if (now.diff(timestamp) > this._heartbeatInterval - 1000) { + if (this._client) { + this._pongWaiting = true + this._client.ping('') + } + setTimeout(() => { + if (this._pongWaiting) { + this._pongWaiting = false + this._reconnect() + } + }, 10000) + } + } } /** From 5aea4b878ec9febc0d2430e9c9b50d4999be8744 Mon Sep 17 00:00:00 2001 From: AkiraFukushima Date: Sun, 13 Oct 2019 22:24:56 +0900 Subject: [PATCH 3/4] refs #67 Add method comments --- src/web_socket.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/web_socket.ts b/src/web_socket.ts index feace76ef..c06c2e817 100644 --- a/src/web_socket.ts +++ b/src/web_socket.ts @@ -175,6 +175,7 @@ export default class WebSocket extends EventEmitter { }) client.on('open', () => { this.emit('connect', {}) + // Call first ping event. setTimeout(() => { client.ping('') }, 10000) @@ -211,8 +212,13 @@ export default class WebSocket extends EventEmitter { }) } + /** + * Call ping and wait to pong. + */ private _checkAlive(timestamp: Moment) { const now: Moment = moment() + // Block multiple calling, if multiple pong event occur. + // It the duration is less than interval, through ping. if (now.diff(timestamp) > this._heartbeatInterval - 1000) { if (this._client) { this._pongWaiting = true From 340575b1e7011a584a68b433f9a2e3813ed61071 Mon Sep 17 00:00:00 2001 From: AkiraFukushima Date: Sun, 13 Oct 2019 23:51:26 +0900 Subject: [PATCH 4/4] refs #67 Skip ping calling when the connection is closed --- src/web_socket.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/web_socket.ts b/src/web_socket.ts index c06c2e817..33cac9bce 100644 --- a/src/web_socket.ts +++ b/src/web_socket.ts @@ -144,6 +144,7 @@ export default class WebSocket extends EventEmitter { private _clearBinding() { if (this._client) { this._client.removeAllListeners('close') + this._client.removeAllListeners('pong') this._client.removeAllListeners('open') this._client.removeAllListeners('message') this._client.removeAllListeners('error') @@ -171,6 +172,7 @@ export default class WebSocket extends EventEmitter { this._pongWaiting = false this.emit('pong', {}) this._pongReceivedTimestamp = moment() + // It is required to anonymous function since get this scope in checkAlive. setTimeout(() => this._checkAlive(this._pongReceivedTimestamp), this._heartbeatInterval) }) client.on('open', () => { @@ -219,7 +221,7 @@ export default class WebSocket extends EventEmitter { const now: Moment = moment() // Block multiple calling, if multiple pong event occur. // It the duration is less than interval, through ping. - if (now.diff(timestamp) > this._heartbeatInterval - 1000) { + if (now.diff(timestamp) > this._heartbeatInterval - 1000 && !this._connectionClosed) { if (this._client) { this._pongWaiting = true this._client.ping('')