Skip to content

Commit

Permalink
Support WebSocket in browser
Browse files Browse the repository at this point in the history
  • Loading branch information
h3poteto committed Oct 4, 2023
1 parent a753a18 commit 6135bc6
Show file tree
Hide file tree
Showing 11 changed files with 233 additions and 141 deletions.
49 changes: 44 additions & 5 deletions example/browser/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,49 @@
import generator, { Entity, Response } from 'megalodon'
import generator from 'megalodon'

const BASE_URL: string = 'https://mastodon.social'
const BASE_URL: string = process.env.MASTODON_URL!
const ACCESS_TOKEN: string = process.env.MASTODON_ACCESS_TOKEN!
console.log(BASE_URL)
console.log('start')

const client = generator('mastodon', BASE_URL)
const client = generator('mastodon', BASE_URL, ACCESS_TOKEN)

client.getInstance().then((res: Response<Entity.Instance>) => {
console.log(res)
const stream = client.localSocket()
stream.on('connect', () => {
console.log('connect')
})

stream.on('pong', () => {
console.log('pong')
})

stream.on('update', (status: Entity.Status) => {
console.log(status)
})

stream.on('notification', (notification: Entity.Notification) => {
console.log(notification)
})

stream.on('delete', (id: number) => {
console.log(id)
})

stream.on('error', (err: Error) => {
console.error(err)
})

stream.on('status_update', (status: Entity.Status) => {
console.log('updated: ', status.url)
})

stream.on('heartbeat', () => {
console.log('thump.')
})

stream.on('close', () => {
console.log('close')
})

stream.on('parser-error', (err: Error) => {
console.error(err)
})
4 changes: 3 additions & 1 deletion example/browser/webpack.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ module.exports = {
plugins: [
new webpack.DefinePlugin({
'process.browser': true,
'process.env.NODE_DEBUG': false
'process.env.NODE_DEBUG': false,
'process.env.MASTODON_URL': `"${process.env.MASTODON_URL}"`,
'process.env.MASTODON_ACCESS_TOKEN': `"${process.env.MASTODON_ACCESS_TOKEN}"`
}),
new webpack.ProvidePlugin({
Buffer: ['buffer', 'Buffer'],
Expand Down
12 changes: 3 additions & 9 deletions example/typescript/src/mastodon/streaming.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
import generator, { Entity, WebSocketInterface } from 'megalodon'

declare var process: {
env: {
MASTODON_ACCESS_TOKEN: string
}
}
const BASE_URL: string = process.env.MASTODON_STREAMING_URL!

const BASE_URL: string = 'wss://streaming.fedibird.com'

const access_token: string = process.env.MASTODON_ACCESS_TOKEN
const access_token: string = process.env.MASTODON_ACCESS_TOKEN!

const client = generator('mastodon', BASE_URL, access_token)

const stream: WebSocketInterface = client.userSocket()
const stream: WebSocketInterface = client.localSocket()
stream.on('connect', () => {
console.log('connect')
})
Expand Down
3 changes: 2 additions & 1 deletion megalodon/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@
"socks-proxy-agent": "^8.0.2",
"typescript": "5.2.2",
"uuid": "^9.0.1",
"ws": "8.14.2"
"ws": "8.14.2",
"isomorphic-ws": "^5.0.0"
},
"devDependencies": {
"@types/core-js": "^2.5.6",
Expand Down
8 changes: 8 additions & 0 deletions megalodon/src/default.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
export const NO_REDIRECT = 'urn:ietf:wg:oauth:2.0:oob'
export const DEFAULT_SCOPE = ['read', 'write', 'follow']
export const DEFAULT_UA = 'megalodon'

export function isBrowser() {
if (typeof window !== 'undefined') {
return true
} else {
return false
}
}
88 changes: 52 additions & 36 deletions megalodon/src/firefish/web_socket.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import WS from 'ws'
import WS from 'isomorphic-ws'
import dayjs, { Dayjs } from 'dayjs'
import { v4 as uuid } from 'uuid'
import { EventEmitter } from 'events'
import { WebSocketInterface } from '../megalodon'
import proxyAgent, { ProxyConfig } from '../proxy_config'
import FirefishAPI from './api_client'
import { UnknownNotificationTypeError } from '../notification'
import { isBrowser } from '../default'

/**
* WebSocket
Expand Down Expand Up @@ -120,16 +121,24 @@ export default class WebSocket extends EventEmitter implements WebSocketInterfac
* Connect to the endpoint.
*/
private _connect(): WS {
let options: WS.ClientOptions = {
headers: this.headers
}
if (this.proxyConfig) {
options = Object.assign(options, {
agent: proxyAgent(this.proxyConfig)
})
const requestURL = `${this.url}?i=${this._accessToken}`
if (isBrowser()) {
// This is browser.
// We can't pass options when browser: https://github.com/heineiuo/isomorphic-ws#limitations
const cli = new WS(requestURL)
return cli
} else {
let options: WS.ClientOptions = {
headers: this.headers
}
if (this.proxyConfig) {
options = Object.assign(options, {
agent: proxyAgent(this.proxyConfig)
})
}
const cli: WS = new WS(requestURL, options)
return cli
}
const cli: WS = new WS(`${this.url}?i=${this._accessToken}`, options)
return cli
}

/**
Expand Down Expand Up @@ -252,37 +261,43 @@ export default class WebSocket extends EventEmitter implements WebSocketInterfac
* @param client A WebSocket instance.
*/
private _bindSocket(client: WS) {
client.on('close', (code: number, _reason: Buffer) => {
if (code === 1000) {
client.onclose = event => {
// Refer the code: https://tools.ietf.org/html/rfc6455#section-7.4
if (event.code === 1000) {
this.emit('close', {})
} else {
console.log(`Closed connection with ${code}`)
console.log(`Closed connection with ${event.code}`)
// If already called close method, it does not retry.
if (!this._connectionClosed) {
this._reconnect()
}
}
})
client.on('pong', () => {
this._pongWaiting = false
this.emit('pong', {})
this._pongReceivedTimestamp = dayjs()
// It is required to anonymous function since get this scope in checkAlive.
setTimeout(() => this._checkAlive(this._pongReceivedTimestamp), this._heartbeatInterval)
})
client.on('open', () => {
}
client.onopen = _event => {
this.emit('connect', {})
this._channel()
// Call first ping event.
setTimeout(() => {
client.ping('')
}, 10000)
})
client.on('message', (data: WS.Data, isBinary: boolean) => {
this.parser.parse(data, isBinary, this._channelID)
})
client.on('error', (err: Error) => {
this.emit('error', err)
})
if (!isBrowser()) {
// Call first ping event.
setTimeout(() => {
client.ping('')
}, 10000)
}
}
client.onmessage = event => {
this.parser.parse(event, this._channelID)
}
client.onerror = event => {
this.emit('error', event.target)
}
if (!isBrowser()) {
client.on('pong', () => {
this._pongWaiting = false
this.emit('pong', {})
this._pongReceivedTimestamp = dayjs()
// It is required to anonymous function since get this scope in checkAlive.
setTimeout(() => this._checkAlive(this._pongReceivedTimestamp), this._heartbeatInterval)
})
}
}

/**
Expand Down Expand Up @@ -338,11 +353,12 @@ export default class WebSocket extends EventEmitter implements WebSocketInterfac
*/
export class Parser extends EventEmitter {
/**
* @param message Message body of websocket.
* @param message Message event of websocket.
* @param channelID Parse only messages which has same channelID.
*/
public parse(data: WS.Data, isBinary: boolean, channelID: string) {
const message = isBinary ? data : data.toString()
public parse(ev: WS.MessageEvent, channelID: string) {
const data = ev.data
const message = data.toString()
if (typeof message !== 'string') {
this.emit('heartbeat', {})
return
Expand Down
14 changes: 7 additions & 7 deletions megalodon/src/mastodon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import FormData from 'form-data'
import parseLinkHeader from 'parse-link-header'

import MastodonAPI from './mastodon/api_client'
import WebSocket from './mastodon/web_socket'
import Streaming from './mastodon/web_socket'
import { MegalodonInterface, NoImplementedError } from './megalodon'
import Response from './response'
import Entity from './entity'
Expand Down Expand Up @@ -3144,27 +3144,27 @@ export default class Mastodon implements MegalodonInterface {
// ======================================
// WebSocket
// ======================================
public userSocket(): WebSocket {
public userSocket(): Streaming {
return this.client.socket('/api/v1/streaming', 'user')
}

public publicSocket(): WebSocket {
public publicSocket(): Streaming {
return this.client.socket('/api/v1/streaming', 'public')
}

public localSocket(): WebSocket {
public localSocket(): Streaming {
return this.client.socket('/api/v1/streaming', 'public:local')
}

public tagSocket(tag: string): WebSocket {
public tagSocket(tag: string): Streaming {
return this.client.socket('/api/v1/streaming', 'hashtag', `tag=${tag}`)
}

public listSocket(list_id: string): WebSocket {
public listSocket(list_id: string): Streaming {
return this.client.socket('/api/v1/streaming', 'list', `list=${list_id}`)
}

public directSocket(): WebSocket {
public directSocket(): Streaming {
return this.client.socket('/api/v1/streaming', 'direct')
}
}
8 changes: 4 additions & 4 deletions megalodon/src/mastodon/api_client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import axios, { AxiosResponse, AxiosRequestConfig } from 'axios'
import objectAssignDeep from 'object-assign-deep'

import WebSocket from './web_socket'
import Streaming from './web_socket'
import Response from '../response'
import { RequestCanceledError } from '../cancel'
import proxyAgent, { ProxyConfig } from '../proxy_config'
Expand All @@ -25,7 +25,7 @@ namespace MastodonAPI {
postForm<T = any>(path: string, params?: any, headers?: { [key: string]: string }): Promise<Response<T>>
del<T = any>(path: string, params?: any, headers?: { [key: string]: string }): Promise<Response<T>>
cancel(): void
socket(path: string, stream: string, params?: string): WebSocket
socket(path: string, stream: string, params?: string): Streaming
}

/**
Expand Down Expand Up @@ -428,12 +428,12 @@ namespace MastodonAPI {
* @param stream Stream name, please refer: https://git.pleroma.social/pleroma/pleroma/blob/develop/lib/pleroma/web/mastodon_api/mastodon_socket.ex#L19-28
* @returns WebSocket, which inherits from EventEmitter
*/
public socket(path: string, stream: string, params?: string): WebSocket {
public socket(path: string, stream: string, params?: string): Streaming {
if (!this.accessToken) {
throw new Error('accessToken is required')
}
const url = this.baseUrl + path
const streaming = new WebSocket(url, stream, params, this.accessToken, this.userAgent, this.proxyConfig)
const streaming = new Streaming(url, stream, params, this.accessToken, this.userAgent, this.proxyConfig)
process.nextTick(() => {
streaming.start()
})
Expand Down
Loading

0 comments on commit 6135bc6

Please sign in to comment.