Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postprocessing state via SSE #9771

Merged
merged 4 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Enhancement: Handle postprocessing state via Server Sent Events

We've added the functionality to listen for events from the server that update the postprocessing state,
this allows the user to see if the postprocessing on a file is finished, without reloading the UI.

https://github.com/owncloud/web/pull/9771
https://github.com/owncloud/web/issues/9769
1 change: 1 addition & 0 deletions packages/web-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"fast-xml-parser": "4.3.2",
"lodash-es": "^4.17.21",
"luxon": "^3.0.1",
"@microsoft/fetch-event-source": "^2.0.1",
"webdav": "5.3.0"
}
}
126 changes: 126 additions & 0 deletions packages/web-client/src/sse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import { fetchEventSource, FetchEventSourceInit } from '@microsoft/fetch-event-source'

export enum MESSAGE_TYPE {
NOTIFICATION = 'userlog-notification',
POSTPROCESSING_FINISHED = 'postprocessing-finished'
}

export class RetriableError extends Error {
name = 'RetriableError'
}

const RECONNECT_RANDOM_OFFSET = 15000

export class SSEAdapter implements EventSource {
url: string
fetchOptions: FetchEventSourceInit
private abortController: AbortController
private eventListenerMap: Record<string, ((event: MessageEvent) => any)[]>

readyState: number
readonly withCredentials: boolean

readonly CONNECTING: 0
readonly OPEN: 1
readonly CLOSED: 2

onerror: ((this: EventSource, ev: Event) => any) | null
onmessage: ((this: EventSource, ev: MessageEvent) => any) | null
onopen: ((this: EventSource, ev: Event) => any) | null

constructor(url: string, fetchOptions: FetchEventSourceInit) {
this.url = url
this.fetchOptions = fetchOptions
this.abortController = new AbortController()
this.eventListenerMap = {}
this.readyState = this.CONNECTING
this.connect()
}

private connect() {
return fetchEventSource(this.url, {
openWhenHidden: true,
signal: this.abortController.signal,
fetch: this.fetchProvider.bind(this),
onopen: async () => {
const event = new Event('open')
this.onopen?.bind(this)(event)
this.readyState = this.OPEN
},
onmessage: (msg) => {
const event = new MessageEvent('message', { data: msg.data })
this.onmessage?.bind(this)(event)

const type = msg.event
const eventListeners = this.eventListenerMap[type]
eventListeners?.forEach((l) => l(event))
},
onclose: () => {
this.readyState = this.CLOSED
throw new RetriableError()
},
onerror: (err) => {
console.error(err)
const event = new CustomEvent('error', { detail: err })
this.onerror?.bind(this)(event)

/*
* Try to reconnect after 30 seconds plus random time in seconds.
* This prevents all clients try to reconnect concurrent on server error, to reduce load.
*/
return 30000 + Math.floor(Math.random() * RECONNECT_RANDOM_OFFSET)
}
})
}
AlexAndBear marked this conversation as resolved.
Show resolved Hide resolved

fetchProvider(...args) {
let [resource, config] = args
config = { ...config, ...this.fetchOptions }
return window.fetch(resource, config)
}

close() {
this.abortController.abort('closed')
}

addEventListener(type: string, listener: (this: EventSource, event: MessageEvent) => any): void {
this.eventListenerMap[type] = this.eventListenerMap[type] || []
this.eventListenerMap[type].push(listener)
}

removeEventListener(
type: string,
listener: (this: EventSource, event: MessageEvent) => any
): void {
this.eventListenerMap[type] = this.eventListenerMap[type]?.filter((func) => func !== listener)
}

dispatchEvent(event: Event): boolean {
throw new Error('Method not implemented.')
}

updateAccessToken(token: string) {
this.fetchOptions.headers['Authorization'] = `Bearer ${token}`
}

updateLanguage(language: string) {
this.fetchOptions.headers['Accept-Language'] = language

// Force reconnect, to make the language change effect instantly
this.close()
this.connect()
}
}

let eventSource: SSEAdapter = null

export const sse = (baseURI: string, fetchOptions: FetchEventSourceInit): EventSource => {
if (!eventSource) {
eventSource = new SSEAdapter(
new URL('ocs/v2.php/apps/notifications/api/v1/notifications/sse', baseURI).href,
fetchOptions
)
}

return eventSource
}
106 changes: 106 additions & 0 deletions packages/web-client/tests/unit/sse.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
const fetchEventSourceMock = jest.fn()
jest.mock('@microsoft/fetch-event-source', () => ({
fetchEventSource: fetchEventSourceMock
}))

import { SSEAdapter, sse, MESSAGE_TYPE, RetriableError } from '../../src/sse'

