Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New priority queue & exponential backoff client #4745

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 124 additions & 131 deletions packages/@uppy/companion-client/src/RequestClient.js

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions packages/@uppy/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"mime-match": "^1.0.2",
"namespace-emitter": "^2.0.1",
"nanoid": "^4.0.0",
"p-queue": "^7.4.1",
"preact": "^10.5.13"
},
"devDependencies": {
Expand Down
8 changes: 8 additions & 0 deletions packages/@uppy/core/src/Uppy.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import ee from 'namespace-emitter'
import { nanoid } from 'nanoid/non-secure'
import throttle from 'lodash/throttle.js'
import DefaultStore from '@uppy/store-default'
import PQueue from 'p-queue'
import getFileType from '@uppy/utils/lib/getFileType'
import getFileNameAndExtension from '@uppy/utils/lib/getFileNameAndExtension'
import { getSafeFileId } from '@uppy/utils/lib/generateFileID'
Expand Down Expand Up @@ -44,6 +45,8 @@ class Uppy {

#postProcessors = new Set()

queue = new PQueue({ concurrency: 6, autoStart: false })
Murderlon marked this conversation as resolved.
Show resolved Hide resolved

/**
* Instantiate Uppy
*
Expand Down Expand Up @@ -773,6 +776,7 @@ class Uppy {
}

pauseAll () {
this.queue.pause()
const updatedFiles = { ...this.getState().files }
const inProgressUpdatedFiles = Object.keys(updatedFiles).filter((file) => {
return !updatedFiles[file].progress.uploadComplete
Expand All @@ -789,6 +793,7 @@ class Uppy {
}

resumeAll () {
this.queue.start()
const updatedFiles = { ...this.getState().files }
const inProgressUpdatedFiles = Object.keys(updatedFiles).filter((file) => {
return !updatedFiles[file].progress.uploadComplete
Expand Down Expand Up @@ -843,6 +848,7 @@ class Uppy {
}

cancelAll ({ reason = 'user' } = {}) {
this.queue.clear()
this.emit('cancel-all', { reason })

// Only remove existing uploads if user is canceling
Expand Down Expand Up @@ -1624,11 +1630,13 @@ class Uppy {
})

const uploadID = this.#createUpload(waitingFileIDs)
this.queue.start()
return this.#runUpload(uploadID)
})
.catch((err) => {
this.emit('error', err)
this.log(err, 'error')
this.queue.clear()
throw err
})
}
Expand Down
3 changes: 3 additions & 0 deletions packages/@uppy/core/types/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import * as UppyUtils from '@uppy/utils'
import PQueue from 'p-queue'

// Utility types
type OmitKey<T, Key> = Pick<T, Exclude<keyof T, Key>>
Expand Down Expand Up @@ -294,6 +295,8 @@ export interface UppyEventMap<
export class Uppy {
constructor(opts?: UppyOptions)

queue: PQueue

on<K extends keyof UppyEventMap>(event: K, callback: UppyEventMap[K]): this

on<K extends keyof UppyEventMap, TMeta extends IndexedObject<any>>(
Expand Down
1 change: 1 addition & 0 deletions packages/@uppy/utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"./lib/truncateString": "./lib/truncateString.js",
"./lib/remoteFileObjToLocal": "./lib/remoteFileObjToLocal.js",
"./lib/fetchWithNetworkError": "./lib/fetchWithNetworkError.js",
"./lib/fetcher": "./lib/fetcher.js",
"./lib/ErrorWithCause": "./lib/ErrorWithCause.js",
"./lib/delay": "./lib/delay.js",
"./lib/hasProperty": "./lib/hasProperty.js",
Expand Down
156 changes: 156 additions & 0 deletions packages/@uppy/utils/src/fetcher.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/* eslint-disable no-param-reassign */
/* eslint-disable max-len */

import NetworkError from './NetworkError.js'
import ProgressTimeout from './ProgressTimeout.js'

const noop = () => {}

/**
* Get an abort signal that will be aborted when the specified Uppy file is removed or all uploads are cancelled.
*
* @param {Uppy} uppy
* The Uppy instance.
* @param {string} [id]
* The ID of the file to watch for removal.
* @param {AbortSignal} [additionalSignal]
* An optional additional abort signal.
* @returns {AbortController}
* The abort signal.
*/
export function getUppyAbortController(uppy, id, additionalSignal) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
export function getUppyAbortController(uppy, id, additionalSignal) {
export function createUppyAbortController(uppy, id, additionalSignal) {

const controller = new AbortController()

uppy.once('cancel-all', () => {
controller.abort()
})

if (id) {
uppy.on('file-removed', (file) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

won't this leak event listeners? we never uppy.off all of these events as far as i can see?

if (id === file.id) {
controller.abort()
}
})
}

if (additionalSignal) {
additionalSignal.addEventListener('abort', () => {
controller.abort()
})
}

return controller
}

/**
* Fetches data from a specified URL using XMLHttpRequest, with optional retry functionality and progress tracking.
*
* @param {string} url
* The URL to send the request to.
* @param {object} [options]
* Optional settings for the fetch operation.
* @param {string} [options.method='GET']
* The HTTP method to use for the request.
* @param {string} [options.body=null]
* The request payload, if any.
* @param {string} [options.timeout=30000]
* Miliseconds between XMLHttpRequest upload progress events before the request is aborted.
* @param {string} [options.withCredentials=false]
* Sets the withCredentials property of the XMLHttpRequest object.
* @param {string} [options.responseType='']
* Sets the responseType property of the XMLHttpRequest object.
* @param {Record<string, string>} [options.headers]
* An object representing any headers to send with the request.
* @param {number} [options.retries=3]
* The number of retry attempts to make if the request fails.
* @param {number} [options.delay=1000]
* The initial delay between retry attempts, in milliseconds. The delay doubles with each retry.
* @param {function(Event): void} [options.onUploadProgress]
* A callback function for tracking upload progress.
* @param {function(XMLHttpRequest): boolean} [options.shouldRetry]
* A function to determine whether to retry the request.
* @param {function(): void} [options.onTimeout]
* Called when when no XMLHttpRequest upload progress events have been received for `timeout` ms.
* @param {AbortSignal} [options.signal]
* signal to abort the upload.
* @returns {Promise<XMLHttpRequest>}
* A Promise that resolves to the response text if the request succeeds, and rejects with an error if it fails.
*/
export function fetcher(url, options = {}) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that you tried to use ky first but we need upload progress which is not supported. I think if we are implementing our own request abstraction, ideally it should have unit tests because it becomes such an important core component of Uppy. Could we alternatively look into using axios instead of implementing our own wrapper? axios is extremely popular and seems to support upload events too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer our own abstraction over a library, especially since it's mostly just a promise wrapper. But your right about tests!

const {
body = null,
headers = {},
method = 'GET',
onTimeout = noop,
onUploadProgress = noop,
responseType,
retries = 3,
shouldRetry = () => true,
signal = null,
timeout = 30 * 1000,
withCredentials = false,
} = options
const delay = (attempt) => 0.3 * 2 ** (attempt - 1) * 1000
const timer = new ProgressTimeout(timeout, onTimeout)

function requestWithRetry(retryCount = 0) {
return new Promise((resolve, reject) => {
const xhr = new XMLHttpRequest()

xhr.open(method || 'GET', url, true)
xhr.withCredentials = withCredentials
if (responseType) {
xhr.responseType = responseType
}

if (signal) {
signal.addEventListener('abort', () => {
xhr.abort()
// Using DOMException for abort errors aligns with
// the convention established by the Fetch API.
reject(new DOMException('Aborted', 'AbortError'))
})
}

xhr.onload = () => {
if (xhr.status >= 200 && xhr.status < 300) {
timer.done()
resolve(xhr)
} else if (shouldRetry(xhr) && retryCount < retries) {
setTimeout(() => {
requestWithRetry(retryCount + 1).then(resolve, reject)
}, delay(retryCount))
} else {
timer.done()
reject(new NetworkError(xhr.statusText, xhr))
}
}

xhr.onerror = () => {
if (shouldRetry(xhr) && retryCount < retries) {
setTimeout(() => {
requestWithRetry(retryCount + 1).then(resolve, reject)
}, delay(retryCount))
} else {
timer.done()
reject(new NetworkError(xhr.statusText, xhr))
}
}

xhr.upload.onprogress = (event) => {
timer.progress()
onUploadProgress(event)
}

if (headers) {
Object.keys(headers).forEach((key) => {
xhr.setRequestHeader(key, headers[key])
})
}

xhr.send(body)
})
}

return requestWithRetry()
}
Loading