Skip to content

Commit

Permalink
Postprocessing state via SSE (#9771)
Browse files Browse the repository at this point in the history
rewrite SSEClient and implement postprocessing-finished event management

---------

Co-authored-by: Paul Neubauer <paulneubauer@live.de>
  • Loading branch information
AlexAndBear and lookacat authored Oct 13, 2023
1 parent e86b313 commit 7c7d9fc
Show file tree
Hide file tree
Showing 15 changed files with 363 additions and 133 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Enhancement: Handle postprocessing state via Server Sent Events

We've added the functionality to listen for events from the server that update the postprocessing state,
this allows the user to see if the postprocessing on a file is finished, without reloading the UI.

https://github.com/owncloud/web/pull/9771
https://github.com/owncloud/web/issues/9769
1 change: 1 addition & 0 deletions packages/web-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"fast-xml-parser": "4.3.2",
"lodash-es": "^4.17.21",
"luxon": "^3.0.1",
"@microsoft/fetch-event-source": "^2.0.1",
"webdav": "5.3.0"
}
}
126 changes: 126 additions & 0 deletions packages/web-client/src/sse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import { fetchEventSource, FetchEventSourceInit } from '@microsoft/fetch-event-source'

export enum MESSAGE_TYPE {
NOTIFICATION = 'userlog-notification',
POSTPROCESSING_FINISHED = 'postprocessing-finished'
}

export class RetriableError extends Error {
name = 'RetriableError'
}

const RECONNECT_RANDOM_OFFSET = 15000

export class SSEAdapter implements EventSource {
url: string
fetchOptions: FetchEventSourceInit
private abortController: AbortController
private eventListenerMap: Record<string, ((event: MessageEvent) => any)[]>

readyState: number
readonly withCredentials: boolean

readonly CONNECTING: 0
readonly OPEN: 1
readonly CLOSED: 2

onerror: ((this: EventSource, ev: Event) => any) | null
onmessage: ((this: EventSource, ev: MessageEvent) => any) | null
onopen: ((this: EventSource, ev: Event) => any) | null

constructor(url: string, fetchOptions: FetchEventSourceInit) {
this.url = url
this.fetchOptions = fetchOptions
this.abortController = new AbortController()
this.eventListenerMap = {}
this.readyState = this.CONNECTING
this.connect()
}

private connect() {
return fetchEventSource(this.url, {
openWhenHidden: true,
signal: this.abortController.signal,
fetch: this.fetchProvider.bind(this),
onopen: async () => {
const event = new Event('open')
this.onopen?.bind(this)(event)
this.readyState = this.OPEN
},
onmessage: (msg) => {
const event = new MessageEvent('message', { data: msg.data })
this.onmessage?.bind(this)(event)

const type = msg.event
const eventListeners = this.eventListenerMap[type]
eventListeners?.forEach((l) => l(event))
},
onclose: () => {
this.readyState = this.CLOSED
throw new RetriableError()
},
onerror: (err) => {
console.error(err)
const event = new CustomEvent('error', { detail: err })
this.onerror?.bind(this)(event)

/*
* Try to reconnect after 30 seconds plus random time in seconds.
* This prevents all clients try to reconnect concurrent on server error, to reduce load.
*/
return 30000 + Math.floor(Math.random() * RECONNECT_RANDOM_OFFSET)
}
})
}

fetchProvider(...args) {
let [resource, config] = args
config = { ...config, ...this.fetchOptions }
return window.fetch(resource, config)
}

close() {
this.abortController.abort('closed')
}

addEventListener(type: string, listener: (this: EventSource, event: MessageEvent) => any): void {
this.eventListenerMap[type] = this.eventListenerMap[type] || []
this.eventListenerMap[type].push(listener)
}

removeEventListener(
type: string,
listener: (this: EventSource, event: MessageEvent) => any
): void {
this.eventListenerMap[type] = this.eventListenerMap[type]?.filter((func) => func !== listener)
}

dispatchEvent(event: Event): boolean {
throw new Error('Method not implemented.')
}

updateAccessToken(token: string) {
this.fetchOptions.headers['Authorization'] = `Bearer ${token}`
}

updateLanguage(language: string) {
this.fetchOptions.headers['Accept-Language'] = language

// Force reconnect, to make the language change effect instantly
this.close()
this.connect()
}
}

let eventSource: SSEAdapter = null

export const sse = (baseURI: string, fetchOptions: FetchEventSourceInit): EventSource => {
if (!eventSource) {
eventSource = new SSEAdapter(
new URL('ocs/v2.php/apps/notifications/api/v1/notifications/sse', baseURI).href,
fetchOptions
)
}

return eventSource
}
106 changes: 106 additions & 0 deletions packages/web-client/tests/unit/sse.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
const fetchEventSourceMock = jest.fn()
jest.mock('@microsoft/fetch-event-source', () => ({
fetchEventSource: fetchEventSourceMock
}))

import { SSEAdapter, sse, MESSAGE_TYPE, RetriableError } from '../../src/sse'