const url = 'https://owncloud.test/'
describe('SSEAdapter', () => {
let mockFetch

beforeEach(() => {
mockFetch = jest.fn()

// Mock fetchEventSource and window.fetch

global.window.fetch = mockFetch
})

afterEach(() => {
jest.clearAllMocks()
})

test('it should initialize the SSEAdapter', () => {
const fetchOptions = { method: 'GET' }

const sseAdapter = new SSEAdapter(url, fetchOptions)

expect(sseAdapter.url).toBe(url)
expect(sseAdapter.fetchOptions).toBe(fetchOptions)
expect(sseAdapter.readyState).toBe(sseAdapter.CONNECTING)
})

test('it should call connect and set up event listeners', () => {
const fetchOptions = { method: 'GET' }
const sseAdapter = new SSEAdapter(url, fetchOptions)

expect(fetchEventSourceMock).toHaveBeenCalledWith(url, expect.any(Object))
expect(fetchEventSourceMock.mock.calls[0][1].onopen).toEqual(expect.any(Function))

fetchEventSourceMock.mock.calls[0][1].onopen()

expect(sseAdapter.readyState).toBe(sseAdapter.OPEN)
})

test('it should handle onmessage events', () => {
const fetchOptions = { method: 'GET' }
const sseAdapter = new SSEAdapter(url, fetchOptions)
const message = { data: 'Message data', event: MESSAGE_TYPE.NOTIFICATION }

const messageListener = jest.fn()
sseAdapter.addEventListener(MESSAGE_TYPE.NOTIFICATION, messageListener)

fetchEventSourceMock.mock.calls[0][1].onmessage(message)

expect(messageListener).toHaveBeenCalledWith(expect.any(Object))
})

test('it should handle onclose events and throw RetriableError', () => {
const fetchOptions = { method: 'GET' }
const sseAdapter = new SSEAdapter(url, fetchOptions)

expect(() => {
// Simulate onclose
fetchEventSourceMock.mock.calls[0][1].onclose()
}).toThrow(RetriableError)
})

test('it should call fetchProvider with fetch options', () => {
const fetchOptions = { headers: { Authorization: 'Bearer xy' } }
const sseAdapter = new SSEAdapter(url, fetchOptions)

sseAdapter.fetchProvider(url, fetchOptions)

expect(mockFetch).toHaveBeenCalledWith(url, { ...fetchOptions })
})

test('it should update the access token in fetch options', () => {
const fetchOptions = { headers: { Authorization: 'Bearer xy' } }
const sseAdapter = new SSEAdapter(url, fetchOptions)

const token = 'new-token'
sseAdapter.updateAccessToken(token)

expect(sseAdapter.fetchOptions.headers.Authorization).toBe(`Bearer ${token}`)
})

test('it should close the SSEAdapter', () => {
const fetchOptions = { method: 'GET' }
const sseAdapter = new SSEAdapter(url, fetchOptions)

sseAdapter.close()

expect(sseAdapter.readyState).toBe(sseAdapter.CLOSED)
})
})

describe('sse', () => {
test('it should create and return an SSEAdapter instance', () => {
const fetchOptions = { method: 'GET' }
const eventSource = sse(url, fetchOptions)

expect(eventSource).toBeInstanceOf(SSEAdapter)
expect(eventSource.url).toBe(`${url}ocs/v2.php/apps/notifications/api/v1/notifications/sse`)
})
})
3 changes: 2 additions & 1 deletion packages/web-pkg/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"vue-concurrency": "4.0.1",
"vue-router": "4.2.0",
"vue3-gettext": "2.5.0-alpha.1",
"vuex": "4.1.0"
"vuex": "4.1.0",
"@microsoft/fetch-event-source": "^2.0.1"
}
}
1 change: 0 additions & 1 deletion packages/web-pkg/src/composables/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ export * from './service'
export * from './sideBar'
export * from './sort'
export * from './spaces'
export * from './sse'
export * from './store'
export * from './upload'
export * from './viewMode'
1 change: 0 additions & 1 deletion packages/web-pkg/src/composables/sse/index.ts

This file was deleted.

79 changes: 0 additions & 79 deletions packages/web-pkg/src/composables/sse/useServerSentEvents.ts

This file was deleted.

20 changes: 20 additions & 0 deletions packages/web-pkg/src/services/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import { OwnCloudSdk } from '@ownclouders/web-client/src/types'
import { ConfigurationManager } from '../../configuration'
import { Store } from 'vuex'
import { Language } from 'vue3-gettext'
import { FetchEventSourceInit } from '@microsoft/fetch-event-source'
import { sse } from '@ownclouders/web-client/src/sse'

interface OcClient {
token: string
Expand All @@ -22,6 +24,17 @@ interface HttpClient {
token?: string
}

const createFetchOptions = (authParams: AuthParameters, language: string): FetchEventSourceInit => {
return {
headers: {
Authorization: `Bearer ${authParams.accessToken}`,
'Accept-Language': language,
'X-Request-ID': uuidV4(),
'X-Requested-With': 'XMLHttpRequest'
}
}
}

const createAxiosInstance = (authParams: AuthParameters, language: string): AxiosInstance => {
const auth = new Auth(authParams)
const axiosClient = axios.create({
Expand Down Expand Up @@ -84,6 +97,13 @@ export class ClientService {
return this.ocUserContextClient.graph
}

public get sseAuthenticated(): EventSource {
return sse(
this.configurationManager.serverUrl,
createFetchOptions({ accessToken: this.token }, this.currentLanguage)
)
}

public get ocsUserContext(): OCS {
if (this.clientNeedsInit(this.ocUserContextClient)) {
this.ocUserContextClient = this.getOcsClient({ accessToken: this.token })
Expand Down
Loading