Skip to content

Commit

Permalink
refs #102 Cancel axios request when stop streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
h3poteto committed Dec 8, 2019
1 parent 8d04a42 commit 6e624b0
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 5 deletions.
4 changes: 4 additions & 0 deletions example/typescript/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,7 @@ stream.on('heartbeat', () => {
stream.on('connection-limit-exceeded', (err: Error) => {
console.error(err)
})

setTimeout(() => {
stream.stop()
}, 10000)
44 changes: 39 additions & 5 deletions src/event_stream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { EventEmitter } from 'events'
import axios from 'axios'
import axios, { CancelTokenSource } from 'axios'
import httpAdapter from 'axios/lib/adapters/http'
import Parser from './parser'
import { Status } from './entities/status'
Expand All @@ -19,6 +19,12 @@ class EventStreamError extends Error {
}
}

// TODO: proxy config

/**
* EventStream
* Listener of text/event-stream. I receives data, and parse when streaming.
*/
class EventStream extends EventEmitter {
public url: string
public headers: object
Expand All @@ -27,22 +33,35 @@ class EventStream extends EventEmitter {
private _reconnectInterval: number
private _reconnectMaxAttempts: number = Infinity
private _reconnectCurrentAttempts: number = 0
private _cancelSource: CancelTokenSource

constructor(url: string, headers: object, _reconnectInterval: number) {
/**
* @param url Full url of streaming: e.g. https://mastodon.social/api/v1/streaming/user
* @param headers headers object of streaming request
* @param reconnectInterval reconnection interval[ms] when the connection is unexpectedly closed
*/
constructor(url: string, headers: object, reconnectInterval: number) {
super()
this.url = url
this.headers = headers
this.parser = new Parser()
this._reconnectInterval = _reconnectInterval
this._reconnectInterval = reconnectInterval
this._cancelSource = axios.CancelToken.source()
}

/**
* Start streaming connection.
*/
public start() {
this._setupParser()
this._connect()
}

/**
* Request the url and get response, and start streaming.
*/
private _connect() {
axios.get(this.url, { responseType: 'stream', adapter: httpAdapter }).then(response => {
axios.get(this.url, { responseType: 'stream', adapter: httpAdapter, cancelToken: this._cancelSource.token }).then(response => {
const stream = response.data
if (response.headers['content-type'] !== 'text/event-stream') {
this.emit('no-event-stream', 'no event')
Expand Down Expand Up @@ -84,30 +103,45 @@ class EventStream extends EventEmitter {
})
}

/**
* Reconnects to the same endpoint.
*/
private _reconnect() {
setTimeout(() => {
if (this._reconnectCurrentAttempts < this._reconnectMaxAttempts) {
this._reconnectCurrentAttempts++
this._cancelSource.cancel()
console.log('Reconnecting')
this._connect()
}
}, this._reconnectInterval)
}

/**
* Stop the connection.
*/
public stop() {
this._connectionClosed = true
this._resetConnection()
this._resetRetryParams()
}

/**
* Resets connection and remove listeners.
*/
private _resetConnection() {
// TODO: close axios streaming connection
// axios does not provide stop method.
// So we have to cancel request to stop connection.
this._cancelSource.cancel()

if (this.parser) {
this.parser.removeAllListeners()
}
}

/**
* Resets parameters.
*/
private _resetRetryParams() {
this._reconnectCurrentAttempts = 0
}
Expand Down

0 comments on commit 6e624b0

Please sign in to comment.