Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refs #102 Get event-stream using axios #106

Merged
merged 3 commits into from
Dec 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions example/typescript/proxy_streaming.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import Mastodon, { Status, Notification, StreamListener, ProxyConfig } from 'megalodon'
import Mastodon, { Status, Notification, EventStream, ProxyConfig } from 'megalodon'

declare var process: {
env: {
Expand All @@ -21,7 +21,7 @@ const proxy: ProxyConfig = {

const client = new Mastodon(access_token, BASE_URL + '/api/v1', 'megalodon', proxy)

const stream: StreamListener = client.stream('/streaming/public')
const stream: EventStream = client.stream('/streaming/public')
stream.on('connect', _ => {
console.log('connect')
})
Expand Down
8 changes: 6 additions & 2 deletions example/typescript/streaming.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import Mastodon, { Status, Notification, StreamListener } from 'megalodon'
import Mastodon, { Status, Notification, EventStream } from 'megalodon'

declare var process: {
env: {
Expand All @@ -12,7 +12,7 @@ const access_token: string = process.env.MASTODON_ACCESS_TOKEN

const client = new Mastodon(access_token, BASE_URL + '/api/v1')

const stream: StreamListener = client.stream('/streaming/public')
const stream: EventStream = client.stream('/streaming/public')
stream.on('connect', _ => {
console.log('connect')
})
Expand Down Expand Up @@ -44,3 +44,7 @@ stream.on('heartbeat', () => {
stream.on('connection-limit-exceeded', (err: Error) => {
console.error(err)
})

setTimeout(() => {
stream.stop()
}, 10000)
1 change: 1 addition & 0 deletions src/axios.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
declare module 'axios/lib/adapters/http'
190 changes: 190 additions & 0 deletions src/event_stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
import { EventEmitter } from 'events'
import axios, { CancelTokenSource, AxiosRequestConfig } from 'axios'
import httpAdapter from 'axios/lib/adapters/http'
import Parser from './parser'
import { Status } from './entities/status'
import { Notification } from './entities/notification'
import { Conversation } from './entities/conversation'
import proxyAgent, { ProxyConfig } from './proxy_config'

const STATUS_CODES_TO_ABORT_ON: Array<number> = [400, 401, 403, 404, 406, 410, 422]

class EventStreamError extends Error {
public statusCode: number
public message: string

constructor(statusCode: number, message: string) {
super()
this.statusCode = statusCode
this.message = message
}
}

// TODO: proxy config

/**
* EventStream
* Listener of text/event-stream. I receives data, and parse when streaming.
*/
class EventStream extends EventEmitter {
public url: string
public headers: object
public parser: Parser
public proxyConfig: ProxyConfig | false = false
private _connectionClosed: boolean = false
private _reconnectInterval: number
private _reconnectMaxAttempts: number = Infinity
private _reconnectCurrentAttempts: number = 0
private _cancelSource: CancelTokenSource

/**
* @param url Full url of streaming: e.g. https://mastodon.social/api/v1/streaming/user
* @param headers headers object of streaming request
* @param reconnectInterval reconnection interval[ms] when the connection is unexpectedly closed
*/
constructor(url: string, headers: object, proxyConfig: ProxyConfig | false = false, reconnectInterval: number) {
super()
this.url = url
this.headers = headers
this.parser = new Parser()
this.proxyConfig = proxyConfig
this._reconnectInterval = reconnectInterval
this._cancelSource = axios.CancelToken.source()
}

/**
* Start streaming connection.
*/
public start() {
this._setupParser()
this._connect()
}

/**
* Request the url and get response, and start streaming.
*/
private _connect() {
let options: AxiosRequestConfig = {
responseType: 'stream',
adapter: httpAdapter,
cancelToken: this._cancelSource.token
}
if (this.proxyConfig) {
options = Object.assign(options, {
httpsAgent: proxyAgent(this.proxyConfig)
})
}
axios.get(this.url, options).then(response => {
const stream = response.data
if (response.headers['content-type'] !== 'text/event-stream') {
this.emit('no-event-stream', 'no event')
}

// Response status is error
if (STATUS_CODES_TO_ABORT_ON.includes(response.status)) {
stream.on('data', (chunk: any) => {
let body = chunk.toString()
try {
body = JSON.parse(body)
} catch (jsonDecodeError) {
// if non-JSON text was returned, we'll just attach it to the error as-is.
}

const error: EventStreamError = new EventStreamError(response.status, body)
this.emit('error', error)
this.stop()
})
} else {
stream.on('data', (chunk: any) => {
this.parser.parse(chunk.toString())
})
stream.on('error', (err: Error) => {
this.emit('error', err)
})
}
stream.on('end', (err: Error | undefined | null) => {
if (err) {
console.log(`Closed connection with ${err.message}`)
if (!this._connectionClosed) {
this._reconnect()
}
} else {
this.emit('close', {})
}
})
this.emit('connect', stream)
})
}

/**
* Reconnects to the same endpoint.
*/
private _reconnect() {
setTimeout(() => {
if (this._reconnectCurrentAttempts < this._reconnectMaxAttempts) {
this._reconnectCurrentAttempts++
this._cancelSource.cancel()
console.log('Reconnecting')
this._connect()
}
}, this._reconnectInterval)
}

/**
* Stop the connection.
*/
public stop() {
this._connectionClosed = true
this._resetConnection()
this._resetRetryParams()
}

/**
* Resets connection and remove listeners.
*/
private _resetConnection() {
// axios does not provide stop method.
// So we have to cancel request to stop connection.
this._cancelSource.cancel()

if (this.parser) {
this.parser.removeAllListeners()
}
}

/**
* Resets parameters.
*/
private _resetRetryParams() {
this._reconnectCurrentAttempts = 0
}

/**
* Set up parser when receive some data.
**/
private _setupParser() {
this.parser.on('update', (status: Status) => {
this.emit('update', status)
})
this.parser.on('notification', (notification: Notification) => {
this.emit('notification', notification)
})
this.parser.on('conversation', (conversation: Conversation) => {
this.emit('conversation', conversation)
})
this.parser.on('delete', (id: string) => {
this.emit('delete', id)
})
this.parser.on('error', (err: Error) => {
this.emit('parser-error', err)
})
this.parser.on('connection-limit-exceeded', (err: Error) => {
this.emit('error', err)
})
this.parser.on('heartbeat', _ => {
this.emit('heartbeat', 'heartbeat')
})
}
}

export default EventStream
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import StreamListener from './stream_listener'
import WebSocket from './web_socket'
import Response from './response'
import OAuth from './oauth'
import EventStream from './event_stream'
/**
* Entities
*/
Expand Down Expand Up @@ -45,6 +46,7 @@ export {
RequestCanceledError,
isCancel,
ProxyConfig,
EventStream,
/**
* Entities
*/
Expand Down
22 changes: 14 additions & 8 deletions src/mastodon.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { OAuth2 } from 'oauth'
import axios, { AxiosResponse, CancelTokenSource, AxiosRequestConfig } from 'axios'

import StreamListener from './stream_listener'
// import StreamListener from './stream_listener'
import EventStream from './event_stream'
import WebSocket from './web_socket'
import OAuth from './oauth'
import Response from './response'
Expand All @@ -23,7 +24,7 @@ export interface MegalodonInstance {
post<T = any>(path: string, params: object): Promise<Response<T>>
del(path: string, params: object): Promise<Response<{}>>
cancel(): void
stream(path: string, reconnectInterval: number): StreamListener
stream(path: string, reconnectInterval: number): EventStream
socket(path: string, strea: string): WebSocket
}

Expand Down Expand Up @@ -128,9 +129,12 @@ export default class Mastodon implements MegalodonInstance {
}
if (options.website) params.website = options.website

return this._post<OAuth.AppDataFromServer>('/api/v1/apps', params, baseUrl, proxyConfig).then(
(res: Response<OAuth.AppDataFromServer>) => OAuth.AppData.from(res.data)
)
return this._post<OAuth.AppDataFromServer>(
'/api/v1/apps',
params,
baseUrl,
proxyConfig
).then((res: Response<OAuth.AppDataFromServer>) => OAuth.AppData.from(res.data))
}

/**
Expand Down Expand Up @@ -476,15 +480,17 @@ export default class Mastodon implements MegalodonInstance {
* @param reconnectInterval interval of reconnect
* @returns streamListener, which inherits from EventEmitter
*/
public stream(path: string, reconnectInterval = 1000): StreamListener {
public stream(path: string, reconnectInterval = 1000): EventStream {
const headers = {
'Cache-Control': 'no-cache',
Accept: 'text/event-stream',
'Content-Type': 'text/event-stream',
Authorization: `Bearer ${this.accessToken}`,
'User-Agent': this.userAgent
}
const url = this.baseUrl + path
const streaming = new StreamListener(url, headers, this.proxyConfig, reconnectInterval)
const url = this.baseUrl + path + `?access_token=${this.accessToken}`
// const streaming = new StreamListener(url, headers, this.proxyConfig, reconnectInterval)
const streaming = new EventStream(url, headers, this.proxyConfig, reconnectInterval)
process.nextTick(() => {
streaming.start()
})
Expand Down