From 982656c82f70b4ccde95819e12f7a44bbdd2407a Mon Sep 17 00:00:00 2001 From: AkiraFukushima Date: Mon, 9 Dec 2019 22:29:08 +0900 Subject: [PATCH] refs #102 Replcae EventStream to StreamListener --- example/browser/webpack.config.js | 3 +- example/browser/yarn.lock | 7 +- example/javascript/streaming.js | 2 +- example/typescript/proxy_streaming.ts | 4 +- example/typescript/streaming.ts | 4 +- src/event_stream.ts | 190 ------------------ src/index.ts | 5 +- src/mastodon.ts | 10 +- src/stream_listener.ts | 272 +++++++------------------- 9 files changed, 88 insertions(+), 409 deletions(-) delete mode 100644 src/event_stream.ts diff --git a/example/browser/webpack.config.js b/example/browser/webpack.config.js index 0487602b1..87e1a5f53 100644 --- a/example/browser/webpack.config.js +++ b/example/browser/webpack.config.js @@ -21,8 +21,9 @@ module.exports = { ] }, plugins: [], + // https-proxy-agent and socks-proxy-agent is node library, so can't compile for browser. + // So replace net, tls and dns which are node libraries. node: { - fs: 'empty', net: 'empty', tls: 'empty', dns: 'empty' diff --git a/example/browser/yarn.lock b/example/browser/yarn.lock index e1b3a2ad5..e0c3c2fab 100644 --- a/example/browser/yarn.lock +++ b/example/browser/yarn.lock @@ -1972,7 +1972,7 @@ md5.js@^1.3.4: oauth "^0.9.15" request "^2.87.0" socks-proxy-agent h3poteto/node-socks-proxy-agent#master - typescript "^3.4.5" + typescript "3.5.3" ws "^7.0.1" mem@^4.0.0: @@ -3284,11 +3284,6 @@ typescript@3.5.3: resolved "https://registry.yarnpkg.com/typescript/-/typescript-3.5.3.tgz#c830f657f93f1ea846819e929092f5fe5983e977" integrity sha512-ACzBtm/PhXBDId6a6sDJfroT2pOWt/oOnk4/dElG5G33ZL776N3Y6/6bKZJBFpd+b05F3Ct9qDjMeJmRWtE2/g== -typescript@^3.4.5: - version "3.7.2" - resolved "https://registry.yarnpkg.com/typescript/-/typescript-3.7.2.tgz#27e489b95fa5909445e9fef5ee48d81697ad18fb" - integrity sha512-ml7V7JfiN2Xwvcer+XAf2csGO1bPBdRbFCkYBczNZggrBZ9c7G3riSUeJmqEU5uOtXNPMhE3n+R4FA/3YOAWOQ== - union-value@^1.0.0: version "1.0.1" resolved "https://registry.yarnpkg.com/union-value/-/union-value-1.0.1.tgz#0b6fe7b835aecda61c6ea4d4f02c14221e109847" diff --git a/example/javascript/streaming.js b/example/javascript/streaming.js index ccb3e44a8..0133d0662 100644 --- a/example/javascript/streaming.js +++ b/example/javascript/streaming.js @@ -7,7 +7,7 @@ const access_token = process.env.MASTODON_ACCESS_TOKEN const client = new Mastodon(access_token, BASE_URL + '/api/v1') const stream = client.stream('/streaming/public') -stream.on('connect', event => { +stream.on('connect', _ => { console.log('connect') }) diff --git a/example/typescript/proxy_streaming.ts b/example/typescript/proxy_streaming.ts index 7f61262a3..ea526b616 100644 --- a/example/typescript/proxy_streaming.ts +++ b/example/typescript/proxy_streaming.ts @@ -1,4 +1,4 @@ -import Mastodon, { Status, Notification, EventStream, ProxyConfig } from 'megalodon' +import Mastodon, { Status, Notification, StreamListener, ProxyConfig } from 'megalodon' declare var process: { env: { @@ -21,7 +21,7 @@ const proxy: ProxyConfig = { const client = new Mastodon(access_token, BASE_URL + '/api/v1', 'megalodon', proxy) -const stream: EventStream = client.stream('/streaming/public') +const stream: StreamListener = client.stream('/streaming/public') stream.on('connect', _ => { console.log('connect') }) diff --git a/example/typescript/streaming.ts b/example/typescript/streaming.ts index f09e4ccd5..1245d193c 100644 --- a/example/typescript/streaming.ts +++ b/example/typescript/streaming.ts @@ -1,4 +1,4 @@ -import Mastodon, { Status, Notification, EventStream } from 'megalodon' +import Mastodon, { Status, Notification, StreamListener } from 'megalodon' declare var process: { env: { @@ -12,7 +12,7 @@ const access_token: string = process.env.MASTODON_ACCESS_TOKEN const client = new Mastodon(access_token, BASE_URL + '/api/v1') -const stream: EventStream = client.stream('/streaming/public') +const stream: StreamListener = client.stream('/streaming/public') stream.on('connect', _ => { console.log('connect') }) diff --git a/src/event_stream.ts b/src/event_stream.ts deleted file mode 100644 index dd6dfe52f..000000000 --- a/src/event_stream.ts +++ /dev/null @@ -1,190 +0,0 @@ -import { EventEmitter } from 'events' -import axios, { CancelTokenSource, AxiosRequestConfig } from 'axios' -import httpAdapter from 'axios/lib/adapters/http' -import Parser from './parser' -import { Status } from './entities/status' -import { Notification } from './entities/notification' -import { Conversation } from './entities/conversation' -import proxyAgent, { ProxyConfig } from './proxy_config' - -const STATUS_CODES_TO_ABORT_ON: Array = [400, 401, 403, 404, 406, 410, 422] - -class EventStreamError extends Error { - public statusCode: number - public message: string - - constructor(statusCode: number, message: string) { - super() - this.statusCode = statusCode - this.message = message - } -} - -// TODO: proxy config - -/** - * EventStream - * Listener of text/event-stream. I receives data, and parse when streaming. - */ -class EventStream extends EventEmitter { - public url: string - public headers: object - public parser: Parser - public proxyConfig: ProxyConfig | false = false - private _connectionClosed: boolean = false - private _reconnectInterval: number - private _reconnectMaxAttempts: number = Infinity - private _reconnectCurrentAttempts: number = 0 - private _cancelSource: CancelTokenSource - - /** - * @param url Full url of streaming: e.g. https://mastodon.social/api/v1/streaming/user - * @param headers headers object of streaming request - * @param reconnectInterval reconnection interval[ms] when the connection is unexpectedly closed - */ - constructor(url: string, headers: object, proxyConfig: ProxyConfig | false = false, reconnectInterval: number) { - super() - this.url = url - this.headers = headers - this.parser = new Parser() - this.proxyConfig = proxyConfig - this._reconnectInterval = reconnectInterval - this._cancelSource = axios.CancelToken.source() - } - - /** - * Start streaming connection. - */ - public start() { - this._setupParser() - this._connect() - } - - /** - * Request the url and get response, and start streaming. - */ - private _connect() { - let options: AxiosRequestConfig = { - responseType: 'stream', - adapter: httpAdapter, - cancelToken: this._cancelSource.token - } - if (this.proxyConfig) { - options = Object.assign(options, { - httpsAgent: proxyAgent(this.proxyConfig) - }) - } - axios.get(this.url, options).then(response => { - const stream = response.data - if (response.headers['content-type'] !== 'text/event-stream') { - this.emit('no-event-stream', 'no event') - } - - // Response status is error - if (STATUS_CODES_TO_ABORT_ON.includes(response.status)) { - stream.on('data', (chunk: any) => { - let body = chunk.toString() - try { - body = JSON.parse(body) - } catch (jsonDecodeError) { - // if non-JSON text was returned, we'll just attach it to the error as-is. - } - - const error: EventStreamError = new EventStreamError(response.status, body) - this.emit('error', error) - this.stop() - }) - } else { - stream.on('data', (chunk: any) => { - this.parser.parse(chunk.toString()) - }) - stream.on('error', (err: Error) => { - this.emit('error', err) - }) - } - stream.on('end', (err: Error | undefined | null) => { - if (err) { - console.log(`Closed connection with ${err.message}`) - if (!this._connectionClosed) { - this._reconnect() - } - } else { - this.emit('close', {}) - } - }) - this.emit('connect', stream) - }) - } - - /** - * Reconnects to the same endpoint. - */ - private _reconnect() { - setTimeout(() => { - if (this._reconnectCurrentAttempts < this._reconnectMaxAttempts) { - this._reconnectCurrentAttempts++ - this._cancelSource.cancel() - console.log('Reconnecting') - this._connect() - } - }, this._reconnectInterval) - } - - /** - * Stop the connection. - */ - public stop() { - this._connectionClosed = true - this._resetConnection() - this._resetRetryParams() - } - - /** - * Resets connection and remove listeners. - */ - private _resetConnection() { - // axios does not provide stop method. - // So we have to cancel request to stop connection. - this._cancelSource.cancel() - - if (this.parser) { - this.parser.removeAllListeners() - } - } - - /** - * Resets parameters. - */ - private _resetRetryParams() { - this._reconnectCurrentAttempts = 0 - } - - /** - * Set up parser when receive some data. - **/ - private _setupParser() { - this.parser.on('update', (status: Status) => { - this.emit('update', status) - }) - this.parser.on('notification', (notification: Notification) => { - this.emit('notification', notification) - }) - this.parser.on('conversation', (conversation: Conversation) => { - this.emit('conversation', conversation) - }) - this.parser.on('delete', (id: string) => { - this.emit('delete', id) - }) - this.parser.on('error', (err: Error) => { - this.emit('parser-error', err) - }) - this.parser.on('connection-limit-exceeded', (err: Error) => { - this.emit('error', err) - }) - this.parser.on('heartbeat', _ => { - this.emit('heartbeat', 'heartbeat') - }) - } -} - -export default EventStream diff --git a/src/index.ts b/src/index.ts index 99afba490..6502ffd80 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,9 +1,8 @@ import Mastodon, { MegalodonInstance } from './mastodon' -import StreamListener from './stream_listener' +import StreamListener, { StreamListenerError } from './stream_listener' import WebSocket from './web_socket' import Response from './response' import OAuth from './oauth' -import EventStream from './event_stream' /** * Entities */ @@ -39,6 +38,7 @@ import { ProxyConfig } from './proxy_config' export { StreamListener, + StreamListenerError, WebSocket, Response, OAuth, @@ -46,7 +46,6 @@ export { RequestCanceledError, isCancel, ProxyConfig, - EventStream, /** * Entities */ diff --git a/src/mastodon.ts b/src/mastodon.ts index f3552a4b8..99a51babf 100644 --- a/src/mastodon.ts +++ b/src/mastodon.ts @@ -1,8 +1,7 @@ import { OAuth2 } from 'oauth' import axios, { AxiosResponse, CancelTokenSource, AxiosRequestConfig } from 'axios' -// import StreamListener from './stream_listener' -import EventStream from './event_stream' +import StreamListener from './stream_listener' import WebSocket from './web_socket' import OAuth from './oauth' import Response from './response' @@ -24,7 +23,7 @@ export interface MegalodonInstance { post(path: string, params: object): Promise> del(path: string, params: object): Promise> cancel(): void - stream(path: string, reconnectInterval: number): EventStream + stream(path: string, reconnectInterval: number): StreamListener socket(path: string, strea: string): WebSocket } @@ -480,7 +479,7 @@ export default class Mastodon implements MegalodonInstance { * @param reconnectInterval interval of reconnect * @returns streamListener, which inherits from EventEmitter */ - public stream(path: string, reconnectInterval = 1000): EventStream { + public stream(path: string, reconnectInterval = 1000): StreamListener { const headers = { 'Cache-Control': 'no-cache', Accept: 'text/event-stream', @@ -489,8 +488,7 @@ export default class Mastodon implements MegalodonInstance { 'User-Agent': this.userAgent } const url = this.baseUrl + path + `?access_token=${this.accessToken}` - // const streaming = new StreamListener(url, headers, this.proxyConfig, reconnectInterval) - const streaming = new EventStream(url, headers, this.proxyConfig, reconnectInterval) + const streaming = new StreamListener(url, headers, this.proxyConfig, reconnectInterval) process.nextTick(() => { streaming.start() }) diff --git a/src/stream_listener.ts b/src/stream_listener.ts index 09c76dff3..760a839a5 100644 --- a/src/stream_listener.ts +++ b/src/stream_listener.ts @@ -1,5 +1,6 @@ -import Request from 'request' import { EventEmitter } from 'events' +import axios, { CancelTokenSource, AxiosRequestConfig } from 'axios' +import httpAdapter from 'axios/lib/adapters/http' import Parser from './parser' import { Status } from './entities/status' import { Notification } from './entities/notification' @@ -8,7 +9,7 @@ import proxyAgent, { ProxyConfig } from './proxy_config' const STATUS_CODES_TO_ABORT_ON: Array = [400, 401, 403, 404, 406, 410, 422] -class StreamingError extends Error { +export class StreamListenerError extends Error { public statusCode: number public message: string @@ -20,266 +21,141 @@ class StreamingError extends Error { } /** - * StreamListener - * Listener of streaming. It receives data, and parse when streaming. - **/ + * EventStream + * Listener of text/event-stream. It receives data, and parse when streaming. + */ class StreamListener extends EventEmitter { - public reconnectInterval = 1000 public url: string public headers: object - public request: Request.Request | null - public response: Request.Response | null public parser: Parser public proxyConfig: ProxyConfig | false = false - private _scheduledReconnect: NodeJS.Timer | undefined - private _connectInterval: number - private _usedFirstReconnect: boolean - private _stallAbortTimeout: NodeJS.Timer | undefined + private _connectionClosed: boolean = false + private _reconnectInterval: number + private _reconnectMaxAttempts: number = Infinity + private _reconnectCurrentAttempts: number = 0 + private _cancelSource: CancelTokenSource /** - * @param url full url of streaming: e.g. https://mastodon.social/api/v1/streaming/user - * @param headers headers of streaming request - * @param proxyConfig Proxy setting, or set false if don't use proxy. - * @param reconnectInterval reconnection interval[ms] + * @param url Full url of streaming: e.g. https://mastodon.social/api/v1/streaming/user + * @param headers headers object of streaming request + * @param proxyConfig Proxy setting, or set false if don't use proxy + * @param reconnectInterval reconnection interval[ms] when the connection is unexpectedly closed */ - constructor(url: string, headers: object, proxyConfig: ProxyConfig | false = false, reconnectInterval?: number) { + constructor(url: string, headers: object, proxyConfig: ProxyConfig | false = false, reconnectInterval: number) { super() this.url = url this.headers = headers - this.proxyConfig = proxyConfig - if (reconnectInterval) this.reconnectInterval = reconnectInterval this.parser = new Parser() - this.request = null - this.response = null - this._connectInterval = 0 - this._usedFirstReconnect = false + this.proxyConfig = proxyConfig + this._reconnectInterval = reconnectInterval + this._cancelSource = axios.CancelToken.source() } /** - * Resets the connection. - * - clears request, response, parser - * - removes scheduled reconnect handle (if one was scheduled) - * - stops the stall abort timeout handle (if one was scheduled) + * Start streaming connection. */ - private _resetConnection() { - if (this.request) { - // clear our reference to the `request` instance - this.request.removeAllListeners() - this.request.destroy() - } - - if (this.response) { - // clear our reference to the http.IncomingMessage instance - this.response.removeAllListeners() - this.response.destroy() - } - - if (this.parser) { - this.parser.removeAllListeners() - } - - // ensure a scheduled reconnect does not occur (if one was scheduled) - // this can happen if we get a close event before .stop() is called - if (this._scheduledReconnect) { - clearTimeout(this._scheduledReconnect) - delete this._scheduledReconnect - } - - // clear our stall abort timeout - this._stopStallAbortTimeout() + public start() { + this._setupParser() + this._connect() } /** - * Resets the parameters used in determining the next reconnect time. + * Request the url and get response, and start streaming. */ - private _resetRetryParams() { - // delay for next reconnection attempt - this._connectInterval = 0 - // flag indicating whether we used a 0-delay reconnect - this._usedFirstReconnect = false - } - - private _buildRequestOption(): Request.OptionsWithUrl { - let options = { - headers: this.headers, - gzip: true, - encoding: null, - url: this.url + private _connect() { + let options: AxiosRequestConfig = { + responseType: 'stream', + adapter: httpAdapter, + cancelToken: this._cancelSource.token } if (this.proxyConfig) { options = Object.assign(options, { - agent: proxyAgent(this.proxyConfig) + httpsAgent: proxyAgent(this.proxyConfig) }) } - - return options - } - - private _startPersistentConnection() { - this._resetConnection() - this._setupParser() - this._resetStallAbortTimeout() - - this.request = Request.get(this._buildRequestOption()) - this.emit('connect', this.request) - this.request.on('response', response => { - // reset our reconnection attempt flag so next attempt goes through with 0 delay - // if we get a transport-level error - this._usedFirstReconnect = false - // start a stall abort timeout handle - this._resetStallAbortTimeout() - this.response = response + axios.get(this.url, options).then(response => { + const stream = response.data if (response.headers['content-type'] !== 'text/event-stream') { - this.emit('not-event-stream', 'no event') + this.emit('no-event-stream', 'no event') } - if (STATUS_CODES_TO_ABORT_ON.indexOf(response.statusCode) > -1) { - let body: string = '' - this.response.on('data', chunk => { - body += chunk.toString() + // Response status is error + if (STATUS_CODES_TO_ABORT_ON.includes(response.status)) { + stream.on('data', (chunk: any) => { + let body = chunk.toString() try { body = JSON.parse(body) } catch (jsonDecodeError) { - // if non-JSON text was returned, we'll just attach it to the error as-is + // if non-JSON text was returned, we'll just attach it to the error as-is. } - const error: StreamingError = new StreamingError(this.response!.statusCode, body) + const error: StreamListenerError = new StreamListenerError(response.status, body) this.emit('error', error) - // stop the stream explicitly so we don't reconnect this.stop() - body = '' }) } else { - this.response.on('data', chunk => { - this._connectInterval = 0 - - this._resetStallAbortTimeout() + stream.on('data', (chunk: any) => { this.parser.parse(chunk.toString()) }) - - this.response.on('error', err => { - // expose response errors on twit instance + stream.on('error', (err: Error) => { this.emit('error', err) }) - - // connected without an error response from Dweet.io, emit `connected` event - // this must be emitted after all its event handlers are bound - // so the reference to `self.response` is not - // interfered-with by the user until it is emitted - this.emit('connected', this.response) } + stream.on('end', (err: Error | undefined | null) => { + if (err) { + console.log(`Closed connection with ${err.message}`) + if (!this._connectionClosed) { + this._reconnect() + } + } else { + this.emit('close', {}) + } + }) + this.emit('connect', stream) }) - this.request.on('close', this._onClose.bind(this)) - this.request.on('error', () => { - this._scheduleReconnect.bind(this) - }) - return this } /** - * Handle when the request or response closes. - * Schedule a reconnect - * + * Reconnects to the same endpoint. */ - private _onClose() { - this._stopStallAbortTimeout() - if (this._scheduledReconnect) { - // if we already have a reconnect scheduled, don't schedule another one. - // this race condition can happen if the http.ClientRequest - // and http.IncomingMessage both emit `close` - return - } - this._scheduleReconnect() - } - - /** - * Kick off the http request, and persist the connection - */ - public start() { - this._resetRetryParams() - this._startPersistentConnection() - return this + private _reconnect() { + setTimeout(() => { + if (this._reconnectCurrentAttempts < this._reconnectMaxAttempts) { + this._reconnectCurrentAttempts++ + this._cancelSource.cancel() + console.log('Reconnecting') + this._connect() + } + }, this._reconnectInterval) } /** - * Abort the http request, stop scheduled reconnect (if one was scheduled) and clear state + * Stop the connection. */ public stop() { - // clear connection variables and timeout handles + this._connectionClosed = true this._resetConnection() this._resetRetryParams() - return this } /** - * Stop and restart the stall abort timer (called when new data is received) - * - * If we go 90s without receiving data from dweet.io, we abort the request & reconnect. + * Resets connection and remove listeners. */ - private _resetStallAbortTimeout() { - // stop the previous stall abort timer - this._stopStallAbortTimeout() - // start a new 90s timeout to trigger a close & reconnect if no data received - this._stallAbortTimeout = setTimeout(() => { - this._scheduleReconnect() - }, 90000) - return this - } + private _resetConnection() { + // axios does not provide stop method. + // So we have to cancel request to stop connection. + this._cancelSource.cancel() - private _stopStallAbortTimeout() { - if (this._stallAbortTimeout) { - clearTimeout(this._stallAbortTimeout) - // mark the timer as `null` so it is clear - // via introspection that the timeout is not scheduled - delete this._stallAbortTimeout + if (this.parser) { + this.parser.removeAllListeners() } - return this } /** - * Computes the next time a reconnect should occur (based on the last HTTP response received) - * and starts a timeout handle to begin reconnecting after `self._connectInterval` passes. + * Resets parameters. */ - private _scheduleReconnect() { - if (this.response && this.response.statusCode === 420) { - // start with a 1 minute wait and double each attempt - if (!this._connectInterval) { - this._connectInterval = 60000 - } else { - this._connectInterval *= 2 - } - } else if (this.response && String(this.response.statusCode).charAt(0) === '5') { - // 5xx errors - // start with a 5s wait, double each attempt up to 320s - if (!this._connectInterval) { - this._connectInterval = 5000 - } else if (this._connectInterval < 320000) { - this._connectInterval *= 2 - } else { - this._connectInterval = 320000 - } - } else { - // we did not get an HTTP response from our last connection attempt. - // DNS/TCP error, or a stall in the stream (and stall timer closed the connection) - // eslint-disable-next-line no-lonely-if - if (!this._usedFirstReconnect) { - // first reconnection attempt on a valid connection should occur immediately - this._connectInterval = 0 - this._usedFirstReconnect = true - } else if (this._connectInterval < 16000) { - // linearly increase delay by 250ms up to 16s - this._connectInterval += 250 - } else { - // cap out reconnect interval at 16s - this._connectInterval = 16000 - } - } - - // schedule the reconnect - this._scheduledReconnect = setTimeout(() => { - this._startPersistentConnection() - }, this._connectInterval) - this.emit('reconnect', this.request, this.response, this._connectInterval) + private _resetRetryParams() { + this._reconnectCurrentAttempts = 0 } /**