From 8d04a42bc76466ce16a066fc0e779b95264e835f Mon Sep 17 00:00:00 2001 From: AkiraFukushima Date: Sun, 8 Dec 2019 01:24:18 +0900 Subject: [PATCH 1/3] refs #102 Get event-stream using axios --- example/typescript/proxy_streaming.ts | 4 +- example/typescript/streaming.ts | 4 +- src/axios.d.ts | 1 + src/event_stream.ts | 143 ++++++++++++++++++++++++++ src/index.ts | 2 + src/mastodon.ts | 22 ++-- 6 files changed, 164 insertions(+), 12 deletions(-) create mode 100644 src/axios.d.ts create mode 100644 src/event_stream.ts diff --git a/example/typescript/proxy_streaming.ts b/example/typescript/proxy_streaming.ts index ea526b616..7f61262a3 100644 --- a/example/typescript/proxy_streaming.ts +++ b/example/typescript/proxy_streaming.ts @@ -1,4 +1,4 @@ -import Mastodon, { Status, Notification, StreamListener, ProxyConfig } from 'megalodon' +import Mastodon, { Status, Notification, EventStream, ProxyConfig } from 'megalodon' declare var process: { env: { @@ -21,7 +21,7 @@ const proxy: ProxyConfig = { const client = new Mastodon(access_token, BASE_URL + '/api/v1', 'megalodon', proxy) -const stream: StreamListener = client.stream('/streaming/public') +const stream: EventStream = client.stream('/streaming/public') stream.on('connect', _ => { console.log('connect') }) diff --git a/example/typescript/streaming.ts b/example/typescript/streaming.ts index a912329f4..f8f40241c 100644 --- a/example/typescript/streaming.ts +++ b/example/typescript/streaming.ts @@ -1,4 +1,4 @@ -import Mastodon, { Status, Notification, StreamListener } from 'megalodon' +import Mastodon, { Status, Notification, EventStream } from 'megalodon' declare var process: { env: { @@ -12,7 +12,7 @@ const access_token: string = process.env.MASTODON_ACCESS_TOKEN const client = new Mastodon(access_token, BASE_URL + '/api/v1') -const stream: StreamListener = client.stream('/streaming/public') +const stream: EventStream = client.stream('/streaming/public') stream.on('connect', _ => { console.log('connect') }) diff --git a/src/axios.d.ts b/src/axios.d.ts new file mode 100644 index 000000000..114cb06aa --- /dev/null +++ b/src/axios.d.ts @@ -0,0 +1 @@ +declare module 'axios/lib/adapters/http' diff --git a/src/event_stream.ts b/src/event_stream.ts new file mode 100644 index 000000000..ae41ea1ec --- /dev/null +++ b/src/event_stream.ts @@ -0,0 +1,143 @@ +import { EventEmitter } from 'events' +import axios from 'axios' +import httpAdapter from 'axios/lib/adapters/http' +import Parser from './parser' +import { Status } from './entities/status' +import { Notification } from './entities/notification' +import { Conversation } from './entities/conversation' + +const STATUS_CODES_TO_ABORT_ON: Array = [400, 401, 403, 404, 406, 410, 422] + +class EventStreamError extends Error { + public statusCode: number + public message: string + + constructor(statusCode: number, message: string) { + super() + this.statusCode = statusCode + this.message = message + } +} + +class EventStream extends EventEmitter { + public url: string + public headers: object + public parser: Parser + private _connectionClosed: boolean = false + private _reconnectInterval: number + private _reconnectMaxAttempts: number = Infinity + private _reconnectCurrentAttempts: number = 0 + + constructor(url: string, headers: object, _reconnectInterval: number) { + super() + this.url = url + this.headers = headers + this.parser = new Parser() + this._reconnectInterval = _reconnectInterval + } + + public start() { + this._setupParser() + this._connect() + } + + private _connect() { + axios.get(this.url, { responseType: 'stream', adapter: httpAdapter }).then(response => { + const stream = response.data + if (response.headers['content-type'] !== 'text/event-stream') { + this.emit('no-event-stream', 'no event') + } + + // Response status is error + if (STATUS_CODES_TO_ABORT_ON.includes(response.status)) { + stream.on('data', (chunk: any) => { + let body = chunk.toString() + try { + body = JSON.parse(body) + } catch (jsonDecodeError) { + // if non-JSON text was returned, we'll just attach it to the error as-is. + } + + const error: EventStreamError = new EventStreamError(response.status, body) + this.emit('error', error) + this.stop() + }) + } else { + stream.on('data', (chunk: any) => { + this.parser.parse(chunk.toString()) + }) + stream.on('error', (err: Error) => { + this.emit('error', err) + }) + } + stream.on('end', (err: Error | undefined | null) => { + if (err) { + console.log(`Closed connection with ${err.message}`) + if (!this._connectionClosed) { + this._reconnect() + } + } else { + this.emit('close', {}) + } + }) + this.emit('connect', stream) + }) + } + + private _reconnect() { + setTimeout(() => { + if (this._reconnectCurrentAttempts < this._reconnectMaxAttempts) { + this._reconnectCurrentAttempts++ + console.log('Reconnecting') + this._connect() + } + }, this._reconnectInterval) + } + + public stop() { + this._connectionClosed = true + this._resetConnection() + this._resetRetryParams() + } + + private _resetConnection() { + // TODO: close axios streaming connection + + if (this.parser) { + this.parser.removeAllListeners() + } + } + + private _resetRetryParams() { + this._reconnectCurrentAttempts = 0 + } + + /** + * Set up parser when receive some data. + **/ + private _setupParser() { + this.parser.on('update', (status: Status) => { + this.emit('update', status) + }) + this.parser.on('notification', (notification: Notification) => { + this.emit('notification', notification) + }) + this.parser.on('conversation', (conversation: Conversation) => { + this.emit('conversation', conversation) + }) + this.parser.on('delete', (id: string) => { + this.emit('delete', id) + }) + this.parser.on('error', (err: Error) => { + this.emit('parser-error', err) + }) + this.parser.on('connection-limit-exceeded', (err: Error) => { + this.emit('error', err) + }) + this.parser.on('heartbeat', _ => { + this.emit('heartbeat', 'heartbeat') + }) + } +} + +export default EventStream diff --git a/src/index.ts b/src/index.ts index c05089d8b..99afba490 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,6 +3,7 @@ import StreamListener from './stream_listener' import WebSocket from './web_socket' import Response from './response' import OAuth from './oauth' +import EventStream from './event_stream' /** * Entities */ @@ -45,6 +46,7 @@ export { RequestCanceledError, isCancel, ProxyConfig, + EventStream, /** * Entities */ diff --git a/src/mastodon.ts b/src/mastodon.ts index 748181f47..b7ecd52ae 100644 --- a/src/mastodon.ts +++ b/src/mastodon.ts @@ -1,7 +1,8 @@ import { OAuth2 } from 'oauth' import axios, { AxiosResponse, CancelTokenSource, AxiosRequestConfig } from 'axios' -import StreamListener from './stream_listener' +// import StreamListener from './stream_listener' +import EventStream from './event_stream' import WebSocket from './web_socket' import OAuth from './oauth' import Response from './response' @@ -23,7 +24,7 @@ export interface MegalodonInstance { post(path: string, params: object): Promise> del(path: string, params: object): Promise> cancel(): void - stream(path: string, reconnectInterval: number): StreamListener + stream(path: string, reconnectInterval: number): EventStream socket(path: string, strea: string): WebSocket } @@ -128,9 +129,12 @@ export default class Mastodon implements MegalodonInstance { } if (options.website) params.website = options.website - return this._post('/api/v1/apps', params, baseUrl, proxyConfig).then( - (res: Response) => OAuth.AppData.from(res.data) - ) + return this._post( + '/api/v1/apps', + params, + baseUrl, + proxyConfig + ).then((res: Response) => OAuth.AppData.from(res.data)) } /** @@ -476,15 +480,17 @@ export default class Mastodon implements MegalodonInstance { * @param reconnectInterval interval of reconnect * @returns streamListener, which inherits from EventEmitter */ - public stream(path: string, reconnectInterval = 1000): StreamListener { + public stream(path: string, reconnectInterval = 1000): EventStream { const headers = { 'Cache-Control': 'no-cache', Accept: 'text/event-stream', + 'Content-Type': 'text/event-stream', Authorization: `Bearer ${this.accessToken}`, 'User-Agent': this.userAgent } - const url = this.baseUrl + path - const streaming = new StreamListener(url, headers, this.proxyConfig, reconnectInterval) + const url = this.baseUrl + path + `?access_token=${this.accessToken}` + // const streaming = new StreamListener(url, headers, this.proxyConfig, reconnectInterval) + const streaming = new EventStream(url, headers, reconnectInterval) process.nextTick(() => { streaming.start() }) From 6e624b0a67fa79adb155d464a54220bc9a1edd60 Mon Sep 17 00:00:00 2001 From: AkiraFukushima Date: Sun, 8 Dec 2019 23:57:42 +0900 Subject: [PATCH 2/3] refs #102 Cancel axios request when stop streaming --- example/typescript/streaming.ts | 4 +++ src/event_stream.ts | 44 +++++++++++++++++++++++++++++---- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/example/typescript/streaming.ts b/example/typescript/streaming.ts index f8f40241c..f09e4ccd5 100644 --- a/example/typescript/streaming.ts +++ b/example/typescript/streaming.ts @@ -44,3 +44,7 @@ stream.on('heartbeat', () => { stream.on('connection-limit-exceeded', (err: Error) => { console.error(err) }) + +setTimeout(() => { + stream.stop() +}, 10000) diff --git a/src/event_stream.ts b/src/event_stream.ts index ae41ea1ec..e040dfb0c 100644 --- a/src/event_stream.ts +++ b/src/event_stream.ts @@ -1,5 +1,5 @@ import { EventEmitter } from 'events' -import axios from 'axios' +import axios, { CancelTokenSource } from 'axios' import httpAdapter from 'axios/lib/adapters/http' import Parser from './parser' import { Status } from './entities/status' @@ -19,6 +19,12 @@ class EventStreamError extends Error { } } +// TODO: proxy config + +/** + * EventStream + * Listener of text/event-stream. I receives data, and parse when streaming. + */ class EventStream extends EventEmitter { public url: string public headers: object @@ -27,22 +33,35 @@ class EventStream extends EventEmitter { private _reconnectInterval: number private _reconnectMaxAttempts: number = Infinity private _reconnectCurrentAttempts: number = 0 + private _cancelSource: CancelTokenSource - constructor(url: string, headers: object, _reconnectInterval: number) { + /** + * @param url Full url of streaming: e.g. https://mastodon.social/api/v1/streaming/user + * @param headers headers object of streaming request + * @param reconnectInterval reconnection interval[ms] when the connection is unexpectedly closed + */ + constructor(url: string, headers: object, reconnectInterval: number) { super() this.url = url this.headers = headers this.parser = new Parser() - this._reconnectInterval = _reconnectInterval + this._reconnectInterval = reconnectInterval + this._cancelSource = axios.CancelToken.source() } + /** + * Start streaming connection. + */ public start() { this._setupParser() this._connect() } + /** + * Request the url and get response, and start streaming. + */ private _connect() { - axios.get(this.url, { responseType: 'stream', adapter: httpAdapter }).then(response => { + axios.get(this.url, { responseType: 'stream', adapter: httpAdapter, cancelToken: this._cancelSource.token }).then(response => { const stream = response.data if (response.headers['content-type'] !== 'text/event-stream') { this.emit('no-event-stream', 'no event') @@ -84,30 +103,45 @@ class EventStream extends EventEmitter { }) } + /** + * Reconnects to the same endpoint. + */ private _reconnect() { setTimeout(() => { if (this._reconnectCurrentAttempts < this._reconnectMaxAttempts) { this._reconnectCurrentAttempts++ + this._cancelSource.cancel() console.log('Reconnecting') this._connect() } }, this._reconnectInterval) } + /** + * Stop the connection. + */ public stop() { this._connectionClosed = true this._resetConnection() this._resetRetryParams() } + /** + * Resets connection and remove listeners. + */ private _resetConnection() { - // TODO: close axios streaming connection + // axios does not provide stop method. + // So we have to cancel request to stop connection. + this._cancelSource.cancel() if (this.parser) { this.parser.removeAllListeners() } } + /** + * Resets parameters. + */ private _resetRetryParams() { this._reconnectCurrentAttempts = 0 } From 8654c0e0b74cc098b306a4529292a0af083f9404 Mon Sep 17 00:00:00 2001 From: AkiraFukushima Date: Mon, 9 Dec 2019 00:18:37 +0900 Subject: [PATCH 3/3] refs #102 Add proxy support for axios streaming --- src/event_stream.ts | 19 ++++++++++++++++--- src/mastodon.ts | 2 +- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/src/event_stream.ts b/src/event_stream.ts index e040dfb0c..dd6dfe52f 100644 --- a/src/event_stream.ts +++ b/src/event_stream.ts @@ -1,10 +1,11 @@ import { EventEmitter } from 'events' -import axios, { CancelTokenSource } from 'axios' +import axios, { CancelTokenSource, AxiosRequestConfig } from 'axios' import httpAdapter from 'axios/lib/adapters/http' import Parser from './parser' import { Status } from './entities/status' import { Notification } from './entities/notification' import { Conversation } from './entities/conversation' +import proxyAgent, { ProxyConfig } from './proxy_config' const STATUS_CODES_TO_ABORT_ON: Array = [400, 401, 403, 404, 406, 410, 422] @@ -29,6 +30,7 @@ class EventStream extends EventEmitter { public url: string public headers: object public parser: Parser + public proxyConfig: ProxyConfig | false = false private _connectionClosed: boolean = false private _reconnectInterval: number private _reconnectMaxAttempts: number = Infinity @@ -40,11 +42,12 @@ class EventStream extends EventEmitter { * @param headers headers object of streaming request * @param reconnectInterval reconnection interval[ms] when the connection is unexpectedly closed */ - constructor(url: string, headers: object, reconnectInterval: number) { + constructor(url: string, headers: object, proxyConfig: ProxyConfig | false = false, reconnectInterval: number) { super() this.url = url this.headers = headers this.parser = new Parser() + this.proxyConfig = proxyConfig this._reconnectInterval = reconnectInterval this._cancelSource = axios.CancelToken.source() } @@ -61,7 +64,17 @@ class EventStream extends EventEmitter { * Request the url and get response, and start streaming. */ private _connect() { - axios.get(this.url, { responseType: 'stream', adapter: httpAdapter, cancelToken: this._cancelSource.token }).then(response => { + let options: AxiosRequestConfig = { + responseType: 'stream', + adapter: httpAdapter, + cancelToken: this._cancelSource.token + } + if (this.proxyConfig) { + options = Object.assign(options, { + httpsAgent: proxyAgent(this.proxyConfig) + }) + } + axios.get(this.url, options).then(response => { const stream = response.data if (response.headers['content-type'] !== 'text/event-stream') { this.emit('no-event-stream', 'no event') diff --git a/src/mastodon.ts b/src/mastodon.ts index b7ecd52ae..f3552a4b8 100644 --- a/src/mastodon.ts +++ b/src/mastodon.ts @@ -490,7 +490,7 @@ export default class Mastodon implements MegalodonInstance { } const url = this.baseUrl + path + `?access_token=${this.accessToken}` // const streaming = new StreamListener(url, headers, this.proxyConfig, reconnectInterval) - const streaming = new EventStream(url, headers, reconnectInterval) + const streaming = new EventStream(url, headers, this.proxyConfig, reconnectInterval) process.nextTick(() => { streaming.start() })