Skip to content

Commit

Permalink
@uppy/aws-s3-multipart: add shouldUseMultipart option (#4205)
Browse files Browse the repository at this point in the history
* @uppy/aws-s3-multipart: add support for non-multipart uploads

* Set threshold at 100 MiB

* add `shouldUseMultipart` option

* makes the change semver-minor, the breaking change should be done in the next major

* fix merge conflict

* fix lint

* fix crash

i think it can be considered a breaking change, because i'm changing the signature of uploadPartBytes, which is an option. however I don't see uploadPartBytes documented in our docs, so maybe we can get away with a non major:

- onProgress now gets passed number of bytes instead of `ev`
- a new `size` argument (previously size was on body)

* fix shouldUseMultipart confusingness

* handle zero-size files as it was before

* remove breaking change in `uploadPartBytes`

* fix `allowedMetaFields` for non-multipart

* make backwards compatible and fix meta

* nits

* fix metadata for non-multipart

(inside multipart)

* Remove second `fileSize` argugment from shouldUseMultipart, because users can use file.size

* console.log

---------

Co-authored-by: Mikael Finstad <finstaden@gmail.com>
Co-authored-by: Artur Paikin <artur@arturpaikin.com>
  • Loading branch information
3 people authored May 2, 2023
1 parent acf2c8e commit 8e15f27
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 24 deletions.
35 changes: 21 additions & 14 deletions packages/@uppy/aws-s3-multipart/src/MultipartUploader.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class MultipartUploader {

#onSuccess

#shouldUseMultipart

#onReject = (err) => (err?.cause === pausingUploadReason ? null : this.#onError(err))

constructor (data, options) {
Expand All @@ -57,25 +59,23 @@ class MultipartUploader {
this.#file = options.file
this.#onSuccess = this.options.onSuccess
this.#onError = this.options.onError
this.#shouldUseMultipart = this.options.shouldUseMultipart

this.#initChunks()
}

#initChunks () {
const desiredChunkSize = this.options.getChunkSize(this.#data)
// at least 5MB per request, at most 10k requests
const fileSize = this.#data.size
const minChunkSize = Math.max(5 * MB, Math.ceil(fileSize / 10000))
const chunkSize = Math.max(desiredChunkSize, minChunkSize)
const shouldUseMultipart = typeof this.#shouldUseMultipart === 'function'
? this.#shouldUseMultipart(this.#file)
: Boolean(this.#shouldUseMultipart)

if (shouldUseMultipart) {
const desiredChunkSize = this.options.getChunkSize(this.#data)
// at least 5MB per request, at most 10k requests
const minChunkSize = Math.max(5 * MB, Math.ceil(fileSize / 10000))
const chunkSize = Math.max(desiredChunkSize, minChunkSize)

// Upload zero-sized files in one zero-sized chunk
if (this.#data.size === 0) {
this.#chunks = [{
getData: () => this.#data,
onProgress: this.#onPartProgress(0),
onComplete: this.#onPartComplete(0),
}]
} else {
const arraySize = Math.ceil(fileSize / chunkSize)
this.#chunks = Array(arraySize)

Expand All @@ -92,8 +92,16 @@ class MultipartUploader {
getData,
onProgress: this.#onPartProgress(j),
onComplete: this.#onPartComplete(j),
shouldUseMultipart,
}
}
} else {
this.#chunks = [{
getData: () => this.#data,
onProgress: this.#onPartProgress(0),
onComplete: this.#onPartComplete(0),
shouldUseMultipart,
}]
}

this.#chunkState = this.#chunks.map(() => ({ uploaded: 0 }))
Expand All @@ -114,8 +122,7 @@ class MultipartUploader {
#onPartProgress = (index) => (ev) => {
if (!ev.lengthComputable) return

const sent = ev.loaded
this.#chunkState[index].uploaded = ensureInt(sent)
this.#chunkState[index].uploaded = ensureInt(ev.loaded)

const totalUploaded = this.#chunkState.reduce((n, c) => n + c.uploaded, 0)
this.options.onProgress(totalUploaded, this.#data.size)
Expand Down
88 changes: 78 additions & 10 deletions packages/@uppy/aws-s3-multipart/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,39 @@ function assertServerError (res) {
return res
}

function getAllowedMetadata ({ meta, allowedMetaFields, querify = false }) {
const metaFields = allowedMetaFields ?? Object.keys(meta)

if (!meta) return {}

return Object.fromEntries(
metaFields
.filter(key => meta[key] != null)
.map((key) => {
const realKey = querify ? `metadata[${key}]` : key
const value = String(meta[key])
return [realKey, value]
}),
)
}

function throwIfAborted (signal) {
if (signal?.aborted) { throw createAbortError('The operation was aborted', { cause: signal.reason }) }
}

class HTTPCommunicationQueue {
#abortMultipartUpload

#allowedMetaFields

#cache = new WeakMap()

#createMultipartUpload

#fetchSignature

#getUploadParameters

#listParts

#previousRetryDelay
Expand All @@ -57,6 +77,9 @@ class HTTPCommunicationQueue {
if ('abortMultipartUpload' in options) {
this.#abortMultipartUpload = requests.wrapPromiseFunction(options.abortMultipartUpload)
}
if ('allowedMetaFields' in options) {
this.#allowedMetaFields = options.allowedMetaFields
}
if ('createMultipartUpload' in options) {
this.#createMultipartUpload = requests.wrapPromiseFunction(options.createMultipartUpload, { priority:-1 })
}
Expand All @@ -75,6 +98,9 @@ class HTTPCommunicationQueue {
if ('uploadPartBytes' in options) {
this.#uploadPartBytes = requests.wrapPromiseFunction(options.uploadPartBytes, { priority:Infinity })
}
if ('getUploadParameters' in options) {
this.#getUploadParameters = requests.wrapPromiseFunction(options.getUploadParameters)
}
}

async #shouldRetry (err) {
Expand Down Expand Up @@ -190,8 +216,41 @@ class HTTPCommunicationQueue {
await this.#abortMultipartUpload(file, awaitedResult)
}

async #nonMultipartUpload (file, chunk, signal) {
const { meta } = file
const { type, name: filename } = meta
const metadata = getAllowedMetadata({ meta, allowedMetaFields: this.#allowedMetaFields, querify: true })

const query = new URLSearchParams({ filename, type, ...metadata })
const {
method = 'post',
url,
fields,
headers,
} = await this.#getUploadParameters(`s3/params?${query}`, { signal }).abortOn(signal)

const formData = new FormData()
Object.entries(fields).forEach(([key, value]) => formData.set(key, value))
const data = chunk.getData()
formData.set('file', data)

const { onProgress, onComplete } = chunk

return this.#uploadPartBytes({
signature: { url, headers, method },
body: formData,
size: data.size,
onProgress,
onComplete,
signal,
}).abortOn(signal)
}

async uploadFile (file, chunks, signal) {
throwIfAborted(signal)
if (chunks.length === 1 && !chunks[0].shouldUseMultipart) {
return this.#nonMultipartUpload(file, chunks[0], signal)
}
const { uploadId, key } = await this.getUploadId(file, signal)
throwIfAborted(signal)
try {
Expand All @@ -206,6 +265,9 @@ class HTTPCommunicationQueue {

async resumeUploadFile (file, chunks, signal) {
throwIfAborted(signal)
if (chunks.length === 1) {
return this.#nonMultipartUpload(file, chunks[0], signal)
}
const { uploadId, key } = await this.getUploadId(file, signal)
throwIfAborted(signal)
const alreadyUploadedParts = await this.#listParts(file, { uploadId, key, signal }).abortOn(signal)
Expand Down Expand Up @@ -240,7 +302,9 @@ class HTTPCommunicationQueue {
try {
return {
PartNumber: partNumber,
...await this.#uploadPartBytes({ signature, body: chunkData, onProgress, onComplete, signal }).abortOn(signal),
...await this.#uploadPartBytes({
signature, body: chunkData, size: chunkData.size, onProgress, onComplete, signal,
}).abortOn(signal),
}
} catch (err) {
if (!await this.#shouldRetry(err)) throw err
Expand Down Expand Up @@ -269,13 +333,17 @@ export default class AwsS3Multipart extends BasePlugin {
// TODO: this is currently opt-in for backward compat, switch to opt-out in the next major
allowedMetaFields: null,
limit: 6,
shouldUseMultipart: (file) => file.size !== 0, // TODO: Switch default to:
// eslint-disable-next-line no-bitwise
// shouldUseMultipart: (file) => file.size >> 10 >> 10 > 100,
retryDelays: [0, 1000, 3000, 5000],
createMultipartUpload: this.createMultipartUpload.bind(this),
listParts: this.listParts.bind(this),
abortMultipartUpload: this.abortMultipartUpload.bind(this),
completeMultipartUpload: this.completeMultipartUpload.bind(this),
signPart: this.signPart.bind(this),
uploadPartBytes: AwsS3Multipart.uploadPartBytes,
getUploadParameters: (...args) => this.#client.get(...args),
companionHeaders: {},
}

Expand Down Expand Up @@ -343,11 +411,7 @@ export default class AwsS3Multipart extends BasePlugin {
this.assertHost('createMultipartUpload')
throwIfAborted(signal)

const metadata = file.meta ? Object.fromEntries(
(this.opts.allowedMetaFields ?? Object.keys(file.meta))
.filter(key => file.meta[key] != null)
.map(key => [key, String(file.meta[key])]),
) : {}
const metadata = getAllowedMetadata({ meta: file.meta, allowedMetaFields: this.opts.allowedMetaFields })

return this.#client.post('s3/multipart', {
filename: file.name,
Expand Down Expand Up @@ -397,7 +461,7 @@ export default class AwsS3Multipart extends BasePlugin {
.then(assertServerError)
}

static async uploadPartBytes ({ signature: { url, expires, headers }, body, onProgress, onComplete, signal }) {
static async uploadPartBytes ({ signature: { url, expires, headers, method = 'PUT' }, body, size = body.size, onProgress, onComplete, signal }) {
throwIfAborted(signal)

if (url == null) {
Expand All @@ -406,7 +470,7 @@ export default class AwsS3Multipart extends BasePlugin {

return new Promise((resolve, reject) => {
const xhr = new XMLHttpRequest()
xhr.open('PUT', url, true)
xhr.open(method, url, true)
if (headers) {
Object.keys(headers).forEach((key) => {
xhr.setRequestHeader(key, headers[key])
Expand All @@ -425,7 +489,9 @@ export default class AwsS3Multipart extends BasePlugin {
}
signal.addEventListener('abort', onabort)

xhr.upload.addEventListener('progress', onProgress)
xhr.upload.addEventListener('progress', (ev) => {
onProgress(ev)
})

xhr.addEventListener('abort', () => {
cleanup()
Expand Down Expand Up @@ -455,7 +521,8 @@ export default class AwsS3Multipart extends BasePlugin {
return
}

onProgress?.(body.size)
// todo make a proper onProgress API (breaking change)
onProgress?.({ loaded: size, lengthComputable: true })

// NOTE This must be allowed by CORS.
const etag = ev.target.getResponseHeader('ETag')
Expand Down Expand Up @@ -550,6 +617,7 @@ export default class AwsS3Multipart extends BasePlugin {
onPartComplete,

file,
shouldUseMultipart: this.opts.shouldUseMultipart,

...file.s3Multipart,
})
Expand Down
1 change: 1 addition & 0 deletions packages/@uppy/aws-s3-multipart/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export interface AwsS3MultipartOptions extends PluginOptions {
opts: { uploadId: string; key: string; parts: AwsS3Part[]; signal: AbortSignal }
) => MaybePromise<{ location?: string }>
limit?: number
shouldUseMultipart?: boolean | ((file: UppyFile) => boolean)
retryDelays?: number[] | null
}

Expand Down
1 change: 1 addition & 0 deletions packages/@uppy/aws-s3/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ function defaultGetResponseError (content, xhr) {
// warning deduplication flag: see `getResponseData()` XHRUpload option definition
let warnedSuccessActionStatus = false

// TODO deprecate this, will use s3-multipart instead
export default class AwsS3 extends BasePlugin {
static VERSION = packageJson.version

Expand Down

0 comments on commit 8e15f27

Please sign in to comment.