const url = 'https://owncloud.test/'
describe('SSEAdapter', () => {
let mockFetch

beforeEach(() => {
mockFetch = jest.fn()

// Mock fetchEventSource and window.fetch

global.window.fetch = mockFetch
})

afterEach(() => {
jest.clearAllMocks()
})

test('it should initialize the SSEAdapter', () => {
const fetchOptions = { method: 'GET' }

const sseAdapter = new SSEAdapter(url, fetchOptions)

expect(sseAdapter.url).toBe(url)
expect(sseAdapter.fetchOptions).toBe(fetchOptions)
expect(sseAdapter.readyState).toBe(sseAdapter.CONNECTING)
})

test('it should call connect and set up event listeners', () => {
const fetchOptions = { method: 'GET' }
const sseAdapter = new SSEAdapter(url, fetchOptions)

expect(fetchEventSourceMock).toHaveBeenCalledWith(url, expect.any(Object))
expect(fetchEventSourceMock.mock.calls[0][1].onopen).toEqual(expect.any(Function))

fetchEventSourceMock.mock.calls[0][1].onopen()

expect(sseAdapter.readyState).toBe(sseAdapter.OPEN)
})

test('it should handle onmessage events', () => {
const fetchOptions = { method: 'GET' }
const sseAdapter = new SSEAdapter(url, fetchOptions)
const message = { data: 'Message data', event: MESSAGE_TYPE.NOTIFICATION }

const messageListener = jest.fn()
sseAdapter.addEventListener(MESSAGE_TYPE.NOTIFICATION, messageListener)

fetchEventSourceMock.mock.calls[0][1].onmessage(message)

expect(messageListener).toHaveBeenCalledWith(expect.any(Object))
})

test('it should handle onclose events and throw RetriableError', () => {
const fetchOptions = { method: 'GET' }
const sseAdapter = new SSEAdapter(url, fetchOptions)

expect(() => {
// Simulate onclose
fetchEventSourceMock.mock.calls[0][1].onclose()
}).toThrow(RetriableError)
})

test('it should call fetchProvider with fetch options', () => {
const fetchOptions = { headers: { Authorization: 'Bearer xy' } }
const sseAdapter = new SSEAdapter(url, fetchOptions)

sseAdapter.fetchProvider(url, fetchOptions)

expect(mockFetch).toHaveBeenCalledWith(url, { ...fetchOptions })
})

test('it should update the access token in fetch options', () => {
const fetchOptions = { headers: { Authorization: 'Bearer xy' } }
const sseAdapter = new SSEAdapter(url, fetchOptions)

const token = 'new-token'
sseAdapter.updateAccessToken(token)

expect(sseAdapter.fetchOptions.headers.Authorization).toBe(`Bearer ${token}`)
})

test('it should close the SSEAdapter', () => {
const fetchOptions = { method: 'GET' }
const sseAdapter = new SSEAdapter(url, fetchOptions)

sseAdapter.close()

expect(sseAdapter.readyState).toBe(sseAdapter.CLOSED)
})
})

describe('sse', () => {
test('it should create and return an SSEAdapter instance', () => {
const fetchOptions = { method: 'GET' }
const eventSource = sse(url, fetchOptions)

expect(eventSource).toBeInstanceOf(SSEAdapter)
expect(eventSource.url).toBe(`${url}ocs/v2.php/apps/notifications/api/v1/notifications/sse`)
})
})
3 changes: 2 additions & 1 deletion packages/web-pkg/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"vue-concurrency": "4.0.1",
"vue-router": "4.2.0",
"vue3-gettext": "2.5.0-alpha.1",
"vuex": "4.1.0"
"vuex": "4.1.0",
"@microsoft/fetch-event-source": "^2.0.1"
}
}
1 change: 0 additions & 1 deletion packages/web-pkg/src/composables/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ export * from './service'
export * from './sideBar'
export * from './sort'
export * from './spaces'
export * from './sse'
export * from './store'
export * from './upload'
export * from './viewMode'
1 change: 0 additions & 1 deletion packages/web-pkg/src/composables/sse/index.ts

This file was deleted.

79 changes: 0 additions & 79 deletions packages/web-pkg/src/composables/sse/useServerSentEvents.ts

This file was deleted.

20 changes: 20 additions & 0 deletions packages/web-pkg/src/services/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import { OwnCloudSdk } from '@ownclouders/web-client/src/types'
import { ConfigurationManager } from '../../configuration'
import { Store } from 'vuex'
import { Language } from 'vue3-gettext'
import { FetchEventSourceInit } from '@microsoft/fetch-event-source'
import { sse } from '@ownclouders/web-client/src/sse'

interface OcClient {
token: string
Expand All @@ -22,6 +24,17 @@ interface HttpClient {
token?: string
}

const createFetchOptions = (authParams: AuthParameters, language: string): FetchEventSourceInit => {
return {
headers: {
Authorization: `Bearer ${authParams.accessToken}`,
'Accept-Language': language,
'X-Request-ID': uuidV4(),
'X-Requested-With': 'XMLHttpRequest'
}
}
}

const createAxiosInstance = (authParams: AuthParameters, language: string): AxiosInstance => {
const auth = new Auth(authParams)
const axiosClient = axios.create({
Expand Down Expand Up @@ -84,6 +97,13 @@ export class ClientService {
return this.ocUserContextClient.graph
}

public get sseAuthenticated(): EventSource {
return sse(
this.configurationManager.serverUrl,
createFetchOptions({ accessToken: this.token }, this.currentLanguage)
)
}

public get ocsUserContext(): OCS {
if (this.clientNeedsInit(this.ocUserContextClient)) {
this.ocUserContextClient = this.getOcsClient({ accessToken: this.token })
Expand Down
Loading

0 comments on commit 7c7d9fc

Please sign in to comment.